No module named unusual_prefix_*
I tried to run the Python Operator example in my Airflow installation. The installation has the webserver, scheduler and worker deployed on the same machine and runs with no complaints for all non-PythonOperator tasks. The PythonOperator task fails, complaining that the module "unusual_prefix_*" could not be imported, where * is the name of the file containing the DAG.

The full stacktrace:

['/usr/bin/airflow', 'run', 'tutorialpy', 'print_the_context', '2016-08-23T10:00:00', '--pickle', '90', '--local']
[2016-08-29 11:35:20,054] {__init__.py:36} INFO - Using executor CeleryExecutor
[2016-08-29 11:35:20,175] {configuration.py:508} WARNING - section/key [core/fernet_key] not found in config
Traceback (most recent call last):
  File "/usr/bin/airflow", line 90, in <module>
    args.func(args)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 214, in run
    DagPickle).filter(DagPickle.id == args.pickle).first()
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2659, in first
    ret = list(self[0:1])
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
    return list(res)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 86, in instances
    util.raise_from_cause(err)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 71, in instances
    rows = [proc(row) for row in fetch]
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 428, in _instance
    loaded_instance, populate_existing, populators)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 486, in _populate_full
    dict_[key] = getter(row)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1253, in process
    return loads(value)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/dill/dill.py", line 260, in loads
    return load(file)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/dill/dill.py", line 250, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/dill/dill.py", line 406, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named unusual_prefix_tutorialpy
[2016-08-29 11:35:20,498: ERROR/Worker-1] Command 'airflow run tutorialpy print_the_context 2016-08-23T10:00:00 --pickle 90 --local ' returned non-zero exit status 1
[2016-08-29 11:35:20,502: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[01152b52-044e-4361-888c-ef2d45983c60] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 45, in execute_command
    raise AirflowException('Celery command failed')
AirflowException: Celery command failed
Skald asked 30/8, 2016 at 7:34 Comment(3)
The pickled file referenced by the configuration likely contains a reference to an item from a custom module/class that is not available on this system. Figure out where/what this file is, try restarting your stack without it, and see if the issue goes away.Arabelle
@metatoaster: I guess you meant the line regarding WARNING - section/key. That one isn't a problem; I added the necessary config and it did not change anything. However, while doing so I found at least a temporary workaround. Will post it as an answer.Skald
No, not that. The point is that Airflow, for whatever reason, has pickled an object that is implemented by a module/class that does not exist on this system. This is also why donot_pickle=True works: loading the pickle file is the problem - it contains an object whose module your Python installation cannot find (see the whole stack trace, especially towards the end).Arabelle
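To make that comment concrete, here is a minimal, self-contained illustration of the failure mode. The stdlib pickle module is used instead of dill purely to keep the sketch dependency-free; as the traceback shows, dill defers to the stock find_class for this lookup anyway. The synthetic module registration mimics what Airflow does when it loads a DAG file under a generated name:

import pickle
import sys
import types

# Register a module under a synthetic name, mimicking how Airflow loads a
# DAG file as unusual_prefix_<...> instead of under its real filename.
mod = types.ModuleType("unusual_prefix_tutorialpy")
exec("def print_the_context():\n    return 'hello'", mod.__dict__)
sys.modules["unusual_prefix_tutorialpy"] = mod

# Functions are pickled by reference (module name + qualified name), not by value.
payload = pickle.dumps(mod.print_the_context)

# Simulate the worker process, where that synthetic module was never created:
del sys.modules["unusual_prefix_tutorialpy"]
try:
    pickle.loads(payload)
except ImportError as exc:
    print(exc)  # No module named 'unusual_prefix_tutorialpy'

Unpickling fails exactly as in the stack trace above: the worker cannot import the module name embedded in the pickle.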
Using the donot_pickle=True option (see the documentation) I got rid of the error. Probably not the best solution, but good enough for my scenario.
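For reference, a minimal sketch of where that option can be set, assuming the airflow.cfg config option is meant (some CLI commands also accept a --donot_pickle flag) and an Airflow 1.x-style config where it sits in the [core] section:

[core]
# Do not pickle DAGs into the metadata database for the workers;
# each worker parses the DAG files from its own dags_folder instead.
donot_pickle = True

Restart the scheduler and workers after changing the config so the new value is picked up.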

Skald answered 30/8, 2016 at 8:32 Comment(0)
I also encountered this problem when running puckel/docker-airflow on Kubernetes.

Turning donot_pickle to True or False didn't work for me.

After some experimenting I got rid of this problem by removing the -p parameter from the arguments used to start the airflow scheduler/worker processes (in Airflow 1.x the scheduler's -p is shorthand for --do_pickle, which pickles DAGs into the metadata database for the workers to load):

airflow scheduler -n 5 -p -> airflow scheduler -n 5

airflow worker -p -> airflow worker

Tymes answered 8/3, 2020 at 12:46 Comment(0)
B
0

This may happen because you have a custom import like this:

import tutorialpy # or something similar

This causes the following Airflow code to run, which registers the DAG file under a module name prefixed with unusual_prefix_:

def _load_modules_from_file(self, filepath, safe_mode):
    if not might_contain_dag(filepath, safe_mode):
        # Don't want to spam user with skip messages
        if not self.has_logged:
            self.has_logged = True
            self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
        return []

    self.log.debug("Importing %s", filepath)
    org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
    path_hash = hashlib.sha1(filepath.encode('utf-8')).hexdigest()
    mod_name = f'unusual_prefix_{path_hash}_{org_mod_name}'

    if mod_name in sys.modules:
        del sys.modules[mod_name]

    def parse(mod_name, filepath):
        try:
            loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
            spec = importlib.util.spec_from_loader(mod_name, loader)
...
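Because the DAG file is registered under that hashed unusual_prefix_* name rather than under its filename, a plain import tutorialpy from inside the DAG file (or a pickle that refers back to it) cannot be resolved by name on another machine. A minimal sketch of the usual way around this, with made-up file and function names and Airflow 2-style imports assumed: keep the callable in a separate helper module that is importable under its real name on the scheduler and every worker, and import it from there in the DAG file.

# my_callbacks.py -- hypothetical helper module; place it on the PYTHONPATH
# (or inside a package) of the scheduler and every worker node.
def print_the_context(**context):
    """Plain top-level function, importable as my_callbacks.print_the_context."""
    print(context)
    return "done"


# tutorialpy.py -- the DAG file; import the callable from the helper module
# instead of importing the DAG file itself by its filename.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

from my_callbacks import print_the_context

with DAG(
    dag_id="tutorialpy",
    start_date=datetime(2016, 8, 23),
    schedule_interval="@daily",
) as dag:
    PythonOperator(
        task_id="print_the_context",
        python_callable=print_the_context,
    )

Any pickled reference then points at my_callbacks.print_the_context, a name every node can import, instead of at the synthetic module name of the DAG file.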
Believe answered 27/6, 2023 at 0:30 Comment(0)
