为了账号安全,请及时绑定邮箱和手机立即绑定

嵌入式 Kafka 以错误的分区数开始

嵌入式 Kafka 以错误的分区数开始

呼唤远方 2023-06-04 15:39:23
我在 JUnit 测试中启动了一个 EmbeddedKafka 实例。我可以在应用程序中正确读取已推送到流的记录,但我注意到的一件事是每个主题只有一个分区。谁能解释为什么?在我的应用程序中,我有以下内容:List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);这将返回一个包含一项的列表。当针对具有 3 个分区的本地 Kafka 运行时,它会按预期返回包含 3 个项目的列表。我的测试看起来像:@RunWith(SpringRunner.class)@SpringBootTest@EmbeddedKafka(partitions = 3)@ActiveProfiles("inmemory")@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)@TestPropertySource(                locations = "classpath:application-test.properties",                properties = {"app.onlyMonitorIfDataUpdated=true"})public class MonitorRestKafkaIntegrationTest {    @Autowired    private EmbeddedKafkaBroker embeddedKafkaBroker;    @Value("${spring.embedded.kafka.brokers}")    private String embeddedBrokers;    @Autowired    private WebApplicationContext wac;    @Autowired    private JsonUtility jsonUtility;    private MockMvc mockMvc;    @Before    public void setup() {            mockMvc = webAppContextSetup(wac).build();            UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("dummyUser"));    }        private ResultActions interactiveMonitoringREST(String eggID, String monitoringParams) throws Exception {            return mockMvc.perform(post(String.format("/eggs/%s/interactive", eggID)).contentType(MediaType.APPLICATION_JSON_VALUE).content(monitoringParams));        }
查看完整描述

2 回答

?
慕容3067478

TA贡献1773条经验 获得超3个赞

您需要告诉经纪人预先创建主题...


@SpringBootTest

@EmbeddedKafka(topics = "foo", partitions = 3)

class So57481979ApplicationTests {


    @Test

    void testPartitions(@Autowired KafkaAdmin admin) throws InterruptedException, ExecutionException {

        AdminClient client = AdminClient.create(admin.getConfig());

        Map<String, TopicDescription> map = client.describeTopics(Collections.singletonList("foo")).all().get();

        System.out.println(map.values().iterator().next().partitions().size());

    }


}

num.partitions或者,如果您希望代理在首次使用时为您自动创建主题,则设置代理属性。


我们可能应该根据分区属性自动执行此操作。


查看完整回答
反对 回复 2023-06-04
?
慕丝7291255

TA贡献1859条经验 获得超6个赞

我发现bootstrapServersPropertyis important in @EmbeddedKafka,它用于填充 中的属性application-test.yml,然后可用于创建消费者/侦听器容器。



查看完整回答
反对 回复 2023-06-04
  • 2 回答
  • 0 关注
  • 98 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信