Save Spark Dataframe into Elasticsearch - Can’t handle type exception

I have designed a simple job to read data from MySQL and save it in Elasticsearch with Spark.

Here is the code:

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("MySQLtoEs")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "127.0.0.1:9200")
                .set("es.mapping.id", "id")
                .set("spark.serializer", KryoSerializer.class.getName()));

SQLContext sqlContext = new SQLContext(sc);

// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");

// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
        "merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");

You can see the code is very straightforward. It reads the data into a DataFrame, selects some columns, and then performs a count as a basic action on the DataFrame. Everything works fine up to this point.

Then it tries to save the data into Elasticsearch, but it fails because it cannot handle some type. You can see the error log here.

I'm not sure why it can't handle that type. Does anyone know why this is occurring?

I'm using Apache Spark 1.5.0, Elasticsearch 1.4.4 and elasticsearch-hadoop 2.1.1.

EDIT:

  • I have updated the gist link with a sample dataset along with the source code.
  • I have also tried to use the elasticsearch-hadoop dev builds as mentioned by @costin on the mailing list.
Depopulate answered 19/9, 2015 at 10:21

The answer for this one was tricky, but thanks to samklr, I managed to figure out what the problem was.

Nevertheless, the solution isn't straightforward and involves some “unnecessary” transformations.

First let's talk about Serialization.

There are two aspects of serialization to consider in Spark: serialization of data and serialization of functions. In this case, it's about data serialization and thus de-serialization.

From Spark’s perspective, the only thing required is setting up serialization - Spark relies by default on Java serialization, which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types - namely Writables. As such, InputFormats and OutputFormats are required to return Writables which, out of the box, Spark does not understand.

With the elasticsearch-spark connector one must enable a different serialization (Kryo) which handles the conversion automatically and also does this quite efficiently.

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

Even better, Kryo does not require that a class implement a particular interface to be serialized, which means POJOs can be used in RDDs without any further work beyond enabling Kryo serialization.

That said, @samklr pointed out to me that Kryo needs classes to be registered before using them.

This is because Kryo writes a reference to the class of the object being serialized (one reference is written for every object written), which is just an integer identifier if the class has been registered but is the full classname otherwise. Spark registers Scala classes and many other framework classes (like Avro Generic or Thrift classes) on your behalf.

Registering classes with Kryo is straightforward. Create a class that implements KryoRegistrator and override the registerClasses() method:

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

import java.io.Serializable;

public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register the Product POJO associated with a product Row from the DataFrame
        kryo.register(Product.class);
    }
}

Finally, in your driver program, set the spark.kryo.registrator property to the fully qualified classname of your KryoRegistrator implementation:

conf.set("spark.kryo.registrator", "MyKryoRegistrator")

Secondly, even with the Kryo serializer set and the class registered, because of changes made in Spark 1.5, for some reason the Elasticsearch connector still couldn't de-serialize the DataFrame: it can't infer the SchemaType of the DataFrame.

So I had to convert the DataFrame to a JavaRDD<Product>:

JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
    public Product call(Row row) throws Exception {
        // Field positions follow the column order of the select() above
        long id = row.getLong(0);
        String title = row.getString(1);
        String description = row.getString(2);
        int merchantId = row.getInt(3);
        // price is read as a BigDecimal and converted to double
        double price = row.getDecimal(4).doubleValue();
        String keywords = row.getString(5);
        long brandId = row.getLong(6);
        int categoryId = row.getInt(7);
        return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
    }
});
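
For reference, the Product class itself isn't shown in the original post. A minimal sketch of what it might look like, with field names and types assumed from the selected columns and the constructor call above:

import java.io.Serializable;

// Hypothetical POJO matching the selected columns (not shown in the original post)
public class Product implements Serializable {
    private long id;
    private String title;
    private String description;
    private int merchantId;
    private double price;
    private String keywords;
    private long brandId;
    private int categoryId;

    public Product() { }

    public Product(long id, String title, String description, int merchantId,
                   double price, String keywords, long brandId, int categoryId) {
        this.id = id;
        this.title = title;
        this.description = description;
        this.merchantId = merchantId;
        this.price = price;
        this.keywords = keywords;
        this.brandId = brandId;
        this.categoryId = categoryId;
    }

    // Getters (and setters) for each field, omitted here for brevity
    public long getId() { return id; }
}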

Now the data is ready to be written into Elasticsearch:

JavaEsSpark.saveToEs(products, "test/test");
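
If you'd rather not rely on the es.mapping.id set on the SparkConf in the question, JavaEsSpark.saveToEs also has an overload that accepts a per-write configuration map. A sketch, assuming that overload and reusing the index/type from the question:

Map<String, String> esCfg = new HashMap<>();
esCfg.put("es.mapping.id", "id");   // use the POJO's id field as the document id

JavaEsSpark.saveToEs(products, "offers/product", esCfg);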

References:

  • Elasticsearch's Apache Spark support documentation.
  • Hadoop: The Definitive Guide, 4th ed., Chapter 19: Spark – Tom White.
  • User samklr.
Depopulate answered 9/10, 2015 at 15:42
