How do I log from my Python Spark script
P

8

70

I have a Python Spark program which I run with spark-submit. I want to put logging statements in it.

logging.info("This is an informative message.")
logging.debug("This is a debug message.")

I want to use the same logger that Spark is using so that the log messages come out in the same format and the level is controlled by the same configuration files. How do I do this?

I've tried putting the logging statements in the code and starting out with a logging.getLogger(). In both cases I see Spark's log messages but not mine. I've been looking at the Python logging documentation, but haven't been able to figure it out from there.

Not sure if this is something specific to scripts submitted to Spark or just me not understanding how logging works.

Phototaxis answered 20/8, 2014 at 14:37 Comment(1)
You probably don't see your logging statements because default logging level is WARNING, so when you are trying to INFO or DEBUG you are filtered out.Lili
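A minimal sketch of the filtering described in the comment above, using only Python's standard logging module (no Spark involved). The logger name "demo" and the in-memory stream are illustrative assumptions, and the sketch assumes the root logger is still at its default level.

```python
import io
import logging

# Capture output in memory so the effect is easy to inspect.
stream = io.StringIO()
handler = logging.StreamHandler(stream)

logger = logging.getLogger("demo")  # hypothetical logger name
logger.addHandler(handler)
logger.propagate = False  # keep the demo independent of any root handlers

# No level is set on "demo", so it inherits the root logger's default: WARNING.
logger.info("dropped: INFO is below WARNING")
logger.warning("kept: WARNING passes the default threshold")

# Lowering the threshold lets INFO through.
logger.setLevel(logging.INFO)
logger.info("kept: INFO now passes")

print(stream.getvalue())
```

Only the two "kept" messages reach the stream; the first info() call is discarded before any handler sees it.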
P
77

You can get the logger from the SparkContext object:

log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")
Proserpina answered 8/1, 2016 at 18:23 Comment(6)
I get the issue: logger = logging.getLogger('py4j') TypeError: 'JavaPackage' object is not callableMusselman
This is definitely allowing me to log like Spark does (thanks!). Is there a way to get this logger other than from the SparkContext? I have some logs that have to be printed before my SparkContext is createdLipid
@Lipid Before spark context is created you don't have access to spark logging.Sheilahshekel
I got an error trying to use this idea in PySpark. What I did was try to store the logger as a global, then when that didn't work try to store the context itself as a global. My use case is being able to do logging calls on my executors inside a foreach function (where it doesn't have the spark context). "Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063."Neckband
I got this to work but can't figure out where the logs are stored, can someone help me with thatWilmott
@shreyansh, I implemented this and the messages appeared on the Console. I use the Bitnami Docker installation of Spark.Cubiculum
D
25

You need to get the logger for Spark itself; by default, getLogger() will return the logger for your own module. Try something like:

logger = logging.getLogger('py4j')
logger.info("My test info statement")

It might also be 'pyspark' instead of 'py4j'.

If the function that does the logging in your Spark program is defined in the same module as the main function, you will get a serialization error.

This is explained here and an example by the same person is given here

I also tested this on spark 1.3.1

EDIT:

To change logging from STDERR to STDOUT you will have to remove the current StreamHandler and add a new one.

Find the existing Stream Handler (This line can be removed when finished)

print(logger.handlers)
# will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]

There will probably be only one handler, but if not you will have to adjust the index.

logger.removeHandler(logger.handlers[0])

Add new handler for sys.stdout

import sys # Put at top if not already there
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
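The steps above can be combined into one helper. This is a sketch using only the standard library; the function name redirect_to_stdout is hypothetical, and it removes every plain StreamHandler attached directly to the logger rather than assuming the handler sits at index 0.

```python
import logging
import sys


def redirect_to_stdout(logger, level=logging.DEBUG):
    """Swap a logger's directly attached stream handlers for one on stdout."""
    # Iterate over a copy because removeHandler mutates logger.handlers.
    for h in list(logger.handlers):
        # FileHandler subclasses StreamHandler; leave file handlers alone.
        if isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler):
            logger.removeHandler(h)
    sh = logging.StreamHandler(sys.stdout)
    sh.setLevel(level)
    logger.addHandler(sh)
    return sh


logger = logging.getLogger("py4j")
logger.addHandler(logging.StreamHandler())  # stand-in for a pre-existing stderr handler
logger.setLevel(logging.INFO)
redirect_to_stdout(logger)
logger.info("My test info statement")
```

Handlers attached to parent loggers are not touched, so messages may still reach stderr through propagation if the root logger has its own handler.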
Declamation answered 20/8, 2014 at 15:7 Comment(7)
Do I have to pass that logger object as a parameter to all my components that use it? Is there some way to set it globally?Phototaxis
As long as you are not doing threading or multiprocessing you should be able to just set it at the top of your module and use it wherever. Just change logging. to logger. anytime you are about to log something.Declamation
Thanks. It works this way. But the message always goes to stderr. How can we direct to stdout or stderr?Affricate
I updated my answer to address that for you. There may be a way to update an existing StreamHandler, I am not sure, but above is how I know how to do it.Declamation
I'm tempted to downvote this answer because it doesn't work for me. Looking through the pyspark source, pyspark never configures the py4j logger, and py4j uses java.utils.logging instead of the log4j logger that spark uses, so I'm skeptical that this would work at all. I think it's possible that this would work for code on the master node, but not anything running on the workers.Polymorphonuclear
This needs to be downvoted. py4j uses java.utils.logging instead of the log4j logger. I cannot get this to work given this approach.Western
This also doesn't work on pyspark shell client mode(spark 2.4 ). Need get log from sc._jvm.org.apache.log4jLobe
C
8

We needed to log from the executors, not from the driver node. So we did the following:

  1. We created a /etc/rsyslog.d/spark.conf on all of the nodes (using a Bootstrap method with Amazon Elastic MapReduce) so that the Core nodes forwarded syslog local1 messages to the master node.

  2. On the Master node, we enabled the UDP and TCP syslog listeners, and we set it up so that all local messages got logged to /var/log/local1.log.

  3. We created a Python logging module Syslog logger in our map function.

  4. Now we can log with logging.info(). ...

One of the things we discovered is that the same partition is being processed simultaneously on multiple executors. Apparently Spark does this all the time, when it has extra resources. This handles the case when an executor is mysteriously delayed or fails.

Logging in the map functions has taught us a lot about how Spark works.
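
Step 3 might look roughly like the following. This is a sketch, not the configuration the answer actually used: the function names get_syslog_logger and process_record, the logger name "spark-executor", and the address ("localhost", 514) are all assumptions. It relies on the standard library's logging.handlers.SysLogHandler, which sends over UDP by default.

```python
import logging
import logging.handlers


def get_syslog_logger(name="spark-executor", address=("localhost", 514)):
    """Return a logger that sends records to the syslog facility local1."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # The function may be called many times in one Python process,
    # so guard against stacking duplicate handlers.
    if not any(isinstance(h, logging.handlers.SysLogHandler) for h in logger.handlers):
        handler = logging.handlers.SysLogHandler(
            address=address,
            facility=logging.handlers.SysLogHandler.LOG_LOCAL1,
        )
        handler.setFormatter(logging.Formatter("%(name)s: %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


def process_record(record):
    # Build the logger inside the function so it is created in the
    # process that runs the function, not on the driver.
    log = get_syslog_logger()
    log.info("processing %r", record)
    return record
```

A function like process_record would then be passed to a map call, for example rdd.map(process_record), where rdd is whatever dataset the job already has.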

Chuu answered 23/6, 2018 at 19:23 Comment(4)
Did you implement any custom logging within your code, like job-specific timestamps etc. Does this approach also work to output spark logs from all worker nodes/executors.Jackqueline
Hello. I am interested in using this as well! I just wanted to ask you some questions. 1. How did you accomplish step 2. ? 2. Where are the executor logs going to? S3? Or are they aggregated in the stdout, along with the other default logs?Cryptic
@J.Snow — we are using standard syslog logging. See linux.die.net/man/5/syslog.conf for example. Executor logs are sent to the local1 syslog facility, which is then sent to the head-end via UDP.Chuu
@Chuu sorry, I am not an expert in that matter. Maybe I can be more specific. Did you need to open syslog TCP and UDP ports? When you say "and we set it up so that all local messages got logged to /var/log/local1.log", do you mean you specified some properties in some config file to output the logs to /var/log/local1.log? I am sorry for all the questions, but I am not an expert in those matters. Can you share your bootstrap script for EMR?Cryptic
T
7

In my case, I am just happy to get my log messages added to the workers' stderr, along with the usual Spark log messages.

If that suits your needs, then the trick is to redirect the particular Python logger to stderr.

For example, the following, inspired by this answer, works fine for me:

import logging  # needed at definition time for the default argument below

def getlogger(name, level=logging.INFO):
    import logging
    import sys

    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Add a handler only once; otherwise, as I found out, we keep adding
    # handlers and get duplicate messages.
    if not logger.handlers:
        ch = logging.StreamHandler(sys.stderr)
        ch.setLevel(level)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    return logger

Usage:

def tst_log():
    logger = getlogger('my-worker')
    logger.debug('a')
    logger.info('b')
    logger.warning('c')
    logger.error('d')
    logger.critical('e')
    ...

Output (plus a few surrounding lines for context):

17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB)
2017-05-03 03:25:32,849 - my-worker - INFO - b
2017-05-03 03:25:32,849 - my-worker - WARNING - c
2017-05-03 03:25:32,849 - my-worker - ERROR - d
2017-05-03 03:25:32,849 - my-worker - CRITICAL - e
17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0
17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver
Turgent answered 3/5, 2017 at 3:41 Comment(2)
Is there a particular reason to import logging and import sys within the getlogger function?Parted
this does not work from within workers... seems to only work from driver ..Swanson
L
4
import logging

# Logger

logging.basicConfig(format='%(asctime)s %(filename)s %(funcName)s %(lineno)d %(message)s')
logger = logging.getLogger('driver_logger')
logger.setLevel(logging.DEBUG)

Simplest way to log from PySpark!

Lochia answered 17/2, 2020 at 6:20 Comment(2)
Prints something only when doing logger.error()Fantail
It does not log the worker nodes when using a foreach for exampleWooster
C
3

The key to connecting PySpark with Java's log4j is the JVM. Below is Python code; the conf is missing the URL, but the point here is logging.

import os

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

my_jars = os.environ.get("SPARK_HOME")
myconf = SparkConf()
myconf.setMaster("local").setAppName("DB2_Test")
myconf.set("spark.jars","%s/jars/log4j-1.2.17.jar" % my_jars)
spark = SparkSession\
 .builder\
 .appName("DB2_Test")\
 .config(conf = myconf) \
 .getOrCreate()


Logger= spark._jvm.org.apache.log4j.Logger
mylogger = Logger.getLogger(__name__)
mylogger.error("some error trace")
mylogger.info("some info trace")
Columbous answered 25/11, 2017 at 20:23 Comment(0)
S
2

You can implement the logging.Handler interface in a class that forwards log messages to log4j under Spark. Then use logging.root.addHandler() (and, optionally, logging.root.removeHandler()) to install that handler.

The handler should have a method like the following:

def emit(self, record):
    """Forward a log message for log4j."""
    Logger = self.spark_session._jvm.org.apache.log4j.Logger
    logger = Logger.getLogger(record.name)
    if record.levelno >= logging.CRITICAL:
        # Fatal and critical seem about the same.
        logger.fatal(record.getMessage())
    elif record.levelno >= logging.ERROR:
        logger.error(record.getMessage())
    elif record.levelno >= logging.WARNING:
        logger.warn(record.getMessage())
    elif record.levelno >= logging.INFO:
        logger.info(record.getMessage())
    elif record.levelno >= logging.DEBUG:
        logger.debug(record.getMessage())
    else:
        pass

Installing the handler should go immediately after you initialise your Spark session:

spark = SparkSession.builder.appName("Logging Example").getOrCreate()
handler = CustomHandler(spark)
# Replace the default handlers with the log4j forwarder.
root_handlers = logging.root.handlers[:]
for h in root_handlers:
    logging.root.removeHandler(h)
logging.root.addHandler(handler)

# Now you can log stuff.
logging.debug("Installed log4j log handler.")

There's a more complete example here: https://gist.github.com/thsutton/65f0ec3cf132495ef91dc22b9bc38aec
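
For reference, a fuller sketch of such a handler. The class name Log4jForwardingHandler is hypothetical. To keep the example runnable without a cluster, it accepts any object exposing getLogger(name); under Spark one would presumably pass spark._jvm.org.apache.log4j.Logger, as in the emit method above. The stub classes exist only for demonstration.

```python
import logging


class Log4jForwardingHandler(logging.Handler):
    """Forward Python log records to a log4j-style logger factory."""

    def __init__(self, logger_factory):
        super().__init__()
        self.logger_factory = logger_factory

    def emit(self, record):
        target = self.logger_factory.getLogger(record.name)
        message = self.format(record)
        if record.levelno >= logging.CRITICAL:
            target.fatal(message)
        elif record.levelno >= logging.ERROR:
            target.error(message)
        elif record.levelno >= logging.WARNING:
            target.warn(message)
        elif record.levelno >= logging.INFO:
            target.info(message)
        else:
            # Anything below INFO is forwarded as debug in this sketch.
            target.debug(message)


class StubLogger:
    """Stand-in for a log4j Logger; records calls instead of logging."""

    def __init__(self, name, sink):
        self.name = name
        self.sink = sink

    def _log(self, level, msg):
        self.sink.append((self.name, level, msg))

    def fatal(self, msg):
        self._log("FATAL", msg)

    def error(self, msg):
        self._log("ERROR", msg)

    def warn(self, msg):
        self._log("WARN", msg)

    def info(self, msg):
        self._log("INFO", msg)

    def debug(self, msg):
        self._log("DEBUG", msg)


class StubFactory:
    """Stand-in for the log4j Logger class; hands out StubLogger objects."""

    def __init__(self):
        self.calls = []

    def getLogger(self, name):
        return StubLogger(name, self.calls)


factory = StubFactory()
py_logger = logging.getLogger("forwarding-demo")
py_logger.setLevel(logging.DEBUG)  # the default WARNING threshold would drop debug()
py_logger.propagate = False
py_logger.addHandler(Log4jForwardingHandler(factory))

py_logger.warning("disk almost full")
py_logger.debug("details")
print(factory.calls)
```

Note the explicit setLevel(logging.DEBUG): without it the logger inherits the root default of WARNING, and the debug() call never reaches the handler.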

Sharpnosed answered 11/1, 2021 at 0:56 Comment(0)
S
0

You need to make the Spark log reachable from the driver and all executors, so we created a logging class, treat it as a job dependency, and load it on each executor.

class Log4j:
  def __init__(self, spark_session):
    conf = spark_session.sparkContext.getConf()
    app_id = conf.get('spark.app.id')
    app_name = conf.get('spark.app.name')
    log4jlogger = spark_session._jvm.org.apache.log4j
    prefix_msg = '<' + app_id + ' : ' + app_name + '> '
    print(prefix_msg)
    self.logger = log4jlogger.LogManager.getLogger(prefix_msg)
  def warn(self, msg):
    # log warning
    self.logger.warn(msg)
  def error(self, msg):
    # log error
    self.logger.error(msg)
  def info(self, msg):
    # log information message
    self.logger.info(msg)
Szabo answered 3/1, 2023 at 14:36 Comment(2)
How you have executor nodes having the spark_session?Wooster
@LuisSimoes driver is an intro point for all executors so spark drive shared among all executor nodes.Szabo
