From a couple of days I'm trying out ways to dynamically pass topics to Kafka listener rather than using them through keys from a Java DSL. Anyone around done this before or could throw some light on what is the best way to achieve this?
可以将文章内容翻译成中文,广告屏蔽插件可能会导致该功能失效(如失效,请关闭广告屏蔽插件后再试):
问题:
回答1:
You cannot "dynamically pass topics to Kafka listener "; you have to programmatically create a listener container instead.
回答2:
you can change Topics at runtime dynamicly!!!!
@Component
public class StoppingErrorHandler implements ErrorHandler {
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
ConcurrentMessageListenerContainer listenerContainer = (ConcurrentMessageListenerContainer)kafkaListenerEndpointRegistry.getListenerContainer("fence");
ContainerProperties cp=listenerContainer.getContainerProperties();
String[] topics =cp.getTopics();
topics[0]="gaonb";
listenerContainer.stop();
listenerContainer.start();
}
}
回答3:
Here is a working solution:
// Start brokers without using the "@KafkaListener" annotation
Map<String, Object> consumerProps = consumerProps("my-srv1:9092", "my-group", "false");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties("my-topic");
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) record -> {
log.error("Message received: " + record);
records.add(record);
});
container.start();
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param brokersCommaSep the bootstrapServers property (comma separated servers).
* @param group the group id.
* @param autoCommit the auto commit.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String brokersCommaSep, String group, String autoCommit) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersCommaSep);
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
Hope it can help.