Spark SQL: How to call UDF from DataFrame operation using Java

I would like to know how to call a UDF from a DataFrame operation (Spark SQL's domain-specific language, DSL) using Java.

I have a UDF (just for example):

UDF2<String, String, Boolean> equals = new UDF2<String, String, Boolean>() {
   @Override
   public Boolean call(String first, String second) throws Exception {
       return first.equals(second);
   }
};

I've registered it with the sqlContext:

sqlContext.udf().register("equals", equals, DataTypes.BooleanType);

When I run the following query, my UDF is called and I get a result:

sqlContext.sql("SELECT p0.value FROM values p0 WHERE equals(p0.value, 'someString')");

I would like to transform this query using the DSL functions in Spark SQL, but I am not sure how to do it:

valuesDF.select("value").where(???);

I found that there is a callUDF() function, but one of its parameters is a Function2 fnctn, not a UDF2. How can I use my UDF together with the DSL functions?

Overdue answered 20/11, 2015 at 14:45

I found a solution with which I am half-satisfied. It is possible to call the UDF inside a SQL expression string used as a column condition, such as:

valuesDF.filter("equals(columnName, 'someString')").select("columnName");

But I still wonder whether it is possible to call the UDF directly.


Edit:

Btw, it is possible to call a UDF directly, e.g.:

df.where(callUdf("equals", scala.collection.JavaConversions.asScalaBuffer(
                        Arrays.asList(col("columnName"), col("otherColumnName"))
                    ).seq())).select("columnName");

An import of org.apache.spark.sql.functions is required (a static import lets you write col() and callUdf() unqualified).
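
Note: on Spark 1.5 or later, functions also provides a varargs overload, callUDF(String, Column...), which should avoid the Scala Seq conversion entirely. A minimal sketch, assuming Spark 1.5+ and the same column names as above:

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

// Same query as above, using the varargs callUDF added in Spark 1.5
// (the Seq-based callUdf was deprecated around that release).
df.where(callUDF("equals", col("columnName"), col("otherColumnName")))
  .select("columnName");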

Overdue answered 23/11, 2015 at 15:42 Comment(1)
That's interesting, a subtle difference from the Scala API! – Subterrane

When querying a DataFrame, you should just be able to execute the UDF using something like this:

sourceDf.filter(equals(col("columnName"), "someString")).select("columnName")

where col("columnName") is the column you want to compare.

Subterrane answered 20/11, 2015 at 15:32 Comment(3)
I assumed it should work like you described, but it doesn't. I got this exception: java.lang.RuntimeException: Uncompilable source code - Erroneous tree type: <any> – Overdue
Ok, that ^^ was an error in my NetBeans... Still, your solution does not work, because: method equals in class Object cannot be applied to given types; | required: Object | found: Column, String | reason: actual and formal argument lists differ in length. Calling equals.call(col("columnName"), "someString") is not a solution either, since call() requires (String, String) as parameters and col() returns a Column. Does anyone have a suggestion for how to deal with UDFs? – Overdue
Hmm, sorry, I don't know then, that's an odd one! It works in the Scala API, but I can't get it to work in the Java API either. – Subterrane

Here is a working code example. It works with Spark 1.5.x and 1.6.x. The trick to calling UDFs from within a pipeline transformer is to use the sqlContext() of the DataFrame to register your UDF:

@Test
public void test() {
    // https://issues.apache.org/jira/browse/SPARK-12484
    logger.info("BEGIN");

    DataFrame df = createData();        
    final String tableName = "myTable";
    sqlContext.registerDataFrameAsTable(df, tableName);

    logger.info("print schema");
    df.printSchema();
    logger.info("original data before we applied UDF");
    df.show();

    MyUDF udf = new MyUDF();
    final String udfName = "myUDF";
    sqlContext.udf().register(udfName, udf, DataTypes.StringType);

    String fmt = "SELECT *, %s(%s) as transformedByUDF FROM %s";
    String stmt = String.format(fmt, udfName, tableName+".labelStr", tableName); 
    logger.info("AEDWIP stmt:{}", stmt);
    DataFrame udfDF = sqlContext.sql(stmt);
    Row[] results = udfDF.head(3);
    for (Row row : results) {
        logger.info("row returned by applying UDF {}", row);
    }

    logger.info("AEDWIP udfDF schema");
    udfDF.printSchema();
    logger.info("AEDWIP udfDF data");
    udfDF.show();


    logger.info("END");
}

DataFrame createData() {
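    // Note: category1, category2, and category3 are assumed to be String
    // constants defined elsewhere in the test class (judging from the output
    // below: category1 = "noise", category2 = "ack", category3 = "signal").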
    Features f1 = new Features(1, category1);
    Features f2 = new Features(2, category2);
    ArrayList<Features> data = new ArrayList<Features>(2);
    data.add(f1);
    data.add(f2);
    //JavaRDD<Features> rdd = javaSparkContext.parallelize(Arrays.asList(f1, f2));
    JavaRDD<Features> rdd = javaSparkContext.parallelize(data);
    DataFrame df = sqlContext.createDataFrame(rdd, Features.class);
    return df;
}

class MyUDF implements UDF1<String, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(String s) throws Exception {
        logger.info("AEDWIP s:{}", s);
        String ret = s.equalsIgnoreCase(category1) ?  category1 : category3;
        return ret;
    }
}

public class Features implements Serializable{
    private static final long serialVersionUID = 1L;
    int id;
    String labelStr;

    Features(int id, String l) {
        this.id = id;
        this.labelStr = l;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getLabelStr() {
        return labelStr;
    }

    public void setLabelStr(String labelStr) {
        this.labelStr = labelStr;
    }
}

This is the output:

+---+--------+
| id|labelStr|
+---+--------+
|  1|   noise|
|  2|     ack|
+---+--------+

root
 |-- id: integer (nullable = false)
 |-- labelStr: string (nullable = true)
 |-- transformedByUDF: string (nullable = true)

+---+--------+----------------+
| id|labelStr|transformedByUDF|
+---+--------+----------------+
|  1|   noise|           noise|
|  2|     ack|          signal|
+---+--------+----------------+
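
For reference, the same transformation can presumably also be expressed with the DataFrame DSL instead of a SQL string. A sketch, assuming Spark 1.5+ where functions.callUDF(String, Column...) is available:

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

// DSL equivalent of the SELECT statement above: append a
// transformedByUDF column computed by the registered "myUDF".
DataFrame udfDF = df.withColumn("transformedByUDF", callUDF("myUDF", col("labelStr")));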
Pipestone answered 21/1, 2016 at 0:54
