Playing with Kong Gateway

I recently took some time to play with Kong Gateway, which provides a Kubernetes-compatible Ingress controller. I find that by playing with the various Ingress controllers I get a better sense of their abilities and how they relate to common use cases such as the Strangler Pattern and microservice management.

Setting Up

I am a big fan of the Helm package manager and I used it to install Kong on my Kubernetes cluster. Kong lays the instructions out nicely: https://docs.konghq.com/2.0.x/kong-for-kubernetes/install/

Here is the command I used – note that I am using Helm 3:

helm install kong-gateway kong/kong --namespace kong-gate \
  --set ingressController.installCRDs=false

Running this command with the appropriate configurations will install the Kong components to your Kubernetes cluster in the kong-gate namespace.

Configuration

Kubernetes leverages components like Kong through Ingress resources, which identify the paths to route and which Ingress class to use; the class is where you indicate that Kong should handle the route. Here is the configuration for the Ingress I arrived at.
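In sketch form, with illustrative host, path, and annotation values (the weather-service backend shows up again below; the specific konghq.com annotations here are placeholders for the ones I actually used), it looks something like this:

apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: weather-ingress
  annotations:
    kubernetes.io/ingress.class: kong
    # konghq.com annotations configure the route via the Kong CRDs
    konghq.com/strip-path: "true"
    konghq.com/protocols: http
    konghq.com/plugins: weather-rate-limit
spec:
  rules:
    - http:
        paths:
          - path: /weather
            backend:
              serviceName: weather-service
              servicePort: 80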

Ingress components use annotations to not only define which Ingress type to use but also what configurations can be applied to that Ingress route (as defined by the CRD). In this case, you can see three custom annotations with the konghq identifier. This link lays out the various annotations that are supported: https://github.com/Kong/kubernetes-ingress-controller/blob/master/docs/references/annotations.md

In this case, weather-service is a Kubernetes Service that references the pods that contain the source code. Next, we want to leverage the Kong plugin system to apply rate limiting to this Ingress route.

Plugins

One of the aspects that makes Kong better than the standard Nginx Ingress controller is the bevy of supported plugins, which can make common tasks much easier. There is a full catalog of them here: https://docs.konghq.com/hub/

This part was more difficult because there does not seem to be much documentation on how it works. I ended up stumbling upon a GitHub issue with some sample source that pointed me in the right direction – here is the plugin configuration I arrived at:
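In rough form, assuming a limit of five requests per minute (which lines up with the test further down) and the simple local counting policy, it looks something like this:

apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: weather-rate-limit
plugin: rate-limiting
config:
  # allow five requests per minute, counted locally on each node
  minute: 5
  policy: local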

For reference, here is the “home” page for the plugin – https://docs.konghq.com/hub/kong-inc/rate-limiting/. From there you can get a sense of the available configuration. Most of it is shown as curl commands against the Kong Admin API, but it turns out you can follow the same model pretty easily when defining your KongPlugin.

The key connector here is the name (weather-rate-limit) and its use in the annotations of the Ingress route (see above). This is how the Ingress knows which plugin configuration to use. Also important is the plugin name/value pair, which defines which plugin is being configured; this is the same name listed in the Kong plugin catalog.

I used this with the default .NET Core Web API example that returns randomized Forecast data. I was able to send six requests in sequence and received a Too Many Requests response on the sixth. My next challenge will be JWT token validation.

Thoughts

Ingress controllers like Kong, Envoy-based controllers, Traefik, and others are essential tools when dealing with Kubernetes. Not only can they make dealing with microservices easier, they can also make breaking up a monolith through the Strangler Pattern easier.

Building a Real Time Event Processor – Part 4

I always believe in reflecting on one’s design and always trying to get a better understanding of the problem. In this sample, there are areas that are rather obvious for improvement.

Do you really need that queue?

In Part 1, I discussed a Timer job I had created which fed data into a Service Bus queue. This data is then dequeued in the next function and written to Event Hub (one name at a time) and to Blob storage (where the raw JSON is kept). So I wondered whether I actually needed that queue.

The answer is yes, and here is why: bindings are resolved at the very start of an execution, not when the method returns or when data is written to the out parameter. Since our starter here is a Timer, there is no incoming data to supply the id value for the outgoing blob.

So the queue really only serves to create an input with the id value embedded in it so the blob binding can resolve, which is why the blob return works for the Dequeue method but cannot work for the Timer function.

What I can do is change the code so it looks like this:
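In rough sketch form, with an assumed Event Hub name and timer schedule and a stand-in for the name service call, the refactored pair looks something like this:

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public class NamesGenerationRecord
{
    public string Id { get; set; }
    public List<string> Names { get; set; }
}

public static class RefactoredFunctions
{
    // Timer function now talks to Event Hub directly and still queues the payload
    // so the blob binding in Dequeue has an {id} to bind to
    [FunctionName("GenerateNames")]
    [return: ServiceBus("newnames-queue", Connection = "ServiceBusConnection")]
    public static async Task<string> GenerateNames(
        [TimerTrigger("0 */2 * * * *")] TimerInfo timer,
        [EventHub("names-hub", Connection = "EventHubConnection")] IAsyncCollector<string> events,
        ILogger log)
    {
        var record = new NamesGenerationRecord
        {
            Id = Guid.NewGuid().ToString(),
            Names = await GetNamesAsync()
        };

        // the Event Hub code that used to live in Dequeue
        foreach (var name in record.Names)
            await events.AddAsync(name);

        // the returned JSON goes to the Service Bus queue
        return JsonConvert.SerializeObject(record);
    }

    // Dequeue is now reduced to writing the raw JSON blob
    [FunctionName("DequeueNames")]
    [return: Blob("raw-names/{id}.txt", FileAccess.Write, Connection = "AzureWebJobsStorage")]
    public static string DequeueNames(
        [ServiceBusTrigger("newnames-queue", Connection = "ServiceBusConnection")] NamesGenerationRecord record,
        ILogger log)
    {
        return JsonConvert.SerializeObject(record);
    }

    // stand-in for the call to the name-generation service
    private static Task<List<string>> GetNamesAsync() =>
        Task.FromResult(new List<string> { "John Smith", "Jane Doe" });
}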

As you can see, we moved the Event Hub code to the Timer function, so the purpose of the Dequeue method is literally just to write the blob. I honestly don't like this even though it makes more sense; I dislike having an Azure Function so simplistic that the functionality feels like it should live somewhere else.

What about storing Time Series data?

Initially I thought Redis might make a lot of sense since I can easily expire old aggregate data – keep in mind, historical aggregate data is often not important because it can be derived from the raw historical data sources. Further, storing excess data in high-volume systems adds cost. Work with your teams and leaders to determine how much historical aggregate data makes sense to keep.

Azure does offer a platform known as Time Series Insights, which is designed for MASSIVE scale – usually the output of an IoT-style platform gathering telemetry data from all over the world. Such a platform would dwarf what we are after here.

Storing data for these systems is always a challenge. In this situation the best solution is to write a Timer-based Azure Function that deletes data from Cosmos so that ONLY the data for the time period that makes sense is kept. Again, this data can always be re-derived from the historical data sources.
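As a rough sketch, assuming the DocumentClient form of the CosmosDB binding and hypothetical database, collection, and partition key names, such a cleanup function could look something like this:

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class PruneAggregates
{
    [FunctionName("PruneAggregates")]
    public static async Task Run(
        [TimerTrigger("0 0 * * * *")] TimerInfo timer, // hourly
        [CosmosDB(ConnectionStringSetting = "CosmosDBConnection")] DocumentClient client,
        ILogger log)
    {
        // hypothetical database/collection names - adjust to your deployment
        var collectionUri = UriFactory.CreateDocumentCollectionUri("aggregates-db", "letter-counts");

        // _ts is the built-in Cosmos timestamp (seconds since epoch); keep only the last 24 hours
        var cutoff = DateTimeOffset.UtcNow.AddDays(-1).ToUnixTimeSeconds();
        var query = client.CreateDocumentQuery<Document>(
                collectionUri,
                $"SELECT * FROM c WHERE c._ts < {cutoff}",
                new FeedOptions { EnableCrossPartitionQuery = true })
            .AsDocumentQuery();

        while (query.HasMoreResults)
        {
            foreach (Document doc in await query.ExecuteNextAsync<Document>())
            {
                // the partition key property is assumed to be "Letter" here
                await client.DeleteDocumentAsync(
                    doc.SelfLink,
                    new RequestOptions { PartitionKey = new PartitionKey(doc.GetPropertyValue<string>("Letter")) });
                log.LogInformation($"Deleted aggregate document {doc.Id}");
            }
        }
    }
}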

Concluding Thoughts

I was amazed at how easy this process was. If you really treat Azure Functions as the glue in a distributed process, it is amazing how much functionality you can achieve. I think I wrote around 40 lines of code in the backend, and I got a great deal out of it.

When you start to work on real time or streaming data platforms it really is important to have conversations, discuss your data needs, and try new technologies. The simple truth is you could pay for the top-tier Azure SQL database and likely handle the volume, but your costs would be enormous. Understanding other options and patterns can help you select a strategy that not only works but is cost effective.

I hope you have enjoyed this series. Good luck with Azure and everything else.

Part 1

Part 2

Part 3

 

Using MSI with API Management and Azure Storage

I always enjoy applications where I can achieve complex functionality while writing the least amount of code. I believe that, by doing so, I can reduce the amount of maintenance an application requires as well as the possibility of bugs or other problems, because I am relying on functionality that others have tested; in the case of the REST APIs for Azure services, I am relying on code that millions of people use every day.

To this end, I have been constantly researching the concept of “codeless” applications. These are not necessarily devoid of code, but they limit the amount of custom code that must be written. Central to this are API Gateway-style tools, such as the gateway component found in Azure API Management.

In this blog post, I want to create a set of operations in API Management that allows me to save and retrieve an image placed in a Blob storage account. This approach maximizes security and eliminates the need to store access keys or other sensitive information.

Provision our Infrastructure and Identities

For this sample you will need to create an Azure API Management instance (20m to deploy) and a Storage Account.

To start, access your API Management service and select the Managed Identities under the Security section. Set the Status to On and click Save.

Once you hit Save, Azure will get to work creating a new Service Principal with the same name as your API Management instance. You will want to remember this name for later steps.

Create Read/Write Role Access for Azure Storage

Open your Azure Storage account and select the Access control (IAM) option. We need to add two role assignments to this Storage Account.

Note: for simplicity we are adding these role assignments at the account level, which means the roles can talk to any storage service in the account. To limit this further, create the role assignments at the specific service level.

From the Access control (IAM) page, select Add a role assignment. On this page we can associate identities with roles that grant them permissions to the storage account. We are interested in two roles for our needs:

  • Storage Blob Data Reader
  • Storage Blob Data Contributor

For the role select Storage Blob Data Reader.

Make sure the Assign access to is set to the value which indicates the assignment will be to a service principal.

The Select field should be set to your API Management instance's service principal. This field supports search, so just type your APIM instance name and select it when it appears.

Click Save and repeat for Storage Blob Data Contributor.

This is all you need to do for the Storage account. Make sure to create a container and note its name; you will need it for the next step.

Create the Operations in API Management

To start with we need to create an API that can hold the operations we will call to save and retrieve the images we save.

https://gist.github.com/xximjasonxx/273a751fd2011c91ffd06e804eafaaa9

Click APIs under General.

As you create this API, fill out the fields as you desire, BUT be sure to set the Web Service URL to the base URL of your storage account plus the container. For example, my storage account is called mystorageaccount123 and the container is called images, so my base URL is:

https://mystorageaccount123.blob.core.windows.net/images

The reason for this is that we are going to route all calls within this API to the same base URL (it is just the way the Azure Storage REST API works).

Click Create and your API will be created and added to the display column.

Now, the trick here is that the processing needed to decorate the incoming request so it can communicate with Azure Storage is the same for all of the endpoints. So, rather than duplicating it, we can select All Operations, enter the code view for Inbound processing, and use the following policy definitions so that all operations are affected.
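The inbound section attaches a managed identity token (plus the headers the storage REST API expects); a sketch of such a policy, with an assumed x-ms-version value, looks roughly like this:

<policies>
  <inbound>
    <base />
    <!-- the storage REST API requires a version header when authenticating with OAuth -->
    <set-header name="x-ms-version" exists-action="override">
      <value>2019-07-07</value>
    </set-header>
    <!-- required by Put Blob; harmless for reads -->
    <set-header name="x-ms-blob-type" exists-action="override">
      <value>BlockBlob</value>
    </set-header>
    <!-- acquire a token for the APIM managed identity and attach it as a Bearer token -->
    <authentication-managed-identity resource="https://storage.azure.com/" />
  </inbound>
  <backend>
    <base />
  </backend>
  <outbound>
    <base />
  </outbound>
  <on-error>
    <base />
  </on-error>
</policies>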

This is LITERALLY all we need to provide a “codeless” image store and retrieval system. It accesses our blob storage with only the access needed, and we don't need to store keys or any other sensitive data anywhere.

All we have left to do is create our operations. You will need two:

  • PUT /{id}
  • GET /{id}

That is it. You can now upload and download images from Blob storage.

Testing things out

API Management provides a very sophisticated and useful test harness. You can use this to test your PUT endpoint; you should receive a 201 Created if things are working. DO NOT attempt the GET endpoint with the test harness – it does not seem to like binary data coming back (assuming you uploaded an image and not a text file).

To test the GET endpoint, you will need to create a Subscription and use the subscription key to test the endpoint in either Postman or a browser; I recommend a browser. Here is how I did it:

  1. Access the Subscriptions link under General
  2. This page contains a list of ALL subscriptions that your API currently has assigned to it. You can use the Add Subscription option to create a One-Off subscription to an API
  3. Create a subscription to the API – I called mine images
  4. Open a browser and access your GET endpoint setting the value of the new Subscription Key to the subscription-key query string parameter
  5. Upon browsing you should receive the item you just uploaded

Congrats, everything is working if this step worked.

Going Further

This really only scratches the surface. When you start to involve things like BlobTrigger in Azure Functions or BlobCreated events registered with Azure Event Grid, you start to see the amount of functionality you can get with a minimal amount of code. You can refer to my Event Pipeline series here, where I used Azure Function triggers to create a real time data processing pipeline with probably about 40 lines of code total – and I am working to reduce that even further.

I really believe it is beneficial to look at these types of approaches because they have the potential to really leverage the cloud and execute complex functionality with minimal code.

Building a Real Time Event Processor – Part 3

See Part 2 here

Up to this point we have not really stored any data for our real time application, outside of the intrinsic storage we get with Event Hub and the temporary storage that occurs with a Service Bus queue. Now we get into storing data and alerting users to its presence so they can update.

Understanding the nature of stored data in real time data applications

It is a bit of an oxymoron, but there is no such thing as real time data. What we really mean by the term is “continuously producing data at short intervals” to give the appearance of real time. Another aspect is that our data is often “pushed” to clients so that visualizations can be updated appropriately.

In this sense, it is best to think of it as storing a lot of raw data and, on top of that data, creating aggregations which are stored and used for updates. It is not tenable for front-end systems to constantly query an ever larger data set; it simply gets slow. We need to localize our efforts and use cumulative or window-based approaches.

As an example, if your organization wanted to know how much product had been sold in a day, you would keep a cumulative total using an aggregating window rather than query a larger data set for the answer. The latter approach would work but would slow down over time. You still want to store the raw data, as there are benefits to using large data sets to see trends, but in the moment you only need a very small piece of the data.

Now that we understand that, let’s talk about how this application deals with its data.

Save our aggregate data

So, when the Stream Analytics job runs its query it selects the data INTO an output. In this case, the output is an Azure Function that creates a document in CosmosDB with the data.

When outputting to an Azure Function, Stream Analytics calls the function via HTTP; thus, it is set up as an HttpTrigger using the POST method and expects the data in the body.

Since the Stream Analytics job expects a successful HTTP response, we must reserve the return value for that purpose and cannot return the new CosmosDB document. Instead, we use an output binding, which allows us to create the document via an out parameter.

We could choose to use a POCO (Plain Old C# Object) to represent what is going into the Document but, given the simplicity of this example it is not necessary.
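A sketch of the function, using the IAsyncCollector form of the CosmosDB output binding rather than a literal out parameter (so the request body can be read asynchronously) and hypothetical database and collection names, looks something like this:

using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;

public static class SaveGroupedData
{
    [FunctionName("SaveGroupedData")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req,
        [CosmosDB("aggregates-db", "letter-counts", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> documents,
        ILogger log)
    {
        // Stream Analytics delivers output rows in batches as a JSON array in the body
        var body = await new StreamReader(req.Body).ReadToEndAsync();
        var rows = JArray.Parse(body);

        foreach (var row in rows)
        {
            await documents.AddAsync(row);
        }

        // Stream Analytics only needs a success status code back
        return new OkResult();
    }
}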

The result here is that, as the function receives data from Stream Analytics, it is automatically written to CosmosDB. This also feeds the change feed which our next function will listen to. But before we talk about that, I want to ask whether using CosmosDB for this is the right choice.

Choosing the right database

The short answer is kind of, but not really. I often advise clients to stay away from the CosmosDB Core (SQL) offering due to its pricing. You have to remember that CosmosDB is aimed at high-volume and multi-region scenarios, and you should only use it if you have such a requirement. The one exception is the Table API, a Cosmos-branded service replacing the older Azure Table Storage that predates Cosmos.

In short, due to its price and target use cases, it does not make a lot of sense for most organizations. I decided to use it here because it is built for high availability and can take in data much faster than a traditional RDBMS – plus the data being generated is nearly guaranteed to be consistent given its nature.

The other thing to consider, especially with real time applications, is whether there is a need to store aggregated data from long ago. Remember, we still store the raw data in Azure Blob storage (which is much cheaper than Cosmos). We have to ask: does knowing the aggregate from a week ago really provide business value? Would it be cheaper to accept the cost of deriving it from our historical records? The answer here is usually that it is better to query the historical data.

The point here is that real time aggregations like this often work best with a time-series-style store such as Redis, which can be configured to prune old data automatically. In fact, Redis would be the go-to for this sort of operation in a production scenario. But, as of this writing, there are NO triggers or bindings for Redis, so I would not have been able to stick to my goal of writing no plumbing logic for this exercise. If this were a production situation, Redis would be considered far more strongly than Cosmos.

I also think it means I need to write such a binding for others to use.

Notify Users

Modern NoSQL databases, almost without exception, expose a feed which indicates when a change occurs to the data. This is fundamental for building event streams around the database, which are the building blocks for real time applications as well as Event Driven Architectures, Event Sourced Architectures, and other distributed data systems.

You can think of this, in some ways, as the modern version of the Trigger concept from RDBMS except, rather than living in the database, these triggers are at the application level and can be used to tie components together in a decoupled fashion.

Here is our code which listens to the CosmosDB change feed and sends the new document created to users via the Azure SignalR service:
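Sketched out, with assumed database, collection, lease, and hub names, the change feed listener and the accompanying negotiate function look something like this:

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
using Microsoft.Extensions.Logging;

public static class NotifyUsers
{
    // fires whenever documents are added or updated in the monitored collection
    [FunctionName("NotifyNewAggregates")]
    public static async Task NotifyNewAggregates(
        [CosmosDBTrigger("aggregates-db", "letter-counts",
            ConnectionStringSetting = "CosmosDBConnection",
            LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)] IReadOnlyList<Document> documents,
        [SignalR(HubName = "namehub")] IAsyncCollector<SignalRMessage> messages,
        ILogger log)
    {
        foreach (var document in documents)
        {
            await messages.AddAsync(new SignalRMessage
            {
                // client-side handler name; see the JavaScript client below
                Target = "newFirstLetterData",
                Arguments = new object[] { document }
            });
        }
    }

    // clients call this first to obtain connection info for the Azure SignalR service
    [FunctionName("negotiate")]
    public static SignalRConnectionInfo Negotiate(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequest req,
        [SignalRConnectionInfo(HubName = "namehub")] SignalRConnectionInfo connectionInfo)
    {
        return connectionInfo;
    }
}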

The use of the CosmosDBTrigger is what hooks into the change feed for the target database and collection where our data is being added. Additionally, CosmosDB requires a lease collection, which tracks who is consuming the change feed for that collection.

Our next parameter allows us to send documents to the Azure SignalR service for a particular Hub. I won't go into explaining SignalR and what Hubs are; you can learn more about that here: https://docs.microsoft.com/en-us/aspnet/signalr/overview/getting-started/introduction-to-signalr#connections-and-hubs

Our binding looks for the AzureSignalRConnectionString setting to know which SignalR service to connect to. Since this is the Azure SignalR service, it is imperative that our service is configured for Serverless mode under Settings. Also update the CORS setting on the SignalR service to * or whatever is appropriate; not doing this will cause problems when we attempt to connect.

Above, you will notice we defined a negotiate function with an output binding of type SignalRConnectionInfo. This is the method our clients contact first to get connection information; it beats having to store the access key somewhere widely available – using negotiate, the key is downloaded and used internally by the client (though with Developer Tools you can still pick it out). It is the Serverless setting in the SignalR service which drives this behavior.

Our client looks like this:
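In rough form, using the @microsoft/signalr JavaScript client and a placeholder Function App URL, it looks something like this:

// assumes the @microsoft/signalr package (or the signalr.js browser bundle) is loaded
const connection = new signalR.HubConnectionBuilder()
    .withUrl("https://<your-function-app>.azurewebsites.net/api")  // /negotiate is appended automatically
    .configureLogging(signalR.LogLevel.Information)
    .build();

// called by the backend when a new aggregate document is written (the Target of our SignalRMessage)
connection.on("newFirstLetterData", data => {
    console.log("new first letter data", data);
    // update charts / UI here
});

connection.start()
    .then(() => console.log("connected"))
    .catch(err => console.error(err));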

Note the .withUrl call that specifies the base URL to our Azure Function. The first call made will be this URL appended with /negotiate. After that call completes, SignalR will attempt to open the socket.

Remember to configure CORS in your Azure Function app as well – it's under Platform Features.

The final bit to call out is the newFirstLetterData event. SignalR works by “calling” a JavaScript function from the backend (or that is how it makes it seem). In our code we add instances of SignalRMessage to an IAsyncCollector instance – one of its properties, Target, is set to this value, indicating which client-side method should be called when that message lands on a client.

To ensure I did not miss anything with this step (it took me the longest to figure out) here is a tutorial that covers this topic: https://docs.microsoft.com/en-us/azure/azure-signalr/signalr-quickstart-azure-functions-javascript

Up Next

So that technically completes the overall pipeline. My earnest hope is yours is working as well as mine. In our final discussion I want to cover areas we can improve and the key takeaways from this exercise.

Building a Real Time Event Processor – Part 2

Part 1: Building a Real Time Event Processor – Part 1

Welcome to Part 2 of this series on building a real time pipeline using Azure services and Azure Functions. In the first part, we talked through the services we will use and laid out our goals for this project. Chief among them was to ensure we leveraged Azure Function bindings to their fullest to eliminate the need to write boilerplate connection code for the various Azure services. By doing this, we let the underlying runtime handle those connections for us.

At the end of Part 1 we had created our Timer function, which was responsible for routinely running and sending data into our pipeline by way of a Service Bus queue. Now, we will write the code to dequeue our data and send it to Azure Blob Storage and an Azure Event Hub.

Dequeue Our Data

The data we stored in the queue takes the following format:
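In rough form it is an identifier plus the list of generated names – something along these lines (the values are illustrative, and the Names property name is an assumption; Id is the important part):

{
  "Id": "4f1c2d3e-8a9b-4c5d-9e6f-0a1b2c3d4e5f",
  "Names": [
    "John Smith",
    "Jane Doe",
    "Sam Johnson"
  ]
}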

Remember, messages in Service Bus are stored as UTF-8 byte arrays to reduce space. The trigger will convert this to a string or, if the destination type is a complex object, attempt to deserialize it into that type. Here is the code for our function:
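In sketch form, with an assumed Event Hub name and using the NamesGenerationRecord type defined a little further down, it looks something like this:

using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public static class DequeueGeneratedNames
{
    [FunctionName("DequeueGeneratedNames")]
    [return: Blob("raw-names/{id}.txt", FileAccess.Write, Connection = "AzureWebJobsStorage")]
    public static async Task<string> Run(
        [ServiceBusTrigger("newnames-queue", Connection = "ServiceBusConnection")] NamesGenerationRecord record,
        [EventHub("names-hub", Connection = "EventHubConnection")] IAsyncCollector<string> events,
        ILogger log)
    {
        // each name becomes its own event on the Event Hub
        foreach (var name in record.Names)
        {
            await events.AddAsync(name);
        }

        // the returned JSON is written to blob storage as raw-names/{id}.txt
        return JsonConvert.SerializeObject(record);
    }
}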

In my opinion, when you see code like this you are using Azure Functions to their fullest extent. Our function consists of a loop and that is it – and truthfully we could probably use some Linq-fu to remove even that.

What this method does is a few things:

  • First, the Service Bus trigger up-converts the byte array into a standard string. Because the destination type is not a primitive (it is NamesGenerationRecord), the trigger then attempts to deserialize the string into that object.
  • Next, we define an output binding for an Event Hub using IAsyncCollector<T> to capture the event strings that need to be sent to our Event Hub. Because it would not make sense to send only one event, it is a collection which sends asynchronously as we add messages.
  • Finally, we define the return value as going to a blob. This return will write our raw JSON contents to a blob entry named with the {id} value. More on this in a second, as it is really neat but not well documented.

We get all of that functionality almost entirely through attributes. That is what I call super cool. Before we move on, let's talk about the return value of this function and how it works.

Understanding Bindings

So, what gets stored in Service Bus is the JSON string shown at the start of this section.

When the ServiceBusTrigger delivers this string to the function, it notes that it is not delivering a string but rather an instance of NamesGenerationRecord, which has this definition:
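In minimal form (the Names property name is an assumption; Id is the important part):

using System.Collections.Generic;

public class NamesGenerationRecord
{
    // bound to {id} in the Blob return template below
    public string Id { get; set; }

    public List<string> Names { get; set; }
}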

Crucial here is that we create a property called Id. It is this property value that will be bound to the {id} in our return template:

[return: Blob("raw-names/{id}.txt", FileAccess.Write, Connection = "AzureWebJobsStorage")]

Initially, I thought the runtime would treat the string as JSON, create a JObject from it, and look for a JToken named id, but when I tried that I received errors about the runtime being unable to find the id field to bind to. Using a custom object solved this problem.

I found this very interesting and it opens up enormous possibilities. Given the way Microsoft tends to operate, it is likely that many of the bindings support this for both output and return bindings.

The end result is that for each item dequeued from our Service Bus queue we create a blob in the raw-names container named with a GUID, ensuring a high degree of uniqueness.

Preparing the Event Hub

Simply storing this data is not all that interesting, although it is a great way to build a data lake that can serve well for queries of a historical nature. The value we want is to analyze our data in real time. There are several services that could handle this, with varying degrees of scalability and success:

  • Azure Event Hub
  • Azure Event Grid
  • Azure Service Bus
  • Azure Blob Storage

Some of these options may not make sense to you, and that is fine. I said they COULD work for this scenario; it is unlikely that something like the change feed of Azure Blob Storage would make a lot of sense, but it could work.

Among these options, the BEST service, in my view, for our scenario is Azure Event Hub due to being cost effective and highly scalable with minimal latency. It also enforces ordering within partitions and can hold vast amounts of data for days at a time.

Further, it also hooks in well with Azure Stream Analytics Jobs which are vital to real time processing pipelines.

To set up an Event Hub, follow the instructions here: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create

Here is a visual of what the communication between the Azure Function and Event Hub looks like – I am assuming 5 names per queue message which translates to 5 events sent to Event Hub.

Queueing

For this visualization I created a view inside the Event Hub as messages are being sent along. In this situation EACH execution of the Dequeue Processor Function will send 5 envelopes to Event Hub. These will get processed in order as they all live within the same partition.

Real Time Aggregation with Stream Analytics Job

I cannot count how many times I have come across code bases where developers have chosen to use an Azure Function or some other code-based implementation to perform aggregation. While this works, it is far from ideal. Stream Analytics jobs are the recommended approach, both from the standpoint of scale and ease of use – better to write a SQL query that performs aggregation across many machines than attempt to rely on the Azure Function runtime.

Here is an example of the query I used for this exercise:
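Roughly, assuming each Event Hub event carries a Name field, and using NamesData and SaveGroupdData as the input and output aliases configured on the job, it looks something like this:

WITH FirstLetters AS (
    -- grab the first character (1-based) of each incoming name
    SELECT SUBSTRING(Name, 1, 1) AS Letter
    FROM NamesData
),
LetterCounts AS (
    -- count occurrences of each letter within 30-second tumbling windows
    SELECT Letter, COUNT(*) AS Total
    FROM FirstLetters
    GROUP BY Letter, TumblingWindow(second, 30)
)
SELECT *
INTO SaveGroupdData
FROM LetterCounts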

This query executes in three phases.

First, it reads from our input, which in this case is NamesData, an alias for the Event Hub our Azure Function was pouring data into in the previous section. It looks at each entry and grabs the first character (1-based) from each Name in the data set. This subquery's results are named FirstLetters.

Next, those results are used as the source for another subquery, LetterCounts. This query takes the first-letter data and counts how many times each letter appears in the set. Key to this query is not just the grouping logic but the TumblingWindow. Let me explain.

Remember that you are not querying a standard database with these queries. Your data set is constantly moving, considerably so in high-volume cases. For this reason, you MUST restrict what data your query considers by using windows. Stream Analytics supports four window types:

  • Tumbling
  • Session
  • Hopping
  • Sliding

Depending on how fluid you want your data to be generated you would pick the appropriate window. They are described here: https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-window-functions.

For our example, we use Tumbling, which segments the event stream into distinct windows and ensures events do not cross into other segments. In the case of the query above, events are broken into 30-second intervals with each event belonging to one and only one segment. There are use cases for wanting events to belong to multiple segments, as with Sliding windows.

The final bit of this query selects the results of LetterCounts into the output SaveGroupdData, which fronts an Azure Function that takes our aggregated data to the next stage.

Up Next

In this post, we continued the lessons of Part 1: we dequeued our raw data, parsed it, and sent the raw events into our Event Hub. Within the Event Hub our data is stored (temporarily, for a configurable number of days) and can be read by various processes in real time. One of the best ways to perform this reading is Azure Stream Analytics jobs, which enable SQL-like querying of data in the stream using windows to segment the data. The result was a sequence of writes to our output Azure Function (which we will discuss in Part 3) with the aggregated results from the query.

In the next segment we will store our aggregate results and notify connected clients so they can update their UIs to display our real time data.

One Note

One of the most popular outputs for Stream Analytics jobs is Power BI. This tool can take in the data and build these auto updating charts to represent data within the stream. I was not able to use Power BI for this but it is a common output for use cases like this.

Building a Real Time Event Processor – Part 1

So, I recently decided to undertake a challenge to build an end-to-end real time processing pipeline using Azure Functions. The key here would be to only use output bindings, triggers, and return bindings to communicate with other Azure services. In this way, we lessen the amount of code needed and reduce the overall chance of bugs.

In addition, it gives insight into potential ways of building real time data processing pipelines, a scenario becoming increasingly common in business. Such pipelines are a challenge and require intense communication and collaboration with various people to ensure the data is partitioned and chunked appropriately. Throw in the cloud dimension, where our setup needs to be cost effective as well as functional, and it is quite the challenge.

Let’s get started.

Understanding Bindings

One of the most useful features in Azure Functions is bindings, whereby functionality that would normally necessitate boilerplate code is encapsulated in a simple attribute on a parameter. This allows the function to communicate with many services while the management of that communication is handled by Azure.

Perhaps the most popular and well-known binding is HttpTrigger, whereby an Azure Function is triggered when an HTTP request matching a specific pattern is made. This trigger surfaces an HttpRequest instance which contains all of the information about the incoming request. Other triggers function in similar ways – I even have a post about writing a custom binding.

Regardless, when using Azure Functions you should aim to heavily use bindings as much as possible. It will save you from writing bad or incorrect service connection code.

Enough of that, let’s talk shop.

The Architecture of our Pipeline

Selection of services is critical when designing a real time pipeline; each case will be different and have different limitations. Recently, I helped a client understand the difference between Azure IoT Hub and Event Hubs and which scenarios each is geared for. In that case I explained that IoT Hub allows an enormous number of concurrent connections (sensible given the use case) whereas Event Hubs allows a fair number but far fewer than IoT Hub. This sort of limitation often informs other architecture decisions you make.

For our sample, I chose the following services:

  • Azure Functions – this should not come as a surprise. They are the backbone and connectors of the event pipeline. As a rule of thumb, when building an Azure Function be mindful of its responsibilities. They are not designed to be large, and there are certainly better services for performing heavy amounts of processing
  • Event Hub – Event Hubs are designed to handle large amounts of data and maintain order within partitions and minimal latency. It is able to send its incoming data blocks to multiple services for processing. If Capture is enabled, Event Hub can store each event it receives in an associated blob storage. The blob storage could serve as a simple Data Lake over time
  • Stream Analytics Job – This service enables queries to be run over the data in Event Hub. It is commonly used to aggregate windows of data and send the results to other services for storage and further processing. These windows exist because, in a real time application, the data set is never fixed.
  • CosmosDb (Core) – CosmosDB is Microsoft’s global NoSQL database solution used to support various providers for global and multi-region scenarios. Due to it being NoSQL it favors the AP portion of the CAP theorem (https://en.wikipedia.org/wiki/CAP_theorem) and this is well suited to a situation where it will receive data frequently and in a controlled manner where consistency is not a major concern
  • Azure SignalR Service – SignalR enables transmission of new aggregate data sets from Stream Analytics to connected users. This could power charts or graphs in real time to allow for more visibility of per minute operations

Now that we know the services here is a visual of how they are connected:

EventFlow

A few things of note that we take advantage of in this pipeline:

  • CosmosDB supports a change feed which an Azure Function can monitor, triggering itself when a change is detected. This enables the so-called streaming architecture that underpins this sort of pipeline; it lets the entire system respond when something in the data changes
  • At the Dequeue stage we use a combination of return and output bindings to send our data to two places. Further, we parameterize our Blob return so we can set the name of the blob created (we will explore this in code later in this series). The use of output bindings with Azure Functions enables a function to target more than one service as an output OR, in the case of Event Hub, notify the service multiple times in the course of one invocation

Now that we understand our flow, let’s look at our first Azure Function – a Timer Trigger.

Phase 1: Generate our Data

Data plays a huge role in a real time processing pipeline. In my smaller example I have a service which generates random Western-style names. My goal is to capture these names and send them to the queue. The size of this data can vary, since I may get back 4 names or 10. Here is the code for the function:
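In sketch form, assuming a hypothetical name-generation API and a five-minute schedule, it looks something like this:

using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;

public static class GenerateNames
{
    private static readonly HttpClient HttpClient = new HttpClient();

    [FunctionName("GenerateNames")]
    [return: ServiceBus("newnames-queue", Connection = "ServiceBusConnection")]
    public static async Task<string> Run(
        [TimerTrigger("0 */5 * * * *")] TimerInfo timer,
        ILogger log)
    {
        // hypothetical name-generation API; returns a JSON array of names
        var response = await HttpClient.GetStringAsync("https://namey-api.example.com/api/names?count=10");
        var names = JArray.Parse(response);

        // build the queue payload using Linq to Json
        var payload = new JObject
        {
            ["Id"] = Guid.NewGuid().ToString(),
            ["Names"] = names
        };

        log.LogInformation($"Queueing {names.Count} names");

        // the returned string is written to the newnames-queue Service Bus queue
        return payload.ToString();
    }
}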

This code uses a ServiceBus return binding – whatever string we return from this function is added to a Service Bus queue named newnames-queue in its raw JSON form (this is why we use the Linq-to-Json style from the Json.NET library). The binding takes care of encoding the string into a byte array to minimize transport and storage size while it resides in the queue.

In the attributes for this function, you can see we specify the queue name as well as the connection string (without the Entity Path) for the Service Bus. The configuration setting ServiceBusConnection is set on the Azure Function app and the attribute handles resolving it.

Up next

This brings us to the end of Part 1 of this series. Our next part will show how we dequeue this message in a way that maintains strong typing and sends it to both Blob storage and Event Hub in one fell swoop.

Part 2

Part 3

 

Getting Started with Kafka and .NET Core on Kubernetes

Real time streaming is at the heart of many modern business-critical systems. The ability to constantly stream data can give organizations new insights, and the real time nature means trends can be seen as they happen, creating possible value streams for organizations.

Streaming being so popular, there are many options available, from cloud-hosted to self-hosted. One of the big ones is Apache Kafka, a sort of “pub/sub on steroids” that can scale its data ingest and streaming capabilities to fit the needs of almost any organization. In this post, I want to walk through a basic setup of Apache Kafka using Bitnami's Helm chart.

Prepare your cluster

Truth be told, I did this using an Azure Kubernetes Service cluster that I spin up on occasion, backed by three large VMs. I have found with things like Kubernetes in Docker Desktop, minikube, and others that you run into resource limitations that make it hard to deploy. For that reason, either give your local cluster an immense amount of resources or use a cloud-managed one. I recommend Azure simply because, by default, the AKS cluster is backed by a scale set that you can deallocate as needed – it saves immensely on cost.

Bitnami Helm Chart

I love Helm because it enables quick deployment of supporting software that would otherwise take a lot of reading and learning and tweaking. Instead, you can use Helm to execute this: https://github.com/bitnami/charts/tree/master/bitnami/kafka

The instructions above are actually written to target Helm 2; the latest is Helm 3. For the most part it is similar enough, though I love the removal of Tiller in Helm 3. The syntax of the command is a little different – here is what I used:

helm install kafka-release --namespace kube-kafka bitnami/kafka

This creates a release (an instance of a deployment, in Helm terminology) called kafka-release, places the deployed components in a Kubernetes namespace called kube-kafka, and deploys resources based on the bitnami/kafka Helm chart – if you look at the link above, there are many ways to override how things are deployed via this chart.

After running the command, Helm will start deploying resources, which then have to spin up. In addition, it will lay out some instructions for how you can play with the Kafka cluster on your own using a temporary pod. Before you can do this, the deployment must enter the Ready state. It is best to run the following command to know when things are ready:

kubectl get pods -n kube-kafka --watch

This will watch the Pods (which are the things that have to spin up due to container creation) and when both are ready you can safely assume Kafka is ready.

Kafka oversimplified

So, I am still learning the internal architecture of Kafka. I know that it is basically a message broker system that enables multicast eventing to various topics, each of which can have multiple subscribers. But I honestly need to do more to learn its internals – suffice it to say, for this exercise you send a message to a topic in Kafka and all listeners for that topic will receive the message.

The terms Producer and Consumer will be used throughout the remainder of this post: a Producer sends data to the cluster, and Consumers receive that data.

Create the Producer

Our producer will be rudimentary and oversimplified, but it gets across the sort of structure these types of applications take. Remember, this is not production-level code, so copy and paste at your own risk.

So to start you need to add the Confluent.Kafka NuGet package (current version as of this writing is 1.4.0).

Next, create a config instance and set BootstrapServers – this is the server your code will contact to bootstrap its connection to the message broker and route messages to wherever that broker lives (I am not sure how all of that works internally yet). Suffice it to say, when you finish running your Helm install this is the Service DNS name you are given – it follows the standard convention Kubernetes uses to name services.

For our example we cycle over all of the int values available to .NET (all the way up to int.MaxValue) so we can keep our producer going for a long time, if need be. For each iteration our code simply writes a message to the broker indicating the current iteration number.

We use the ProduceAsync method to send this message to Kafka – we use a try/catch here to catch any sending errors. Everything is written out to STDOUT via Console.WriteLine.

One of the key arguments to ProduceAsync is the name of the topic to associate our message to. This is what our consumers will listen to so a single message sent to this topic can be fanned out to as many consumers as are listening. This is the power of this type of architecture as it allows for event based processing with a high degree of decoupling. This allows different parts of our application to simply respond to the event rather than being part of a longer chain of functions.
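Putting those pieces together, a rough producer sketch (the bootstrap server address is an assumed in-cluster service DNS name for this release and namespace) looks something like this:

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{
    static async Task Main()
    {
        var config = new ProducerConfig
        {
            // in-cluster DNS name of the Kafka service created by the Helm release (assumed)
            BootstrapServers = "kafka-release.kube-kafka.svc.cluster.local:9092"
        };

        using var producer = new ProducerBuilder<Null, string>(config).Build();

        for (var i = 0; i < int.MaxValue; i++)
        {
            try
            {
                var result = await producer.ProduceAsync(
                    "increment-topic",
                    new Message<Null, string> { Value = $"Message {i}" });

                Console.WriteLine($"Delivered '{result.Value}' to {result.TopicPartitionOffset}");
            }
            catch (ProduceException<Null, string> ex)
            {
                Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
            }

            // small pause between messages (not required)
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    }
}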

Build the Consumer

As with the Producer, the first step is to add the Confluent.Kafka NuGet package so we can use its built-in types to communicate with Kafka.

You can see that the Consumer subscribes to our topic (increment-topic in this case). I was a bit surprised this was not using the built-in .NET eventing model, since I think that would make more sense. Instead, we have to create a busy loop that attempts to consume on each pass and checks whether it got anything.

From there we just print the message we received. You will notice that our BootstrapServers value is the same as in the Producer, as it should be – we are writing to and reading from the same broker.

As for the GroupId, I do not yet know exactly what it does – I need to read up on it – but I set it all the same.
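A rough consumer sketch along those lines (the GroupId value here is arbitrary, and the bootstrap server is the same assumed service DNS name) looks something like this:

using System;
using Confluent.Kafka;

class Program
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            // same broker the producer writes to (assumed service DNS name)
            BootstrapServers = "kafka-release.kube-kafka.svc.cluster.local:9092",
            GroupId = "increment-consumers",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("increment-topic");

        // busy loop: poll for messages and print whatever arrives
        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(1));
            if (result != null)
            {
                Console.WriteLine($"Received: {result.Message.Value}");
            }
        }
    }
}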

Testing our Example

Our goal is to run against Kafka hosted in Kubernetes, and thus we will want both our Producer and Consumer there as well. You could use kubectl port-forward to expose the Kafka port locally, but I didn't have much luck with that as the code would immediately try to call a Kubernetes-generated service name which was not exposed. I might tinker with this some more; my main goal with this exercise is to see it working in Kubernetes.

The best way to do this, or at least the simplest, is to create a vanilla Pod with the consumer and producer running as separate containers. Truthfully, you would never want to use a vanilla PodSpec in production (usually you want a Deployment or ReplicaSet), since doing so would not maintain high availability and resiliency (if the Pod dies, everything stops and it does not get recreated). Nevertheless, for this simple example I will be creating a multi-container Pod – we can check the log files to ensure things are working.

Here is the PodSpec:
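In rough form, with placeholder image names pointing at wherever you pushed your producer and consumer images:

apiVersion: v1
kind: Pod
metadata:
  name: kafka-example
  namespace: kube-kafka
spec:
  containers:
    - name: producer
      image: myregistry.azurecr.io/kafka-producer:latest
    - name: consumer
      image: myregistry.azurecr.io/kafka-consumer:latest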

I will assume that you know how to build and push Docker images to a repository like Docker Hub (or Azure Container Registry in this case).

Next, we apply this spec file to our Kubernetes cluster

kubectl apply -f podspec.yaml

And we can use the following to wait until our pod is running (we should see 2/2 when ready)

kubectl get pods -n kube-kafka

Next, let's get a sample of the logs from our Producer to ensure things are connecting and our messages are going out. We can run the following command (using the pod and container names from the PodSpec above):

kubectl logs kafka-example -n kube-kafka producer

If things are working you should see a lot of output indicating messages being sent. If you see errors, double-check that you typed everything correctly and, in particular, that your BootstrapServers value is correct (you can use kubectl to query the services in the deployed namespace to ensure you have the right name).

Assuming that is working, we can perform a similar command to see the logs for the Consumer:

kubectl logs kafka-example -n kube-kafka consumer

Once again, if all things are working you should see messages being read and displayed in the console.

Congratulations!! You have just completed your first end-to-end Kafka example (maybe it's not your first, but congrats all the same).

Why is this important?

I talked about real time applications at the onset – these generally revolve around event streaming, whereby data entering the system causes events to be generated (often the events are the data) which can be sent all over the system. This can create records in a database, update metric counters, flip switches, and many other things – it is actually the basis of IoT applications, which stream a tremendous amount of telemetry data and use systems like Kafka to analyze and process that data in real time.

I love these types of applications because they enable so many unique business scenarios and really cool real time charts. In my next post, I hope to take my Kafka knowledge further and do something more useful.