What are schemas for in Apache Beam?

I was reading the docs about schemas in Apache Beam, but I cannot understand what their purpose is, how and why to use them, or in which cases I would need them. What is the difference between using schemas and using a class that implements the Serializable interface?

The docs have an example:

@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojo {
  public String bank;
  public double purchaseAmount;
}
PCollection<TransactionPojo> transactionPojos = readTransactionsAsPojo();

But the docs don't explain how the readTransactionsAsPojo() function is built. I think a lot of explanation is missing here.

Mai asked on 16/6/2020 at 16:13
Yeah. I was trying to read with JdbcIO after implementing the schema as described in the docs, but apparently we must pass a RowMapper to read via JDBC. I don't understand how we can use the already-defined POJO schema here. – Balfore

There are several reasons to use Beam schemas; here are some of them:

  • You won't need to specify a Coder for objects that have a schema;
  • If you have objects with the same schema but represented in different ways (like the JavaBean and POJO versions of the transaction in the docs), Beam schemas let you apply the same schema PTransforms to the PCollections of those objects;
  • With schema-aware PCollections, joins are much easier to write because they require much less boilerplate code;
  • Beam SQL requires the PCollection to have a Beam schema. For example, you can read Avro files whose Avro schema is automatically converted into a Beam schema, and then apply a Beam SQL transform to those Avro records.
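To make the first two points concrete, here is a minimal sketch, assuming a recent Beam Java SDK on the classpath (the class names SchemaInferenceExample and TransactionBean are illustrative, not from the docs). It asks Beam's SchemaRegistry for the schema inferred from the docs' TransactionPojo and from a hypothetical JavaBean holding the same data, checks that the two are equivalent, and converts a POJO into a generic Row. This is the key difference from a plain Serializable class: with SerializableCoder, Beam only sees an opaque serialized blob, whereas a schema tells Beam the field names and types that schema transforms, joins and SQL rely on.

```java
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.values.Row;

public class SchemaInferenceExample {

  // As in the docs: the schema is inferred from the public fields.
  @DefaultSchema(JavaFieldSchema.class)
  public static class TransactionPojo {
    public String bank;
    public double purchaseAmount;
  }

  // Hypothetical JavaBean with the same data: the schema is inferred from getters/setters.
  @DefaultSchema(JavaBeanSchema.class)
  public static class TransactionBean {
    private String bank;
    private double purchaseAmount;

    public String getBank() { return bank; }
    public void setBank(String bank) { this.bank = bank; }
    public double getPurchaseAmount() { return purchaseAmount; }
    public void setPurchaseAmount(double purchaseAmount) { this.purchaseAmount = purchaseAmount; }
  }

  public static void main(String[] args) throws NoSuchSchemaException {
    // A standalone registry; inside a pipeline you would use pipeline.getSchemaRegistry().
    SchemaRegistry registry = SchemaRegistry.createDefault();

    Schema pojoSchema = registry.getSchema(TransactionPojo.class);
    Schema beanSchema = registry.getSchema(TransactionBean.class);
    System.out.println("POJO schema: " + pojoSchema);
    // equivalent() ignores field order, so both representations share one logical schema.
    System.out.println("Equivalent to bean schema: " + pojoSchema.equivalent(beanSchema));

    TransactionPojo pojo = new TransactionPojo();
    pojo.bank = "bankA";
    pojo.purchaseAmount = 12.5;

    // Convert the POJO to a generic Row and read its fields by name.
    Row row = registry.getToRowFunction(TransactionPojo.class).apply(pojo);
    System.out.println(row.getString("bank") + " -> " + row.getDouble("purchaseAmount"));
  }
}
```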

Also, I'd recommend watching these talks from Beam Summit 2019 about schema-aware PCollections and Beam SQL.

Regular answered on 18/6/2020 at 17:56

There is still NO answer as to how readTransactionsAsPojo() is implemented:

PCollection<TransactionPojo> transactionPojos = readTransactionsAsPojo();

With the documentation kept this abstract and no complete code in the repo, it is hard to understand!
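The docs never define readTransactionsAsPojo(); it stands in for whatever source produces TransactionPojo objects. Below is a minimal sketch of one possible implementation, assuming (hypothetically) that the transactions live in a text file with one "bank,amount" line per record. The class name, the parse helper, the extra pipeline/path parameters and the file name "transactions.csv" are all illustrative, not part of Beam. Because TransactionPojo carries @DefaultSchema, the resulting PCollection needs no setCoder(...) call.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

public class ReadTransactionsExample {

  // The POJO from the docs; JavaFieldSchema infers the schema from its public fields.
  @DefaultSchema(JavaFieldSchema.class)
  public static class TransactionPojo {
    public String bank;
    public double purchaseAmount;
  }

  // Hypothetical parser for one "bank,amount" line.
  public static TransactionPojo parse(String line) {
    String[] parts = line.split(",");
    TransactionPojo transaction = new TransactionPojo();
    transaction.bank = parts[0].trim();
    transaction.purchaseAmount = Double.parseDouble(parts[1].trim());
    return transaction;
  }

  // One possible readTransactionsAsPojo(): read text lines, then map each line to a POJO.
  // The output TypeDescriptor lets Beam find the schema and use a SchemaCoder automatically.
  public static PCollection<TransactionPojo> readTransactionsAsPojo(Pipeline pipeline, String path) {
    return pipeline
        .apply("ReadLines", TextIO.read().from(path))
        .apply("ParseLines",
            MapElements.into(TypeDescriptor.of(TransactionPojo.class))
                .via((String line) -> parse(line)));
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // "transactions.csv" is a placeholder path.
    PCollection<TransactionPojo> transactionPojos =
        readTransactionsAsPojo(pipeline, "transactions.csv");
    // Schema-aware transforms (Select, Group, Join, SqlTransform) can be applied to transactionPojos here.
    pipeline.run().waitUntilFinish();
  }
}
```

Any other source works the same way as long as the last step outputs TransactionPojo; the schema comes from the class, not from the IO.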

Kiser answered on 22/7/2022 at 7:48

Here is sample code that worked for me:

package com.beam.test;

import com.beam.test.schema.Address;
import com.beam.test.schema.Purchase;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class SchemaExample {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // No setCoder(...) is needed for PCollection<Purchase>: Beam infers a
        // SchemaCoder from the @DefaultSchema annotation on Purchase.
        // Use forward slashes: "\t", "\i" and "\f" are escape sequences in Java strings.
        pipeline.apply("Read input", TextIO.read().from("path/to/input/file.txt"))
                .apply("Convert to Purchase", ParDo.of(new ConvertToPurchase()))
                .apply("Print", ParDo.of(new DoFn<Purchase, Void>() {
                    @ProcessElement
                    public void processElement(@Element Purchase purchase) {
                        System.out.println(purchase.getUserId() + ":" + purchase.getAddress().getHouseName());
                    }
                }));

        pipeline.run().waitUntilFinish();
    }

    // Parses a "userId,houseName,postalCode" line into a Purchase with a nested Address.
    static class ConvertToPurchase extends DoFn<String, Purchase> {
        @ProcessElement
        public void processElement(@Element String input, OutputReceiver<Purchase> outputReceiver) {
            String[] inputArr = input.split(",");
            Purchase purchase = new Purchase(inputArr[0], new Address(inputArr[1], inputArr[2]));
            outputReceiver.output(purchase);
        }
    }
}

package com.beam.test.schema;

import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

@DefaultSchema(JavaBeanSchema.class)
public class Purchase {

    private String userId;
    private Address address;
    public String getUserId(){
        return userId;
    }
    public Address getAddress(){
        return address;
    }

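    // Purchase has no setters, so Beam uses this @SchemaCreate constructor
    // to create Purchase instances (for example, when decoding elements).
    @SchemaCreate
    public Purchase(String userId, Address address) {
        this.userId = userId;
        this.address = address;
    }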
}


package com.beam.test.schema;

import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

@DefaultSchema(JavaBeanSchema.class)
public class Address {
    private String houseName;
    private String postalCode;

    public String getHouseName(){
        return houseName;
    }
    public String getPostalCode(){
        return postalCode;
    }
    // Address has no setters either, so Beam creates instances through this constructor.
    @SchemaCreate
    public Address(String houseName, String postalCode) {
        this.houseName = houseName;
        this.postalCode = postalCode;
    }
}

My test file contains data in the following format:

user1,abc,1234
user2,def,3456
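The sample above uses the schema only so that Beam can infer coders for Purchase and Address; nothing in it accesses fields by name. As a minimal sketch of what a schema-aware PCollection adds, assuming a recent Beam Java SDK with the Direct Runner on the classpath, the block below groups the docs' TransactionPojo (redeclared so the block compiles on its own) by bank and sums the amounts with Group.byFieldNames(...).aggregateField(...), without writing a custom DoFn, KV or Coder for the aggregation. The class name and sample values are made up.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class GroupByBankExample {

  @DefaultSchema(JavaFieldSchema.class)
  public static class TransactionPojo {
    public String bank;
    public double purchaseAmount;

    // With no @SchemaCreate annotation, JavaFieldSchema creates instances via this no-arg constructor.
    public TransactionPojo() {}

    public TransactionPojo(String bank, double purchaseAmount) {
      this.bank = bank;
      this.purchaseAmount = purchaseAmount;
    }
  }

  // Made-up sample data.
  public static List<TransactionPojo> sampleTransactions() {
    return Arrays.asList(
        new TransactionPojo("bankA", 10.0),
        new TransactionPojo("bankA", 2.5),
        new TransactionPojo("bankB", 7.0));
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // No setCoder(...): Create picks up the schema from @DefaultSchema.
    PCollection<TransactionPojo> transactions = pipeline.apply(Create.of(sampleTransactions()));

    // Group by the "bank" field and sum "purchaseAmount" into "totalAmount".
    // Each output Row has a "key" row (bank) and a "value" row (totalAmount).
    PCollection<Row> totals = transactions.apply(
        Group.<TransactionPojo>byFieldNames("bank")
            .aggregateField("purchaseAmount", Sum.ofDoubles(), "totalAmount"));

    totals.apply(ParDo.of(new DoFn<Row, Void>() {
      @ProcessElement
      public void processElement(@Element Row row) {
        System.out.println(row.getRow("key").getString("bank") + ": "
            + row.getRow("value").getDouble("totalAmount"));
      }
    }));

    pipeline.run().waitUntilFinish();
  }
}
```

For the made-up data this prints one line per bank (bankA with 12.5, bankB with 7.0), in no guaranteed order. The same PCollection could also be passed to Join or SqlTransform, since they all rely on the schema rather than on the Java class.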
Cloris answered on 22/8/2022 at 15:04
