I'm having trouble removing rows from a DataFrame based on a two-column list of values to filter on. For example, given this DataFrame:
from pyspark.sql.functions import array, col, lit

df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id'])
df.show()
# +------+------+---+
# |number|letter| id|
# +------+------+---+
# | 100| A|304|
# | 200| B|305|
# | 300| C|306|
# +------+------+---+
I can easily remove rows using isin on one column:
df.where(~col('number').isin([100, 200])).show()
# +------+------+---+
# |number|letter| id|
# +------+------+---+
# | 300| C|306|
# +------+------+---+
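For what it's worth, the SQL-expression form of the same filter also works, so IN itself is clearly fine on a single column:
df.where("number NOT IN (100, 200)").show()
# Same result: only the (300, C, 306) row remains.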
But when I try to remove rows by two columns at once, I get an exception:
df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show()
Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A]
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at org.apache.spark.sql.functions.lit(functions.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
After some investigation, I realized that the root cause is that lit cannot create literals from non-primitive types. I tried the following in PySpark:
lit((100, 'A'))
lit([100, 'A'])
and the following in Scala:
lit((100, "A"))
lit(List(100, "A"))
lit(Seq(100, "A"))
lit(Array(100, "A"))
but with no luck. Does anyone know a way to create an array literal in Spark/PySpark? Or is there another way to filter a DataFrame by two columns?
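For completeness, two fallbacks I can think of, though both feel heavier than an isin-style filter should be. First, a hand-built predicate (a sketch; pairs is my own name, and this should work on any Spark version):

from functools import reduce
from operator import or_
from pyspark.sql.functions import col

pairs = [(100, 'A'), (200, 'B')]
# OR together one (number == n AND letter == l) clause per pair,
# then negate the result to drop the matching rows.
match_any = reduce(or_, [(col('number') == n) & (col('letter') == l) for n, l in pairs])
df.where(~match_any).show()
# +------+------+---+
# |number|letter| id|
# +------+------+---+
# | 300| C|306|
# +------+------+---+

Second, an anti-join against a small DataFrame of the pairs to drop (assuming Spark >= 2.0, which introduced the left_anti join type; to_remove is my own name):

to_remove = spark.createDataFrame([(100, 'A'), (200, 'B')], ['number', 'letter'])
# Keep only rows whose (number, letter) pair has no match in to_remove.
df.join(to_remove, on=['number', 'letter'], how='left_anti').show()

Both should give the output above, but they get unwieldy, so I'd still prefer a literal-based isin if one exists.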