为了账号安全,请及时绑定邮箱和手机立即绑定

KStreams:如何获取记录的(原始)主题?

KStreams:如何获取记录的(原始)主题?

德玛西亚99 2022-12-28 15:46:59
我有以下//Config setupProperties props = ...; //setupList<String> topicList = Arrays.asList({"A", "B", "C"});StreamBuilder builder = new StreamBuilder();KStream<String, String> source = builder.stream(topicList);source  .map((k,v) -> {    //How can i get the topic of the record here  })  .to((k,v,r) -> {//busy code for topic routing});new KafkaStream(builder.build(), properties).start();
查看完整描述

1 回答

?
慕哥6287543

TA贡献1831条经验 获得超10个赞

您可以使用ProcessorContext.topic()获取所需的主题名称。要访问 ProcessorContext,请使用 KStream.process() 为其提供适当的Processor实现。

您也可以使用 KStream.transform():

KStream<InputKeyType, InputValueType> stream2 = stream.transform(new TransformerSupplier<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {

            @Override

            public Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>> get() {

                return new Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {

                    private ProcessorContext context;


                    @Override

                    public void init(ProcessorContext context) {

                        this.context = context;

                    }


                    @Override

                    public KeyValue<OutputKeyType, OutputValueType> transform(InputKeyType key, InputValueType value) {


                        this.context.topic() // topic name you need

                        // logic here

                        return new KeyValue<>(OutputKeyType key, OutputValueType value);


                    }


                    @Override

                    public void close() {


                    }

                };

            }

        });


查看完整回答
反对 回复 2022-12-28
  • 1 回答
  • 0 关注
  • 75 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号