Dynamic Topic Name / Quarkus SmallRye Reactive Messaging Kafka

I want to use this extension: [Quarkus SmallRye Reactive Messaging Kafka]

In my application, however, the topic names are not known in advance; they are determined at runtime from the message received from the user. How can I specify the topic name and the topic-related settings programmatically, without annotations? (I only need to send messages to Kafka, i.e. produce.)

@ApplicationScoped
public class PriceGenerator {

    private Random random = new Random();

    // Don't want to use this 
    // "generated-price" not known at build time
    @Outgoing("generated-price")                       
    public Multi<Integer> generate() {                  
        return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
                .onOverflow().drop()
                .map(tick -> random.nextInt(100));
    }

}

Alternatively, these settings would need to be set programmatically at runtime:

mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

Because I could not find a way to do this, I used the native Kafka client instead:

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-kafka-client</artifactId>
    </dependency>

Properties props = new Properties();
props.put("bootstrap.servers", "85.93.89.115:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topicName.toString(), messageFactory.MessageToString(message)));

Rhu answered 17/2, 2021 at 20:17 Comment(6)
You've got the topic name in the producer.send call. What's the issue? – Idealize
@Idealize Thanks for your reply. I am using the quarkus-kafka-client dependency, but only because I had no choice. I want to use smallrye-kafka, which uses an annotation for the topic name. – Rhu
@Idealize I'm looking for a way to set the topic name manually with smallrye-kafka, without using annotations. – Rhu
I'm not super familiar with the smallrye library, but I think it requires the annotations, so it isn't able to dynamically redefine the outgoing channel. – Idealize
It's now possible with smallrye.io/smallrye-reactive-messaging/… – Bulk
@bertranddeweer Thank you. Yes, that's what I wanted. But your link gives me a 404 error. Here is the link for the new version: smallrye.io/smallrye-reactive-messaging/… – Rhu

You can use an Emitter and set the topic through the message metadata. The code will look like this:


    @Channel("channel-out")
    Emitter<String> kafkaEventEmitter;

    public void publishToKafka(String data, String topicName) {
        // Per-message metadata overrides the topic configured for the channel.
        OutgoingKafkaRecordMetadata<?> metadata = OutgoingKafkaRecordMetadata.builder()
                .withTopic(topicName)
                .build();

        kafkaEventEmitter.send(Message.of(data).addMetadata(metadata));
    }

The official documentation covers this: https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.9/kafka/kafka.html#_dynamic_topic_names
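The channel-out channel injected above still has to be declared in the application configuration. A minimal sketch, assuming the same property style as in the question and a String payload; fallback-topic is a placeholder name, used only for messages that carry no topic in their metadata:

```properties
# Declares the outgoing channel that the Emitter is bound to.
mp.messaging.outgoing.channel-out.connector=smallrye-kafka
# Placeholder default topic (assumption); the per-message metadata topic takes precedence.
mp.messaging.outgoing.channel-out.topic=fallback-topic
mp.messaging.outgoing.channel-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```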

Teodor answered 6/10, 2023 at 12:46 Comment(0)

You can override the topic value dynamically, at startup or at any later point. The snippet below shows how to override the predefined topic value at startup:

@ApplicationScoped
public class AppLifecycleBean {

    void onStart(@Observes StartupEvent ev) {               
        System.setProperty("mp.messaging.outgoing.generated-price.topic", "example");
    }

    void onStop(@Observes ShutdownEvent ev) {               
    }

}
Hiphuggers answered 5/9, 2021 at 9:52 Comment(1)
This is "hacky", because you are overriding the application config. I'm also not sure you can do this at "any time": once the outgoing channel is bound to some handler object, it won't respond to changes in the configuration. – Orangy
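The concern in the comment above can be illustrated with plain Java. This is only an analogy under stated assumptions: BoundChannel is a hypothetical stand-in that reads its topic once, when it is created. It is not a SmallRye class and says nothing definitive about how the connector is implemented.

```java
public class BoundChannelDemo {

    // Hypothetical stand-in for a channel; NOT part of SmallRye or Quarkus.
    static final class BoundChannel {
        private final String topic;

        BoundChannel(String propertyKey) {
            // The property is read once, at creation time, and then kept.
            this.topic = System.getProperty(propertyKey, "unset");
        }

        // Returns "<topic>:<payload>" to show which topic would be used.
        String send(String payload) {
            return topic + ":" + payload;
        }
    }

    public static void main(String[] args) {
        String key = "mp.messaging.outgoing.generated-price.topic";

        System.setProperty(key, "example");
        BoundChannel channel = new BoundChannel(key);

        // Changing the property after creation has no effect on the bound object.
        System.setProperty(key, "changed-later");

        System.out.println(channel.send("42")); // prints "example:42"
    }
}
```

If the real channel behaves like this sketch, a property change made after the channel is created would go unnoticed, which is why a per-message topic set through metadata is the safer route for topics chosen at runtime.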
