Accessing stream output from HDFS of MRjob
I'm trying to use a Python driver to run an iterative MRjob program. The exit criteria depend on a counter.

The job itself seems to run. If I run a single iteration from the command line, I can then run hadoop fs -cat /user/myname/myhdfsdir/part-00000 and see the expected results from that single iteration.

However, I need to use a Python driver to run the code and access the counters from the runner, because the algorithm is iterative and needs the counter value to decide when to stop.

OUTPUT_PATH = "/user/myname/myhdfsdir"
# IPython shell escape: remove output left over from any previous run
!hadoop fs -rm -r {OUTPUT_PATH}

from my_custom_MRjob import my_custom_MRjob

mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt",
                               "-r", "hadoop",
                               "--output-dir=hdfs://" + OUTPUT_PATH,
                               "--no-output"])

while True:
    with mr_job.make_runner() as runner:
        print runner.get_opts()
        runner.run()
        # overwrite the local input file with this iteration's output
        with open('localDir/localTextFile.txt', 'w') as f:
            for line in runner.stream_output():
                key, value = mr_job.parse_output_line(line)
                f.write(key + '\t' + value + '\n')
        print "End of MRjob iteration. Counters: {}".format(runner.counters())
        # read a particular counter
        # use counter value to evaluate exit criteria
        if exit_criteria_met:
            break

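For context, runner.counters() returns a list of dicts, one per job step, keyed by counter group and then counter name. The elided exit check is along these lines, where the 'iteration' group and 'changed' counter are placeholder names for whatever the job actually increments:

counters = runner.counters()
# e.g. [{'iteration': {'changed': 12}}]; look at the last step
last_step = counters[-1] if counters else {}
num_changed = last_step.get('iteration', {}).get('changed', 0)
# placeholder criterion: stop once an iteration changes nothing
exit_criteria_met = (num_changed == 0)
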
Running this driver produces the following error:

IOErrorTraceback (most recent call last)
<ipython-input-136-aded8ecaa727> in <module>()
     25         runner.run()
     26         with open('localDir/localTextFile.txt', 'w') as f:
---> 27             for line in runner.stream_output():
     28                 key,value =  mr_job.parse_output_line(line)
     29                 #

/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/util.pyc in _to_lines(chunks)
    391     leftovers = []
    392 
--> 393     for chunk in chunks:
    394         # special case for b'' standing for EOF
    395         if chunk == b'':

/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/runner.pyc in cat_output(self)
    555                 yield b''  # EOF of previous file
    556 
--> 557             for chunk in self.fs._cat_file(filename):
    558                 yield chunk
    559 

/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/composite.pyc in _cat_file(self, path)
     75 
     76     def _cat_file(self, path):
---> 77         for line in self._do_action('_cat_file', path):
     78             yield line
     79 

/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/hadoop.pyc in _cat_file(self, filename)
    272 
    273         if returncode != 0:
--> 274             raise IOError("Could not stream %s" % filename)
    275 
    276     def mkdir(self, path):

IOError: Could not stream hdfs://hdfs:/user/myname/myhdfsdir/part-00000

What is especially baffling and frustrating is the path hdfs://hdfs:/user/myname/myhdfsdir/part-00000: it contains the hdfs scheme twice, and the second instance has only one forward slash. I have tried both adding and removing the literal hdfs:// in the mrjob args ("--output-dir=hdfs://" + OUTPUT_PATH), and I get the same error signature in both cases.

If I run the driver in "local" mode instead of Hadoop, I have no problem, with the obvious and critical exception that I do not have access to the Hadoop engine. This works fine:

mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt"])

I need to read the initial input file from the local filesystem (even in Hadoop mode), run the MRjob iteration with its output overwriting that local input file, then read the counters from the runner and evaluate the exit criteria. If the criteria are not met, run the job again, this time with the local input file updated by the previous run.

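For reference, the non-deprecated equivalent of the stream_output() loop, which (as noted in the comments below) I also tried and which fails the same way, is roughly:

from mrjob.util import to_lines

# same round trip as above, via the non-deprecated cat_output() API
with open('localDir/localTextFile.txt', 'w') as f:
    for line in to_lines(runner.cat_output()):
        key, value = mr_job.parse_output_line(line)
        f.write(key + '\t' + value + '\n')
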
Laterite answered 25/3, 2018 at 4:10

Comments (2):

Laterite: I've tried using pythonhosted.org/mrjob/…. Still the same problem: "--output-dir=" + fully_qualify_hdfs_path(OUTPUT_PATH)

Laterite: Tried replacing runner.stream_output() with mrjob.util.to_lines(runner.cat_output()) (per the deprecation note), but again saw the same problem and the same stack trace.

As long as you have a path that contains hdfs:/ (with a single slash), you will not succeed, as that is never going to be a valid URI.

In the comments you mentioned that you tried to add hdfs:// manually, which may be a workable hack, but in your code I don't see you clean up the wrong hdfs:/. So even if you add the right prefix, the malformed one is still next in line, and the code still has no chance of succeeding.

So, please clean it up.

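A minimal sketch of such a clean-up, assuming the only defect is a stray or doubled scheme prefix (clean_hdfs_path is a hypothetical helper, not part of mrjob):

def clean_hdfs_path(path):
    # strip however many malformed 'hdfs:/' or 'hdfs://' prefixes exist
    while path.startswith('hdfs:/'):
        path = path[len('hdfs:/'):].lstrip('/')
    # re-add a single well-formed scheme
    return 'hdfs:///' + path.lstrip('/')

# clean_hdfs_path('hdfs://hdfs:/user/myname/myhdfsdir')
# returns 'hdfs:///user/myname/myhdfsdir'
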

Practical note: this question is from a while ago; if the problem was in the software itself, it has likely been resolved by now. If the problem persists, it is probably something odd in the code you are using. Perhaps start with a trivial example from a reliable source to rule that out.

Edmead answered 31/7, 2020 at 9:8
