How to extract doc from avro data and add it to dataframe
I'm trying to create Hive/Impala tables based on Avro files in HDFS. The tool for doing the transformations is Spark.

I can't use spark.read.format("avro") to load the data into a dataframe, because that way the doc part (the description of each column) is lost. I can see the doc by doing:

 input = sc.textFile("/path/to/avrofile")
 avro_schema = input.first() # not sure what type it is 
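
I can also pull the embedded writer schema out with the fastavro package (a rough sketch; I'd first copy the file somewhere locally readable, and the path below is just illustrative):

import json
from fastavro import reader

with open("/tmp/avrofile.avro", "rb") as f:  # local copy of the HDFS file, illustrative path
    avro_reader = reader(f)
    avro_schema = avro_reader.writer_schema  # the schema as a python dict

print(json.dumps(avro_schema, indent=2))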

The problem is that it's a nested schema, and I'm not sure how to traverse it to map each doc to the corresponding column description in the dataframe. I'd like the doc to become the column description of the table. For example, the input schema looks like:

"fields": [
    {
     "name":"productName",
     "type": [
       "null",
       "string"
      ],
     "doc": "Real name of the product"
     "default": null
    },
    {
     "name" : "currentSellers",
     "type": [
        "null",
        {
         "type": "record",
         "name": "sellers",
         "fields":[
             {
              "name": "location",
              "type":[
                 "null",
                  {
                   "type": "record"
                   "name": "sellerlocation",
                   "fields": [
                      {
                       "name":"locationName",
                       "type": [
                           "null",
                           "string"
                         ],
                       "doc": "Name of the location",
                       "default":null
                       },
                       {
                       "name":"locationArea",
                       "type": [
                           "null",
                           "string"
                         ],
                       "doc": "Area of the location",#The comment needs to be added to table comments
                       "default":null
                         .... #These are nested fields 

In the final table, one field name would for example be currentSellers_locationName, with the column description "Name of the location". Could someone please help shed some light on how to parse the schema and add each doc to the column description? And could you explain a bit what the bit below is about, outside of the fields? Many thanks. Let me know if I can explain it better.

         "name" : "currentSellers",
     "type": [
        "null",
        {
         "type": "record",
         "name": "sellers",
         "fields":[
             {
  
Bushnell answered 22/7, 2022 at 5:54 Comment(6)
Could you please explain what "map the doc to the column description of the dataframe" means? Where do you want to add this description?Deannadeanne
@Deannadeanne I edited the post. Thanks. I'd like to have the doc as the column description of the table.Bushnell
Sorry, I'm not able to understand. In Hive tables you can add comments to a column. Is this what you need? I'm not sure if comments can be added directly from the schema. Also, in Spark dataframes you can add metadata to columns, but again I don't think that when the dataframe is written to the table it will write the metadata as comments into the Hive table. Please correct me if my understanding of your problem is wrong.Deannadeanne
@Deannadeanne yes, I'm trying to add metadata to columns via Spark, but the problem is I'm not sure how to automatically parse the doc that comes with the Avro schema and add it to the corresponding column in the dataframe.Bushnell
@Bushnell How do you plan to flatten the nested Avro data to spark table? And how do you extract the original schema?Adelladella
You haven't really provided the full output file example, have you? The last code section seems incomplete and not even formatted, so I'm not sure what it represents, as even the string you mentioned above ("Name of the location") does not exist in the following code section.Mariannamarianne
If you would like to parse the schema yourself and manually add the metadata to Spark, I would suggest the flatdict package:

from flatdict import FlatterDict

flat_schema = FlatterDict(schema)  # schema as python dict, flattened to colon-delimited keys

# index every field path by its ':name' / ':doc' entry, stripping the suffix
# so that both dicts share the same keys
names = {k.replace(':name', ''): flat_schema[k] for k in flat_schema if k.endswith(':name')}
docs = {k.replace(':doc', ''): flat_schema[k] for k in flat_schema if k.endswith(':doc')}

# keep only keys which are present in both names and docs
keys_with_doc = set(names.keys()) & set(docs.keys())

# join the names of all ancestor fields: the prefixes of `key` that end in a
# 'fields:<index>' segment, i.e. real fields rather than record names
full_name = lambda key: '_'.join(
    names[k] for k in sorted(names, key=len) if key.startswith(k) and k.split(':')[-2] == 'fields'
)
name_doc_map = {full_name(k): docs[k] for k in keys_with_doc}

A typical set of keys in flat_schema.keys() is:

'fields:1:type:1:fields:0:type:1:fields:0:type:1',
'fields:1:type:1:fields:0:type:1:fields:0:name',
'fields:1:type:1:fields:0:type:1:fields:0:default',
'fields:1:type:1:fields:0:type:1:fields:0:doc',

These strings can now be manipulated (see the worked example after this list):

  1. extract only the ones ending with "name" and "doc" (ignore "default", etc.)
  2. take the set intersection to drop the ones that do not have both fields present
  3. collect the field names from all higher levels of the hierarchy: fields:1:type:1:fields:0 is one of the parents of fields:1:type:1:fields:0:type:1:fields:0 (the condition is that they share the same start and that they end with a fields:<index> segment)
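
For illustration, here is my own completion of the question's schema fragment into a Python dict (the exact nesting is an assumption), together with the mapping the code above should produce:

# illustrative completion of the schema fragment from the question
schema = {
    "fields": [
        {"name": "productName", "type": ["null", "string"],
         "doc": "Real name of the product", "default": None},
        {"name": "currentSellers", "type": ["null", {
            "type": "record", "name": "sellers", "fields": [
                {"name": "location", "type": ["null", {
                    "type": "record", "name": "sellerlocation", "fields": [
                        {"name": "locationName", "type": ["null", "string"],
                         "doc": "Name of the location", "default": None},
                        {"name": "locationArea", "type": ["null", "string"],
                         "doc": "Area of the location", "default": None},
                    ]}]},
            ]}]},
    ]
}

# name_doc_map == {
#     'productName': 'Real name of the product',
#     'currentSellers_location_locationName': 'Name of the location',
#     'currentSellers_location_locationArea': 'Area of the location',
# }

Note that the intermediate location field name also ends up in the flattened name, so the leaf comes out as currentSellers_location_locationName rather than the currentSellers_locationName shown in the question.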
Adelladella answered 6/8, 2022 at 7:52 Comment(2)
Thanks! Could you explain a bit more about set(names.keys()) & set(docs.keys()) and how you used full_name?Bushnell
@Bushnell I added some more explanation, but it will be easiest to understand if you look at separate parts of the code and what they output. set(list_a) & set(list_b) is just a convenient way to remove elements which are not present in both lists.Adelladella
Looking at: https://github.com/apache/spark/blob/master/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L58

It seems like Spark's implementation of the schema converter does not call the getDoc function of the org.apache.avro.Schema object, nor the doc function of the org.apache.avro.Schema.Field objects it iterates through.

It only reads the type information and tries to convert it to Spark's internal type objects (StructType, StringType, LongType, etc.).

The only two solutions I can think of are:

  1. read one or some of the files yourself and parse the schema out of them
  2. save the schema in a separate file, which should be easy enough to parse since it's plain JSON (a sketch of this option follows below)
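
As a rough sketch of option 2 (the .avsc path, the table name, and the flat top-level loop are assumptions; nested records would first need the flattening discussed in the other answer), as far as I know Spark stores Hive column comments under the "comment" key of a column's metadata:

import json
from pyspark.sql.functions import col

# assumption: the writer schema was saved next to the data as plain JSON
with open("/path/to/schema.avsc") as f:
    avro_schema = json.load(f)

# `spark` is the active SparkSession, e.g. in the pyspark shell
df = spark.read.format("avro").load("/path/to/avrofile")

# attach each top-level doc as column metadata under the "comment" key
for field in avro_schema["fields"]:
    if "doc" in field:
        df = df.withColumn(
            field["name"],
            col(field["name"]).alias(field["name"], metadata={"comment": field["doc"]}),
        )

df.write.saveAsTable("mydb.mytable")  # illustrative table name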
Pfaff answered 1/8, 2022 at 13:59 Comment(0)
Code from @Adelladella

from flatdict import FlatterDict

flat_schema = FlatterDict(schema)  # schema as python dict

names = {k.replace(':name', ''): flat_schema[k] for k in flat_schema if k.endswith(':name')}
docs = {k.replace(':doc', ''): flat_schema[k] for k in flat_schema if k.endswith(':doc')}

keys_with_doc = set(names.keys()) & set(docs.keys())

full_name = lambda key: '_'.join(
    names[k] for k in sorted(names, key=len) if key.startswith(k) and k.split(':')[-2] == 'fields'
)
name_doc_map = {full_name(k): docs[k] for k in keys_with_doc}

Explanation

FlatterDict

E.g.:

value = flatdict.FlatterDict({'list': ['a', 'b', 'c']})

will be the same as:

value == {'list:0': 'a', 'list:1': 'b', 'list:2': 'c'}

So your nested "fields" list will be flattened into keys like this:

{"fields:0:name": ..., "fields:0:doc": ..., "fields:1:type:1:fields:0:name": ...}

More information about FlatterDict here

set(names.keys()) & set(docs.keys())

names.keys() and docs.keys() get all the keys from each dictionary, and wrapping them in set() turns them into sets. The & operator then takes the set intersection, keeping only the keys that appear in both sets, and the result goes into keys_with_doc. E.g.

a={1:"a", 3:"c", 4:"d"}
b={1:"a", 2:"b"}
name=set(a.keys()) & set(b.keys())
name

>>> {1}

Keys are the ones on the left:

d = {1: "a", 2: "b"}  # keys are 1 and 2

full_name

This lambda function is the tricky part. For a given flattened key, it loops over the names dictionary sorted from shortest key to longest (key=len) and keeps an entry only if it is a prefix of the given key (key.startswith(k)) and it refers to an actual field rather than a record name (k.split(':')[-2] == 'fields', i.e. the key ends in a fields:<index> segment). The names of those ancestor fields are then joined with "_". Using this lambda in name_doc_map therefore builds a new dictionary that maps each flattened field name to its doc, based on keys_with_doc and docs.
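
As a hypothetical walk-through (using the names dictionary that the code above builds from the question's schema), take the locationName leaf:

key = 'fields:1:type:1:fields:0:type:1:fields:0'  # the locationName leaf

# prefixes of `key` in `names` whose last-but-one segment is 'fields':
#   'fields:1'                                  -> 'currentSellers'
#   'fields:1:type:1:fields:0'                  -> 'location'
#   'fields:1:type:1:fields:0:type:1:fields:0'  -> 'locationName'
# 'fields:1:type:1' (the record name 'sellers') is also a prefix, but it is
# filtered out because its last-but-one segment is 'type', not 'fields'

full_name(key)  # -> 'currentSellers_location_locationName'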

Assemblyman answered 8/8, 2022 at 7:57 Comment(0)
