为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用 kafka 流以块/批次的形式处理数据?

如何使用 kafka 流以块/批次的形式处理数据?

元芳怎么了 2021-09-26 14:30:55
对于大数据中的许多情况,最好一次处理一小块记录缓冲区,而不是一次处理一条记录。自然的例子是调用一些支持批处理以提高效率的外部 API。我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何看起来像我想要的东西。到目前为止,我有:builder.stream[String, String]("my-input-topic").mapValues(externalApiCall).to("my-output-topic")我想要的是:builder.stream[String, String]("my-input-topic").batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")在 Scala 和 Akka Streams 中,该函数被称为groupedor batch。在 Spark Structured Streaming 中,我们可以做到mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))。
查看完整描述

2 回答

  • 2 回答
  • 0 关注
  • 188 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信