Processing Record Queue
During the execution of a process flow, the entire input data is normally processed at once. Adeptia Connect also allows you to process the input data record by record using the Record Queue Processor. Using the Record Queue Processor, you can:
- Process the input data record by record, and
- Specify the number of records to be processed.
How does Record Queue Processor work?
The Record Queue Processor consists of a Record Queue Producer and a Record Queue Receiver. The Record Queue Producer is an asynchronous activity that places records in a queue one at a time and waits for each record to be consumed by the Record Queue Receiver. The Record Queue Receiver consumes the record from the queue and produces a stream, which can be processed further by other activities in the process flow. Once the record is consumed, the Record Queue Producer places the next record in the queue and again waits for the Record Queue Receiver to consume it. This loop continues until all the records have been queued and processed, at which point the loop ends and the process flow stops. The diagram below depicts the usage of the Record Queue Processor.
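The one-record-at-a-time handoff described above can be illustrated with a plain Java analogy. This is only a minimal sketch, not Adeptia Connect's implementation: the class name RecordHandoffSketch, the END marker, and the use of java.util.concurrent.SynchronousQueue are all assumptions made for illustration. The producer thread blocks on each record until the receiver loop takes it, which mirrors the "set a record, then wait for it to be consumed" behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

public class RecordHandoffSketch {
    // Hypothetical end-of-data marker; not part of Adeptia Connect.
    private static final String END = "__END__";

    /** Hands records from a producer thread to the caller, one record at a time. */
    public static List<String> process(List<String> records) {
        // A SynchronousQueue holds no elements: put() blocks until a take() arrives.
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                for (String record : records) {
                    queue.put(record); // waits until the receiver consumes this record
                }
                queue.put(END); // signals that no further record is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> consumed = new ArrayList<>();
        try {
            while (true) {
                String record = queue.take(); // receiver side: consume one record
                if (END.equals(record)) {
                    break; // all records processed, so the loop ends
                }
                consumed.add(record); // stand-in for the downstream activities
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing records", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("r1", "r2", "r3"))); // prints [r1, r2, r3]
    }
}
```

In the actual process flow, the Gateway check plays the role that the END marker plays in this sketch.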
As shown in the diagram, data from the FileSource is consumed by the TextSchema, which passes it on to the Record Queue Producer. The Record Queue Producer takes the first record and places it in the queue specified in the Record Queue Producer properties. A Gateway is used to check the availability of records. The following code is used at the Gateway to check whether a record is available in the queue.
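// Read the nextRecord flag that the Record Queue Producer sets for this queue.
String queue = context.get("Service.queueName.nextRecord");
// Comparing from the literal side avoids a NullPointerException if the flag is unset.
if ("true".equals(queue)) {
    return true;
}
return false;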
where,
<queueName> is the name of the queue specified in the Record Queue Producer.
<nextRecord> is a variable that indicates whether a record is available in the queue. Its value can be true or false.
When the Record Queue Producer places a record in the queue, the value of the nextRecord variable becomes true, which means that the next record is available in the queue for processing. In this case, the Record Queue Receiver takes the record from the queue, changes the value of nextRecord to false, and passes the record to the File Target. In the meantime, the Record Queue Producer places the next record in the queue, changes the value of nextRecord back to true, and waits for the record to be consumed by the Record Queue Receiver. If the value of nextRecord is not changed back to true, no record is available for processing and the Record Queue Processor stops.
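The flag handling described above can be sketched in standalone Java. This is a hedged illustration only: the process flow context is modeled as a plain Map, and the class GatewayCheckSketch, its methods, and the queue name "orders" are hypothetical names, not part of Adeptia Connect. Only the key pattern Service.<queueName>.nextRecord and the "true"/"false" values come from the text above.

```java
import java.util.HashMap;
import java.util.Map;

public class GatewayCheckSketch {
    /** Builds the context key following the Service.<queueName>.nextRecord pattern. */
    public static String nextRecordKey(String queueName) {
        return "Service." + queueName + ".nextRecord";
    }

    /** Returns true only when the flag is present and equals "true". */
    public static boolean hasNextRecord(Map<String, Object> context, String queueName) {
        Object flag = context.get(nextRecordKey(queueName));
        // A missing flag is treated as "no record available" (an assumption of this sketch).
        return flag != null && "true".equals(flag.toString());
    }

    public static void main(String[] args) {
        // The Map is a stand-in for the process flow context.
        Map<String, Object> context = new HashMap<>();
        String queueName = "orders"; // hypothetical queue name

        // Producer side: a record was placed in the queue.
        context.put(nextRecordKey(queueName), "true");
        System.out.println(hasNextRecord(context, queueName)); // prints true

        // Receiver side: the record was consumed.
        context.put(nextRecordKey(queueName), "false");
        System.out.println(hasNextRecord(context, queueName)); // prints false
    }
}
```

Treating an unset flag as false keeps the check from failing with a NullPointerException before the Record Queue Producer has set the variable for the first time.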
Processing records using Record Queue Producer and Record Queue Receiver
This section walks through the procedure using an example. You can create your own process flow and use the Record Queue Producer and Record Queue Receiver in the same way.
- In Web Process Designer, in the Repository View panel, select Activities.
- Drag the File Source and Text Schema activities to the Canvas area.
- Expand RecordQueueProducer, select the RecordQueueProducer activity, and drag it to the Canvas area.
- Drag a Gateway element and File Target to the Canvas area.
- Expand RecordQueueReceiver, select the RecordQueueReceiver activity, and drag it to the Canvas area.
- Connect all the activities as shown in the below diagram.
- Click RecordQueueProducer and select View Properties from the context pad. Its properties are displayed in the Activity Properties Panel.
- Set the appropriate properties for the RecordQueueProducer.
| Properties | Description |
| --- | --- |
| Label | Label of the Record Queue Producer activity displayed in the Canvas area. |
| Type | Type of the activity. By default, the value is filled automatically and is non-editable. |
| Alias | Name of the Record Queue Producer activity. By default, it is Record Queue Producer. |
| Activity Maximum Retries on Failure | Maximum number of retries when the activity fails to connect to the server. The process flow resumes if a connection is established within the specified number of retries; otherwise, the process flow is aborted. |
| Activity Wait Time | Time (in seconds) between successive retries. |
| Async | Specifies whether the activity is executed in synchronous or asynchronous mode. Record Queue Producer is always executed in asynchronous mode. This field is non-editable. To know more about synchronous and asynchronous modes of execution, refer to Working with Process Flow. |
| Character Set Encoding | Character set encoding used for parsing, in case the input data is XML. |
| Input Format | Format of the input data. It can be XML or Native. |
| Output Format | Format of the output record. It can be XML or Native. |
| Queue Name | Name of the queue in which records are placed. It must be the same as the sourceQueueName of the Record Queue Receiver activity used in the process flow. |
| Queue Size | Size of the queue. |
| Schema Typed Id | Combination of the TypedId and the 30-digit activity ID of the source schema, separated by a colon (':'), for example, TextSchema:192168001158117196729809300003. |

- Click RecordQueueReceiver and select View Properties from the context pad. Its properties are displayed in the Activity Properties Panel.
- Set the appropriate properties for the RecordQueueReceiver.

| Properties | Description |
| --- | --- |
| Label | Label of the Record Queue Receiver activity displayed in the Canvas area. |
| Type | Type of the activity. By default, the value is filled automatically and is non-editable. |
| Alias | Name of the Record Queue Receiver activity. By default, it is Record Queue Receiver. |
| Activity Maximum Retries on Failure | Maximum number of retries when the activity fails to connect to the server. The process flow resumes if a connection is established within the specified number of retries; otherwise, the process flow is aborted. |
| Activity Wait Time | Time (in seconds) between successive retries. |
| Async | Specifies whether the activity is executed in synchronous or asynchronous mode. Record Queue Receiver can be executed in either mode. To know more about synchronous and asynchronous modes of execution, refer to Working with Process Flow. |
| Character Set Encoding | Character set encoding used for parsing, in case the input data is XML. |
| Source Queue Name | Name of the queue from which the Record Queue Receiver fetches the record. sourceQueueName must be the same as the QueueName of the Record Queue Producer activity used in the process flow. |
Based on the selected properties for the RecordQueueProducer and RecordQueueReceiver, and the code specified for the Gateway element, the records are processed.
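Two of the property constraints above lend themselves to a quick sanity check before running the flow: the Queue Name of the producer must equal the Source Queue Name of the receiver, and the Schema Typed Id is a type name followed by a colon and a 30-digit ID. The following is a minimal sketch of such a check in plain Java. The class QueuePropertyCheckSketch and its methods are hypothetical helpers, not an Adeptia Connect API; the sketch also assumes that the type name contains only letters and that queue names are compared case-sensitively.

```java
import java.util.regex.Pattern;

public class QueuePropertyCheckSketch {
    // Assumed shape: letters, a colon, then exactly 30 digits,
    // for example TextSchema:192168001158117196729809300003.
    private static final Pattern SCHEMA_TYPED_ID = Pattern.compile("[A-Za-z]+:\\d{30}");

    /** Checks that a Schema Typed Id matches the assumed TypedId:30-digit-ID shape. */
    public static boolean isValidSchemaTypedId(String value) {
        return value != null && SCHEMA_TYPED_ID.matcher(value).matches();
    }

    /** Checks that the producer's Queue Name equals the receiver's Source Queue Name. */
    public static boolean queueNamesMatch(String producerQueueName, String receiverSourceQueueName) {
        // Case-sensitive comparison is an assumption of this sketch.
        return producerQueueName != null && producerQueueName.equals(receiverSourceQueueName);
    }

    public static void main(String[] args) {
        System.out.println(isValidSchemaTypedId("TextSchema:192168001158117196729809300003")); // prints true
        System.out.println(isValidSchemaTypedId("TextSchema:12345")); // prints false
        System.out.println(queueNamesMatch("orders", "orders")); // prints true
    }
}
```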