(2020-Apr-24) Using the SQL UNION operator is a very common way to combine two or more datasets: it helps to create a single result set from many source ones.

Azure Data Factory (ADF) Mapping Data Flows has a similar capability to combine two streams of data with the help of the Union transformation, where both data streams can be stacked using either column names or column positions within the datasets. Nothing special; it's a usual way of doing things.


Image by Free-Photos from Pixabay 

Recently I had a chance to work on a special case of creating a file where the first two rows were static and contained output table metadata (column names and column types), the assumption being that a downstream reading process would be able to explicitly identify each column's data type and consume the new file correctly in a single run.



Another way to look at this case is a table header that provides both column names and column data types (source: https://data.world/data-society/global-climate-change-data).


As I have mentioned earlier, it's very easy to create this data flow process in Azure Data Factory.

Sourcing files
My header file contains only 2 records (column names and column data types):
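For illustration only, a two-row header file for the temperature dataset referenced above might look something like this (the actual column list in my file was shown in a screenshot):

dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
date,decimal,decimal,string,string,string,string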



The incoming data file contained temperature records for some of the major cities around the world, and I was only interested in extracting temperature information for Canadian cities:



ADF data flow transformation steps


(1) Filter the incoming data set with the following condition: Country == 'Canada'
(2) Derived Column: add a row_id column with a value equal to 2 (or anything greater than 1).
(3) Union the SourceHeader & updated SourceTemperature datasets.
(4) Sort the combined dataset by the row_id column in ascending order before saving it into an output file (see the script sketch below).
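For reference, the data flow script behind these four steps would look roughly like the sketch below. This is only an approximation of what ADF generates: the stream names are mine, and I assume the SourceHeader stream already carries a row_id value of 1 so that the header rows sort first.

SourceTemperature filter(Country == 'Canada') ~> FilterCanada
FilterCanada derive(row_id = 2) ~> AddRowId
SourceHeader, AddRowId union(byName: true) ~> UnionHeaderData
UnionHeaderData sort(asc(row_id, true)) ~> SortByRowId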


Output result set
So, this whole process of creating a mapping data flow in Azure Data Factory to union two data files was very trivial, and I definitely didn't try to show off my development skills :-)

But the simple idea that we can physically segregate header metadata (column names and data types) into a separate static file and then always attach it to various dynamic incoming data sets with the Union transformation simplified the process of creating the output file that I needed.


Another benefit of using Data Factory Mapping Data Flows!
(2020-Mar-15) Here is the video recording of my webinar session, Using Azure Data Factory Mapping Data Flows to populate Data Vault, from the recent PASS Data Architecture Virtual Group meeting.


It was based on my previously published blog post:
https://server.hoit.asia/2019/05/using-azure-data-factory-mapping-data.html 

Link to the PASS Data Architecture Virtual Group webinar:
https://dataarch.pass.org/MeetingDetails.aspx?EventID=14450


The recording can also be accessed through the PASS recordings page:
https://www.pass.org/Learn/Recordings/Listing.aspx?




(2019-Nov-10) Microsoft has recently announced a public preview of Wrangling data flows in Azure Data Factory (ADF). What used to be called Power Query in Excel, Query Editor in Power BI and the Power Query source in SSIS is now available for everyone to explore in the ADF data management workspace.


I'm personally OK with the name of this new feature in Azure Data Factory, "Wrangling Data Flows", as long as I know what it does. 

However, if you want to get much more exposure to data transformation with the help of Power Query, I would suggest reading and practicing with the original engine in Microsoft Excel - Getting Started with Power Query


Also, Cathrine Wilhelmsen has written a really good post - Comparing Mapping and Wrangling Data Flows in Azure Data Factory. She even went a bit further, talking about whether there are too many data flows in the Microsoft ecosystem; that's for you to decide.

And recently, Gaurav Malhotra and Scott Hanselman presented an Azure Friday video with a brief overview of the features that are available in Wrangling Data Flows - How to prepare data using wrangling data flows in Azure Data Factory

Dataset and my use case for Wrangling Data Flows
Dataset
While reading a Microsoft documentation page on Stream processing pipeline with Azure Databricks, I found a reference to a New York City Taxi Data dataset[1]. This dataset contains data about taxi trips in New York City over a four-year period (2010 – 2013). It contains two types of records: Ride data and fare data. Ride data includes trip duration, trip distance, and pickup and dropoff location. The data is stored in CSV format.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8 

The taxi trip data is organized as follows:
- medallion: a permit to operate a yellow taxi cab in New York City.
- hack license: a license to drive the vehicle.
- vendor id: e.g., Verifone Transportation Systems (VTS), or Mobile Knowledge Systems Inc (CMT).
- pickup datetime: start time of the trip, mm-dd-yyyy hh24:mm:ss EDT.
- dropoff datetime: end time of the trip, mm-dd-yyyy hh24:mm:ss EDT.
- passenger count: number of passengers on the trip, default value is one.
- trip time in secs: trip time measured by the taximeter in seconds.
- trip distance: trip distance measured by the taximeter in miles.
- pickup_longitude and pickup_latitude: GPS coordinates at the start of the trip.
- dropoff longitude and dropoff latitude: GPS coordinates at the end of the trip.

Use case for Wrangling Data Flows
Brian Donovan and Dan Work from the University of Illinois have pointed out that this dataset "contains a large number of errors. For example, there are several trips where the reported meter distances are significantly shorter than the straight-line distance, violating Euclidean geometry". That triggered my interest in adding an additional column to this dataset with the straight-line distance between the pickup and dropoff geo-points, and that's where I wanted Wrangling Data Flows to help me.


Developing the distance calculation use case with Wrangling Data Flows in Azure Data Factory
Data ingestion
A) I uploaded several months of Trip and Fare taxi data stored in Zip files into my blob storage account:



B) Then I created two data streams to unzip and copy actual CSV data files into my Azure Data Lake Storage Gen2 account:



C) This helped me to populate my ADLS Gen2 storage account containers with the data files for my use-case:



Wrangling Data Flow development
A) In Azure Data Factory I started creating a new data flow and specifically chose the Wrangling data flow option:

 


B) Then I was asked to define my input and output datasets. Since Wrangling Data Flows don't support multiple data files per dataset, I created my TripData dataset and linked it to the first trip_data_1.csv data file. The TaxiSink dataset was linked to an empty folder in my storage account.
Please note the Sink properties that are available to configure; we will get back to them at the end of this blog post.



C) After finishing all the settings, I can now see a familiar Power Query interface with:
  (1) - Two datasets: my initial referenced single file-based TripData dataset, and a new UserQuery, which will be used for my output sink dataset.
  (2) - At the top, a ribbon with buttons to control my table and column transformations.
  (3) - Transformation steps (the first 4 of them) that were automatically created by ADF itself based on the metadata of the sourcing data file. And with this, I do recognize all the column names that I expected to see. Kudos to Wrangling Data Flows!



D) Now I'm ready to add an additional column to my output dataset which would show a calculated straight line distance between pickup and dropoff geo-locations.

There are several code samples available that explain the geometry and the calculation steps to find the distance between two coordinates; however, for this Power Query M language-based code I give full credit to Barbara Raney and her article on exactly this topic - Use Power Query to Calculate Distance. She even has a sample Excel workbook to download with a sample dataset and the M query code behind it.

Her Excel formula: 

Was transformed into this M formula:
       Distance_Mi = Number.Acos(Number.Sin([Lat1_Rad]) * Number.Sin([Lat2_Rad]) + Number.Cos([Lat1_Rad]) * Number.Cos([Lat2_Rad]) * Number.Cos([Lon2_Rad]-[Lon1_Rad])) * 3959

And with some modifications based on the column names within my dataset, here is the final M query for the output dataset to calculate the Distance_Mi column:

let
    Source = TripData,
    // make sure the distance and coordinate columns are treated as numbers
    ChangedType = Table.TransformColumnTypes(Source, {{"trip_distance", type number}, {"pickup_longitude", type number}, {"pickup_latitude", type number}, {"dropoff_longitude", type number}, {"dropoff_latitude", type number}}),
    // convert pickup and dropoff coordinates from degrees to radians
    Lat1_Rad = Table.AddColumn(ChangedType, "Lat1_Rad", each ([pickup_latitude] / 180) * Number.PI),
    Lon1_Rad = Table.AddColumn(Lat1_Rad, "Lon1_Rad", each ([pickup_longitude] / 180) * Number.PI),
    Lat2_Rad = Table.AddColumn(Lon1_Rad, "Lat2_Rad", each ([dropoff_latitude] / 180) * Number.PI),
    Lon2_Rad = Table.AddColumn(Lat2_Rad, "Lon2_Rad", each ([dropoff_longitude] / 180) * Number.PI),
    // spherical law of cosines, multiplied by the Earth's radius in miles (3959)
    Distance_Mi = Table.AddColumn(Lon2_Rad, "Distance_Mi", each Number.Acos(Number.Sin([Lat1_Rad]) * Number.Sin([Lat2_Rad]) + Number.Cos([Lat1_Rad]) * Number.Cos([Lat2_Rad]) * Number.Cos([Lon2_Rad]-[Lon1_Rad])) * 3959),
    // remove the helper columns, leaving only the final Distance_Mi
    RemovedColumns = Table.RemoveColumns(Distance_Mi, {"Lat1_Rad", "Lon1_Rad", "Lat2_Rad", "Lon2_Rad"})
in
    RemovedColumns

This helped me to see new columns and distance calculated in miles:


E) I was super excited that my final calculation worked well on the dataset preview of the first 100 records. Then it was only a matter of saving this new data flow and executing it in an Azure Data Factory pipeline against the complete ~14.5 million records of my input data file.

As a result, my ADF pipeline ran and the output data files were created, and I saw that the new Distance_Mi column was there. However, the output files didn't contain any data for any column except the first one, and I still don't know why.

I tried the Sink option to combine all output data files into a single file, as well as the option to enclose all values in quotes; the result was the same, no output data at all, although the number of records in the output file was correct.


I understand that the Wrangling Data Flows feature in Azure Data Factory is still in preview and hope that Microsoft will improve and stabilize its functionality.

Here is a brief list of my observations, comments, and issues from building this wrangling data flow use case:
1) Azure Data Factory has a couple of pre-built templates for Wrangling Data Flows. The TaxiDemo template, built on the same New York City taxi data, doesn't work; it fails on the last group-by aggregation step in the output dataset.
    a) I can see both TripData and TripFares input datasets in my Query Editor, no issues. I used trip_data_1.csv and trip_fare_1.csv from 2013 for this test.
    b) The output UserQuery fails at the last "Grouped rows" step with a generic error message, "We're sorry, an error occurred during evaluation." However, moving back to the "Converting to Decimal Types" step doesn't generate the error, and I can see the joined dataset of Trip and Fares data.
    c) The first time I ran the ADF pipeline with the wrangling data flow task within it, it failed after executing for 6 minutes with an error message that I didn't save. I then reran the same ADF pipeline; this time it didn't fail, it ran for 34 minutes, and the output file is empty, containing only the column headers (vendor_id, total_passenger_count, total_trip_time_in_secs, total_trip_distance, total_trip_fare). Slow performance might be an issue as well; my compute settings were: AzureIntegrationRuntime, computeType = General, coreCount = 8.
2) Currently not all Power Query M functions are supported for data wrangling in ADF. A complete list of the currently supported M functions can be found here - https://docs.microsoft.com/en-us/azure/data-factory/wrangling-data-flow-functions
3) Even though M function support in Wrangling Data Flows is good, not all supported functions are exposed in the ADF UI. You can't find a simple "Add Custom Column" button in the ADF Query Editor; only Index and Conditional columns are available. To add a simple custom column, you will need to write M query code for it.
4) Which leads to my next observation: it is better and more stable to create the complete M query code in another editor (either Excel or Power BI) and then reuse this code in the ADF wrangling editor. This actually worked pretty well for me; my Excel-based M query code worked with no issues in my Wrangling Data Flow.
5) Don't try to add new transformation steps to your input (source) datasets; they won't be saved. The ADF user interface will indicate that changes have been made and you will be prompted to save them, but the next time you open your wrangling data flow, your changes to the input datasets will be lost. I hope this will be resolved by the Microsoft development team; for now, make all of your transformation changes in the UserQuery output dataset only.
6) Currently, Wrangling Data Flows in Azure Data Factory work with CSV files only; I hope that support for Parquet or other data file types will be added in the future.
7) Sink properties of the output dataset support the creation of a single data file or multiple partitioned data files (the default behavior). I would also like to have support for multiple files, or a folder of files, for input datasets.
8) Output into a single file is just a combination of two steps: creating multiple partitioned data files and then merging them together (the merging part takes additional time during your data processing).
9) I still don't understand the purpose of the "Reset" and "Done" buttons in the Wrangling Data Flow query editor. More documentation would help to clarify this.
10) And the creation of the output file is the most important issue for me (no output data for any column except the first one).

I'm an optimist by nature and believe that it's better to identify issues and find solutions for them! 
Hope that my comments will be heard as well.

(2019-Oct-27) Creating or deleting files in your Azure Storage account may initiate a data ingestion process and support your event-driven data platform architecture.

 Image by Lars_Nissen_Photoart from Pixabay

Microsoft recently introduced an additional change to file-event triggers within Azure Data Factory. This change gives you a bit more control over which files you want to be used, or not used, in your data ingestion process.


Before we get into more details on how to use this new "Ignore empty blobs" feature, let's briefly review possible scenarios of using file event triggers in your data processing workflow.

Ingest new data in batches 

A batch of incoming source data may come as a set of files, and sometimes those files can be archives with other files within them. Those incoming files don't arrive all at once; usually it's a sequential process, and there may be some delays between the first and last files of the set. In order to orchestrate a synchronized data ingestion process and start loading those files as a complete set, your data provider will generate an additional flag file (or end file) to indicate the end of file uploading for a particular batch. Only after receiving this flag file does your data ingestion process start.

Ingest new data as it comes 

With this approach, you create your data ingestion framework to react to each incoming data file that may arrive at a particular location. And as soon as the new file arrives, it triggers your process to ingest just this new data file into your data store.

This is where the new ADF trigger change is helpful: it adds control to this particularly reactive process of loading new data files. If your data vendor, by mistake or for other reasons, sends you an empty file, then with the "Ignore empty blobs" setting set to "Yes" your data ingestion pipeline won't be triggered, and you don't have to worry about creating special logic to handle empty source files in your data ingestion pipeline. Empty files won't be loaded at all.
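For reference, a storage event trigger definition with this option enabled would look roughly like the JSON sketch below. Trigger, pipeline, and path names here are hypothetical, and the exact schema is best checked in the ADF UI or your ARM template export.

{
  "name": "trg_new_source_file",
  "properties": {
    "type": "BlobEventsTrigger",
    "typeProperties": {
      "blobPathBeginsWith": "/sourcing-data/blobs/",
      "blobPathEndsWith": ".csv",
      "ignoreEmptyBlobs": true,
      "events": [ "Microsoft.Storage.BlobCreated" ],
      "scope": "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Storage/storageAccounts/<storage-account>"
    },
    "pipelines": [
      { "pipelineReference": { "referenceName": "pl_ingest_new_file", "type": "PipelineReference" } }
    ]
  }
}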


Working with ADF triggers has become a bit easier! :-)
(2018-Nov-20) After working with and testing the functionality of variables within Azure Data Factory pipelines, I realized that it's worth exploring the existing system variables. They could basically become my toolbox for collecting and storing control flow metrics of my pipelines.



Looking at the official Microsoft resource System variables supported by Azure Data Factory, you're given a modest selection of system variables that you can analyze and use at both the pipeline and pipeline-trigger level. Currently, there are three ways to monitor Azure Data Factory: visually, with the help of Azure Monitor, or using code to retrieve those metrics.

But here is how I want to monitor the control flow of my pipeline in Azure Data Factory:



This is the same data ingestion pipeline from my previous blog post - Story of combining things together - that builds a list of files from Blob storage and then copies data from those files to a SQL database in Azure. My intention is to collect and store event information for all the completed tasks, such as Get Metadata and Copy Data.

Here is the current list of pipeline system variables at my disposal:
@pipeline().DataFactory - Name of the data factory the pipeline run is running within
@pipeline().Pipeline - Name of the pipeline
@pipeline().RunId - ID of the specific pipeline run
@pipeline().TriggerType - Type of the trigger that invoked the pipeline (Manual, Scheduler)
@pipeline().TriggerId - ID of the trigger that invokes the pipeline
@pipeline().TriggerName - Name of the trigger that invokes the pipeline
@pipeline().TriggerTime - Time when the trigger invoked the pipeline. The trigger time is the actual fired time, not the scheduled time.

And after digging a bit more and testing pipeline activities, I've discovered additional metrics that I can retrieve at the level of each individual task:
PipelineName, 
JobId, 
ActivityRunId, 
Status, 
StatusCode, 
Output, 
Error, 
ExecutionStartTime, 
ExecutionEndTime, 
ExecutionDetails, 
Duration

Here is my final pipeline in ADF that can populate all these metrics into my custom logging database table:



And this is how I made it work:

1) First, I created the dbo.adf_pipeline_log table in my Azure SQL database:
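The table definition itself was shown in a screenshot; based on the metrics listed above, it would look roughly like this (column names and data types here are my assumption, not the exact DDL):

CREATE TABLE dbo.adf_pipeline_log
(
    ActivityTask        VARCHAR(100),
    ActivityType        VARCHAR(50),
    DataFactoryName     VARCHAR(100),
    [Duration]          VARCHAR(50),
    [Error]             VARCHAR(MAX),
    ExecutionDetails    VARCHAR(MAX),
    ExecutionEndTime    DATETIME2,
    ExecutionStartTime  DATETIME2,
    JobId               VARCHAR(100),
    [Output]            VARCHAR(MAX),
    PipelineName        VARCHAR(100),
    ActivityRunId       VARCHAR(100),
    [Status]            VARCHAR(50),
    StatusCode          VARCHAR(50)
);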


2) Then I used an [Append Variable] activity task as the "On Completion" outcome of the "Get Metadata" activity, with the following expression to populate a new array-type var_logging variable:




var_logging = 
@concat('Metadata Store 01|Copy|'
,pipeline().DataFactory,'|'
,activity('Metadata Store 01').Duration,'|'
,activity('Metadata Store 01').Error,'|'
,activity('Metadata Store 01').ExecutionDetails,'|'
,activity('Metadata Store 01').ExecutionEndTime,'|'
,activity('Metadata Store 01').ExecutionStartTime,'|'
,activity('Metadata Store 01').JobId,'|'
,activity('Metadata Store 01').Output,'|'
,pipeline().Pipeline,'|'
,activity('Metadata Store 01').ActivityRunId,'|'
,activity('Metadata Store 01').Status,'|'
,activity('Metadata Store 01').StatusCode)

where each of the system variables is concatenated and separated with pipe character "|". 

I did a similar thing to populate the very same var_logging variable in the ForEach container where actual data copy operation occurs:



3) And then I used this final task to populate my dbo.adf_pipeline_log table with data from the var_logging variable by calling a stored procedure:


The whole trick is to split each text line of the var_logging variable into another array of values, split by the "|" character. Then, knowing the position of each individual system variable value, I can map them to the appropriate stored procedure parameters / columns in my logging table (e.g. @split(item(),'|')[0] for the ActivityTask).
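Given the order of values in the var_logging expression above, the mapping of the first few stored procedure parameters would look along these lines (parameter names are mine for illustration):

@split(item(),'|')[0]   - ActivityTask (e.g. 'Metadata Store 01')
@split(item(),'|')[1]   - ActivityType (e.g. 'Copy')
@split(item(),'|')[2]   - DataFactoryName
@split(item(),'|')[3]   - Duration
@split(item(),'|')[4]   - Error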




This provided me the flexibility to see both Completed and Failed activity runs (to test a failed activity, I had to temporarily rename the target table of my Copy Data task). I can now read this data and get additional insights from the SQL Server table.



Let me know what you think about this, and have a happy data adventure!

(2018-Oct-29) There are only a few sentences on the official Microsoft web page that describe the newly introduced activity task (Append Variable) for adding a value to an existing array variable defined in Azure Data Factory - Append Variable Activity in Azure Data Factory. But it significantly improves your ability to control the workflow of the data transformation activities of your Data Factory pipeline.



Suppose you have several folders in your Azure Blob storage container where daily product inventory data from different stores is saved, and you need to transfer those files with daily product stock snapshots to a database.



A natural approach is to get a list of files from all the source folders and then load them all into your database.

Technically:
1) We can read metadata of our sourcing folders from the Blob storage
2) Then we can extract all the files names and save them in one queue object
3) And finally, use this file list queue to read and transfer data into a SQL database. 

Let's recreate this use case in our Azure Data Factory pipeline.

1) To get metadata of our sourcing folders, we need to select "Child Items" for the output of our [Get Metadata] activity task:



This provides a list of sub-folders and files inside the given folder, with the name and type of each child item.
Here is an output of this task using a list of files from my first store:


2) Then I need to extract the file names. To make it happen, I pass the output of the [Get Metadata] activity task to my [ForEach] container, where the Items parameter is set to @activity('Metadata Store 01').output.childitems.



And then within this loop container, I actually start using this new [Append Variable] activity task and specifically choose to extract only names, and not types, from the previous task's output set. Please note that this task can only be used for 'Array' type variables in your pipeline.
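In my case, the Value property of this [Append Variable] task inside the loop is most likely set to an expression along the lines of the one below, so that only the file name, and not its type, gets appended:

@item().name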



Then all I have to do is replicate this logic for the other source folders and stream the list of file names into the very same variable.
In my case it's var_file_list. Don't forget to define all the necessary variables within your pipeline.


3) Once all the file names are extracted into my array variable, I can use it as a queue for my data load task. My next [ForEach] loop container's Items parameter will be set to this value: @variables('var_file_list')


And the internal [Copy Data] activity task within this loop container will receive the file names by using individual item values from my variable loop set [@item()].


The list of files is appended from each source folder, and then all the files are successfully loaded into my Azure SQL database. Just to check the final list of file names, I copied the content of my var_file_list variable into another testing variable, var_file_list_check, to validate its content.



Azure Data Factory allows more flexibility with this new [Append Variable] activity task, and I do recommend using it more and more in your data flow pipelines! :-)
(2018-July-15) Your case is to create an Excel template with data extracted from a backend SQL Server database; it takes a few minutes, and Power Query is a very handy tool to connect to, shape, and extract your data into a worksheet format. But what if you need to create multiple files where your SQL-based data source differs by only one parameter (like a report date or a product name)?

In my case, I have a list of 7000 geo stations across the globe; with my default extract logic I was able to pull a list of Canadian stations into my Excel file. Just connect to SQL Server using Power Query, and the job is done. But that is still only part of the problem to solve.







What if you need to create individual files for each of the main parameter's values - in my case, a country name? Let's explore whether we can add some flexibility to the existing Power Query with Excel-based parameters.

1) A new Power Query is created based on my existing Excel table by pressing the "From Table" option:


2) One conversion step is removed from this table in Power Query:


3) A specific cell is selected from the sourcing Excel table, which converts the table into a scalar value in Power Query:




4) A custom function is then created based on this scalar value:




5) Then an explicit filtering condition in the main data query is replaced with the newly created function:
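Putting steps 3 to 5 together, the M code behind the parameter query, the custom function, and the updated filter would look roughly like this (table, column, and query names here are illustrative, not the exact ones from my workbook):

// (3) Parameter query "SelectedCountry": drill into a single cell of the Excel table to get a scalar value
let
    Source = Excel.CurrentWorkbook(){[Name="CountryParameter"]}[Content]
in
    Source{0}[Country]

// (4) Custom function query "fn_SelectedCountry", wrapping the scalar value above
() => SelectedCountry

// (5) In the main data query, the explicit country filter is replaced with a call to the function
Table.SelectRows(Stations, each [Country] = fn_SelectedCountry())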



6) And now I can choose any country from the list that I want and see its geo stations, even Greenland :-)


Happy data adventures!