为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用 Benthos 读取和解码来自 Kafka 的 AVRO 消息及其关联的

如何使用 Benthos 读取和解码来自 Kafka 的 AVRO 消息及其关联的

Go
倚天杖 2022-11-08 15:36:22
我正在使用 Benthos 从 Kafka 读取 AVRO 编码的消息,其中kafka_key元数据字段设置为还包含 AVRO 编码的有效负载。这些 AVRO 编码的有效载荷的模式存储在模式注册表中,Benthos 有一个schema_registry_decode用于解码它们的处理器。我希望为每个包含两个字段的 Kafka 消息生成输出 JSON 消息,一个称为content包含解码的 AVRO 消息,另一个称为包含Benthos 收集metadata的各种元数据字段kafka_key,包括解码的有效负载。
查看完整描述

1 回答

?
隔江千里

TA贡献1906条经验 获得超10个赞

事实证明,可以使用这样的branch处理器来实现这一点:


input:

  kafka:

    addresses:

      - localhost:9092

    consumer_group: benthos_consumer_group

    topics:

      - benthos_input


pipeline:

  processors:

    # Decode the message

    - schema_registry_decode:

        url: http://localhost:8081


    # Populate output content field

    - bloblang: |

        root.content = this


    # Decode kafka_key metadata payload and populate output metadata field

    - branch:

        request_map: |

          root = meta("kafka_key")


        processors:

          - schema_registry_decode:

              url: http://localhost:8081


        result_map: |

          root.metadata = meta()

          root.metadata.kafka_key = this


output:

  stdout: {}


查看完整回答
反对 回复 2022-11-08
  • 1 回答
  • 0 关注
  • 152 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信