Read & write data into Cassandra using the Apache Flink Java API

I intend to use Apache Flink to read from and write to Cassandra. I was hoping to use flink-connector-cassandra, but I can't find good documentation or examples for the connector.

Can you please point me to the right way to read and write data from Cassandra using Apache Flink? I only see sink examples, which are purely for writing. Is Apache Flink also meant for reading data from Cassandra, similar to Apache Spark?

Moskow asked 6/3, 2017 at 4:30 Comment(3)
Did you have a look at this documentation and the code examples? – Slime
The example only covers WRITE (insert); I am looking for the READ operation as well. – Moskow
The linked documentation refers to the streaming API, for which Flink only offers a sink. For the batch (DataSet) API there are Cassandra Input-/OutputFormats that you could potentially re-use. – Skin

I had the same question, and this is what I was looking for. I don't know if it is oversimplified for what you need, but I figured I should show it nonetheless.

    // ClusterBuilder comes from org.apache.flink.streaming.connectors.cassandra;
    // CassandraInputFormat from org.apache.flink.batch.connectors.cassandra.
    ClusterBuilder cb = new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("urlToUse.com").withPort(9042).build();
        }
    };

    CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
            new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb);

    cassandraInputFormat.configure(null);
    cassandraInputFormat.open(null);

    // Read one row into a reusable tuple.
    Tuple2<String, String> testOutputTuple = new Tuple2<>();
    cassandraInputFormat.nextRecord(testOutputTuple);

    System.out.println("column1: " + testOutputTuple.f0);
    System.out.println("column2: " + testOutputTuple.f1);

The way I figured this out was by finding the code for the "CassandraInputFormat" class and seeing how it works (http://www.javatips.net/api/flink-master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java). Based on the name, I honestly expected it to just describe a format rather than be the full class that reads from Cassandra, and I have a feeling others might think the same thing.
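For the write half of the same batch API there is a matching CassandraOutputFormat in the same package. A minimal sketch, reusing the cluster builder from above and assuming an example table with two columns (column1, column2), might look like this:

    // CassandraOutputFormat also lives in org.apache.flink.batch.connectors.cassandra.
    CassandraOutputFormat<Tuple2<String, String>> cassandraOutputFormat =
            new CassandraOutputFormat<>(
                    "INSERT INTO example.cassandraconnectorexample (column1, column2) VALUES (?, ?)", cb);

    cassandraOutputFormat.configure(null);
    cassandraOutputFormat.open(0, 1); // taskNumber, numTasks

    // Each tuple field is bound to one "?" placeholder of the insert statement.
    cassandraOutputFormat.writeRecord(new Tuple2<>("key1", "value1"));
    cassandraOutputFormat.close();

In a real job you would normally hand the format to DataSet#output(...) instead of driving it by hand like this.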

Hug answered 14/3, 2017 at 15:33 Comment(2)
Can I write a POJO to Cassandra that doesn't follow the Tuple2<String, String> format but instead uses its own event type? – Dinin
Yes, but it requires custom code. I made a version of the Cassandra output formatter that uses a POJO instead of a tuple, with only a few modifications to the CassandraOutputFormat code. – Hug
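For anyone looking for the POJO route without patching the connector, one possible approach is to implement Flink's generic OutputFormat yourself on top of the DataStax object mapper. A rough sketch, where SensorReading is a hypothetical POJO annotated with the driver's @Table/@Column mapping annotations:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.mapping.Mapper;
    import com.datastax.driver.mapping.MappingManager;
    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.configuration.Configuration;

    public class CassandraPojoOutputFormat implements OutputFormat<SensorReading> {

        private transient Cluster cluster;
        private transient Session session;
        private transient Mapper<SensorReading> mapper;

        @Override
        public void configure(Configuration parameters) {
        }

        @Override
        public void open(int taskNumber, int numTasks) {
            // One connection per parallel task.
            cluster = Cluster.builder().addContactPoint("urlToUse.com").withPort(9042).build();
            session = cluster.connect();
            mapper = new MappingManager(session).mapper(SensorReading.class);
        }

        @Override
        public void writeRecord(SensorReading record) {
            mapper.save(record); // INSERT statement derived from the mapping annotations
        }

        @Override
        public void close() {
            if (session != null) {
                session.close();
            }
            if (cluster != null) {
                cluster.close();
            }
        }
    }

Depending on the Flink version you are on, the connector may also ship POJO-based input/output formats out of the box, so it is worth checking before rolling your own.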
    ClusterBuilder cb = new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("localhost").withPort(9042).build();
        }
    };

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // Wrap the Cassandra query in an InputFormat and expose it as a DataStream source.
    CassandraInputFormat<Tuple3<Integer, Integer, Integer>> inputFormat =
            new CassandraInputFormat<>("SELECT * FROM test.example;", cb);

    DataStreamSource<Tuple3<Integer, Integer, Integer>> t = env.createInput(
            inputFormat, TupleTypeInfo.of(new TypeHint<Tuple3<Integer, Integer, Integer>>() {}));

    // Register the stream as a table so it can be queried with SQL.
    tableEnv.registerDataStream("t1", t);
    Table t2 = tableEnv.sql("select * from t1");

    t2.printSchema();
Jaban answered 20/9, 2018 at 7:9 Comment(0)

You can extend RichFlatMapFunction to do this:

    class MongoMapper extends RichFlatMapFunction[JsonNode, JsonNode] {
      var userCollection: MongoCollection[Document] = _

      override def open(parameters: Configuration): Unit = {
        // Do something here, like opening a connection (runs once per parallel task).
        val client: MongoClient = MongoClient("mongodb://localhost:10000")
        userCollection = client.getDatabase("gp_stage")
          .getCollection("users")
          .withReadPreference(ReadPreference.secondaryPreferred())
        super.open(parameters)
      }

      override def flatMap(event: JsonNode, out: Collector[JsonNode]): Unit = {
        // Do something here per record; this function can use objects initialized in open().
        userCollection.find(Filters.eq("_id", somevalue)).limit(1).first().subscribe(
          (result: Document) => {
            // println(result)
          },
          (t: Throwable) => {
            println(t)
          },
          () => {
            out.collect(event)
          }
        )
      }
    }

Basically, the open function executes once per worker and flatMap executes once per record. The example is for Mongo, but the same pattern can be used for Cassandra.
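For reference, a minimal Java sketch of the same pattern against Cassandra could look like the following. It uses the plain DataStax driver Cluster/Session API; the keyspace, table, and column names are just made-up examples:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class CassandraEnrichMapper extends RichFlatMapFunction<String, String> {

        private transient Cluster cluster;
        private transient Session session;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Runs once per parallel task: open the connection here.
            cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
            session = cluster.connect("example");
        }

        @Override
        public void flatMap(String key, Collector<String> out) {
            // Runs once per record: look the key up and emit the matching value.
            Row row = session.execute(
                    "SELECT column2 FROM cassandraconnectorexample WHERE column1 = ?", key).one();
            if (row != null) {
                out.collect(row.getString("column2"));
            }
        }

        @Override
        public void close() throws Exception {
            if (session != null) {
                session.close();
            }
            if (cluster != null) {
                cluster.close();
            }
            super.close();
        }
    }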

Blether answered 6/3, 2017 at 8:25 Comment(4)
Thanks @Gaurav. Can you point me to a similar example in Java instead of Scala? – Moskow
#34224923 – Blether
Thank you @Gaurav – Moskow
Can you upvote/accept it as the answer if it helped you? – Blether

In your case, as I understand it, the first step of your pipeline is reading data from Cassandra, so rather than writing a RichFlatMapFunction you should write your own RichSourceFunction.

As a reference, you can have a look at the simple implementation of WikipediaEditsSource.
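A minimal sketch of what such a source could look like, reusing the example keyspace/table and column names from the answers above (those names are assumptions) and the plain DataStax driver rather than the connector:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    public class CassandraSource extends RichSourceFunction<Tuple2<String, String>> {

        private transient Cluster cluster;
        private transient Session session;
        private volatile boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
            session = cluster.connect();
        }

        @Override
        public void run(SourceContext<Tuple2<String, String>> ctx) {
            // Emit every row of the table once, then let the source finish.
            for (Row row : session.execute("SELECT column1, column2 FROM example.cassandraconnectorexample")) {
                if (!isRunning) {
                    break;
                }
                ctx.collect(new Tuple2<>(row.getString("column1"), row.getString("column2")));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public void close() throws Exception {
            if (session != null) {
                session.close();
            }
            if (cluster != null) {
                cluster.close();
            }
        }
    }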

Prominent answered 6/3, 2017 at 9:44 Comment(0)
