Hadoop/YARN and task parallelization on non-HDFS filesystems

I've instantiated a Hadoop 2.4.1 cluster and I've found that running MapReduce applications will parallelize differently depending on what kind of filesystem the input data is on.

Using HDFS, a MapReduce job will spawn enough containers to maximize use of all available memory. For example, on a 3-node cluster with 172 GB of memory and each map task allocating 2 GB, about 86 application containers are created.

On a filesystem that isn't HDFS (like NFS or, in my use case, a parallel filesystem), a MapReduce job will only allocate a subset of the available task capacity (e.g., with the same 3-node cluster, only about 25-40 containers are created). Since I'm using a parallel filesystem, I'm not as concerned with the bottlenecks one would find with NFS.

Is there a YARN (yarn-site.xml) or MapReduce (mapred-site.xml) configuration that will allow me to effectively maximize resource utilization?

Engelbert answered 12/8, 2014 at 16:58 Comment(0)

It depends on the file system.

The way locality works is that you must implement getFileBlockLocations for a given file inside your Hadoop FileSystem implementation. An example, from the glusterfs-hadoop filesystem implementation, is here:

public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
    // Resolve the Hadoop Path to a local file handle.
    File f = pathToFile(file.getPath());

    // Ask GlusterFS (via its wrapper) which hosts store this byte range of the file.
    BlockLocation[] result = attr.getPathInfo(f.getPath(), start, len);
    if (result == null) {
        log.info("Problem getting destination host for file " + f.getPath());
        return null;
    }

    return result;
}

Above you can see that the file metadata is provided through Gluster-specific wrappers, which call Gluster-specific commands to determine which nodes store the actual contents of a file. The resulting BlockLocation[] array then serves as a set of hints to the job scheduler, which will try to land tasks local to where the splits say their block locations are.
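
For reference, each entry in that BlockLocation[] array is just a byte range of the file paired with the hosts that store it. A minimal sketch of one such hint, using made-up host names rather than anything from a real cluster, might look like this:

import org.apache.hadoop.fs.BlockLocation;

// Hypothetical sketch: one entry of the BlockLocation[] hint array for a
// 128 MB block stored on "node1" and "node2" (placeholder names).
BlockLocation hint = new BlockLocation(
        new String[] { "node1:50010", "node2:50010" }, // names: host:port of the storage daemons
        new String[] { "node1", "node2" },             // hosts the scheduler can match tasks against
        0L,                                            // offset of this block within the file
        128L * 1024 * 1024);                           // length of this block in bytes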

But ultimately, the scheduler's job is to process splits, not blocks. So splits can be smaller than, or larger than, file system blocks. If a split is larger than a block, there is a high likelihood that some portion of the split will be streamed over the network. If it is a lot smaller, you might get more locality, but possibly at the cost of a larger overall number of tasks.

When optimizing, remember that each input split is ultimately what is fed to the mappers.
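
As a rough illustration of how splits relate to blocks, this is the kind of computation a FileInputFormat-style input format performs when sizing splits (a sketch; the concrete values are examples, not your cluster's settings):

// Mirrors the computeSplitSize logic used by FileInputFormat; the numbers are examples.
long blockSize = 64L * 1024 * 1024;   // block size reported by the underlying file system
long minSize   = 1L;                  // mapreduce.input.fileinputformat.split.minsize
long maxSize   = Long.MAX_VALUE;      // mapreduce.input.fileinputformat.split.maxsize

long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
// Here splitSize == blockSize, so a 2 GB file yields roughly 32 map tasks.
// Lowering maxSize produces more, smaller splits; raising minSize produces fewer,
// larger splits that may span several blocks and stream part of their data remotely.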

In HDFS, the defaults tend to be better tuned than they are for other file systems.

By implementing more fine-grained blocking (getFileBlockLocations) in your Hadoop-compatible file system, you can increase the number of blocks reported, and also the replication of those blocks.

Increasing the number of blocks raises the probability that a particular block can be processed in a local context.
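
As a sketch of what that could look like for a file system without real locality (such as a parallel file system where every node can reach every byte equally fast), a getFileBlockLocations override might simply advertise fixed-size pseudo-blocks. The 64 MB block size and the localhost entries below are illustrative assumptions, not part of any existing plugin:

import java.io.IOException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;

// Hypothetical sketch: report fixed-size "pseudo-blocks" so the input format can
// create more splits, even though no host is more local than any other.
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
        throws IOException {
    final long pseudoBlockSize = 64L * 1024 * 1024; // assumed block size
    long fileLen = file.getLen();
    int blocks = (int) ((fileLen + pseudoBlockSize - 1) / pseudoBlockSize);

    BlockLocation[] result = new BlockLocation[Math.max(blocks, 1)];
    for (int i = 0; i < result.length; i++) {
        long offset = (long) i * pseudoBlockSize;
        long length = Math.min(pseudoBlockSize, fileLen - offset);
        // No meaningful host hints: any node is equally "local" on this file system.
        result[i] = new BlockLocation(new String[] { "localhost:50010" },
                                      new String[] { "localhost" },
                                      offset, length);
    }
    // A production implementation would restrict the result to the requested [start, start+len) range.
    return result;
}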

Also, you can tune the number of input splits (via the maximum and minimum split sizes) as part of the MapReduce job parameters at runtime. By updating these values, you might increase performance (i.e., use of machines), but you might also decrease locality (more splits mean that, if some machines are inherently faster, MapReduce could stream a split over to a non-local machine that snatches up a lot of tasks).
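
For example, with the new MapReduce API you could set the split bounds when building the job. This is a sketch assuming a standard FileInputFormat-based input; the 64 MB cap is just an example value, not a recommendation:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Cap the maximum split size so a large input file is broken into more splits,
// and therefore more map tasks.
void configureSplits(Job job) {
    FileInputFormat.setMinInputSplitSize(job, 1L);                // mapreduce.input.fileinputformat.split.minsize
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024); // mapreduce.input.fileinputformat.split.maxsize
}

// The same knobs can be set on the command line for a Tool-based job, e.g.:
//   hadoop jar my.jar MyJob -D mapreduce.input.fileinputformat.split.maxsize=67108864 ...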

Batha answered 12/8, 2014 at 19:36 Comment(1)
I'm using the Panasas filesystem (PanFS), so I don't require replication and there is little notion of data locality (i.e., compute nodes have no local disk). I haven't found a plugin and would ideally like to avoid writing one at the moment. I've been able to read files fine, but the number of MapReduce tasks is not scaling with the input file splits (e.g., ~30 tasks with capacity for ~80, handling 2000 file splits). Are there parameters I can tune manually on the default filesystem interface in Hadoop to let it scale? – Engelbert
