Building Reliable Kafka Producers and Consumers in .NET

Sending messages reliably and quickly between services is a core requirement of most distributed systems. Apache Kafka is a popular, durable message broker that enables applications to process, persist, and reprocess streamed data with low latency, high throughput, and fault tolerance. If you are a beginner in Kafka, please consider reading the articles in my Apache Kafka series to get up to speed in no time.

One of my Twitter friends, among others, reached out to me to understand how to implement retries in Kafka. I am always searching for topics to explore and guide my audience through, and Twitter is a great medium to interact with me (cue to join me on Twitter).

In the following sections, we will look at the key configurations and implementations of the producer and the consumer that help build reliable applications with Kafka. Let's first discuss the requirements for building a reliable consumer application.

Reliable Consumers

There are three models in which Kafka can deliver messages to a consumer (a short sketch contrasting the first two follows the list):

  • At least once: This is the default processing model of Kafka. In this model, a consumer commits the offsets after processing the batch of messages it receives from Kafka. In case of an error, the consumer will receive the messages again, and hence it should be idempotent.
  • At most once: In this model, the consumer commits the offsets right after receiving a batch of messages. If, during processing, the consumer encounters an error, the messages will be lost.
  • Exactly once: Stream processing applications read data from a Kafka topic, process it, and write data to another topic. In such applications, we can use the Kafka transaction API to ensure that a message is considered consumed only if it is successfully written to the destination topic.
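To make the first two models concrete, here is a minimal sketch, assuming a Confluent.Kafka consumer with auto-commit disabled; Process is a hypothetical handler, and the only difference between the two models is where the Commit call sits relative to processing:

// At-least-once: commit AFTER processing. A crash between Process and
// Commit means the message is redelivered, so Process must be idempotent.
var result = consumer.Consume(cancellationToken);
Process(result.Message.Value); // hypothetical processing step
consumer.Commit(result);

// At-most-once: commit BEFORE processing. A crash after Commit but
// before Process means the message is lost.
var result2 = consumer.Consume(cancellationToken);
consumer.Commit(result2);
Process(result2.Message.Value);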

Regardless of the delivery model you use, you should design consumer applications to be idempotent for high reliability. To understand why a consumer might receive the same message multiple times, let's study the workflow followed by a basic consumer:

  1. Pull a message from a Kafka topic.
  2. Process the message.
  3. Commit the offset to the Kafka broker.

The following issues may occur during the execution of the workflow:

  • Scenario 1: The consumer crashes before committing the offset. When the consumer restarts, it will receive the same message from the topic.
  • Scenario 2: The consumer sends the request to commit the offsets but crashes before it receives a response. Upon restart, the consumer is in an indeterminate state because it doesn't know whether it successfully committed the offsets. To resolve its state, it will fetch the messages from the old offset.

For exactly-once processing, the Kafka producer must be idempotent. Also, the consumer should only read committed messages of a transaction (by setting the isolation level to read_committed) and not the messages from a transaction that has not yet been committed. However, there are caveats to exactly-once processing in both the producer and the consumer applications. Idempotence in the producer cannot guarantee that the application itself will not produce and queue duplicate messages. Also, if the processing of a message involves external services, such as databases and APIs, we must ensure that they can also guarantee exactly-once processing. Exactly-once processing requires cooperation between producers and consumers, which might be hard to achieve in a large distributed application.
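In the Confluent.Kafka .NET client, this maps to the consumer's IsolationLevel setting. A minimal sketch, with the rest of the configuration omitted:

var transactionalConsumerConfig = new ConsumerConfig
{
    // Only deliver messages from transactions that have been committed.
    IsolationLevel = IsolationLevel.ReadCommitted
};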

For reliable processing of events by a consumer, the following three configurations are critical (a configuration sketch follows the list):

  1. group.id: If multiple consumers have the same group ID, Kafka will allocate a subset of partitions to each consumer, and so each will receive a subset of messages. To read all messages from a topic, a consumer should have a unique group ID.
  2. auto.offset.reset: This parameter controls the offset from which the consumer starts receiving messages when it first starts or when it asks for offsets that do not exist on the broker. If you set the value to earliest, the consumer will start reading messages from the beginning of the partition. If you set the value to latest, the consumer will start reading messages from the end of the partition.
  3. enable.auto.commit: For reliable processing of messages, with as little reprocessing of duplicate messages as possible, you should commit the offsets manually in your code. You can check out the implementation of a reliable consumer in my earlier article on Kafka Event Consumers. If you choose to commit offsets manually, it negates the setting auto.commit.interval.ms, which controls how often messages are automatically committed. For automatic commits, keeping the value of this setting low ensures that you will not receive many duplicate messages when a consumer stops abruptly.
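Here is a minimal sketch of how these three settings map to ConsumerConfig properties in the Confluent.Kafka client; the group ID and server address are placeholder values:

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    // Consumers sharing a group ID split the topic's partitions among themselves.
    GroupId = "reliable-consumer-group",
    // Read from the beginning of the partition when no committed offset exists.
    AutoOffsetReset = AutoOffsetReset.Earliest,
    // Disable auto-commit; commit offsets manually after processing instead.
    EnableAutoCommit = false
};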

Let's now discuss the steps to implement a reliable Kafka producer application.

Reliable Producers

Assuming that the brokers are configured with the most reliable configuration possible, we must ensure that the producers are configured to be reliable as well.

The following producer settings are necessary to ensure that our producer doesn't accidentally lose messages. You can read more about the individual settings in detail in the producer configuration section of the Confluent SDK documentation:

  1. acks: This field determines the number of acknowledgments that the leader broker should receive from in-sync replicas before responding to the client. Setting it to all makes the leader broker block the request until all in-sync replicas commit the message. It is the safest option and offers the highest reliability.
  2. message.send.max.retries: The value of this setting determines the number of times to retry sending a failing message.
  3. retry.backoff.ms: The backoff time in milliseconds before retrying an operation.
  4. enable.idempotence: Setting the value of this property to true ensures that the producer does not produce duplicate messages and that messages are delivered in order within a partition.

The following are the two types of errors (as response error codes) that the broker will return to producers (a sketch of handling both follows the list):

  1. Retriable errors: These are transient errors, such as LEADER_NOT_AVAILABLE, from which the producer will attempt to recover automatically. After the producer exhausts the interval specified in the message.timeout.ms setting (default 300000), it will throw an exception that should be handled in the code.
  2. Permanent errors: Errors such as INVALID_CONFIG cannot be resolved with retries. The producer will receive an exception from the broker that must be handled in the code.
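In .NET, both cases surface as a ProduceException once retries are exhausted; you can inspect its Error property to tell them apart. A minimal sketch, assuming the long/string key and value types used later in this article:

try
{
    await producer.ProduceAsync(topicName, message);
}
catch (ProduceException<long, string> e)
{
    if (e.Error.IsFatal)
        // Permanent error (e.g., INVALID_CONFIG); retries will not help.
        Console.WriteLine($"Fatal error: {e.Error.Reason}");
    else
        // Retriable error that still failed after message.timeout.ms elapsed.
        Console.WriteLine($"Delivery failed after retries: {e.Error.Reason}");
}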

Remember that retries might lead to writing duplicate messages to the broker. Therefore, it is a reasonable design consideration to add a unique identifier to each message, which will help consumers detect duplicates and discard them before processing a message. If the consumer is idempotent, then processing duplicate messages will have no impact on the correctness of the application's state.
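One way to do this is to stamp each outgoing message with a GUID header and track processed IDs on the consumer side. A sketch under stated assumptions: the message-id header name is an illustrative choice, result is a ConsumeResult, and a real application would persist seen IDs in a durable store rather than an in-memory HashSet:

// Producer side: attach a unique identifier as a message header.
var message = new Message<long, string>
{
    Key = DateTime.UtcNow.Ticks,
    Value = "payload",
    Headers = new Headers
    {
        { "message-id", Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()) }
    }
};

// Consumer side: skip messages whose identifier has already been processed.
var seenIds = new HashSet<string>();
var id = Encoding.UTF8.GetString(result.Message.Headers.GetLastBytes("message-id"));
if (!seenIds.Add(id))
    return; // duplicate detected; skip processing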

An excellent approach to managing errors in the producer application is to leverage the producer's retry mechanism, handle the exceptions, store the failed messages in searchable logs or databases, and raise alerts for manual intervention.

Source Code

Please download the source code of the sample application from the GitHub repository.

The sample application contains a reliable producer implementation and a simple consumer that you can use as a basis for your own applications.

Building a Reliable Producer

Create a new .NET Core console application and add a class named Producer to it. Install the Confluent.Kafka NuGet package in the application.

Install-Package Confluent.Kafka

Let's instantiate a ProducerConfig object with the settings required to build a reliable producer.

public Producer(string bootstrapServer)
{
    _producerConfig = new ProducerConfig
    {
        BootstrapServers = bootstrapServer,
        EnableDeliveryReports = true,
        ClientId = Dns.GetHostName(),
        // Emit debug logs for the message writer process; remove this setting in production
        Debug = "msg",

        // Retry settings:
        // Receive acknowledgement from all in-sync replicas
        Acks = Acks.All,
        // Number of times to retry before giving up
        MessageSendMaxRetries = 3,
        // Duration to wait before the next retry attempt
        RetryBackoffMs = 1000,
        // Set to true if you don't want to reorder messages on retry
        EnableIdempotence = true
    };
}

Create a function named StartSendingMessages that will write the characters A to Z as messages to the broker. Let's begin by building a producer that writes logs and errors (transient and permanent) in the desired format. You can also use the custom handlers to write the logs and errors to your choice of log service, e.g., Splunk.

public async Task StartSendingMessages(string topicName)
{
    using var producer = new ProducerBuilder<long, string>(_producerConfig)
        .SetKeySerializer(Serializers.Int64)
        .SetValueSerializer(Serializers.Utf8)
        .SetLogHandler((_, message) =>
            Console.WriteLine($"Facility: {message.Facility}-{message.Level} Message: {message.Message}"))
        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}. Is Fatal: {e.IsFatal}"))
        .Build();
    ...

Let's start producing some messages and record any issues. I prefer to kill the producer process in case of permanent failures to avoid adding too many errors or failed messages to my log stores.


try
{
    Console.WriteLine("\nProducer loop started...\n\n");
    for (var character = 'A'; character <= 'Z'; character++)
    {
        var message = $"Character #{character} sent at {DateTime.Now:yyyy-MM-dd_HH:mm:ss}";

        var deliveryReport = await producer.ProduceAsync(topicName,
            new Message<long, string>
            {
                Key = DateTime.UtcNow.Ticks,
                Value = message
            });

        Console.WriteLine($"Message sent (value: '{message}'). Delivery status: {deliveryReport.Status}");
        if (deliveryReport.Status != PersistenceStatus.Persisted)
        {
            // Delivery might have failed after retries. This message requires manual processing.
            Console.WriteLine(
                $"ERROR: Message not ack'd by all brokers (value: '{message}'). Delivery status: {deliveryReport.Status}");
        }

        Thread.Sleep(TimeSpan.FromSeconds(2));
    }
}
catch (ProduceException<long, string> e)
{
    Console.WriteLine($"Permanent error: {e.Message} for message (value: '{e.DeliveryResult.Value}')");
    Console.WriteLine("Exiting producer...");
}
The Kafka SDK has the smarts to retry on failures, so we don't need to retry failed operations ourselves.

To test the application, we need a simple consumer that consumes the messages the producer adds to the broker. You can follow my step-by-step instructions to build a consumer application in detail in my earlier article from the Kafka series.
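If you only need something quick to run alongside the producer, the following minimal sketch works, assuming the ConsumerConfig sketched earlier; the earlier article covers error handling and graceful shutdown:

using var consumer = new ConsumerBuilder<long, string>(consumerConfig)
    .SetKeyDeserializer(Deserializers.Int64)
    .SetValueDeserializer(Deserializers.Utf8)
    .Build();

consumer.Subscribe(topicName);
while (true)
{
    var result = consumer.Consume(CancellationToken.None);
    Console.WriteLine($"Received: {result.Message.Value}");
    // Commit only after processing to get at-least-once delivery.
    consumer.Commit(result);
}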

Demo

You will find a Docker Compose specification in the GitHub repository, which you can use to set up a local Kafka cluster. You can read about the components that make up the specification in my earlier blog post. Alternatively, you can create and use Azure Event Hubs for debugging the application.

Launch two instances of the application and execute the producer in one and the consumer in the other as follows:

Producer and consumer in action.

You can kill the Kafka container while the producer is still running to simulate an error, which will raise an exception. On encountering an exception, you should log the message that you failed to send so that you can easily resume processing from there.

Producer retries and failure.

Conclusion

Building a reliable producer and consumer application is very easy with the Kafka SDK. Remember that making your consumer application idempotent will protect your system from falling apart when it receives a duplicate message. Exactly-once message delivery is very much possible in Kafka but requires tight coordination between the producer and consumer. As the producer and consumer applications grow in number and complexity in an organization, this coordination is often not possible. In a reliable system, each component (the broker, the producer, and the consumer) must be reliable individually.



