
(2020-Oct-05) Adding a row number to your dataset could be a trivial task. Both ANSI SQL and Spark SQL have the row_number() window function that can enrich your data with a unique number for your whole or partitioned data recordset.

Recently I had a case of creating a data flow in Azure Data Factory (ADF) where there was a need to add a row number.

Photo by Micah Boerma from Pexels

My instant reaction was to add this new row number column using a Derived Column transformation - https://docs.microsoft.com/en-us/azure/data-factory/data-flow-derived-column. However, this was my mistake, and ADF notified me that the rowNumber() data flow function is only available in the Window transformation - https://docs.microsoft.com/en-us/azure/data-factory/data-flow-window.


OK, I moved on and added a Window Transformation task, which does require at least one sorting column.

However, this was a bit of an issue in my case. I didn't need to sort or change the order of my recordset. The transformed dataset needed to stay in the same order as its original sourcing data, so that the new Row Number column could help to match each transformed record with its sourcing sibling record.

So I thought that it wouldn't hurt to add just a static constant-value column (let's say with value = 1) and then use this new constant column as the sorting attribute in my rowNumber() Window transformation.

1) First I added a Derived Column transformation with the Column_Value_1 column value set to 1 (or any other constant value of your preference).
2) Then I added a Window transformation with the rowNumber() function and Column_Value_1 as a sorting column.
3) Then I was able to see my output result with the preserved order of records and an additional Row Number column.
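In plain SQL terms (both ANSI and Spark SQL), this workaround is roughly equivalent to the sketch below; the source_data table name is made up just for illustration, and, as in the data flow, ordering by a constant column doesn't strictly guarantee the physical order in a distributed engine, it simply avoids re-sorting by any real attribute:

WITH source_with_constant AS (
    SELECT s.*, 1 AS Column_Value_1                 -- the Derived Column step
    FROM   source_data s                            -- hypothetical source table
)
SELECT *,
       ROW_NUMBER() OVER (ORDER BY Column_Value_1) AS RowNumber   -- the Window step
FROM   source_with_constant;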



Update:
Joseph Edwards has just pointed out that using the Surrogate Key transformation would achieve the very same result of adding a row number column, and this step doesn't even require sorting columns. I've just tested, and it's working, amazing! I have written this whole blog post just to realize that there is a better way to number rows in the Azure Data Factory flows :-)



(2020-Apr-24) Using the UNION SQL operator is a very common practice to combine two or more datasets; it helps to create a single result set from many sourcing ones.

Azure Data Factory (ADF) Mapping Data Flows has a similar capability to combine two streams of data with the help of the Union transformation, where both data streams can be stacked using either column names or column positions within the datasets. Nothing special, it's a usual way of doing things.


Image by Free-Photos from Pixabay 

Recently I had a chance to work on a special case of creating a file where the first two rows were static and contained output table metadata (column names and column types), assuming that a further reading process would be able to explicitly identify each column's data type and consume the new file correctly in a single run.



Another way to look at this case is a table header that provides both column names and column data types (source: https://data.world/data-society/global-climate-change-data).


As I have mentioned earlier, it's very easy to create this data flow process in the Azure Data Factory.

Sourcing files
My header file will contain only 2 records (column names and column data types):



The incoming data file contained temperature records for some of the major cities around the world, and I was only interested in extracting temperature information for Canadian cities:



ADF data flow transformation steps


(1) Filter the incoming dataset with the following condition: Country == 'Canada'
(2) Derived Column row_id with a value equal to 2 (or something greater than 1).
(3) Union the SourceHeader & updated SourceTemperature datasets:

(4) Sort the combined dataset by the row_id column in ascending order before saving it into an output file.
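Expressed in plain SQL, the whole flow boils down to something like this sketch; the table names (source_header, source_temperature) and the column subset are illustrative only, and I'm assuming the header rows carry a row_id of 1 (either stored in the file or added with another Derived Column):

SELECT header_col_1, header_col_2, header_col_3, 1 AS row_id       -- 2 static rows: names + data types
FROM   source_header

UNION ALL

SELECT CAST(dt AS string), City, CAST(AverageTemperature AS string), 2 AS row_id   -- step (2)
FROM   source_temperature
WHERE  Country = 'Canada'                                           -- step (1)

ORDER BY row_id ASC;                                                -- step (4): header rows stay on top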


Output result set
So, this whole process of creating a mapping data flow in Azure Data Factory to union two data files was very trivial, and I definitely didn't try to show off my development skills :-)

Just the idea that we can physically segregate header metadata (column names and data types) into a separate static file and then always attach it to various dynamic incoming datasets with the UNION data transformation simplified my process of creating the output file that I needed.


Another benefit of using Data Factory Mapping Data Flows!
(2020-Mar-19) Recently, Microsoft introduced a new Flatten task to the existing set of powerful transformations available in the Azure Data Factory (ADF) Mapping Data Flows - https://docs.microsoft.com/en-us/azure/data-factory/data-flow-flatten.

What this new task does is help to transform/transpose/flatten your JSON structure into a denormalized flat dataset that you can load into a new or existing flat database table.

2020-Mar-26 Update:
Part 2: Transforming JSON to CSV with the help of Flatten task in Azure Data Factory - Part 2 (Wrangling data flows)

I like the analogy of the Transpose function in Excel that helps to rotate your vertical set of data pairs (name : value) into a table with column names and values for the corresponding objects. And when this vertical JSON structure contains several similar sets (an array), the ADF Mapping Data Flows Flatten transformation does a really good job of transforming it into a table with several rows (records).


Let's use this JSON data file as an example

{
    "id": "0001",
    "type": "donut",
    "name": "Cake",
    "ppu": 0.55,
    "batters":
    {
        "batter":
        [
            { "id": "1001", "type": "Regular" },
            { "id": "1002", "type": "Chocolate" },
            { "id": "1003", "type": "Blueberry" },
            { "id": "1004", "type": "Devil's Food" }
        ]
    },
    "topping":
    [
        { "id": "5001", "type": "None" },
        { "id": "5002", "type": "Glazed" },
        { "id": "5005", "type": "Sugar" },
        { "id": "5007", "type": "Powdered Sugar" },
        { "id": "5006", "type": "Chocolate with Sprinkles" },
        { "id": "5003", "type": "Chocolate" },
        { "id": "5004", "type": "Maple" }
    ]
}

and create a simple ADF mapping data flow to Flatten this JSON file into a CSV sink dataset.

On a high level my data flow will have 4 components:
1) Source connection to my JSON data file
2) Flatten transformation to transpose my Cake to Toppings
3) Further Flatten transformation to transpose my Cake > Toppings to Batters
4) Sink output Flatten result in a CSV file


(1) Source connection to my JSON data file
Connection to my JSON file is simple; however, it's interesting to see how the output of my consumed JSON file is shown in the Data Preview tab, which shows one row with several array objects.




(2) Flatten transformation to transpose my Cake to Toppings
My next Flatten transformation task transposes the JSON Topping array with 2 attributes (id, type) into 7 flattened rows, where the JSON Batter objects are now visible as individual arrays.


(3) Further Flatten transformation to transpose my Cake > Toppings to Batters
All 7 records that came out from the previous Flatten transformation task can now be used as input for my further Flatten transformation. This helps to convert (unroll) 2 additional fields from the Batter JSON subset (id, type).
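Under the hood Mapping Data Flows run on Spark, so conceptually these two Flatten steps behave like two array explodes. A rough Spark SQL sketch (the donuts table name is hypothetical; the field names come from the JSON file above):

SELECT d.id,
       d.type,
       d.name,
       d.ppu,
       t.id   AS topping_id,
       t.type AS topping_type,
       b.id   AS batter_id,
       b.type AS batter_type
FROM   donuts d                                         -- hypothetical table over the JSON file
       LATERAL VIEW explode(d.topping)        tv AS t   -- step (2): 7 topping rows for the Cake donut
       LATERAL VIEW explode(d.batters.batter) bv AS b;  -- step (3): x 4 batter rows = 28 rows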


(4) Sink output Flatten result in a CSV file
The Data Preview in my Sink doesn't differ from its sibling in the previous task, so I decided to challenge the Data Factory and replaced my initial JSON file, which contained one object, with another file containing several similar objects:
[
    {
        "id": "0001",
        "type": "donut",
        "name": "Cake",
        "ppu": 0.55,
        "batters":
        {
            "batter":
            [
                { "id": "1001", "type": "Regular" },
                { "id": "1002", "type": "Chocolate" },
                { "id": "1003", "type": "Blueberry" },
                { "id": "1004", "type": "Devil's Food" }
            ]
        },
        "topping":
        [
            { "id": "5001", "type": "None" },
            { "id": "5002", "type": "Glazed" },
            { "id": "5005", "type": "Sugar" },
            { "id": "5007", "type": "Powdered Sugar" },
            { "id": "5006", "type": "Chocolate with Sprinkles" },
            { "id": "5003", "type": "Chocolate" },
            { "id": "5004", "type": "Maple" }
        ]
    },
    {
        "id": "0002",
        "type": "donut",
        "name": "Raised",
        "ppu": 0.55,
        "batters":
        {
            "batter":
            [
                { "id": "1001", "type": "Regular" }
            ]
        },
        "topping":
        [
            { "id": "5001", "type": "None" },
            { "id": "5002", "type": "Glazed" },
            { "id": "5005", "type": "Sugar" },
            { "id": "5003", "type": "Chocolate" },
            { "id": "5004", "type": "Maple" }
        ]
    },
    {
        "id": "0003",
        "type": "donut",
        "name": "Old Fashioned",
        "ppu": 0.55,
        "batters":
        {
            "batter":
            [
                { "id": "1001", "type": "Regular" },
                { "id": "1002", "type": "Chocolate" }
            ]
        },
        "topping":
        [
            { "id": "5001", "type": "None" },
            { "id": "5002", "type": "Glazed" },
            { "id": "5003", "type": "Chocolate" },
            { "id": "5004", "type": "Maple" }
        ]
    }
]

The source file Data Preview correctly showed me 3 rows, the outputs of all subsequent Flatten transformation tasks grew accordingly, and my final output file contained all 41 expected records (7 x 4 + 5 x 1 + 4 x 2 = 41)!



Well done, Microsoft team!

I really like this visual way to transform (flatten) a sourcing JSON stream into a CSV file.
(2020-Mar-15) Video recording of my webinar session on Using Azure Data Factory Mapping Data Flows to populate Data Vault at the recent PASS Data Architecture Virtual Group meeting.


It was based on my previously published blog post:
https://server.hoit.asia/2019/05/using-azure-data-factory-mapping-data.html 

Link to the PASS Data Architecture Virtual Group webinar:
https://dataarch.pass.org/MeetingDetails.aspx?EventID=14450


The recording can also be accessed through the PASS recordings:
https://www.pass.org/Learn/Recordings/Listing.aspx?




(2019-May-24) Data Flow as a data transformation engine was introduced to Microsoft Azure Data Factory (ADF) last year as a private feature preview. This restriction was lifted during the last Microsoft Build conference, and the Data Flow feature has become a public preview component of ADF.

2020-Mar-15 update: a video recording of my webinar session on Using Azure Data Factory Mapping Data Flows to populate Data Vault at the recent PASS Data Architecture Virtual Group meeting - https://server.hoit.asia/2020/03/using-azure-data-factory-mapping-data.html 

There are many different use-case scenarios that can be covered by Data Flows, considering that Data Flows in SQL Server Integration Services (SSIS) projects still play a big role in fulfilling Extract-Transform-Load (ETL) patterns for your data.

In this blog post, I will share my experience of populating a Data Vault repository by using Data Flows in Azure Data Factory. 


First, I need to create my Data Vault model. 

Data Model

For this exercise I've taken a data warehouse sample of the AdventureWorksDW2017 SQL Server database, where I limited the set of entities to a small set of dimension tables (DimProduct, DimCustomer, DimGeography) and one fact table (FactInternetSales).




From the existing list of tables in my DW instance I came up with the list of entities for my new alternative Data Vault model:

To make a full transition from the existing DW model to an alternative Data Vault, I removed all surrogate keys and other attributes that are only necessary to support the Kimball data warehouse methodology. Also, I needed to add the necessary hash keys to all my Hub, Link and Satellite tables. The target environment for my Data Vault would be an Azure SQL database, and I decided to use the built-in crc32 function of Mapping Data Flows to calculate hash keys (HK) of my business data sourcing keys and composite hash keys of satellite table attributes (HDIFF).



Sourcing Data
To test my Data Vault loading process in ADF Data Flows I extracted data from the existing Adventure Works SQL Server database instance and excluded DW related data columns. This is an example of my Product flat file in my Azure blob storage with the following list of columns:
       ProductAlternateKey
      ,WeightUnitMeasureCode
      ,SizeUnitMeasureCode
      ,EnglishProductName
      ,StandardCost
      ,FinishedGoodsFlag
      ,Color
      ,SafetyStockLevel
      ,ReorderPoint
      ,ListPrice
      ,Size
      ,SizeRange
      ,Weight
      ,ModelName



Also, I created a set of views to show the latest version of each satellite table that I want to compare with the sourcing data files. Here are examples of those views for the SAT_InternetSales and SAT_Product tables:

CREATE VIEW [dbo].[vw_latest_sat_internetsales]
AS
SELECT [link_internetsales_hk],
       [sat_internetsales_hdiff]
FROM   (SELECT [link_internetsales_hk],
               [sat_internetsales_hdiff],
               Row_number()
                 OVER(
                   partition BY SAT.link_internetsales_hk
                   ORDER BY SAT.load_ts DESC) AS row_id
        FROM   [dbo].[sat_internetsales] SAT) data
WHERE  row_id = 1;

CREATE VIEW [dbo].[vw_latest_sat_product]
AS
SELECT [hub_product_hk],
       [sat_product_hdiff]
FROM   (SELECT [hub_product_hk],
               [sat_product_hdiff],
               [load_ts],
               Max(load_ts)
                 OVER (
                   partition BY sat.hub_product_hk) AS latest_load_ts
        FROM   [dbo].[sat_product] sat) data
WHERE  load_ts = latest_load_ts;

Data Integration
If you're new to building data integration projects in the Microsoft Data Platform (SSIS, Azure Data Factory, others), I would suggest this approach:
   1) Learn more about different components of the existing data integration tools and their capabilities.
   2) Go back to your real use case scenario and describe the business logic of your data integration/data transformation steps.
   3) Then merge those business rules with corresponding technical components in order to come up with a technical architecture of your business solution.

Mapping Data Flows
Now, after preparing all of this, I'm ready to create Mapping Data Flows in Azure Data Factory.

Example of the Product data flow:


1) Source (Source Product): connection to the Product CSV data file in my blob storage account
2) Derived Columns (Hash Columns): to calculate hash columns and load timestamps.


HUB_Product_HK = crc32(ProductAlternateKey)
SAT_Product_HDIFF = crc32(ProductAlternateKey, WeightUnitMeasureCode, SizeUnitMeasureCode, EnglishProductName, StandardCost, FinishedGoodsFlag, Color, SafetyStockLevel, ReorderPoint, ListPrice, Size, SizeRange, Weight, ModelName)

(a plain Spark SQL sketch of these two expressions is shown right after this step list)

3) Exists transformation (CheckNewProduct): filtering out rows that already exist in the target, i.e. only moving forward records from the sourcing data file that are new to the HUB_Product table:


4) Select transformation: to select only 3 columns (ProductAlternateKey, HUB_Product_HK, LOAD_TS)
5) Sink: these 3 columns are then sinked into my target HUB_Product data vault table.
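For reference, the two Derived Column expressions from step (2) are roughly equivalent to the following Spark SQL sketch. Spark's crc32() takes a single string argument, so the attribute columns are concatenated first; depending on the column data types an explicit CAST(... AS string) may be needed, and the product_source staging table name is hypothetical:

SELECT s.*,
       crc32(s.ProductAlternateKey) AS HUB_Product_HK,     -- hash key on the business key
       crc32(concat_ws('|',
             s.ProductAlternateKey, s.WeightUnitMeasureCode, s.SizeUnitMeasureCode,
             s.EnglishProductName, s.StandardCost, s.FinishedGoodsFlag, s.Color,
             s.SafetyStockLevel, s.ReorderPoint, s.ListPrice, s.Size, s.SizeRange,
             s.Weight, s.ModelName)) AS SAT_Product_HDIFF,  -- composite hash diff over the attributes
       current_timestamp()           AS LOAD_TS
FROM   product_source s;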

Since the Product data file is used to load data into both HUB_Product and SAT_Product (steps 1 & 2 are shared), a separate data stream is created to populate the SAT_Product table as well.
3) Exists transformation (CheckNewSatProduct): filtering out rows that already exist, i.e. only moving forward records from the sourcing data file that are new to the SAT_Product table:

4) Select transformation: to select columns for my next Sink step (HUB_Product_HK, WeightUnitMeasureCode, SizeUnitMeasureCode, EnglishProductName, StandardCost, FinishedGoodsFlag, Color, SafetyStockLevel, ReorderPoint, ListPrice, Size, SizeRange, Weight, ModelName, SAT_Product_HDIFF, LOAD_TS)
5) Sink: these columns are then sinked into my target SAT_Product data vault table.
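The Exists transformations in step (3) of both streams effectively act as anti-joins against the target tables (or the latest-version views shown earlier). In T-SQL terms the load pattern looks roughly like this sketch, where staged_product stands for the hashed sourcing stream (a hypothetical name) and the satellite column list is shortened for brevity:

INSERT INTO [dbo].[hub_product] (hub_product_hk, productalternatekey, load_ts)
SELECT s.hub_product_hk, s.productalternatekey, s.load_ts
FROM   staged_product s
WHERE  NOT EXISTS (SELECT 1
                   FROM   [dbo].[hub_product] h
                   WHERE  h.hub_product_hk = s.hub_product_hk);

INSERT INTO [dbo].[sat_product] (hub_product_hk, color, modelname, /* ... */ sat_product_hdiff, load_ts)
SELECT s.hub_product_hk, s.color, s.modelname, /* ... */ s.sat_product_hdiff, s.load_ts
FROM   staged_product s
WHERE  NOT EXISTS (SELECT 1
                   FROM   [dbo].[vw_latest_sat_product] v
                   WHERE  v.hub_product_hk = s.hub_product_hk
                          AND v.sat_product_hdiff = s.sat_product_hdiff);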

Similarly, I created another Data Flow task to load Fact Internet Sales data (dvs_internetsales_big.csv) and populate LINK_InternetSales & SAT_InternetSales tables:

where LINK_InternetSales_HK & SAT_InternetSales_HDIFF  hash keys were used to identify new data for the data vault tables.

This case is solved, and now I can use Azure Data Factory Mapping Data Flows to load data into a data vault repository.

Please let me know if you have further questions, or take a closer look at the code of those pipelines and mapping data flows in my GitHub repository:
https://github.com/NrgFly/Azure-DataFactory/tree/master/Samples

Happy Data Adventures!

(2019-May-27) update:
After Mark Kromer had included this original blog post in his list of ADF data flow patterns (https://github.com/kromerm/adfdataflowdocs/blob/master/patterns/adf-data-flow-patterns.md), there was a comment about the different hash functions that are available in ADF Mapping Data Flows (CRC32, MD5, SHA1, SHA2). In my company's production environment, we use text fields and the MD5 function to populate them with hash values using an Azure Databricks workflow.

So for my mapping data flows exercise, I decided to change the hash columns from bigint to varchar and replace CRC32 with SHA2(512). It was interesting to see my hash key columns with 128-character values in the end, but it still worked. ADF Mapping Data Flows proved to be a solid environment for data integration projects that load data into a Data Vault environment!
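As a rough illustration of that change (Spark SQL flavour; the product_source table is hypothetical and only a few attributes are shown in the HDIFF concatenation), sha2() with a bit length of 512 returns a 128-character hex string, which is why the hash key columns had to become varchar:

SELECT sha2(ProductAlternateKey, 512)                                          AS HUB_Product_HK,
       sha2(concat_ws('|', ProductAlternateKey, Color, Size, ModelName), 512)  AS SAT_Product_HDIFF
FROM   product_source;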