13 Patterns: Collector/Processor

All the Workflows we've built so far have processed data synchronously. In other words, the integration objective was completely achieved by the time the Workflow finished executing.

In many cases, processing data may require much more than a few seconds. For example, a large number of records may need to be processed, or the apps and services we integrate with may be slow to respond or unable to accept more than one record at a time.

This is usually not an issue if the Workflow is running in the "background" as an Always On Workflow. However, if a third party is waiting for the results of the Workflow, we generally don't want it to run for more than a few seconds. This is especially relevant when the Workflow is being invoked via API, which we will discuss in a later section.

For these cases, rather than processing data in real time, a collector/processor pattern is appropriate. The collector Workflow accepts data or a task and stores it (typically in a queue), while a processor Workflow pulls a task from the store (or queue), processes it, and then starts over.
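
To make the division of responsibilities concrete, here is a minimal, language-agnostic sketch of the pattern in Python - an in-memory queue stands in for the Flowgear Queue, and the task fields are illustrative:

    import queue
    import threading

    task_queue = queue.Queue()

    def collector(task):
        """Accept a task, store it on the queue and return immediately."""
        task_queue.put(task)
        return "accepted"              # near-instant response to the consumer

    def processor():
        """Pull one task at a time off the queue, process it, start over."""
        while True:
            task = task_queue.get()        # blocks until a task is available
            print(f"processing {task}")    # the actual integration work
            task_queue.task_done()

    threading.Thread(target=processor, daemon=True).start()
    collector({"invoice": "INV-001", "creditor": "Acme", "value": "99.50"})
    task_queue.join()                      # wait for the demo task to finish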

Here are a few benefits of this approach:

  • The consumer (i.e. the app or service that called the collector Workflow) gets an almost immediate response back (although at that point it can't be told whether the task will ultimately succeed).
  • The collector Workflow consumes an Active Workflow slot for a very short period of time (usually milliseconds).
  • The processor Workflow consumes only one Active Workflow slot (unless it is specifically designed to process tasks in parallel).

Exercise 17: Collector Workflow

In this exercise we'll create a Workflow that loads a request for an invoice payment onto a queue.

  1. Add Variable Bar to a new Workflow.

  2. Add Variable Bar.invoice, Variable Bar.creditor and Variable Bar.value, and set all of these Properties as Outputs.

  3. Add Formatter, connect Start.RunNow → Formatter.

  4. Set Formatter.Escaping to JSON and Formatter.Expression to the expression below (a sample of the rendered message is shown after this exercise):

     {
     	"invoice": "{invoice}",
     	"creditor": "{creditor}",
     	"value": "{value}"
     }
    
  5. Add Formatter.invoice, Formatter.creditor and Formatter.value, then create Data Flow Connectors from the correspondingly named Properties on Variable Bar.

  6. Add Flowgear Queue, connect Formatter → Flowgear Queue.

  7. Set Flowgear Queue.Queue to payments.

  8. Set Flowgear Queue.Action to Enqueue.

  9. Connect Formatter.Result → Flowgear Queue.Message.

  10. Open Workflow Settings, toggle on Allow Run On Demand.

  11. Place some values into the Properties in Variable Bar.

    Run the Workflow to check that it executes successfully.

    Save the Workflow, then click Back (<), then Run Now. Switch from Production to Test by clicking ....

    Provide a different set of payment information and click Run Workflow. Repeat this a few times so that a number of items are added to the queue.

Once done, return to the Workflow Design, run the Workflow again and save it, then click Submit Exercise to grade it.
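
For reference, with sample values placed in the Variable Bar (the figures here are illustrative), the message that Formatter.Result puts on the queue would look like this:

    {
    	"invoice": "INV-001",
    	"creditor": "Acme Supplies",
    	"value": "1250.00"
    }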

Exercise 18: Processor Workflow

  1. Add Flowgear Dequeue to a new Workflow, connect Start.RunNow → Flowgear Dequeue.

  2. Set Flowgear Dequeue.Queue to payments.

  3. Add Email Alert and connect Flowgear Dequeue → Email Alert.

  4. Set Email Alert.Recipients to your email address and Email Alert.Subject to Payment information.

  5. Connect Flowgear Dequeue.Message → Email Alert.Body.

  6. In Workflow Settings, toggle on Allow Always On.

  7. Save the Workflow and click Back (<).

    Note that the Always On option is shown - normally this Workflow would be run by enabling Always On against the appropriate Environment.

  8. Click Design to return to the Workflow Design.

    Run the Workflow to check that it executes successfully. Because the Flowgear Dequeue Node is a Trigger Node, the Workflow will idle on that step waiting for the next message. Click Stop to stop the Workflow.

Save your Workflow, then click Submit Exercise to grade it.

More about the Collector/Processor Pattern

Task Storage

In the exercises above we used the Flowgear Queue Node, which handles queue configuration automatically. If you require more control over the queue, you could also look at Azure Queue or Amazon SQS Queue.

You could also consider using a table in a database or Redis to hold a list of items.
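
As a sketch of the Redis option, a Redis list can serve as a simple queue. This assumes a reachable Redis instance and the redis-py client; the queue name and task fields are illustrative:

    import json
    import redis    # assumes a reachable Redis instance and the redis-py client

    r = redis.Redis(host="localhost", port=6379, decode_responses=True)

    # Collector side: push the task onto a Redis list used as a queue.
    r.lpush("payments", json.dumps({"invoice": "INV-001", "value": "1250.00"}))

    # Processor side: block until a task is available, then process it.
    _, message = r.brpop("payments")
    task = json.loads(message)
    print(f"processing invoice {task['invoice']}")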

Note that most queues do not allow individual records to be large. If you do have a large amount of data (more than, say, 50KB), you should use a different Node to store the data and then just push an identifier into the queue that points to that data. The queue is then used to "hold a place" in the line, but the actual data that needs to be processed is stored separately.

A blob store is often used for this purpose - consider Azure Blob Storage or Amazon S3 Object. When using this approach, don't forget to remove the stored data once you've processed it.
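
Here is a sketch of that claim-check approach using Amazon S3 and SQS via boto3 - the bucket name and queue URL are placeholders, and AWS credentials are assumed to be configured in the environment:

    import json
    import uuid
    import boto3    # assumes AWS credentials are configured in the environment

    s3 = boto3.client("s3")
    sqs = boto3.client("sqs")

    BUCKET = "example-payload-bucket"                            # placeholder
    QUEUE_URL = "https://sqs.example.amazonaws.com/123/payments" # placeholder

    def enqueue_large_payload(payload):
        """Store the payload in the blob store and enqueue only its key."""
        key = str(uuid.uuid4())
        s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(payload))
        sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=key)
        return key

    def process_next_message():
        """Dequeue a key, fetch its payload, process it, then clean up."""
        resp = sqs.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=1)
        for msg in resp.get("Messages", []):
            key = msg["Body"]
            obj = s3.get_object(Bucket=BUCKET, Key=key)
            payload = json.loads(obj["Body"].read())
            # ... process the payload here ...
            s3.delete_object(Bucket=BUCKET, Key=key)   # remove the stored data
            sqs.delete_message(QueueUrl=QUEUE_URL,
                               ReceiptHandle=msg["ReceiptHandle"])

Deleting the blob and the queue message only after processing succeeds means a failed run leaves the task available for retry once the queue's visibility timeout lapses.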

Parallel Processing

If you have a large volume of incoming requests, it may be helpful to process tasks in parallel from the processor Workflow.

To do this, factor the processing step out into a separate Workflow, then reference that Workflow from the processor Workflow and click Enable Parallel Mode on the sub-Workflow. [More about Parallel Workflows](/articles/nodes/workflow#parallel workflows).
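
Conceptually, the parallel processor behaves like a dispatch loop handing tasks to a pool of workers. A minimal Python sketch (not Flowgear-specific; the worker count and task shape are illustrative):

    import queue
    from concurrent.futures import ThreadPoolExecutor

    task_queue = queue.Queue()

    def process_task(task):
        print(f"processing invoice {task['invoice']}")  # the factored-out step

    def processor_loop():
        # Hand each dequeued task to a pool of workers instead of processing
        # it inline, so one slow task doesn't hold up the rest of the queue.
        with ThreadPoolExecutor(max_workers=4) as pool:
            while True:
                task = task_queue.get()
                if task is None:              # sentinel value: stop processing
                    break
                pool.submit(process_task, task)

    for i in range(10):
        task_queue.put({"invoice": f"INV-{i:03d}"})
    task_queue.put(None)
    processor_loop()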

Completion Notification

When a task is running asynchronously, the consumer needs a way to know that processing has completed and what the result was. There are two options here:

Poll Method

  1. Modify the collector Workflow so that it generates an ID for the message. One way to do that is to use the Expression Node with its Expression Property set to Guid.NewGuid(). This will cause a unique identifier to be generated for each message. Store this identifier as part of the message and return it to the consumer via a Variable Bar Output Property.

  2. In the processor Workflow, use a Key/Value Node to store outcome information keyed on the identifier.

  3. Create a third status query Workflow that takes the identifier as an input and returns the Key/Value record associated with it.

The consumer then submits the task by calling the collector Workflow, receiving back an identifier. It can then poll the status query Workflow at a regular interval to see whether a result is available.
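
From the consumer's side, the interaction might look like this - a sketch assuming the Workflows are exposed as HTTP endpoints, with the URLs and the id/status field names being hypothetical:

    import time
    import requests    # assumes the Workflows are exposed as HTTP endpoints

    COLLECT_URL = "https://example.com/collector"       # placeholder URLs
    STATUS_URL = "https://example.com/status"

    # Submit the task; the collector returns the generated identifier.
    task = {"invoice": "INV-001", "creditor": "Acme", "value": "1250.00"}
    task_id = requests.post(COLLECT_URL, json=task).json()["id"]

    # Poll the status query Workflow until an outcome has been recorded.
    while True:
        outcome = requests.get(STATUS_URL, params={"id": task_id}).json()
        if outcome.get("status") in ("succeeded", "failed"):
            break
        time.sleep(5)    # poll at a regular interval

    print(outcome)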

Event Method

  1. As part of the input to the collector Workflow, include a callback URL property and store it as part of the data loaded into the queue.

  2. When the processor Workflow has finished processing a record, use Web Request 2 to fire the callback URL, advising the consumer of completion.
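
The callback itself is just an HTTP POST to the stored URL. Here is a sketch of the notification step in Python, standing in for Web Request 2 - the URL and payload fields are illustrative:

    import json
    import requests    # standing in for the Web Request 2 Node

    def notify_completion(task, outcome):
        """POST the outcome to the callback URL stored with the task."""
        requests.post(
            task["callback_url"],    # captured by the collector Workflow
            data=json.dumps(outcome),
            headers={"Content-Type": "application/json"},
            timeout=10,
        )

    # Example: after the processor has finished with a dequeued task
    task = {"invoice": "INV-001",
            "callback_url": "https://consumer.example.com/hook"}
    notify_completion(task, {"invoice": "INV-001", "status": "succeeded"})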