10. Patterns: ETL

Now that we've gained familiarity with a number of frequently used Connectors, we're going to look at several high-level patterns that come up in real-world scenarios. In each section, we'll begin by describing the objectives of the pattern. In practice, you'll often blend aspects of different patterns together.

The first pattern we'll look at is called ETL (Extract, Transform, Load). These steps are fundamental to most integrations, but there are a few important steps that are often implemented around the actual Extract, Transform and Load stages.

Isolating delta data

The first thing to consider is that once you've acquired data from a source, you'll likely need to filter out data that hasn't changed since the last time you queried it. For example, if you're doing a master data sync of customers, you'll want to exclude customers that haven't changed since the last sync so that you're not wasting resources processing unchanged data.

Where possible, you should query only changed (delta) data. For example, you may be working with a Connector that allows you to filter for data that has changed after a given timestamp. As another example, when you're querying data from a database, you might be able to filter on a date-last-changed field or on a flag that indicates whether a record has been synced.

In cases where you can't filter at the source, you can use the Reduce 2 Node to remove data that has not changed since the last iteration.

The Reduce Node iterates over the individual records within a document and tests whether a hash of each record matches a previously seen hash. If it does, that record is excluded on the output side of the Reduce Node. Note that Flowgear doesn't actually store the record - it only stores a hash of the record.
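
To make the mechanics concrete, here is a minimal Python sketch of the same idea, assuming a simple in-memory hash store. The SHA-256 hash, the seen_hashes dictionary and the separate reduce/commit functions are illustrative only - they are not a description of Flowgear's internal implementation, although the two-step split mirrors how we'll use the Node later in this section.

    import hashlib
    import json

    # Hypothetical in-memory store of previously committed hashes, keyed by record id.
    # Only hashes are kept - never the record content itself.
    seen_hashes: dict[str, str] = {}

    def record_hash(record: dict) -> str:
        return hashlib.sha256(json.dumps(record, sort_keys=True).encode()).hexdigest()

    def reduce_step(records: list[dict], key_field: str = "Id") -> list[dict]:
        """Keep only records that are new or changed since the last commit."""
        return [r for r in records if seen_hashes.get(str(r[key_field])) != record_hash(r)]

    def commit_step(record: dict, key_field: str = "Id") -> None:
        """Remember a record's hash once it has been loaded successfully."""
        seen_hashes[str(record[key_field])] = record_hash(record)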

Chunking data into pages

It's common that the number of records you can query from a source app or service is greater than the number of records that the target app or service is able to accept at once.

To compensate for this mismatch, we use a For Each Node to chunk the returned data into pages containing a desired number of records. The For Each Node will fire repeatedly until all of the data initially given to it has been iterated (i.e. passed along to later stages of the Workflow).
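
As a rough Python sketch of what the For Each Node is doing (the page size of 100 is an arbitrary example):

    def chunk(records: list, page_size: int):
        """Yield successive pages of up to page_size records."""
        for start in range(0, len(records), page_size):
            yield records[start:start + page_size]

    # e.g. 1,050 source records sent to a target that accepts at most 100 at a time
    for page in chunk(list(range(1050)), page_size=100):
        pass  # each page would be handed to the later stages of the Workflow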

Mapping data

Earlier, we saw an example of how to use QuickMap to transform data from one schema into another. Most ETL-style Workflows will include this step although occasionally using a Script or XSL Transform Node is more efficient.
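
If it helps to picture the mapping step as code, it is essentially a field-by-field projection from the source schema onto the target schema. In this Python sketch the target field names are hypothetical:

    def map_contact(source: dict) -> dict:
        """Project a source contact row onto a (hypothetical) target schema."""
        return {
            "firstName": source["First Name"],
            "lastName": source["Last Name"],
            "emailAddress": source["Email"],
            "company": source["Company Name"],
        }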

Tagging & Correlating

Once we've integrated a record into a destination, we'll generally use Key/Values to record its status. Exactly what 'status' means depends on the context, but generally we want to track whether a record integrated successfully or not, and we also want to correlate a source record with its target equivalent.

For example, if we're integrating orders from an ecommerce platform, we may have a numeric order id, but after integrating into the ERP we are given back an alphanumeric invoice number. In this case, we'd use Key/Values to store the order id in the Key and the invoice number in the Value.

When we are able to correlate records in this way, we'll be able to update records at a later point in time. A common example of when that might be needed is when the order status changes. The Workflow needs to be able to tell whether it should create a new order or update an existing one - we'll look up the Key (the order id) to make that decision; if a Value (the invoice number) is found, the order already exists and should be updated.
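
Here is a minimal Python sketch of that decision, assuming a simple in-memory Key/Value store and stubbed-out ERP calls (the names and the invoice number are purely illustrative):

    # Hypothetical Key/Value store: source order id -> ERP invoice number
    correlations: dict[str, str] = {}

    def create_in_erp(order: dict) -> str:
        return "INV-0001"  # stub: the ERP returns an alphanumeric invoice number

    def update_in_erp(invoice_number: str, order: dict) -> None:
        pass  # stub: push changes onto the existing ERP document

    def upsert_order(order_id: str, order: dict) -> None:
        invoice_number = correlations.get(order_id)
        if invoice_number is None:
            correlations[order_id] = create_in_erp(order)  # first time: create, then tag
        else:
            update_in_erp(invoice_number, order)  # seen before: update the existing record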

Exercise 12: ETL Pattern Part 1

We'll now apply all of the above concepts while creating a Workflow that reads contact information from a file and syncs the records to an API.

  1. Create a text file on your computer containing the content below. We'll assume the path c:\temp\etlcontacts.txt.

     Id,First Name,Last Name,Email,Phone,Company Name,State
     1,Dennis,Brown,juanjennings@hotmail.com,001-670-283-7481,Pioneer Pharmaceuticals,West Virginia
     2,Joseph,Lucas,sean27@gmail.com,422.631.1017x95303,Ascend Healthcare Partners,New York
     3,Robert,Medina,william51@gmail.com,8737236607,Apex Innovations,South Dakota
     4,Matthew,Flowers,valenzuelarodney@howell-walters.com,758.052.9923,EchoNet Communications,New York
     5,Jacqueline,Carroll,edwardhodge@yahoo.com,203-862-8915x3140,Vertex Construction,Alaska
     6,Diana,Harrison,andrea83@yahoo.com,547.430.8570,GreenLeaf Agriculture,Arkansas
     7,Mary,Jackson,robertsmith@caldwell.info,001-247-971-0674x837,Insight Marketing Solutions,Indiana
     8,Kelly,Richardson,sandra16@johnson.net,001-309-461-5944x9969,Swift Logistics,Pennsylvania
     9,Jay,Taylor,larry55@yahoo.com,001-036-973-5467,Skyline Engineering,New York
     10,Tina,Hill,savannahrussell@glover.com,0018245617,BlueHorizon Travel,Idaho
    
  2. In a new Workflow, add File Read, Flat File and Reduce 2 Nodes.

  3. Configure File Read.Connection to use your local DropPoint and set File Read.Path to c:\temp\etlcontacts.txt. If you are not using a Windows PC, configure the step but manually paste the text document into File Read.Content and, when connecting up the execution Flows, skip File Read and connect Start.RunNow → Flat File.

  4. Set Flat File.ColumnDelimiter to , (a comma) and set Flat File.HasColumnNames to True (i.e. toggle it on).

  5. Connect File Read.Content → Flat File.FlatFileDocument.

  6. Connect Start.RunNow → File Read, File Read → Flat File.

    Run the Workflow to confirm that the file content is being parsed into XML in Flat File.XmlDocument.

    If Flat File.XmlDocument is empty (i.e. <Document />), this is probably because the line endings are incorrect. By default, Windows uses \r\n for line endings, but if you are using an app other than Notepad to save the file, it may use different line endings. Try changing Flat File.RowDelimiter to \n or \r if you are having this problem.

  7. Add JSON Convert, connect Flat File → JSON Convert.

  8. Set JSON Convert.Action to XmltoJson.

  9. Connect Flat File.XmlDocument → JSON Convert.Xml.

    Run the Workflow at this point.

  10. Rename Reduce 2 to Reduce Prepare and connect JSON Convert → Reduce Prepare.

  11. Set Reduce Prepare.Group to contacts.

  12. Connect JSON Convert.Json → Reduce Prepare.SourceDocument.

  13. Focus Reduce Prepare.Path and select Row. When you leave the Property it should show the value Document.Row[*].

    When you focused the Path Property, Flowgear checked recent Workflow Logs and found one containing the JSON Convert.Json Property, which is how it was able to present you with a tree view of fields.

    One thing to bear in mind is that if the data changes, you won't see that change in the tree view explorer because the original sample log is loaded into the design. To deal with this, you can clear out the JSON Convert.Json Property and Flowgear will re-load it from the most recent Workflow Log the next time it needs to generate a tree view.

  14. Set Reduce Prepare.Action to Reduce.

    We are going to be using a two-step Reduce, just as we would in a real-world scenario. This means that we won't 'commit' the reduce action until we know we've successfully integrated the data into the destination.

  15. Set Reduce Prepare.KeyPath to Id.

    Id matches the Id Property in the contact record. This enables the Reduce 2 Node to know how to identify not just individual rows but also which field within a row represents the unique key for the record.

    Run the Workflow a few times. You should see that the Reduce Node returns all rows. This is because no commit action has been run against it yet (we're going to do that at a later stage).

  16. Add For Each and connect Reduce Prepare → For Each.

    For this exercise, we're going to work with one record at a time so we're leaving ChunkSize set to 1. If we had used a value greater than 1, we'd need to use bulk Connectors downstream from For Each.

  17. Connect Reduce Prepare.ReducedDocument → For Each.SourceDocument.

  18. Focus For Each.Path and select Row from the tree view. When you leave the Property it should show the value Document.Row[*].

    Just as before, we needed to run the Workflow up to the preceding step so that Flowgear could load a tree view based on data in the Workflow Logs.

  19. Set For Each.Encapsulation to ParentNode.

    Run the Workflow again at this point and you should see the For Each Node firing for each record.

  20. Add Web Request 2, rename it to Create Contact and connect For Each.Item (Execution Socket) → Create Contact.

  21. Create a Connection for Create Contact and set ReturnFailureResponses to True in the Connection.

  22. Set Create Contact.Method to POST.

  23. Set Create Contact.Url to https://zorkco.flowgear.net/contacts?auth-key={auth}.

  24. Set Create Contact.RequestHeaders to Content-Type: application/json.

  25. Connect For Each.Item (Data Socket) → Create Contact.RequestBody.

  26. Add Create Contact.auth.

  27. Set Create Contact.auth to dMFROUiQadVEWaHZe8qTEHM9TieeyGo7PBhY9Ln1TwKCsGu-sfgnJUh4OKIrBZLplNtXKWdcJDdqgHjVScr24Q.

    Run the Workflow and check that Create Contact.StatusCode is 200 (indicating success). If it is not, review Create Contact.ResponseBody to see what the error is. A rough code sketch of the request this step issues follows below.
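
    For reference, the request issued by Create Contact is roughly equivalent to the Python sketch below. The request body shown is only an approximation - in the Workflow the body is the For Each.Item JSON produced by JSON Convert, so the field names follow your Flat File columns.

      import requests  # assumes the 'requests' package is available

      AUTH_KEY = "<your auth key>"

      # Approximate payload only - in the Workflow this comes from For Each.Item
      contact = {"Id": "1", "First Name": "Dennis", "Last Name": "Brown"}

      response = requests.post(
          f"https://zorkco.flowgear.net/contacts?auth-key={AUTH_KEY}",
          headers={"Content-Type": "application/json"},
          json=contact,
      )
      print(response.status_code)  # expect 200; otherwise inspect response.text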

Save your Workflow, then click Submit Exercise to grade it.

Exercise 13: ETL Pattern Part 2

In the previous exercise, we created a basic ETL. We will now enhance it to track which records we've created and conditionally perform an update instead of an insert when we encounter an existing record. We'll also complete the implementation of Reduce 2 to filter out data that has not changed since the Workflow last ran.

  1. Copy the Nodes in the Workflow from the previous exercise into a new Workflow, then run the Workflow.

  2. Add If, rename it to Check Create Success, connect Create Contact → Check Create Success.

  3. Connect Create Contact.ResponseBody → Check Create Success.Value. Set the Data Mapping Expression to ZorkId on the new Flow Connector.

  4. Set Check Create Success.Expression to Value <> "".

  5. Add Set Key-Value 2, rename to Mark Created, connect Check Create Success.True → Mark Created.

  6. Set Mark Created.Group to contacts and Mark Created.Status to Success.

  7. Connect For Each.Item → Mark Created.Key, set the Data Mapping Expression to Id.

  8. Connect Create Contact.ResponseBody → Mark Created.Value, set the Data Mapping Expression to ZorkId.

    The Mark Created Key/Value is going to correlate our original record id with the id of the record that is created at the web service (ZorkId). We'll now be able to use that correlation to figure out whether we should do an update or an insert of a record. When we need to do an update, we'll be able to use the ZorkId value in the Url of the request.

    Run the Workflow to confirm that the Key/Value associations are working correctly.

    Note that in a real-world scenario, we would attach a separate Set Key-Value 2 Node to the False Output of Check Create Success and use it to store error information.

  9. Drag the Create Contact Node one block to the right to create a gap, add Get Key-Value 2 into the space that is created, rename it to Check Created.

  10. Rewire the Execution Flows so that For Each.Item → Check Created and Check Created.Missing → Create Contact.

    The Check Created Node will try to locate a Key/Value using the Id of the contact. If none is found, we will create a new record; otherwise, we'll update the existing record.

  11. Set Check Created.Group to contacts.

  12. Connect For Each.Item → Check Created.Key and set the Data Mapping Expression to Id.

  13. Add Web Request 2 above Create Contact, rename it to Update Contact.

  14. Connect Check Created.Found → Update Contact.

  15. Set Update Contact.Connection to the same Connection you created for Create Contact.

  16. Set Update Contact.Method to PUT.

    With a REST service, POST is generally used to create a new resource while PUT is used to update an existing resource. (A code sketch of this create-versus-update routing appears at the end of this exercise.)

  17. Set Update Contact.Url to https://zorkco.flowgear.net/contacts/{id}?auth-key={auth}

  18. Add Update Contact.id and Update Contact.auth.

  19. Connect Check Created.Value → Update Contact.id.

    The ID that we send to the web service is the ZorkId value we stored using Set Key-Value 2, not the Id field contained in our source data.

  20. Set Update Contact.auth to dMFROUiQadVEWaHZe8qTEHM9TieeyGo7PBhY9Ln1TwKCsGu-sfgnJUh4OKIrBZLplNtXKWdcJDdqgHjVScr24Q.

    Run the Workflow and you should see all contacts following the update path instead of the create path since they've already been created.

  21. Add If to the right of Update Contact, rename it to Check Update Success, connect Update Contact → Check Update Success.

  22. Connect Update Contact.StatusCode → Check Update Success.Value.

  23. Set Check Update Success.Expression to Value = 200.

    When we update a contact, we can't look at the returned ID to determine success, so instead we check that the response code is 200, which is the HTTP status code used to indicate success.

  24. Add Reduce 2 to the right of Mark Created, rename it to Reduce Complete.

    Now that we have integrated records into the web service, we'll finish the Reduce implementation so that it marks records as synced.

    There are two ways that a record can be processed successfully - either when it is created for the first time or when it is updated.

  25. Connect Mark Created → Reduce Complete and Check Update Success.True → Reduce Complete.

  26. Set Reduce Complete.Group to contacts.

  27. Set Reduce Complete.Action to Commit.

  28. Connect For Each.Item → Reduce Complete.SourceDocument.

    The Reduce Node works by comparing changes on the source document, so when we use its Commit action it needs the same view of the records that it had at the time we ran its Reduce action.

    In our first Reduce Node (Reduce Prepare), we set Path to Document.Row[*] because we needed to show the Node how to identify individual records within the document. For this second Node, we will leave Path empty because the entire document contains only one record (since it is the output from the For Each Node).

  29. Set Reduce Complete.KeyPath to Id.

    Run the Workflow twice. On the second run, you should see no rows injected into For Each and it will simply fire its Finished Output.

    If you change a row in the text file, that row will process as an update. If you add a row, it will process as a create.

    For simplicity, we've kept all steps for this solution in a single Workflow. In a real-world scenario, it would be best to refactor sets of steps into sub-Workflows. You can do this by selecting a range of steps by holding CTRL (Windows) or Cmd (Mac) and dragging a selection rectangle, then clicking the Refactor to Sub-Workflow button (wand icon).
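
    To tie the pieces together, here is a hedged Python sketch of the create-versus-update routing the Workflow now performs for each contact. The in-memory zork_ids dictionary stands in for the contacts Key/Value Group, and the sketch assumes the service returns a JSON body containing a ZorkId field, as this exercise suggests.

      import requests  # assumes the 'requests' package is available

      BASE_URL = "https://zorkco.flowgear.net/contacts"
      AUTH_KEY = "<your auth key>"
      zork_ids: dict[str, str] = {}  # stands in for the 'contacts' Key/Value Group

      def sync_contact(contact: dict) -> bool:
          """Create the contact on first sight, update it on subsequent runs."""
          source_id = str(contact["Id"])
          zork_id = zork_ids.get(source_id)
          if zork_id is None:
              # Missing: POST to create, then correlate the returned ZorkId
              response = requests.post(f"{BASE_URL}?auth-key={AUTH_KEY}", json=contact)
              created_id = str(response.json().get("ZorkId", ""))
              if created_id:
                  zork_ids[source_id] = created_id
              return bool(created_id)
          # Found: PUT to update the existing resource
          response = requests.put(f"{BASE_URL}/{zork_id}?auth-key={AUTH_KEY}", json=contact)
          return response.status_code == 200

    A success on either branch is the point at which the Reduce Commit should run - which is exactly what the Execution Flows into Reduce Complete achieve.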

Save your Workflow, then click Submit Exercise to grade it.