How to pass an array column as an argument to a VectorUdf in .Net Spark?
I'm trying to implement a Vector UDF in C# Spark.

I have created the .NET Spark environment by following Spark .NET. Vector UDFs (both the Apache Arrow and the Microsoft.Data.Analysis variants) worked for me on an IntegerType column. Now I'm trying to pass an integer array column to a Vector UDF and couldn't find a way to achieve this.

Usings

using System;
using System.Linq;
using Microsoft.Data.Analysis;
using Microsoft.Spark.Sql;
using func = Microsoft.Spark.Sql.Functions;
using DataFrame = Microsoft.Spark.Sql.DataFrame;
using Arrow = Apache.Arrow;

Program

SparkSession spark = SparkSession
    .Builder()
    .AppName("sample")
    .GetOrCreate();

DataFrame dataFrame = spark.Range(0, 100).Repartition(4);

// Row-based UDF that builds an int[] per row; this produces the
// array<integer> column used below.
Func<Column, Column> array20 = func.Udf<int, int[]>(
    (col1) => Enumerable.Range(0, col1).ToArray());

dataFrame = dataFrame.WithColumn("array", array20(dataFrame["id"]));

// Apache Arrow: one call receives a whole batch as an Arrow array.
var arrowVectorUdf = ArrowFunctions.VectorUdf<Arrow.UInt64Array, Arrow.Int64Array>((id) =>
{
    var builder = new Arrow.Int64Array.Builder(); // builds the Int64Array result
    var count = id.Length;
    foreach (var item in id.Data.Children)
    {
        builder.Append(item.Length + count);
    }
    return builder.Build();
});

// Microsoft.Data.Analysis: one call receives a whole batch as a DataFrame column.
var dataFrameVector = DataFrameFunctions.VectorUdf<Int64DataFrameColumn, Int64DataFrameColumn>(
    (id) => id + id.Length);

Working

            dataFrame = dataFrame.WithColumn("arrowVectorUdfId", arrowVectorUdf(dataFrame["id"]));

            dataFrame = dataFrame.WithColumn("dataFrameVectorId", dataFrameVector(dataFrame["id"]));

Not working

            dataFrame = dataFrame.WithColumn("arrowVectorUdf", arrowVectorUdf(dataFrame["array"]));

            dataFrame = dataFrame.WithColumn("dataFrameVector", dataFrameVector(dataFrame["array"]));

The UDFs above work if I pass the "id" column instead of the "array" column. I'm not sure what the argument type of the UDF should be for the "array" column. The code above produces the same error, shown below, for both Apache.Arrow and Microsoft.Data.Analysis:

 [2021-03-25T07:02:05.9218517Z] [LAPTOP-0S8GNQ52] [Error] [TaskRunner] [0] Exiting with exception: System.IO.InvalidDataException: Arrow primitive 'List' is unsupported.
   at Apache.Arrow.Ipc.MessageSerializer.GetFieldArrowType(Field field)
   at Apache.Arrow.Ipc.MessageSerializer.GetSchema(Schema schema)
   at Apache.Arrow.Ipc.ArrowStreamReaderImplementation.<ReadSchema>b__12_0(Memory`1 buff)
   at Apache.Arrow.ArrayPoolExtensions.RentReturn(ArrayPool`1 pool, Int32 length, Action`1 action)
   at Apache.Arrow.Ipc.ArrowStreamReaderImplementation.ReadRecordBatch()
   at Microsoft.Spark.Worker.Command.ArrowBasedCommandExecutor.<GetInputIterator>d__2.MoveNext()
   at Microsoft.Spark.Worker.Command.ArrowOrDataFrameSqlCommandExecutor.ExecuteDataFrameSqlCommand(Stream inputStream, Stream outputStream, SqlCommand[] commands)
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete)
   at Microsoft.Spark.Worker.TaskRunner.Run()
[2021-03-25T07:02:05.9245061Z] [LAPTOP-0S8GNQ52] [Info] [TaskRunner] [0] Finished running 0 task(s).
[2021-03-25T07:02:05.9249567Z] [LAPTOP-0S8GNQ52] [Info] [SimpleWorker] RunSimpleWorker() finished successfully
21/03/25 12:32:05 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.IllegalArgumentException
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
        at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
        at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:163)
        at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:169)
        at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:160)
        at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
21/03/25 12:32:06 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
21/03/25 12:32:06 ERROR DotnetBackendHandler: Failed to execute 'showString' on 'org.apache.spark.sql.Dataset' with args=([Type=java.lang.Integer, Value: 20], [Type=java.lang.Integer, Value: 20], [Type=java.lang.Boolean, Value: false])
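
The stack trace pins the failure to Apache.Arrow.Ipc.MessageSerializer.GetFieldArrowType in the .NET worker: the Arrow IPC reader there does not support List-typed fields, so an array<int> column cannot reach either flavor of vector UDF as an Arrow batch. A minimal fallback sketch, assuming the installed Microsoft.Spark version accepts array arguments in row-based UDFs (sumArray and arraySum are hypothetical names): process the array column one row at a time with a regular pickling UDF, trading away Arrow batching.

// Hedged fallback sketch: a row-based UDF receives one row's int[] per call,
// so no Arrow List serialization is involved.
Func<Column, Column> sumArray = func.Udf<int[], int>(
    (arr) => arr.Sum());

dataFrame = dataFrame.WithColumn("arraySum", sumArray(dataFrame["array"]));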
Asked by Cerberus on 25/3/2021 at 7:38

Comments:
Jennifferjennilee: You say this works with the "id" column but not with the "array" column; can you separately paste both code pieces, the working and the non-working ones?

Cerberus: @harshvchawla Updated.

Jennifferjennilee: I don't know Spark at all, so all this is mere speculation (and I don't know what dataFrame looks like when you call Repartition): your func.Udf requires an int[] argument, so maybe the contents of column dataFrame["id"] are int arrays but the contents of column dataFrame["array"] are of a different type?

Cerberus: Yes, they are different types, and the array UDF requires a different argument. I'm trying to find out what I should use as the argument type; I couldn't find a doc or sample for this.

Jennifferjennilee: What does your data look like after the Repartition call; any sample details you can post? Also try debugging using typeof, or ToArray / ToString calls, to see if the compiler tells you of any problem. You could even enforce explicit data types instead of var while debugging.

Babysitter: You can pass an array like this: dataFrame = dataFrame.WithColumn("coordinateArray", udfArray(dataFrame["array"])); as mentioned in the following link: medium.com/@3rd/…

Cerberus: @harshvchawla The problem is just with the argument type of the UDF; I couldn't set the correct argument type.

Cerberus: @FerasSalim I'm having the problem with a vector UDF. A simple UDF receives a single row of data at a time, but I want to send multiple rows at a time; I'm trying to send multiple rows of array data as in the code snippet above. Thanks.
It works for me with both of your code samples. I created the Spark environment just like you did, except that the environment was not working for me with Hadoop 2.7, so I separately installed Hadoop 2.7.4.

Answered by Popple on 11/4/2021 at 19:57
