queue
> 文档中心 > 文档中心 > INFINI Gateway > 功能手册 > 在线过滤器 > queue

queue #

描述 #

queue 过滤器用来保存请求到消息队列。

配置示例 #

一个简单的示例如下:

flow:
  - name: queue
    filter:
      - queue: #handle dirty_writes, second-commit
          queue_name: "primary_final_commit_log##$[[partition_id]]"
          labels:
            type: "primary_final_commit_log"
            partition_id: "$[[partition_id]]"
          message: "$[[_ctx.request.header.X-Replicated-ID]]#$[[_ctx.request.header.LAST_PRODUCED_MESSAGE_OFFSET]]#$[[_sys.unix_timestamp_of_now]]"
          when:
            equals:
              _ctx.request.header.X-Replicated: "true"

参数说明 #

名称类型说明
depth_thresholdint大于队列指定深度才能存入队列,默认为 0
typestring指定消息队列的类型,支持 kafkadisk
queue_namestring消息队列名称
labelsmap给新增的消息队列 Topic 添加自定义的标签
messagestring自定义消息内容,支持变量
save_last_produced_message_offsetbool是否保留最后一次写入成功的消息的 Offset 到上下文中,可以作为变量随后使用
last_produced_message_offset_keystring自定义最后一次写入成功的消息的 Offset 保留到上下文中的变量名,默认 LAST_PRODUCED_MESSAGE_OFFSET