In our organization, we have lately been using tools from the Hadoop ecosystem to implement ETLs. Although the ecosystem itself is quite big, we are using only a very limited set of tools at the moment. Our typical pipeline flow is as follows:
Source Database (1 or more) -> sqoop import -> pig scripts -> sqoop export -> Destination Database (1 or more)
Over time, we have encountered multiple issues with the above approach to implementing ETLs. One problem we see a fair bit is that fields don't line up properly when reading data from HDFS with Pig (where the data has typically been imported with Sqoop), and the Pig script fails with an error - for instance, a string ends up in a numeric field because of the misalignment.
It appears that there are two approaches to this problem:
Remove the problem characters you know of from fields before processing with Pig. This is the approach we have taken in the past. We know we have some bad data in our source databases - typically newlines and tabs in fields where they shouldn't exist (NOTE: we used to use tabs as field delimiters). So what we did was use a DB view, or Sqoop's free-form query option, that in turn applies a REPLACE function (or its equivalent) available in the source DB (typically MySQL, less often PostgreSQL). This approach works, but it has the side effect that the HDFS data no longer matches the source data. In addition, some of the other imported fields may no longer make sense: for instance, if a field carries an MD5 or SHA1 hash of another field whose characters have been replaced, we have to recompute the hash ourselves to stay consistent instead of importing the one from the source DB. Finally, this approach involves a certain amount of trial and error: we don't necessarily know ahead of time which fields need modification (or which characters to remove), so we may need more than one iteration to reach our end goal.
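To make the hash side effect concrete, here is a minimal Python sketch (the values are made up for illustration): once a DB-side REPLACE has cleaned a field, any hash imported alongside it no longer matches the cleaned value, and we have to recompute it ourselves.

```python
import hashlib

def md5_hex(s):
    """MD5 of a UTF-8 string, as hex digits."""
    return hashlib.md5(s.encode("utf-8")).hexdigest()

# A source row with an embedded newline, and the MD5 the source DB stored for it.
original = "line one\nline two"
stored_hash = md5_hex(original)

# A DB-side REPLACE strips the newline before the sqoop import...
cleaned = original.replace("\n", " ")

# ...so the hash imported from the source no longer matches the imported
# value, and we must recompute it over the cleaned field to stay consistent.
assert md5_hex(cleaned) != stored_hash
recomputed_hash = md5_hex(cleaned)
```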
Use Sqoop's enclosure feature in combination with escaping, and pair this with a loader of an appropriate type in Pig, so that not only do the fields line up properly but a given field (and its associated values) is represented the same way as the data moves through the pipeline.
I was trying to figure out a good way to accomplish #2 using the different options available in Sqoop and Pig. Presented below is an outline of what I have tried so far, along with the findings.
Below are the specific versions of software used for this experiment:
Sqoop: 1.4.3
Pig: 0.12.0
Hadoop: 2.0.0
Since our data sets are typically large (and would take several hours to process), I figured I'd put together an extremely small data set that mimics some of the data issues we have had. To that end, I created a small table in MySQL (which will serve as the source DB):
mysql> desc example;
+-------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------+---------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(1024) | YES | | NULL | |
| v1 | int(11) | YES | | NULL | |
| v2 | int(11) | YES | | NULL | |
| v3 | int(11) | YES | | NULL | |
+-------+---------------+------+-----+---------+----------------+
5 rows in set (0.00 sec)
After data has been added with INSERT statements, here are the contents of the example table:
mysql> select * from example;
+----+----------------------------------------------------------------------------+------+------+------+
| id | name | v1 | v2 | v3 |
+----+----------------------------------------------------------------------------+------+------+------+
| 1 | Some string, with a comma. | 1 | 2 | 3 |
| 2 | Another "string with quotes" | 4 | 5 | 6 |
| 3 | A string with
new line | 7 | 8 | 9 |
| 4 | A string with 3 new lines -
first new line
second new line
third new line | 10 | 11 | 12 |
| 5 | a string with "quote" and a
new line | 13 | 14 | 15 |
| 6 | clean record | 0 | 1 | 2 |
| 7 | single
newline | 0 | 1 | 2 |
| 8 | | 51 | 52 | 53 |
| 9 | NULL | 105 | NULL | 103 |
+----+----------------------------------------------------------------------------+------+------+------+
9 rows in set (0.00 sec)
We can readily see the newlines in the name field. I didn't include tabs in this data set because I switched the delimiter from tab to comma; instead, there is one record with a comma. Since the typical enclosing character is a double quote, there are some records with double quotes. Finally, in the last two records (id = 8 and 9), I wanted to see how an empty string and NULL are represented in a char-type field, and how NULL is represented in a numeric field.
I tried the following sqoop import on the above table:
sqoop import --connect jdbc:mysql://localhost/test --username user --password pass --table example --columns 'id, name, v1, v2, v3' --verbose --split-by id --target-dir example --fields-terminated-by , --escaped-by \\ --enclosed-by \" --num-mappers 1
Notice that backslash is used as the escape character, double quote as the enclosure, and comma as the field delimiter.
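As a side note, the same convention (every field enclosed, embedded quotes escaped with a backslash) can be mimicked with Python's csv module; this is just a sketch of the expected line format, not how Sqoop itself produces it:

```python
import csv, io

buf = io.StringIO()
# QUOTE_ALL encloses every field; doublequote=False plus escapechar='\\'
# makes embedded quotes come out as \" rather than the doubled "".
writer = csv.writer(buf, quoting=csv.QUOTE_ALL, doublequote=False,
                    escapechar="\\", lineterminator="\n")
writer.writerow([2, 'Another "string with quotes"', 4, 5, 6])
print(buf.getvalue(), end="")
# → "2","Another \"string with quotes\"","4","5","6"
```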
Here is how the data looks on HDFS:
$hadoop fs -cat example/part-m-00000
"1","Some string, with a comma.","1","2","3"
"2","Another \"string with quotes\"","4","5","6"
"3","A string with
new line","7","8","9"
"4","A string with 3 new lines -
first new line
second new line
third new line","10","11","12"
"5","a string with \"quote\" and a
new line","13","14","15"
"6","clean record","0","1","2"
"7","single
newline","0","1","2"
"8","","51","52","53"
"9","null","105","null","103"
I created a small pig script to read and parse the above data:
REGISTER '……./pig/contrib/piggybank/java/piggybank.jar';
data = LOAD 'example' USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'YES_MULTILINE') AS (id:int, name:chararray, v1:int, v2:int, v3:int);
dump data;
Notice the use of the CSVExcelStorage loader available in piggybank. Since we have newlines in the incoming data set, we enable the YES_MULTILINE option. The above script produces the following output:
(1,Some string, with a comma.,1,2,3)
(2,Another \string with quotes\",4,5,6)
(3,A string with
new line,7,8,9)
(4,A string with 3 new lines -
first new line
second new line
third new line,10,11,12)
(5,a string with \quote\" and a
new line,13,14,15)
(6,clean record,0,1,2)
(7,single
newline,0,1,2)
(8,",51,52,53)
(9,null,105,,103)
In records with id 2 and 5, a backslash remains in place of the very first double quote, while for subsequent double quotes both the backslash and the quote remain. This is not exactly what I want. Noting that CSVExcelStorage follows the Excel 2007 convention of escaping a double quote by doubling it (i.e., consecutive double quotes are treated as a single double quote), I made the escape character a double quote:
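For reference, the Excel 2007 convention that CSVExcelStorage expects is the same one Python's csv module uses by default (a literal quote inside a quoted field is doubled); a small sketch:

```python
import csv, io

# Excel-style CSV: a literal double quote inside a quoted field is doubled.
line = '"2","Another ""string with quotes""","4","5","6"\n'
row = next(csv.reader(io.StringIO(line)))
print(row)
# → ['2', 'Another "string with quotes"', '4', '5', '6']
```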
sqoop import --connect jdbc:mysql://localhost/test --username user --password pass --table example --columns 'id, name, v1, v2, v3' --verbose --split-by id --target-dir example --fields-terminated-by , --escaped-by '\"' --enclosed-by '\"' --num-mappers 1
Before executing the above command, I deleted the existing data: $hadoop fs -rm -r example
After the sqoop import runs through, here is how data looks on HDFS now:
$hadoop fs -cat example/part-m-00000
"1","Some string, with a comma.","1","2","3"
"2","Another """"string with quotes""""","4","5","6"
"3","A string with
new line","7","8","9"
"4","A string with 3 new lines -
first new line
second new line
third new line","10","11","12"
"5","a string with """"quote"""" and a
new line","13","14","15"
"6","clean record","0","1","2"
"7","single
newline","0","1","2"
"8","","51","52","53"
"9","null","105","null","103"
I ran the same pig script once more on this data and it produces the following output:
(1,Some string, with a comma.,1,2,3)
(2,Another ""string with quotes"",4,5,6)
(3,A string with
new line,7,8,9)
(4,A string with 3 new lines -
first new line
second new line
third new line,10,11,12)
(5,a string with ""quote"" and a
new line,13,14,15)
(6,clean record,0,1,2)
(7,single
newline,0,1,2)
(8,",51,52,53)
(9,null,105,,103)
Noticing that any double quotes in the string are now effectively doubled, I can get rid of the doubling by using the REPLACE function in Pig:
data2 = FOREACH data GENERATE id, REPLACE(name, '""', '"') as name, v1, v2, v3;
dump data2;
The above script produces the following output:
(1,Some string, with a comma.,1,2,3)
(2,Another "string with quotes",4,5,6)
(3,A string with
new line,7,8,9)
(4,A string with 3 new lines -
first new line
second new line
third new line,10,11,12)
(5,a string with "quote" and a
new line,13,14,15)
(6,clean record,0,1,2)
(7,single
newline,0,1,2)
(8,",51,52,53)
(9,null,105,,103)
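The collapse performed by REPLACE above is just a plain string substitution; the same step sketched in Python:

```python
# Collapse each doubled quote (Excel-style escaping) back to a single quote.
doubled = 'a string with ""quote"" and a\nnew line'
collapsed = doubled.replace('""', '"')
print(collapsed)
# → a string with "quote" and a
#   new line
```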
The above looks much more like the output I want. One last item I need to verify is that nulls and empty strings for the chararray type, and nulls for the int type, are accounted for.
Towards that end, I add one more section to the above pig script that generates null and empty strings for char type and null for int type:
data3 = FOREACH data2 GENERATE id, name, v1, v2, v3, null as name2:chararray, '' as name3:chararray, null as v4:int;
dump data3;
The output looks as follows:
(1,Some string, with a comma.,1,2,3,,,)
(2,Another "string with quotes",4,5,6,,,)
(3,A string with
new line,7,8,9,,,)
(4,A string with 3 new lines -
first new line
second new line
third new line,10,11,12,,,)
(5,a string with "quote" and a
new line,13,14,15,,,)
(6,clean record,0,1,2,,,)
(7,single
newline,0,1,2,,,)
(8,",51,52,53,,,)
(9,null,105,,103,,,)
I stored the same output in HDFS using the following pig script:
STORE data3 INTO 'example_output' USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'YES_MULTILINE');
Here is how the data looks on HDFS:
$hadoop fs -cat example_output/part-m-00000
1,"Some string, with a comma.",1,2,3,,,
2,"Another ""string with quotes""",4,5,6,,,
3,"A string with
new line",7,8,9,,,
4,"A string with 3 new lines -
first new line
second new line
third new line",10,11,12,,,
5,"a string with ""quote"" and a
new line",13,14,15,,,
6,clean record,0,1,2,,,
7,"single
newline",0,1,2,,,
8,"""",51,52,53,,,
9,null,105,,103,,,
For nulls and empty strings, the only two records of interest are the bottom two (id = 8 and 9). It's clear that an empty string or null coming from the source via Sqoop is represented differently from one generated by Pig. I could account for nulls and empty strings in the name field in a manner similar to how I handled the double quotes, but that seems rather manual and involves more steps than necessary.
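One way to make the two representations comparable is to normalize them explicitly before further processing. Below is a minimal Python sketch of the idea, assuming - as in the data above - that Sqoop writes the literal token null for SQL NULLs while Pig emits an empty field; the function name and the numeric flag are made up for illustration:

```python
NULL_TOKEN = "null"  # the literal token Sqoop wrote for SQL NULL above

def normalize(field, numeric=False):
    """Map both Sqoop's 'null' token and Pig's empty field to None.

    Note: for char fields this also erases the distinction between an
    empty string and NULL, which is exactly the ambiguity at issue here.
    """
    if field == "" or field == NULL_TOKEN:
        return None
    return int(field) if numeric else field

print(normalize("null"))                # → None
print(normalize("", numeric=True))      # → None
print(normalize("105", numeric=True))   # → 105
```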
Notice that although we used the "enclosed-by" option in the sqoop import (as opposed to the "optionally-enclosed-by" option), the output from Pig uses enclosure only when there is a need for it, i.e., a field is enclosed only if it contains a quote, a comma, or a newline - in other words, this is the Pig-side equivalent of Sqoop's "optionally-enclosed-by" option.
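This optional-enclosure behavior corresponds to what Python's csv module calls minimal quoting, which makes for a quick mental model (a sketch, not CSVExcelStorage's actual implementation):

```python
import csv, io

buf = io.StringIO()
# QUOTE_MINIMAL quotes a field only if it contains the delimiter, the
# quote character, or a newline -- like "optionally-enclosed-by".
writer = csv.writer(buf, quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
writer.writerow([6, "clean record", 0, 1, 2])
writer.writerow([1, "Some string, with a comma.", 1, 2, 3])
print(buf.getvalue(), end="")
# → 6,clean record,0,1,2
#   1,"Some string, with a comma.",1,2,3
```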
The final stage in the pipeline is sqoop export. I put together the following table:
mysql> desc example_output;
+-------+---------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+---------------+------+-----+---------+-------+
| id | int(11) | YES | | NULL | |
| name | varchar(1024) | YES | | NULL | |
| v1 | int(11) | YES | | NULL | |
| v2 | int(11) | YES | | NULL | |
| v3 | int(11) | YES | | NULL | |
| name2 | varchar(1024) | YES | | NULL | |
| name3 | varchar(1024) | YES | | NULL | |
| v4 | int(11) | YES | | NULL | |
+-------+---------------+------+-----+---------+-------+
8 rows in set (0.00 sec)
Here is the sqoop export command I used:
sqoop export --connect jdbc:mysql://localhost/test --username user --password pass --table example_output --export-dir example_output --input-fields-terminated-by , --input-escaped-by '\"' --input-optionally-enclosed-by '\"' --num-mappers 1 --verbose
The export options are similar to the import options, except that "enclosed-by" has been replaced by "optionally-enclosed-by" and an "input-" prefix has been added to some of the options (e.g., --input-fields-terminated-by), since sqoop export uses those while reading input from HDFS.
This fails with the following error in the logs:
2014-02-25 22:19:05,750 ERROR org.apache.sqoop.mapreduce.TextExportMapper: Exception:
java.lang.RuntimeException: Can't parse input data: 'Some string, with a comma.,1,2,3,,,'
at example_output.__loadFromFields(example_output.java:396)
at example_output.parse(example_output.java:309)
at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:83)
at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:39)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: java.util.NoSuchElementException
at java.util.ArrayList$Itr.next(ArrayList.java:794)
at example_output.__loadFromFields(example_output.java:366)
... 12 more
2014-02-25 22:19:05,756 ERROR org.apache.sqoop.mapreduce.TextExportMapper: On input: 1,"Some string, with a comma.",1,2,3,,,
2014-02-25 22:19:05,757 ERROR org.apache.sqoop.mapreduce.TextExportMapper: On input file: hdfs://nameservice1/user/xyz/example_output/part-m-00000
2014-02-25 22:19:05,757 ERROR org.apache.sqoop.mapreduce.TextExportMapper: At position 0
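One plausible reading of this failure is that the export-side parser is not honoring the enclosure: a quote-unaware split on the delimiter produces the wrong field count for any record with an embedded comma, which would surface as exactly this kind of NoSuchElementException. A Python sketch of that failure mode (this simulates the symptom, not Sqoop's actual generated parser):

```python
import csv, io

line = '1,"Some string, with a comma.",1,2,3,,,'

# A quote-unaware split breaks the quoted field at its embedded comma,
# yielding 9 pieces where the destination table expects 8 columns.
naive = line.split(",")
print(len(naive))   # → 9

# A quote-aware parse recovers the expected 8 fields.
proper = next(csv.reader(io.StringIO(line)))
print(len(proper))  # → 8
```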
In an effort to troubleshoot this problem, I created a HDFS location that has only one record (id = 6) from the input data set:
$ hadoop fs -cat example_output_single_record/part-m-00000
6,clean record,0,1,2,,,
Now the sqoop export command becomes:
sqoop export --connect jdbc:mysql://localhost/test --username user --password pass --table example_output --export-dir example_output_single_record --input-fields-terminated-by , --input-escaped-by '\"' --input-optionally-enclosed-by '\"' --num-mappers 1 --verbose
The above command runs through fine and produces the desired result of inserting the single record into the destination DB:
mysql> select * from example_output;
+------+--------------+------+------+------+-------+-------+------+
| id | name | v1 | v2 | v3 | name2 | name3 | v4 |
+------+--------------+------+------+------+-------+-------+------+
| 6 | clean record | 0 | 1 | 2 | | | NULL |
+------+--------------+------+------+------+-------+-------+------+
1 row in set (0.00 sec)
While the null value has been preserved for the numeric field, both null and the empty string mapped to an empty string in the destination DB.
With the above as the background, here are the questions:
I think it would be easier if we can ensure that a given value for a given data type will be represented/processed exactly the same way regardless of whether it’s coming from sqoop or generated by pig. Has anyone figured out a way to ensure consistent representation/processing of a given data type while preserving the original field values? I have covered only two data types here (chararray and int) but I suppose some of the other data types also have potentially similar issues.
I have used "enclosed-by" option in sqoop import instead of "optionally-enclosed-by" so that every field value will be enclosed within double quotes. I just thought it would be a source of less confusion if every value in every field was enclosed instead of just those that need enclosing. What do others use and has one of these options worked better for your use case relative to the other? It looks like CSVExcelStorage doesn't support a notion of "enclosed-by" - are there any other storage functions that support this mechanism?
Any suggestions on how to get the sqoop export to work as intended on the full output of pig script (i.e., example_output on HDFS)?