1 回答

TA贡献1831条经验 获得超10个赞
您可以使用ProcessorContext.topic()获取所需的主题名称。要访问 ProcessorContext,请使用 KStream.process() 为其提供适当的Processor实现。
您也可以使用 KStream.transform():
KStream<InputKeyType, InputValueType> stream2 = stream.transform(new TransformerSupplier<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {
@Override
public Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>> get() {
return new Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<OutputKeyType, OutputValueType> transform(InputKeyType key, InputValueType value) {
this.context.topic() // topic name you need
// logic here
return new KeyValue<>(OutputKeyType key, OutputValueType value);
}
@Override
public void close() {
}
};
}
});
添加回答
举报