How to implement Batch Processing with Apache Kafka

Batch processing with the advantages of streaming technologies

Batch processing will not disappear from enterprises overnight. Read this article to learn how you can still take advantage of all the benefits of data streaming and combine it with batch processing by using Apache Kafka.

Data streaming is everywhere. If you haven’t heard of data streaming and you work in a data-driven company, which almost all modern businesses are today, it’s about time to start learning about the many streaming technologies and their benefits. Data is the new oil, and the sooner data is available for analysis, the better your company can respond. Data streaming also offers several technical advantages. These include, for example, the complete decoupling of data producers and data consumers, the easy integration of additional systems into an existing data stream, and the resulting gains in scalability and reliability.

Apache Kafka is one of the best-known representatives of streaming technologies and is experiencing a huge upward trend. The company behind Apache Kafka, Confluent Inc., has reported growth of up to 450% several years in a row and is now valued at $5B.

Now, existing companies cannot be transformed into real-time, data-driven enterprises overnight. Entire business processes have to be rethought and nightly batch processing has to be replaced piece by piece. Needless to say, this is a highly complex undertaking, as these processes are usually linked to countless systems, which in turn were designed for batch processing. Changing the business processes themselves is no less complex and requires motivated and tireless change management.

In addition, there are also natural business processes that run as batch processing in the real world, for example, the stock market close.

Whatever applies to your business, batch processing won’t disappear overnight. In this article, I’ll show how you can still leverage the benefits of data streaming and traditional batch processing together, using Apache Kafka as an example.

Before we start, I would like to mention that it is also worth looking at other streaming technologies, such as Amazon Kinesis Data Streams. All the approaches discussed in this article can be easily transferred with minor adjustments.

The differences between batches and streams

The transition from batch processing to data streaming seems trivial at first glance: batches consist of individual records, and data streaming simply means streaming those individual records. The difficulty is that data streaming assumes an endless stream of data, and value is usually derived from each individual record. In batch processing, by contrast, value is created only after all related data has been processed completely.

A business process built on top of such a batch relies on the related records being available as a whole; the completeness of the records is therefore of particular importance and a fundamental criterion. Other criteria may include, for example, preserving the order of the records and avoiding duplicates. Furthermore, it is often desirable that a batch is processed only once. To meet these requirements, we can cleverly exploit some features of Apache Kafka.

How Apache Kafka works

Let’s start with topics: this is what the individual data containers are called in Apache Kafka. Topics are in turn divided into partitions, which contain the actual data. The number of partitions determines the number of data streams that can be processed in parallel. Apache Kafka assigns incoming records to one of these partitions based on a configurable strategy; by default, the assignment is based on the record’s key.

The number of partitions can be freely defined. Partitions make it possible to attach several identical consumers (i.e., consumers of the same consumer group) to a topic and thus heavily parallelize data processing: records with the same key always end up in the same partition and are therefore forwarded to the same consumer, which has already processed records with that key.

However, care must be taken here: keeping a state on this basis within the consumer is not advisable, since partitions can be reassigned. This is the case, for example, when a consumer of a consumer group temporarily fails. In such a case, partitions are reallocated among the remaining consumers in the consumer group as part of a rebalancing process.

Within a partition, Apache Kafka guarantees that the order of records is maintained: when a producer sends the contents of a batch to a partition, the individual records keep their order within that partition.
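To make this concrete, here is a minimal sketch with the Java producer API (the topic name, key, and values are placeholders): both records carry the same key, so the default partitioner routes them to the same partition, where they keep their order and are read by the same consumer.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same key => same partition => same order and same consumer.
            producer.send(new ProducerRecord<>("my-topic", "batch-42", "first record of the batch"));
            producer.send(new ProducerRecord<>("my-topic", "batch-42", "second record of the batch"));
        }
    }
}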

Once a consumer has consumed one or more records, it can acknowledge* the successful processing of those records. Thus, Apache Kafka “remembers” which records have already been delivered and does not resend them – even if the consumer temporarily leaves and another consumer steps in.

* To be precise, Apache Kafka uses an internal topic for consumer offsets. If the consumer uses the standard consumer API to store its processing progress in Apache Kafka, its position is written to this topic and read again on restart. It is of course also possible to store the consumer’s position in any other medium (e.g., a database).

Topic naming and partition selection

We start with the topics. In practice, reverse domain name notation (reverse DNS) has proven helpful for topic naming. If domain-driven design is used in your company, it makes sense to include both the domain and the business name of the data in the name of the topic:

com.xeotek.risk.loans

To communicate to other departments and applications whether the topic is an interface across the domain boundary, i.e., whether it may be used outside the domain, this information can also be added to the name. Our previous example thus becomes:

com.xeotek.public.risk.loans.data

or

com.xeotek.private.risk.loans.data

We refer to this topic as our data-bearing topic: it contains the actual records of our batch. The information about the batch itself, i.e., whether the associated records are ready and where they are located, is published in a second topic, the Event Topic:

com.xeotek.private.risk.loans.event

The desired degree of parallelization can be controlled by the number of partitions of the Event Topic.

We have published an article “Topic naming conventions” that outlines our suggestions and experiences with topic naming. To help with choosing the number of partitions, we have written this article: “How many partitions do I need?”.
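As a sketch (the partition counts and replication factor are placeholder values), the two topics could be created with the Java AdminClient:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateBatchTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Data-bearing topic: holds the actual records of each batch.
            NewTopic dataTopic = new NewTopic("com.xeotek.private.risk.loans.data", 6, (short) 3);
            // Event topic: one record per batch; its partition count limits how many
            // batches can be processed in parallel.
            NewTopic eventTopic = new NewTopic("com.xeotek.private.risk.loans.event", 3, (short) 3);
            admin.createTopics(List.of(dataTopic, eventTopic)).all().get();
        }
    }
}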

The structure of the data

After the topics have been created, we can start designing the records. Before we begin, let’s take another look at the requirements imposed on our application:

  1. A batch must be complete before processing starts.
  2. A batch should not be processed twice.
  3. The order of the records must be preserved.
  4. Duplicates must not occur in the batch to be processed.

Let’s start with the first requirement: a batch must always be completely available. To ensure this, we need to know when a complete batch has been transferred before we start processing it. Therefore we need

  • to know which batch a record belongs to, and
  • how many records the batch contains in total.
Fig. 1: Representation of the relation of the Event-Data pairs in the Data and Event Topic.

To be able to filter out duplicate records, the records in the batch are numbered consecutively. A unique identifier of the batch serves as the key for the individual records of the batch. A UUID (Universally Unique Identifier) generated in the producer is not suitable: in the event of an error, a restarted producer would assign new UUIDs to an already processed batch, making it impossible to check which batches have already been published. It is therefore better to choose an identifier that remains stable even if the batch is processed again and that can be associated with the original data of the source, even if the producer is restarted several times.

Table 1 shows the structure of a data set for the data-bearing topic.

Attribute   Description
eventId     Unique identifier – identical to the associated event, for example, 20201201-end-of-day.
id          Unique identifier of the record within the batch.
offset      Position of the record within the batch.
payload     The payload of the record.
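Expressed as a simple Java type (a sketch; the field names follow Table 1, and the payload is kept as a plain string here):

// A record of the data-bearing topic, following the structure in Table 1.
public record BatchDataRecord(
        String eventId, // identical to the associated event, e.g., "20201201-end-of-day"
        String id,      // unique identifier of the record within the batch
        long offset,    // position of the record within the batch
        String payload  // the actual payload, e.g., one line of the source file
) {}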

For our consumer to know that the complete batch (i.e., all records of the batch) is available in the data-bearing topic, and most importantly where it is located (i.e., in which partition), an associated record is published in the Event Topic.

Table 2 shows the structure of the dataset for the Event Topic.

Attribute         Description
eventId           Unique identifier – identical to the associated event, for example, 20201201-end-of-day.
capacity          Number of records contained in the batch.
sourceDescriptor  Contains all the information needed to locate the batch associated with the event in Apache Kafka:
                  topic: the name of the topic in which the associated data delivery can be found.
                  partition: the partition within the topic in which the associated data delivery can be found.
                  offset: the offset of the first record of the associated data delivery within the topic partition.
description       A meaningful description for traceability, e.g., EOB import of the file 20201201.csv.
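The corresponding sketch for the Event Topic (field names follow Table 2; the nested source descriptor locates the batch in Apache Kafka):

// Locates the first record of the batch in the data-bearing topic.
record SourceDescriptor(String topic, int partition, long offset) {}

// A record of the Event Topic, following the structure in Table 2.
public record BatchEvent(
        String eventId,                    // e.g., "20201201-end-of-day"
        long capacity,                     // number of records contained in the batch
        SourceDescriptor sourceDescriptor, // where the batch can be found in Apache Kafka
        String description                 // e.g., "EOB import of the file 20201201.csv"
) {}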

The structure of the producer

The name of the batch, which serves as the key of our records, as well as the numbering of the individual records, depends on the source, i.e., the original data we want to transfer as a batch; in the case of a CSV file, for example, the line numbers can be used for the numbering.

The batch should receive the same identifier even if, in the event of an error, a new producer is created and takes over the work. We can then use the identifier to detect batches that have already been processed and prevent them from being republished to Apache Kafka. In addition, it greatly simplifies tracking in operations if it is clear at a glance what the original data actually is.

The numbered records of the batch are published to the data-bearing Data Topic as part of a transaction. The partition and the position of the first data record are then published as a final record to the Event Topic within the same transaction, and the transaction is committed. Using a transaction makes sense so that, in the event of an error, our consumers are not confronted with incomplete batches.
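Here is a minimal sketch of this transactional flow with the Java producer API. It is not a complete implementation: a toy string serialization stands in for the record structures above, and the transactional.id, topic names, and file contents are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("transactional.id", "loans-batch-producer-1"); // assumption: one stable id per producer instance

        String eventId = "20201201-end-of-day"; // stable batch identifier derived from the source
        String dataTopic = "com.xeotek.private.risk.loans.data";
        String eventTopic = "com.xeotek.private.risk.loans.event";
        String[] lines = {"line 1", "line 2", "line 3"}; // stands in for the source file

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();

            // Publish the numbered records, keyed by the batch identifier.
            // Toy serialization "offset;payload"; in practice use the structure from Table 1 (e.g., JSON or Avro).
            RecordMetadata first = producer
                    .send(new ProducerRecord<>(dataTopic, eventId, "0;" + lines[0]))
                    .get(); // wait so that partition and offset of the first record are known
            for (int i = 1; i < lines.length; i++) {
                producer.send(new ProducerRecord<>(dataTopic, eventId, i + ";" + lines[i]));
            }

            // Publish the event record last, pointing to the first data record of the batch.
            String event = eventId + ";" + lines.length + ";" + dataTopic + ";"
                    + first.partition() + ";" + first.offset() + ";EOB import of the file 20201201.csv";
            producer.send(new ProducerRecord<>(eventTopic, eventId, event));

            producer.commitTransaction();
        }
    }
}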

To record the progress of the producer, besides the obvious use of the Event Topic, there are other possibilities: if the source is a file, it can be renamed on completion, or a semaphore file can be created (e.g., the famous OK file).

If the Event Topic is used, in most cases the retention period, i.e., the time that governs how long records are kept in a topic, must be adjusted for the Event Topic. To understand why, consider a file that is read in: if the processing status is managed via the Event Topic, it must be ensured that the associated record in the Event Topic is not deleted before the source file is deleted (or archived). Otherwise the producer would assume that the source file, to stay with our example, has not been processed yet.
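As an illustration (the retention value of 90 days is an arbitrary example), the retention of the Event Topic could be raised with the AdminClient:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RaiseEventTopicRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource eventTopic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "com.xeotek.private.risk.loans.event");
            // Keep event records for 90 days (example value) so they outlive the source files.
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", String.valueOf(90L * 24 * 60 * 60 * 1000)),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(eventTopic, List.of(setRetention))).all().get();
        }
    }
}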

Kafka Connect can also be used as a basis for the producer. However, there are some pitfalls and peculiarities here that should be understood in detail; covering Kafka Connect would go beyond the scope of this article, so only this much: pay special attention to the behavior in case of errors and to the actual guarantees if you use Kafka Connect. Only a few connectors meet the requirements of this use case. In this context, it is also important to configure the producer (of the Kafka Connect connector) with the correct acks setting (i.e., acks=all).

The structure of the consumer

The consumer’s setup is much simpler than the producer’s: our consumer listens for new records in the Event Topic and, as soon as a new record appears, jumps to the referenced position in the data-bearing Data Topic. The associated records are identified by their key (the event id) and consumed one after the other.

Duplicates are filtered via the id of the individual records, e.g., by collecting the records in a map. Processing of the batch starts as soon as the last record has arrived; the last record is detected by comparing the number of records received so far with the total number of records in the batch (its capacity).

To avoid acknowledging records prematurely, i.e., before the batch has been completely processed, the automatic acknowledgement of records, which is activated by default, has to be deactivated for the consumer, especially for the Event Topic, via the following parameter:

enable.auto.commit = false

Instead, the consumer must manually acknowledge the Event Topic record once the batch processing is complete. Afterwards, of course, all data of the completed batch should be removed from memory.
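The following sketch pulls the preceding steps together with the Java consumer API. It assumes the toy string serialization from the producer sketch above; the group id and topic names are examples, and error handling is discussed further below.

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BatchConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "loans-batch-consumer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");       // acknowledge only after the whole batch is processed
        props.put("isolation.level", "read_committed"); // hide records of aborted producer transactions

        try (KafkaConsumer<String, String> eventConsumer = new KafkaConsumer<>(props)) {
            eventConsumer.subscribe(List.of("com.xeotek.private.risk.loans.event"));
            while (true) {
                for (ConsumerRecord<String, String> event : eventConsumer.poll(Duration.ofSeconds(1))) {
                    // Toy parsing of the event value: eventId;capacity;topic;partition;offset;description
                    String[] f = event.value().split(";");
                    readAndProcessBatch(f[0], Long.parseLong(f[1]), f[2],
                            Integer.parseInt(f[3]), Long.parseLong(f[4]), props);
                    // Manual acknowledgement of exactly this event record.
                    eventConsumer.commitSync(Map.of(
                            new TopicPartition(event.topic(), event.partition()),
                            new OffsetAndMetadata(event.offset() + 1)));
                }
            }
        }
    }

    static void readAndProcessBatch(String eventId, long capacity, String topic,
                                    int partition, long offset, Properties props) {
        Map<String, String> batch = new HashMap<>(); // record id -> payload, filters duplicates
        try (KafkaConsumer<String, String> dataConsumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition(topic, partition);
            dataConsumer.assign(List.of(tp));
            dataConsumer.seek(tp, offset); // jump to the first record of the batch
            while (batch.size() < capacity) {
                ConsumerRecords<String, String> records = dataConsumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records.records(tp)) {
                    if (!eventId.equals(r.key())) {
                        continue; // record belongs to another batch, skip it
                    }
                    String[] f = r.value().split(";", 2); // toy format: id;payload
                    batch.put(f[0], f[1]);                // duplicates simply overwrite each other
                }
            }
        }
        // The batch is complete (size == capacity): process it as a whole.
        batch.forEach((id, payload) -> System.out.println(eventId + " / " + id + ": " + payload));
    }
}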

Reprocessing

While batch processing could in principle be implemented with a single topic, i.e., without an Event Topic, in practice a dedicated topic containing the information about the available batches is considerably easier to operate and indispensable for the selective reprocessing of batches, as I will explain in a moment.

If the processing of a batch is to be triggered again, this can be done by simply copying the record of the desired batch in the Event Topic and publishing it as a new record. Our consumer will then process this batch again.

To do this comfortably, I recommend using a Kafka UI, such as Kadeck. The functionality to copy records or forward them to other topics is available in the free versions of Kadeck, be it the desktop version or the web service for teams. Make sure to try it out.
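Alternatively, the same effect can be achieved programmatically by republishing the event record unchanged. The sketch below assumes the toy event format from the producer sketch; the event value shown is an arbitrary example copied from an original event record.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReprocessBatch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // The unchanged event payload, copied from the original event record; it still
        // points at the batch's first data record in the data-bearing topic (example values).
        String originalEventValue =
                "20201201-end-of-day;3;com.xeotek.private.risk.loans.data;0;42;EOB import of the file 20201201.csv";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Republishing the event record makes the consumer process the batch again.
            producer.send(new ProducerRecord<>("com.xeotek.private.risk.loans.event",
                    "20201201-end-of-day", originalEventValue));
        }
    }
}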

As mentioned above, this procedure is indispensable for the selective reprocessing of a batch. To avoid a separate Event Topic, one could consider externally resetting the consumer’s offset afterwards and thus triggering a reprocessing. However, if the batch to be reprocessed is not the last one in the topic, i.e., there are subsequent batches that have already been processed, these subsequent batches would also be reprocessed: the consumer would again work its way from the new position up to the last record. In addition, the traceability of this procedure suffers greatly, which is why it is not recommended.

Error processing

At this point it should be noted that in the case of a persistent error, i.e., one that is not resolved by repeated attempts (e.g., incorrect records, parsing errors), the batch must nevertheless be acknowledged as processed. Otherwise the process chain risks coming to a halt: Apache Kafka would redeliver the batch endlessly until it is processed. It therefore makes sense to catch any persistent errors that occur during the processing of a batch and thus prevent the consumer from crashing and starting to process the same erroneous batch again.

The intercepted error can be reported externally either in log form or via a dedicated error topic.
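As a sketch (the error topic name and the surrounding consumer and producer instances are assumptions), the processing of one event could be wrapped like this, complementing the consumer sketch above:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class BatchErrorHandling {

    // Processes one event record; on a persistent error the batch is reported to a
    // dedicated error topic (name is an assumption) and the event is still acknowledged,
    // so the process chain does not come to a halt.
    static void processEventSafely(ConsumerRecord<String, String> event,
                                   KafkaConsumer<String, String> eventConsumer,
                                   KafkaProducer<String, String> errorProducer,
                                   Runnable batchProcessing) {
        try {
            batchProcessing.run(); // e.g., readAndProcessBatch(...) from the consumer sketch above
        } catch (RuntimeException persistentError) {
            errorProducer.send(new ProducerRecord<>("com.xeotek.private.risk.loans.error",
                    event.key(), persistentError.getMessage()));
        }
        // Acknowledge the event in both cases so it is not redelivered endlessly.
        eventConsumer.commitSync(Map.of(
                new TopicPartition(event.topic(), event.partition()),
                new OffsetAndMetadata(event.offset() + 1)));
    }
}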

The bottom line

It is thus possible to implement classic batch processing with Apache Kafka and to make the advantages of data streaming usable in an environment that is still based on classic batch processing. Especially in typical data integration scenarios, the properties of data streaming such as reliability, scalability and decoupling can be leveraged while still implementing classic batch processing.

A frequently mentioned disadvantage compared to conventional batch processing with files is that business departments can look into those files at any time and detect errors. However, this can be quickly remedied with an appropriate software solution for data monitoring. Once the data is available in Apache Kafka, complete analyses and reports can be created in real time, or records can even be corrected manually and reprocessed.

Did you like the article, do you have questions or further ideas?

Feel free to let me know (Twitter: @benjaminbuick or the Xeotek team via @xeotekgmbh)!

Ben

Software architect and engineer with a passion for data streaming and distributed systems.