为了账号安全,请及时绑定邮箱和手机立即绑定

如何处理 Kafka Streams 中的不同时区?

如何处理 Kafka Streams 中的不同时区?

HUWWW 2022-05-25 10:54:00
因此,我正在评估 Kafka Streams 以及它可以做些什么来查看它是否适合我的用例,因为我需要每隔 15 分钟、每小时、每天聚合一次传感器的数据,并且由于它的 Windowing 功能而发现它很有用。因为我可以通过应用创建窗口,windowedBy()但KGroupedStream问题是窗口是在 UTC 中创建的,我希望我的数据按其原始时区而不是按 UTC 时区分组,因为它阻碍了聚合,所以任何人都可以帮助我解决这个问题。
查看完整描述

2 回答

?
明月笑刀无情

TA贡献1828条经验 获得超4个赞

您可以使用自定义“移动”时间戳TimestampExtractor- 在将结果写回输出主题之前,您可以使用 aTransformer并通过context.forward(key, value, To.all().withTimestamps()).

功能请求票:https ://issues.apache.org/jira/browse/KAFKA-7911


查看完整回答
反对 回复 2022-05-25
?
侃侃无极

TA贡献2051条经验 获得超10个赞

因此,为了解决这个问题,我创建了自定义TimestampExtractor并使用它来更改流窗口创建时间以记录来自有效负载的时间,如下所示。


public class RecordTimeStampExtractor implements TimestampExtractor {


    @Override

    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {

        JsonObject data = (JsonObject) new JsonParser().parse(record.value().toString());

        Timestamp recordTimestamp = Timestamp.valueOf(data.get(Constant.SLOT).getAsString());

        return recordTimestamp.getTime();

    }


}

所以现在我已经用我的本地时区测试了它,因为昨天是 IST 05:30,它的工作正常,kafka 流也正在根据记录时间戳创建窗口。也将使用其他时区进行测试并更新答案


查看完整回答
反对 回复 2022-05-25
  • 2 回答
  • 0 关注
  • 213 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号