
(2020-Dec-21) While working with Azure Functions that provide a serverless environment to run my computer program code, I’m still struggling to understand how it actually works. Yes, I admit, there is no bravado in my conversation about Function Apps; I really don’t understand what happens behind the scenes when a front-end application submits a request to execute my function code in a cloud environment, and how this request is processed via the durable function framework (starter => orchestrator => activity).

Azure Data Factory provides an interface to execute your Azure Function, and if you wish, the output result of your function code can be further processed in your Data Factory workflow. The more I work with this pair, the more I trust that a function app can behave differently under the various Azure Service Plans available to me. The more parallel Azure Function requests I submit from my Data Factory, the more trust I put into my Azure Function App that it will properly and gracefully scale out from “Always Ready instances”, to “Pre-warmed instances”, and to “Maximum instances” available for my Function App. The supported runtime version for PowerShell durable functions, along with the data exchange options between orchestrator and activity functions, requires a lot of trust too, because the latter is still not well documented.

My current journey of using Azure Functions in Data Factory has been marked with two milestones so far:

  1. Initial overview of what is possible - https://server.hoit.asia/2020/04/using-azure-functions-in-azure-data.html
  2. Further advancement to enable long-running function processes and keep data factory from failing - https://server.hoit.asia/2020/10/using-durable-functions-in-azure-data.html

Photo by Jesse Dodds on Unsplash

Recently I realized that the initially proposed HTTP polling of a long-running function process in a data factory can be simplified even further.

An early version (please check the 2nd blog post listed above) suggested that I could execute a durable function orchestrator, which eventually would execute a function activity. Then I would check the status of my function app execution by polling the statusQueryGetUri URI from my data factory pipeline; if its status was not Completed, I would poll it again.

In reality, the combination of an Until loop container along with Wait and Web call activities can simply be replaced by a single Web call activity. The reason is simple: when you initially execute your durable Azure Function (even if it will take minutes, hours, or days to finish), it will almost instantly provide you with an execution HTTP status code 202 (Accepted). Then the Azure Data Factory Web activity will poll the statusQueryGetUri URI of your Azure Function on its own until the HTTP status code becomes 200 (OK). The Web activity will run this step as long as necessary, or until the Azure Function timeout is reached; this can vary for different pricing tiers - https://docs.microsoft.com/en-us/azure/azure-functions/functions-scale#timeout

The structure of the statusQueryGetUri URI is simple: it references your Azure Function App along with the execution instance GUID. How exactly Azure Data Factory polls this URI is unknown to me; it's all about trust, please see the beginning of this blog post :-)

https://<your-function-app>.azurewebsites.net/runtime/webhooks/durabletask/instances/<GUID>?taskHub=DurableFunctionsHub&connection=Storage&code=<code-value>
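What the Web activity is effectively doing is the async HTTP polling pattern. Here is a minimal Python sketch of the same polling logic, just to illustrate it outside of Data Factory (the placeholder values in the URI are assumptions you would replace with what your own durable function starter returns):

import time
import requests

# statusQueryGetUri returned by the durable function starter (placeholder values)
status_uri = ("https://<your-function-app>.azurewebsites.net/runtime/webhooks/durabletask"
              "/instances/<GUID>?taskHub=DurableFunctionsHub&connection=Storage&code=<code-value>")

response = requests.get(status_uri)
while response.status_code == 202:      # 202 (Accepted): the orchestration is still running
    time.sleep(15)                      # wait a bit before the next poll
    response = requests.get(status_uri)

# 200 (OK): the orchestration reached a final state (Completed, Failed or Terminated)
print(response.json()["runtimeStatus"], response.json().get("output"))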


This has been an introduction; now the real blog post begins. Naturally, you can execute multiple instances of your Azure Function at the same time (event-driven processes or front-end parallel execution steps) and the Azure Function App will handle them. My recent work project requirement indicated that when such parallel execution happens, a certain operation still needs to be throttled and artificially sequenced. Again, it was a special use case, and it may not happen in your projects.

I tried to put such throttling logic inside my durable Azure Function activity; however, with many concurrent requests to execute this one particular operation, my function app used all of the available instances, and while those instances were active and running, my function became unavailable to the existing data factory workflows.

There is a good wiki page about Writing Tasks Orchestrators that states, “Code should be non-blocking i.e. no thread sleep or Task.WaitXXX() methods.” So, that was my aha moment to move the throttling logic from my Azure Function activity to the data factory.

Now, when an instance of my Azure Function finds that it can’t proceed further because another operation is running, it completes with HTTP status code 200 (OK), releases the Azure Function instance, and also provides an additional execution output status indicating that it’s not really “OK” and needs to be re-executed.


The Until loop container now handles two types of scenarios:

  1. HTTP Status Code 200 (OK) and custom output Status "OK": it exits the loop container and proceeds further with the "Get Function App Output" activity.
  2. HTTP Status Code 200 (OK) and custom output Status is not "OK" (you can provide more descriptive info on what your not-OK scenario might be): execution continues with another round of the "Call Durable Azure Function" & "Get Current Function Status" activities (sketched in Python below).
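To make scenario 2 more concrete, here is a minimal Python sketch of what the Until container logic boils down to; the starter URL, the helper function name, and the 30-second wait are my own placeholders, while the custom "Status" property is the one produced by the function:

import time
import requests

def run_durable_function(starter_url: str) -> dict:
    # Start the orchestration, then poll statusQueryGetUri until HTTP 200 and return its output
    start_response = requests.post(starter_url)                 # returns 202 (Accepted) almost instantly
    status_uri = start_response.json()["statusQueryGetUri"]
    response = requests.get(status_uri)
    while response.status_code == 202:                          # the ADF Web activity does this part on its own
        time.sleep(15)
        response = requests.get(status_uri)
    return response.json().get("output") or {}

# hypothetical starter endpoint of the durable function
starter_url = "https://<your-function-app>.azurewebsites.net/api/orchestrators/<orchestrator-name>"

# "Call Durable Azure Function" & "Get Current Function Status"
output = run_durable_function(starter_url)
while output.get("Status") != "OK":                             # the Until container condition
    time.sleep(30)                                              # a short pause before another attempt
    output = run_durable_function(starter_url)

print("Function App output:", output)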

This new approach for gracefully handling conflicts in functions required some changes: (1) the Azure Function activity either runs its regular operation and completes with the custom "OK" status, or it identifies another running instance, completes the current function instance and provides the custom "Conflict" status; (2) the Data Factory checks the custom Status output and decides what to do next.

The Azure Function HTTP long-polling mission was accomplished; however, it now has two layers of HTTP polling: the natural webhook status collection and the data factory's custom logic to check whether the "OK" status my webhook received was really OK.

(2020-Oct-14) OK, here is my problem: I have an Azure Data Factory (ADF) workflow that includes an Azure Function call to perform external operations and return an output result, which in turn is used further down my ADF pipeline. My ADF workflow (1) depends on the output result of the Azure Function call; (2) the time efficiency of the Azure Function call is another factor to consider: if its execution time hits 230 seconds or more, the ADF Azure Function activity will fail with a time-out error message and my workflow is screwed.

Image by Ichigo121212 from Pixabay 

I can either keep high hopes that my Azure Function calls in a data factory pipeline will stay within 230 seconds, or I need to make a change and replace a generic Azure Function call with something else, something more stable and reliable.

230 seconds is the maximum amount of time that an HTTP-triggered function can take to respond to a request, and Microsoft recommends either refactoring your serverless code execution or trying Durable Functions, which are an extension of Azure Functions - https://docs.microsoft.com/en-us/azure/data-factory/control-flow-azure-function-activity#timeout-and-long-running-functions

Back in April 2020, I blogged about the use of Azure Functions in Data Factory pipelines - https://server.hoit.asia/2020/04/using-azure-functions-in-azure-data.html. There I described possible variations of using Web, Webhook, and Azure Function activities to execute your Function App code, and my frustration with the 230-second time limit.

So, I decided to check if a Durable Function could be a remedy for a long-running process that Azure Data Factory tries to govern. The official documentation describes Durable Functions as “stateful functions in a serverless compute environment… they let you define stateful workflows by writing orchestrator functions and stateful entities by writing entity functions using the Azure Functions programming model”. I’m still confused by this definition, though hopefully I’m not the only one confused. But to me, the term “durable” means that a function should provide stable execution of long-running processes and support reliable orchestration of my serverless Function App code.

The first thing I did was search online to see if anyone else had already shared their pain points and possible solutions for using Durable Functions in Azure Data Factory:

The first two ADF posts gave me some confidence that Durable Functions could be used in ADF; however, they only provided some screenshots, no code examples, and no pattern for passing input to a durable function and processing its output at the end, which was critical to my real project use case; still, I give credit to both authors for sharing this information. The third post is one of many very detailed and well-written articles about Durable Functions, but it didn't contain the information about ADF and the PowerShell code for my Function App that I was looking for. So, this was my leap of faith to do further exploration and possibly create the ADF solution with Durable Functions that I needed.

Initial Information and Tutorial for Azure Durable Functions
Microsoft provides some very good examples and tutorials to start working with Durable Functions in the Azure Portal - https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=powershell. You can create three types of durable function components; all of them are necessary to build a single durable Function App workflow:
- Starter: to “start” a durable function “orchestrator”
- Orchestrator: to “orchestrate” execution of an “activity” function
- Activity: the actual serverless code of your function app that you want to perform

Then you can create sample durable functions in your Azure Function App:

The sample code is a simple solution that writes the output of different city names:
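My Function App components were written in PowerShell, but to keep this post self-contained here is a hedged Python equivalent of the same three components and the city-names sample (each function lives in its own folder with its function.json bindings, which are omitted here; the "HelloOrchestrator" and "HelloActivity" names are just placeholders):

# Starter (HTTP-triggered client function)
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new("HelloOrchestrator")
    # responds with 202 (Accepted) and the statusQueryGetUri management URI
    return client.create_check_status_response(req, instance_id)

# Orchestrator (separate function folder)
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity("HelloActivity", "Tokyo")
    result2 = yield context.call_activity("HelloActivity", "Seattle")
    result3 = yield context.call_activity("HelloActivity", "London")
    return [result1, result2, result3]

main = df.Orchestrator.create(orchestrator_function)

# Activity (separate function folder): the actual serverless code you want to perform
def main(city: str) -> str:
    return f"Hello {city}!"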









(2020-Oct-05) Adding a row number to your dataset could be a trivial task. Both ANSI and Spark SQL have the row_number() window function that can enrich your data with a unique number for your whole or partitioned data recordset.

Recently I had a case of creating a data flow in Azure Data Factory (ADF) where there was a need to add a row number.

Photo by Micah Boerma from Pexels

My instant reaction was to add this new row number column using a Derived Column transformation - https://docs.microsoft.com/en-us/azure//data-factory/data-flow-derived-column. However, this was my mistake, and ADF notified me that the rowNumber() data flow function is only available in the Window transformation - https://docs.microsoft.com/en-us/azure/data-factory/data-flow-window.


OK, I moved on and added a Window Transformation task, which does require at least one sorting column.

However, this was a bit of an issue in my data case: I didn’t need to sort or change the order of my recordset. The transformed dataset needed to stay in the same order as its original sourcing data, which would help to match a transformed data record with its sourcing sibling record using the new Row Number column.

So I thought that it wouldn't hurt to add just a static constant-value column (let’s say with value = 1) and then use this new column in my rowNumber() window transformation as the sorting attribute.

1) First I added a Derived Column transformation with the Column_Value_1 column value set to 1 (or any other constant value of your preference).
2) Then I added a Window transformation with the rowNumber() function and Column_Value_1 as a sorting column.
3) Then I was able to see my output result with the preserved order of records and an additional Row Number column (a PySpark equivalent of these steps is sketched below).
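ADF Mapping Data Flows run on Spark under the hood, so the same three steps can be expressed in a few lines of PySpark; a minimal sketch with a made-up three-row DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["source_column"])

# 1) add a constant-value column, 2) use it as the sorting attribute for row_number()
df_numbered = (df
               .withColumn("Column_Value_1", lit(1))
               .withColumn("RowNumber", row_number().over(Window.orderBy("Column_Value_1"))))

# 3) the output keeps the incoming order and gains a Row Number column
df_numbered.show()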



Update:
Joseph Edwards has just pointed out that using the Surrogate Key transformation would achieve the very same result of adding a row number column, and this step doesn't even require sorting columns. I've just tested it, and it works; amazing! I have written this whole blog post just to realize that there is a better way to number rows in Azure Data Factory data flows :-)



(2020-Sep-26) Last week one of my Azure Data Factory (ADF) deployment pipelines failed with an error that it couldn’t find some of the deployment parameters that I was trying to override, and I wondered what might have caused the issue. Usually, after configuring and testing all the deployment steps, there are very few things that can topple a normal and successful deployment process of your data factory, and it’s got to be big.

My first step was to remove the parameter-overriding instances from the deployment pipeline. There is no need to reference and override a deployment pipeline parameter if it didn’t come out of the process of creating my ADF Azure Resource Manager (ARM) template. That quick fix worked: my ADF deployment process successfully continued and the new code was deployed to the testing environment.

Photo by Karolina Grabowska from Pexels

However, when I checked one of the web-activity tasks in my testing Data Factory after the deployment was finished, that activity still had a URL reference to a resource from the Development environment and not the Testing one. That URL reference correction (overriding) used to happen properly during the deployment process. You can change the default parameterization template and prescribe your Data Factory to include some specific properties as additional custom parameters to deploy - https://docs.microsoft.com/en-us/azure/data-factory/continuous-integration-deployment#use-custom-parameters-with-the-resource-manager-template. I have also blogged about it two months ago - https://datanrg.blogspot.com/2020/06/raking-custom-parametersvariables-for.html

My initial custom parameterization to collect all the web activities' URLs from an ADF pipeline was configured the following way, with the Pipelines/Activities path to my URL properties.



If those settings were still correct, then why wasn’t I getting any of the web activities' URL properties extracted into my ADF ARM template? And then I realized: after moving my web-activity task from the main pipeline level into a Switch activity case, the original custom parameterization process could no longer find it. I needed to make further adjustments to my custom parameters template.



By going one level deeper into my ADF activities' properties and including an additional “Cases” level (Pipelines/Activities/Cases), I was able to capture my web activities' URL attributes.

This aha moment showed me that modifying my ADF custom parameters template is a very good way to capture very specific attributes for deployment; however, it’s also very important to remember to review this parameter template file after making major modifications to your ADF code and check whether some of the settings are no longer relevant.

That’s why I have tagged this post as “Forget Me Not” because I’m a regular person and tend to forget things :-)


(2020-Aug-14) Setting default values for my parameters or variables in Azure Data Factory (ADF) may be a trivial task, but it gets more interesting when those elements are Arrays. The recent addition of Global Parameters to the Data Factory by Microsoft may create an extra thrill to learn how to set values for such parameters too: https://docs.microsoft.com/en-ca/azure/data-factory/author-global-parameters

Photo by Mihis Alex from Pexels

I had previously blogged about Arrays and how we can use them in ADF: https://server.hoit.asia/2019/06/working-with-arrays-in-azure-data.html and this post is just a short memo to myself on how default array values can be prepared and used as well.


Array Parameters

A very simple and straightforward way to set a default value for an array parameter is just to pass a text string that visually represents a collection of elements. In my example below I am setting the par_meal_array parameter with the default value of '["Egg", "Greek Yogurt", "Coffee"]', which I can then further pass to my For Each Meal loop processing task as the list of items.


Array String converted into Array Variable 

In case I have less flexibility to set a straightforward array default value, I can start with a String parameter which can then be converted into a collection of elements (an array). In my example below I am setting the par_meal_string parameter with the default value of 'Egg,Greek Yogurt,Coffee', which I can then transform into an array variable by using the @split(pipeline().parameters.par_meal_string,',') function in my Set Variable activity and further pass its output to my For Each Meal loop processing task as the list of items.


Configuring Pipeline Parameters in Triggers

With triggers in ADF, it gets even more interesting: we can reuse the very same pipeline and configure its execution with different triggers, each passing different input (default) parameter values.


Let's create 3 triggers to cover daily meal consumption and pass each individual trigger its own menu :-)


Which now clearly can be done, and very nutritious (or not so nutritious) meals can be consumed. Obviously, you can see that this list of meals is just an analogy to something else, more applicable to your specific data factory development situation. Or you can just enjoy my writing and define your own carte du jour :-)


Final thoughts

Actually, I don't have any closing remarks for this post. Honestly, I think everybody already knows all about this and I'm not revealing anything significantly new. However, I've tested it all, I have finished writing this blog post, and I will remember it even better now :-)


(2020-July-29) There is a well-known and broadly advertised message from Microsoft that Azure Data Factory (ADF) is a code-free environment to help you create your data integration solutions - https://azure.microsoft.com/en-us/resources/videos/microsoft-azure-data-factory-code-free-cloud-data-integration-at-scale/. I agree with and support this approach of using a drag-and-drop visual UI to build and automate data pipelines without writing code. However, I'm also interested in trying to recreate certain ADF operations by writing code, just out of curiosity.

Previously I have written a blog post about using ADF Data Flow Flatten operation to transform a JSON file - Part 1: Transforming JSON to CSV with the help of Azure Data Factory - Mapping Data Flows


This time I would like to check and compare the Databricks code development experience for flattening the very same sourcing JSON file.
[
{
"id": "0001",
"type": "donut",
"name": "Cake",
"ppu": 0.55,
"batters":
{
"batter":
[
{ "id": "1001", "type": "Regular" },
{ "id": "1002", "type": "Chocolate" },
{ "id": "1003", "type": "Blueberry" },
{ "id": "1004", "type": "Devil's Food" }
]
},
"topping":
[
{ "id": "5001", "type": "None" },
{ "id": "5002", "type": "Glazed" },
{ "id": "5005", "type": "Sugar" },
{ "id": "5007", "type": "Powdered Sugar" },
{ "id": "5006", "type": "Chocolate with Sprinkles" },
{ "id": "5003", "type": "Chocolate" },
{ "id": "5004", "type": "Maple" }
]
},
{
"id": "0002",
"type": "donut",
"name": "Raised",
"ppu": 0.55,
"batters":
{
"batter":
[
{ "id": "1001", "type": "Regular" }
]
},
"topping":
[
{ "id": "5001", "type": "None" },
{ "id": "5002", "type": "Glazed" },
{ "id": "5005", "type": "Sugar" },
{ "id": "5003", "type": "Chocolate" },
{ "id": "5004", "type": "Maple" }
]
},
{
"id": "0003",
"type": "donut",
"name": "Old Fashioned",
"ppu": 0.55,
"batters":
{
"batter":
[
{ "id": "1001", "type": "Regular" },
{ "id": "1002", "type": "Chocolate" }
]
},
"topping":
[
{ "id": "5001", "type": "None" },
{ "id": "5002", "type": "Glazed" },
{ "id": "5003", "type": "Chocolate" },
{ "id": "5004", "type": "Maple" }
]
}
]


(1) Data connection
First, I want to compare Databricks' understanding of this JSON file vs. ADF Mapping Data Flow data connector.

ADF Mapping Data Flow provides the following file structure projection of my JSON file:

This is how Databricks understood my JSON file, so both tools are in sync on this.
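For reference, reading the same file in a Databricks notebook could look like this minimal PySpark sketch (the mount path is a made-up placeholder, and the spark session is the one Databricks provides):

# the donut recipes file is a multi-line JSON array, hence the multiLine option
df_json = (spark.read
           .option("multiLine", "true")
           .json("/mnt/demo/donut_recipes.json"))

df_json.printSchema()    # batters.batter and topping are recognized as arrays of structs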


(2) Flattening topping JSON array column
The very first use of the Flatten data transformation in my ADF data flow expands the topping column:

Databricks' use of the Spark explode function provides similar results:


(3) Flattening batter JSON array column
The next use of the Flatten data transformation in my ADF data flow expands the batter column:

Which doesn't look any different in my next Spark DataFrame, with the help of the same explode function:
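Continuing the sketch above, both Flatten steps can be reproduced with two explode calls (the column aliases are my own naming choice):

from pyspark.sql.functions import col, explode

df_flat = (df_json
           .withColumn("topping", explode(col("topping")))          # first Flatten: the topping array
           .withColumn("batter", explode(col("batters.batter")))    # second Flatten: the nested batter array
           .select(col("id"), col("name"), col("ppu"),
                   col("topping.id").alias("topping_id"),
                   col("topping.type").alias("topping_type"),
                   col("batter.id").alias("batter_id"),
                   col("batter.type").alias("batter_type")))

df_flat.show(truncate=False)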


(4) Validating results in ADF Mapping Data Flow
My ADF data flow's final transformation output matched my Databricks notebook DataFrame result: all necessary columns were flattened (or "exploded"), and the JSON data file schema in both cases was properly interpreted.

I'm just curious how different my appreciation for the ADF data flow Flatten transformation would be if it were named Explode after its Spark explode sibling :-), because the actual Spark flatten function doesn't expand data structures into multiple rows; it transforms an array of arrays into a single array.
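A tiny PySpark illustration of that difference, using a single-row DataFrame with an array-of-arrays column:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, flatten

spark = SparkSession.builder.getOrCreate()
df_nested = spark.createDataFrame([([[1, 2], [3, 4]],)], ["nested"])

df_nested.select(flatten("nested")).show()   # one row:  [1, 2, 3, 4]
df_nested.select(explode("nested")).show()   # two rows: [1, 2] and [3, 4]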

So, my data adventures journey continues :-)



(2020-June-07) It is a very simple concept: you have a database query result, either direct database table output or a stored procedure result, that needs to be sent as an email notification. This email could serve as a status report of your daily data processing job or an alert notification with some metrics that require immediate attention, and you want to be as user-friendly as possible with this message, i.e. an HTML format for your dataset is required.


I built this solution approach over a year ago and wanted to document and share it after recently using it in one of my projects. Again, I agree, there are other more or less effective ways to achieve the very same goal; this blog post is just a recollection of my efforts to send HTML-formatted SQL query results using Azure Data Factory and a Logic App.

(A) Database Query
Let's say I want to send an email message with the Top 10 Selling Products using Microsoft AdventureWorks Sales LT database:

I also created a stored procedure to prepare an HTML output result. This is where the whole trick happens (or the purpose of this blog post = spoiler alert :-). Knowing that my Azure Logic App to transmit the email message usually takes one variable for the email body, this drove my efforts to convert a SQL-based multi-record result into a single string data structure with HTML formatting options applied.

  1) First I need to use the <li> HTML tag to list all of my dataset records
  2) Then I need to wrap this list of items with the <ol> HTML tag to add numbers to each of the lines (your case might be different: a table definition or text coloring might be necessary).

Achieving the first requirement was easy: I combined all the data elements into one column named "li"; the second requirement was accomplished by converting all my records into one data cell with the FOR XML RAW ('') option and adding "ol" as my ROOT.

The output of this query is not very pretty, but you need to see how my ugly duckling converts into a beautiful HTML swan :-)


The rest is easy.  

(B) Azure Logic App
My Logic App has an HTTP trigger and one "Send an email" action
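When the data factory passes the Lookup output to this Logic App (typically by calling the HTTP trigger), the call boils down to posting one HTML string that becomes the email body. A minimal Python sketch of the same call, where the trigger URL and the "emailBody" property name are hypothetical and depend on your Logic App request schema:

import requests

logic_app_url = "https://<your-logic-app-http-trigger-url>"   # copied from the Logic App HTTP trigger
# the single string produced by the stored procedure via the ADF Lookup activity
html_body = "<ol><li>Product A, 1000</li><li>Product B, 900</li></ol>"

requests.post(logic_app_url, json={"emailBody": html_body})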


(C) Azure Data Factory
The main job of my Azure Data Factory (ADF) pipeline is done by the Lookup activity, the only ADF activity that can return the output result of a SQL Server stored procedure; the regular Stored Procedure activity in ADF won't help me with this.



The output of my Lookup activity looks as ugly as expected, but all we care about is that it comes out as one single string data element:


(D) Received HTML-formatted email message
After passing the output of my stored procedure via ADF Lookup activity to my Logic App, I received an email message as I expected: all lines formatted and nothing more.


Little did I know how much could be done with the FOR XML T-SQL clause and HTML tags embedded within.
My data adventure continues! :-) 

(2020-May-24) It has never been my plan to write a series of articles about how I can work with JSON files in Azure Data Factory (ADF). While working with one particular ADF component, I discovered other possible options to use the rich and less constrained JSON file format, which in a nutshell is just a text file with one or more ("key" : "value") pair elements.

Sometimes you could have an array of those pair elements or an array of other arrays; now you see where I'm going with the name of my current blog post :-) 




Previous blog posts on using JSON files in Azure Data Factory:

There are several ways you can explore the JSON way of doing things in Azure Data Factory. The first two that come right to my mind are:
(1) ADF activities' output - they are JSON formatted
(2) Reading JSON files - the task itself produces JSON formatted results too 

It's not a new thing to know that we can reference nested elements of ADF activities' output since it's represented in JSON format or pass the JSON file content to other tasks/components that can process this format.

But what if you need to pass the complete output of your ADF activity further down your pipeline? Or pass JSON array elements to another ADF activity or sub-pipeline as a parameter value? Let's explore what other options are available in Azure Data Factory for this very interesting use case.

(1) ADF activities' output
Let's say I have a "Get Metadata" task in my ADF pipeline that reads the content of a particular folder in my storage account.


The main portion of the output for this activity may look this way:


I already know that I can get access to the list of files/folders using this ADF expression:
@activity('List of files and folders').output.childitems

Or I can get to the first file by referencing the very first element [0] of this JSON array:
@activity('List of files and folders').output.childitems[0]


However, if I want to store the complete output of this activity in a separate Array variable while preserving the option of referencing all nested elements, then this can only be done by wrapping the output into an array using the function of the same name:
@array(activity('List of files and folders').output)

Otherwise you will get a data mismatch error message. This becomes very helpful if you want to pass the JSON output into the next activity of your ADF pipeline to process. It works and it's fun! :-)


(2) Reading JSON files 
Let's get more creative and read a real JSON file where you have more flexibility and control over its content.

Let's say I want to read the following JSON file of various baking lists of ingredients:
https://opensource.adobe.com/Spry/samples/data_region/JSONDataSetSample.html
[
{
"id": "0001",
"type": "donut",
"name": "Cake",
"ppu": 0.55,
"batters":
{
"batter":
[
{ "id": "1001", "type": "Regular" },
{ "id": "1002", "type": "Chocolate" },
{ "id": "1003", "type": "Blueberry" },
{ "id": "1004", "type": "Devil's Food" }
]
},
"topping":
[
{ "id": "5001", "type": "None" },
{ "id": "5002", "type": "Glazed" },
{ "id": "5005", "type": "Sugar" },
{ "id": "5007", "type": "Powdered Sugar" },
{ "id": "5006", "type": "Chocolate with Sprinkles" },
{ "id": "5003", "type": "Chocolate" },
{ "id": "5004", "type": "Maple" }
]
},
{
"id": "0002",
"type": "donut",
"name": "Raised",
"ppu": 0.55,
"batters":
{
"batter":
[
{ "id": "1001", "type": "Regular" }
]
},
"topping":
[
{ "id": "5001", "type": "None" },
{ "id": "5002", "type": "Glazed" },
{ "id": "5005", "type": "Sugar" },
{ "id": "5003", "type": "Chocolate" },
{ "id": "5004", "type": "Maple" }
]
},
{
"id": "0003",
"type": "donut",
"name": "Old Fashioned",
"ppu": 0.55,
"batters":
{
"batter":
[
{ "id": "1001", "type": "Regular" },
{ "id": "1002", "type": "Chocolate" }
]
},
"topping":
[
{ "id": "5001", "type": "None" },
{ "id": "5002", "type": "Glazed" },
{ "id": "5003", "type": "Chocolate" },
{ "id": "5004", "type": "Maple" }
]
}
]

The output of my ADF Lookup activity that references this file

will correctly show me 3 elements of this JSON array:



Where a list of items for my "For Each" loop activity can be simply set with the following expression:
@activity('Reading JSON file').output.value

Here is the interesting part: let's say I want to execute another ADF pipeline within my "For Each" loop activity and pass one baking recipe (or list of ingredients) as a parameter. Azure Data Factory is flexible enough that I can accomplish this with the following expression:
@array(item())


My sub-pipeline accepts this array parameter value and does further JSON data element referencing:


I can save the incoming parameter value into the var_baking_payload variable with this expression:
@pipeline().parameters.param_baking_payload 

List of toppings (var_toppings variable) can be set with this expression:
@variables('var_baking_payload')[0].topping

List of batters (var_batters variable) can be set with this expression:
@variables('var_baking_payload')[0].batters.batter



The thing that excited me the most while working with JSON outputs in Data Factory pipelines was that I could still pass JSON arrays between different tasks/activities, or pass those arrays to another sub-pipeline as a parameter, and nested element referencing still worked!

If I'm the last one to learn this, I'm still excited :-)

Happy Data Adventures!