Building a Real Time Event Processor – Part 3

See Part 2 here

Up to this point we have not really stored any data for our real time application, outside of the intrinsic storage we get with Event Hub and the temporary storage of a Service Bus queue. Now we get into storing data and alerting users to its presence so their views can update.

Understanding the nature of stored data in real time data applications

It is a bit of an oxymoron but there is no such thing as real time data. What we really mean by the term is “continuously producing data within intervals” to give the appearance of real time. Another aspect is that our data is often “pushed” to clients so that visualizations can be updated appropriately.

In this sense, it is best to approach this as storing a lot of raw data and, on top of that data, creating aggregations which are stored and used for updates. It is not tenable for front end systems to constantly query an ever-growing data set; it simply gets slow. We need to localize our efforts and use cumulative or window based approaches.

As an example, if your organization wanted to know how much product had been sold in a day, you would keep a cumulative total using an aggregating window rather than querying the larger data set for the answer. The latter approach would work but would slow down over time. You still want to store the raw data, as there are benefits to using large data sets to see trends but, in the moment, you only need a very small piece of the data.
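As a sketch of what such a windowed aggregation can look like in Stream Analytics’ SQL-like query language (the input, output, and column names here are illustrative, not taken from this application):

```sql
-- count events per first letter over repeating 30 second windows
SELECT
    FirstLetter,
    COUNT(*) AS LetterCount,
    System.Timestamp() AS WindowEnd
INTO
    [aggregate-output]
FROM
    [eventhub-input]
GROUP BY
    FirstLetter,
    TumblingWindow(second, 30)
```

Each window produces one small row per group, so downstream consumers only ever handle the aggregate, never the full raw stream.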

Now that we understand that, let’s talk about how this application deals with its data.

Save our aggregate data

So, when the Stream Analytics job runs its query, it SELECTs the data INTO an output. In this case, that output is an Azure Function which creates a document in CosmosDB with the data.

using System;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;

[FunctionName("CreateAggregateDocument")]
public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
    [CosmosDB(databaseName: "namesdata", collectionName: "firstletterstrend",
        ConnectionStringSetting = "CosmosDbConnection")] out dynamic document,
    ILogger log)
{
    // the output binding uses an out parameter, so the function cannot be async
    var eventDataContents = req.ReadAsStringAsync().GetAwaiter().GetResult();

    // write the collated event data into our document db
    document = new {
        id = Guid.NewGuid(),
        timestamp = DateTime.UtcNow,
        eventData = JArray.Parse(eventDataContents)
    };

    return new OkObjectResult("received");
}


For a Function App, Stream Analytics calls the Azure Function via HTTP; thus, this is set up as an HttpTrigger using the POST method and expecting the data in the body.

Since the Stream Analytics job expects a successful HTTP response, we must reserve the return value for this purpose; we cannot return the new CosmosDB document. Instead, we use an output binding, which allows us to create the document via an out parameter.

We could choose to use a POCO (Plain Old C# Object) to represent what is going into the document but, given the simplicity of this example, it is not necessary.
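If we did want a typed document, a minimal POCO sketch might look like the following (the property names mirror the anonymous object above; the class name is my own):

```csharp
using System;
using Newtonsoft.Json.Linq;

// illustrative POCO; lowercase "id" is required by CosmosDB documents
public class AggregateDocument
{
    public Guid id { get; set; }
    public DateTime timestamp { get; set; }
    public JArray eventData { get; set; }
}
```

The out dynamic parameter would then become out AggregateDocument, with serialization to the Cosmos document still handled by the binding.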

The result here is that, as the function receives data from Stream Analytics, it is automatically written to CosmosDB. This also invokes the Change Feed, which our next function will listen for. But before we talk about that, I want to ask whether using CosmosDB for this is the correct choice.

Choosing the right database

The short answer is kind of, but not really. I often advise clients to stay away from the CosmosDB Core SQL offering due to its pricing. You have to remember that CosmosDB is aimed at high volume and multi-region scenarios; you should only use it if you have such a requirement. The one exception is Table storage, which is a Cosmos-branded service replacing the old Azure Table Storage that predates Cosmos.

In short, due to its price and target use cases, it does not make a lot of sense for most organizations. I decided to use it here because it is built for high availability, so it can take in data much faster than a traditional RDBMS; plus, the data being generated is nearly guaranteed to be consistent due to its nature.

The other thing to consider, especially with real time applications, is whether there is a need to store aggregated data from long ago. Remember, we still store the raw data in Azure Blob storage (which is much cheaper than Cosmos). We have to ask: does knowing the aggregate from one week ago really provide business value? Would it be cheaper to accept the cost of querying it from our historical records? The answer here is usually that it is better to query.

The point here is that real time aggregations like this often work best with a time series store such as Redis, which can be configured to prune old data automatically. In fact, Redis would be the go-to for this sort of operation in a production scenario. But, as of this writing, there are no triggers or bindings for the Redis service, so I would not have been able to stay true to my goal of writing no plumbing logic for this exercise. In a production situation, Redis would be considered far more strongly than Cosmos.
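To give a feel for what the Redis alternative might look like, here is a hedged sketch assuming the StackExchange.Redis client; the key scheme and expiry are my own invention, not part of this application:

```csharp
using System;
using StackExchange.Redis;

// increment a per-letter counter inside a per-minute bucket and
// let Redis expire old buckets automatically
var redis = ConnectionMultiplexer.Connect("localhost");
var db = redis.GetDatabase();

var bucket = DateTime.UtcNow.ToString("yyyyMMddHHmm");
var key = $"letters:{bucket}:A";

db.StringIncrement(key);
db.KeyExpire(key, TimeSpan.FromHours(1)); // old windows age out on their own
```

The expiry is what makes this attractive for real time aggregates: the store stays small without any cleanup code on our side.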

I also think it means I need to write such a binding for others to use.

Notify Users

Modern NoSQL databases, almost without exception, expose a feed which indicates when a change occurred to the data. This is fundamental for building event streams around databases, which are the building block for real time applications as well as Event Driven Architectures, Event Sourced Architectures, and other distributed data systems.

You can think of this, in some ways, as the modern version of the Trigger concept from RDBMS except, rather than living in the database, these triggers are at the application level and can be used to tie components together in a decoupled fashion.

Here is our code which listens to the CosmosDB change feed and sends the new document created to users via the Azure SignalR service:

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

[FunctionName("NewDocumentNotify")]
public static async Task Run(
    [CosmosDBTrigger(databaseName: "namesdata", collectionName: "firstletterstrend",
        ConnectionStringSetting = "CosmosDbConnection",
        CreateLeaseCollectionIfNotExists = true)] IReadOnlyList<Document> newDocuments,
    [SignalR(HubName = "FirstNameLetterTrend")] IAsyncCollector<SignalRMessage> signalRMessage,
    ILogger log)
{
    log.LogInformation($"Sending {newDocuments.Count} documents");

    await signalRMessage.AddAsync(new SignalRMessage
    {
        Target = "newFirstLetterData",
        Arguments = newDocuments
            .Select(doc => JsonConvert.DeserializeObject<AggregateLetterCountDocument>(doc.ToString()))
            .ToArray()
    });
}

[FunctionName("negotiate")]
public static SignalRConnectionInfo GetSignalRInfo(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequest req,
    [SignalRConnectionInfo(HubName = "FirstNameLetterTrend")] SignalRConnectionInfo connectionInfo)
{
    return connectionInfo;
}


The use of the CosmosDBTrigger is what hooks into the change feed for the target database and collection where our data is being added. Additionally, CosmosDB requires a lease collection, which tracks each consumer's position in the change feed.

Our next parameter allows us to send documents to the Azure SignalR service for a particular Hub. I won't go into explaining SignalR and what Hubs are; you can learn more about that here: https://docs.microsoft.com/en-us/aspnet/signalr/overview/getting-started/introduction-to-signalr#connections-and-hubs

Our binding looks for the AzureSignalRConnectionString setting to know which SignalR service to connect to. Since this is the Azure SignalR service, it is imperative that the service is configured for Serverless mode (under Settings). Also update the CORS setting for the SignalR service to * or whatever is appropriate for your environment; not doing this will cause problems when we attempt to connect.
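For local development, the two connection settings these bindings look for would typically live in local.settings.json; a sketch with placeholder values might look like this:

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "CosmosDbConnection": "AccountEndpoint=https://<account>.documents.azure.com:443/;AccountKey=<key>;",
    "AzureSignalRConnectionString": "Endpoint=https://<name>.service.signalr.net;AccessKey=<key>;Version=1.0;"
  }
}
```

In Azure, the same names go into the Function App's application settings instead.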

Above, you will notice that we defined a negotiate function with an output binding of type SignalRConnectionInfo. This is the endpoint our clients contact first to obtain connection information; it beats having to store the access key somewhere widely available. Through negotiate, the key is downloaded and used internally by the client (though with Developer Tools you can still pick it out). It is the Serverless setting in the SignalR service which drives this behavior.

Our client looks like this:

<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/@aspnet/signalr@1.1.0/dist/browser/signalr.min.js"></script>
<script type="text/javascript">
    document.addEventListener('DOMContentLoaded', function () {
        var connection = new signalR.HubConnectionBuilder()
            .withUrl('https://func.azurewebsites.net/api')
            .configureLogging(signalR.LogLevel.Information)
            .build();

        // called by the hub for messages with Target 'newFirstLetterData';
        // each document in the message's Arguments arrives as a separate parameter
        connection.on('newFirstLetterData', (...documents) => {
            // note: assumes jQuery is also loaded on the page
            documents.forEach(doc =>
                $("#reportOutput").append(`<div class="entry">${JSON.stringify(doc)}</div>`));
        });

        connection.start()
            .then(() => {
                alert("connected");
            })
            .catch((error) => {
                alert("connection failed");
            });
    });
</script>


Note the .withUrl call that specifies the base URL to our Azure Function. The first call made will be this URL appended with /negotiate. After that call completes, SignalR will attempt to open the socket.

Remember to configure CORS in your Azure Function app as well – it's under Platform Features.

The final bit to call out is the newFirstLetterData event. SignalR works by “calling” a JavaScript function from the backend (or that is how it makes it seem). In our code we add instances of SignalRMessage to an IAsyncCollector; the Target property is set to this value, indicating which client-side method should be invoked when the message lands on a client.

To ensure I did not miss anything with this step (it took me the longest to figure out) here is a tutorial that covers this topic: https://docs.microsoft.com/en-us/azure/azure-signalr/signalr-quickstart-azure-functions-javascript

Up Next

So that completes the overall pipeline, technically speaking. My earnest hope is that yours is working as well as mine. In the final post I want to look at areas we can improve and the key takeaways from this exercise.
