2 回答

TA贡献1828条经验 获得超4个赞
您可以使用自定义“移动”时间戳TimestampExtractor
- 在将结果写回输出主题之前,您可以使用 aTransformer
并通过context.forward(key, value, To.all().withTimestamps())
.
功能请求票:https ://issues.apache.org/jira/browse/KAFKA-7911

TA贡献2051条经验 获得超10个赞
因此,为了解决这个问题,我创建了自定义TimestampExtractor并使用它来更改流窗口创建时间以记录来自有效负载的时间,如下所示。
public class RecordTimeStampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
JsonObject data = (JsonObject) new JsonParser().parse(record.value().toString());
Timestamp recordTimestamp = Timestamp.valueOf(data.get(Constant.SLOT).getAsString());
return recordTimestamp.getTime();
}
}
所以现在我已经用我的本地时区测试了它,因为昨天是 IST 05:30,它的工作正常,kafka 流也正在根据记录时间戳创建窗口。也将使用其他时区进行测试并更新答案
添加回答
举报