企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
### **4.1.1 配置主题(Configuring Topics)** ***** 如果您在应用程序上下文中定义了一个 `KafkaAdmin` Bean,则它可以自动将主题(Topic)添加到代理(Broker)。 为此,您可以将每个主题的 `NewTopic` `@Bean` 添加到应用程序上下文中。 版本 2.3 引入了新类 `TopicBuilder`,以使 Bean 的创建更加方便。 以下示例显示了如何执行此操作: ~~~ @Bean public KafkaAdmin admin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ...); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return TopicBuilder.name("thing1") .partitions(10) .replicas(3) .compact() .build(); } @Bean public NewTopic topic2() { return TopicBuilder.name("thing2") .partitions(10) .replicas(3) .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd") .build(); } @Bean public NewTopic topic3() { return TopicBuilder.name("thing3") .assignReplicas(0, Arrays.asList(0, 1)) .assignReplicas(1, Arrays.asList(1, 2)) .assignReplicas(2, Arrays.asList(2, 0)) .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd") .build(); } ~~~ <br > > 注意:使用 Spring Boot 时,会自动注册一个 KafkaAdmin Bean,因此您只需要 NewTopic @Bean 即可。 <br > 默认情况下,如果代理(Broker)不可用,则会记录一条消息,但是上下文会继续加载。 您可以以编程方式调用 Admin 的 initialize() 方法,以便稍后再试。 如果您希望这种情况被认为是致命的,请将 Admin 的 fatalIfBrokerNotAvailable 属性设置为 true。 这样上下文就无法初始化了。 <br > > 如果代理(Broker)支持(1.0.0或更高版本),并且 Admin 发现现有主题的分区数少于 NewTopic.numPartitions,则 Admin 会自动增加分区数。 <br > 要获得更多高级功能,可以直接使用 AdminClient。 以下示例显示了如何执行此操作: ~~~ @Autowired private KafkaAdmin admin; ... AdminClient client = AdminClient.create(admin.getConfig()); ... client.close(); ~~~