python luigi localTarget pickle
Asked Answered
L

2

6

I am running on Windows 7, Python 2.7 via Anaconda 4.3.17, Luigi 2.4.0, Pandas 0.18, sklearn version 0.18. Per below, I am trying to have a luigi.LocalTarget output be a pickle to store a few different objects (using firstJob) and then read from that pickle in a dependent job (secondJob). firstJob completes successfully if I run the following from the command line:

"python -m luigi --module luigiPickle firstJob --date 2017-06-07 --local-scheduler"

However, if I try running secondJob i.e.,

"python -m luigi --module luigiPickle secondJob --date 2017-06-07 --local-scheduler"

I get

Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\luigi-2.4.0-py2.7.egg\luigi\worker.py", l
ine 191, in run
    new_deps = self._run_get_new_deps()
  File "C:\Anaconda2\lib\site-packages\luigi-2.4.0-py2.7.egg\luigi\worker.py", l
ine 129, in _run_get_new_deps
    task_gen = self.task.run()
  File "luigiPickle.py", line 41, in run
    ret2 = pickle.load(inFile)
  File "C:\Anaconda2\lib\pickle.py", line 1384, in load
    return Unpickler(file).load()
  File "C:\Anaconda2\lib\pickle.py", line 864, in load
    dispatch[key](self)
  File "C:\Anaconda2\lib\pickle.py", line 1096, in load_global
    klass = self.find_class(module, name)
  File "C:\Anaconda2\lib\pickle.py", line 1130, in find_class
    __import__(module)
ImportError: No module named frame

It appears that luigi is having trouble reading the pickle due to not recognizing the pandas.DataFrame() object (perhaps a scope issue?).

import luigi
import pandas as pd
import pickle
from sklearn.linear_model import LinearRegression

class firstJob(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('%s_first.pickle' % self.date)

    def run(self):
        ret = {}
        ret['a'] = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
        ret['b'] = pd.DataFrame({'a': [3, 4], 'd': [0, 0]})
        ret['c'] = LinearRegression()
        outFile = self.output().open('wb')
        pickle.dump(ret, outFile, protocol=pickle.HIGHEST_PROTOCOL)
        outFile.close()

class secondJob(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return firstJob(self.date)

    def output(self):
        return luigi.LocalTarget('%s_second.pickle' % self.date)

    def run(self):
        inFile = self.input().open('rb')
        ret2 = pickle.load(inFile)
        inFile.close()

if __name__ == '__main__':
    luigi.run()
Lucie answered 7/6, 2017 at 15:46 Comment(0)
G
9

The luigi open command doesn't work with the b flag for binary- it strips it out of the options string. (not sure why). Better to just use standard open with the path attribute:

open(self.input().path, 'rb') and open(self.output().path, 'wb').

Guadalajara answered 7/6, 2017 at 17:11 Comment(3)
Thanks. I changed outFile = self.output().open('wb') to outFile = open(self.output().path, 'w') and inFile = self.input().open('rb') to inFile=open(self.input().path, 'r'). Running secondJob now gives File "C:\Anaconda2\lib\pickle.py", line 1384, in load return Unpickler(file).load() File "C:\Anaconda2\lib\pickle.py", line 864, in load dispatch[key](self) File "C:\Anaconda2\lib\pickle.py", line 1175, in load_binput i = ord(self.read(1)) TypeError: ord() expected a character, but string of length 0 foundLucie
I think you actually need the 'b' in there in the outfile, so do something like `outFile = open(self.output().path, 'wb')Guadalajara
Ah ha, this works. Thank you! To clarify for anyone else, replacing outFile = self.output().open('wb') to outFile = open(self.output().path, 'wb') and inFile = self.input().open('rb') with inFile=open(self.input().path, 'rb') resolved the issue.Lucie
M
0

d6tflow solves this, see example for sklearn model pickle which answers this question. Plus you don't need to write all that boilerplate code.

import d6tflow

class firstJob(d6tflow.tasks.TaskPickle):
    def run(self):
        # your code
        self.save(ret)

class secondJob(TaskClass):
    date = luigi.DateParameter()

    def requires(self):
        return firstJob(self.date)

    def run(self):
        inFile = self.input().load()
        # use inFile

d6tflow.run([secondJob])
Melody answered 20/1, 2019 at 4:43 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.