Durable Functions: Part 4 – Analyze and Download

All code for this series can be found here: https://github.com/jfarrell-examples/DurableFunctionExample

We have now arrived at the final part of our example (Part 1, Part 2, Part 3), which focuses on what happens after we approve our upload, as shown in Part 3. That is, we will leverage Cognitive Services to gather data about the image and store it in the Azure Table Storage we have been using. As to why this part is coming so much later: I moved into a house, so I was rather busy 🙂

In the previous blog posts we built up a Durable Function orchestrator that is initiated by a blob trigger from the file upload. To this point, we have uploaded the file and allowed a separate HTTP-triggered function to “approve” the upload, demonstrating how Durable Functions support workflows that can advance in a variety of ways. Our next step will use an ActivityTrigger, a function that is only ever executed by an orchestrator as part of an orchestration.

Building the Activity Trigger

ActivityTriggers are identified by their trigger parameter as shown in the code sample below (only the function declaration):

[FunctionName("ProcessFile")]
public static async Task<bool> ProcessFile(
[ActivityTrigger] string fileId,
[Blob("files/{fileId}", FileAccess.Read, Connection = "StorageAccountConnectionString")] Stream fileBlob,
[Table("ocrdata", Connection = "TableConnectionString")] CloudTable ocrDataTable,
ILogger log)
{
}

In this declaration we indicate that the function is called as an activity within an orchestration flow. Further, as with our other functions, we bind to the related blob and, new here, to the ocrdata cloud table, which will hold the output of the OCR (Optical Character Recognition) process driven by Computer Vision.

To “call” this method we expand our workflow and add the CallActivityAsync call:

[FunctionName("ProcessFileFlow")]
public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
[Table("metadata", Connection = "TableConnectionString")] CloudTable metadataTable,
ILogger log)
{
var input = context.GetInput<ApprovalWorkflowData>();
var uploadApprovedEvent = context.WaitForExternalEvent<bool>("UploadApproved");
await Task.WhenAny(uploadApprovedEvent);
// run through OCR tools
var ocrProcessTask = context.CallActivityAsync<bool>(nameof(ProcessFileFunction.ProcessFile), input.TargetId);
await Task.WhenAny(ocrProcessTask);
}

This approach lets us fan out “parallel” tasks and take advantage of the pool of Azure Functions workers (up to 200 instances at a time on the Consumption plan). This can be more effective than trying to parallelize work inside a single function instance, but always consider how best to approach a problem that needs parallelism.
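To make that concrete, here is a minimal fan-out/fan-in sketch (not part of the sample repo) that dispatches one ProcessFile activity per file and only resumes once all of them finish; the array-of-IDs input is hypothetical:

using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

public static class FanOutExample
{
    // Hypothetical orchestrator showing the fan-out/fan-in pattern with the ProcessFile activity
    [FunctionName("ProcessFilesFanOut")]
    public static async Task RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        // assumes the orchestration was started with an array of file ids as its input
        var fileIds = context.GetInput<string[]>();

        // fan out: schedule one activity per file without awaiting each individually
        var ocrTasks = fileIds
            .Select(id => context.CallActivityAsync<bool>(nameof(ProcessFileFunction.ProcessFile), id))
            .ToList();

        // fan in: the orchestrator resumes only after every activity has completed
        await Task.WhenAll(ocrTasks);
    }
}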

One note: the FunctionName attribute is still necessary here – CallActivityAsync resolves the activity by its registered function name, and using nameof(ProcessFileFunction.ProcessFile) only works because the C# method name happens to match the name given in the attribute. We also pass in the TargetId for the Azure Table record so that a foreign-key style relationship can exist for this data. This is purely stylistic – in many cases it may make more sense for all of the data to live together, which is one of the strengths of document databases like DocumentDb and Mongo.
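For reference, here is a plausible shape for the OcrResult entity used throughout this post; the real class lives in the linked repo, so the property names and row key strategy below are inferred from how the entity is constructed in the snippets:

using System;
using Microsoft.Azure.Cosmos.Table; // or Microsoft.WindowsAzure.Storage.Table, depending on your storage SDK

public enum OcrType
{
    None,
    ComputerVision
}

// Inferred sketch of the OcrResult table entity - the actual class is in the sample repo
public class OcrResult : TableEntity
{
    public OcrResult() { } // parameterless constructor required for table deserialization

    public OcrResult(string fileId)
    {
        // partitioning by file id gives us the "foreign key" back to the metadata record
        PartitionKey = fileId;
        RowKey = Guid.NewGuid().ToString();
    }

    public string KeyName { get; set; }
    public string OcrValue { get; set; }

    // depending on the SDK version, enum properties may need to be persisted as a string or int
    public OcrType OcrType { get; set; }
}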

Finally, we have our orchestrator “wait” for the activity to complete. This activity, as I indicated, runs in its own function execution and can kick off further work as needed.

Using Cognitive Services

A discussion of how to set up Cognitive Services within Azure is outside the scope of this article; instead, I would invite you to follow Microsoft’s documentation here: https://docs.microsoft.com/en-us/azure/search/search-create-service-portal

Once you have Cognitive Services set up, update your settings so that your key and URL match your service, and install the necessary NuGet package (a quick sketch of creating the client follows the list):

  • Microsoft.Azure.CognitiveServices.Vision.ComputerVision (link)
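For reference, here is a minimal sketch of wiring up the client from application settings. The setting names ComputerVisionKey and ComputerVisionEndpoint are placeholders of mine, and the exact AnalyzeImageInStreamAsync signature varies slightly between package versions:

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.CognitiveServices.Vision.ComputerVision;
using Microsoft.Azure.CognitiveServices.Vision.ComputerVision.Models;

public static class ComputerVisionClientFactory
{
    // setting names are placeholders - use whatever keys you added to local.settings.json / app settings
    public static ComputerVisionClient Create() =>
        new ComputerVisionClient(
            new ApiKeyServiceClientCredentials(Environment.GetEnvironmentVariable("ComputerVisionKey")))
        {
            Endpoint = Environment.GetEnvironmentVariable("ComputerVisionEndpoint")
        };

    // analyze a blob stream, asking only for the feature groups used later in this post
    public static Task<ImageAnalysis> AnalyzeAsync(Stream image) =>
        Create().AnalyzeImageInStreamAsync(image, new List<VisualFeatureTypes?>
        {
            VisualFeatureTypes.Adult,
            VisualFeatureTypes.Color,
            VisualFeatureTypes.ImageType,
            VisualFeatureTypes.Description
        });
}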

As a first step, we need to make sure the OcrData table is created and indicate which bits of the Computer Vision data we want. To do this efficiently I created the following extension methods:

public static List<OcrResult> AsResultList(this ImageAnalysis analysisResult, string fileId)
{
    var returnList = new List<OcrResult>();

    returnList.AddRange(analysisResult.Adult.AsOcrPairs(fileId, OcrType.ComputerVision));
    returnList.AddRange(analysisResult.Color.AsOcrPairs(fileId, OcrType.ComputerVision));
    returnList.AddRange(analysisResult.ImageType.AsOcrPairs(fileId, OcrType.ComputerVision));

    // the first caption may be missing, so guard against a null before adding its pairs
    var firstCaption = analysisResult.Description?.Captions?.FirstOrDefault();
    if (firstCaption != null)
    {
        returnList.AddRange(firstCaption.AsOcrPairs(fileId, OcrType.ComputerVision));
    }

    //returnList.AddRange(analysisResult.Brands.AsOcrPairs(fileId, OcrType.ComputerVision));
    //returnList.AddRange(analysisResult.Faces.AsOcrPairs(fileId, OcrType.ComputerVision));

    return returnList;
}

static IEnumerable<OcrResult> AsOcrPairs(this object obj, string fileId, OcrType ocrType)
{
    foreach (var propertyInfo in obj.GetType().GetProperties())
    {
        // only simple (non-collection) properties become name/value pairs; strings are allowed through
        if (!typeof(IEnumerable).IsAssignableFrom(propertyInfo.PropertyType) || propertyInfo.PropertyType == typeof(string))
        {
            yield return new OcrResult(fileId)
            {
                KeyName = propertyInfo.Name,
                OcrValue = propertyInfo.GetValue(obj).ToString(),
                OcrType = ocrType
            };
        }
    }
}

All this does is let me flatten selected parent objects from the Computer Vision result into name/value pairs that I can more easily insert into the Table Storage schema I am aiming for. Once I have all of these OCR pairs, I use a batch insert operation to update the OcrData table:

var computerVisionResults = await ProcessWithComputerVision(fileBlob, fileId);

// save the batch data
var batchOperation = new TableBatchOperation();
computerVisionResults.ForEach(result => batchOperation.Insert(result));
batchOperation.Insert(new OcrResult(fileId) { KeyName = FileLengthKeyName, OcrValue = fileBlob.Length.ToString(), OcrType = OcrType.None });

var executeResult = await ocrDataTable.ExecuteBatchAsync(batchOperation);
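ProcessWithComputerVision itself is not shown above; conceptually it just sends the blob to Computer Vision and runs the result through AsResultList. A rough sketch, assuming the client factory sketched earlier in this post:

// rough sketch of the helper - the real implementation is in the sample repo
static async Task<List<OcrResult>> ProcessWithComputerVision(Stream fileBlob, string fileId)
{
    // analyze the uploaded blob with the Computer Vision client
    var analysis = await ComputerVisionClientFactory.AnalyzeAsync(fileBlob);

    // flatten the interesting parts of the analysis into name/value pairs for Table Storage
    return analysis.AsResultList(fileId);
}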

Approve and allow the file to be downloaded

Now that the OCR data has been generated, our Task.WhenAny will allow the orchestrator to proceed. The next step is to wait for an external user to indicate their approval for the data to be downloaded – this is nearly a carbon copy of the step that approved the uploaded file for processing.
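In the orchestrator that wait looks something like the snippet below; the event name DownloadApproved is my assumption, so check the repo for the exact string used:

// sketch of the second approval gate inside the orchestrator - the event name is assumed
var downloadApprovedEvent = context.WaitForExternalEvent<bool>("DownloadApproved");
await Task.WhenAny(downloadApprovedEvent);

// a separate HTTP-triggered function raises this event through the durable client, e.g.
// await client.RaiseEventAsync(instanceId, "DownloadApproved", true);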

Once the approval is given, our user can call the DownloadFile function to retrieve the gathered data along with a tokenized URL for downloading the raw file (our blob storage is private and we want to control access to blobs). Here is our code for the download action:

[FunctionName("DownloadFile")]
public static IActionResult Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "download/{fileId}")] HttpRequest req,
string fileId,
[Table("metadata", "{fileId}", "{fileId}", Connection = "TableConnectionString")] FileMetadata fileMetadata,
[Table("ocrdata", Connection = "TableConnectionString")] CloudTable fileOcrDataTable,
[Blob("files/{fileId}", FileAccess.Read, Connection = "StorageAccountConnectionString")] CloudBlockBlob fileBlob,
ILogger log)
{
if (!fileMetadata.ApprovedForDownload)
{
return new StatusCodeResult(403);
}
var readQuery = new TableQuery<OcrResult>();
TableQuery.GenerateFilterCondition(nameof(OcrResult.PartitionKey), QueryComparisons.Equal, fileId);
var ocrResults = fileOcrDataTable.ExecuteQuery(readQuery).ToList();
return new OkObjectResult(new DownloadResponse
{
Metadata = ocrResults,
FileId = fileId,
DownloadUrl = GenerateSasUrlForFileDownload(fileBlob, fileId)
});
}
static string GenerateSasUrlForFileDownload(CloudBlockBlob blob, string fileId)
{
var policy = new SharedAccessBlobPolicy()
{
SharedAccessExpiryTime = DateTime.Now.AddHours(1),
Permissions = SharedAccessBlobPermissions.Read
};
return blob.Uri + blob.GetSharedAccessSignature(policy);
}

That is quite a bit of code but, in essence, we are simply gathering all of the data associated with the entry being requested and generating a special URL, good for only one hour, for downloading the file out of our blob storage. Many more restrictions can be placed on a SAS token, so it is an ideal way to give external users temporary and tightly controlled access to blobs.
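As an example of tightening things further, the storage SDK lets a SAS token be pinned to HTTPS and to a caller's IP address. A sketch using an overload of GetSharedAccessSignature from the classic WindowsAzure.Storage / Microsoft.Azure.Storage.Blob SDKs (adjust if you are on a different version):

// sketch: a more locked-down SAS - HTTPS only, single IP, shorter window with clock-skew allowance
static string GenerateRestrictedSasUrl(CloudBlockBlob blob, string callerIpAddress)
{
    var policy = new SharedAccessBlobPolicy
    {
        SharedAccessStartTime = DateTime.UtcNow.AddMinutes(-5),   // tolerate small clock differences
        SharedAccessExpiryTime = DateTime.UtcNow.AddMinutes(15),  // much shorter lifetime than an hour
        Permissions = SharedAccessBlobPermissions.Read
    };

    var sasToken = blob.GetSharedAccessSignature(
        policy,
        null,                                   // no custom response headers
        null,                                   // no stored access policy identifier
        SharedAccessProtocol.HttpsOnly,         // reject plain HTTP
        new IPAddressOrRange(callerIpAddress)); // only valid from this address

    return blob.Uri + sasToken;
}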

And that is it: you can call this function through Postman and it will give you all of the data collected for the file plus a link to download the raw file. There is also a check to ensure the file has been approved for download.

Closing

When I started to explore Durable Functions, this was exactly what I was after: event-based workflow execution with a minimal amount of code to write and manage.

As I said in Part 1, for me event-driven programming is the way to go for 95% of cloud-based backends; the platform is quite literally begging us to leverage its internal events and APIs to reduce the amount of code we need to write while still delivering on our value propositions. True, moving to an event-driven approach does create new challenges, but I feel that trade-off is well worth it in most cases.

In one of my training classes I explore how we can write “codeless” applications using API Management, effectively using APIM to “proxy” Azure APIs (Key Vault and Storage notably). Sure, there are cases where we need to support additional business logic, but there are also many cases where we write a service to store data in blob storage when we don’t need to – when we can just store it there directly and use events to filter and process things.

In the end, the cloud gives you a tremendous number of options for what you can do and how you solve problems. And that really is the most important thing: having options and recognizing the different ways you can solve problems and deliver value.
