13 Patterns: Collector/Processor
All the Workflows we've built so far have processed data synchronously. In other words, the integration objective was completely achieved by the time the Workflow finished executing.
In many cases, processing data may take much more than a few seconds. For example, a large number of records may need to be processed, or the apps and services we integrate with may be slow to respond or unable to accept more than one record at a time.
This is usually not an issue if the Workflow is running in the "background" as an Always On Workflow. However, if a third party is waiting for the results of the Workflow, we generally don't want it to run for more than a few seconds. This is especially relevant when the Workflow is being invoked via API, which we will discuss in a later section.
For these cases, rather than processing data in real time, a collector/processor pattern is appropriate. The collector Workflow accepts data or a task and stores it (typically in a queue), while a processor Workflow pulls a task from the store (or queue), processes it, and then starts over. A minimal sketch of this loop follows the list below.
Here are a few benefits to this approach:
- The consumer (i.e. the app or service that called the collector Workflow) gets an almost immediate response back, although it can't be told at that point whether the task will ultimately succeed.
- The collector Workflow consumes an Active Workflow slot for a very short period of time (usually milliseconds).
- The processor Workflow consumes only one Active Workflow slot (unless it is specifically designed to process tasks in parallel).
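
To make the division of labour concrete, here is a minimal sketch of the pattern in Python. It is purely illustrative: an in-process `queue.Queue` stands in for a durable queue such as the one the Flowgear Queue Node manages, and the function names (`collect`, `process_forever`, `handle`) are invented for the example.

```python
import queue

# Stand-in for a durable queue (e.g. what the Flowgear Queue Node manages).
tasks = queue.Queue()

def collect(payload: dict) -> str:
    """Collector: accept a task, store it and return immediately."""
    tasks.put(payload)
    return "accepted"  # the consumer gets a near-instant response

def handle(payload: dict) -> None:
    """The slow work (e.g. calling a slow third-party service)."""
    print("processing", payload)

def process_forever() -> None:
    """Processor: pull one task at a time, process it, start over."""
    while True:
        payload = tasks.get()  # idles here until a task is available
        handle(payload)
        tasks.task_done()
```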
Exercise 17: Collector Workflow
In this exercise we'll create a Workflow that loads a request for an invoice payment onto a queue.
1. Add `Variable Bar` to a new Workflow.
2. Add `Variable Bar.invoice`, `Variable Bar.creditor` and `Variable Bar.value`, and set all of these Properties as Outputs.
3. Add `Formatter`, connect `Start.RunNow → Formatter`.
4. Set `Formatter.Escaping` to `JSON` and `Formatter.Expression` to the expression below (a rendered example appears after this exercise):

   ```
   { "invoice": "{invoice}", "creditor": "{creditor}", "value": "{value}" }
   ```

5. Add `Formatter.invoice`, `Formatter.creditor` and `Formatter.value`, then create Data Flow Connectors from the Properties with corresponding names on `Variable Bar`.
6. Add `Flowgear Queue`, connect `Formatter → Flowgear Queue`.
7. Set `Flowgear Queue.Queue` to `payments`.
8. Set `Flowgear Queue.Action` to `Enqueue`.
9. Connect `Formatter.Result → Flowgear Queue.Message`.
10. Open Workflow Settings and toggle on `Allow Run On Demand`.
11. Place some values into the Properties on `Variable Bar`.
12. Run the Workflow to check that it executes successfully.
13. Save the Workflow, then click Back (`<`), then `Run Now`. Switch from `Production` to `Test` by clicking `...`.
14. Provide a different set of payment information and click `Run Workflow`. Repeat a few times so that a number of items are added to the queue.
15. Return to the Workflow Design, run the Workflow once more and save it, then click `Submit Exercise` to grade it.
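
As an aside, here's roughly what the message produced by the Formatter would look like for a sample payment. This is a hypothetical illustration in Python (the invoice values are invented); the `JSON` Escaping setting ensures the substituted values are safely escaped, which `json.dumps` mirrors here.

```python
import json

# Mirror of what the Formatter produces for sample Variable Bar values
# (values are illustrative only).
message = json.dumps({
    "invoice": "INV-1001",
    "creditor": "Acme Ltd",
    "value": "199.99",
})
print(message)
# {"invoice": "INV-1001", "creditor": "Acme Ltd", "value": "199.99"}
```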
Exercise 18: Processor Workflow
1. Add `Flowgear Dequeue` to a new Workflow, connect `Start.RunNow → Flowgear Dequeue`.
2. Set `Flowgear Dequeue.Queue` to `payments`.
3. Add `Email Alert` and connect `Flowgear Dequeue → Email Alert`.
4. Set `Email Alert.Recipients` to your email address and `Email Alert.Subject` to `Payment information`.
5. Connect `Flowgear Dequeue.Message → Email Alert.Body`.
6. In Workflow Settings, toggle on `Allow Always On`.
7. Save the Workflow and click Back (`<`). Note that the `Always On` option is now shown; normally this Workflow would be run by enabling Always On against the appropriate Environment.
8. Click `Design` to return to the Workflow Design.
9. Run the Workflow to check that it executes successfully. Because the `Flowgear Dequeue` Node is a Trigger Node, the Workflow will idle on that step waiting for the next message. Click `Stop` to stop the Workflow.
10. Save your Workflow, then click `Submit Exercise` to grade it.
More about the Collector/Processor Pattern
Task Storage
In the exercises above we used the Flowgear Queue Node, which handles queue configuration automatically. If you require more control over the queue, you could also look at the Azure Queue or Amazon SQS Queue Nodes.
You could also consider using a table in a database or Redis to hold a list of items.
Note that most queues do not allow individual records to be large. If you do have a large amount of data (more than, say, 50 KB), you should use a different Node to store the data and push only an identifier into the queue that points to that data. The queue is then used to "hold a place" in the line, while the actual data that needs to be processed is stored separately.
A blob store is often used for this purpose - consider Azure Blob Storage or Amazon S3 Object. When using this approach, don't forget to remove the stored data once you've processed it.
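
A minimal sketch of this approach, again in illustrative Python: a dict stands in for the blob store, an in-process queue stands in for the durable queue, and `enqueue_large`/`process_one` are invented names.

```python
import json
import queue
import uuid

tasks = queue.Queue()  # stand-in for the durable queue
blobs = {}             # stand-in for a blob store (e.g. Azure Blob Storage)

def enqueue_large(payload: str) -> None:
    """Store the large payload separately; enqueue only a small pointer."""
    blob_id = str(uuid.uuid4())
    blobs[blob_id] = payload
    tasks.put(json.dumps({"blob_id": blob_id}))  # well under any size limit

def process_one() -> None:
    """Dequeue the pointer, fetch and process the payload, then clean up."""
    pointer = json.loads(tasks.get())
    payload = blobs[pointer["blob_id"]]
    print("processing", len(payload), "bytes")
    del blobs[pointer["blob_id"]]  # remove the stored data once processed
```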
Parallel Processing
If you have a large volume of incoming requests, it may be helpful to process tasks in parallel from the processor Workflow.
To do this, factor the processing step into a separate Workflow, then reference that Workflow from the processor Workflow and click Enable Parallel Mode on the sub-Workflow. [More about Parallel Workflows](/articles/nodes/workflow#parallel workflows).
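
Conceptually, Parallel Mode fans each dequeued task out to one of several concurrent invocations of the sub-Workflow. A rough analogy in Python using a thread pool (this is not how Flowgear implements it, just the shape of the behaviour):

```python
import queue
from concurrent.futures import ThreadPoolExecutor

tasks = queue.Queue()

def handle(payload: dict) -> None:
    """Stands in for the factored-out processing sub-Workflow."""
    print("processing", payload)

def process_in_parallel(worker_count: int = 4) -> None:
    """Dequeue tasks one by one but hand each to a pooled worker."""
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        while True:
            pool.submit(handle, tasks.get())
```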
Completion Notification
When a task is running asynchronously, the consumer needs a way to know that processing has completed and what the result was. There are two options here:
Poll Method
1. Modify the collector Workflow so that it generates an ID for the message. One way to do that is to use the `Expression` Node with its `Expression` Property set to `Guid.NewGuid()`, which generates a unique identifier for each message. Store this identifier as part of the message and return it to the consumer via a `Variable Bar` Output Property.
2. In the processor Workflow, use a Key/Value Node to store outcome information keyed on the identifier.
3. Create a third, status query Workflow that takes the identifier as an input and returns the Key/Value record associated with it.

The consumer then sends the task by calling the collector Workflow and receives back an identifier. It can poll the status query Workflow at a regular interval to see whether any results are available.
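
Here is an illustrative Python sketch of the three pieces. Dicts stand in for the queue messages and the Key/Value store, `uuid.uuid4()` plays the role of `Guid.NewGuid()`, and all function names are invented for the example.

```python
import queue
import uuid

tasks = queue.Queue()
results = {}  # stand-in for the Key/Value Node's store

def collector(payload: dict) -> str:
    """Generate an ID, enqueue the task, return the ID to the consumer."""
    task_id = str(uuid.uuid4())  # equivalent role to Guid.NewGuid()
    tasks.put({"id": task_id, "payload": payload})
    return task_id  # in Flowgear, returned via a Variable Bar Output

def processor_step() -> None:
    """Process one task and record its outcome keyed on the ID."""
    task = tasks.get()
    results[task["id"]] = {"status": "done"}  # outcome information

def status_query(task_id: str) -> dict:
    """The status query Workflow: return the outcome for an ID, if any."""
    return results.get(task_id, {"status": "pending"})
```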
Event Method
1. As part of the input to the collector Workflow, include a callback URL Property, and store that Property as part of the data loaded onto the queue.
2. When the processor Workflow has finished processing a record, use `Web Request 2` to fire the callback URL, advising the consumer of completion (sketched below).
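
A minimal sketch of firing such a callback, using only the Python standard library. The payload shape and the `notify_completion` name are assumptions for illustration; in Flowgear this step is performed by the Web Request 2 Node.

```python
import json
import urllib.request

def notify_completion(callback_url: str, outcome: dict) -> None:
    """POST the processing outcome to the consumer's callback URL."""
    request = urllib.request.Request(
        callback_url,
        data=json.dumps(outcome).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        response.read()  # consumer's acknowledgement
```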