10. Patterns: ETL
Now that we've gained familiarity with a number of frequently used Connectors, we're going to look at several high-level patterns that are used in real-world scenarios. In each section, we'll begin by describing the objectives of the pattern; in practice, you'll often blend aspects of different patterns together.
The first pattern we'll look at is called ETL (Extract, Transform, Load). These steps are fundamental to most integrations, but there are a few important steps that are often implemented around the actual Extract, Transform and Load steps.
Isolate delta data
The first thing to consider is that once you've acquired data from a source, you likely need to be able to filter out data that hasn't changed since the last time you queried it. For example, if you're doing a master data sync of customers, you'll want to exclude customers that haven't changed since the last sync so you're not wasting resources unnecessarily processing unchanged data.
Where possible, you should query only changed (delta) data. For example, you may be working with a connector that allows you to filter for data that has changed after a given timestamp. As another example, when you're querying data from a database, you might be able to filter on a date-last-changed field or a flag that indicates whether a record has been synced.
In cases where you can't filter at the source, you can use the Reduce 2 Node to remove data that has not changed since the last iteration.
The Reduce Node iterates over the individual records within a document and tests whether a hash of each record matches a previously seen hash. If it does, that record is excluded on the output side of the Reduce Node. Note that Flowgear doesn't actually store the record - it only stores a hash of the record.
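To make the idea concrete, here is a minimal Python sketch of hash-based delta detection with a separate commit step. This is not Flowgear's actual implementation; the names (seen_hashes, reduce_records, commit_record) are illustrative only.

```python
import hashlib
import json

# Hashes of records as they looked when last committed, keyed by the record's unique key.
seen_hashes = {}

def reduce_records(records, key_field="Id"):
    """Return only records that are new or changed since the last commit."""
    changed = []
    for record in records:
        digest = hashlib.sha256(json.dumps(record, sort_keys=True).encode()).hexdigest()
        if seen_hashes.get(record[key_field]) != digest:
            changed.append(record)
    return changed

def commit_record(record, key_field="Id"):
    """After a record integrates successfully, remember its hash so it is skipped next run."""
    digest = hashlib.sha256(json.dumps(record, sort_keys=True).encode()).hexdigest()
    seen_hashes[record[key_field]] = digest
```

Separating the reduce step from the commit step mirrors the two-step Reduce pattern used later in this section: unchanged records are filtered out up front, but a record is only marked as seen once it has been successfully delivered to the destination.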
Chunking data into pages
It's common that the number of records you can query from a source app or service is greater than the number of records that the target app or service is able to accept at once.
To compensate for this mismatch, we use a For Each Node to chunk the returned data into pages containing a desired number of records. The For Each Node will fire repeatedly until all the data initially given to it has been iterated (i.e. passed along to later stages of the Workflow).
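As a rough illustration of the same idea outside of Flowgear, the sketch below pages a list of records; page_size plays the role of the For Each Node's ChunkSize, and post_to_target is a hypothetical call to the destination.

```python
# A minimal paging sketch; `records` is assumed to already hold the source data.
def chunk(records, page_size):
    for start in range(0, len(records), page_size):
        yield records[start:start + page_size]

# Example: a source returns 1,000 records but the target accepts at most 100 per call.
# for page in chunk(records, 100):
#     post_to_target(page)  # hypothetical call to the target app or service
```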
Mapping data
Earlier, we saw an example of how to use QuickMap to transform data from one schema into another. Most ETL-style Workflows will include this step, although occasionally a Script or XSL Transform Node is more efficient.
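For comparison, a hand-rolled mapping might look like the sketch below. The target field names are assumed purely for illustration and are not part of any exercise in this section.

```python
# Illustrative field mapping from the source schema to an assumed target schema.
FIELD_MAP = {
    "First Name": "firstName",  # assumed target field names
    "Last Name": "lastName",
    "Email": "email",
}

def map_record(source: dict) -> dict:
    # Copy across only the fields the target understands, under their new names.
    return {target: source[src] for src, target in FIELD_MAP.items() if src in source}
```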
Tagging & Correlating
Once we've integrated a record into a destination, we'll generally use Key/Values to record its status. Exactly what 'status' means depends on the context, but generally we want to track whether a record integrated successfully or not, and we also want to correlate a source record with its target equivalent.
For example, if we're integrating orders from an ecommerce platform, we may have a numeric order id, but after integrating into an ERP we are given back an alphanumeric invoice number. In this case, we'd use Key/Values to store the order id in Key and the invoice number in Value.
When we are able to correlate records in this way, we'll be able to update records at a later point in time. A common example of when that might be needed is when an order's status changes. The Workflow needs to be able to tell whether it should create a new order or update an existing one - we'll look up the order's Key/Value entry (keyed on the order id) to make that decision.
Exercise 12: ETL Pattern Part 1
We'll now apply all of the above concepts by creating a Workflow that reads contact information from a file and syncs the records to an API.
1. Create a text file on your computer containing the content below. We'll assume the path c:\temp\etlcontacts.txt.

   Id,First Name,Last Name,Email,Phone,Company Name,State
   1,Dennis,Brown,juanjennings@hotmail.com,001-670-283-7481,Pioneer Pharmaceuticals,West Virginia
   2,Joseph,Lucas,sean27@gmail.com,422.631.1017x95303,Ascend Healthcare Partners,New York
   3,Robert,Medina,william51@gmail.com,8737236607,Apex Innovations,South Dakota
   4,Matthew,Flowers,valenzuelarodney@howell-walters.com,758.052.9923,EchoNet Communications,New York
   5,Jacqueline,Carroll,edwardhodge@yahoo.com,203-862-8915x3140,Vertex Construction,Alaska
   6,Diana,Harrison,andrea83@yahoo.com,547.430.8570,GreenLeaf Agriculture,Arkansas
   7,Mary,Jackson,robertsmith@caldwell.info,001-247-971-0674x837,Insight Marketing Solutions,Indiana
   8,Kelly,Richardson,sandra16@johnson.net,001-309-461-5944x9969,Swift Logistics,Pennsylvania
   9,Jay,Taylor,larry55@yahoo.com,001-036-973-5467,Skyline Engineering,New York
   10,Tina,Hill,savannahrussell@glover.com,0018245617,BlueHorizon Travel,Idaho
2. In a new Workflow, add File Read, Flat File and Reduce.
3. Configure File Read.Connection to use your local DropPoint and set File Read.Path to c:\temp\etlcontacts.txt. If you are not using a Windows PC, configure the step but manually paste the text document into File.Content and, when connecting up execution Flows, skip File Read and connect from Start.RunNow → Flat File.
4. Set Flat File.ColumnDelimiter to , (a comma) and set Flat File.HasColumnNames to True (i.e. toggle on).
5. Connect File Read.Content → Flat File.FlatFileDocument.
6. Connect Start.RunNow → File Read, File Read → Flat File.
7. Run the Workflow to confirm that the file content is being parsed into XML in Flat File.XmlDocument.

   If Flat File.XmlDocument is empty (i.e. <Document />), this is probably because the line endings are incorrect. By default, Windows uses \r\n for line endings, but if you are using an app other than Notepad to save the file, it may use different line endings. Try changing Flat File.RowDelimiter to \n or \r if you are having this problem.

8. Add JSON Convert and connect Flat File → JSON Convert.
9. Set JSON Convert.Action to XmltoJson.
10. Connect Flat File.XmlDocument → JSON Convert.Xml.
11. Run the Workflow at this point.
12. Add Reduce 2, rename it to Reduce Prepare and connect JSON Convert → Reduce Prepare.
13. Set Reduce Prepare.Group to contacts.
14. Connect JSON Convert.Json to Reduce Prepare.SourceDocument.
15. Focus Reduce Prepare.Path and select Row. When you leave the Property, it should show the value Document.Row[*].

   When you focused the Path Property, Flowgear checked recent Workflow Logs and found one for the JSON Convert.Json Property, which is how it was able to present you with a tree view of fields. One thing to bear in mind is that if the data changes, you won't see that in the tree view explorer because the original log is loaded into the design. To deal with this, you could clear out the JSON Convert.Json Property and Flowgear would re-load it the next time it needed to generate a tree view.

16. Set Reduce Prepare.Action to Reduce.

   We are going to use a two-step Reduce, just as we would in a real-world scenario. This means that we won't 'commit' the reduce action until we know we've successfully integrated the data into the destination.

17. Set Reduce Prepare.KeyPath to Id.

   Id matches the Id Property in the contact record. This enables the Reduce 2 Node to identify not just individual rows but also which field within a row represents the unique key for the record.

18. Run the Workflow a few times. You should see that the Reduce Node returns all rows. This is because no commit action has been run against it yet (we're going to do that at a later stage).
19. Add For Each and connect Reduce Prepare → For Each.

   For this exercise, we're going to work with one record at a time, so we're leaving ChunkSize set to 1. If we had used a value greater than 1, we'd need to use bulk Connectors downstream from For Each.

20. Connect Reduce Prepare.ReducedDocument → For Each.SourceDocument.
21. Focus For Each.Path and select Row from the tree view. When you leave the Property, it should show the value Document.Row[*].

   Just as before, we needed to run the Workflow up to the preceding step so that Flowgear could load a tree view based on data in the Workflow Logs.

22. Set For Each.Encapsulation to ParentNode.
23. Run the Workflow again at this point and you should see the For Each Node firing for each record.
24. Add Web Request 2, rename it to Create Contact and connect For Each.Item (Execution Socket) → Create Contact.
25. Create a Connection for Create Contact and set ReturnFailureResponses to True in the Connection.
26. Set Create Contact.Method to POST.
27. Set Create Contact.Url to https://zorkco.flowgear.net/contacts?auth-key={auth}. (A rough HTTP equivalent of this request is sketched after these steps.)
28. Set Create Contact.RequestHeaders to Content-Type: application/json.
29. Connect For Each.Item (Data Socket) → Create Contact.RequestBody.
30. Add Create Contact.auth.
31. Set Create Contact.auth to dMFROUiQadVEWaHZe8qTEHM9TieeyGo7PBhY9Ln1TwKCsGu-sfgnJUh4OKIrBZLplNtXKWdcJDdqgHjVScr24Q.
32. Run the Workflow and check that Create Contact.StatusCode is 200 (indicating success). If it is not, review Create Contact.ResponseBody to see what the error is.
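As referenced in the steps above, here is a rough HTTP equivalent of what the Create Contact Node sends, sketched with Python's requests library. The body shown is a single illustrative row; the exact shape of the request body in Flowgear depends on the For Each encapsulation.

```python
import requests

AUTH_KEY = "dMFROUiQadVEWaHZe8qTEHM9TieeyGo7PBhY9Ln1TwKCsGu-sfgnJUh4OKIrBZLplNtXKWdcJDdqgHjVScr24Q"

# One row from the source file, shown as a flat JSON object for illustration.
contact = {
    "Id": "1",
    "First Name": "Dennis",
    "Last Name": "Brown",
    "Email": "juanjennings@hotmail.com",
}

response = requests.post(
    f"https://zorkco.flowgear.net/contacts?auth-key={AUTH_KEY}",
    json=contact,   # also sets the Content-Type: application/json header
    timeout=30,
)
print(response.status_code, response.text)  # expect 200; the body should contain a ZorkId
```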
Save your Workflow, then click Submit Exercise to grade it.
Exercise 13: ETL Pattern Part 2
In the previous exercise, we created a basic ETL. We will now enhance it to track which records we've created and conditionally perform an update instead of an insert when we encounter an existing record. We'll also complete the implementation of Reduce 2 to filter out data that has not changed since the Workflow last ran.
1. Copy the Nodes in the Workflow from the previous exercise into a new Workflow, then run the Workflow.
2. Add If, rename it to Check Create Success and connect Create Contact → Check Create Success.
3. Connect Create Contact.ResponseBody → Check Create Success.Value and set the Data Mapping Expression on the new Flow Connector to ZorkId.
4. Set Check Create Success.Expression to Value <> "".
5. Add Set Key-Value 2, rename it to Mark Created and connect Check Create Success.True → Mark Created.
6. Set Mark Created.Group to contacts and Mark Created.Status to Success.
7. Connect For Each.Item → Mark Created.Key and set the Data Mapping Expression to Id.
8. Connect Create Contact.ResponseBody → Mark Created.Value and set the Data Mapping Expression to ZorkId.

   The Mark Created Key/Value correlates our original record Id with the id of the record created at the web service (ZorkId). We'll now be able to use that correlation to figure out whether we should do an update or an insert of a record. When we need to do an update, we'll be able to use the ZorkId value in the Url of the request.

9. Run the Workflow to confirm that the Key/Value associations are working correctly.

   Note that in a real-world scenario, we would attach a separate Set Key-Value 2 Node to the False Output of Check Create Success and use it to store error information.

10. Drag the Create Contact Node one block to the right to create a gap, add Get Key-Value 2 into the space that is created and rename it to Check Created.
11. Rewire the Execution Flows so they run For Each.Item → Check Created.Missing → Create Contact.

   The Check Created Node will try to locate a record by the Id of the contact. If there is no record, we will create a new record; otherwise we'll update an existing one.

12. Set Check Created.Group to contacts.
13. Connect For Each.Item → Check Created.Key and set the Data Mapping Expression to Id.
14. Add Web Request 2 above Create Contact and rename it to Update Contact.
15. Connect Check Created.Found → Update Contact.
16. Set Update Contact.Connection to the same Connection you created for Create Contact.
17. Set Update Contact.Method to PUT.

   With a REST service, POST is generally used to create a new resource while PUT is used to update an existing resource. (The create-or-update decision is sketched in code after these steps.)

18. Set Update Contact.Url to https://zorkco.flowgear.net/contacts/{id}?auth-key={auth}.
19. Add Update Contact.id and Update Contact.auth.
20. Connect Check Created.Value → Update Contact.id.

   The ID that we send to the web service is the ZorkId value we stored using Set Key-Value 2, not the Id field contained in our source data.

21. Set Update Contact.auth to dMFROUiQadVEWaHZe8qTEHM9TieeyGo7PBhY9Ln1TwKCsGu-sfgnJUh4OKIrBZLplNtXKWdcJDdqgHjVScr24Q.
22. Run the Workflow and you should see all contacts following the update path instead of the create path, since they've already been created.
23. Add If to the right of Update Contact, rename it to Check Update Success and connect Update Contact → Check Update Success.
24. Connect Update Contact.StatusCode → Check Update Success.Value.
25. Set Check Update Success.Expression to Value = 200.

   When we update a contact, we can't look at the returned ID to determine success, so instead we check that the response code is 200, the HTTP status code used to indicate success.

26. Add Reduce 2 to the right of Mark Created and rename it to Reduce Complete.

   Now that we have integrated records to the web service, we'll finish the Reduce implementation so that it marks records as synced. There are two ways a record can be processed successfully: either it is created for the first time or it is updated.

27. Connect Mark Created → Reduce Complete and Check Update Success.True → Reduce Complete.
28. Set Reduce Complete.Group to contacts.
29. Set Reduce Complete.Action to Commit.
30. Connect For Each.Item → Reduce Complete.SourceDocument.

   The Reduce Node works by comparing changes on the source document, so when we use its Commit action, it needs the same view of the records it had at the time we ran its Reduce action. In our first Reduce Node (Reduce Prepare), we set Path to Document.Row because we needed to show the Node how to identify individual records within the document. For this second Node, we will leave Path empty because the entire document contains only one record (since it is the output from the For Each Node).

31. Set Reduce Complete.KeyPath to id.
32. Run the Workflow twice. On the second run, you should see no rows injected into For Each and it simply fires its Finished Output.

   If you change a row in the text file, that row will process as an update. If you add a row, it will process as a create.

   For simplicity, we've kept all the steps for this solution in a single Workflow. In a real-world scenario, it would be best to refactor sets of steps into sub-Workflows. You can do this by selecting a range of steps by holding CTRL (Windows) or ⌘ (Mac) and dragging a selection rectangle, then clicking the Refactor to Sub-Workflow button (wand icon).
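As mentioned in the note under step 17, the create-or-update decision can be sketched in plain Python as follows. The dictionary stands in for the contacts Key/Value group, and the assumption that the create response is JSON containing a ZorkId field is illustrative, not a statement about the service's exact response format.

```python
import requests

BASE_URL = "https://zorkco.flowgear.net/contacts"
AUTH_KEY = "dMFROUiQadVEWaHZe8qTEHM9TieeyGo7PBhY9Ln1TwKCsGu-sfgnJUh4OKIrBZLplNtXKWdcJDdqgHjVScr24Q"

# A dictionary stands in for the "contacts" Key/Value group: source Id -> ZorkId.
zork_ids = {}

def sync_contact(contact: dict) -> int:
    zork_id = zork_ids.get(contact["Id"])
    if zork_id is None:
        # Missing: POST creates the contact and we remember the returned ZorkId (Mark Created).
        resp = requests.post(f"{BASE_URL}?auth-key={AUTH_KEY}", json=contact, timeout=30)
        if resp.status_code == 200:
            zork_ids[contact["Id"]] = resp.json().get("ZorkId")  # assumed response shape
    else:
        # Found: PUT updates the existing contact, addressed by its ZorkId.
        resp = requests.put(f"{BASE_URL}/{zork_id}?auth-key={AUTH_KEY}", json=contact, timeout=30)
    return resp.status_code  # 200 indicates success on either path
```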
Save your Workflow, then click Submit Exercise to grade it.