Learning Kafka: Kafka Connect + Wikipedia

I’ve been developing backend software for almost 10 years already and never had a chance to work closely with Apache Kafka. After a couple of technical interviews recently I’ve realized that it’s a significant gap in my experience. So, I’ve decided to learn it by playing with publicly available Wikipedia’s recent changes event stream. Join me in this article, where I’ll be developing a Kafka Connect application listening for the latest Wikipedia edits and storing them in a Kafka topic! I’ll develop a Kafka Streams application processing this topic in one of the future articles, so stay tuned.

Wikipedia’s recent changes stream

I’ve been looking for a public event stream with a reasonable load to learn Kafka and found out that Wikipedia publishes all its recent changes on a special page: Special:RecentChanges. It seems to be a perfect event source!

A few clicks (1234) and I found a developer-friendly API. It appears to be a very simple SSE endpoint. All you have to do is to connect to https://stream.wikimedia.org/v2/stream/recentchange and listen for events. There is an OpenAPI specification available at stream.wikimedia.org and the payload schema is documented on GitHub.

If I were writing a real production application, I would use a Gradle jsonschema2pojo plugin to convert the schema into Java classes at build time. However, let me just use its web version today and generate models manually. Four Java classes with Jackson and JSR 303 annotations were generated: Length, Meta, RecentChange, Revision, and I’ve just placed them in src/main/java with no changes.

SSE & Kotlin

As a backend developer, I have some experience in developing SSE APIs with Ktor, but I’ve never actually made SSE clients. Fortunately, there is an easy-to-use Kotlin SSE client library on GitHub.

Let’s get our hands dirty and write a simple console application to check the stream using RxSSE:

fun main() {
    val rxsse = RxSSE()
    val mapper = ObjectMapper()

    rxsse
            .connectTo("https://stream.wikimedia.org/v2/stream/recentchange")
            .flatMap {
                try {
                    Flowable.just(mapper.readValue(it.data, RecentChange::class.java))
                } catch (_: Exception) {
                    Flowable.empty<RecentChange>()
                }
            }
            .forEach { println(it) }
}

Probably, not the code I should be proud of, but it works! Here we just connect to Wikipedia’s recent changes stream, try to parse the events, and print them. Run this code and see the latest modifications in real-time!

At this point, I’ve noticed a problem with those auto-generated classes. Field log_params was defined a bit too… chaotically and Jackson sometimes fails to deserialize it:

log_params:
  description: Property only exists if event has rc_params.
  type:
    - array
    - object
    - string
  additionalProperties: true

I’ve just removed the logParams property from the generated Java source, so Jackson will not try to deserialize it.

Kafka Connect

If you are new to Kafka Connect, I would recommend reading the comprehensive Confluent guide about Kafka Connect. It covers all the topics: concepts and architecture of Kafka Connect; developing connectors; deployment, administration, and monitoring of Kafka Connect applications; security considerations. It’s awesome.

Shorter but still sufficient Kafka Connect guide could be found in the official Kafka documentation.

Finally, if you’re really scarce of time and don’t enjoy theory — read the 7th chapter (Building Data Pipelines) of freely available "Kafka: The Definitive Guide" book. Brought to you by, again, Confluent. They should probably pay me something for the ad, but without any jokes the book is great.

After grasping some basics, you’ll realize, that the task of writing a Kafka Connect application for Wikipedia’s latest changes ingestion, facing us, is as simple as implementing only two classes: SourceConnector and SourceTask!

Let’s start with a…

SourceConnector

A Connector is kind of a plugin for Kafka Connect. Connectors manage the integration of Kafka Connect with other systems, either as an input that ingests data (SourceConnector) into Kafka or an output that passes data to an external system (SinkConnector). Connectors are responsible for creating configurations for a set of Tasks that do the actual data processing. The number of tasks may vary and it’s a connector’s responsibility to decide how many of them are required to perform the job. A good example, mentioned everywhere, is an RDBMS source connector: whenever it detects a new table (or a dropped table) it asks the framework to spawn (or kill) a task to process that table.

In our case, no reconfiguration is needed and a single task is enough.

Let’s define a configuration for out connector:

const val VERSION = "1.0.0"
const val TOPIC_CONFIG = "topic"
const val STREAM_URL_CONFIG = "streamUrl"

private val config: ConfigDef = ConfigDef()
    .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "The topic to publish data to")
    .define(STREAM_URL_CONFIG, ConfigDef.Type.STRING, "https://stream.wikimedia.org/v2/stream/recentchange", ConfigDef.Importance.MEDIUM, "MediaWiki's RecentChanges event stream URL")

This ConfigDef simply states that our connector has two string configuration parameters: topic and streamUrl. topic defines a Kafka topic to publish data to. It has high importance and no default values, thus it is required. streamUrl is a URL for MediaWiki’s recent changes event stream. It has a default value and moderate importance: if you don’t provide any, the default one will be used, but you could set it to other value if you want (e.g. for testing).

This configuration definition is declared in our WikipediaSourceConnector's companion object, along with the connector version and logger.

With these values defined, we can easily implement version and config methods:

override fun version(): String = VERSION

override fun config(): ConfigDef = config

taskClass is trivial as well:

override fun taskClass(): Class<out Task> = WikipediaSourceTask::class.java

We’ll impmelement the WikipediaSourceTask later.

When the framework instantiates a connector, it calls its start method. This method is used to prepare the connector: parse the provided configuration, allocate required resources, start monitoring the source for any changes. Our connector is ordinary. It doesn’t need any resources, it doesn’t allocate anything or start any background threads. All we need to do is to parse the configuration into local properties to use them later:

override fun start(props: Map<String, String>) {
    val parsedConfig = AbstractConfig(config, props)

    topic = parsedConfig.getString(TOPIC_CONFIG)?.takeUnless { it.isBlank() } ?: throw ConfigException("Topic must be set")
    streamUrl = parsedConfig.getString(STREAM_URL_CONFIG)?.takeUnless { it.isBlank() } ?: throw ConfigException("SSE URL must be set")

    logger.info("Started WikipediaSourceConnector with topic '{}'", topic)
}

As long as we don’t allocate anything or start any threads, the stop method is a no-op:

override fun stop() {
    logger.info("Stopped WikipediaSourceConnector with topic '{}'", topic)
}

Finally, taskConfigs. Remember, that we need only a single task, so the result of this method should have only one config for that task. The maxTasks parameter could be ignored. Here we simply pass the properties down to the task:

override fun taskConfigs(maxTasks: Int): List<Map<String, String>> {
    if (maxTasks != 1) {
        logger.info("Ignoring maxTasks={}", maxTasks)
    }

    return listOf(
            mapOf(
                    TOPIC_CONFIG to topic,
                    STREAM_URL_CONFIG to streamUrl
            )
    )
}

We’re done! You can find the full source at my GitLab.

Now it’s time for a…

SourceTask

I will not repeat the documentation and tell you about tasks and workers and all those things. Just remember: Tasks contain the code that actually copies data to or from another system. SourceTasks do that by implementing a poll method, which will be called in a loop by the framework. Task’s start method is called at the beginning of its lifecycle, and stop method is called, you’ve guessed it, at the end. There is also a version method, but it is trivial.

Let’s start our task:

override fun start(props: Map<String, String>) {
    // (1)
    topic = props[WikipediaSourceConnector.TOPIC_CONFIG] ?: throw ConfigException("Topic must be set")
    streamUrl = props[WikipediaSourceConnector.STREAM_URL_CONFIG] ?: throw ConfigException("SSE URL must be set")

    // (2)
    recentChanges = RxSSE()
            .connectTo(streamUrl)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(::sse2model) // (3)
            .observeOn(Schedulers.computation())
            .doOnNext { logger.debug("Next SSE: {}", it?.meta?.id) }
            .onBackpressureDrop { logger.warn("Dropping SSE: {}", it?.meta?.id) } // (4)
            .retry() // (5)
            .observeOn(Schedulers.io())
            .subscribeWith(recentChangesSubscriber) // (6)

    logger.info("Started WikipediaSourceTask with topic '{}'", topic)
}

A lot of things happen here!

  1. I’m extracting the configuration parameters from the provided properties. These are the same parameters returned from the connector’s taskConfigs.

  2. I’m creating a disposable subscription to Wikipedia’s recent changes event stream, just like we did at the beginning of this article. Although we have more sophisticated processing here. The subscription is stored into a local variable for a later reference in the stop method.

  3. This flatMap simply parses JSON payloads into Java classes.

  4. In case of events incoming at a higher rate, that we’re able to process, they should be dropped. More on that below.

  5. I’m declaring that the stream should restart itself in case of any errors.

  6. A subscriber is added to a fully defined stream.

Now, let’s get distracted a little bit of Kafka Connect and talk about combining push-based SSE stream and pull-based SourceTask's behavior and backpressure.

Wikipedia generates events independently of our tasks. We do not own its event stream and we cannot pause it or ask it for a specific number of new events only when we’re ready to accept them. Events just happen and we need to deal with them. This event stream follows the push model.

On the other side, we have a poll method that is invoked by Kafka Connect when it’s ready to accept new records. Upon the invocation, poll may return a list of records or block if there are no new records. It may also return null (I guess an empty list will work too) to yield the thread periodically. Kafka Connect is calling this method in a loop, and this call follows the pull model.

And here we have a problem of combining a push-based SSE stream nature with a pull-based framework.

Imagine a chameleon and a swarm of flies around it. Flies fly in and fly out and the chameleon periodically fires its tongue to catch a fly.

We’ll use a SynchronousQueue as a "chameleon’s tongue" between the SSE stream and Kafka Connect. Every time there is a new event in the SSE stream we put it to the queue. This call blocks until the take method is called at the other side. take is called in the task’s poll. Actually, I’ve replaced take with its time-limited poll counterpart (another poll, no puns intended here) to yield the thread periodically.

That’s not probably how real chameleons work, but, you know, the software was always only an approximation of the real world.

Now you may ask: what happens when a fresh event arrives and we’re still not yet finished with the previous one. And this is the problem of backpressure.

Our SSE stream is a Flowable and it supports backpressure out-of-the-box: it allows subscribers to signal a demand for new events via request method. But in our case the events arrive from the source that doesn’t support backpressure: Wikipedia’s event stream is not pausable, events just happen, we don’t have control over them and they may arrive faster then we call the request.

Basically, we could either buffer or drop extra events. Buffering only helps to mitigate bursts of events, i.e. when events generally arrive at a slower than the processing rate. It won’t help when the event rate exceeds the processing speed. In this case, you need to either scale your app or drop the exceeding events.

I decided to drop the events for simplicity as this is not a production connector and that’s what onBackpressureDrop means in the stream definition above. I want to note that even this naive implementation doesn’t actually drop any events with a single task running on my laptop along with a three-node Kafka cluster.

Back to our WikipediaSourceTask, let’s look at the recentChangesSubscriber. This is a final destination of the SSE stream:

private val rendezvous = SynchronousQueue<RecentChange>()

private val recentChangesSubscriber = object : DisposableSubscriber<RecentChange>() {
    override fun onStart() {
        request(1)
    }

    override fun onNext(t: RecentChange?) {
        t?.let {
            rendezvous.put(it)
        }
        request(1)
    }

    override fun onComplete() {
    }

    override fun onError(t: Throwable?) {
        logger.error("Error in SSE stream", t)
    }
}

Now, the poll method, the other side of the rendezvous queue. The implementation is conceptually straightforward: take the event from the queue and turn it into a SourceRecord.

override fun poll(): List<SourceRecord> {
    return rendezvous.poll(1, TimeUnit.SECONDS)?.let { event ->
        val record = SourceRecord(
                /* sourcePartition */ mapOf("domain" to event.meta?.domain),
                /* sourceOffset */ mapOf("dt" to event.meta?.dt?.time),
                /* topic */ topic,
                /* partition */ null,
                /* keySchema */ Schema.STRING_SCHEMA,
                /* key */ event.meta?.id ?: "",
                /* valueSchema */ Schema.BYTES_SCHEMA,
                /* value */ mapper.writeValueAsBytes(event)
        )

        logger.debug("Producing a record: {}", record)

        listOf(
                record
        )
    } ?: emptyList()
}

Points of interest in this code are sourcePartition, sourceOffset, partition, and schemas.

sourcePartition and sourceOffset are dictionaries of strings to primitives with arbitrary content. Kafka Connect periodically commits these values to internal topics. Whenever your task is restarted, you could access the latest committed offset for a given partition from the task’s context and proceed from that position. I don’t actually use this feature, but it may be very helpful in other connectors.

partition, set to null, just means that the record should be published in no specific partition of a target topic. This behaviour is on par with Kafka’s ProducerRecord.

Schemas specify types for keys and values: keys are strings and values are raw bytes.

Finally, our task’s stop method just disposes the subscriber:

override fun stop() {
    recentChanges.dispose()

    logger.info("Stopped WikipediaSourceTask with topic '{}'", topic)
}

Again, the full source resides in my GitLab.

Packaging & Deployment

We’re done with the code, let’s finally deploy it!

Kafka Connect searches for available connectors in its plugin.path configured in connect-distributed.properties or connect-standalone.properties. Connectors can be packaged either into usual JAR files and placed in the plugin.path along with their dependencies, or into fat JARs with all the dependencies packed inside.

I like fat JARs and Gradle Shadow Plugin helps me build them. The only gotcha to be aware of is that a connector should never contain any libraries provided by the Kafka Connect runtime. So, put that provided dependencies into the shadow configuration, provided by the plugin.

shadowJar task assembles a fat JAR in the project’s build directory. I’ve just added it to the Kafka Connect’s plugin.path, so whenever I change something I just reassemble the JAR and restart the framework. This trick significantly accelerates the development.

I decided to play with Kafka Connect in the distributed mode, but things should be the same in a standalone mode. Let’s start the framework:

./bin/connect-distributed.sh config/connect-distributed.properties

I almost forgot to mention, that you should start Kafka and create a topic for our stream!

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 9 --replication-factor 3 --config min.insync.replicas=2 --topic wiki

Kafka Connect doesn’t have any GUI or CLI for connector deployment by default and all the interactions are made via its REST API. To deploy a connector one should POST a specifically crafted JSON to the /connectors endpoint:

curl -X POST -d @connect-config.json http://localhost:8083/connectors --header "Content-Type:application/json"

The content of the connect-config.json (filename is arbitrary) is simple:

{
  "name": "wiki", // (1)

  "config": {
    "connector.class": "me.madhead.playgrounds.kafka.connect.WikipediaSourceConnector", // (2)

    "topic": "wiki", // (3)

    // (4)
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
  1. Connector’s name.

  2. FQDN of the connector class.

  3. Connector-specific configuration. Remember we need a topic to publish events to?

  4. Classes to be used for keys and values serialization. Kafka Connect uses JSON converters by default (configured in connect-distributed.properties or connect-standalone.properties). I decided to try simpler conversions: raw strings for keys and bytes for values. This configuration matches schemas provided to the SourceRecord constructor above.

After running the cURL command a new connector named "wiki" should appear at http://localhost:8083/connectors. Open http://localhost:8083/connectors/wiki to check its details.

Finally, when everything is running, all that is left is to check the topic:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic wiki

a455e93a-f921-4944-b32f-496e5dcb740c    {"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://nl.wikipedia.org/wiki/Cl%C3%A9ment_Lenglet","request_id":"aee180ab-a61b-42a4-979b-6cc260142c0c","id":"a455e93a-f921-4944-b32f-496e5dcb740c","dt":1592339206000,"domain":"nl.wikipedia.org","stream":"mediawiki.recentchange","partition":0,"offset":2492455062,"topic":"eqiad.mediawiki.recentchange"},"id":112930853,"type":"edit","title":"Clément Lenglet","namespace":0,"comment":"","parsedcomment":"","timestamp":1592339206,"user":"2A02:A446:99EF:1:CC20:4CAE:8399:76BB","bot":false,"server_url":"https://nl.wikipedia.org","server_name":"nl.wikipedia.org","server_script_path":"/w","wiki":"nlwiki","minor":false,"patrolled":false,"length":{"old":6153,"new":6153},"revision":{"new":56562546,"old":56562537}}
…

Nice!

Let’s do a…

Recap

We’ve developed a simple Wikipedia connector for Kafka Connect in this article. It’s not perfect, but it could be a good starting point for something more serious. All the code for this article could be found in my GitLab profile, enjoy! I hope you are not afraid of Kafka Connect anymore: a basic connector is just two classes and ten methods big.

We’ve also learned a little bit about SSE (server-sent events), RxJava, flowables, subscriptions, backpressure, and combining push and pull models.

And chameleons!

I’ll be writing another article about Kafka Streams soon, and till then…