consumer
> 文档中心 > 文档中心 > INFINI Gateway > 功能手册 > 离线处理器 > consumer

consumer #

描述 #

consumer 处理器用来消费 queue 记录的消息请求,但是不处理,目标是提供数据消费管道的入口,由后续的 processor 进行数据加工。

配置示例 #

一个简单的示例如下:

pipeline:
  - name: consume_queue_messages
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 5000
    processor:
      - consumer:
          consumer:
            fetch_max_messages: 1
          max_worker_size: 200
          num_of_slices: 1
          idle_timeout_in_seconds: 30
          queue_selector:
            keys:
              - email_messages
          processor:
            - xxx1:
            - xxx2:

上面的例子,订阅并消费队列 email_messages,队列消息保存在当前 Pipeline 管道的上下文里面,Consumer 提供了一个 processor 参数,这个参数里面是一系列 Processor,依次执行,任何一个 Processor 如果执行返回出错,consumer 则退出切不会 commit 这批数据。

参数说明 #

名称类型说明
message_fieldstring从队列获取到的消息,存放到上下文的字段名称, 默认 messages
max_worker_sizeint最大允许同时运行的 worker 大小,默认 10
num_of_slicesint并行消费单个队列的线程, 运行时最大的 slice 大小
slicesarray允许的 slice 编号, int 数组
queue_selector.labelsmap根据 Label 来过滤一组需要消费的队列, 同 queues 配置
queue_selector.idsarray指定要消费的队列的 UUID, 字符数组
queue_selector.keysarray指定要消费的队列的唯一 Key 路径, 字符数组
queuesmap根据 Label 来过滤一组需要消费的队列, 同 queue_selector.labels 配置
waiting_afterarray是否等待指定队列消费完成才开始消费, 队列的 UUID, 字符数组
idle_timeout_in_secondsint消费队列的超时时间,默认 5, 即 5s
detect_active_queuebool是否自动检测符合条件的新的队列,默认 true
detect_intervalint自动检测符合条件的新的队列的时间间隔,单位毫秒, 默认 5000
quite_detect_after_idle_in_msbool退出自动检测的闲置时间间隔,单位毫秒, 默认 30000
skip_empty_queuebool是否跳过空队列的消费, 默认 true
quit_on_eof_queuebool队列执行到最后一条消息自动退出消费, 默认 true
consumer.sourcestring消费者来源
consumer.idstring消费者唯一标识
consumer.namestring消费者名称
consumer.groupstring消费者组名称
consumer.fetch_min_bytesint拉取消息最小的字节大小, 默认 1
consumer.fetch_max_bytesint拉取消息最大的字节大小, 默认 10485760, 即 10MB
consumer.fetch_max_messagesint拉取最大的消息个数, 默认 1
consumer.fetch_max_wait_msint拉取最大的等待时间, 单位毫秒, 默认 10000
consumer.eof_retry_delay_in_msint达到文件末尾重试的等待时间, 单位毫秒, 默认 500