Luigi - Unfulfilled %s at run time

I am trying to learn, in a very simple way, how luigi works. As a newbie, I came up with this code:

import luigi

class class1(luigi.Task):

    def requires(self):
        return class2()

    def output(self):
        return luigi.LocalTarget('class1.txt')

    def run(self):
        print('IN class A')


class class2(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('class2.txt')


if __name__ == '__main__':
    luigi.run()

Running this in command prompt gives error saying

raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))

which is:

RuntimeError: Unfulfilled dependency at run time: class2__99914b932b  
   
Alegar answered 30/3, 2017 at 1:58 Comment(1)
What command are you using to run it? Your run() methods need to create the output files referenced in the LocalTarget in order for the dependencies to be met. - Andeee

This happens because you define an output for class2 but never create it.

Let's break it down...

When running

python file.py class2 --local-scheduler

luigi will ask:

  • is the output of class2 already on disk? NO
  • check dependencies of class2: NONE
  • execute the run method (by default it is an empty method: pass)
  • run method didn't return errors, so job finishes successfully.

However, when running

python file.py class1 --local-scheduler

luigi will ask:

  • is the output of class1 already on disk? NO
  • check task dependencies: YES: class2
  • pause to check status of class2
    • is the output of class2 on disk? NO
    • run class2 -> running -> done without errors
    • is the output of class2 on disk? NO -> raise error

luigi never runs a task unless all of its dependencies are met (i.e. their output is on the file system).
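
To make the sequence above concrete, here is a rough sketch of the same check in plain Python. It is not luigi's actual implementation: ToyTask and toy_build are hypothetical names invented for illustration, and the only rule modelled is "a task is complete when its output file exists".

```python
import os
import tempfile


class ToyTask:
    """Hypothetical stand-in for a luigi task: one output file, optional dependencies."""

    def __init__(self, name, workdir, requires=(), writes_output=True):
        self.name = name
        self.path = os.path.join(workdir, name + ".txt")
        self.requires = list(requires)
        self.writes_output = writes_output

    def complete(self):
        # A task counts as done only when its output file exists.
        return os.path.exists(self.path)

    def run(self):
        # With writes_output=False this behaves like an empty run() method.
        if self.writes_output:
            with open(self.path, "w") as fh:
                fh.write("done\n")


def toy_build(task):
    """Run dependencies first, then refuse to run `task` if any is still incomplete."""
    if task.complete():
        return
    for dep in task.requires:
        toy_build(dep)
    missing = [dep.name for dep in task.requires if not dep.complete()]
    if missing:
        raise RuntimeError("Unfulfilled dependency at run time: " + ", ".join(missing))
    task.run()


# class2 finishes without errors but leaves no file behind, so class1 is refused.
broken_dir = tempfile.mkdtemp()
broken = ToyTask("class1", broken_dir,
                 requires=[ToyTask("class2", broken_dir, writes_output=False)])
try:
    toy_build(broken)
    error = None
except RuntimeError as exc:
    error = str(exc)

# Once class2 writes its declared output, class1 is allowed to run.
fixed_dir = tempfile.mkdtemp()
fixed = ToyTask("class1", fixed_dir, requires=[ToyTask("class2", fixed_dir)])
toy_build(fixed)
```

In the first case the dependency's run() returns normally, yet the check right before class1 still fails, which is the situation in the question.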

Unsociable answered 10/4, 2017 at 15:3 Comment(4)
Good explanation, but what is the solution? - Penton
What happens if class2 has no output or run function, just requires? - Penton
@Penton The solution is that "class2.run()" should create the output on disk. - Unsociable
If class2 is a top-level task depending on other tasks, you can simply override the class2.complete() method to always return True (or to check that all "required" tasks are completed). - Unsociable

This error can also occur when the declared output is one that will never be created. For example, if the output folder name is built from a timestamp, the timestamp changes every second, so the path is never the same twice and the error can appear.
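
A small sketch of that effect, in plain Python rather than luigi. The class names and the fake clock are hypothetical, made up so the behaviour is repeatable; the point is only that a path computed from "now" on every call differs between the moment the file is written and the moment it is checked.

```python
import itertools
from datetime import datetime, timedelta


def make_clock(start):
    """Hypothetical fake clock: each call returns a time one second later."""
    ticks = itertools.count()
    return lambda: start + timedelta(seconds=next(ticks))


class UnstableTask:
    """output_path() reads the clock on every call, so the path keeps changing."""

    def __init__(self, clock):
        self.clock = clock

    def output_path(self):
        return "report_{:%Y%m%d_%H%M%S}.txt".format(self.clock())


class StableTask:
    """The timestamp is captured once, at construction, and reused afterwards."""

    def __init__(self, clock):
        self.stamp = clock()

    def output_path(self):
        return "report_{:%Y%m%d_%H%M%S}.txt".format(self.stamp)


clock = make_clock(datetime(2021, 2, 20, 7, 27, 0))

unstable = UnstableTask(clock)
written_to = unstable.output_path()   # path used while writing
checked_at = unstable.output_path()   # path used by the later existence check

stable = StableTask(clock)
same_path = stable.output_path() == stable.output_path()
```

In luigi the usual place for a value that must stay fixed like this is a task parameter, so that output() returns the same path every time it is called.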

Gretta answered 20/2, 2021 at 7:27 Comment(0)

I had the same error, but I still haven't found the cause:

# Imports and logger are assumed from the names used below; the original post omitted them.
import logging

import joblib
import luigi
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor

logger = logging.getLogger(__name__)


class data_ingestion(luigi.Task):

    def run(self):
        data = pd.read_csv(r'F:\Mega\MEGAsync\VS Code\winequality-red.csv', sep=';')
        data.to_csv(self.output().path, index=False)

    def output(self):
        return luigi.LocalTarget('WineQuality.csv')


class data_prep(luigi.Task):

    def requires(self):
        return data_ingestion()

    def output(self):
        return [luigi.LocalTarget('Train.csv'), luigi.LocalTarget('Val.csv')]

    def run(self):
        data = pd.read_csv('WineQuality.csv')  # Reading from a CSV
        logger.info('\n Quick look at the data')
        data.head()

        column_target = 'quality'  # Variable to predict
        columns_features = data.drop([column_target], axis=1)

        logger.info(f'=== Variable to predict: {column_target}')
        logger.info(f'=== Available features: {columns_features}')

        logger.info('Splitting the dataset into TRAIN and TEST (validation)')
        data_train, data_val = train_test_split(data, test_size=0.2, stratify=data[column_target], random_state=1)

        logger.info("Saving train file")
        data_train.to_csv(self.output()[0].path, index=False)

        logger.info("Saving val file")
        data_val.to_csv(self.output()[1].path, index=False)


class training(luigi.Task):

    def requires(self):
        return data_prep()

    def output(self):
        return luigi.LocalTarget('joblibe_file')

    def run(self):
        data_train = pd.read_csv(self.input()[0].path)

        column_target = 'quality'  # Variable to predict
        data_features = data_train.drop([column_target], axis=1)
        columns_features = data_features.columns.to_list()

        X_train = data_train[columns_features].values
        Y_train = data_train[column_target].values

        model = DecisionTreeRegressor()  # No parameters set yet; I still need to study this properly
        model.fit(X_train, Y_train)

        # Saving the file in the working directory
        joblib_file = "joblib_model.pkl"
        joblib.dump(model, joblib_file)


class validation(luigi.Task):

    def requires(self):
        return training()

    def output(self):
        return luigi.LocalTarget('Metrics.csv')

    def run(self):
        data_val = pd.read_csv(self.input()[1].path)

        column_target = 'quality'  # Variable to predict
        data_features = data_val.drop([column_target], axis=1)
        columns_features = data_features.columns.to_list()

        X_val = data_val[columns_features].values
        Y_val = data_val[column_target].values

        # Loading the model saved during training
        joblib_model = joblib.load(self.input()[0].path)

        y_val_predict = joblib_model.predict(X_val)
        score = joblib_model.score(X_val, Y_val)

        logger.info('=== Predicted values')
        logger.info(y_val_predict)
        logger.info('=== Accuracy')
        logger.info('{:.2f} %'.format(score))

        metrics = {'Predictions': [y_val_predict],
                   'score': [score]}

        df = pd.DataFrame(metrics)

        logger.info("Saving to CSV file for TEST")
        df.to_csv(self.output()[0].path, index=False)

        # save several metrics in a df and export


if __name__ == '__main__':
    luigi.run()
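
A guess at the cause, going only by the code above: training.output() declares 'joblibe_file', but run() dumps the model to "joblib_model.pkl", so the declared target never appears and the dependency of validation stays unfulfilled. It also looks as though validation indexes self.input() and self.output() even though each is a single target there, not a list. The sketch below shows the name mismatch in isolation. ToyTarget and both train functions are hypothetical, and pickle stands in for joblib so the example needs nothing beyond the standard library.

```python
import os
import pickle
import tempfile


class ToyTarget:
    """Hypothetical stand-in for a local file target: a path plus an existence check."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


def train_mismatched(target, workdir):
    # Mirrors the pattern above: the file is written under a hard-coded name
    # that differs from the declared target path, so the target stays missing.
    with open(os.path.join(workdir, "joblib_model.pkl"), "wb") as fh:
        pickle.dump({"model": "placeholder"}, fh)


def train_matched(target):
    # Writing through target.path keeps the declared and the real file in sync.
    with open(target.path, "wb") as fh:
        pickle.dump({"model": "placeholder"}, fh)


workdir = tempfile.mkdtemp()
target = ToyTarget(os.path.join(workdir, "joblibe_file"))

train_mismatched(target, workdir)
exists_after_mismatch = target.exists()

train_matched(target)
exists_after_match = target.exists()
```

Applied to the luigi code, that would probably mean dumping the model to self.output().path, the same way data_ingestion already writes its CSV.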

Jed answered 31/5, 2022 at 22:16 Comment(1)
As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center. - Pasteurizer

I am also a beginner at luigi. Thanks for pointing out this kind of error.

Following the previous answer, I managed to solve it by adding this to class2:

def run(self):
    # Create the file declared in output() so the dependency check passes
    with self.output().open('w') as out:
        out.write(u"Hello World!\n")
    print('in class B')
Penton answered 18/9, 2020 at 5:32 Comment(0)
