13. Patterns: Collector/Processor

All the Workflows we've built so far have processed data synchronously. In other words, the integration objective was completely achieved by the time the Workflow finished executing.

In many cases, processing data may require much more than a few seconds. For example, a large number of records may need to be processed, or the apps and services we integrate with may be slow to respond or unable to accept more than one record at a time.

This is usually not an issue if the Workflow is running in the "background" as an Always On Workflow. However, if a third party is waiting for the results of the Workflow, we generally don't want it to run for more than a few seconds. This is especially relevant when the Workflow is being invoked via API, which we will discuss in a later section.

For these cases, rather than processing data in real-time, a collector/processor pattern is appropriate. The collector Workflow accepts data or a task, and stores it (typically in a queue), while a processor Workflow pulls a task from the store (or queue), processes it, and then starts over.

Here are a few benefits to this approach:

  • The consumer (i.e. the app or service that called the collector Workflow) gets an almost immediate response back, although it cannot be told at that point whether the task will ultimately succeed.
  • The collector Workflow consumes an Active Workflow slot for a very short period of time, usually milliseconds.
  • The processor Workflow consumes only one Active Workflow slot, unless it is specifically designed to process tasks in parallel.
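The pattern above can be sketched outside Flowgear in a few lines of Python. This is a minimal illustration only: an in-memory `queue.Queue` stands in for the Flowgear Queue Node, and the `collector`/`processor` function names are hypothetical.

```python
# Collector/processor sketch: the collector accepts a task and returns
# immediately; the processor drains the queue one task at a time.
import queue

task_queue = queue.Queue()

def collector(task):
    """Accept a task, store it on the queue, and respond right away."""
    task_queue.put(task)
    return "accepted"  # the consumer gets a near-immediate response

def processor():
    """Pull tasks one at a time, process each, and start over."""
    results = []
    while not task_queue.empty():
        task = task_queue.get()
        results.append(f"processed {task['invoice']}")  # placeholder work
        task_queue.task_done()
    return results

collector({"invoice": "INV-001", "creditor": "Acme", "value": "100.00"})
collector({"invoice": "INV-002", "creditor": "Beta", "value": "250.00"})
print(processor())
```

Note that the collector does no real work before responding; all of the slow processing happens later, on the processor's side of the queue.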

Exercise 17: Collector Workflow

In this exercise, we'll create a Workflow that loads a request for an invoice payment onto a queue.

  1. Add Variable Bar to a new Workflow.

  2. Add Properties on Variable Bar named invoice, creditor, and value. Set all of these Properties as outputs.

  3. Add Formatter, and connect Start.RunNow → Formatter.

  4. Set Formatter.Escaping to JSON, and Formatter.Expression to the expression below:

    {
        "invoice": "{invoice}",
        "creditor": "{creditor}",
        "value": "{value}"
    }
    
  5. Add Custom Properties on Formatter named invoice, creditor, and value. Create Data Flow Connectors to these Properties from the corresponding names on Variable Bar.

  6. Add Flowgear Queue, and connect Formatter → Flowgear Queue.

  7. Set Flowgear Queue.Queue to payments.

  8. Set Flowgear Queue.Action to Enqueue.

  9. Connect Formatter.Result → Flowgear Queue.Message.

  10. Open Workflow Settings, and enable Allow Run On Demand.

  11. Place some values into the Properties in Variable Bar.

    Run the Workflow to check that it executed successfully.

    Save the Workflow, click Back (<), and then Run Now. Switch from Production to Test by clicking Production ▼.

    Provide a different set of payment information and click Run Workflow. Repeat a few times, so that a number of items are added to the queue.

Return to the Workflow Design, run the Workflow once more, and save it. Then, click Submit Exercise to grade it.
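The message this exercise places on the queue is the JSON document produced by the Formatter, with JSON escaping applied to each value. The same construction can be sketched with Python's `json` module (a stand-in for the Formatter Node, not Flowgear's actual implementation):

```python
# Build the queue message from the three input Properties, with JSON
# escaping applied to each value (embedded quotes are escaped safely).
import json

def build_message(invoice, creditor, value):
    return json.dumps({"invoice": invoice, "creditor": creditor, "value": value})

msg = build_message("INV-001", 'Acme "Holdings"', "100.00")
# Because the values were escaped, the message round-trips cleanly:
assert json.loads(msg)["creditor"] == 'Acme "Holdings"'
```

This is why the exercise sets Formatter.Escaping to JSON: without escaping, a value containing a quote character would produce an invalid message.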

Exercise 18: Processor Workflow

  1. Add Flowgear Dequeue to a new Workflow, and connect Start.RunNow → Flowgear Dequeue.

  2. Set Flowgear Dequeue.Queue to payments.

  3. Add Email Alert, and connect Flowgear Dequeue → Email Alert.

  4. Set Email Alert.Recipients to your email address, and Email Alert.Subject to Payment information.

  5. Connect Flowgear Dequeue.Message → Email Alert.Body.

  6. In the Workflow Settings, under Enabled start modes, enable Always on.

  7. Save the Workflow and click Back (<).

    Note that the Always on option is shown. Normally, this Workflow would be run by enabling Always on against the appropriate Environment.

  8. Click Design to return to the Workflow Design.

    Run the Workflow to check that it executed successfully. Because the Flowgear Dequeue Node is a Trigger Node, the Workflow will idle on that step waiting for the next message. Click Stop to stop the Workflow.

Save and run your Workflow, and then click Submit Exercise to grade it.
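The processor's idle-and-wait behavior can be sketched as a blocking dequeue loop. Here an in-memory queue and a recording `send_email` function stand in for the Flowgear Dequeue and Email Alert Nodes; none of these names are Flowgear APIs.

```python
# Processor sketch: block waiting for the next message, "send" an email
# with its body, and repeat until the queue stays empty.
import queue

payments = queue.Queue()
sent = []  # records "emails" instead of sending them

def send_email(recipients, subject, body):
    sent.append({"to": recipients, "subject": subject, "body": body})

def process_next(timeout=1.0):
    """Wait up to `timeout` seconds for a message; return False when idle."""
    try:
        body = payments.get(timeout=timeout)
    except queue.Empty:
        return False
    send_email("you@example.com", "Payment information", body)
    return True

payments.put('{"invoice": "INV-001", "creditor": "Acme", "value": "100.00"}')
while process_next(timeout=0.1):
    pass
```

The `timeout` branch mirrors what you observed in the exercise: with nothing on the queue, the Workflow simply idles on the Trigger Node until the next message arrives.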

More about the Collector/Processor Pattern

Task Storage

In the exercises above, we used the Flowgear Queue Node, which manages configuring a queue automatically. If you require more control over the queue, you could also look at Azure Queue or Amazon SQS Queue.

You could also consider using a table in a database or Redis to hold a list of items.

Note that most queues do not allow individual records to be large. If you do have a large amount of data (more than, say, 50 KB), you should use a different Node to store the data, and then push an identifier onto the queue that points to that data. The queue is then used to "hold a place" in the line, but the actual data that needs to be processed is stored separately.

A blob store is often used for this purpose. Consider Azure Blob Storage or Amazon S3 Object. When using this approach, don't forget to remove the stored data once you've processed it.
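This "identifier on the queue, data in a blob store" approach is often called the claim-check pattern. A minimal sketch, assuming a plain dict as a stand-in for Azure Blob Storage or Amazon S3:

```python
# Claim-check sketch: large payloads go to a blob store; only a small
# identifier is enqueued. The dict stands in for a real blob store.
import queue
import uuid

blob_store = {}           # stand-in for Azure Blob Storage / Amazon S3
task_queue = queue.Queue()

def collect(large_payload):
    blob_id = str(uuid.uuid4())
    blob_store[blob_id] = large_payload   # store the data separately
    task_queue.put(blob_id)               # the queue holds only the pointer
    return blob_id

def process_one():
    blob_id = task_queue.get()
    payload = blob_store[blob_id]
    result = len(payload)                 # placeholder for real processing
    del blob_store[blob_id]               # don't forget to clean up
    return result

collect("x" * 100_000)  # ~100 KB: too large for most queue messages
print(process_one())
```

The final `del` corresponds to the cleanup step mentioned above: once a task is processed, its stored payload should be removed.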

Parallel Processing

If you have a large volume of incoming requests, it may be helpful to process tasks in parallel from the processor Workflow.

To do this, factor the processing step to a separate Workflow, reference that Workflow from the processor Workflow, and click Enable Parallel Mode on the sub-Workflow. See more about Parallel Workflows.
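Conceptually, Parallel Mode is equivalent to the processor handing each dequeued task to a pool of workers. A sketch using Python's standard thread pool (the `handle` function is a hypothetical stand-in for the per-task sub-Workflow):

```python
# Parallel processing sketch: dequeue messages and hand each one to a
# worker pool, analogous to invoking a sub-Workflow in Parallel Mode.
import queue
from concurrent.futures import ThreadPoolExecutor

task_queue = queue.Queue()
for n in range(8):
    task_queue.put(n)

def handle(task):
    return task * 2  # placeholder for the per-task sub-Workflow

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = []
    while not task_queue.empty():
        futures.append(pool.submit(handle, task_queue.get()))
    results = [f.result() for f in futures]

print(sorted(results))
```

As in Flowgear, the dequeue step stays serial; only the per-task work fans out, so results may complete in any order.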

Completion Notification

When a task is running asynchronously, the consumer needs a way to know that processing has completed and what the result was. There are two options here:

(1) Poll Method

  1. Modify the collector Workflow so that it generates an ID for the message. One way to do that is to use the Expression Node with its Expression Property set to Guid.NewGuid(). This will cause a unique identifier to be generated for each message. Store this identifier as part of the message, and return it to the consumer via a Variable Bar Output Property.

  2. In the processor Workflow, use a Key/Value Node to store outcome information keyed on the identifier.

  3. Create a third status query Workflow that takes the identifier as an input and returns the Key/Value record associated with it.

  4. The consumer will then send the task by calling the collector Workflow and receiving back an identifier. It can then poll the status query Workflow at a regular interval to see whether any results are available.
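The four steps above can be sketched end to end. Here `uuid.uuid4()` plays the role of Guid.NewGuid(), and a dict stands in for the Key/Value Node; the three functions correspond to the collector, processor, and status query Workflows.

```python
# Poll-method sketch: collector returns an ID; the processor records the
# outcome keyed on that ID; the consumer polls the status query.
import queue
import uuid

task_queue = queue.Queue()
outcomes = {}  # stand-in for the Key/Value store

def collector(payload):
    task_id = str(uuid.uuid4())            # Guid.NewGuid() equivalent
    task_queue.put({"id": task_id, "payload": payload})
    return task_id                         # returned to the consumer

def processor():
    task = task_queue.get()
    # ... process task["payload"] ...
    outcomes[task["id"]] = "succeeded"     # record the outcome

def status_query(task_id):
    return outcomes.get(task_id)           # None until processing completes

tid = collector({"invoice": "INV-001"})
assert status_query(tid) is None           # consumer polls: not done yet
processor()
assert status_query(tid) == "succeeded"    # a later poll sees the result
```

The consumer repeats the `status_query` call at an interval of its choosing, which is the main trade-off of this method versus the event method below: simplicity at the cost of polling latency.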

(2) Event Method

  1. As part of the input to the collector Workflow, include a callback URL property, and store that property as part of the data loaded into the queue.

  2. When the processor Workflow has finished processing a record, use Web Request 2 to fire the callback URL, advising the consumer of completion.
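The event method can be sketched the same way. A recording `post` function stands in for the real HTTP callback that Web Request 2 would fire; the URL shown is illustrative only.

```python
# Event-method sketch: the callback URL travels with the queued message;
# on completion, the processor notifies the consumer at that URL.
import queue

task_queue = queue.Queue()
callbacks_fired = []

def post(url, body):
    """Stand-in for an HTTP POST to the consumer's callback URL."""
    callbacks_fired.append((url, body))

def collector(payload, callback_url):
    # The callback URL is stored as part of the queued data.
    task_queue.put({"payload": payload, "callback_url": callback_url})

def processor():
    task = task_queue.get()
    # ... process task["payload"] ...
    post(task["callback_url"], {"status": "completed"})

collector({"invoice": "INV-001"}, "https://consumer.example.com/done")
processor()
print(callbacks_fired[0][0])
```

Unlike the poll method, the consumer does no periodic checking; it simply exposes an endpoint and waits to be notified.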