Apache Ignite Availability Issue w/Custom CacheStoreAdapter

I'm doing a PoC using Apache Ignite. Here is the scenario I'm testing:

  1. Start a cluster of 3 nodes and a client.
  2. Call get on a key; a log message shows which node caches it.
  3. Call get on the same key again. I verify that it returns the stored value.
  4. Do a loadCache(). All nodes report successfully loading the cache.
  5. Kill the node that originally loaded the key.
  6. Restart the node that I just killed.
  7. Query for the key again.

Steps 6 and 7 are where I run into trouble. If I wait long enough between the two, everything works as it should. However, if I do 6 and 7 too close together, I get this error on the client and this error on the node.

I see the error IgniteClientDisconnectedException: Failed to wait for topology update, client disconnected. Is there a way to avoid this issue? Setting a longer time to wait for a topology update isn't really an option, because a client may try to connect at any time. Is it related to my cluster configuration? I saw this documentation, which suggests retrying the connection indefinitely, but that seems like it would just keep failing.

Also, we would need to be able to grow and shrink the cluster dynamically. Is this possible? Would having in-memory backups fix this behavior?

Note: if I omit step 6, I have not seen it fail.

Cluster Node Config

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--<import resource="./cache.xml"/>-->
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="peerClassLoadingEnabled" value="true"/>

        <property name="cacheConfiguration">
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <!-- Set a cache name. -->
                <property name="name" value="recordData"/>
                <!--<property name="rebalanceMode" value="SYNC"/>-->
                <!-- Set cache mode. -->
                <property name="cacheMode" value="PARTITIONED"/>

                <property name="cacheStoreFactory">
                    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                        <constructor-arg value="Application.RecordDataStore"/>
                    </bean>
                </property>
                <property name="readThrough" value="true"/>
                <property name="writeThrough"  value="true"/>

            </bean>
        </property>

        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <!-- Override local port. -->
                <property name="localPort" value="8000"/>
            </bean>
        </property>

        <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <!-- Override local port. -->
                <property name="localPort" value="8100"/>
            </bean>
        </property>
    </bean>
</beans>

Client Config

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable distributed class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>
        <property name="clientMode" value="true"/>

        <property name="cacheConfiguration">
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <!-- Set a cache name. -->
                <property name="name" value="recordData"/>
                <!--<property name="rebalanceMode" value="SYNC"/>-->

                <!-- Set cache mode. -->
                <property name="cacheMode" value="PARTITIONED"/>

                <property name="cacheStoreFactory">
                    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                        <constructor-arg value="com.digitaslbi.idiom.util.RecordDataStore"/>
                    </bean>
                </property>
                <property name="readThrough" value="true"/>
                <property name="writeThrough"  value="true"/>

            </bean>
        </property>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!--Task execution events-->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
                <!--Cache events-->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                        Ignite provides several options for automatic discovery that can be used
                        instead of static IP based discovery. For information on all options refer
                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
                    -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>localhost:8000..8099</value>
                                <!--<value>127.0.0.1:47500..47509</value>-->
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Implemented methods of CacheStoreAdapter

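import java.net.MalformedURLException;
import java.util.LinkedList;
import java.util.List;
import javax.cache.integration.CacheLoaderException;
import com.google.common.collect.HashMultimap;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.ektorp.CouchDbConnector;
import org.ektorp.CouchDbInstance;
import org.ektorp.ViewQuery;
import org.ektorp.http.HttpClient;
import org.ektorp.http.StdHttpClient;
import org.ektorp.impl.StdCouchDbConnector;
import org.ektorp.impl.StdCouchDbInstance;

public class RecordDataStore extends CacheStoreAdapter<Long, List<Record>> {

    // Called on a cache miss when read-through is enabled, e.g. from IgniteCache.get(...).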
    @Override public List<Record> load(Long key) {
        System.out.println("Load data for pel: " + key);
        try {
            CouchDbConnector db = RecordDataStore.getDb();
            ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all");
            List<Record> list = db.queryView(viewQuery,Record.class);
            HashMultimap<Long,Record> multimap = HashMultimap.create();

            list.forEach(r -> {
                // Group by pel, the same field loadCache() uses as the cache key.
                multimap.put(r.getPel(),r);
            });
            return new LinkedList<>(multimap.get(key));
        } catch (MalformedURLException e) {
            throw new CacheLoaderException("Failed to load values from cache store.", e);
        }
    }
    ....
    @Override public void loadCache(IgniteBiInClosure<Long, List<Record>> clo, Object... args) {
        if (args == null || args.length == 0 || args[0] == null) {
            throw new CacheLoaderException("Expected entry count parameter is not provided.");
        }

        System.out.println("Loading Cache...");
        final long entryCnt = (Long)args[0];

        try{
            CouchDbConnector db = RecordDataStore.getDb();
            ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all");
            List<Record> list = db.queryView(viewQuery,Record.class);
            HashMultimap<Long,Record> multimap = HashMultimap.create();

            long count = 0;
            for(Record r : list) {
                multimap.put(r.getPel(),r);
                count++;
                if(count == entryCnt)
                    break;
            }

            multimap.keySet().forEach(key -> {
                clo.apply(key,new LinkedList<>(multimap.get(key)));
            });
        }
        catch (MalformedURLException e) {
            throw new CacheLoaderException("Failed to load values from cache store.", e);
        }

        System.out.println("Loaded Cache");
    }

    public static CouchDbConnector getDb() throws MalformedURLException {
        HttpClient httpClient = new StdHttpClient.Builder()
            .url("server:1111/")
            .build();

        CouchDbInstance dbInstance = new StdCouchDbInstance(httpClient);
        CouchDbConnector db = new StdCouchDbConnector("ignite", dbInstance);

        return db;
    }
}
Auricula answered 3/8, 2016 at 20:43 Comment(2)
Are you sure that the server nodes actually discover each other? Can you upload the logs from all the nodes somewhere? - Kobarid
It happens before the nodes discover each other, while the node is still starting up. At least, it happens before you see the topology update in the console. I'll get logs for them sometime today. - Auricula

http://apache-ignite-users.70518.x6.nabble.com/Ignite-cluster-recovery-after-network-partition-td2775.html points out that IgniteClientDisconnectedException provides an IgniteFuture, which you can obtain by calling:

IgniteFuture<?> f = myException.reconnectFuture();

That future has a get() method, which blocks until the client has reconnected. Its Javadoc says:

Synchronously waits for completion of the computation and returns computation result.

Thus, the following should finish when the client has reconnected:

f.get();
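
To show how that wait could be combined with a retry, here is a minimal sketch in plain Java. It is not Ignite code: the class ReconnectRetry, its call method, and the reconnectWaiter parameter are hypothetical names invented for this illustration. The operation stands in for something like cache.get(key), and the waiter stands in for "if this exception means the client was disconnected, block until it is reconnected".

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

/**
 * Retries an operation after waiting for a reconnect signal.
 * All names here are hypothetical; nothing below is part of the Ignite API.
 */
final class ReconnectRetry {
    private ReconnectRetry() {}

    /**
     * Runs op. If it throws and reconnectWaiter returns a non-null Runnable
     * for that exception, runs the Runnable (which should block until the
     * client is reconnected) and tries again, up to maxAttempts attempts.
     */
    static <T> T call(Callable<T> op,
                      Function<Exception, Runnable> reconnectWaiter,
                      int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                Runnable waitForReconnect = reconnectWaiter.apply(e);
                // Not a disconnect, or out of attempts: let the caller see the failure.
                if (waitForReconnect == null || attempt >= maxAttempts) {
                    throw e;
                }
                // In the Ignite case this is where reconnectFuture().get() would go.
                waitForReconnect.run();
            }
        }
    }
}
```

With Ignite, the waiter would check for IgniteClientDisconnectedException and return a Runnable that calls reconnectFuture().get() on it. As far as I know, cache operations report the disconnect wrapped in a javax.cache.CacheException, so the check may need to look at e.getCause() rather than e itself. Bounding the number of attempts keeps the client from looping forever if the cluster never comes back.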
Puglia answered 13/8, 2016 at 12:23 Comment(1)
Sorry I missed responding to your answer BEFORE the bounty expired. Thanks! - Auricula