为了账号安全,请及时绑定邮箱和手机立即绑定

如何在 Spring Kafka 中接收来自 KSQL 的流式响应?

如何在 Spring Kafka 中接收来自 KSQL 的流式响应?

喵喵时光机 2022-05-12 15:34:15
java - 如何在java spring boot应用程序中接收来自kafka KSQL服务器的分块响应?当我对端点进行休息呼叫时,/query我只得到 1 行并且连接关闭。如何保持连接打开并接收多行?医生说响应流回,直到达到语句中指定的 LIMIT,或者客户端关闭连接。在java中实现这一点的方法是什么?即使对于 KTable,我也只能得到 1 行作为回报。https://docs.confluent.io/current/ksql/docs/developer-guide/api.html#run-a-query-and-stream-back-the-output
查看完整描述

1 回答

?
千巷猫影

TA贡献1829条经验 获得超7个赞

我能够解决的方法如下:


以字符串形式获取响应

逐行解析 JSON 对象(KafkaQueryResponse是代表 1 行的对象)


    ResponseEntity<String> result = template.exchange("/query",

        HttpMethod.POST,

        new HttpEntity<>(params, headers),

        String.class);


    List<KafkaQueryResponse> array = new ArrayList<>();

    JsonFactory jsonFactory = new JsonFactory();

    try(BufferedReader br = new BufferedReader(new StringReader(result.getBody()))) {

        Iterator<KafkaQueryResponse> value = objectMapper.readValues(jsonFactory.createParser(br), KafkaQueryResponse.class);

        value.forEachRemaining(e -> {

            if (e.getRow() != null) {

                array.add(e);

            }

        });

    }

    array <----  this is the list of JSON objects

KafkaQueryResponse


    @Data

    @JsonIgnoreProperties(ignoreUnknown = true)

    public class KafkaQueryResponse {

        private KafkaQueryRow row;

        private String finalMessage;

        private String errorMessage;


        @Data

        @JsonIgnoreProperties(ignoreUnknown = true)

        public static class KafkaQueryRow {

            private List<Object> columns;

        }

    }

此解决方案不允许以块的形式读取流式响应。它等待整个响应到达客户端,然后关闭连接,然后解析所有 json 对象。


查看完整回答
反对 回复 2022-05-12
  • 1 回答
  • 0 关注
  • 129 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号