(2020-Aug-14) Setting default values for my parameters or variables in Azure Data Factory (ADF) may be a trivial task, but it gets more interesting when those elements are Arrays. The recent addition of Global Parameters to Data Factory by Microsoft adds an extra incentive to learn how to set values for such parameters too: https://docs.microsoft.com/en-ca/azure/data-factory/author-global-parameters

Photo by Mihis Alex from Pexels

I had previously blogged about Arrays and how we can use them in ADF: https://server.hoit.asia/2019/06/working-with-arrays-in-azure-data.html and this post is just a short memo to myself on how default array values can be prepared and used as well.


Array Parameters

A simple but very straightforward way to set a default value for an array parameter is to pass a text string that visually represents a collection of elements. In my example below I give par_meal_array the default value of '["Egg", "Greek Yogurt", "Coffee"]', which I can then pass further to my For Each Meal loop processing task as the list of items.
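For reference, here is a minimal sketch of what this could look like in the pipeline JSON, treating par_meal_array as a pipeline parameter (the exact property layout may differ slightly in your factory):

"parameters": {
    "par_meal_array": {
        "type": "array",
        "defaultValue": ["Egg", "Greek Yogurt", "Coffee"]
    }
}

The For Each Meal activity can then use @pipeline().parameters.par_meal_array as its Items setting.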


Array String converted into Array Variable 

If I have less flexibility to set a straightforward array default value, I can start with a String parameter that is then converted into a collection of elements (an array). In my example below I give par_meal_string the default value of 'Egg,Greek Yogurt,Coffee', which I then transform into an array variable with the @split(pipeline().parameters.par_meal_string,',') function in my Set Variable activity, and pass its output further to my For Each Meal loop processing task as the list of items.
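A minimal sketch of that Set Variable step (the name of the target array variable, var_meal_array, is just my placeholder):

Variable name: var_meal_array
Value: @split(pipeline().parameters.par_meal_string,',')

The For Each Meal activity then uses @variables('var_meal_array') as its Items setting.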


Configuring Pipeline Parameters in Triggers

Things get even more interesting with triggers in ADF: we can reuse the very same pipeline and configure its execution through different triggers, each passing its own input (default) parameter values.


Let us create 3 triggers to cover daily meal consumption and pass each individual trigger its own menu :-) (a trimmed example of one trigger definition is sketched below).
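For reference, a trimmed sketch of what one of those trigger definitions could look like in JSON (the trigger, pipeline and parameter names below are purely illustrative):

{
    "name": "Trigger_Breakfast",
    "properties": {
        "type": "ScheduleTrigger",
        "typeProperties": {
            "recurrence": { "frequency": "Day", "interval": 1, "startTime": "2020-08-14T08:00:00Z" }
        },
        "pipelines": [
            {
                "pipelineReference": { "referenceName": "pl_daily_meals", "type": "PipelineReference" },
                "parameters": { "par_meal_array": [ "Egg", "Greek Yogurt", "Coffee" ] }
            }
        ]
    }
}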


This can now clearly be done, and very nutritious (or not so nutritious) meals can be consumed. Obviously, this list of meals is just an analogy for something else that is more applicable to your specific data factory development situation. Or you can simply enjoy my writing and define your own carte du jour :-)


Final thoughts

Actually, I don't have any closing remarks for this post. In all honesty, I think everybody already knows all about this, and I'm not revealing anything significantly new. However, I've tested it all, I have finished writing the blog post about it, and I will remember it even better now :-)


(2020-May-01) There is a current limitation in Azure Data Factory (ADF): the OR function accepts only two conditions.



You won't be able to specify the following expression to evaluate 3 possible values for a variable:
or(equals(variables('var1'), 'A'), equals(variables('var1'), 'B'), equals(variables('var1'), 'C')) - not possible.

However, only this use of the OR function, with two conditions, is possible:
or(equals(variables('var1'), 'A'), equals(variables('var1'), 'B')) - limit of two conditions

But what if we had the ability to check whether a particular variable/parameter/other ADF object value belongs to a range of values (an array of values), similar to what the IN operator does in SQL? This would definitely solve our problem and remove the limit on the number of logical conditions we can check.

The previous attempted OR expression with 3 conditions can be successfully written this way:
contains(createArray('A', 'B', 'C'),variables('var1'))
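As a side note (my own observation, not part of the original limitation note), OR calls can also be nested to evaluate more than two conditions, though the contains() form above stays much more readable as the list grows:

or(equals(variables('var1'), 'A'), or(equals(variables('var1'), 'B'), equals(variables('var1'), 'C')))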

An array of values in your real-life ADF pipelines could come as the output of previous activities, or it could be a hardcoded list of values against which you want your variables to be evaluated.

A similar SQL IN construction:


Could be created in Azure Data Factory this way:
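In plain text, the comparison boils down to something like this (my own paraphrase of the two screenshots):

SQL:  WHERE var1 IN ('A', 'B', 'C')
ADF:  @contains(createArray('A', 'B', 'C'), variables('var1'))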


Simple ADF tip to share! :-)

(2019-June-06) I remember that I had a professor at my university who would often encourage his students to learn and get more experienced with simple things first. Learn the basics in and out and then move forward to more complicated concepts and practices; that was his motto, and he really tried to share this idea with us.

Previously published blog posts about using Variables in Azure Data Factory:
Setting Variables in Azure Data Factory Pipelines
Append Variable activity in Azure Data Factory: Story of combining things together  
System Variables in Azure Data Factory: Your Everyday Toolbox 
Azure Data Factory: Extracting array first element

Simple things can sometimes be overlooked as well. With the addition of Variables to Azure Data Factory Control Flow (they were not available there at the beginning), Arrays have become one of those simple things for me.

Image by Magnascan from Pixabay

Currently, there are 3 data types supported for ADF variables: String, Boolean, and Array. The first two are pretty easy to use: Boolean for logical binary results and String for everything else, including numbers (no wonder there are so many conversion functions in Azure Data Factory that we can use).

Going back to my memory flashback of the professor's guidance on learning and using simple things, I've finally realized that they are worth getting more experienced with! Why? Because arrays are everywhere in the Control Flow of Azure Data Factory:
(1) The JSON output of most activity tasks in ADF can be treated as multi-level arrays
(2) Collections required for the "ForEach" activity can be sourced from the preceding (1) activity outputs
(3) "Set Variable" and "Append Variable" activities can be used to store the preceding (1) activity outputs for further data transformation
(4) You can create Arrays manually by transforming existing linear values or setting them with hard-coded values (fixed collections).

Don't forget about various functions and expressions to support your work with Arrays in Azure Data Factory (https://docs.microsoft.com/en-us/azure/data-factory/control-flow-expression-language-functions):
intersection - Returns a single array or object with the common elements between the arrays or objects passed to it.
union - Returns a single array or object with all of the elements that are in either array or object passed to it.
first - Returns the first element in the array or string passed in.
last - Returns the last element in the array or string passed in.
skip - Returns the elements in the array starting at index Count.
length - Returns the number of elements in an array or string.
json - Converts the parameter to a JSON type value.
array - Converts the parameter to an array.
createArray - Creates an array from the parameters.
range - Generates an array of integers starting from a certain number, and you define the length of the returned array.
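A few quick illustrations of how these behave (the sample values below are my own, with the results I would expect based on the documentation):

@intersection(createArray(1,2,3), createArray(2,3,4))    returns [2,3]
@union(createArray(1,2,3), createArray(2,3,4))           returns [1,2,3,4]
@first(createArray('A','B','C'))                         returns 'A'
@last(createArray('A','B','C'))                          returns 'C'
@skip(createArray('A','B','C'), 1)                       returns ['B','C']
@length(createArray('A','B','C'))                        returns 3
@range(1, 4)                                             returns [1,2,3,4]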

Just to show a quick example of some simple operations with arrays, I created this ADF pipeline with 4 main components:



(1) Lookup task to read a CSV file with 2 columns of Syllabic music notes:

Where the JSON output of this activity task contains 7 elements:


(2) A Set Variable task converts a text string of "C-D-E-F-G-A-B"

into an array variable Notes_Alphabet using this expression:
@split(variables('Notes_Alphabet_String'),'-')



(3) Then, looping through the collection of array elements from the (1) activity task output:
@activity('Lookup Notes_Syllabic').output.value

I then append a combination of Syllabic and Alphabet music notes into the Notes_Combined array variable

using this expression
@concat(item().ID,'-',item().Note,'-',variables('Notes_Alphabet')[add(int(item().ID),-1)])
The important part is that I can locate the Notes_Alphabet variable element using an index derived from the Lookup Notes_Syllabic collection:
variables('Notes_Alphabet')[add(int(item().ID),-1)], where the item().ID value comes from the first column of my source file.
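To make the index math explicit (a small sketch using the first row of my file as an example):

@split(variables('Notes_Alphabet_String'),'-')   returns ["C","D","E","F","G","A","B"]

so for item().ID = 1:
variables('Notes_Alphabet')[add(int(item().ID),-1)]   becomes   variables('Notes_Alphabet')[0], which is "C"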

(4) As a result, I copy the content of the Notes_Combined array variable into another array variable, Notes_Combined_View, for debugging purposes:


And this helped me to see how both Syllabic and Alphabet music notes correspond to each other.

The code of this ADF pipeline can be found here:
https://github.com/NrgFly/Azure-DataFactory/blob/master/Samples/pipeline/adf_arrays_sample_pl.json

And I hope you will find this blog post helpful in your journey to explore simple things of the Azure Data Factory!

(2019-Apr-28) Full credit goes to Microsoft for the latest efforts updating Azure products with new features and documenting corresponding changes for the end users. Azure Data Factory (ADF) is a great example of this. 

A user recently asked me a question on my previous blog post (Setting Variables in Azure Data Factory Pipelines) about the possibility of extracting the first element of a variable if this variable is a set of elements (an array).

So as a spoiler alert, before writing a blog post and adding a bit more clarity to the existing Microsoft ADF documentation, here is a quick answer to this question. The first element of an ArrayVariable is extracted by this expression: @variables('ArrayVariable')[0].

I created a simple ADF pipeline to test this out:


And here is the list of variables that I plan to use in this testing ADF pipeline:


1) I set my ArrayVariable to two characters "A" & "B": @createArray('A', 'B')
2) Then I extract the first element of this array into ArrayElement_1: @variables('ArrayVariable')[0]
3) The second element of this array is extracted into ArrayElement_2: @variables('ArrayVariable')[1]
4) Also, I check the out-of-range behavior of the ArrayVariable by setting ArrayElement_OutOfRange: @variables('ArrayVariable')[2]

This gives me the following error message and confirms that I can't reference an element of my array variable that is outside of its boundaries.


Summary:
The use case for referencing array elements with square brackets is quite broad. It also makes it easier to work with the output of other activities, such as the child items of a Get Metadata activity task. You can either scan that list of values with a ForEach loop container or, if you know the exact position of your array elements, extract those values directly with square-bracket referencing.
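For example (the activity name here is just a placeholder), the name of the first child item returned by a Get Metadata task could be referenced directly as:

@activity('Get Metadata1').output.childItems[0].name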

The code of this blog post's ADF pipeline can be accessed in my personal GitHub repository:
https://github.com/NrgFly/Azure-DataFactory/blob/master/Samples/pipeline/bdata_adf_variable_array_many_elements.json

Happy data adventures!

(2018-Nov-27) When something goes wrong with your data transformation process in Azure Data Factory, the last thing you expect to happen is to lose control and visibility. You want to be notified right after this error event occurs or you need your technical support team to become aware and engage to resolve this issue in a timely manner. So, in this case, an email alert could be a very good tool to provide an appropriate way of communication.



Currently, as I'm writing this blog post, Azure Data Factory doesn't provide out-of-the-box functionality to send emails. However, we can use a Web Activity task in ADF that calls a custom Azure Logic App to help us deliver an email notification.

Writing this blog, I give full credit to this resource - http://microsoft-bitools.blogspot.com/2018/03/add-email-notification-in-azure-data.html by Joost van Rossum from the Netherlands, which has step-by-step instructions to create ADF notification Logic Apps in Azure. I have just made a slight modification to this solution to support the functionality of the existing ADF pipeline from my previous blog post - https://server.hoit.asia/2018/11/system-variables-in-azure-data-factory.html

In my current ADF pipeline, I want to implement email notification functionality for two possible cases:
Case A - Email notification for a failed Copy Data task (explicitly on a Failed event)
Case B - Email notification for a logged event with a Failed status (since the logging information contains data for both Failed and Succeeded activities, I want to generate email notifications only for the Failed events at the very end of my pipeline).





So, first, I would need to create a Logic App in my Azure portal that would contain two steps:
1) Trigger: When an HTTP POST request is received 
2) Action: Send Email (Gmail) 


The Trigger would have this JSON body definition:

{
"properties": {
"DataFactoryName": {
"type": "string"
},
"PipelineName": {
"type": "string"
},
"ErrorMessage": {
"type": "string"
},
"EmailTo": {
"type": "string"
}
},
"type": "object"
}

and the Gmail Send Email action would further use elements of this data structure.
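Inside that Gmail action, the received properties are referenced through the trigger body. A rough sketch of what my email content could look like (the exact wording of the message is up to you):

To:      @{triggerBody()?['EmailTo']}
Subject: ADF pipeline "@{triggerBody()?['PipelineName']}" failure notification
Body:    Data Factory: @{triggerBody()?['DataFactoryName']}
         Error: @{triggerBody()?['ErrorMessage']}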




So, to support my "Case A - Email notification for a failed Copy Data task":
1) I open my pipeline's "Load data into SQL" container and add & connect a Web activity task to the Failed event of the "Copy Data Blob to SQL" activity task.

2) Then I copy the HTTP URL from the Logic App in the Azure portal and paste its value into the Web task.
3) I add a new header with the following values:
     Name: Content-Type
     Value: application/json
4) And then I define the body of my POST request, filling in all the data elements of the JSON structure initially defined in the Logic App:

{
"DataFactoryName":
"@{pipeline().DataFactory}",
"PipelineName":
"@{pipeline().Pipeline}",
"ErrorMessage":
"@{activity('Copy Data Blob to SQL').Error.message}",
"EmailTo":
"@pipeline().parameters.var_email_address"
}


To support my "Case B - Email notification for a logged event with a Failed status":
1) I open my pipeline's "Logging to SQL" container and add another (conditional) container to check whether the logged event has a Failed status, using the following conditional expression: @equals(split(item(),'|')[12], 'Failed')

2) Then I add two activity tasks within this Condition activity to support email notifications:


3) Similarly to Case A, I copy the HTTP URL from the Logic App in the Azure portal, paste its value into the Web task, and add the same header.
4) And then I define the body of my POST request, filling in all the data elements of the JSON structure initially defined in the Logic App:

{
"DataFactoryName":
"@{pipeline().DataFactory}",
"PipelineName":
"@{pipeline().Pipeline}",
"ErrorMessage":
"@variables('var_activity_error_message')",
"EmailTo":
"@pipeline().parameters.var_email_address"
}

where the "var_activity_error_message" variable is set by the expression @split(item(),'|')[4] in the pipeline's "Set Variable" activity task.

As a result, after running the pipeline, along with the expected results I can now receive an email notification if something fails (to test the failed-event notification, I temporarily changed the name of the table that I use as a destination in my Copy Data task).




It's working!
(2018-Nov-20) After working with and testing the functionality of variables within Azure Data Factory pipelines, I realized that it's worth exploring the existing system variables. They could basically become my toolbox to collect and store control flow metrics of my pipelines.



Looking at the official Microsoft resource System variables supported by Azure Data Factory, you're given a modest selection of system variables that you can analyze and use at both the pipeline and the pipeline trigger level. Currently, you have three ways to monitor Azure Data Factory: visually, with the help of Azure Monitor, or using code to retrieve those metrics.

But here is how I want to monitor the control flow of my pipeline in Azure Data Factory:



This is the same data ingestion pipeline from my previous blog post - Story of combining things together - which builds a list of files from Blob storage and then copies data from those files to a SQL database in Azure. My intention is to collect and store event information for all the completed tasks, such as Get Metadata and Copy Data.

Here is the current list of pipeline system variables at my disposal:
@pipeline().DataFactory - Name of the data factory the pipeline run is running within
@pipeline().Pipeline - Name of the pipeline
@pipeline().RunId - ID of the specific pipeline run
@pipeline().TriggerType - Type of the trigger that invoked the pipeline (Manual, Scheduler)
@pipeline().TriggerId - ID of the trigger that invokes the pipeline
@pipeline().TriggerName - Name of the trigger that invokes the pipeline
@pipeline().TriggerTime - Time when the trigger invoked the pipeline. The trigger time is the actual fired time, not the scheduled time.

And after digging a bit more and testing pipeline activities, I've discovered additional metrics that I can retrieve at the level of each individual task (a quick reference example follows the list):
PipelineName, 
JobId, 
ActivityRunId, 
Status, 
StatusCode, 
Output, 
Error, 
ExecutionStartTime, 
ExecutionEndTime, 
ExecutionDetails, 
Duration
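Each of these properties can be referenced through the activity() function; for example, for the Get Metadata task of this pipeline:

@activity('Metadata Store 01').Status
@activity('Metadata Store 01').Duration

The full logging expression below simply concatenates all of them into a single line.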

Here is my final pipeline in ADF that can populate all these metrics into my custom logging database table:



And this is how I made it work:

1) First, I created the dbo.adf_pipeline_log table in my SQL database in Azure:


2) Then I used an [Append Variable] activity task on the "On Completion" outcome of the "Get Metadata" activity, with the following expression to populate a new Array-type var_logging variable:




var_logging = 
@concat('Metadata Store 01|Copy|'
,pipeline().DataFactory,'|'
,activity('Metadata Store 01').Duration,'|'
,activity('Metadata Store 01').Error,'|'
,activity('Metadata Store 01').ExecutionDetails,'|'
,activity('Metadata Store 01').ExecutionEndTime,'|'
,activity('Metadata Store 01').ExecutionStartTime,'|'
,activity('Metadata Store 01').JobId,'|'
,activity('Metadata Store 01').Output,'|'
,pipeline().Pipeline,'|'
,activity('Metadata Store 01').ActivityRunId,'|'
,activity('Metadata Store 01').Status,'|'
,activity('Metadata Store 01').StatusCode)

where each of the system variable values is concatenated and separated with the pipe character "|".

I did a similar thing to populate the very same var_logging variable in the ForEach container where the actual data copy operation occurs:



3) And then I used a final task to populate my dbo.adf_pipeline_log table with data from the var_logging variable by calling a stored procedure:


The whole trick is to split each text line of the var_logging variable into another array of values, separated by the "|" character. Then, knowing the position of each individual system variable value, I can map them to the appropriate stored procedure parameters / columns of my logging table (e.g. @split(item(),'|')[0] for the ActivityTask).
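Following the concatenation order of the var_logging expression above, the split positions are predictable; here is a quick reference for a few of the mappings (a sketch based on that expression):

@split(item(),'|')[0]    - ActivityTask ('Metadata Store 01')
@split(item(),'|')[2]    - DataFactoryName
@split(item(),'|')[4]    - Error
@split(item(),'|')[12]   - Status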




This gave me the flexibility to see both Completed and Failed activity runs (to test a failed activity, I had to temporarily rename the target table of my Copy Data task). I can now read this data and get additional insights from the SQL Server table.



Let me know what you think about this, and have a happy data adventure!

(2018-Oct-29) There are only a few sentences on the official Microsoft web page that describe the newly introduced activity task (Append Variable) for adding a value to an existing array variable defined in Azure Data Factory - Append Variable Activity in Azure Data Factory. Yet it significantly improves your ability to control the workflow of the data transformation activities in your Data Factory pipeline.



Suppose you have several folders in your Azure Blob storage container where daily product inventory data from different stores is saved. You need to transfer those files with daily product stock snapshots to a database.



A natural approach would be to get a list of files from all the sourcing folders and then load them all into your database.

Technically:
1) We can read metadata of our sourcing folders from the Blob storage
2) Then we can extract all the files names and save them in one queue object
3) And finally, use this file list queue to read and transfer data into a SQL database. 

Let's recreate this use case in our Azure Data Factory pipeline.

1) To get metadata of our sourcing folders, we need to select "Child Items" for the output of our [Get Metadata] activity task:



This provides a list of sub-folders and files inside the given folder, with the name and type of each child item.
Here is an output of this task using a list of files from my first store:


2) Then I need to extract the file names. To make this happen, I pass the output of the [Get Metadata] activity task to my [ForEach] container,
where the Items parameter is set to @activity('Metadata Store 01').output.childitems.



And then, within this loop container, I actually start using this new [Append Variable] activity task and specifically choose to extract only the names, not the types, from the previous task's output set. Please note that this task can only be used with 'Array' type variables in your pipeline.
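The Value setting of that Append Variable task is a one-liner; assuming the ForEach items are the child items from the Get Metadata output, it is simply:

@item().name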



Then all I have to do is replicate this logic for the other sourcing folders and stream the list of file names into the very same variable.
In my case it's var_file_list. Don't forget to define all the necessary variables within your pipeline.


3) Once all the file names are extracted into my array variable, I can use it as a queue for my data load task. My next [ForEach] loop container's Items parameter will be set to this value: @variables('var_file_list')


And the internal [Copy Data] activity task within this loop container will receive the file names through the individual items of my variable loop set, using @item().


The list of files is appended from each sourcing folder, and then all the files are successfully loaded into my Azure SQL database. Just to check the final list of file names, I copied the content of my var_file_list variable into another testing variable, var_file_list_check, to validate its content.



Azure Data Factory allows more flexibility with this new [Append Variable] activity task, and I do recommend using it more and more in your data flow pipelines! :-)