Getting Started with Kafka and .NET Core on Kubernetes

Real time streaming is at the hard of  many modern business critical systems. The ability for data to be constantly streamed can give organizations new insights into their data and the real time nature means these trends can be seen in real time, creating possible value streams for organizations.

It being so popular there are many options available from cloud hosted to self hosted. One of the big ones is Apache Kafka which enables this sort of “pubsub on steroids” that enables it to scale its data injest and streaming capabilities to fit the needs of almost any organization. In this post, I want to walk through a basic setup of Apache Kafka using Bitnami’s Helm Chart.

Prepare your cluster

Truth be told, I did this using an Azure Kubernetes Service cluster that I spin up on occasion that has three large VMs backing it. I have found using things like Kubernetes for Docker, minikube, and others that you run into resource limitations that make it hard to deploy. For that reason, either give your local cluster an immense amount of resources or using a cloud managed one. I recommend Azure simply because by default the AKS cluster is backed by a scale set that you can Deallocate as needed – saves immensely on cost

Bitnami Helm Chart

I love Helm because it enables quick deployment of supporting software that would otherwise take a lot of reading and learning and tweaking. Instead, you can use Helm to execute this: https://github.com/bitnami/charts/tree/master/bitnami/kafka

The instructions above are actually written to target Helm2, latest is Helm3. For the most part its similar enough, though I love the removal of Tiller in Helm3. Syntax of the command is a little different – here is what I used:

helm install kafka-release –namespace kube-kafka bitnami/kafka

This creates a release (an instance of deployment in Helm terminology) called kafka-release places the deployed components in a Kubernetes namespace called kube-kafka and deploys resources based on the bitnami/kafka Helm chart – if you look at the above link there are many ways to override how things are deployed via this chart

After running the command, Helm will start deploying resources which then have to spin up. In addition, it will layout some instructions for how you can play with the Kafka cluster on your own, using a temporary pod. But before you can do this, the deployment must enter the Ready state. Best to run the following command to know when things are ready:

kubectl get pods -n kube-kafka –watch

This will watch the Pods (which are the things that have to spin up due to container creation) and when both are ready you can safely assume Kafka is ready.

Kafka oversimplified

So, I am still learning the architecture internally of Kafka. I know that it is basically a message broker system that enables a multicast eventing to various topics that can have multiple subscribers. But I honestly need to do more to learn its internals – suffice to say for this exercise, you send a message to a topic in Kafka and all listeners for that topic will receive the message.

The term Producer and Consumer will be used throughout the remainder of this post. Producer sends data to the cluster nodes, and Consumers receive that data.

Create the Producer

Our producer will be rudimentary and over simplified but just to get the idea of the sort of structure these types of applications take. Remember, this is not Production level code so, copy and paste at your own risk.


using System;
using Confluent.Kafka;
namespace KafkaProducer
{
class Program
{
static void Main(string[] args)
{
var config = new ProducerConfig{ BootstrapServers = "kafka-release.kube-kafka.svc.cluster.local:9092" };
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
var increment = 0;
while (increment < int.MaxValue)
{
try
{
var produceResult = producer.ProduceAsync("increment-topic",
new Message<Null, string> { Value = $"The number is {increment}"})
.GetAwaiter()
.GetResult();
Console.WriteLine($"Success = {produceResult.Value} to {produceResult.TopicPartitionOffset}");
}
catch (Exception ex)
{
Console.WriteLine($"Error {ex.Message}");
}
increment++;
}
}
Console.WriteLine("Hello World!");
}
}
}

view raw

producer.cs

hosted with ❤ by GitHub

So to start you need to add the Confluent.Kafka NuGet package (current version as of this writing is 1.4.0).

Next, create a config type and set the BootstrapServers – this is the server your code will contact to setup the message broker and send messages to based on where that broker ends up (not sure how all of that works yet). Suffice to say, when you finished running your Helm install this is the Service DNS name you are given – it follows the standard convention used by Kubernetes to name services.

For our example we cycle over all of the int values available to .NET (all the way up to int.MaxValue) so we can keep our producer going for a long time, if need be. For each iteration our code simply writes a message to the broker indicating the current iteration number.

We use the ProduceAsync method to send this message to Kafka – we use a try/catch here to catch any sending errors. Everything is written out to STDOUT via Console.WriteLine.

One of the key arguments to ProduceAsync is the name of the topic to associate our message to. This is what our consumers will listen to so a single message sent to this topic can be fanned out to as many consumers as are listening. This is the power of this type of architecture as it allows for event based processing with a high degree of decoupling. This allows different parts of our application to simply respond to the event rather than being part of a longer chain of functions.

Build the Consumer


using System;
using System.Threading;
using Confluent.Kafka;
namespace KafkaConsumer
{
class Program
{
static void Main(string[] args)
{
var config = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "kafka-release.kube-kafka.svc.cluster.local:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("increment-topic");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
Console.WriteLine($"Message: {consumeResult.Message.Value}");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
Thread.Sleep(100);
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
}
}
}

view raw

consumer.cs

hosted with ❤ by GitHub

As with the Producer, the first step here is to add the Confluent.Kafka NuGet package


using System;
using System.Threading;
using Confluent.Kafka;
namespace KafkaConsumer
{
class Program
{
static void Main(string[] args)
{
var config = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "kafka-release.kube-kafka.svc.cluster.local:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("increment-topic");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
Console.WriteLine($"Message: {consumeResult.Message.Value}");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
Thread.Sleep(100);
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
}
}
}

view raw

consumer.cs

hosted with ❤ by GitHub

As with the Producer, our first step is to add the Confluent.Kafka Nuget package so we can use the built-in types to communicate with Kafka.

You can see with the Consumer that we subscribe to our topic (increment-topic in this case). I was a bit surprised this was using the built in .NET eventing model since I think that would make more sense. Instead, we have to create a busy loop that attempts to consume each time and checks if it gets anything.

From there we just bring the message we received.  You notice that our BootstrapServers value is the same as it was in the Producer, should be – we are writing and reading from the same Broker.

The GroupId, I do not know what this is – need to read up on it but I set it all the same.

Testing our Example

Our goal is to run against Kafka hosted in Kubernetes and thus we will want both our Producer and Consumer there as well. You could use kubectl port-forward to expose the Kafka port locally but, I didnt have much luck with that as the code would immediately try to call a Kubernetes generated service name which was not exposed. I might tinker with this some more. My main goal with this exercise is to see this working in Kubernetes.

The best way to do this, or at least the simplest is to create a vanilla pod with the consumer and producer executing as separate containers. Truthfully, you would never want to use a vanilla PodSpec in Production (usually you want it in a Deployment or ReplicaSet) since doing so would not maintain high availability and resiliency (if the Pod dies, everything stops, it doesnt get recreated). Nevertheless, for this simple example I will be creating a multi-container pod – we can check the log files to ensure things are working.

Here is the PodSpec:


apiVersion: v1
kind: Pod
metadata:
name: kafka-example
namespace: kube-kafka
spec:
containers:
– name: producer
image: clusterkuberegistry.azurecr.io/kafkaproducer:v2
– name: consumer
image: clusterkuberegistry.azurecr.io/kafkaconsumer:v1

view raw

podspec.yaml

hosted with ❤ by GitHub

I will assume that you know how to build and push Docker images to a repository like Docker Hub (or Azure Container Registry in this case).

Next, we apply this spec file to our Kubernetes cluster

kubectl apply -f podspec.yaml

And we can use the following to wait until our pod is running (we should see 2/2 when ready)

kubectl get pods -n kube-kafka

Next let’s get a sample of the logs from our Producer to ensure things are connecting our messages are going out. We can run the following command (using the PodName from the PodSpec above):

kubectl logs kafka-example -n kube-kafka producer

If things are working you should see a lot of message indicating a message send. If you see errors, double check that you typed everything right and that, in particular, your BootstrapServer value is correct (you can use kubectl to query the services in the deployed namespace to ensure you have the right name)

Assuming that is working, we can perform a similar command to see the logs for the Consumer:

kubectl logs kafka-example -n kube-kafka consumer

Once again, if all things are working you should see messages being read and displayed in the console.

Congratulations!! You have just completed your first end to end Kafka example (maybe its not your first but congrats all the same).

Why is this important?

I talked about real time applications at the onset – these are comprised, generally, of event streaming whereby as data enters the system it causes events to be generated (often times the events are the data) which can be sent all over the system. It can create records in a database, updated metric counters, flip switches, and many other things – this is actually the basis behind IoT applications, streaming a tremendous amount of telemetry data and using systems like Kakfa to analyze and process that data in real time.

I love these types of applications because they enable so many unique business scenarios and really cool real time charts. In my next post, I hope to take my Kafka knowledge further and do something more useful.


apiVersion: v1
kind: Pod
metadata:
name: kafka-example
namespace: kube-kafka
spec:
containers:
– name: producer
image: clusterkuberegistry.azurecr.io/kafkaproducer:v2
– name: consumer
image: clusterkuberegistry.azurecr.io/kafkaconsumer:v1

view raw

podspec.yaml

hosted with ❤ by GitHub

One thought on “Getting Started with Kafka and .NET Core on Kubernetes

Leave a comment