I'm trying to use a Python driver to run an iterative MRjob program. The exit criteria depend on a counter.
The job itself seems to run: if I run a single iteration from the command line, I can then run hadoop fs -cat /user/myname/myhdfsdir/part-00000 and see the expected results from that iteration.
However, I need a Python driver to run the code and access counters from the runner, because the algorithm is iterative and the counter value determines the exit criteria.
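Concretely, what I need back from the runner after each iteration is something like this (just a sketch; "my_group" and "my_counter" are placeholders for whatever my job actually increments):

# runner.counters() returns one dict per step: {group: {counter: value}}
counters = runner.counters()
counter_value = counters[-1].get('my_group', {}).get('my_counter', 0) if counters else 0

Here is the driver I am running: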
OUTPUT_PATH = "/user/myname/myhdfsdir"
!hadoop fs -rm -r {OUTPUT_PATH}

from my_custom_MRjob import my_custom_MRjob

mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt",
                               "-r", "hadoop",
                               "--output-dir=hdfs://" + OUTPUT_PATH,
                               "--no-output"])

while True:
    with mr_job.make_runner() as runner:
        print runner.get_opts()
        runner.run()
        # write this iteration's output back over the local input file
        with open('localDir/localTextFile.txt', 'w') as f:
            for line in runner.stream_output():
                key, value = mr_job.parse_output_line(line)
                f.write(key + '\t' + value + '\n')
        print "End of MRjob iteration. Counters: {}".format(runner.counters())
        # read a particular counter
        # use counter value to evaluate exit criteria
        if exit_criteria_met:
            break
This produces the following error:
IOError                                   Traceback (most recent call last)
<ipython-input-136-aded8ecaa727> in <module>()
25 runner.run()
26 with open('localDir/localTextFile.txt', 'w') as f:
---> 27 for line in runner.stream_output():
28 key,value = mr_job.parse_output_line(line)
29 #
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/util.pyc in _to_lines(chunks)
391 leftovers = []
392
--> 393 for chunk in chunks:
394 # special case for b'' standing for EOF
395 if chunk == b'':
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/runner.pyc in cat_output(self)
555 yield b'' # EOF of previous file
556
--> 557 for chunk in self.fs._cat_file(filename):
558 yield chunk
559
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/composite.pyc in _cat_file(self, path)
75
76 def _cat_file(self, path):
---> 77 for line in self._do_action('_cat_file', path):
78 yield line
79
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/hadoop.pyc in _cat_file(self, filename)
272
273 if returncode != 0:
--> 274 raise IOError("Could not stream %s" % filename)
275
276 def mkdir(self, path):
IOError: Could not stream hdfs://hdfs:/user/myname/myhdfsdir/part-00000
What is especially baffling and frustrating is the path hdfs://hdfs:/user/myname/myhdfsdir/part-00000: the hdfs scheme appears twice, and the second occurrence has only a single forward slash. I have tried both adding and removing the literal hdfs:// prefix in the mrjob args ("--output-dir=hdfs://"+OUTPUT_PATH), and I get the same error signature either way.
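For clarity, these are the output-dir forms I have tried, plus a fully qualified variant I have not ruled out (the namenode host and port below are placeholders, not my actual cluster config):

OUTPUT_PATH = "/user/myname/myhdfsdir"

# what I pass now: scheme plus absolute path, i.e. "hdfs:///user/..."
out_with_scheme = "hdfs://" + OUTPUT_PATH

# plain HDFS path, letting Hadoop resolve the default filesystem
out_plain = OUTPUT_PATH

# fully qualified URI with an explicit namenode (host/port are placeholders)
out_qualified = "hdfs://namenode.example.com:8020" + OUTPUT_PATH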
If I run the driver in "local" mode instead of Hadoop, I have no problem, with the obvious and critical exception that I do not have access to the Hadoop engine. This works fine:
mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt"])
The flow I need is: read the initial input file from the local filesystem (always, even in Hadoop mode); run an MRjob iteration whose output overwrites that local input file; access the counters from the runner and evaluate the exit criteria; and, if the criteria are not met, run the job again on the now-updated local input file.
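Here is a minimal sketch of the loop I am aiming for, switched to the non-deprecated cat_output()/to_lines() API; the counter group "stats", counter name "changed", and the exit test are placeholders for whatever my_custom_MRjob actually emits:

from mrjob.util import to_lines
from my_custom_MRjob import my_custom_MRjob

LOCAL_INPUT = 'localDir/localTextFile.txt'
OUTPUT_PATH = '/user/myname/myhdfsdir'

while True:
    mr_job = my_custom_MRjob(args=[LOCAL_INPUT,
                                   '-r', 'hadoop',
                                   '--output-dir=hdfs://' + OUTPUT_PATH,
                                   '--no-output'])
    with mr_job.make_runner() as runner:
        runner.run()

        # overwrite the local input file with this iteration's output
        with open(LOCAL_INPUT, 'w') as f:
            for line in to_lines(runner.cat_output()):
                key, value = mr_job.parse_output_line(line)
                f.write('{}\t{}\n'.format(key, value))

        # runner.counters() returns one dict per step: {group: {counter: value}}
        counters = runner.counters()
        changed = counters[-1].get('stats', {}).get('changed', 0) if counters else 0

    if changed == 0:  # placeholder exit criterion
        break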
"--output-dir="+fully_qualify_hdfs_path(OUTPUT_PATH)
– Lateriterunner.stream_output()
withmrjob.util.to_lines(runner.cat_output())
(per deprecation note), but again saw the same problem and same stacktrace. – Laterite
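Since hadoop fs -cat works fine from the shell, one workaround I am considering is bypassing the runner's filesystem layer and shelling out directly. This is only a sketch, assuming the hadoop binary is on the PATH and the job output lives under OUTPUT_PATH:

import subprocess

def cat_hdfs_output(output_path):
    """Yield raw output lines via `hadoop fs -cat` instead of runner.cat_output()."""
    cmd = ['hadoop', 'fs', '-cat', output_path + '/part-*']
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    for line in proc.stdout:
        yield line
    proc.stdout.close()
    if proc.wait() != 0:
        raise IOError("hadoop fs -cat failed for %s" % output_path)

# usage, parsing each line with the job instance:
# for line in cat_hdfs_output('/user/myname/myhdfsdir'):
#     key, value = mr_job.parse_output_line(line)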