Processing Record Queue

During the execution of the process flow, the entire input data is processed at a time. Adeptia Suite allows you to process the input data record-by-record using Record Queue Processor. Using Record Queue Processor, you can:

  • Process the input data record-by-record, and 
  • Specify the number of records to be processed.

How does Record Queue Processor work?

Record Queue Processor consists of Record Queue Producer and Record Queue Receiver. Record Queue Producer is an asynchronous activity, which sets records one by one in a queue and waits for it to get consumed by the Record Queue Receiver. Record Queue Receiver consumes the record from the queue and produces a stream. This stream can be further processed by other activities of the process flow. Once the record is consumed by the Record Queue Receiver, Record Queue Producer sets the next record and waits for it to get consumed by the Record Queue Receiver. This loop continues until all the records are queued and processed. Once all the records are processed, the loop is broken and the process flow stops. The usage of the Record Queue Processor is depicted in the below diagram.



As shown in the diagram, data from the File_Source is consumed by the Text_Schema, which further passes it to the Record Producer. Record Producer takes the first record and sets it into Queue as specified by the Record Queue Producer properties. Gateway is used to check the availability of records. Below code is used at Gateway to check the availability of the record in the queue.

String queue = context.get("Service.queueName.nextRecord");if(queue.equals("true")){return true; 
 }return false;

where,

<queueName> is the name of the queue specified in the Record Queue Producer.

<nextRecord> is a variable which is used to decide whether the record is available in the queue or not. Value of the variable NextRecord can be true or false.

When the Record Queue Producer sets a record in the queue, the value of the nextRecord variable becomes true. If the value is true, it means the next record is available in the queue for processing. In this case, Record Queue Receiver takes the record from the queue, changes the value of the nextRecord variable to false and passes the record to the File Target. In the meantime, Record Queue Producer again sets the record in the queue and changes the value of the nextRecord to true and waits for it to get consumed by the Record Queue Receiver. If the value of the NextRecord variable is not changed to true, it means there is no record available for processing and the record queue processor is stopped.

Processing records using Record Queue Producer and Record Queue Receiver

This section is explained by considering an example. You can create your own process flow and use Record Queue Producer and Receiver.

  1. In the Process Designer, in the Repository View panel, expand Activities.
  2. Drag the File Source and Text Schema activities to the Graph Canvas area.
  3. Expand RecordQueueProducer, select the RecordQueueProducer activity, and drag it to the Graph Canvas area.



  4. Drag a Gateway element to the Graph Canvas area.
  5. Drag a File Target to the Graph Canvas area.
  6. Expand RecordQueueReceiver, select the RecordQueueReceiver activity, and drag it to the Graph Canvas area.
  7. Connect all the activities as shown in the below diagram.
  8. Right-click RecordQueueProducer and select View Properties. Its properties are displayed in the Properties Panel in the Bottom Pane.

     

  9. Set the appropriate properties for the RecordQueueProducer. 

     

    Properties

    Description

    Character Set Encoding

    Character set encoding used for parsing, in case input data is XML.

    inputFormat

    Format of the input data. It can be XML or Native.

    Label

    Label of the Record Queue Producer activity displayed in the Graph Canvas area.

    Name

    Name of the Record Queue Producer activity. By default, it is Record Queue Producer.

    outputFormat

    Format of the output record. It can be XML or Native.

    queueNameName of the queue. This will be the queue name in which records are set. It must be same as sourceQueueName of the Record Queue Receiver activity used in the process flow.
    queueSizeSize of the queue.

    schemaTypedId

    It is the combination of TypedId and the 30-digit activity ID of the source schema separated by colon ':', for example, TextSchema:192168001158117196729809300003.

    source

    Name of the activity, which is passing the record to the Record Queue Producer. By default, the value is filled automatically and is non-editable.

    streamNames

    Record Queue Producer doesn't produce any stream. This field remains blank.

    Sync

    Specifies whether the activity will be executed in Sync mode or A-sync mode. Record Queue Producer is always executed in a-sync mode. This field is non-editable. To know more about Sync and A-sync mode of execution, refer to Working with Process Flow.

    Type

    Type of the activity. By default, the value is filled automatically and is non-editable.

  10. Right-click RecordQueueReceiver and select View Properties. Its properties are displayed in the Properties Panel in the Bottom Pane.

     

  11. Set the appropriate properties for the RecordQueueReceiver. 

    Properties

    Description

    Character Set EncodingCharacter set encoding used for parsing, in case input data is XML.

    Label

    Label of the Record Queue Receiver activity displayed in the Graph Canvas area.

    Name

    Name of the Record Queue Receiver activity. By default, it is Record Queue Receiver.

    source

    Record Queue Receiver does not consume any stream. This field remains blank.

    sourceQueueNameName of the queue from which Record Queue Receiver will fetch the record. sourceQueueName must be same as QueueName of Record Queue Producer activity used in the process flow.

    streamNames

    Name of the stream produced by the Record Queue Receiver.

    Synch

    Specifies whether the activity will be executed in Synch mode or A-synch mode. Record Queue Receiver can be executed in A-synch or Synch mode. To know more about Synch and Asynch mode of execution, refer to Working with Process Flow.

    Type

    Type of the activity. By default, the value is filled automatically and is non-editable.


Based on the selected properties for the RecordQueueProducer and RecordQueueReceiver, and the code specified for the Gateway element, the records are processed.

Record processing can be stopped based on the specified conditions. If the condition is met, then the signal is set by the RecordQueueReceiver to stop further processing of records.