Here is a sample that worked for me:
package com.beam.test;
import com.beam.test.schema.Address;
import com.beam.test.schema.Purchase;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import java.util.ArrayList;
import java.util.List;
public class SchemaExample {
public static void main(String[] args) {
PipelineOptions options= PipelineOptionsFactory.create();
Pipeline pipeline=Pipeline.create(options);
pipeline.apply("Create input:", TextIO.read().from("path\to\input\file.txt"))
.apply(ParDo.of(new ConvertToPurchase())).
apply(ParDo.of(new DoFn<Purchase, Void>() {
@ProcessElement
public void processElement(@Element Purchase purchase){
System.out.println(purchase.getUserId()+":"+purchase.getAddress().getHouseName());
}
}));
pipeline.run().waitUntilFinish();
}
static class ConvertToPurchase extends DoFn<String,Purchase>{
@ProcessElement
public void processElement(@Element String input,OutputReceiver<Purchase> outputReceiver){
String[] inputArr=input.split(",");
Purchase purchase=new Purchase(inputArr[0],new Address(inputArr[1],inputArr[2]));
outputReceiver.output(purchase);
}
}
}
package com.beam.test.schema;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
/**
 * A purchase made by a user, with an associated {@link Address}.
 *
 * <p>Beam infers the schema from the getters ({@link JavaBeanSchema}) and instantiates the class
 * through the {@link SchemaCreate}-annotated constructor. Instances are immutable and compare by
 * value, so they can be used with equality-based transforms and test assertions.
 */
@DefaultSchema(JavaBeanSchema.class)
public class Purchase {
  private final String userId;
  private final Address address;

  /**
   * Creates a purchase.
   *
   * @param userId id of the purchasing user
   * @param address address attached to this purchase
   */
  @SchemaCreate
  public Purchase(String userId, Address address) {
    this.userId = userId;
    this.address = address;
  }

  /** Returns the id of the purchasing user. */
  public String getUserId() {
    return userId;
  }

  /** Returns the address attached to this purchase. */
  public Address getAddress() {
    return address;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    Purchase that = (Purchase) o;
    return java.util.Objects.equals(userId, that.userId)
        && java.util.Objects.equals(address, that.address);
  }

  @Override
  public int hashCode() {
    return java.util.Objects.hash(userId, address);
  }

  @Override
  public String toString() {
    return "Purchase{userId=" + userId + ", address=" + address + "}";
  }
}
package com.beam.test.schema;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
/**
 * An address consisting of a house name and a postal code.
 *
 * <p>Beam infers the schema from the getters ({@link JavaBeanSchema}) and instantiates the class
 * through the {@link SchemaCreate}-annotated constructor. Instances are immutable and compare by
 * value, so they can be used with equality-based transforms and test assertions.
 */
@DefaultSchema(JavaBeanSchema.class)
public class Address {
  private final String houseName;
  private final String postalCode;

  /**
   * Creates an address.
   *
   * @param houseName name of the house
   * @param postalCode postal code, kept as a string
   */
  @SchemaCreate
  public Address(String houseName, String postalCode) {
    this.houseName = houseName;
    this.postalCode = postalCode;
  }

  /** Returns the house name. */
  public String getHouseName() {
    return houseName;
  }

  /** Returns the postal code. */
  public String getPostalCode() {
    return postalCode;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    Address that = (Address) o;
    return java.util.Objects.equals(houseName, that.houseName)
        && java.util.Objects.equals(postalCode, that.postalCode);
  }

  @Override
  public int hashCode() {
    return java.util.Objects.hash(houseName, postalCode);
  }

  @Override
  public String toString() {
    return "Address{houseName=" + houseName + ", postalCode=" + postalCode + "}";
  }
}
My test file contains data in the following format (userId,houseName,postalCode):
user1,abc,1234
user2,def,3456