Trying to write a Spark Streaming job that consumes messages from Kafka. Here’s what I’ve done so far:
- Started Zookeeper
- Started Kafka Server
- Sent a few messages to the server. I can see them when I run the following:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
Now I'm trying to write a program that counts the number of messages arriving in each 5-minute interval.
The code looks something like this:
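// Topic name -> number of threads used to consume that topic
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", 1);
// 5-minute batch interval, given in milliseconds
JavaStreamingContext ssc = new JavaStreamingContext(
    sparkUrl, "Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
// Arguments: streaming context, Zookeeper quorum, consumer group id, topic map
JavaPairReceiverInputDStream<String, String> tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);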
Not sure what value to use for the 3rd argument (consumer group). When I run this I get "Unable to connect to zookeeper server". But Zookeeper is running on port 2181; otherwise step #3 would not have worked.
Seems like I am not using KafkaUtils.createStream properly. Any ideas?
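
To rule out a plain network problem, a check along these lines could be run on the machine where the job runs. It is only a sketch: ZkPortCheck and its methods are made-up names, it uses nothing but the JDK, and it only tests that something accepts TCP connections at the given address. It does not prove that the listener is really Zookeeper, and if the Kafka receiver ends up on a different worker node, "localhost" there may not point at the same machine.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Spark or Kafka.
class ZkPortCheck {

    // Splits a "host:port" string such as "localhost:2181" into host and port.
    static InetSocketAddress parse(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("expected host:port, got " + hostPort);
        }
        String host = hostPort.substring(0, idx);
        int port = Integer.parseInt(hostPort.substring(idx + 1));
        // Unresolved on purpose: parsing should not trigger a DNS lookup.
        return InetSocketAddress.createUnresolved(host, port);
    }

    // Returns true if a TCP connection can be opened within timeoutMillis.
    static boolean isReachable(String hostPort, int timeoutMillis) {
        InetSocketAddress target = parse(hostPort);
        try (Socket socket = new Socket()) {
            socket.connect(
                new InetSocketAddress(target.getHostString(), target.getPort()),
                timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host all end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Same address that is passed to KafkaUtils.createStream above.
        String zkQuorum = args.length > 0 ? args[0] : "localhost:2181";
        System.out.println(zkQuorum + " reachable: " + isReachable(zkQuorum, 2000));
    }
}
```

If this prints false for localhost:2181 while the console consumer still works, the difference is probably in where or how the Spark job is being launched rather than in the createStream arguments.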