PySpark and HDFS commands

I would like to do some cleanup at the start of my Spark program (PySpark). For example, I would like to delete the data from a previous run on HDFS. In Pig this can be done using commands such as

fs -copyFromLocal ....

rmf /path/to/hdfs

or locally using the sh command.

I was wondering how to do the same with PySpark.

Tobacconist answered 1/12, 2015 at 4:45 Comment(1)
You cannot do such a thing with Spark. Maybe the best option is to use an Oozie workflow, in which you can put both HDFS commands and Spark jobs and combine them according to the logic you prefer. – Priam

You can execute arbitrary shell commands using, for example, subprocess.call or the sh library, so something like this should work just fine:

import subprocess

some_path = ...
subprocess.call(["hadoop", "fs", "-rm", "-f", some_path])
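A slightly more defensive variant, as a sketch: it builds the `hadoop fs -rm` argument list in a separate function (so it can be checked without a cluster) and raises if the command exits with an error. It assumes the `hadoop` CLI is on the driver's PATH and Python 3.7+ (for `capture_output`); `hdfs_rm_command` and `hdfs_rm` are illustrative names, not part of any library.

```python
import subprocess


def hdfs_rm_command(path, recursive=True, skip_trash=False):
    """Build the argv for `hadoop fs -rm`, roughly mirroring Pig's `rmf`."""
    # -f: do not report an error if the path does not exist
    cmd = ["hadoop", "fs", "-rm", "-f"]
    if recursive:
        cmd.append("-r")  # also remove directories and their contents
    if skip_trash:
        cmd.append("-skipTrash")  # bypass the HDFS trash, if it is enabled
    cmd.append(path)
    return cmd


def hdfs_rm(path, recursive=True, skip_trash=False):
    """Run `hadoop fs -rm` and raise RuntimeError on a non-zero exit status."""
    result = subprocess.run(
        hdfs_rm_command(path, recursive, skip_trash),
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(
            "hadoop fs -rm failed for {}: {}".format(path, result.stderr.strip())
        )
```

Calling `hdfs_rm("/path/to/hdfs")` at the start of the job then behaves like the cleanup step in the question; because of `-f`, a missing path is not treated as a failure.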

If you use Python 2.x, you can try using spotify/snakebite:

from snakebite.client import Client

host = ...
port = ...
client = Client(host, port)
# delete() takes a list of paths and returns a lazy generator, so consume it
list(client.delete([some_path], recurse=True))

hdfs3 is yet another library which can be used to do the same thing:

from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host=host, port=port)
hdfs.rm(some_path)

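The Apache Arrow Python bindings (pyarrow) are the latest option, and they are often already available on a Spark cluster because they are required for pandas_udf. Note that pyarrow.hdfs, used below, is the legacy interface; it was deprecated in pyarrow 2.0 in favor of pyarrow.fs.HadoopFileSystem: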

from pyarrow import hdfs

fs = hdfs.connect(host, port)
fs.delete(some_path, recursive=True)
Ellata answered 1/12, 2015 at 11:26 Comment(0)

You can delete an HDFS path in PySpark without any third-party dependencies as follows:

from pyspark.sql import SparkSession
# example of preparing a spark session
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext
# Prepare a FileSystem manager
fs = (sc._jvm.org
      .apache.hadoop
      .fs.FileSystem
      .get(sc._jsc.hadoopConfiguration())
      )
path = "Your/hdfs/path"
# use the FileSystem manager to remove the path
fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

To go one step further, you can wrap this idea in a helper function that you can reuse across jobs and packages:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

def delete_path(spark, path):
    sc = spark.sparkContext
    fs = (sc._jvm.org
          .apache.hadoop
          .fs.FileSystem
          .get(sc._jsc.hadoopConfiguration())
          )
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

delete_path(spark, "Your/hdfs/path")
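One possible refinement, sketched below: `FileSystem.get(conf)` returns the cluster's default filesystem, so a fully qualified path on a different filesystem (another `hdfs://` namenode, `s3a://`, and so on) is rejected with a "Wrong FS" error. Asking the `Path` for its own filesystem via `getFileSystem` avoids that, and returning the result of `delete` tells the caller whether anything was removed. It still relies on the private `_jvm`/`_jsc` attributes, which are not part of PySpark's public API; `delete_path_if_exists` is a hypothetical name.

```python
def delete_path_if_exists(spark, path, recursive=True):
    """Delete `path` via the Hadoop FileSystem API; return True if something was deleted."""
    sc = spark.sparkContext
    hadoop_fs = sc._jvm.org.apache.hadoop.fs  # private py4j gateway into the JVM
    conf = sc._jsc.hadoopConfiguration()
    jpath = hadoop_fs.Path(path)
    # Resolve the filesystem from the path itself (hdfs://, s3a://, file://, ...)
    fs = jpath.getFileSystem(conf)
    if not fs.exists(jpath):
        return False
    return bool(fs.delete(jpath, recursive))
```

Usage is the same as above, e.g. `delete_path_if_exists(spark, "Your/hdfs/path")` with an active SparkSession.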
Sadoff answered 2/5, 2019 at 12:28 Comment(3)
I tried it, and it worked. Thanks. How do I list all the partitions of a directory in HDFS using this API? – Ref
I guess using the wildcard '*', e.g. delete_path(spark, "Your/hdfs/path/*") – Alban
The fs.delete method you used seems to have been removed. – Stanstance
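On the listing question in the comments, a sketch using the same gateway: `listStatus` returns the direct children of a directory (for a partitioned table, typically the `key=value` subdirectories), while `globStatus` expands wildcard patterns. The glob step matters for the wildcard suggestion, because `FileSystem.delete` treats its argument as a literal path and does not expand `*`. The function names are illustrative, and the code assumes an active SparkSession and PySpark's private `_jvm`/`_jsc` handles.

```python
def _hadoop(spark):
    """Return (JVM org.apache.hadoop.fs package, Hadoop Configuration)."""
    sc = spark.sparkContext
    return sc._jvm.org.apache.hadoop.fs, sc._jsc.hadoopConfiguration()


def list_dir(spark, path):
    """Return the full paths of the direct children of `path` (e.g. partition dirs).

    listStatus raises a (py4j-wrapped) FileNotFoundException if `path` is missing.
    """
    hadoop_fs, conf = _hadoop(spark)
    jpath = hadoop_fs.Path(path)
    fs = jpath.getFileSystem(conf)
    return [status.getPath().toString() for status in fs.listStatus(jpath)]


def delete_glob(spark, pattern, recursive=True):
    """Expand a wildcard pattern with globStatus and delete every match."""
    hadoop_fs, conf = _hadoop(spark)
    jpattern = hadoop_fs.Path(pattern)
    fs = jpattern.getFileSystem(conf)
    # globStatus can return null (None in Python) when nothing matches
    matches = fs.globStatus(jpattern) or []
    deleted = []
    for status in matches:
        if fs.delete(status.getPath(), recursive):
            deleted.append(status.getPath().toString())
    return deleted
```

For example, `list_dir(spark, "Your/hdfs/path")` lists the partitions, and `delete_glob(spark, "Your/hdfs/path/*")` removes the contents while keeping the directory itself.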

From https://diogoalexandrefranco.github.io/interacting-with-hdfs-from-pyspark/, using only PySpark:

######
# Get a FileSystem handle from the Java gateway
# (assumes `sc` is an existing SparkContext, e.g. spark.sparkContext)
######
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
fs = FileSystem.get(URI("hdfs://somehost:8020"), sc._jsc.hadoopConfiguration())

# We can now use the Hadoop FileSystem API (https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html)
fs.listStatus(Path('/user/hive/warehouse'))
# or
fs.delete(Path('some_path'), True)  # delete(Path, recursive); the one-argument delete(Path) is deprecated

The other solutions didn't work in my case, but this blog post helped. :)
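If you are unsure what to put in place of `hdfs://somehost:8020`, one option, sketched below, is to read the cluster's default filesystem URI (the `fs.defaultFS` property) from the Hadoop configuration Spark already carries. That URI normally points at the NameNode's RPC endpoint; an HttpFS gateway speaks HTTP/REST instead, which may explain a "Connection refused" error when such a node is used here. The helper name `get_filesystem` is hypothetical, and `sc` is assumed to be an active SparkContext.

```python
def get_filesystem(sc, uri=None):
    """Return a Hadoop FileSystem for `uri`, defaulting to the cluster's fs.defaultFS."""
    jvm = sc._jvm  # private py4j gateway into the driver JVM
    conf = sc._jsc.hadoopConfiguration()
    if uri is None:
        # e.g. something like "hdfs://<namenode-host>:8020" on an HDFS cluster
        uri = conf.get("fs.defaultFS")
    return jvm.org.apache.hadoop.fs.FileSystem.get(jvm.java.net.URI(uri), conf)
```

`get_filesystem(sc).listStatus(...)` then works like the snippet above, without hard-coding a host.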

Applicative answered 6/8, 2019 at 15:18 Comment(1)
Is somehost a node running a particular service, or does it need some other specification? I get "java.net.ConnectException: Connection refused" when I try it with an HttpFS-hosting node. – Tenebrific

Solution 1: subprocess


def copy_from_local(local_file, hdfs_file, logger):
    import subprocess
    # -f overwrites the destination if it already exists
    proc = subprocess.Popen(["hdfs", "dfs", "-copyFromLocal", "-f", local_file, hdfs_file])
    proc.communicate()  # wait for the command to finish

    if proc.returncode != 0:
        logger.error("copyFromLocal {} to {} error".format(local_file, hdfs_file))
        return False
    else:
        logger.info("copyFromLocal {} to {} success".format(local_file, hdfs_file))
        return True

Solution 2: py4j


def copy_from_local_file(sc, logger, local_file, hdfs_file, delSrc=True, overwrite=True):
    # Java signature: copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
    # note: with delSrc=True the local source file is deleted after a successful copy
    Path = sc._jvm.org.apache.hadoop.fs.Path
    try:
        getFileSystem(sc).copyFromLocalFile(delSrc, overwrite, Path(local_file), Path(hdfs_file))
        logger.info("copyFromLocal {} to {} success".format(local_file, hdfs_file))
        return True
    except Exception as e:
        logger.error(e)
        logger.error("copyFromLocal {} to {} error".format(local_file, hdfs_file))
        return False


def getFileSystem(sc):
    # Prepare a FileSystem manager
    FileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem
    fs = FileSystem.get(sc._jsc.hadoopConfiguration())
    return fs

getFileSystem(sc) returns the py4j handle to the JVM FileSystem object, so you can call its methods directly for other file operations, just as the functions above do.

In a debugger, getFileSystem(sc) shows up as {JavaObject} DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_15601292_17, ... (auth:KERBEROS)]], i.e. an HDFS DistributedFileSystem, and exposes these methods (each as a py4j JavaMember):

access, addCacheDirective, addCachePool, addDelegationTokens, allowSnapshot, append,
areSymlinksEnabled, cancelDeleteOnExit, clearStatistics, close, closeAll, closeAllForUGI,
completeLocalOutput, concat, copyFromLocalFile, copyToLocalFile, create, createEncryptionZone,
createNewFile, createNonRecursive, createSnapshot, createSymlink, delete, deleteOnExit,
deleteSnapshot, disallowSnapshot, enableSymlinks, equals, exists, finalizeUpgrade, get,
getAclStatus, getAllStatistics, getBlockSize, getCanonicalServiceName, getChildFileSystems,
getClass, getClient, getConf, getContentSummary, getCorruptBlocksCount, getDataNodeStats,
getDefaultBlockSize, getDefaultReplication, getDefaultUri, getDelegationToken, getDiskStatus,
getEZForPath, getFileBlockLocations, getFileBlockStorageLocations, getFileChecksum,
getFileLinkStatus, getFileStatus, getFileSystemClass, getHomeDirectory, getInotifyEventStream,
getLength, getLinkTarget, getLocal, getMissingBlocksCount, getName, getNamed, getRawCapacity,
getRawUsed, getReplication, getScheme, getServerDefaults, getSnapshotDiffReport,
getSnapshottableDirListing, getStatistics, getStatus, getStoragePolicies,
getUnderReplicatedBlocksCount, getUri, getUsed, getWorkingDirectory, getXAttr, getXAttrs,
globStatus, hashCode, initialize, isDirectory, isFile, isFileClosed, isInSafeMode,
listCacheDirectives, listCachePools, listCorruptFileBlocks, listEncryptionZones, listFiles,
listLocatedStatus, listStatus, listXAttrs, makeQualified, metaSave, mkdir, mkdirs,
modifyAclEntries, modifyCacheDirective, modifyCachePool, moveFromLocalFile, moveToLocalFile,
newInstance, newInstanceLocal, notify, notifyAll, open, printStatistics, recoverLease,
refreshNodes, removeAcl, removeAclEntries, removeCacheDirective, removeCachePool,
removeDefaultAcl, removeXAttr, rename, renameSnapshot, resolvePath, restoreFailedStorage,
rollEdits, rollingUpgrade, saveNamespace, setAcl, setBalancerBandwidth, setConf, setDefaultUri,
setOwner, setPermission, setQuota, setReplication, setSafeMode, setStoragePolicy, setTimes,
setVerifyChecksum, setWorkingDirectory, setWriteChecksum, setXAttr, startLocalOutput,
supportsSymlinks, toString, wait
Contorted answered 12/4, 2021 at 2:35 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.