How to access org.apache.hadoop.fs.FileUtil from pyspark?
I am trying to call org.apache.hadoop.fs.FileUtil.unTar directly from a pyspark shell.

I understand that I can access the underlying JVM via py4j (sc._jvm) to do this, but I am struggling to actually connect to HDFS, despite my pyspark sessions being otherwise completely functional and able to run jobs across the cluster against data stored inside the cluster.

For example:

hdpUntar = sc._jvm.org.apache.hadoop.fs.FileUtil.unTar
hdpFile = sc._jvm.java.io.File

root    = hdpFile("hdfs://<url>/user/<file>")
target  = hdpFile("hdfs://<url>/user/myuser/untar")

hdpUntar(root, target)

Unfortunately, this doesn't work:

Py4JJavaError: An error occurred while calling z:org.apache.hadoop.fs.FileUtil.unTar.

: ExitCodeException exitCode=128: tar: Cannot connect to hdfs: resolve failed
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
    at org.apache.hadoop.fs.FileUtil.unTarUsingTar(FileUtil.java:675)
    at org.apache.hadoop.fs.FileUtil.unTar(FileUtil.java:651)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Cyan asked 25/4, 2016 at 12:06
Later, tried it from Scala - looks like the code just shells out to the local tar command anyway. - Cyan
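
One workaround that follows from that observation (a rough, untested sketch: the hdfs://<url> placeholders are kept from the question, and /tmp/archive.tar and /tmp/untarred are made-up local paths) is to copy the tarball to the driver's local disk with FileSystem.copyToLocalFile and then call FileUtil.unTar on plain java.io.File objects, so that the shelled-out tar command only ever sees local paths:

URI           = sc._jvm.java.net.URI
Path          = sc._jvm.org.apache.hadoop.fs.Path
File          = sc._jvm.java.io.File
Configuration = sc._jvm.org.apache.hadoop.conf.Configuration
FileSystem    = sc._jvm.org.apache.hadoop.fs.FileSystem
FileUtil      = sc._jvm.org.apache.hadoop.fs.FileUtil

# Connect to the cluster filesystem (placeholder namenode URL, as in the question)
fs = FileSystem.get(URI("hdfs://<url>"), Configuration())

# Pull the archive onto the driver's local disk first, since unTar ends up
# invoking the local tar binary
fs.copyToLocalFile(Path("hdfs://<url>/user/<file>"), Path("file:///tmp/archive.tar"))

# unTar(File, File) now only touches local paths
FileUtil.unTar(File("/tmp/archive.tar"), File("/tmp/untarred"))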

This should work in Databricks:

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration


fs = FileSystem.get(URI("dbfs:///"), Configuration())

status = fs.listStatus(Path('dbfs:/mnt/gobidatagen/tpch/sf100_parquet/'))

for fileStatus in status:
    print(fileStatus.getPath())
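
The same approach should work against a plain HDFS cluster rather than DBFS by swapping the URI handed to FileSystem.get, e.g. (placeholder namenode URL, as in the question):

fs = FileSystem.get(URI("hdfs://<url>"), Configuration())

sc._gateway.jvm and sc._jvm should point at the same py4j gateway, so either spelling can be used.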
Flank answered 30/11, 2021 at 16:41
