Failing to write offset data to zookeeper in kafka-storm
Asked Answered
L

2

9

I was setting up a storm cluster to calculate real time trending and other statistics, however I have some problems introducing the "recovery" feature into this project, by allowing the offset that was last read by the kafka-spout (the source code for kafka-spout comes from https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) to be remembered. I start my kafka-spout in this way:

BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);

The default settings should be doing this, but I think it is not doing so in my case, every time I start my project, the PartitionManager tries to look for the file with the offsets, then nothing is found:

2014-06-25 11:57:08 INFO  PartitionManager:73 - Read partition information from: /storm/partition_1  --> null
2014-06-25 11:57:08 INFO  PartitionManager:86 - No partition information found, using configuration to determine offset

Then it starts reading from the latest possible offset. Which is okay if my project never fails, but not exactly what I wanted.

I also looked a bit more into the PartitionManager class which uses Zkstate class to write the offsets, from this code snippet:

PartitionManeger

public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);

        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}

ZkState

public void writeBytes(String path, byte[] bytes) {
    try {
        if (_curator.checkExists().forPath(path) == null) {
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
        } else {
            _curator.setData().forPath(path, bytes);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

I could see that for the first message, the writeBytes method gets into the if block and tries to create a path, then for the second message it goes into the else block, which seems to be ok. But when I start the project again, the same message as mentioned above shows up. No partition information can be found.

Lardner answered 25/6, 2014 at 11:58 Comment(3)
hello Juto, I met the problems... have you fixed this problem? thank you, i am waiting for you nowTechnicality
Hi @kaitian, I left the company which I did this project for, therefore I don't have access to the code anymore, I never had a solution for this problem. :(Lardner
Anthony's answer works and the reason is quite obvious as in Local Mode, the zookeeper is different from the one which is used by kafka !Mithridate
J
10

I had the same problem. Turned out I was running in local mode which uses an in memory zookeeper and not the zookeeper that Kafka is using.

To make sure that KafkaSpout doesn't use Storm's ZooKeeper for the ZkState that stores the offset, you need to set the SpoutConfig.zkServers, SpoutConfig.zkPort, and SpoutConfig.zkRoot in addition to the ZkHosts. For example

import org.apache.zookeeper.client.ConnectStringParser;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;

...

    final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr);
    final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses();
    final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size());
    final Integer zkPort = serverInetAddresses.get(0).getPort();
    for (InetSocketAddress serverInetAddress : serverInetAddresses) {
        serverAddresses.add(serverInetAddress.getHostName());
    }

    final ZkHosts zkHosts = new ZkHosts(zkConnectStr);
    zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath;

    final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup);
    spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme);

    spoutConfig.zkServers = serverAddresses;
    spoutConfig.zkPort = zkPort;
    spoutConfig.zkRoot = kafkaZnode;
Justice answered 11/9, 2014 at 23:30 Comment(1)
just to make the answer easier to grasp: when you have a working topology ready to deploy on remote server, add 3 last lines to make it connect to remote zookeeper when used in local modeMaureen
T
0

I think you are hitting this bug:

https://community.hortonworks.com/questions/66524/closedchannelexception-kafka-spout-cannot-read-kaf.html

And the comment from the colleague above fixed my issue. I added some newer libraries to.

Twentyfour answered 16/7, 2017 at 11:0 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.