10. Patterns: Extract, Transform, Load
Now that we've gained familiarity with a number of frequently used connectors, we're going to look at several high-level patterns that are used in real-world scenarios. Each section begins by describing the objectives of the pattern. In practice, you'll usually blend aspects of different patterns together.
The first pattern we'll look at is called ETL (Extract, Transform, Load). These stages are fundamental to most integrations, but a few important supporting steps are often implemented around the core Extract, Transform and Load stages.
Isolate delta data
The first thing to consider is that once you've acquired data from a source, you likely need to filter out data that hasn't changed since the last time you queried it. For example, if you're doing a master data sync of customers, you'll want to exclude customers that haven't changed since the last sync so that you don't waste resources processing unchanged data.
Where possible, you should only query changed (delta) data. For example, you may be working with a connector that allows you to filter for data that has changed after a given timestamp. As another example, when you're querying data from a database, you might be able to filter on a date-last-changed field or on a flag that indicates whether a record has been sync'd.
In cases where you can't filter at the source, you can use the Reduce 2 Node to remove data that has not changed since the last iteration.
The Reduce 2 Node iterates over the individual records within a document and tests whether a hash of each record matches a previously seen hash. If it does, that record is excluded on the output side of the Reduce 2 Node. Note that Flowgear doesn't actually store the record - it only stores a hash of the record.
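Conceptually, the delta check works like the following sketch (plain Python, not Flowgear's implementation; the in-memory dictionary stands in for the hash store that Flowgear persists for you):

    import hashlib
    import json

    seen_hashes = {}  # record key -> hash of the record as it looked on the previous run

    def reduce_changed(records, key_field="Id"):
        """Return only the records whose content hash differs from the previous run."""
        changed = []
        for record in records:
            digest = hashlib.sha256(
                json.dumps(record, sort_keys=True).encode("utf-8")
            ).hexdigest()
            if seen_hashes.get(record[key_field]) != digest:
                changed.append(record)
                # In a two-step Reduce, this update would only be 'committed'
                # after the record has been successfully loaded.
                seen_hashes[record[key_field]] = digest
        return changed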
Chunking data into pages
It's common that the number of records you can query from a source app or service is greater than the number of records that the target app or service is able to accept at once.
To compensate for this mismatch, we use a For Each Node to chunk the returned data into pages containing the desired number of records. The For Each Node will fire repeatedly until all data initially given to it has been iterated (i.e. passed along to later stages of the Workflow).
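The effect is the same as a simple paging loop. A minimal sketch, assuming the records have already been parsed into a list (chunk and post_to_target are illustrative names, not Flowgear APIs):

    def chunk(records, page_size):
        """Yield successive pages of at most page_size records."""
        for start in range(0, len(records), page_size):
            yield records[start:start + page_size]

    # e.g. the source returned 1,000 rows but the target accepts 100 at a time:
    # for page in chunk(rows, 100):
    #     post_to_target(page)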
Mapping data
Earlier, we saw an example of how to use QuickMap to transform data from one schema into another. Most ETL-style Workflows will include this step, although occasionally a Script or XSL Transform Node is more efficient.
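Whichever Node you use, the transform step amounts to re-shaping each record from the source schema into the target schema. A rough sketch of the idea (the target field names here are invented for illustration):

    def map_contact(source):
        """Re-shape one source contact record into a target schema."""
        return {
            "firstName": source["First Name"],
            "lastName": source["Last Name"],
            "emailAddress": source["Email"],
            "company": source["Company Name"],
        }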
Tagging & Correlating
Once we've integrated a record into a destination, we'll generally use Key/Values to record its status. Exactly what 'status' means depends on the context, but generally we want to track whether a record integrated successfully or not, and we also want to correlate a source record with its target equivalent.
For example, if we're integrating orders from ecommerce, we may have a numeric order id, but after integrating into ERP we are given back an alphanumeric invoice number. In this case, we'd use Key/Values to store the order id in the Key and the invoice number in the Value.
When we are able to correlate records in this way, we'll be able to update records at a later point in time. A common example of when that might be needed is when the order status changes. The Workflow needs to be able to tell whether it should create a new order or update an existing one - we'll look up the Key for the order (which gives us its invoice number, if one exists) to make that decision.
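In Flowgear this is done with the Key/Value Nodes, but the decision logic boils down to something like the sketch below (erp_create_order and erp_update_order are hypothetical helpers standing in for the target system's API):

    correlations = {}  # source order id -> invoice number returned by the ERP

    def erp_create_order(order):
        return "INV-0001"  # placeholder for the target system's create call

    def erp_update_order(invoice_number, order):
        pass               # placeholder for the target system's update call

    def upsert_order(order):
        invoice_number = correlations.get(order["id"])
        if invoice_number is None:
            # No correlation yet: create the order and remember the mapping.
            correlations[order["id"]] = erp_create_order(order)
        else:
            # Already integrated: update the existing record instead.
            erp_update_order(invoice_number, order)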
Exercise 12: ETL Pattern Part 1
We'll now apply all of the above concepts while creating a Workflow that reads contact information from a file and syncs the records to an API.
Create a text file on your computer containing the content below. We'll assume the path c:\temp\etlcontacts.txt.

Id,First Name,Last Name,Email,Phone,Company Name,State
1,Dennis,Brown,juanjennings@hotmail.com,001-670-283-7481,Pioneer Pharmaceuticals,West Virginia
2,Joseph,Lucas,sean27@gmail.com,422.631.1017x95303,Ascend Healthcare Partners,New York
3,Robert,Medina,william51@gmail.com,8737236607,Apex Innovations,South Dakota
4,Matthew,Flowers,valenzuelarodney@howell-walters.com,758.052.9923,EchoNet Communications,New York
5,Jacqueline,Carroll,edwardhodge@yahoo.com,203-862-8915x3140,Vertex Construction,Alaska
6,Diana,Harrison,andrea83@yahoo.com,547.430.8570,GreenLeaf Agriculture,Arkansas
7,Mary,Jackson,robertsmith@caldwell.info,001-247-971-0674x837,Insight Marketing Solutions,Indiana
8,Kelly,Richardson,sandra16@johnson.net,001-309-461-5944x9969,Swift Logistics,Pennsylvania
9,Jay,Taylor,larry55@yahoo.com,001-036-973-5467,Skyline Engineering,New York
10,Tina,Hill,savannahrussell@glover.com,0018245617,BlueHorizon Travel,Idaho

In a new Workflow, add File Read and Flat File.
Configure File Read.Connection to use your local DropPoint and set File Read.Path to c:\temp\etlcontacts.txt. If you are not using a Windows PC, configure the step but manually paste the text document into File.Content and, when connecting up Execution Flows, skip File Read and connect from Start.RunNow → Flat File.
Set Flat File.ColumnDelimiter to , and set Flat File.HasColumnNames to True (i.e. toggle on).
Connect File Read.Content → Flat File.FlatFileDocument.
Connect Start.RunNow → File Read and File Read → Flat File.
Run the Workflow to confirm that the file content is being parsed into XML in Flat File.XmlDocument.
If Flat File.XmlDocument is empty (i.e. <Document />), this is probably because the line endings are incorrect. By default, Windows uses \r\n for line endings but if you are using an app other than Notepad to save the file, it may use different line endings. Try changing Flat File.RowDelimiter to \n or \r if you are having this problem.
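If you are generating the sample file from a script rather than saving it by hand, you can control the line endings explicitly. A minimal Python sketch (the rows list is shortened here; use the full content above):

    # Write the sample CSV with Windows-style (\r\n) line endings.
    rows = [
        "Id,First Name,Last Name,Email,Phone,Company Name,State",
        "1,Dennis,Brown,juanjennings@hotmail.com,001-670-283-7481,Pioneer Pharmaceuticals,West Virginia",
    ]
    # newline="\r\n" makes Python translate each "\n" we write into "\r\n".
    with open(r"c:\temp\etlcontacts.txt", "w", newline="\r\n") as f:
        f.write("\n".join(rows) + "\n")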
Add JSON Convert, connect Flat File → JSON Convert.
Set JSON Convert.Action to XmltoJson.
Connect Flat File.XmlDocument → JSON Convert.Xml.
Run the Workflow at this point.
Add Reduce 2, rename it to Reduce Prepare, connect JSON Convert → Reduce Prepare.
Set Reduce Prepare.Group to contacts.
Connect JSON Convert.Json to Reduce Prepare.SourceDocument.
Focus Reduce Prepare.Path and select Row. When you leave the Property it should show the value Document.Row[*].
When you focussed the Path Property, Flowgear checked recent Workflow Logs and found one for the JSON Convert.Json Property, which is how it was able to present you with a tree view of fields. One thing to bear in mind is that if the data changes, you won't see that in the tree view explorer because the original log is loaded into the design. To deal with this, you could clear out the JSON Convert.Json Property and Flowgear would re-load it the next time it needed to generate a tree view.
Set Reduce Prepare.Action to Reduce.
We are going to be using a two-step Reduce just like we'd do in a real-world scenario. This means that we won't 'commit' the reduce action until we know we've successfully integrated the data to the destination.
Set Reduce Prepare.KeyPath to Id. Id matches the Id Property in the contact record. This enables the Reduce 2 Node to know not just how to identify individual rows but also which field within a row represents the unique key for the record.
Run the Workflow a few times. You should see that the Reduce 2 Node returns all rows. This is because no commit action has been run against it yet (we're going to do that at a later stage).
Add For Each and connect Reduce Prepare → For Each.
For this exercise, we're going to work with one record at a time so we're leaving ChunkSize set to 1. If we had used a value greater than 1, we'd need to use bulk Connectors downstream from For Each.
Connect Reduce Prepare.ReducedDocument → For Each.SourceDocument.
Focus For Each.Path and select Row from the tree view. When you leave the Property it should show the value Document.Row[*]. Just as before, we needed to run the Workflow up to the preceding step so that Flowgear could load a tree view based on data in the Workflow Logs.
Set For Each.Encapsulation to ParentNode.
Run the Workflow again at this point and you should see the For Each Node firing for each record.
Add Web Request 2, rename it to Create Contact and connect For Each.Item (Execution Socket) → Create Contact.
Create a Connection for Create Contact and set ReturnFailureResponses to True in the Connection.
Set Create Contact.Method to POST.
Set Create Contact.Url to https://zorkco.flowgear.net/contacts?auth-key={auth}.
Set Create Contact.RequestHeaders to Content-Type: application/json.
Connect For Each.Item (Data Socket) → Create Contact.RequestBody.
Add Create Contact.id and Create Contact.auth.
Connect For Each.Item to Create Contact.id, hover over the Flow Connector then click the fx and select the Id field.
Set Create Contact.auth to dMFROUiQadVEWaHZe8qTEHM9TieeyGo7PBhY9Ln1TwKCsGu-sfgnJUh4OKIrBZLplNtXKWdcJDdqgHjVScr24Q.
Run the Workflow and check that Create Contact.StatusCode is 200 (to indicate success). If it is not, review Create Contact.ResponseBody to see what the error is.
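For reference, the request that Create Contact issues is roughly equivalent to the sketch below (plain Python using the requests library; not something you need for the exercise). The field names in the JSON body are taken from the CSV columns, but the actual body is whatever For Each.Item passes through from JSON Convert, so the exact shape may differ.

    import requests

    AUTH_KEY = "..."  # the auth-key value given in the exercise

    contact = {
        "Id": "1",
        "First Name": "Dennis",
        "Last Name": "Brown",
        "Email": "juanjennings@hotmail.com",
    }

    response = requests.post(
        "https://zorkco.flowgear.net/contacts",
        params={"auth-key": AUTH_KEY},
        headers={"Content-Type": "application/json"},
        json=contact,
    )
    # Expect 200; the response body includes the id assigned by the service.
    print(response.status_code, response.text)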
Save and run your Workflow, then click Submit Exercise to grade it.
Exercise 13: ETL Pattern Part 2
In the previous exercise, we created a basic ETL. We will now enhance it to track which records we've created and conditionally perform an update instead of an insert when we encounter an existing record. We'll also complete the implementation of Reduce 2 to filter out data that has not changed since the Workflow last ran.
Copy the Nodes from the previous exercise's Workflow into a new Workflow, then run the Workflow.
Add If, rename it to Check Create Success, connect Create Contact → Check Create Success.
Connect Create Contact.ResponseBody → Check Create Success.Value and set the Data Mapping Expression to ZorkId on the new Flow Connector.
Set Check Create Success.Expression to Value <> "".
Add Set Key-Value 2, rename it to Correlate Ids, connect Check Create Success.True → Correlate Ids.
Set Correlate Ids.Group to contacts and Correlate Ids.Status to Success.
Connect For Each.Item → Correlate Ids.Key and set the Data Mapping Expression to Id.
Connect Create Contact.ResponseBody → Correlate Ids.Value and set the Data Mapping Expression to ZorkId.
The Correlate Ids Key/Value is going to correlate our original record Id with the id of the record that is created at the web service (ZorkId). We'll now be able to use that correlation to figure out whether we should do an update or an insert of a record. When we need to do an update, we'll be able to use the ZorkId value in the Url of the request.
Run the Workflow to confirm that the Key/Value associations are working correctly.
Note that in a real-world scenario, we would attach a separate Set Key-Value 2 Node to the False Output of Check Create Success and use it to store error information.
Drag the Create Contact Node one block to the right to create a gap, add Get Key-Value 2 into the space that is created, and rename it to Check Created.
Rewire the Execution Flows from For Each.Item → Check Created.Missing → Create Contact.
The Check Created Node will try to locate a record by the Id of the contact. If there is no record, we will create a new record, otherwise we'll update an existing record.
Set Check Created.Group to contacts.
Connect For Each.Item → Check Created.Key and set the Data Mapping Expression to Id.
Add Web Request 2 above Create Contact and rename it to Update Contact.
Connect Check Created.Found → Update Contact.
Set Update Contact.Connection to the same Connection you created for Create Contact.
Set Update Contact.Method to PUT.
With a REST Service, POST is generally used to create a new resource while PUT is used to update an existing resource.
Set Update Contact.Url to https://zorkco.flowgear.net/contacts/{id}?auth-key={auth}.
Add Update Contact.id and Update Contact.auth.
Connect Check Created.Value → Update Contact.id.
The ID that we send to the web service is the ZorkId value we stored using Set Key-Value 2, not the Id field contained in our source data.
Set Update Contact.auth to dMFROUiQadVEWaHZe8qTEHM9TieeyGo7PBhY9Ln1TwKCsGu-sfgnJUh4OKIrBZLplNtXKWdcJDdqgHjVScr24Q.
Run the Workflow and you should see all contacts following the update path instead of the create path since they've already been created.
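For reference, the update path issues roughly the following request (a plain Python sketch using the requests library; the ZorkId value shown is a made-up example of what would come back from Check Created.Value):

    import requests

    AUTH_KEY = "..."   # the auth-key value given in the exercise
    zork_id = "12345"  # hypothetical ZorkId retrieved from the Key/Value Group

    updated_contact = {
        "Id": "1",
        "First Name": "Dennis",
        "Last Name": "Brown",
        "Email": "juanjennings@hotmail.com",
    }

    response = requests.put(
        f"https://zorkco.flowgear.net/contacts/{zork_id}",
        params={"auth-key": AUTH_KEY},
        headers={"Content-Type": "application/json"},
        json=updated_contact,
    )
    # Check Update Success tests for exactly this status code.
    assert response.status_code == 200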
Add If to the right of Update Contact, rename it to Check Update Success, connect Update Contact → Check Update Success.
Connect Update Contact.StatusCode → Check Update Success.Value.
Set Check Update Success.Expression to Value = 200.
When we update a contact, we can't look at the returned ID to determine success, so instead we check that the response code is 200, which is the HTTP response code used to indicate success.
Add Reduce 2 to the right of Correlate Ids and rename it to Reduce Complete.
Now that we have integrated records to the web service, we'll finish the Reduce implementation so it marks records as sync'd.
There are two ways that a record can be processed successfully - either when it is created for the first time or when it is updated.
Connect Correlate Ids → Reduce Complete and Check Update Success.True → Reduce Complete.
Set Reduce Complete.Group to contacts.
Set Reduce Complete.Action to Commit.
Connect For Each.Item → Reduce Complete.SourceDocument.
The Reduce Node works by comparing changes on the source document, so when we are using its Commit action, it needs the same view of the records that it had at the time we ran its Reduce action.
In our first Reduce Node (Reduce Prepare), we set Path to Document.Row because we needed to show the Node how to identify individual records within the document. For this second Node, we will leave Path empty because the entire document contains only one record (since it is the output from the For Each Node).
Set Reduce Complete.KeyPath to Id.
Run the Workflow twice. On the second iteration, you should see no rows injected into For Each and it simply fires its Finished Output.
If you change a row in the text file, that row will process as an update. If you add a row, it will process as a create.
For simplicity, we've kept all steps for this solution in a single Workflow. In a real-world scenario, it would be best to refactor sets of steps to sub-Workflows. You can do this by selecting a range of steps by holding CTRL (Windows) or ⌘ (Mac) and dragging a selection rectangle, then clicking the Refactor to Sub-Workflow button (wand icon).
Change the name of one of the contacts in the file for grading purposes.
Save and run your Workflow, then click Submit Exercise to grade it.