I am working with the Lettuce cluster Java client. It is set up inside a bolt in an Apache Storm topology. The spout reads data from Kafka and passes it to the bolt. However, when I start my topology, I get the error message below and the program terminates. Am I missing something? What is causing this?
Stack trace
29502 [Thread-17-RecommendationLettuceBolt-executor[2 2]] ERROR o.a.s.util - Async loop died!
io.lettuce.core.RedisException: Cannot retrieve initial cluster partitions from initial URIs [RedisURI [host='127.0.0.1', port=7001]]
    at io.lettuce.core.cluster.RedisClusterClient.loadPartitions(RedisClusterClient.java:865) ~[lettuce-core-5.1.7.RELEASE.jar:?]
    at io.lettuce.core.cluster.RedisClusterClient.initializePartitions(RedisClusterClient.java:819) ~[lettuce-core-5.1.7.RELEASE.jar:?]
    at io.lettuce.core.cluster.RedisClusterClient.connect(RedisClusterClient.java:345) ~[lettuce-core-5.1.7.RELEASE.jar:?]
    at com.projectName.indexer.lettuce.LettuceClusterClientProvider.getConnection(LettuceClusterClientProvider.java:72) ~[classes/:?]
    at com.projectName.indexer.lettuce.LettuceCacheRepopulationHandler.openLettuceConnection(LettuceCacheRepopulationHandler.java:42) ~[classes/:?]
    at com.projectName.indexer.bolts.RecommendationLettuceBolt.prepare(RecommendationLettuceBolt.java:35) ~[classes/:?]
    at org.apache.storm.daemon.executor$fn__8058$fn__8071.invoke(executor.clj:795) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.base/java.lang.Thread.run(Thread.java:844) [?:?]
Caused by: io.lettuce.core.RedisConnectionException: Unable to establish a connection to Redis Cluster at [RedisURI [host='127.0.0.1', port=7001]]
    at io.lettuce.core.cluster.topology.AsyncConnections.get(AsyncConnections.java:89) ~[lettuce-core-5.1.7.RELEASE.jar:?]
    at io.lettuce.core.cluster.topology.ClusterTopologyRefresh.loadViews(ClusterTopologyRefresh.java:73) ~[lettuce-core-5.1.7.RELEASE.jar:?]
    at io.lettuce.core.cluster.RedisClusterClient.doLoadPartitions(RedisClusterClient.java:871) ~[lettuce-core-5.1.7.RELEASE.jar:?]
    at io.lettuce.core.cluster.RedisClusterClient.loadPartitions(RedisClusterClient.java:844) ~[lettuce-core-5.1.7.RELEASE.jar:?]
    ... 9 more
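The root cause in the trace is a plain connection failure to 127.0.0.1:7001, so one thing that could be checked independently of Lettuce is whether that host and port accept TCP connections from the machine where the bolt actually runs (on a remote Storm worker, 127.0.0.1 refers to that worker, not to the machine that submitted the topology). Below is a minimal sketch of such a check using only the JDK. The class name PortCheck and the 2000 ms timeout are assumptions made for illustration, not part of the project or of Lettuce.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of the project or of Lettuce.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port taken from the RedisURI shown in the stack trace.
        boolean ok = isReachable("127.0.0.1", 7001, 2000);
        System.out.println("127.0.0.1:7001 reachable: " + ok);
    }
}
```

This only shows whether something is listening on the port. It does not confirm that the node is a Redis instance running in cluster mode, which would still need to be verified separately.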
Input Code
private void init() {
    redisUri = RedisURI.Builder
            .redis(lettuceConfig.getLettuceClusterHost())
            .withPort(lettuceConfig.getLettuceClusterPort())
            .withTimeout(Duration.ofMillis(lettuceConfig.getLettuceClusterTimeout()))
            .build();
}

public StatefulRedisClusterConnection getConnection() {
    if (connection == null || !connection.isOpen()) {
        redisClusterClient = RedisClusterClient.create(redisUri);
        final ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(Duration.ofMinutes(BoltConstants.Lettuce.PERIODIC_REFRESH_TIME_IN_MIN))
                .enableAdaptiveRefreshTrigger()
                .build();
        final ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
                .autoReconnect(true)
                .topologyRefreshOptions(topologyRefreshOptions)
                .build();
        redisClusterClient.setOptions(clusterClientOptions);
        connection = redisClusterClient.connect(SnappyCompressor.wrap(new StringCodec()));
        log.info("Connected to Redis client lettuce. lettuce connection is up and running.");
    }
    return connection;
}
Environment
compile 'io.lettuce:lettuce-core:5.1.7.RELEASE'