13 Patterns: Collector/Processor
All the Workflows we've built so far have processed data synchronously. In other words, the integration objective was completely achieved by the time the Workflow finished executing.
In many cases, processing data may take much longer than a few seconds. For example, a large number of records may need to be processed, or the apps and services we integrate with may be slow to respond or unable to accept more than one record at a time.
This is usually not an issue if the Workflow is running in the "background" as an Always On Workflow. However, if a third party is waiting for the results of the Workflow, we generally don't want it to run for more than a few seconds. This is especially relevant when the Workflow is invoked via an API, which we will discuss in a later section.
For these cases, rather than processing data in real time, a collector/processor pattern is appropriate. The collector Workflow accepts data or a task and stores it (typically in a queue), while a processor Workflow pulls a task from the store (or queue), processes it and then starts over.
Here are a few benefits to this approach:
- The consumer (i.e. the app or service that called the collector Workflow) gets an almost immediate response back, although at that point it cannot be told whether the task will eventually succeed.
- The collector Workflow consumes an Active Workflow slot for a very short period of time (usually milliseconds).
- The processor Workflow consumes only one Active Workflow slot (unless it is specifically designed to process tasks in parallel).
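The pattern itself is independent of Flowgear. As a minimal sketch, assuming Python's in-memory queue.Queue stands in for the payments queue and a single background thread stands in for the processor Workflow, the collector returns as soon as the task is stored while the processor takes one task at a time. The function names collect, process and demo are hypothetical, chosen for illustration only.

```python
import json
import queue
import threading
import time


def collect(store: queue.Queue, invoice: str, creditor: str, value: str) -> str:
    """Collector: store the task and return straight away."""
    store.put(json.dumps({"invoice": invoice, "creditor": creditor, "value": value}))
    # The consumer learns that the task was accepted, not whether it will succeed.
    return "accepted"


def process(store: queue.Queue, results: list, stop: threading.Event) -> None:
    """Processor: take one task, handle it, then start over."""
    while not stop.is_set():
        try:
            message = store.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing queued; idle and wait for the next message
        try:
            task = json.loads(message)
            time.sleep(0.01)  # stand-in for a slow downstream app or service
            results.append(task["invoice"])
        finally:
            store.task_done()  # always mark the task done so join() cannot hang


def demo(count: int) -> list:
    """Queue `count` tasks, let a single processor drain them, return the results."""
    store: queue.Queue = queue.Queue()
    results: list = []
    stop = threading.Event()
    worker = threading.Thread(target=process, args=(store, results, stop), daemon=True)
    worker.start()
    for i in range(count):
        collect(store, f"INV-{i}", "Acme Ltd", "100.00")
    store.join()  # block until every queued task has been marked done
    stop.set()
    worker.join()
    return results


if __name__ == "__main__":
    print(demo(3))  # prints ['INV-0', 'INV-1', 'INV-2']
```

Note that collect returns before any processing happens, which is why the consumer only learns that the task was accepted. Using a single worker thread mirrors a processor Workflow that handles tasks one at a time.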
Exercise 17: Collector Workflow
In this exercise we'll create a Workflow that loads a request for an invoice payment onto a queue.
- Add Variable Bar to a new Workflow.
- Add Variable Bar.invoice, Variable Bar.creditor and Variable Bar.value, and set all of these Properties as Outputs.
- Add Formatter and connect Start.RunNow → Formatter.
- Set Formatter.Escaping to JSON and Formatter.Expression to the expression below:
  { "invoice": "{invoice}", "creditor": "{creditor}", "value": "{value}" }
- Add Formatter.invoice, Formatter.creditor and Formatter.value, then create Data Flow Connectors to them from the Properties with corresponding names on Variable Bar.
- Add Flowgear Queue and connect Formatter → Flowgear Queue.
- Set Flowgear Queue.Queue to payments.
- Set Flowgear Queue.Action to Enqueue.
- Connect Formatter.Result → Flowgear Queue.Message.
- Open Workflow Settings and enable Allow Run On Demand.
- Enter some values into the Properties in Variable Bar.
- Run the Workflow to check that it executes successfully.
- Save the Workflow, click Back (<), then click Run Now. Switch from Production to Test by clicking Production ▼.
- Provide a different set of payment information and click Run Workflow. Repeat this a few times so that several items are added to the queue.
Return to the Workflow Design, run the Workflow once more and save it, then click Submit Exercise to grade it.
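Setting Formatter.Escaping to JSON matters because the values dropped into the template may contain characters such as double quotes. The Python sketch below only illustrates the idea: it assumes that JSON escaping behaves like escaping each value with json.dumps, which is not a description of Flowgear's own implementation, and render is a hypothetical helper.

```python
import json
import re

# The Formatter.Expression template from the exercise.
TEMPLATE = '{ "invoice": "{invoice}", "creditor": "{creditor}", "value": "{value}" }'
# Matches {invoice}, {creditor} and {value}, but not the outer "{ " and " }".
PLACEHOLDER = re.compile(r"\{(\w+)\}")


def render(template: str, values: dict, escape_json: bool) -> str:
    """Hypothetical helper: substitute each {name} placeholder in one pass."""

    def substitute(match: re.Match) -> str:
        text = str(values[match.group(1)])
        if not escape_json:
            return text  # raw substitution, no escaping
        # json.dumps quotes and escapes the text; drop the outer quotes
        # because the template already supplies them.
        return json.dumps(text)[1:-1]

    return PLACEHOLDER.sub(substitute, template)


if __name__ == "__main__":
    payment = {"invoice": "INV-1", "creditor": 'Joe "The Plumber" Ltd', "value": "250.00"}
    print(render(TEMPLATE, payment, escape_json=True))
    print(render(TEMPLATE, payment, escape_json=False))
```

With a creditor such as Joe "The Plumber" Ltd, the unescaped output is not valid JSON, whereas the escaped output parses back to the original values.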
Exercise 18: Processor Workflow
- Add Flowgear Dequeue to a new Workflow and connect Start.RunNow → Flowgear Dequeue.
- Set Flowgear Dequeue.Queue to payments.
- Add Email Alert and connect Flowgear Dequeue → Email Alert.
- Set Email Alert.Recipients to your email address and Email Alert.Subject to Payment information.
- Connect Flowgear Dequeue.Message → Email Alert.Body.
- In the Workflow Settings, enable Allow Always On.
- Save the Workflow and click Back (<).
- Note that the Always On option is shown. Normally this Workflow would be run by enabling Always On against the appropriate Environment.
- Click Design to return to the Workflow Design.
- Run the Workflow to check that it executes successfully. Because Flowgear Dequeue is a Trigger Node, the Workflow will idle on that step, waiting for the next message. Click Stop to stop the Workflow.
Save and run your Workflow, then click Submit Exercise to grade it.
More about the Collector/Processor Pattern
Task Storage
In the exercises above we used the Flowgear Queue Node, which configures the queue automatically. If you require more control over the queue, you could also look at Azure Queue or Amazon SQS Queue.
You could also consider using a database table, or a list in Redis, to hold the items.
Note that most queues do not allow individual messages to be large. If you have a large amount of data (more than, say, 50 KB), use a different Node to store the data and push only an identifier that points to it into the queue. The queue is then used to "hold a place" in line, while the actual data to be processed is stored separately.
A blob store is often used for this purpose: consider Azure Blob Storage or Amazon S3 Object. When using this approach, don't forget to remove the stored data once you've processed it.
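This "hold a place" approach (often called the claim-check pattern) can be sketched in Python as follows. Assumptions: a dict stands in for Azure Blob Storage or Amazon S3, a queue.Queue stands in for the queue, and the 50 KB threshold is the rule of thumb from the text rather than the limit of any particular queue service. enqueue and process_next are hypothetical names.

```python
import json
import queue
import uuid
from typing import Callable

# Rule-of-thumb threshold from the text, not the limit of a specific queue service.
MAX_MESSAGE_BYTES = 50 * 1024


def enqueue(q: queue.Queue, blobs: dict, payload: dict) -> None:
    """Send small payloads inline; park large ones in the blob store."""
    body = json.dumps(payload)
    if len(body.encode("utf-8")) <= MAX_MESSAGE_BYTES:
        q.put(json.dumps({"data": payload}))
    else:
        key = str(uuid.uuid4())
        blobs[key] = body  # the real data lives in the blob store...
        q.put(json.dumps({"ref": key}))  # ...and the queue only holds a place in line


def process_next(q: queue.Queue, blobs: dict, handle: Callable[[dict], None]) -> None:
    """Take one message, resolve any reference, process it, then clean up."""
    message = json.loads(q.get())
    if "data" in message:
        handle(message["data"])
        return
    key = message["ref"]
    handle(json.loads(blobs[key]))
    del blobs[key]  # remove the stored data once it has been processed
```

The stored data is deleted only after handle returns without raising, matching the advice to remove it once it has been processed.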
Parallel Processing
If you have a large volume of incoming requests, it may help to process tasks in parallel within the processor Workflow.
To do this, factor the processing step out into a separate Workflow, reference that Workflow from the processor Workflow and click Enable Parallel Mode on the sub-Workflow. For more information, see Parallel Workflows.
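Outside Flowgear, the same idea of handing several tasks to workers at once can be sketched with Python's concurrent.futures.ThreadPoolExecutor. This is only an analogy to Parallel Mode, not a description of how Flowgear schedules sub-Workflows; process_payment is a hypothetical stand-in for the sub-Workflow and drain_in_parallel is illustrative.

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor


def process_payment(task: dict) -> str:
    """Hypothetical stand-in for the processing sub-Workflow."""
    time.sleep(0.1)  # simulate a slow, independent downstream call
    return f"paid {task['invoice']}"


def drain_in_parallel(q: queue.Queue, workers: int) -> list:
    """Take every task currently queued and process up to `workers` at once."""
    tasks = []
    while True:
        try:
            tasks.append(q.get_nowait())
        except queue.Empty:
            break
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() runs the calls concurrently but yields results in input order.
        return list(pool.map(process_payment, tasks))


if __name__ == "__main__":
    q: queue.Queue = queue.Queue()
    for i in range(4):
        q.put({"invoice": f"INV-{i}"})
    start = time.perf_counter()
    print(drain_in_parallel(q, workers=4))
    print(f"took {time.perf_counter() - start:.2f}s")
```

With four workers, the four 0.1-second tasks should finish in roughly the time of one. The workers parameter is the main design choice: raising it shortens the total run time for slow, independent tasks, but it also increases the load on the downstream app or service.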
Completion Notification
When a task is running asynchronously, the consumer needs a way to know that processing has completed and what the result was. There are two options here:
Poll Method
- Modify the collector Workflow so that it generates an ID for each message. One way to do that is to use the Expression Node with its Expression Property set to Guid.NewGuid(), which generates a unique identifier for each message. Store this identifier as part of the message and return it to the consumer via a Variable Bar Output Property.
- In the processor Workflow, use a Key/Value Node to store outcome information keyed on the identifier. 
- Create a third status query Workflow that takes the identifier as an input and returns the Key/Value record associated with it. 
The consumer will then send the task by calling the collector Workflow and receiving back an identifier. It can then poll the status query Workflow at a regular interval to see whether any results are available.
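A minimal Python sketch of the poll method, assuming a dict stands in for the Key/Value Node and uuid.uuid4() plays the role of Guid.NewGuid(). All function names are hypothetical; the first three correspond to the collector, processor and status query Workflows described above, and the last one to the consumer.

```python
import json
import queue
import time
import uuid


def collect_with_id(q: queue.Queue, payload: dict) -> str:
    """Collector: tag the message with a unique ID and return it to the consumer."""
    task_id = str(uuid.uuid4())  # plays the role of Guid.NewGuid()
    q.put(json.dumps({"id": task_id, **payload}))
    return task_id


def process_one(q: queue.Queue, kv: dict) -> None:
    """Processor: handle one message and record the outcome keyed on its ID."""
    message = json.loads(q.get())
    kv[message["id"]] = {"status": "complete", "result": f"paid {message['invoice']}"}


def query_status(kv: dict, task_id: str) -> dict:
    """Status query: return the stored outcome, or 'pending' if there is none yet."""
    return kv.get(task_id, {"status": "pending"})


def poll_until_done(kv: dict, task_id: str, interval: float, attempts: int) -> dict:
    """Consumer side: call the status query at a regular interval."""
    for _ in range(attempts):
        status = query_status(kv, task_id)
        if status["status"] != "pending":
            return status
        time.sleep(interval)
    return query_status(kv, task_id)
```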
Event Method
- As part of the input to the collector Workflow, include a callback URL Property and store it as part of the data loaded onto the queue.
- When the processor Workflow has finished processing a record, use Web Request 2 to call the callback URL, advising the consumer of completion.
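The event method could look like the Python sketch below, which uses urllib.request from the standard library to POST a JSON body to the callback URL. The payload fields (id, status, result), the callback_url message field and the function names are assumptions for illustration; the consumer decides what its callback endpoint expects.

```python
import json
import urllib.request


def notify_completion(callback_url: str, task_id: str, outcome: dict, timeout: float = 10.0) -> int:
    """POST the outcome to the consumer's callback URL and return the HTTP status."""
    body = json.dumps({"id": task_id, **outcome}).encode("utf-8")
    request = urllib.request.Request(
        callback_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


def process_message(message: str) -> int:
    """Processor: handle one queued task, then fire its callback URL."""
    task = json.loads(message)
    result = f"paid {task['invoice']}"  # hypothetical processing step
    return notify_completion(task["callback_url"], task["id"], {"status": "complete", "result": result})
```

A production version would also need to decide what happens when the callback fails, for example by retrying or by falling back to the poll method.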