Using the Management API with .NET Core 3.1

As part of my continuing dig into Auth0 I wanted to see how I could access user information for a user represented by a JWT Access Token issued by Auth0. For the uninitiated, an Auth0 login yields two tokens to front-end applications: an id_token and an access_token.

The id_token is provided by Auth0 to represent the user who logged into the registered application. The access_token is provided as a means to make calls to supported audiences. We talked a little about this in the previous post (Authorization with Auth0 in .NET Core 3.1). Thus, following login, we would keep the id_token stored as a way to identify our user to the application and the access_token to enable access to allowed APIs.

However, if you run each of these tokens through jwt.io you will see a large difference. Notably, the access_token contains NO user information, with the exception of the user_id as the subject (sub). The reason for this is to keep the payload small and to reduce the attack surface. The truth is, there is often no reason for the API being called to know anything about the user outside of permissions and the user_id – passing around name, email, and other information is most often superfluous and thus should be avoided.

The question is though, if we do need the user’s information, how can we get it? That is what I want to explore in this entry.

How should we view Auth0?

First and foremost, I want to point out that Auth0 is more than just a way to validate and generate tokens. Honestly, it should almost be viewed as a UserService as a Service. It replaces the service that would traditionally handle user-related responsibilities. It even has the capability to integrate with existing accounts and data store mechanisms. As I have continued to use it, I have found that using it for login and user management is a speed boost to development and relieves my teams of having to build the necessary UI that goes with such a system.

How do we get the UserId from our Token in .NET Core?

Honestly, I was disappointed here – I would have hoped .NET Core was smart enough to map certain well-known claims to ClaimsIdentity properties so you could easily use the this.User property that exists on ControllerBase. Maybe I missed something but, given the popularity of JWT and token-style logins, I would think this would be more readily available. This is how I ended up pulling out the UserId:

((ClaimsIdentity)User.Identity).Claims.First(x => x.Type.EndsWith("nameidentifier")).Value;
The Auth0 sub claim gets mapped to the nameidentifier claim (EndsWith is used since it is a namespaced claim type). I find this rather messy – I would expect common claims to be mapped to properties automatically or with some configuration – I was not able to find either.
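For context, here is a minimal sketch of pulling the UserId inside a controller action; ClaimTypes.NameIdentifier resolves to the same namespaced claim type the EndsWith check above matches, and the controller shown is just a stand-in for the default template controller.

using System.Linq;
using System.Security.Claims;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{
    [HttpGet]
    public IActionResult Get()
    {
        // ClaimTypes.NameIdentifier is the full ".../identity/claims/nameidentifier" type
        var userId = User.Claims.First(c => c.Type == ClaimTypes.NameIdentifier).Value;
        return Ok(userId);
    }
}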

Getting the User Information

Auth0 offers a standard API to all tenants known as the Management API. This is registered to your account with its own ClientId and Secret. When that ClientId and Secret are provided to a tenant-level endpoint, we can get an access_token for the Management API. Using this we can get information about users, the tenant, just about anything.

First, you need to contact your tenant level OAuth endpoint and acquire an access_token that can be used to access the Management API. This call is relatively simple and is shown below – I am using the standard HttpClient class from System.Net.Http.

The client_id and client_secret are from the M2M application created for your API (under Applications). Hint: if you do not see this, access the API in question (WeatherApi for me) and select the API Explorer tab – this will auto-create the M2M application with the client id and secret needed for this exercise.
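The original code sample is not reproduced here, so the following is a minimal sketch of that call, assuming my farrellsoft tenant and placeholder M2M credentials; the Management API audience is always your tenant domain followed by /api/v2/.

using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;

public static class ManagementTokenClient
{
    private static readonly HttpClient _client = new HttpClient();

    public static async Task<string> GetManagementTokenAsync()
    {
        // placeholder values - substitute your own tenant, client id, and secret
        var payload = new JObject
        {
            ["grant_type"] = "client_credentials",
            ["client_id"] = "<m2m-client-id>",
            ["client_secret"] = "<m2m-client-secret>",
            ["audience"] = "https://farrellsoft.auth0.com/api/v2/"
        };

        var response = await _client.PostAsync(
            "https://farrellsoft.auth0.com/oauth/token",
            new StringContent(payload.ToString(), Encoding.UTF8, "application/json"));

        // the JSON payload returned contains the access_token we need
        var body = JObject.Parse(await response.Content.ReadAsStringAsync());
        return body["access_token"].Value<string>();
    }
}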

[Image: Selection_011]

The end result of this call is a JSON payload that contains, among other things, an access_token – it is this token we can use to access the Management API.

The best way to access the Management API with .NET is to use the supplied NuGet packages from Auth0:

  • Auth0.Core
  • Auth0.ManagementApi

The rest is very easy, we simply new up an instance of ManagementApiClient and call the appropriate method. Here is the complete code sample:
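The original gist is not shown in this extract, but a minimal sketch of that call, using my farrellsoft domain and the Management API token acquired above, looks something like this:

using System.Threading.Tasks;
using Auth0.ManagementApi;

public static class UserLookup
{
    public static async Task<string> GetUserEmailAsync(string managementToken, string userId)
    {
        // managementToken is the Management API token from the /oauth/token call above
        var client = new ManagementApiClient(managementToken, "farrellsoft.auth0.com");

        // userId is the sub value pulled from the incoming access_token
        var user = await client.Users.GetAsync(userId);
        return user.Email;
    }
}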

Really pretty straightforward – again, my only complaint was getting the UserId (sub) out of the claims in .NET; I would expect it to be easier.

Final Thoughts

Ultimately, I am starting to see Auth0 more as a service for user management than a simple vendor. If you think about it, having user data in a separate service does make things more secure since, if your DB is breached, the passwords are safe as they are physically not there. Further, you can take advantage, to a point, of the scale offered by Auth0 and its ability to integrate with services like Salesforce, Azure AD, Facebook, and other user info sources.

I am personally looking forward to using Auth0 extensively for my next project.

Authorization with Auth0 in .NET Core 3.1

Auth0 (https://auth0.com) remains one of the leaders in handling authentication and user management for sites. While it may seem odd to some to offload such a critical aspect of your application to a third party, the truth is, it's not as far-fetched as you think. Consider the popularity of touch-to-pay systems like Samsung Pay, Google Pay, and Apple Pay. Each of these (along with others) uses a one-time token exchange to allow payment. This enables payment information to be collected elsewhere, lessening the impact in the event of a breach.

For this article, I won't dive too deeply into the intricacies of Auth0; it's a very wide platform with a lot of functionality, much of which I am not an expert on. For my goal here, I wanted to show how I could use a Google and Username/Password login to access a service requiring a token and FURTHER how I could surface the permissions defined in Auth0 to facilitate authorization within our API (which uses .NET Core 3.1).

Authorization vs Authentication: Know the Difference

One of the key things that need to be understood when using Auth0 (or any identity provider) is the difference between Authentication and Authorization, in this case how it relates to APIs.

When we talk about Authentication we talk about someone having access to a system in a general sense. Usually, for an API, this is distinguished by the request having a token (JWT or otherwise) that is not expired and is valid for the system. If no token is passed, or the one passed is invalid, the API should return a 401 Unauthorized.

When we talk about Authorization we talk about whether someone CAN call an API endpoint. This is delineated through claims that are registered with the token. Information in these claims can be roles, permissions, or other data. When the given token does NOT supply the appropriate claims or data to access an endpoint, the API should return a 403 Forbidden.

Now that we understand that, let’s move on.

Establish a check for Authentication

There are multiple ways to go about this each with varying degrees of appropriateness. In my previous blog entry I did this through Kong (https://jfarrell.net/2020/06/11/kong-jwt-and-auth0/) and I will show it here in .NET Core as well.  What is the difference? It comes down to what you are trying to do.

With the Kong JWT plugin you can ideally require authentication across MANY APIs that are being fronted by the Kong Gateway. In general, it's very common when you have microservices that require a token to place them behind such a gateway that enforces the token being present – this way you do not need to configure and maintain the middleware for such an operation across multiple APIs that may be in different languages and even managed by disparate teams (or external teams).

.NET Core (as with other API frameworks) supports this for the API itself either via a custom connection or an identity provider, such as Auth0.

To enable this feature you need to add two NuGet packages:

  • System.IdentityModel.Tokens.Jwt
  • Microsoft.AspNetCore.Authentication.JwtBearer

The middleware is configured in the Startup.cs file via the AddAuthentication and AddJwtBearer extension methods on IServiceCollection (and enabled in the request pipeline with UseAuthentication). The following code does this and instructs the service to contact Auth0 to validate the token's authenticity.
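The original gist is not reproduced in this extract; the following is a minimal sketch of that configuration, assuming the values live under the Auth0:Domain and Auth0:Audience configuration keys described below.

// Startup.cs (requires the Microsoft.AspNetCore.Authentication.JwtBearer namespace)
public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    services.AddAuthentication(options =>
    {
        options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
        options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
    })
    .AddJwtBearer(options =>
    {
        // Auth0:Domain is the full tenant URL, e.g. https://farrellsoft.auth0.com/
        options.Authority = Configuration["Auth0:Domain"];
        // Auth0:Audience is the identifier of the API, e.g. https://weatherforecast.com
        options.Audience = Configuration["Auth0:Audience"];
    });
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    app.UseRouting();
    app.UseAuthentication();   // must come before UseAuthorization
    app.UseAuthorization();
    app.UseEndpoints(endpoints => endpoints.MapControllers());
}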

The first thing to notice here is the Auth0:Domain value which is the full URL of your Auth0 tenant (mine is farrellsoft). This domain informs the underlying mechanisms where to look for the OAuth endpoints.

The second thing is Auth0:Audience and this is more specific to the OAuth flow. The audience is, in this context, a validation parameter. That is to say, we want to validate that the token being received can access our API (which is the audience). This will map to some unique value that identifies your API. In my case, this is https://weatherforecast.com. I do not own that URL, it is used as a way to identify this API.

When we authenticate to our frontend application, we specify what audience we want. Assuming the frontend application has access to that audience, we will receive a token that we can pass with API calls.

[Image: oauth1]

This is another reason to look into a managed identity provider even beyond security. In high volume systems, the User API is constantly under load and can be a bottleneck if not thought through properly. You can imagine an architecture with MANY microservices, each needing to validate the token and get user information relevant to their task. Netflix has entire talks about this principle – with particular emphasis on the contribution of such services to cascading failures.

The final step, to enforce the presence of this token, is the Authorize attribute on either the controller or a specific endpoint method. Once you add this, not passing a valid JWT token will cause the middleware to return a 401 Unauthorized, as desired.

(Note: I am using the default controller implementation that comes out of the box with a new WebApi in .NET Core 3.1)
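As a simplified stand-in for that default controller, the attribute usage looks like this:

using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("[controller]")]
[Authorize]   // requests without a valid JWT now receive a 401 Unauthorized
public class WeatherForecastController : ControllerBase
{
    [HttpGet]
    public IActionResult Get() => Ok(new[] { "weather data" });
}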

If you were to now contact https://localhost:5001/weatherforecast in Postman (assuming you have SSL verification turned off in the settings) you will get a 401 (assuming you do not pass a token).

To generate a valid token, the easiest way is to create an SPA application in your Auth0 tenant and deploy a Quickstart (I recommend the Angular variant). Remember to have the Web Console -> Network tab open as you log in to this sample application – you can extract your token from the token endpoint call.

An additional bit of information is the site jwt.io which, given a token, can show you all of the information contained within. Do not worry, nothing sensitive is exposed by default, just ALWAYS be mindful of what claims and properties you add to the token since they can be viewed here.

Establish a check for Authorization

While the authentication piece is commonly tied to a backend validation mechanism, authorization commonly is not, at least with JWT tokens. The reason is, we do not want to incur additional round trips if we can, safely, store that data in the token and have it decoded.

This is an important aspect of this process because Authorization is ALWAYS the responsibility of the specific backend. There are many ways to accomplish it, but for this we are going to use the .NET Core authorization requirements framework. This will allow us to inspect the valid token and indicate if certain requirements have been fulfilled. Based on those requirements, we can evaluate a policy for the identified user, and based on that policy the endpoint either can or cannot be invoked.

For this to work with Auth0 we need to ensure we create permissions and roles in the portal, enable RBAC and indicate we want assigned permissions for a specific audience (API) to be returned in the token.

First, we will need to create permissions:
[Image: Selection_008]

Here I have created two permissions for my WeatherApi: read:weather and write:weather.

Next, we need to assign these permissions to users. Note, this can also be done by assigning permissions to specific roles and assigning that role to that user. Roles are groups of permissions. This is all done in the Users and Roles section.
[Image: Selection_009]

Here you can see the permissions we added can be assigned to this user.

Next, we need to enable RBAC and toggle the option for Auth0 to include the user's assigned permissions in the token. This is done from within the API configuration under RBAC (Role Based Access Control) Settings.
[Image: Selection_010]

Finally, we have to modify the way we get our token to be specific to our API (https://weatherforecast.com). That is the trick you see, the token is SPECIFIC to a particular API – this is what ensures that permissions with the same name do not end up in the same token.

Now, on this you may wonder, well how do I handle it with multiple services that have different names and permissions? To that I will say, there are ways but they get at the heart of what makes designing effective microservice architectures so challenging and are not appropriate to delve into in this entry.

Assuming you are using the Angular Quickstart you only need to ensure that, when the Login request is made, the audience is provided. As of the Quickstart I downloaded today (07/04/2020):

  • Open the file at <root>/src/app/auth/auth.service.ts
  • Insert a new line after Line 18 with the contents: audience: <your API identifier>

Refresh your session either by revoking the user’s access in the Auth0 portal (under Authorized Applications) or simply by logging out. Log back in and recopy the token. Head over to jwt.io and run the token through – if everything is good, you will now see a permissions block in the decoded response.

You can now use this token in Postman (or whatever client you prefer) to access the API and exercise Authorization.

Building Authorization in .NET Core 3.1 WebApi

Now that the access token is coming into our backend we can analyze it for the qualities needed to “check” an authorization. In .NET Core this is handled via implementations of IAuthorizationRequirement and AuthorizationHandler<T> which work together to check the token for properties and validate fulfillment of policies.

We will start with implementing IAuthorizationRequirement – this class represents a desired requirement that we want to fulfill. In general it contains bits of information that the related handler will use to determine whether the requirement is fulfilled. Here is a sample of this Handler and Requirement working together:
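The original gist is not reproduced here; the following is a minimal sketch of such a pair, assuming the read:weather permission and the class names used in this post.

using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;

// The requirement simply carries the permission name the handler should look for
public class IsWeathermanAuthorizationRequirement : IAuthorizationRequirement
{
    public string Permission => "read:weather";
}

public class IsWeathermanAuthorizationHandler : AuthorizationHandler<IsWeathermanAuthorizationRequirement>
{
    protected override Task HandleRequirementAsync(
        AuthorizationHandlerContext context,
        IsWeathermanAuthorizationRequirement requirement)
    {
        // with RBAC enabled, Auth0 places granted permissions in "permissions" claims
        if (context.User.HasClaim(c => c.Type == "permissions" && c.Value == requirement.Permission))
        {
            context.Succeed(requirement);
        }

        return Task.CompletedTask;
    }
}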

Here, the token is passed to our IsWeathermanAuthorizationHandler where it looks for the permission as indicated by the requirement. If it finds it, it marks it as fulfilled. You can see the potential here for more sophisticated logic aimed at validating a requirement as fulfilled.

The final piece is the definition of a policy. A policy is a composition of requirements that need to be fulfilled to grant the user with the specific policy (by default ALL must be fulfilled but overloads can enable other functionality). In our example, we have created the IsWeatherman policy as such:
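Again, the original gist is omitted; a sketch of that registration, using the names above, might look like this:

// Startup.ConfigureServices (requires Microsoft.AspNetCore.Authorization)
services.AddAuthorization(options =>
{
    options.AddPolicy("IsWeatherman", policy =>
        policy.Requirements.Add(new IsWeathermanAuthorizationRequirement()));
});

// the handler is registered against its interface so the framework can discover it
services.AddSingleton<IAuthorizationHandler, IsWeathermanAuthorizationHandler>();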

Notice the AuthorizationHandler is added via its interface as a singleton. Obviously use the instantiation strategy that makes the most sense. For our IsWeatherman policy to be fulfilled the single requirement (IsWeathermanAuthorizationRequirement) must be marked successful. This then allows the use of this policy via the Authorize attribute:
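Applied to an action method it looks like the following (the method body is just a placeholder):

// inside the WeatherForecastController shown earlier
[HttpGet]
[Authorize(Policy = "IsWeatherman")]
public IActionResult Get()
{
    // only tokens carrying the read:weather permission reach this point
    return Ok();
}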

Pretty neat eh? Now, you can simply decorate the controller class or action method and forgo any logic in the method. If the policy is not given to the user a 403 Forbidden will be returned.

What about multiple policies? As of right now, based on what I can find, this is NOT supported. But given the flexibility of this implementation, it would not be challenging to create composite policies and go about it that way.

To validate this functionality, our user from previously should have the permission and, as such, should be able to access your endpoint. To further verify, create a new user in Auth0 with no permissions, sign in, and use the token to access the endpoint; you should get a 403 Forbidden.

Congrats on getting this to work. If you have questions please leave a comment and I will do my best to answer.

Kong JWT and Auth0

Continuing my experiments with Kong Gateway (https://konghq.com/) I decided to take on a more complex and more valuable challenge – create an API which used Auth0 (http://auth0.com) to drive JWT authentication. I have done this before in both Azure API Management and API Gateway so I figured it would, generally, be pretty straightforward: I was wrong.

What I have come to discover is that the Kong documentation is written very deliberately for their traditional approach leveraging the Admin API, which has admins sending JSON payloads against the API to create objects or using their custom types in a YAML file that can be provided at startup to the Gateway. My aim was to approach this with a Cloud Native mindset and really leverage Kubernetes and the appropriate CRDs to make this work.

To say the documentation does not cover this approach is a gross understatement. There are examples, trivial and oversimplified ones, but nothing substantive. No spec examples, no literature on common scenarios. This made things impressively difficult. Were it not for the help of their team on Twitter and in the forums I cannot say for certain if I would have been able to figure this out. Thus, it is hard for me to recommend Kong to anyone who plans to run in Kubernetes since their existing methods hardly follow the Cloud Native mindset.

But, without further ado, let's talk about how I eventually got this to work.

Planning

Our aim will be a simple API with two endpoints: one will require a JWT token and the other will be anonymous. Kong operates as an Ingress controller (install: https://docs.konghq.com/2.0.x/kong-for-kubernetes/install/#helm-chart) and thus relies on the Ingress spec (https://kubernetes.io/docs/concepts/services-networking/ingress/) to route incoming requests to services, which then route to the underlying pods.

Along the way we will leverage a couple custom resources defined within Kong: KongConsumer and KongPlugin.

Authentication will be handled via Auth0 through my Farrellsoft tenant. This provides maximum security and flexibility and keeps any super sensitive details out of our definitions and therefore out of source control.

For this exercise I will make the following assumptions:

  • You have a working understanding of Auth0 and can navigate its portal
  • You have a Kubernetes cluster you have already installed Kong to (install link is above)
  • You have the ability to use Deployments and Services within Kubernetes

Let’s get started

Create a basic Ingress route

Ingress in Kubernetes is often the point that is publicly exposed by the cluster. It serves as the traffic cop, directing incoming requests to services based on matching criteria. Its presence negates the need to spin up greater numbers of load balancers and create cost overruns. It is built as a pluggable component and many vendors have created their own; Kong is one such vendor. Here is a basic route to our API:

Here we use default backend notation (that is, a spec with no routes defined) to direct ALL traffic to the weather-service. If we run the following kubectl command we can see our public IP address for the Ingress controller:

kubectl get ing -n weather-api

Here is a shot of what mine looks like (you may need to run it a few times):

[Image: ing-get]

For this post we will use the IP address to query our controller. You could create a CNAME or A Record for this entry if you wanted something easier to read.

If we call our API endpoint in Postman we get back our random weather forecast data as expected. Great, now let's add authentication.

My Url was http://52.154.abc.def/weatherforecast

Let’s require a JWT token

One of the things I like with Kong is the plugin model. It's very similar to how policies are used in Azure API Management. Effectively it can give you a wealth of functionality without having to write custom code and makes the separation of responsibility that much cleaner.

For Kong, we must first define a KongPlugin to enable the plugin for use by our Ingress:

Yes, I know. It's weird. You would think that we would configure the plugin using this definition, and you would be wrong. We will get to that later. For now, this is basically just activating the plugin for use. To use it, we need to update our Ingress as such:

We added the plugins.konghq.com annotation and indicated the name of our JWT plugin which we defined in the previous example.

To test this, make a request to /weatherforecast (or whatever your endpoint address is) and you should now get Unauthorized. Great, this means the plugin is active and working.

Not working? I have a whole section on debugging at the end.

Setup Authentication

I won't lie, this was the trickiest part because it took piecing together examples from bug reports, guessing and, eventually, members of the Kong support team to figure it out. So here we go.

Make sure you have an Auth0 account (mine is https://farrellsoft.auth0.com) and that you grab the public key. This bit of the docs will explain this: https://docs.konghq.com/hub/kong-inc/jwt/#using-the-jwt-plugin-with-auth0/. Be careful to focus only on the bit about getting the .pem file and the call to openssl.

Once you perform that you should end up with your tenant-specific public key in a file. Don't worry, this is the public key and is thus designed to be shared – Auth0 takes good care of the private key.

Create a secret that looks something like this:

Kong performs the authentication using a KongConsumer which effectively represents the user of an incoming request – in our case we would want all users to be seen as the same (even though the underlying app logic will be different).

Now, with this created all of the pieces should be in place to enable JWT verification. Let’s test it out.

The easiest way is to go to your Auth0 portal and select APIs. Select the API in the box (I believe one gets created if none exist). Once selected, you should see API Explorer as a tab option (follow Create and Authorize if prompted). With this selected you will see a JWT token presented. Copy it.

You will want to use this as a Bearer token. I recommend Postman's Authorization tab.

[Image: Selection_004]

If everything works, you should get back data.

Debugging

There are a number of ways to debug and troubleshoot this approach. They range from token validation, entity checking, and log files. I used all three to get this working.

JWT Token Validation

JWT tokens are based on a known standard and they can be examined at jwt.io where you can paste a token and see all of the values therein. For our example, Kong JWT keys off the key in our Secret and attempts to match the iss value in the token to a known credential.

You can use jwt.io to inspect the token to ensure the iss is what you expect and what you defined as the key in the secret. BE CAREFUL, the trailing slashes count too.

As a side note, tools like jwt.io are why it cannot be overstated how careful you should be about what you put in a JWT token. The values can be seen, and rather easily. Always use values that would mean nothing to another user and only mean something to the consuming application.

Using the Admin API

When running in db-less mode (the default for Kubernetes and the recommended approach) you will not be able to use the API to create Kong resources, other than via the /config endpoint. You can, however, perform GET calls against the API and validate that Kong is creating appropriate resources based on Kubernetes resources (I won't cover syncing and how resources get translated from K8s to Kong in this post).

I especially found it useful to query my consumers and their corresponding jwt credentials. I continuously got an error regarding No Creds for ISS which was due to the fact that for a long time I was simply not creating any jwt credentials. You can validate this by calling the Admin API at:

/consumers/<username>/jwt

This will show an empty list if things are not working properly.

For Kubernetes, the easiest way to get at the Kong Admin API is a port-forward. Given this is really only used for validation in db-less mode, having it available to the world is not needed. Here is the kubectl command I used to port forward this:

kubectl port-forward service/kong-gateway-kong-admin -n kong-gate 8080:8444

Then you can do the following to get all consumers:

http://localhost:8080/consumers

Reading the Logs

I found out, much too late, that all syncing operations are logged in the Kong Ingress Controller Pod. This let me discover when I was missing kongCredType (it's not mentioned anywhere in the docs) in my Secret.

Remember, when you create resources via kubectl Kong monitors this and creates the appropriate Kong types. This log file will give you a view of any syncing errors that might occur. Here is how I accessed mine:

kubectl logs pod/kong-gateway-kong-7f878d48-tglk2 -n kong-gate -c ingress-controller

Conclusion

So what is my final assessment of Kong? Honestly, I like it and it has some great ideas – I am not aware of any other Ingress providers leveraging a plugin model, which undoubtedly gives Kong an incredible amount of potential.

That said, the docs for db-less Kubernetes need A LOT of work and are, by my estimation, incomplete. So, it would be hard to suggest a larger enterprise take this tool on expecting to lean on a support staff for help with an angle that is surely going to be very common.

So, what I would say is, if you are prepared to really have to think to get things to work, or you are comfortable using the Kong YAML resources, Kong is for you. If you are looking for an Ingress for your enterprise-critical application, not yet I would say.

Playing with Kong Gateway

I recently took some time to play with Kong Gateway – which supports a Kubernetes compatible Ingress controller. I find that by playing with the various Ingress controllers I can get a better sense of their abilities and how they can relate to common use cases such as Strangler Pattern and Microservice management.

Setting Up

I am a big fan of Helm Package Manager and I used that to install Kong to my Kubernetes cluster. Kong lays the instructions out nicely: https://docs.konghq.com/2.0.x/kong-for-kubernetes/install/

Here is the sequence of commands I used – note I am using Helm 3.

helm install kong-gateway kong/kong --namespace kong-gate
   --set ingressController.installCRDs=false

Running this command with the appropriate configurations will install the Kong components to your Kubernetes cluster in the kong-gate namespace.

Configuration

Kubernetes leverages components like Kong through Ingress resources that identify the path and what class of Ingress to use; this is where you indicate to use Kong. Here is the configuration for the Ingress I arrived at.

Ingress components use annotations to not only define which Ingress type to use but also what configurations can be applied to that Ingress route (as defined by the CRD). In this case, you can see three custom annotations with the konghq identifier. This link lays out the various annotations that are supported: https://github.com/Kong/kubernetes-ingress-controller/blob/master/docs/references/annotations.md

In this case, weather-service is a Kubernetes Service that references the pods that contain the source code. Next, we want to leverage the Kong plugin system to apply rate limiting to this Ingress route.

Plugins

One of the aspects that makes Kong better than the standard Nginx Ingress controller is the bevy of plugins supported which can make common tasks much easier. There is a full catalog of them here: https://docs.konghq.com/hub/

This part was more difficult because there does not seem to be a lot of documentation around how this works. I ended up stumbling upon a GitHub issue that showed some source that helped me see how this works – here is the plugin configuration code that I arrived at:

For reference to this, here is the “home” screen for the plugin – https://docs.konghq.com/hub/kong-inc/rate-limiting/. From here you can get a sense of the configurations. Most of it is shown as sending CURL commands to the Kong Admin API. But it turns out you can follow the model pretty easily when defining your KongPlugin.

The key connector here is the name (weather-rate-limit) and its use in the annotations of the Ingress route (see above). This is how the Ingress knows which Plugin configuration to use. Also important is the plugin name value pair which defines the name of the plugin being configured. This is the same name as is listed in the Kong plugin catalog.

I used this with the default .NET Core Web API example that returns randomized Forecast data. I was able to successfully send 6 requests in sequence and get a Too Many Requests message on the sixth. My next challenge will be JWT token validation.

Thoughts

Ingress controllers like Kong, Envoy, Traefik, and others are essential tools when dealing with Kubernetes. Not only can they make dealing with Microservices easier but they can also lend themselves to making the break up of a monolith through the Strangler Pattern easier.

Building a Real Time Event Processor – Part 4

I always believe in reflecting on one’s design and always trying to get a better understanding of the problem. In this sample, there are areas that are rather obvious for improvement.

Do you really need that queue?

In Part 1, I discussed a Timer job I had created which fed data into a Service Bus queue. This data is subsequently dequeued in the next job and its data written off to Event Hub (one name at a time) and Blob storage (where the raw JSON can be stored). So I wondered if I actually needed to have that queue.

The answer is yes, and here is why – the blob binding is resolved not when the method returns or data is written to the out parameter, but at the very start of the invocation. Since our starter here is a Timer, there is no incoming data to suggest what the id value of the outgoing blob will be.

So the queue really only serves the purpose of creating an input with the id value embedded in it so the binding can work, which is why the blob return works for the Dequeue method but cannot work for the Timer trigger.

What I can do is change the code so it looks like this:
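The original gist is not reproduced in this extract; the sketch below shows the general shape of that refactor under a few assumptions – the queue, hub, container, and schedule values mirror the names used elsewhere in this series, and NamesClient.GetRandomNamesAsync is a hypothetical helper standing in for the call to the name-generation service.

using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;

public static class Functions
{
    [FunctionName("GenerateNames")]
    [return: ServiceBus("newnames-queue", Connection = "ServiceBusConnection")]
    public static async Task<string> GenerateNames(
        [TimerTrigger("0 */1 * * * *")] TimerInfo timer,
        [EventHub("names", Connection = "EventHubConnection")] IAsyncCollector<string> events)
    {
        var record = await NamesClient.GetRandomNamesAsync();   // hypothetical helper

        // the Event Hub writes now happen here instead of in the dequeue function
        foreach (var name in record.Names)
            await events.AddAsync(name);

        // the return binding still queues the raw JSON so the blob binding has an Id to work with
        return JsonConvert.SerializeObject(record);
    }

    // the dequeue function is reduced to literally just writing the blob
    [FunctionName("DequeueNames")]
    [return: Blob("raw-names/{id}.txt", FileAccess.Write, Connection = "AzureWebJobsStorage")]
    public static string DequeueNames(
        [ServiceBusTrigger("newnames-queue", Connection = "ServiceBusConnection")] NamesGenerationRecord record)
    {
        return JsonConvert.SerializeObject(record);
    }
}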

As you can see, we moved the Event Hub code to the Timer function so the purpose of the Dequeue method is literally just to write the blob. I honestly don't like this even though it makes more sense. I just dislike having an Azure function that is so simplistic that I feel like that functionality should just exist somewhere else.

What about storing Time Series data?

Initially I thought Redis might make a lot of sense since I can easily expire old aggregate data – keep in mind, often the historical data for aggregates is not important as it can be derived from raw historical data sources. Further, storing excess data for systems that are high volume adds to cost. Work with your teams and leaders to determine how much historical aggregate data makes the most sense.

Azure does offer a platform known as Time Series Insights which is designed for MASSIVE scale, usually output from an IoT style platform where you are gathering telemetry data from all over the world. Such a thing would dwarf what we are after with this.

Storing data for these systems is always a challenge. In this situation the best solution is to write a Timer Azure function that deletes data from Cosmos to ensure ONLY the data for the time period that makes the most sense is kept. Again, this data can be derived again from historical data sources.

Concluding Thoughts

I was amazed at how easy this process was. If you really focus on Azure Functions being the glue in a distributed process it is amazing how much functionality you can achieve. I think I wrote around 40 lines of code in the backend, and I got so much.

When you start to work on real time or streaming data platforms it really is important to have conversations, discuss your data needs, and try new technologies. The simple truth is, you can pay for the top tier Azure SQL database and likely handle the volume but, your costs will be enormous. Understanding other options and patterns can help select a strategy that not only works but is cost effective.

I hope you have enjoyed this series. Good luck with Azure and everything else.

Part 1

Part 2

Part 3

 

Using MSI with API Management and Azure Storage

I always enjoy applications where I can achieve complex functionality while writing the least amount of code. I believe that, by doing so, I can reduce the amount of maintenance an application requires as well as reduce the possibility of bugs or other problems because I am relying on functionality that others have tested; in the case of REST APIs for Azure services, I am relying on code that millions of people use every day.

To this end, I have been constantly researching the concept of “codeless” applications. This is not necessarily devoid of code but, limits the amount of custom code that is to be written. Central to this are API Gateway like tools, such as the component found in Azure API Management.

In this blog post, I want to create a set of operations in API Management that allows me to save and retrieve an image placed in a Blob storage account. Using this approach maximizes security and eliminates the need to store Access Keys or other sensitive information.

Provision our Infrastructure and Identities

For this sample you will need to create an Azure API Management instance (20m to deploy) and a Storage Account.

To start, access your API Management service and select the Managed Identities under the Security section. Set the Status to On and click Save.

Once you hit Save, Azure will get to work creating a new Service Principal with the same name as your API Management instance. You will want to remember this for later steps.

Create Read/Write Role Access for Azure Storage

Open your Azure Storage account and select the Access control (IAM) option. We need to add two role assignments to this Storage Account.

Note: for simplicity we are adding this access policy at the account level, which means the Role could talk to any Storage service in the account. To further limit this, create the role at the specific service level.

From the Access control (IAM) main page select Add a role assignment. Here on this page we can associate certain users with roles that grant them permissions to the storage account. We are interested in using two roles for our needs:

  • Storage Blob Data Reader
  • Storage Blob Data Contributor

For the role select Storage Blob Data Reader.

Make sure the Assign access to is set to the value which indicates the assignment will be to a service principal.

The Select field should be set to your user. This field supports search so, just type your APIM instance name and select it when it appears.

Click Save and repeat for Storage Blob Data Contributor.

This is all you need to do for the Storage account. Make sure to create a Container and make a note of it. You will need it for the next step.

Create the Operations in API Management

To start with we need to create an API that can hold the operations we will call to save and retrieve the images we save.
https://gist.github.com/xximjasonxx/273a751fd2011c91ffd06e804eafaaa9

Click APIs under General.

As you create this API fill out the fields as you desire BUT ensure to set the Web Service Url to the base URL of your storage account + container. For example, my storage account is called mystorageaccount123 and the container is called images. Therefore my base URL is:

https://mystorageaccount123.blob.core.windows.net/images

The reason to do this is, we are going to route all calls within this API to the same URL (it's just the way the REST API for storage accounts works).

Click Create and your API will be created and added to the display column.

Now, the trick with this is, the processing we need to do to decorate the incoming request so it can communicate with Azure Storage accounts is the same for all of the endpoints. So, rather than duplicating the processing, we can select All Operations and then enter the code view for Inbound processing and use the following policy definitions to ensure all operations are affected.

This is LITERALLY all we need to provide a “codeless” image store and retrieval system. This will access our blob storage with only the access needed. We don't need to store keys or any sensitive data anywhere.

All we have left to do is create our operations. You will need two:

  • PUT /{id}
  • GET /{id}

That is it. You can now upload and download images from Blob storage.

Testing things out

API Management provides a very sophisticated and useful test harness. You can use this to test your PUT endpoint; you should receive a 201 Created if things are working. DO NOT attempt the GET endpoint with the test harness, it doesn't seem to like binary data coming back (assuming you upload an image and not a text file).

To test the GET endpoint, you will need to create a Subscription and use the subscription key to test the endpoint in either Postman or a browser; I recommend a browser. Here is how I did it:

  1. Access the Subscriptions link under General
  2. This page contains a list of ALL subscriptions that your API currently has assigned to it. You can use the Add Subscription option to create a One-Off subscription to an API
  3. Create a subscription to the API – I called mine images
  4. Open a browser and access your GET endpoint setting the value of the new Subscription Key to the subscription-key query string parameter
  5. Upon browsing you should receive the item you just uploaded

Congrats – if this step worked, everything is wired up correctly.

Going Further

This really is only scratching the surface; when you start to involve things like BlobTrigger in Azure Functions or BlobCreated events registered with an Azure Event Grid you can just start to see the amount of functionality you can get with a minimal amount of code. You can refer to my Event Pipeline series here where I used Azure Function triggers to create a real time data processing pipeline that probably had about 40 lines of code total – and I am working to reduce that even further.

I really believe it is beneficial to look at these types of approaches because they have the potential to really leverage the cloud and execute complex functionality with minimal code.

Building a Real Time Event Processor – Part 3

See Part 2 here

Up to this point we have not really stored any data for our real time application, outside of the intrinsic storage we get with Event Hub and the temporary storage that occurs with a Service Bus queue. Now we get into storing data and alerting users to its presence so they can update.

Understanding the nature of stored data in real time data applications

It is a bit of an oxymoron but, there is no such thing as real time data. What we really mean by the term is “continuously producing data within intervals” to give the appearance of real time. Another aspect to this is that our data is often “pushed” to clients so that visualizations can be appropriately updated.

In this sense, it is best to approach this as if you are storing a lot of data and, on top of that data, you are creating aggregations which are stored and used for updates. It is not tenable for front end systems to constantly query an ever larger set of data; it simply gets slow. We need to localize our efforts and use cumulative or window based approaches.

As an example, if your organization wanted to know how much product had been sold in a day, you would keep a cumulative total using an aggregating window rather than query a larger data set for the answer. The latter approach would work but would slow down over time. You still want to store the raw data as there are benefits to using large data sets to see trends but, in the moment, you only need a very small piece of the data.

Now that we understand that, let’s talk about how this application deals with its data.

Save our aggregate data

So, when the Stream Analytics job runs its query it selects the data INTO an output. In this case, the output is set to an Azure Function that creates a Document in CosmosDB with the data.

For a Function App, Stream Analytics calls the Azure function via HTTP; thus, this is set up as an HttpTrigger using the POST method and expecting the data in the body.

Since the Stream Analytics job expects a successful HttpResponse we must reserve the output for this purpose, we cannot return the new CosmosDb document. Instead, we set an Output binding which allows us to create the document as an out parameter.
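The original gist is not reproduced here; the following is a minimal sketch under assumed database and collection names (namesdb / firstlettercounts) and an assumed CosmosDbConnection setting.

using System.IO;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Newtonsoft.Json.Linq;

public static class SaveGroupDataFunction
{
    [FunctionName("SaveGroupData")]
    public static IActionResult Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest request,
        [CosmosDB("namesdb", "firstlettercounts",
            ConnectionStringSetting = "CosmosDbConnection")] out dynamic document)
    {
        // the body is the aggregated result produced by the Stream Analytics query
        var payload = JObject.Parse(new StreamReader(request.Body).ReadToEnd());

        // the output binding creates the CosmosDB document for us
        document = payload;

        // Stream Analytics only needs a success status code back
        return new OkResult();
    }
}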

We could choose to use a POCO (Plain Old C# Object) to represent what is going into the Document but, given the simplicity of this example it is not necessary.

The result here is as the function receives data from Stream Analytics it is automatically written to CosmosDB. This will also invoke the Change Feed which our next function will listen for. But, before we talk about that I want to ask the question of whether using CosmosDB for this is correct.

Choosing the right database

The short answer is kind of, but not really. I often advise clients to stay away from the CosmosDB Core (SQL) offering due to the pricing. You have to remember that CosmosDb is aimed at high volume and multi-region scenarios. You should only use it if you have such a requirement. The one exception is Table storage, which is a Cosmos-branded service replacing the old Azure Table Storage that predates Cosmos.

In short, due to its price and target use cases, for most organizations, it does not make a lot of sense. I decided to use it here because it is aimed at high availability so it can take in data much faster than a traditional RDBMS – plus the data being generated is nearly guaranteed to be consistent due to what it is.

The other thing to consider, especially with Real Time applications, is whether there is a need to store aggregated data from a long time ago. Remember, we do still store the raw data in Azure Blob storage (which is much cheaper than Cosmos). We have to ask the question, does knowing the aggregate for 1 week ago really provide business value? Would it be cheaper to accept the cost of querying it from our historical records? The answer here is usually that it is better to query.

The point here is, real time aggregations like this often work best with a time series database, such as Redis, which can be configured to prune old data automatically. In fact, Redis would be the go-to for this sort of operation in a production scenario. But, as of this writing, there are NO triggers or bindings for the Redis service, so I would not have been able to stay true to my goal of writing no plumbing logic for this exercise. If this were a production situation, Redis would be considered far more strongly than Cosmos.

I also think it means I need to write such a binding for others to use.

Notify Users

Modern NoSQL databases, almost without exception, expose a feed which indicates when a change occurred to the data. This is fundamental for building event streams around databases, which are the building blocks for real time applications as well as Event Driven Architectures, Event Sourced Architectures, and other distributed data systems.

You can think of this, in some ways, as the modern version of the Trigger concept from RDBMS except, rather than living in the database, these triggers are at the application level and can be used to tie components together in a decoupled fashion.

Here is our code which listens to the CosmosDB change feed and sends the new document created to users via the Azure SignalR service:
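The gist itself is not shown in this extract; below is a minimal sketch assuming the same database and collection names as before and a hub named firstletters.

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static class NotifyUsersFunction
{
    [FunctionName("NotifyUsers")]
    public static async Task Run(
        [CosmosDBTrigger("namesdb", "firstlettercounts",
            ConnectionStringSetting = "CosmosDbConnection",
            LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)] IReadOnlyList<Document> documents,
        [SignalR(HubName = "firstletters")] IAsyncCollector<SignalRMessage> messages)
    {
        // each new or changed document is pushed to connected clients
        foreach (var document in documents)
        {
            await messages.AddAsync(new SignalRMessage
            {
                Target = "newFirstLetterData",          // the client-side method name
                Arguments = new object[] { document }
            });
        }
    }
}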

The use of the CosmosDBTrigger is what hooks into the change feed for the target database and collection where our data is going to be added. Additionally, CosmosDB requires a lease collection which tracks who is consuming the change feed for that document.

Our next parameter allows us to send documents to the Azure SignalR service for a particular Hub. I won't go into explaining SignalR and what Hubs are; you can learn more about that here: https://docs.microsoft.com/en-us/aspnet/signalr/overview/getting-started/introduction-to-signalr#connections-and-hubs

Our binding looks for the AzureSignalRConnectionString to know which SignalR service to connect to. Since this is the Azure SignalR service, it is imperative that our service is configured for serverless mode under Settings. Update the CORS setting for the SignalR service to be * or whatever is appropriate. Not doing this will cause problems when we attempt to connect.

Above, you notice that we defined a negotiate function with an Output binding of type SignalRConnectionInfo. This is the method that will be initially contacted by our clients to get connection information; it beats having to store the access key somewhere widely available – using negotiate, the key is downloaded and used internally by the client (though with Developer Tools you can still pick it out). It is the serverless setting in the SignalR service which drives this behavior.
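For reference, here is a sketch of that negotiate function, with the hub name assumed to match the one used above:

using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static class NegotiateFunction
{
    [FunctionName("negotiate")]
    public static SignalRConnectionInfo Negotiate(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequest request,
        [SignalRConnectionInfo(HubName = "firstletters")] SignalRConnectionInfo connectionInfo)
    {
        // the binding resolves the service URL and access token; we simply hand it to the client
        return connectionInfo;
    }
}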

Our client looks like this:

Note the .withUrl call that specifies the base URL to our Azure Function. The first call made will be this URL appended with /negotiate. After that call completes, SignalR will attempt to open the socket.

Remember to configure CORS in your Azure Function app as well – it's under Platform Features.

The final bit to call out is the newFirstLetterData event. SignalR works by “calling” a JavaScript function from the backend (or that is how it makes it seem). In our code we add instances of SignalRMessage to an instance of IAsyncCollector – one of its properties, Target, is set to this value, indicating what method should be called when that message lands on a client.

To ensure I did not miss anything with this step (it took me the longest to figure out) here is a tutorial that covers this topic: https://docs.microsoft.com/en-us/azure/azure-signalr/signalr-quickstart-azure-functions-javascript

Up Next

So that completes the overall pipeline technically. My earnest hope is yours is working as well as mine. For our final discussion I want to discuss areas we can improve and what the key takeaways from this exercise were.

Building a Real Time Event Processor – Part 2

Part 1: Building a Real Time Event Processor – Part 1

Welcome to Part 2 of this series on building a real time pipeline using Azure Services and Azure Functions. In the first part, we talked through the services we will use and laid out our goals for this project. Chief among them was to ensure we leveraged Azure Function bindings to their fullest to eliminate the need to write boilerplate connection code for various Azure services. By doing this we allow the underlying service to handle this for us.

At the end of Part 1 we had created our Timer function which was responsible for routinely running and sending data into our pipeline by way of a Service Bus Queue. Now, we will write the code to dequeue our data and send it to Azure Blob Storage and an Azure Event Hub.

Dequeue Our Data

The data we stored in the queue takes the following format:

Remember, when messages are stored in Service Bus they are stored as UTF8 byte arrays to reduce space. The trigger will convert this to a string or if the destination type is a complex object, it will attempt to deserialize it to that type. Here is the code for our function:
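The gist is not reproduced in this extract, but based on the description that follows, a minimal sketch looks something like this (queue, hub, and container names are carried over from this series; the Names property on NamesGenerationRecord is an assumption):

using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;

public static class DequeueFunction
{
    [FunctionName("DequeueNames")]
    [return: Blob("raw-names/{id}.txt", FileAccess.Write, Connection = "AzureWebJobsStorage")]
    public static async Task<string> Run(
        [ServiceBusTrigger("newnames-queue", Connection = "ServiceBusConnection")] NamesGenerationRecord record,
        [EventHub("names", Connection = "EventHubConnection")] IAsyncCollector<string> events)
    {
        // push each name onto the Event Hub as its own event
        foreach (var name in record.Names)
        {
            await events.AddAsync(name);
        }

        // the return binding writes the raw JSON to the raw-names container, using {id} from the record
        return JsonConvert.SerializeObject(record);
    }
}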

In my opinion, when you see code like this you are using Azure Functions to their fullest extent. Our function consists of a loop and that is it – and truthfully we could probably use some LinqFu to remove even that.

What this method does is a few things:

  • First, the trigger for the Service Bus upconverts the byte array into a standard string. But because the destination type is not a primitive (NamesGenerationRecord) the trigger will attempt to deserialize the string into that object.
  • Next, we define an Output binding for an Event Hub using a destination type of IAsyncCollector<T> to capture the event strings that need to be sent to our Event Hub. And because it would not make sense to send only one event, it is a collection which will send asynchronously as we add additional messages
  • Finally, we define the return as going to a Blob. This return will write our raw JSON contents to a Blob entry with the {id} name. More on this in a second as it's really neat but not well documented.

We get all of that functionality with mostly attributes. That is what I call super cool. Before we move on, let's talk about the return of this function and how it works.

Understanding Bindings

So, what gets stored in the Service Bus is a JSON string that looks like this:

When the ServiceBusTrigger delivers this string to the function it notes that its not delivering a string but rather an instance of NamesGenerationRecord which has this definition:
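The class definition is not shown in this extract; a minimal sketch consistent with the description is:

using System.Collections.Generic;

public class NamesGenerationRecord
{
    // bound to {id} in the Blob return attribute below
    public string Id { get; set; }

    // the names returned from the generation service (property name assumed)
    public List<string> Names { get; set; }
}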

Crucial here is that we create a property called Id. It is this property value that will be bound to the {id} in our return template:

[return: Blob("raw-names/{id}.txt", FileAccess.Write, Connection = "AzureWebJobsStorage")]

Initially, I thought this would view the string as JSON, create a JObject from it, and look for a JToken with the name id but, when I tried that, I received errors around the runtime being unable to find the id field to bind to. Using a custom object solved this problem.

I found this very interesting and it opens enormous possibilities. Given the way Microsoft tends to operate, it is likely that many of the bindings support this for both Output and Return bindings.

The end result here is for each item dequeued from our Service Bus Queue we will create a blob in the raw-names container with a name of a Guid ensuring a high degree of uniqueness.

Preparing the Event Hub

Simply storing this data is not all that interesting although this is a great way to build a Data Lake that can serve well for running queries of a historical nature. The value we want is to analyze our data in real time. There are services that could handle this, with varying degrees of scalability and success:

  • Azure Event Hub
  • Azure Event Grid
  • Azure Service Bus
  • Azure Blob Storage

Some of these options may not make sense to you and that is fine. I said they COULD work for this scenario but it's unlikely that using something like the change feed of Azure Blob Storage would make a lot of sense – though it could work.

Among these options, the BEST service, in my view, for our scenario is Azure Event Hub due to being cost effective and highly scalable with minimal latency. It also enforces ordering within partitions and can hold vast amounts of data for days at a time.

Further, it also hooks in well with Azure Stream Analytics Jobs which are vital to real time processing pipelines.

To set up an Event Hub, follow the instructions here: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create

Here is a visual of what the communication between the Azure Function and Event Hub looks like – I am assuming 5 names per queue message which translates to 5 events sent to Event Hub.

[Image: Queueing]

For this visualization I created a view inside the Event Hub as messages are being sent along. In this situation EACH execution of the Dequeue Processor Function will send 5 envelopes to Event Hub. These will get processed in order as they all live within the same partition.

Real Time Aggregation with Stream Analytics Job

I cannot count how many times I have come across code bases where developers have chosen to use an Azure Function or some other code based implementation to implement aggregation. While this works it is far from ideal. The Stream Analytics Jobs are the recommended approach both from the standpoint of scale and ease of use – better to write a SQL query to perform aggregation using many machines than attempt to rely on the Azure Function runtime.

Here is an example of the query I used for this exercise:

This query executes in three phases.

First, it reads from our Input which in this case is NamesData, an alias for the Event Hub which our Azure Function was pouring data into in the previous section. This looks at each name entry and grabs the first character (base 1) from each Name in the data set. This subquery's results are named FirstLetters.

Next, the subquery results are used as the source for another subquery, LetterCounts. This query takes the first-letter data and counts how many times each letter appears in the set. Key to this query is not just the grouping logic but the TumblingWindow. Let me explain.

Remember that you are not querying against a standard database of data with these queries. Your data set is consistently moving, considerably so in high volume cases. For this reason, you MUST restrict what data your query considers using windows. Stream Analytics supports 4 of these windows:

  • Tumbling
  • Session
  • Hopping
  • Sliding

Depending on how fluid you want your data to be generated you would pick the appropriate window. They are described here: https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-window-functions.

For our example, we use Tumbling which segments the event stream into distinct windows and ensures events do not cross into other segments. In the case of the query above, we get the events broken into 30s intervals with each event belonging to one and only one segment. There are use cases for wanting events to belong to multiple segments, as is the case with Sliding windows.

The final bit of this query selects the results of LetterCounts into the Output SaveGroupdData which fronts an Azure Function to take our aggregated data to the next stage.

Up Next

In this post, we continued the lessons of Part 1: we dequeued our raw data, parsed it, and sent the raw events into our Event Hub. Within the Event Hub our data is stored (temporarily, for x days) and it can be read by various processes in real time. One of the best ways to perform this reading is using Azure Stream Analytics jobs which enable SQL-like querying of data in the stream, using windows to execute data segmentation. The result was a sequence of writes to our output Azure Function (we will discuss in Part 3) with the aggregated data results from the query.

In the next segment we will store our Aggregate results and notify connected clients who may update their UIs to display our real time data.

One Note

One of the most popular outputs for Stream Analytics jobs is Power BI. This tool can take in the data and build these auto updating charts to represent data within the stream. I was not able to use Power BI for this but it is a common output for use cases like this.

Building a Real Time Event Processor – Part 1

So, I decided recently to undertake a challenge to build an end to end real time processing pipeline using Azure Functions. The key here would be to only use output bindings, triggers, and return bindings to communicate with other Azure services. In this way, it lessens the amount of code needed and reduces the overall chances for bugs.

In addition, it gives insight into potential ways of building real time data processing applications, a scenario becoming increasingly common in business. Further, they are a challenge and require intense communication and collaboration with various persons to ensure the data is partitioned and chunked appropriately. Throw in the cloud dimension, where our setup needs to be cost effective as well as functional, and it's quite the challenge.

Let’s get started.

Understanding Bindings

One of the most useful features in Azure Functions is bindings, whereby we can replace functionality that would normally necessitate boilerplate code with a simple attribute on a parameter. This allows the function to communicate with many things and the management of that communication to be handled by Azure.

Perhaps the most popular and well known binding is HttpTrigger, whereby an Azure Function is triggered when an Http request is made that matches a specific pattern. This trigger outputs itself as an HttpRequest instance which contains all of the information about the incoming request. Other triggers function in similar ways – I even have a post about writing a custom binding.

Regardless, when using Azure Functions you should aim to heavily use bindings as much as possible. It will save you from writing bad or incorrect service connection code.

Enough of that, let’s talk shop.

The Architecture of our Pipeline

Selection of services is critical when designing a real time pipeline; each case will be different and have different limitations. Recently, I helped a client understand the difference between Azure IoT Hubs and Event Hubs and which scenario each is geared for. In that case I explained that IoT Hubs allow for an enormous number of concurrent connections (sensible given the use case) whereas Event Hubs allow a fair number, but much fewer than IoT Hub. This limitation often informs other architecture decisions you would make.

For our sample, I chose the following services:

  • Azure Functions – should not come as a surprise. They are the backbone and connectors of the event pipeline. As a rule of thumb, when building an Azure Function be mindful of its responsibilities. Functions are not designed to be large, and there are certainly better services for performing heavy amounts of processing
  • Event Hub – Event Hubs are designed to handle large amounts of data while maintaining order within partitions and minimal latency. Event Hub is able to send its incoming data blocks to multiple services for processing. If Capture is enabled, Event Hub can store each event it receives in an associated blob storage account. That blob storage could serve as a simple Data Lake over time
  • Stream Analytics Job – This service enables queries to be run over the data in Event Hub. It is commonly used to aggregate windows of data and send the results to other services for storage and further processing. These windows exist because, in a real time application, the data set is never fixed
  • CosmosDb (Core) – CosmosDB is Microsoft’s globally distributed NoSQL database solution, with support for various providers and multi-region scenarios. Being NoSQL, it favors the AP portion of the CAP theorem (https://en.wikipedia.org/wiki/CAP_theorem), which suits a situation where it will receive data frequently and in a controlled manner and consistency is not a major concern
  • Azure SignalR Service – SignalR enables transmission of new aggregate data sets from Stream Analytics to connected users. This could power charts or graphs in real time to allow for more visibility of per minute operations

Now that we know the services here is a visual of how they are connected:

[Diagram: EventFlow – how the services are connected]

A few things of note that we take advantage of in this pipeline:

  • CosmosDb supports change feeds, which an Azure Function can monitor, triggering itself when a change is detected (a minimal sketch of such a trigger follows this list). This enables the so-called streaming architecture that underpins this sort of pipeline: it lets the entire system respond when something in the data changes
  • At the Dequeue stage we use a combination of return and output bindings to send our data to two places. Further, we use a binding to parameterize our Blob return so we can set the name of the blob created (we will explore this in code later in this series). The use of output bindings with Azure Functions enables the function to target more than one service as an output or, in the case of Event Hub, notify the service multiple times through the course of one iteration
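
As a taste of what the change feed binding looks like, here is a minimal sketch of a CosmosDB-triggered function (the database, collection, and connection setting names are placeholders, not the ones used later in this series):

[FunctionName("OnAggregateChanged")]
public static void Run(
    [CosmosDBTrigger(
        databaseName: "pipelinedb",
        collectionName: "aggregates",
        ConnectionStringSetting = "CosmosDbConnection",
        LeaseCollectionName = "leases",
        CreateLeaseCollectionIfNotExists = true)] IReadOnlyList<Document> changes,
    ILogger log)
{
    // The binding hands us the batch of changed documents; no change feed plumbing is required
    foreach (var document in changes)
    {
        log.LogInformation($"Change detected for document {document.Id}");
    }
}

(This assumes the Microsoft.Azure.WebJobs.Extensions.CosmosDB package, plus usings for System.Collections.Generic, Microsoft.Azure.Documents, Microsoft.Azure.WebJobs, and Microsoft.Extensions.Logging.)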

Now that we understand our flow, let’s look at our first Azure Function – a Timer Trigger.

Phase 1: Generate our Data

Data plays a huge role in a real time processing pipeline. In my small example I have a service which generates random Western-style names. My goal is to capture these names and send them to the queue. The size of this data can vary, since I may get back 4 names or 10. Here is the code for the function:
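
(The original listing is not included here, so the following is a minimal sketch reconstructed from the description below – the name-generation endpoint, timer schedule, and payload shape are assumptions.)

public static class GenerateNames
{
    private static readonly HttpClient _httpClient = new HttpClient();

    [FunctionName("GenerateNames")]
    [return: ServiceBus("newnames-queue", Connection = "ServiceBusConnection")]
    public static async Task<string> Run(
        [TimerTrigger("0 */1 * * * *")] TimerInfo timer,   // assumed schedule: once per minute
        ILogger log)
    {
        // Hypothetical name-generation endpoint - substitute the real service here
        var json = await _httpClient.GetStringAsync("https://example.com/api/random-names");

        // Linq-to-JSON (Json.NET) lets us reshape the raw payload without defining types
        var names = JArray.Parse(json);
        var payload = new JArray(names.Select(n => new JObject { ["name"] = (string)n }));

        log.LogInformation($"Generated {payload.Count} names");

        // The returned string is written to the newnames-queue Service Bus queue by the return binding
        return payload.ToString();
    }
}

This assumes usings for System.Linq, System.Net.Http, System.Threading.Tasks, Microsoft.Azure.WebJobs, Microsoft.Extensions.Logging, and Newtonsoft.Json.Linq.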

This code uses a ServiceBus return string – whatever string we return from this function will be added to a Service Bus queue named newnames-queue in its raw JSON form (this is why we use the LINQ-to-JSON style from the Json.NET library). The binding will take care of encoding the string into a byte array to minimize transport and storage size while it resides in the queue.

In the attributes for this function, you can see we specify the queue name as well as the connection string (without the Entity Path) to the Service Bus. The configuration setting ServiceBusConnection is set on the Azure Function app, and the attribute handles resolving it.

Up next

This brings us to the end of Part 1 of this series. Our next part will show how we dequeue this message in a way that maintains strong typing and sends it to both blob storage and Event Hub in one fell swoop.

Part 2

Part 3

 

Getting Started with Kafka and .NET Core on Kubernetes

Real time streaming is at the heart of many modern business critical systems. The ability for data to be constantly streamed can give organizations new insights into their data, and the real time nature means trends can be seen as they emerge, creating possible value streams for organizations.

Given its popularity, there are many options available, from cloud hosted to self hosted. One of the big ones is Apache Kafka, a sort of “pub/sub on steroids” that can scale its data ingest and streaming capabilities to fit the needs of almost any organization. In this post, I want to walk through a basic setup of Apache Kafka using Bitnami’s Helm chart.

Prepare your cluster

Truth be told, I did this using an Azure Kubernetes Service cluster that I spin up on occasion, backed by three large VMs. I have found that with things like Kubernetes in Docker Desktop, minikube, and others, you run into resource limitations that make it hard to deploy. For that reason, either give your local cluster an immense amount of resources or use a cloud managed one. I recommend Azure simply because, by default, the AKS cluster is backed by a scale set that you can deallocate as needed – this saves immensely on cost.

Bitnami Helm Chart

I love Helm because it enables quick deployment of supporting software that would otherwise take a lot of reading and learning and tweaking. Instead, you can use Helm to execute this: https://github.com/bitnami/charts/tree/master/bitnami/kafka

The instructions above are actually written to target Helm 2; the latest is Helm 3. For the most part it is similar enough, though I love the removal of Tiller in Helm 3. The syntax of the command is a little different – here is what I used:

helm install kafka-release --namespace kube-kafka bitnami/kafka

This creates a release (an instance of a deployment, in Helm terminology) called kafka-release, places the deployed components in a Kubernetes namespace called kube-kafka, and deploys resources based on the bitnami/kafka Helm chart – if you look at the above link, there are many ways to override how things are deployed via this chart.
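
Note that, depending on your Helm setup, you may first need to add the Bitnami chart repository before the install command can resolve bitnami/kafka:

helm repo add bitnami https://charts.bitnami.com/bitnami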

After running the command, Helm will start deploying resources, which then have to spin up. In addition, it will lay out some instructions for how you can play with the Kafka cluster on your own using a temporary pod. Before you can do this, however, the deployment must enter the Ready state. It is best to run the following command to know when things are ready:

kubectl get pods -n kube-kafka --watch

This will watch the Pods (which are the things that have to spin up as their containers are created), and when both are ready you can safely assume Kafka is ready.

Kafka oversimplified

So, I am still learning Kafka’s internal architecture. I know that it is basically a message broker system that enables multicast eventing to various topics, each of which can have multiple subscribers. But I honestly need to do more to learn its internals – suffice to say, for this exercise, you send a message to a topic in Kafka and all listeners for that topic will receive the message.

The terms Producer and Consumer will be used throughout the remainder of this post. Producers send data to the cluster, and Consumers receive that data.

Create the Producer

Our producer will be rudimentary and oversimplified, but it gives an idea of the sort of structure these types of applications take. Remember, this is not production-level code, so copy and paste at your own risk.

So, to start, you need to add the Confluent.Kafka NuGet package (the current version as of this writing is 1.4.0).

Next, create a config type and set the BootstrapServers – this is the server your code will contact to set up the connection to the message broker and send messages based on where that broker ends up (I am not sure how all of that works yet). Suffice to say, when you finish running your Helm install, this is the Service DNS name you are given – it follows the standard convention used by Kubernetes to name services.

For our example we cycle over all of the int values available to .NET (all the way up to int.MaxValue) so we can keep our producer going for a long time if need be. For each iteration, our code simply writes a message to the broker indicating the current iteration number.

We use the ProduceAsync method to send this message to Kafka, with a try/catch to capture any sending errors. Everything is written out to STDOUT via Console.WriteLine.

One of the key arguments to ProduceAsync is the name of the topic to associate our message with. This is what our consumers will listen to, so a single message sent to this topic can be fanned out to as many consumers as are listening. This is the power of this type of architecture: it allows for event based processing with a high degree of decoupling, letting different parts of our application simply respond to the event rather than being part of a longer chain of functions.
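
Putting the pieces above together, a minimal Producer might look something like the sketch below. Note the BootstrapServers value is an assumption based on the kafka-release service name the Bitnami chart creates in the kube-kafka namespace – use whatever service DNS name your Helm output reports.

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public class Program
{
    public static async Task Main()
    {
        var config = new ProducerConfig
        {
            // Assumed service DNS name from the Helm release; check your own cluster
            BootstrapServers = "kafka-release.kube-kafka.svc.cluster.local:9092"
        };

        using var producer = new ProducerBuilder<Null, string>(config).Build();

        for (var i = 0; i < int.MaxValue; i++)
        {
            try
            {
                // Send the current iteration number to the topic our consumers listen on
                var result = await producer.ProduceAsync(
                    "increment-topic",
                    new Message<Null, string> { Value = $"Iteration {i}" });
                Console.WriteLine($"Sent message {i} to {result.TopicPartitionOffset}");
            }
            catch (ProduceException<Null, string> ex)
            {
                Console.WriteLine($"Failed to send message {i}: {ex.Error.Reason}");
            }

            // Small pause so the log stays readable (not part of the original description)
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    }
}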

Build the Consumer

As with the Producer, the first step is to add the Confluent.Kafka NuGet package so we can use the built-in types to communicate with Kafka.

You can see that the Consumer subscribes to our topic (increment-topic in this case). I was a bit surprised this does not use the built-in .NET eventing model, since I think that would make more sense. Instead, we have to create a busy loop that attempts to consume on each pass and checks whether it got anything.

From there we just print the message we received. You will notice that our BootstrapServers value is the same as it was in the Producer – it should be, since we are writing to and reading from the same broker.

As for the GroupId, I do not yet know exactly what it does – I need to read up on it – but I set it all the same.
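
For reference, here is a minimal sketch of what the Consumer might look like, given the description above (the GroupId value is arbitrary, and the BootstrapServers assumption is the same as in the Producer):

using System;
using Confluent.Kafka;

public class Program
{
    public static void Main()
    {
        var config = new ConsumerConfig
        {
            // Same assumed broker address as the Producer
            BootstrapServers = "kafka-release.kube-kafka.svc.cluster.local:9092",
            GroupId = "increment-consumers",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("increment-topic");

        // Busy loop: poll for a message, print it if we got one, then poll again
        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(1));
            if (result != null)
            {
                Console.WriteLine($"Received: {result.Message.Value}");
            }
        }
    }
}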

Testing our Example

Our goal is to run against Kafka hosted in Kubernetes, and thus we will want both our Producer and Consumer there as well. You could use kubectl port-forward to expose the Kafka port locally but I didn’t have much luck with that, as the code would immediately try to call a Kubernetes-generated service name which was not exposed. I might tinker with this some more; my main goal with this exercise is to see it working in Kubernetes.

The best way to do this, or at least the simplest, is to create a vanilla Pod with the consumer and producer executing as separate containers. Truthfully, you would never want to use a vanilla PodSpec in production (usually you want it in a Deployment or ReplicaSet) since doing so would not maintain high availability and resiliency (if the Pod dies, everything stops; it doesn’t get recreated). Nevertheless, for this simple example I will be creating a multi-container Pod – we can check the log files to ensure things are working.

Here is the PodSpec:
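
(The original spec file is not reproduced here, so this is a sketch of what podspec.yaml might look like – the image names are placeholders for wherever you pushed your Producer and Consumer images.)

apiVersion: v1
kind: Pod
metadata:
  name: kafka-example
  namespace: kube-kafka
spec:
  containers:
    - name: producer
      image: myregistry.azurecr.io/kafka-producer:latest   # placeholder image
    - name: consumer
      image: myregistry.azurecr.io/kafka-consumer:latest   # placeholder image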

I will assume that you know how to build and push Docker images to a repository like Docker Hub (or Azure Container Registry in this case).

Next, we apply this spec file to our Kubernetes cluster

kubectl apply -f podspec.yaml

And we can use the following to wait until our pod is running (we should see 2/2 when ready)

kubectl get pods -n kube-kafka

Next, let’s get a sample of the logs from our Producer to ensure things are connecting and our messages are going out. We can run the following command (using the Pod name from the PodSpec above):

kubectl logs kafka-example -n kube-kafka producer

If things are working you should see a steady stream of messages indicating successful sends. If you see errors, double check that you typed everything correctly and, in particular, that your BootstrapServers value is correct (you can use kubectl to query the services in the deployed namespace to ensure you have the right name).

Assuming that is working, we can perform a similar command to see the logs for the Consumer:

kubectl logs kafka-example -n kube-kafka consumer

Once again, if all things are working you should see messages being read and displayed in the console.

Congratulations!! You have just completed your first end to end Kafka example (maybe it’s not your first, but congrats all the same).

Why is this important?

I talked about real time applications at the outset – these are generally comprised of event streaming, whereby data entering the system causes events to be generated (oftentimes the events are the data) which can be sent all over the system. This can create records in a database, update metric counters, flip switches, and many other things – it is actually the basis behind IoT applications, which stream a tremendous amount of telemetry data and use systems like Kafka to analyze and process that data in real time.

I love these types of applications because they enable so many unique business scenarios and really cool real time charts. In my next post, I hope to take my Kafka knowledge further and do something more useful.