consumer
> Documentation > Docs > INFINI Gateway > References > Offline Processor > consumer

consumer #

Description #

The consumer processor is used to consume messages recorded in the queue without processing them. Its purpose is to provide an entry point for data consumption pipeline, which will be further processed by subsequent processors.

Configuration Example #

Here is a simple configuration example:

pipeline:
  - name: consume_queue_messages
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 5000
    processor:
      - consumer:
          consumer:
            fetch_max_messages: 1
          max_worker_size: 200
          num_of_slices: 1
          idle_timeout_in_seconds: 30
          queue_selector:
            keys:
              - email_messages
          processor:
            - xxx1:
            - xxx2:

In the above example, it subscribes to and consumes the email_messages queue. The queue messages are stored in the context of the current pipeline. The consumer provides a processor parameter, which contains a series of processors that will be executed sequentially. If any processor encounters an error during execution, the consumer will exit without committing the batch of data.

Parameter Description #

NameTypeDescription
message_fieldstringThe field name in the context where messages from the queue are stored. Default is messages.
max_worker_sizeintThe maximum number of workers allowed to run simultaneously. Default is 10.
num_of_slicesintThe number of parallel threads for consuming a single queue. Maximum slice size at runtime.
slicesarrayAllowed slice numbers as an integer array.
queue_selector.labelsmapFilter a group of queues to be consumed based on labels, similar to queues configuration.
queue_selector.idsarraySpecifies the UUIDs of the queues to be consumed, as a string array.
queue_selector.keysarraySpecifies the unique key paths of the queues to be consumed, as a string array.
queuesmapFilter a group of queues to be consumed based on labels, similar to queue_selector.labels configuration.
waiting_afterarrayWhether to wait for specified queues to finish consumption before starting. UUIDs of the queues, as a string array.
idle_timeout_in_secondsintTimeout duration for consuming queues. Default is 5 seconds.
detect_active_queueboolWhether to automatically detect new queues that meet the conditions. Default is true.
detect_intervalintTime interval in milliseconds for automatically detecting new queues that meet the conditions. Default is 5000.
quiet_detect_after_idle_in_msboolIdle interval in milliseconds to exit automatic detection. Default is 30000.
skip_empty_queueboolWhether to skip consuming empty queues. Default is true.
quit_on_eof_queueboolAutomatically quit consuming when reaching the last message of a queue. Default is true.
consumer.sourcestringConsumer source.
consumer.idstringUnique identifier for the consumer.
consumer.namestringConsumer name.
consumer.groupstringConsumer group name.
consumer.fetch_min_bytesintMinimum size in bytes for fetching messages. Default is 1.
consumer.fetch_max_bytesintMaximum size in bytes for fetching messages. Default is 10485760, which is 10MB.
consumer.fetch_max_messagesintMaximum number of messages to fetch. Default is 1.
consumer.fetch_max_wait_msintMaximum wait time in milliseconds for fetching messages. Default is 10000.
consumer.eof_retry_delay_in_msintWaiting time in milliseconds for retrying when reaching the end of a file. Default is 500.