10. Patterns: Extract, Transform, Load
Now that we've gained familiarity with a number of frequently used Connectors, we're going to look at several high-level patterns that are used in real-world scenarios. In each section, we'll begin by describing the pattern's objectives. In practice, you'll often blend aspects of different patterns together.
The first pattern we'll look at is called ETL (Extract, Transform, Load). These steps are fundamental to most integrations, but a few other important steps are often implemented around the core Extract, Transform, and Load steps.
Isolate Delta Data
The first thing to consider is that once you've acquired data from a source, you'll likely need to filter out data that hasn't changed since the last time you queried it. For example, if you're doing a master data sync of customers, you'll want to exclude customers that haven't changed since the last sync, so that you don't waste resources processing unchanged data.
Where possible, you should query only changed (delta) data. For example, you may be working with a Connector that lets you filter for data that has changed after a timestamp. Similarly, when you're querying a database, you may be able to filter on a date-last-changed field or on a flag that indicates whether a record has been synced.
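As a sketch of filtering at the source, the Python below uses the standard sqlite3 module to select only rows changed after a stored timestamp (a "watermark"). The customers table, its last_modified column, and the sample rows are hypothetical. The timestamps are ISO-8601 strings, which compare correctly as text only because they all share the same format.

```python
import sqlite3

def fetch_changed_customers(conn, since):
    """Return (id, name, last_modified) for customers changed after `since`."""
    cur = conn.execute(
        "SELECT id, name, last_modified FROM customers "
        "WHERE last_modified > ? ORDER BY last_modified",
        (since,),
    )
    return cur.fetchall()

# Hypothetical source table, created in memory for the example.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, last_modified TEXT)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?)",
    [
        (1, "Pioneer Pharmaceuticals", "2024-01-10T08:00:00"),
        (2, "Apex Innovations", "2024-03-05T12:30:00"),
        (3, "Swift Logistics", "2024-03-06T09:15:00"),
    ],
)

last_sync = "2024-03-01T00:00:00"  # watermark saved by the previous run
changed = fetch_changed_customers(conn, last_sync)  # only ids 2 and 3 qualify
# Advance the watermark to the newest change we have processed.
new_watermark = changed[-1][2] if changed else last_sync
```

One common approach is to persist the new watermark only after the batch has been integrated, so that a failed run is retried from the old watermark.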
In cases where you can't filter at the source, you can use the Reduce 2 Node to remove data that has not changed since the last run. The Reduce 2 Node iterates over the individual records within a document and tests whether a hash of each record matches a previously seen hash. If it does, that record is excluded from the Reduce 2 Node's output. Note that Flowgear doesn't actually store the record; it only stores a hash of it.
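The hashing idea can be sketched in Python as follows. This illustrates the concept and is not Flowgear's implementation: it assumes each record is a dict, hashes its canonical JSON (keys sorted) with SHA-256, and keeps previously seen hashes in an in-memory set, whereas Reduce 2 persists them for you.

```python
import hashlib
import json

def record_hash(record):
    """SHA-256 of the record's canonical JSON, so key order doesn't change the hash."""
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def exclude_unchanged(records, seen_hashes):
    """Return records whose hash hasn't been seen before, remembering the new hashes.

    Only hashes are stored in seen_hashes, never the records themselves.
    """
    fresh = []
    for record in records:
        digest = record_hash(record)
        if digest not in seen_hashes:
            seen_hashes.add(digest)
            fresh.append(record)
    return fresh

seen = set()
run1 = exclude_unchanged([{"Id": "1", "Name": "Dennis"}, {"Id": "2", "Name": "Joseph"}], seen)
# Same record 1 (different key order), changed record 2.
run2 = exclude_unchanged([{"Name": "Dennis", "Id": "1"}, {"Id": "2", "Name": "Joe"}], seen)
```

Here run1 contains both records, while run2 contains only the changed record for Id 2.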
Chunking Data into Pages
It's common that the number of records you can query from a source app or service is greater than the number of records that the target app or service is able to accept at once.
To compensate for this mismatch, we use a For Each Node to chunk the returned data into pages containing the desired number of records. The For Each Node fires repeatedly until all data initially given to it has been iterated (i.e. passed along to later stages of the Workflow).
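Conceptually, paging is just splitting a sequence into fixed-size slices. A minimal Python sketch of the idea (the chunk function is hypothetical; in a Workflow, the For Each Node does this for you):

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def chunk(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield pages of at most `size` records; the final page may be shorter."""
    if size < 1:
        raise ValueError("size must be at least 1")
    page: List[T] = []
    for record in records:
        page.append(record)
        if len(page) == size:
            yield page
            page = []
    if page:  # emit the leftover partial page
        yield page

pages = list(chunk(range(1, 11), 4))
print(pages)  # [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
```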
Mapping Data
Earlier, we saw an example of how to use QuickMap to transform data from one schema into another. Most ETL-style Workflows will include this step. However, occasionally using a Script or XSL Transform Node is more efficient.
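A mapping step is essentially a function from one schema to another. The Python sketch below maps a row shaped like the contacts file used in the exercises to a target shape; every target field name (externalId, fullName, and so on) is a hypothetical example, not any real API's schema.

```python
def map_contact(row):
    """Map a source contact row (CSV column names) to a hypothetical target schema."""
    return {
        "externalId": int(row["Id"]),
        "fullName": f"{row['First Name']} {row['Last Name']}",
        "email": row["Email"].strip().lower(),
        "company": {"name": row["Company Name"]},
        "region": row["State"],
    }

source = {
    "Id": "3", "First Name": "Robert", "Last Name": "Medina",
    "Email": "william51@gmail.com", "Phone": "8737236607",
    "Company Name": "Apex Innovations", "State": "South Dakota",
}
target = map_contact(source)
```

Fields the target doesn't need (Phone, in this sketch) are simply not mapped.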
Tagging & Correlating
Once we've integrated a record into a destination, we'll generally use Key/Values to record its status. Exactly what status means depends on the context, but generally we want to track whether a record is integrating successfully or not. We also want to correlate a source record with its target equivalent.
For example, if we're integrating orders from an e-Commerce source, we may have a numeric order id, but after integrating into ERP, we're given back an alphanumeric invoice number. In this case, we'd use Key/Values to store the order id in Key and the invoice number in Value.
When we are able to correlate records in this way, we'll be able to update records at a later point in time. A common example of when that might be needed is when the order status changes. The Workflow needs to be able to tell whether it should create a new order or update an existing one. We'll look up the Key (the order id) to make that decision.
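The create-or-update decision boils down to a keyed lookup. In this Python sketch, a plain dict stands in for a Key/Value group (an assumption for illustration; Flowgear stores Key/Values for you), with the source order id as the key and the ERP invoice number as the value. The ids and invoice numbers are made up.

```python
from typing import Dict, Optional, Tuple

# Stand-in for a Key/Value group: source order id -> ERP invoice number.
order_correlations: Dict[str, str] = {}

def record_success(order_id: str, invoice_number: str) -> None:
    """After a successful create, correlate the source record with its target."""
    order_correlations[order_id] = invoice_number

def decide_action(order_id: str) -> Tuple[str, Optional[str]]:
    """Return ("update", invoice) if the order was integrated before, else ("create", None)."""
    invoice = order_correlations.get(order_id)
    return ("update", invoice) if invoice is not None else ("create", None)

record_success("1001", "INV-7A3F")
print(decide_action("1001"))  # ('update', 'INV-7A3F')
print(decide_action("1002"))  # ('create', None)
```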
Exercise 12: ETL Pattern Part 1
We'll now apply all of the above concepts in creating a Workflow that reads contact information from a file and syncs the records to an API.
Create a text file on your computer containing the content below. We'll assume the path c:\temp\etlcontacts.txt.
Id,First Name,Last Name,Email,Phone,Company Name,State
1,Dennis,Brown,juanjennings@hotmail.com,001-670-283-7481,Pioneer Pharmaceuticals,West Virginia
2,Joseph,Lucas,sean27@gmail.com,422.631.1017x95303,Ascend Healthcare Partners,New York
3,Robert,Medina,william51@gmail.com,8737236607,Apex Innovations,South Dakota
4,Matthew,Flowers,valenzuelarodney@howell-walters.com,758.052.9923,EchoNet Communications,New York
5,Jacqueline,Carroll,edwardhodge@yahoo.com,203-862-8915x3140,Vertex Construction,Alaska
6,Diana,Harrison,andrea83@yahoo.com,547.430.8570,GreenLeaf Agriculture,Arkansas
7,Mary,Jackson,robertsmith@caldwell.info,001-247-971-0674x837,Insight Marketing Solutions,Indiana
8,Kelly,Richardson,sandra16@johnson.net,001-309-461-5944x9969,Swift Logistics,Pennsylvania
9,Jay,Taylor,larry55@yahoo.com,001-036-973-5467,Skyline Engineering,New York
10,Tina,Hill,savannahrussell@glover.com,0018245617,BlueHorizon Travel,Idaho
In a new Workflow, add File Read and Flat File Nodes.
Configure File Read.Connection to use your local DropPoint.
Set File Read.Path to c:\temp\etlcontacts.txt. If you are not using a Windows PC, configure the step, but manually paste the text document into File Read.Content, and when connecting up execution Flows, skip File Read and connect Start.RunNow → Flat File.
Set Flat File.ColumnDelimiter to , (a comma), and Flat File.HasColumnNames to True (i.e. toggle it on).
Connect File Read.Content → Flat File.FlatFileDocument.
Connect Start.RunNow → File Read and File Read → Flat File.
Run the Workflow to confirm that the file content is being parsed into XML in Flat File.XmlDocument.
If Flat File.XmlDocument is empty (i.e. <Document />), this is probably because the line endings are incorrect. By default, Windows uses \r\n for line endings, but if you save the file with an app other than Notepad, it may use different line endings. If you have this problem, try changing Flat File.RowDelimiter to \n or \r.
Add a JSON Convert Node. Connect Flat File → JSON Convert.
Set JSON Convert.Action to XmltoJson.
Connect Flat File.XmlDocument → JSON Convert.Xml.
Run the Workflow at this point.
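To see roughly what the Flat File and JSON Convert steps produce, here is a Python sketch using the standard csv and json modules on the first two data rows of the file. The {"Document": {"Row": [...]}} shape mirrors the Document.Row path used in the next steps, but it is an approximation: how Flowgear names elements for column headers containing spaces (such as First Name) is not modelled here, and the sketch simply keeps the header text as-is. Reading the text with newline="" lets the csv module accept \r\n, \n, or \r row endings, which is the same problem the RowDelimiter tip above addresses.

```python
import csv
import io
import json

CSV_TEXT = (
    "Id,First Name,Last Name,Email,Phone,Company Name,State\n"
    "1,Dennis,Brown,juanjennings@hotmail.com,001-670-283-7481,Pioneer Pharmaceuticals,West Virginia\n"
    "2,Joseph,Lucas,sean27@gmail.com,422.631.1017x95303,Ascend Healthcare Partners,New York\n"
)

def parse_contacts(text, delimiter=","):
    """Parse delimited text with a header row into {"Document": {"Row": [...]}}."""
    # newline="" splits lines on any ending without translating them,
    # so csv copes with \r\n, \n, or \r.
    reader = csv.DictReader(io.StringIO(text, newline=""), delimiter=delimiter)
    return {"Document": {"Row": [dict(row) for row in reader]}}

document = parse_contacts(CSV_TEXT)
print(json.dumps(document, indent=2))
```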
Add a Reduce 2 Node, and rename it to Reduce Prepare. Connect JSON Convert → Reduce Prepare.
Set Reduce Prepare.Group to contacts.
Connect JSON Convert.Json → Reduce Prepare.SourceDocument.
Focus Reduce Prepare.Path and select Row. When you leave the Property, it should show the value Document.Row[*].
When you focused the Path Property, Flowgear checked recent Workflow Logs and found one for the JSON Convert.Json Property, which is how it was able to present you with a tree view of fields.
One thing to bear in mind is that if the data changes, you won't see that in the tree view explorer, because the original log is loaded into the design. To deal with this, you can clear out the JSON Convert.Json Property, and Flowgear will reload it the next time it needs to generate a tree view.
Set Reduce Prepare.Action to Reduce.
We are going to be using a two-step Reduce, just as we would in a real-world scenario. This means that we won't commit the Reduce action until we know we've successfully integrated the data to the destination.
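Why two steps? Here is a Python sketch of the idea, assuming rows are dicts keyed by a field such as Id (mirroring the KeyPath Property): reduce() filters without remembering anything, and only commit() records a row's hash, so a row that fails downstream is offered again on the next run. The TwoStepReducer class is hypothetical and does not reflect how Flowgear stores its state.

```python
import hashlib
import json

def record_hash(row):
    """SHA-256 of the row's canonical JSON (keys sorted)."""
    return hashlib.sha256(json.dumps(row, sort_keys=True).encode("utf-8")).hexdigest()

class TwoStepReducer:
    """Hypothetical prepare/commit delta filter keyed by one field of each row."""

    def __init__(self, key_path):
        self.key_path = key_path
        self.committed = {}  # key -> hash of the last successfully synced version

    def reduce(self, rows):
        """Return rows that are new or changed since their last commit (stores nothing)."""
        return [row for row in rows
                if self.committed.get(row[self.key_path]) != record_hash(row)]

    def commit(self, row):
        """Mark a row as synced; pass the same row that reduce() returned."""
        self.committed[row[self.key_path]] = record_hash(row)

reducer = TwoStepReducer(key_path="Id")
rows = [{"Id": "1", "First Name": "Dennis"}, {"Id": "2", "First Name": "Joseph"}]
before = reducer.reduce(rows)  # both rows: nothing has been committed yet
reducer.commit(rows[0])        # suppose only row 1 reached the destination
after = reducer.reduce(rows)   # only row 2 is offered again
```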
Set Reduce Prepare.KeyPath to Id.
Id matches the Id Property in the contact record. This tells the Reduce 2 Node not just how to identify individual rows, but also which field within a row represents the unique key for the record.
Run the Workflow a few times. You should see that the Reduce 2 Node returns all rows. This is because no Commit action has been run against it yet. We'll do that at a later stage.
Add a For Each Node. Connect Reduce Prepare → For Each.
For this exercise, we're going to work with one record at a time, so we're leaving ChunkSize set to 1. If we had used a value greater than 1, we'd need to use bulk Connectors downstream from For Each.
Connect Reduce Prepare.ReducedDocument → For Each.SourceDocument.
Focus For Each.Path, and select Row from the tree view. When you leave the Property, it should show the value Document.Row[*].
Just as before, we needed to run the Workflow up to the preceding step so that Flowgear can load a tree view based on data in the Workflow Logs.
Set For Each.Encapsulation to ParentNode.
Run the Workflow again at this point, and you should see the For Each Node firing for each record.
Add a Web Request 2 Node, and rename it to Create Contact. Connect For Each.Item (Execution Socket) → Create Contact.
Create a Connection for Create Contact, and set ReturnFailureResponses to True in the Connection.
Set Create Contact.Method to POST.
Set Create Contact.Url to https://zorkco.flowgear.net/contacts?auth-key={auth}.
Set Create Contact.RequestHeaders to Content-Type: application/json.
Connect For Each.Item (Data Socket) → Create Contact.RequestBody.
Add Custom Properties on Create Contact named id and auth.
Connect For Each.Item to Create Contact.id. Hover over the Flow Connector, click fx, and select the Id field.
Set Create Contact.auth to dMFROUiQadVEWaHZe8qTEHM9TieeyGo7PBhY9Ln1TwKCsGu-sfgnJUh4OKIrBZLplNtXKWdcJDdqgHjVScr24Q.
Run the Workflow and check that Create Contact.StatusCode is 200 (indicating success). If it is not, review Create Contact.ResponseBody to see what the error is.
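For reference, the request that Create Contact sends can be sketched with Python's urllib. This builds the request without sending it; AUTH_KEY is a placeholder for the auth value above, and using the For Each item as the JSON body follows the RequestBody connection in the steps. urlencode also escapes the key if needed.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "https://zorkco.flowgear.net/contacts"
AUTH_KEY = "YOUR-AUTH-KEY"  # placeholder for the auth Custom Property value

def build_create_request(contact, auth_key):
    """Build (but don't send) the POST that creates one contact."""
    query = urllib.parse.urlencode({"auth-key": auth_key})
    return urllib.request.Request(
        f"{BASE_URL}?{query}",
        data=json.dumps(contact).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

request = build_create_request({"Id": "1", "First Name": "Dennis"}, AUTH_KEY)
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request). For 4xx/5xx responses, urllib raises
# urllib.error.HTTPError, which still exposes .code and .read() for inspection.
```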
Save and run your Workflow, and then click Submit Exercise to grade it.
Exercise 13: ETL Pattern Part 2
In the previous exercise, we created a basic ETL Workflow. We will now enhance it to track which records we've created and to perform an update instead of an insert when we encounter an existing record. We'll also complete the implementation of Reduce 2 to filter out data that has not changed since the Workflow last ran.
Copy the Nodes in the Workflow from the previous exercise into a new Workflow, and run the Workflow.
Add an If Node, rename it to Check Create Success, and connect Create Contact → Check Create Success.
Connect Create Contact.ResponseBody → Check Create Success.Value, and set the Data Mapping Expression on the new Flow Connector to ZorkId.
Set Check Create Success.Expression to Value <> "".
Add a Set Key-Value 2 Node, rename it to Correlate Ids, and connect Check Create Success.True → Correlate Ids.
Set Correlate Ids.Group to contacts and Correlate Ids.Status to Success.
Connect For Each.Item → Correlate Ids.Key, and set the Data Mapping Expression to Id.
Connect Create Contact.ResponseBody → Correlate Ids.Value, and set the Data Mapping Expression to ZorkId.
The Correlate Ids Key/Value correlates our original record id with the ID of the record created at the web service (ZorkId). We'll use that correlation to decide whether to update or insert a record. When we need to do an update, we'll use the ZorkId value in the URL of the request.
Run the Workflow to confirm that the Key/Value associations are working correctly.
Note that in a real-world scenario, we would attach a separate Set Key-Value 2 Node to the False Output of Check Create Success and use it to store error information.
Drag the Create Contact Node one block to the right to create a gap. Add a Get Key-Value 2 Node into the space that is created, and rename it to Check Created.
Connect For Each.Item → Check Created and Check Created.Missing → Create Contact.
The Check Created Node will try to locate a record by the Id of the contact. If there is no record, we will create a new record. Otherwise, we'll update the existing record.
Set Check Created.Group to contacts.
Connect For Each.Item → Check Created.Key, and set the Data Mapping Expression to Id.
Add a Web Request 2 Node above Create Contact, and rename it to Update Contact.
Connect Check Created.Found → Update Contact.
Set Update Contact.Connection to the same Connection you created for Create Contact.
Set Update Contact.Method to PUT.
With a REST service, POST is generally used to create a new resource, while PUT is used to update an existing resource.
Set Update Contact.Url to https://zorkco.flowgear.net/contacts/{id}?auth-key={auth}.
Add Custom Properties on Update Contact named id and auth.
Connect Check Created.Value → Update Contact.id.
The ID that we send to the web service is the ZorkId value we stored using Set Key-Value 2, not the Id field contained in our source data.
Set Update Contact.auth to dMFROUiQadVEWaHZe8qTEHM9TieeyGo7PBhY9Ln1TwKCsGu-sfgnJUh4OKIrBZLplNtXKWdcJDdqgHjVScr24Q.
Run the Workflow, and you should see all contacts follow the update path instead of the create path, since they've already been created.
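The routing that Check Created performs can be sketched in Python. A dict stands in for the contacts Key/Value group (source Id to ZorkId); the ZorkId value, the JSON body on the PUT, and the placeholder AUTH_KEY are assumptions for illustration, and nothing is sent.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "https://zorkco.flowgear.net/contacts"
AUTH_KEY = "YOUR-AUTH-KEY"  # placeholder for the auth Custom Property value

def build_sync_request(contact, correlations, auth_key):
    """POST a contact we haven't created yet, or PUT it to /contacts/{ZorkId} if we have."""
    query = urllib.parse.urlencode({"auth-key": auth_key})
    zork_id = correlations.get(contact["Id"])  # Found vs Missing in Check Created
    if zork_id is None:
        url, method = f"{BASE_URL}?{query}", "POST"
    else:
        # The URL uses the web service's id (ZorkId), not our source Id.
        url, method = f"{BASE_URL}/{urllib.parse.quote(str(zork_id))}?{query}", "PUT"
    return urllib.request.Request(
        url,
        data=json.dumps(contact).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )

correlations = {"1": "zk-001"}  # hypothetical ZorkId stored by Correlate Ids
update = build_sync_request({"Id": "1", "First Name": "Dennis"}, correlations, AUTH_KEY)
create = build_sync_request({"Id": "11", "First Name": "Ada"}, correlations, AUTH_KEY)
```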
Add an If Node to the right of Update Contact, and rename it to Check Update Success. Connect Update Contact → Check Update Success.
Connect Update Contact.StatusCode → Check Update Success.Value.
Set Check Update Success.Expression to Value = 200.
When we update a contact, we can't look at a returned ID to determine success. Instead, we check that the response code is 200, the HTTP status code that indicates success.
Add a Reduce 2 Node to the right of Correlate Ids, and rename it to Reduce Complete.
Now that we have integrated records to the web service, we'll finish the Reduce implementation so that it marks records as synced.
There are two ways that a record can be processed successfully: either when it is created for the first time, or when it is updated.
Connect Correlate Ids → Reduce Complete and Check Update Success.True → Reduce Complete.
Set Reduce Complete.Group to contacts.
Set Reduce Complete.Action to Commit.
Connect For Each.Item → Reduce Complete.SourceDocument.
The Reduce Node works by comparing changes on the source document, so when we use its Commit action, it needs the same view of the records that it had when we ran its Reduce action.
In our first Reduce Node (Reduce Prepare), we set Path to Document.Row[*] because we needed to show the Node how to identify individual records within the document. For this second Node, we'll leave Path empty because the entire document contains only one record (since it is the output from the For Each Node).
Set Reduce Complete.KeyPath to Id.
Run the Workflow twice. On the second run, you should see no rows passed into For Each, and it simply fires its Finished Output.
If you change a row in the text file, that row will process as an update. If you add a row, it will process as a create.
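To tie the two exercises together, here is a Python simulation in which each call to run_sync stands for one Workflow run, covering Reduce Prepare, Check Created, the create and update paths with their success checks, Correlate Ids, and Reduce Complete. FakeContactService is a hypothetical in-memory stand-in for the web service (create returns a ZorkId, update returns a status code), and the dicts stand in for the Reduce and Key/Value state that Flowgear persists.

```python
import hashlib
import itertools
import json

def record_hash(row):
    """SHA-256 of the row's canonical JSON (keys sorted)."""
    return hashlib.sha256(json.dumps(row, sort_keys=True).encode("utf-8")).hexdigest()

class FakeContactService:
    """Hypothetical in-memory stand-in for the contacts web service."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.contacts = {}  # ZorkId -> contact

    def create(self, row):
        zork_id = f"Z{next(self._ids)}"
        self.contacts[zork_id] = dict(row)
        return {"ZorkId": zork_id}  # parsed response body

    def update(self, zork_id, row):
        if zork_id not in self.contacts:
            return 404
        self.contacts[zork_id] = dict(row)
        return 200  # status code

def run_sync(rows, service, committed, correlations):
    """Simulate one Workflow run and count how many rows took each path."""
    counts = {"create": 0, "update": 0}
    # Reduce Prepare: keep rows that are new or changed since their last commit.
    pending = [r for r in rows if committed.get(r["Id"]) != record_hash(r)]
    for row in pending:  # For Each, one record at a time
        zork_id = correlations.get(row["Id"])  # Check Created
        if zork_id is None:  # Missing -> Create Contact
            response = service.create(row)
            if response.get("ZorkId", "") == "":  # Check Create Success
                continue  # not committed, so the row is retried next run
            correlations[row["Id"]] = response["ZorkId"]  # Correlate Ids
            counts["create"] += 1
        else:  # Found -> Update Contact
            if service.update(zork_id, row) != 200:  # Check Update Success
                continue
            counts["update"] += 1
        committed[row["Id"]] = record_hash(row)  # Reduce Complete (Commit)
    return counts

service, committed, correlations = FakeContactService(), {}, {}
rows = [{"Id": "1", "First Name": "Dennis"}, {"Id": "2", "First Name": "Joseph"}]
first = run_sync(rows, service, committed, correlations)   # both rows created
second = run_sync(rows, service, committed, correlations)  # nothing has changed
rows[1] = {"Id": "2", "First Name": "Joe"}                 # edit one row...
rows.append({"Id": "3", "First Name": "Robert"})           # ...and add another
third = run_sync(rows, service, committed, correlations)   # one update, one create
```

Because the commit happens only after a success check passes, a failed create or update leaves the row uncommitted, and it is picked up again on the next run.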
For simplicity, we've kept all steps for this solution in a single Workflow. In a real-world scenario, it would be best to refactor sets of steps into sub-Workflows. You can do this by holding CTRL (Windows) or ⌘ (Mac) and dragging a selection rectangle to select a range of steps, and then clicking the Refactor to Sub-Workflow button (wand icon).
Change the name of one of the contacts in the file for grading purposes.
Save and run your Workflow, and then click Submit Exercise to grade it.