How to overwrite/reuse the existing output path for Hadoop jobs again and again
G

10

22

I want to overwrite/reuse the existing output directory when I run my Hadoop job daily. Actually the output directory will store summarized output of each day's job run results. If I specify the same output directory it gives the error "output directory already exists".

How to bypass this validation?

Glutamine answered 10/10, 2011 at 13:15 Comment(0)
P
19

What about deleting the directory before you run the job?

You can do this via shell:

hadoop fs -rmr /path/to/your/output/

or via the Java API:

// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
P answered 10/10, 2011 at 13:53 Comment(2)
Thanks, Thomas, for the reply. The only problem I have is that I don't want to delete the existing output directory. Every day when I run my job, I want its output to be merged with the existing (old) output. The solution I am thinking of is to write the daily job's output into a temp directory and then copy that temp folder structure into the old output directory with a script. - Glutamine
Then you have to use a folder to which you can copy all the job outputs. This can be done with a shell script, too. There is no built-in solution for this problem. - P
T
12

Jungblut's answer is your direct solution. Since I never trust automated processes to delete stuff (me personally), I'll suggest an alternative:

Instead of trying to overwrite, I suggest you make the output name of your job dynamic, including the time in which it ran.

Something like "/path/to/your/output-2011-10-09-23-04/". This way you can keep your old job output around in case you ever need to revisit it. In my system, which runs 10+ daily jobs, we structure the output as: /output/job1/2011/10/09/job1out/part-r-xxxxx, /output/job1/2011/10/10/job1out/part-r-xxxxx, etc.
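
A minimal sketch of how such a dated path could be built, using only the JDK. The class and method names (DatedOutputPath, outputDirFor) are made up for illustration and are not part of Hadoop; the resulting string is what you would hand to FileOutputFormat.setOutputPath.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DatedOutputPath {
    // Year/month/day become nested directories, as in the layout described above
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy/MM/dd");

    // Hypothetical helper: <root>/<job>/<yyyy>/<MM>/<dd>/<job>out
    public static String outputDirFor(String root, String jobName, LocalDate runDate) {
        return root + "/" + jobName + "/" + runDate.format(DAY) + "/" + jobName + "out";
    }

    public static void main(String[] args) {
        // prints /output/job1/2011/10/09/job1out
        System.out.println(outputDirFor("/output", "job1", LocalDate.of(2011, 10, 9)));
    }
}
```

Passing the run date in as a parameter (instead of calling LocalDate.now() inside the helper) makes it easy to re-run a job for a past day.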

Terzetto answered 10/10, 2011 at 14:10 Comment(3)
+1, that is the data-keeping approach. But make sure that there is a garbage collection daemon that collects all outdated directories. Otherwise your HDFS will overflow ;)). - P
Indeed! We typically delete stuff after a week. - Terzetto
Thanks for the reply. The only problem I have is that I don't want to delete the existing output directory. Every day when I run my job, I want its output to be merged with the existing (old) output. The solution I am thinking of is to write the daily job's output into a temp directory and then copy that temp folder structure into the old output directory with a script. I have a folder structure on S3 like "Campaign_Id/Year/Month/day". - Glutamine
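
For the cleanup mentioned in the comments above (deleting dated outputs after roughly a week), the selection step could look something like this sketch. It only decides which run dates are expired; the actual deletion would still go through hadoop fs or FileSystem.delete. The class name, method name, and the keepDays parameter are assumptions for illustration.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

public class OutputRetention {
    // Hypothetical helper: returns the run dates that are more than keepDays days older than 'today'
    public static List<LocalDate> expired(List<LocalDate> runDates, LocalDate today, int keepDays) {
        List<LocalDate> result = new ArrayList<>();
        for (LocalDate runDate : runDates) {
            if (ChronoUnit.DAYS.between(runDate, today) > keepDays) {
                result.add(runDate);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<LocalDate> runs = List.of(
                LocalDate.of(2011, 10, 1), LocalDate.of(2011, 10, 5), LocalDate.of(2011, 10, 9));
        // With a 7-day retention on 2011-10-10, only the 2011-10-01 run is selected
        System.out.println(expired(runs, LocalDate.of(2011, 10, 10), 7));
    }
}
```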
L
5

Hadoop's TextOutputFormat (which I guess you are using) does not allow overwriting an existing directory, probably to spare you the pain of finding out you mistakenly deleted something you (and your cluster) worked very hard on.

However, if you are certain you want your output folder to be overwritten by the job, I believe the cleanest way is to change TextOutputFormat a little, like this:

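import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

// TextOutputFormat variant that creates each part file with overwrite=true
public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
      @Override
      public RecordWriter<K, V> 
      getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException 
      {
          Configuration conf = job.getConfiguration();
          boolean isCompressed = getCompressOutput(job);
          String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "\t");
          CompressionCodec codec = null;
          String extension = "";
          if (isCompressed) 
          {
              // Pick the configured codec (gzip by default) and use its file extension
              Class<? extends CompressionCodec> codecClass = 
                      getOutputCompressorClass(job, GzipCodec.class);
              codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
              extension = codec.getDefaultExtension();
          }
          Path file = getDefaultWorkFile(job, extension);
          FileSystem fs = file.getFileSystem(conf);
          // The second argument (overwrite=true) replaces an existing file instead of failing
          FSDataOutputStream fileOut = fs.create(file, true);
          if (!isCompressed) 
          {
              return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
          } 
          else 
          {
              return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
          }
      }
}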

Now you are creating the FSDataOutputStream (fs.create(file, true)) with overwrite=true.

Lundberg answered 5/8, 2014 at 7:55 Comment(5)
This proposed solution is not only conceptually problematic, but also won't work without further changes -- a check in FileOutputFormat ensures on job initialization that the output directory does not exist. - Omentum
I tried offering a solution in addition to the obvious "delete the folder before the job" one, with a word of caution. Is this always recommended? Probably not. Is it an interesting option that's worth mentioning? IMO, yes. And it worked in the past, though I'm not sure whether it works now. - Lundberg
I think answer-with-caution becomes more appropriate as the sophistication of the question increases. For an obviously newbie question it's IMHO better to avoid pointing in the wrong direction. - Omentum
And re: working in the past -- maybe mapred vs mapreduce APIs? My comment is regarding the latter. - Omentum
See this answer for a simpler alternative. - Westering
O
1

Hadoop already supports the effect you seem to be trying to achieve by allowing multiple input paths to a job. Instead of trying to have a single directory of files to which you add more files, have a directory of directories to which you add new directories. To use the aggregate result as input, simply specify the input glob as a wildcard over the subdirectories (e.g., my-aggregate-output/*). To "append" new data to the aggregate as output, simply specify a new unique subdirectory of the aggregate as the output directory, generally using a timestamp or some sequence number derived from your input data (e.g. my-aggregate-output/20140415154424).
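
As a rough sketch of the naming scheme (plain JDK code, no Hadoop dependency): one helper derives a unique, timestamped subdirectory to use as the job's output path, and another derives the wildcard to use as the input path when reading the whole aggregate. The class and method names are invented for this example; the strings would be passed to FileOutputFormat.setOutputPath and FileInputFormat.addInputPath respectively.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class AggregatePaths {
    // Timestamp layout matching the example name 20140415154424
    private static final DateTimeFormatter STAMP = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Hypothetical helper: a new, unique subdirectory to "append" one run's output
    public static String outputSubdir(String aggregateDir, LocalDateTime runTime) {
        return aggregateDir + "/" + runTime.format(STAMP);
    }

    // Hypothetical helper: a glob over all subdirectories, for reading the aggregate as input
    public static String inputGlob(String aggregateDir) {
        return aggregateDir + "/*";
    }

    public static void main(String[] args) {
        // prints my-aggregate-output/20140415154424
        System.out.println(outputSubdir("my-aggregate-output", LocalDateTime.of(2014, 4, 15, 15, 44, 24)));
        // prints my-aggregate-output/*
        System.out.println(inputGlob("my-aggregate-output"));
    }
}
```

Second-level timestamps are only unique if two runs never start within the same second; a sequence number derived from the input data avoids that assumption.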

Omentum answered 15/4, 2015 at 15:45 Comment(0)
F
1

If one is loading the input file (e.g., with appended entries) from the local file system into the Hadoop Distributed File System like this:

hdfs dfs -put  /mylocalfile /user/cloudera/purchase

Then one could also overwrite/reuse the existing destination with -f, with no need to delete or re-create the folder:

hdfs dfs -put -f  /updated_mylocalfile /user/cloudera/purchase
Furnace answered 16/10, 2018 at 12:3 Comment(0)
C
1

Hadoop follows the philosophy "write once, read many times". Thus, when you try to write to the directory again, it assumes it has to create a new one (write once), finds that it already exists, and complains. You can delete it via hadoop fs -rmr /path/to/your/output/. It's better to create a dynamic directory (e.g., based on a timestamp or hash value) in order to preserve data.

Centimeter answered 28/1, 2019 at 11:16 Comment(0)
F
0

You can create an output subdirectory for each execution, named by time. For example, let's say you take the output directory from the user as an argument and set it as follows:

FileOutputFormat.setOutputPath(job, new Path(args[1]));

Replace it with the following lines:

String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis()));
FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));
Fourwheeler answered 5/8, 2016 at 21:59 Comment(0)
R
0

I had a similar use case and used MultipleOutputs to resolve it.

For example, suppose I want different MapReduce jobs to write to the same directory /outputDir/: job 1 writes to /outputDir/job1-part1.txt, job 2 writes to /outputDir/job1-part2.txt (without deleting existing files).

In the main, set the output directory to a random one (it can be deleted before a new job runs)

FileOutputFormat.setOutputPath(job, new Path("/randomPath"));

In the reducer/mapper, use MultipleOutputs and set the writer to write to the desired directory:

private MultipleOutputs mos; // a field, so map()/reduce() can use it; call mos.close() in cleanup()

public void setup(Context context) {
    mos = new MultipleOutputs(context);
}

and:

mos.write(key, value, "/outputDir/fileOfJobX.txt");

However, my use case was a bit more complicated than that. If you just need to write to the same flat directory, you can write to a different directory and run a script to move the files, like: hadoop fs -mv /tmp/* /outputDir

In my use case, each MapReduce job writes to different sub-directories based on the value of the message being written. The directory structure can be multi-layered, like:

/outputDir/
    messageTypeA/
        messageSubTypeA1/
            job1Output/
                job1-part1.txt
                job1-part2.txt
                ...
            job2Output/
                job2-part1.txt
                ...

        messageSubTypeA2/
        ...
    messageTypeB/
    ...

Each MapReduce job can write to thousands of sub-directories, and the cost of writing to a tmp dir and then moving each file to the correct directory is high.

Relative answered 7/9, 2018 at 4:27 Comment(0)
W
0

I encountered this exact problem. It stems from the exception raised in checkOutputSpecs in the class FileOutputFormat. In my case, I wanted to have many jobs adding files to directories that already exist, and I guaranteed that the files would have unique names.

I solved it by creating an output format class which overrides only the checkOutputSpecs method and swallows (ignores) the FileAlreadyExistsException that's thrown where it checks if the directory already exists.

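import java.io.IOException;
import org.apache.hadoop.mapred.FileAlreadyExistsException; // the Hadoop exception, not java.nio.file's
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public void checkOutputSpecs(JobContext job) throws IOException {
        try {
            super.checkOutputSpecs(job);
        } catch (FileAlreadyExistsException ignored) {
            // Swallow the exception: an existing output directory is expected here
        }
    }
}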

Then, in the job configuration, I used LazyOutputFormat and also MultipleOutputs.

LazyOutputFormat.setOutputFormatClass(job, OverwriteTextOutputFormat.class);
Westering answered 16/7, 2019 at 0:39 Comment(0)
M
0

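You need to add this setting in your main class (conf is the job's Configuration):

// Configure the job's output path from the command-line argument
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// Delete the output directory (recursively) so the job can re-create it
outputPath.getFileSystem(conf).delete(outputPath, true);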
Magen answered 20/4, 2020 at 2:40 Comment(0)
