Kafka consumer in Spark Streaming

Trying to write a Spark Streaming job that consumes messages from Kafka. Here’s what I’ve done so far:

  1. Started Zookeeper
  2. Started Kafka Server
  3. Sent a few messages to the server. I can see them when I run the following:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
    
  4. Now I am trying to write a program to count the number of messages coming in within a 5-minute window.

The code looks something like this:

    Map<String, Integer> map = new HashMap<String, Integer>();
    map.put("mytopic", new Integer(1));
    JavaStreamingContext ssc = new JavaStreamingContext(
            sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
    JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);

I'm not sure what value to use for the third argument (the consumer group). When I run this I get "Unable to connect to zookeeper server". But ZooKeeper is running on port 2181; otherwise step #3 would not have worked.

Seems like I am not using KafkaUtils.createStream properly. Any ideas?

Tali answered 3/11, 2014 at 23:58 Comment(3)
Is zookeeper running on the same box as Spark? Have you tried to verify that Zookeeper is up and running by connecting to Zookeeper using <zookeeper_dir>/current/bin/zkCli.sh?Staple
I am so dumb! I changed 'localhost' to the actual machine name and got past this error. BUT - it's not quite working yet. Anyone know what the 'default' value of 'consumer group' is under Kafka? It doesn't seem to consume any messages.Tali
I am facing the same issue where I am not getting any messages from the producer. I am using a Python producer, and I am able to get the messages from the console consumer. The number of partitions is also 1 in my config. @Tali How did you solve this?Mande

There is no such thing as a default consumer group. You can use an arbitrary non-empty string there. If you have only one consumer, its consumer group doesn't really matter. If there are two or more consumers, they can either be part of the same consumer group or belong to different consumer groups (see the sketch after the quoted documentation below).

From http://kafka.apache.org/documentation.html:

Consumers

...

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.
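
Putting those pieces together, here is a minimal sketch of the whole job using the Spark 1.x receiver-based API. The ZooKeeper address "zkhost:2181", the group name "my-consumer-group", and the local[2] master are placeholder assumptions, not values from the question:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public class KafkaMessageCounter {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]")   // at least two cores: one for the receiver, one for processing
                    .setAppName("KafkaMessageCounter");
            // 5-minute batch interval, matching the original goal
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5 * 60 * 1000));

            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            topicMap.put("mytopic", 1);      // value = number of receiver threads, not a partition id

            // The group name is an arbitrary non-empty string
            JavaPairReceiverInputDStream<String, String> messages =
                    KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", topicMap);

            messages.count().print();        // prints the message count for each 5-minute batch

            ssc.start();
            ssc.awaitTermination();
        }
    }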

I think the problem may be in the 'topics' parameter. From the Spark docs:

Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread

You only specified a single partition for your topic, namely '1'. Depending on the broker's setting (num.partitions), there may be more than one partition, and your messages may be sent to other partitions which aren't read by your program. You can check the actual partition count as shown below.
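
One way to check how many partitions the topic actually has is the kafka-topics tool that ships with Kafka (assuming the same installation layout as the console-consumer command in the question):

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic

The output lists each partition with its id and leader, which shows whether messages could be landing in partitions your job never reads.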

Besides, I believe the partition ids are 0-based, so if you have only one partition, its id will be 0.

Staple answered 4/11, 2014 at 7:51 Comment(2)
Not sure if the partition ids are 0 based as you suggested. When I use: map.put("mytopic", new Integer(0)); I get this error: ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.AssertionError: assertion failedTali
Shouldn't the following code print something? JavaDStream<String> statuses = tweets.map(new Function<String, String>() { public String call(String status) { System.out.println(status); return status; } });Tali

I think you should specify the IP of the ZooKeeper host instead of localhost. Also, the third argument is the consumer group name; it can be any name you like. It matters when you have more than one consumer tied to the same group: topic partitions are then distributed among them accordingly. Your tweets stream should be:

    JavaPairReceiverInputDStream<String, String> tweets =
            KafkaUtils.createStream(ssc, "x.x.x.x:2181", "dummy-group", map);
Ambiguous answered 10/7, 2015 at 13:49 Comment(0)

I was facing the same issue. Here is the solution that worked for me.

  • The number of cores allocated to the Spark Streaming application must be greater than the number of receivers; otherwise the system will receive data but not be able to process it. Spark Streaming therefore requires a minimum of two cores, so my spark-submit had to request at least two (see the sketch after this list).
  • kafka-clients-<version>.jar should be included in the list of dependent jars passed to spark-submit.
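
For example, a local submission along these lines; the application jar, class name, and kafka-clients version are placeholders:

    bin/spark-submit --master "local[2]" \
        --jars kafka-clients-0.8.2.1.jar \
        --class com.example.KafkaMessageCounter \
        my-streaming-app.jar

Here local[2] reserves two cores: one for the receiver and one for processing.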
Mande answered 28/9, 2015 at 7:44 Comment(0)

If ZooKeeper is running on the same machine as your streaming application, then "localhost:2181" will work. Otherwise, you have to specify the address of the host where ZooKeeper is running, and ensure that the machine on which the streaming app runs can talk to the ZooKeeper host on port 2181.
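
A quick way to verify that from the streaming machine is ZooKeeper's ruok four-letter command (assuming nc is available and four-letter-word commands are enabled, as they were by default in ZooKeeper releases of this era; zookeeper-host is a placeholder):

    echo ruok | nc zookeeper-host 2181

A healthy server replies with imok.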

Hanaper answered 25/5, 2016 at 5:53 Comment(1)
Don't post a comment as an answer. This should be a comment.Novitiate

I think, in your code, the second argument to KafkaUtils.createStream should be the host:port of the Kafka server, not the ZooKeeper host and port. Check that once.

EDIT: Kafka Utils API Documentation

As per the documentation above, it should be the ZooKeeper quorum, so the ZooKeeper hostname and port should be used.

zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).

Cureton answered 13/6, 2015 at 11:12 Comment(2)
If I give the Kafka broker's host:port, the connection fails. It is the ZooKeeper host and port only.Mande
It is the list of ZooKeeper servers only, i.e. the zkQuorum.Elma
