I am trying to write a simple Scala program that dumps data to Parquet files in HDFS.
I create an Avro schema, initialize a ParquetWriter with this schema, map my records
to GenericRecords that follow the schema, and then write them with the ParquetWriter.
Unfortunately, I get the following exception when running the program:
java.lang.ClassCastException: parquet.io.MessageColumnIO cannot be cast to parquet.io.PrimitiveColumnIO
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.getColumnWriter(MessageColumnIO.java:339)
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:376)
at parquet.io.ValidatingRecordConsumer.addBinary(ValidatingRecordConsumer.java:211)
at parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:260)
at parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:167)
at parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:142)
at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:116)
at parquet.hadoop.ParquetWriter.write(ParquetWriter.java:324)
Schema definition:
import org.apache.avro.{Schema, SchemaBuilder}
import parquet.avro.AvroSchemaConverter

val avroSchema: Schema = SchemaBuilder.record("event_snapshots").fields()
  .requiredString("userid")
  .requiredString("event")
  .requiredString("firstevent")
  .requiredString("lastevent")
  .requiredInt("count")
  .endRecord()
// Parquet message type derived from the Avro schema
val parquetSchema = new AvroSchemaConverter().convert(avroSchema)
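A quick way to confirm that the conversion produces the flat, all-primitive Parquet schema one would expect is to print the converted `MessageType` and check that every top-level field is a leaf column. This is a minimal sketch, assuming the pre-Apache parquet-avro artifact with `parquet.*` packages seen in the stack trace (newer releases use `org.apache.parquet.*`); the `SchemaCheck` object name is hypothetical.

```scala
// Assumes pre-Apache parquet-avro (parquet.* packages), as in the stack trace.
import org.apache.avro.{Schema, SchemaBuilder}
import parquet.avro.AvroSchemaConverter
import parquet.schema.MessageType

object SchemaCheck { // hypothetical helper name
  // Same record schema as in the question.
  val avroSchema: Schema = SchemaBuilder.record("event_snapshots").fields()
    .requiredString("userid")
    .requiredString("event")
    .requiredString("firstevent")
    .requiredString("lastevent")
    .requiredInt("count")
    .endRecord()

  // Convert the Avro schema to its Parquet equivalent.
  val parquetSchema: MessageType = new AvroSchemaConverter().convert(avroSchema)

  // True when no top-level field is a nested group.
  def allPrimitive: Boolean =
    (0 until parquetSchema.getFieldCount).forall(i => parquetSchema.getType(i).isPrimitive)

  def main(args: Array[String]): Unit = {
    println(parquetSchema)
    println(s"all fields primitive: $allPrimitive")
  }
}
```

If the printed schema shows only required primitive columns, the schema itself is unlikely to be what sends the writer to the message level instead of a column.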
Writer:
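import org.apache.avro.generic.GenericRecord
import parquet.avro.AvroWriteSupport
import parquet.hadoop.ParquetWriter
import parquet.hadoop.metadata.CompressionCodecName
val writeSupport = new AvroWriteSupport[GenericRecord](parquetSchema, avroSchema, null)
val blockSize = 256 * 1024 * 1024 // 256 MB row groups
val pageSize = 64 * 1024          // 64 KB pages
val writer = new ParquetWriter[GenericRecord](outputDir, writeSupport,
  CompressionCodecName.SNAPPY, blockSize,
  pageSize, pageSize,   // page size, dictionary page size
  false, true,          // enableDictionary = false, validating = true
  configuration)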
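The writer setup above wires `AvroWriteSupport` into `ParquetWriter` by hand. parquet-avro also ships `AvroParquetWriter`, which derives the Parquet schema from the Avro schema internally, so the two schemas cannot drift apart. This is a sketch under the same `parquet.*` package assumption; it uses the 7-argument constructor (path, Avro schema, codec, block size, page size, enableDictionary, Configuration), which you should check exists in your parquet-avro version. `WriterFactory` and `open` are hypothetical names.

```scala
// Assumes pre-Apache parquet-avro (parquet.* packages) and Hadoop on the classpath.
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import parquet.avro.AvroParquetWriter
import parquet.hadoop.metadata.CompressionCodecName

object WriterFactory { // hypothetical helper name
  val blockSize: Int = 256 * 1024 * 1024 // 256 MB row groups
  val pageSize: Int = 64 * 1024          // 64 KB pages

  // Opens a writer for a single Parquet *file* at `file`; the
  // Avro-to-Parquet schema conversion happens inside AvroParquetWriter.
  def open(file: Path, avroSchema: Schema, conf: Configuration): AvroParquetWriter[GenericRecord] =
    new AvroParquetWriter[GenericRecord](
      file,
      avroSchema,
      CompressionCodecName.SNAPPY,
      blockSize,
      pageSize,
      false, // enableDictionary, matching the original code
      conf)
}
```

Note that `ParquetWriter` writes one file at the given path, so a value named `outputDir` must point at a file path, not an existing directory.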
Building and writing a record:
import org.apache.avro.generic.GenericRecordBuilder
val recordBuilder = new GenericRecordBuilder(avroSchema)
recordBuilder.set(avroSchema.getField("userid"), userKey)
recordBuilder.set(avroSchema.getField("event"), eventKey)
// timestamps are stored as formatted strings
recordBuilder.set(avroSchema.getField("firstevent"),
  dateTimeDateFormat.format(firstEvent))
recordBuilder.set(avroSchema.getField("lastevent"),
  dateTimeDateFormat.format(lastEvent))
recordBuilder.set(avroSchema.getField("count"), event.count)
val record = recordBuilder.build()
writer.write(record)
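Two cheap things to rule out around this step. First, a value whose runtime type does not match the schema (for example a `Long` where `count` is declared `int`): as far as I know, `GenericRecordBuilder` only rejects nulls for required fields and does not check types, whereas `GenericData.validate` does. Second, a writer shared between threads: `ParquetWriter` is not documented as thread-safe, so concurrent `write` calls are serialized here, and the writer is always closed because `close()` is what writes the Parquet footer. This is a minimal sketch assuming the `parquet.*` packages from the stack trace; `RecordWriting`, `buildChecked`, and `writeAll` are hypothetical helpers.

```scala
// Assumes pre-Apache parquet-avro (parquet.* packages), as in the stack trace.
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder}
import parquet.hadoop.ParquetWriter

object RecordWriting { // hypothetical helper names throughout
  // Builds one record and fails fast if any value's runtime type does not
  // match the schema (e.g. a Long for an Avro "int" field).
  def buildChecked(schema: Schema, values: Map[String, Any]): GenericRecord = {
    val builder = new GenericRecordBuilder(schema)
    values.foreach { case (name, value) => builder.set(name, value.asInstanceOf[AnyRef]) }
    val record = builder.build()
    require(GenericData.get().validate(schema, record),
      s"record does not match schema: $record")
    record
  }

  // Writes all records through one writer and always closes it (close()
  // writes the footer). The lock keeps write calls from interleaving if
  // several threads share the same writer.
  def writeAll(writer: ParquetWriter[GenericRecord], records: Iterator[GenericRecord]): Unit =
    try {
      records.foreach(r => writer.synchronized(writer.write(r)))
    } finally {
      writer.close()
    }
}
```

For example, `RecordWriting.buildChecked(avroSchema, Map("userid" -> userKey, "count" -> 3L, ...))` would fail with a schema-mismatch message instead of reaching the writer.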
Any ideas?