Jobs vs Events
By Programming on Mon 01 January 2024
inKafka is popular as a backbone messaging service to connect services. You may be used to using background job processing services like Resque in Ruby or Oban in Elixir, and wonder whether you can replace them with Kafka. In practice, both have their benefits.
This post discusses issues that come up when using Kafka and architectural patterns that can help.
Kafka architecture
Kafka is fundamentally different from Resque and other messaging systems like RabbitMQ. These systems have items that are added to queues, processed, and removed. Kafka instead uses a "topic", which acts as a log or ledger. Publishers add records to the end of the log, and they never go away. The Kafka server assigns each record an ever-increasing integer "offset" which indicates its position in the log. For practical reasons, we may purge old records from the system, e.g., to save space, but the offset keeps increasing.
Multiple independent applications can read from the same topic. Each application keeps track of which records it has processed by remembering the offset. One application might start at the beginning of the available records and read each one by one, doing some work or analysis. Another might wait for new records and perform some action.
This architecture makes Kafka great as a permanent record of events, but it doesn't specify what should be done with the records. Most importantly, it doesn't cover error handling.
For example, assume we have an application that looks for new items in the product catalog and puts them into Elasticsearch. It reads a record from Kafka, parses the data, creates a JSON document, and submits it to Elasticsearch for indexing.
Permanent failures
If the app cannot parse a data record, then it needs to log the issue and continue, otherwise, the bad record blocks further processing. The failure is “permanent”, i.e., it's fundamental to the data.
In practice, it may be that changing the code would allow the data to be handled, and we could reprocess the failed records. We also need to be able to debug problems with the system, figuring out where the data came from and how often the issues are occurring. A common strategy is to write records to a "Dead Letter Queue" (DLQ) which we can monitor, review, and reprocess.
The big difference between normal processing and DLQ processing is error handling. When processing normally, we read a record from Kafka, try to parse it, and if it fails (or there is some other permanent error like data validation), we put it in the DLQ.
When processing the DLQ, we start at a particular offset in the DLQ and try to parse records. If they succeed, we do the normal processing. Otherwise we just move on to the next record. As we process the DLQ the same way as regular records, doing it in the same application generally makes sense. We can just logically "poke" the consumer and tell it to process the DLQ. When processing, we may want to keep track of which records we have already processed and avoid processing them again. This depends on the service, however, whether processing is idempotent and whether processing records out of order matters.
One of the most significant use cases is having a bug or version skew in the consumer, causing mass failures in parsing input records. In this case, we can update the code and then reprocess the DLQ, most of which would succeed. This can result in huge numbers of messages on the DLQ. We need to monitor the number of messages and the rate of errors, then ops in case of problems.
Transient failures
Temporary problems are more common in everyday processing. It might be that the target system, e.g., Elasticsearch, is unavailable or overloaded. The application needs to retry if it can't connect or rate-limit processing. A naive sender can overwhelm a target system, "kicking it when it's down".
Kakfa is generally silent about these kinds of error-handling strategies, as each application has different requirements. The application or programming language framework needs to make its own decisions. It needs to handle retry logic, rate limiting, duplicate processing, and coordinate between multiple processes working together.
Job frameworks
Job processing frameworks such as Resque focus almost entirely on the transient processing of messages. They consider each job to be unique. They have sophisticated functions to register the job, schedule it, retry it if it fails, and troubleshoot the system.
Resque is often used to deal with the fact that Ruby doesn't do concurrency particularly well and that Rails processes are heavyweight, taking up a lot of system resources. It is useful for performance to split processing into interactive parts and asynchronous background processes.
For example, in an e-commerce system, the user creates an order, and the application responds immediately with "Thank you for your order". It then triggers multiple background jobs, e.g., sending a confirmation email, running anti-fraud checks, and starting fulfillment. Any one of these processes might fail temporarily and be retried.
Sometimes we do work in the background for practical reasons. A good example is starting a report process which might take a long time to complete. If we could reliably handle it immediately from the same process, we would. It reduces system resources, however, to trigger a background job, returning immediately and then notifying the user when the job is done. It may also provide a better user experience.
Leveraging both
In practice, job handling systems are convenient to use and mature. We can combine Kafka for "events" and Resque/Oban for "jobs". A Kafka consumer simply reads records from a topic and creates job records, which it schedules for processing. We still need to avoid overloading the job system, but otherwise, the error-handling logic is relatively simple.
Access to data
A key question is how the application accesses the data it needs to process.
One option is that the record has everything needed to perform the work. Another is that it only has a reference to the data, and the app reads the data from an external source.
For simple events, we can put everything into the job. For example, we could generate an event whenever someone fails to log into the system. It might be an account ID, email address, timestamp, and metadata like IP address and browser. An anti-fraud system could look at that and identify that we are under attack from bots.
The data could be huge, though, with a complex schema. For example, when we create a new item for sale, we probably don't want to serialize it to JSON and put it into Kafka. The processes reading the record would need to understand the format and extract the parts they care about. Evolving the schema over time is hard, and we risk getting out of sync, resulting in many parse failures (see the DLQ discussion above).
If everything is on the same system, then we just need a database ID, and the processor can read from the same database using, e.g., the same ActiveRecord models.
In a services architecture, we create services that logically own data and other applications can access the service via an API to get the data they need. For example, we might have a Customer service that handles registrations and manages information such as shipping addresses and payment methods. The service would publish an event for a new registration and another whenever it changes. The event includes the customer's unique identifier that API consumers can use to access the data. Similarly, we can have a Catalog service that manages information about items for sale.
We could use gRPC for service-to-service communication, as it gives better performance, but the schema is relatively rigid. Using a GraphQL API may make clients more resilient to schema updates.
Another option is a “change data capture” stream. Whenever the system of record changes, it sends an update with the change. For example, if a customer changes their delivery address in a Customer service, the event might include the new address as a key/value pair. Receiving systems can then handle the update directly.
A similar pattern can be used for systems that require an audit trail that indicates who made a specific change. Kafka is ideal for this, as it provides an immutable record of changes over time, as opposed to retaining only the current value.
Events
A relatively generic event schema could be as follows:
- Event type, e.g.,
CUSTOMER_CREATED
- Key/value data related to the event, e.g.,
customer_id
- Timestamp
- Source, e.g.,
CUSTOMER_SERVICE
As an example, the order process could create an ITEM_SOLD
event, and
potentially multiple consumer processes would be interested in it. So one might
read the event, read the details of the item, then write a commissions table.
Similarly, publishing a product could trigger indexing Elasticsearch for it.
Event sourcing
Event sourcing is an architecture where a system records a series of updates, and other systems consume those updates to keep their own state of the world.
This is particularly helpful when tracking a series of changes over time. For hundreds of years, this has been the approach used for a financial ledger. For example, a bank account tracks deposits and withdrawals over time, keeping a running balance.
Ledgers are great for financial systems but may be too much detail for inter-service communication.
It is common to use this approach in hospital information systems to synchronize between systems. When a patient registers, it creates a new patient record, and any systems that might need to interact with patients (e.g., the pharmacy) can create a corresponding record in their system. If a change or deletion occurs, the patient management system can send an update message, and the systems can update their records.
Data ownership
Generally speaking, we would prefer to have one system that owns the data and keeps a single point of truth. This system generates events when it changes.
In a services architecture, however, we might have multiple systems that generate events based on their responsibility, and we need to put the pieces together to get the whole picture.
For example, we might have a catalog that owns product items, an order management system that records sales of items, and a logistics system that records shipments of the orders. These systems generate change events that may be incidental to their internal processing, but are useful triggers for other systems.
When a customer buys a product, we might need to update the quantity on hand in an inventory management system. That might trigger the product detail page to show a different availability date.
We might also separate internal systems from the services needed to run the public website, improving public site reliability and scalability.
It can be helpful to think about the business processes as a flow of data, coordinated between multiple systems. For example, we have the internal processes associated with defining products, then we "publish" them to the website. At that point, the public catalog, order handling, and customer management systems become critical. After an order is placed, internal fulfillment and accounting systems take over.
In between, some processes need to manage product data as a whole, e.g., merchandising looks for products in the Catalog which match conditions and create a marketing campaign for them. Machine learning processes may analyze all the products in the database to determine pricing, group similar products, create suggestions, or optimize product descriptions for SEO.