FastAPI gunicon uvicorn access_log format customization
Asked Answered
T

3

14

We are using the https://github.com/tiangolo/uvicorn-gunicorn-fastapi-docker FastAPI and were able to customize our logging with a gunicorn logging file.

However, we are not able to change the details of the %(message)s attribute as defined in the documentation access log - https://docs.gunicorn.org/en/stable/settings.html#accesslog.

We receive an error postet below, that the keys are unknown. A similar question has been asked before and received many upvotes. gunicorn log-config access_log_format

What are we doing wrong?

#start.sh
# Start Gunicorn
exec gunicorn -k uvicorn.workers.UvicornWorker -c "$GUNICORN_CONF" "$APP_MODULE" --log-config "/logging.conf"
[loggers]
keys=root, gunicorn.error, gunicorn.access,uvicorn.error,uvicorn.access

[handlers]
keys=console, error_file, access_file, access_filegunicorn

[formatters]
keys=generic, access, accessgunicorn

[logger_root]
level=INFO
handlers=console
propagate=1

[logger_gunicorn.error]
level=INFO
handlers=error_file
propagate=0
qualname=gunicorn.error

[logger_gunicorn.access]
level=INFO
handlers=access_filegunicorn
propagate=0
qualname=gunicorn.access

[logger_uvicorn.error]
level=INFO
handlers=error_file
propagate=0
qualname=uvicorn.error

[logger_uvicorn.access]
level=INFO
handlers=access_file
propagate=0
qualname=uvicorn.access

[handler_console]
class=StreamHandler
formatter=generic
args=(sys.stdout, )

[handler_error_file]
class=StreamHandler
formatter=generic
args=(sys.stdout, )

[handler_access_file]
class=StreamHandler
formatter=access
args=(sys.stdout, )

[handler_access_filegunicorn]
class=StreamHandler
formatter=accessgunicorn
args=(sys.stdout, )

[formatter_generic]
format=[%(levelname)s]: %(message)s
datefmt=%Y-%m-%dT%H:%M:%S
class=logging.Formatter

[formatter_access]
format=[%(levelname)s]: %(message)s
datefmt=%Y-%m-%dT%H:%M:%S
class=logging.Formatter

[formatter_accessgunicorn]
format=[%(levelname)s]: '{"remote_ip":"%(h)s","session_id":"%({X-Session-Id}i)s","status":"%(s)s","request_method":"%(m)s","request_path":"%(U)s","request_querystring":"%(q)s","request_timetaken":"%(D)s","response_length":"%(B)s", "remote_addr": "%(h)s"}'
datefmt=%Y-%m-%dT%H:%M:%S
class=logging.Formatter

Message: '%s - "%s %s HTTP/%s" %d'
Arguments: ('213.3.14.24:53374', 'GET', '/v1/docs', '1.1', 200)
--- Logging error ---
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/logging/__init__.py", line 1025, in emit
    msg = self.format(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 869, in format
    return fmt.format(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 611, in format
    s = self.formatMessage(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 580, in formatMessage
    return self._style.format(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 422, in format
    return self._fmt % record.__dict__
KeyError: 'h'
Call stack:
  File "/usr/local/bin/gunicorn", line 8, in <module>
    sys.exit(run())
  File "/usr/local/lib/python3.7/site-packages/gunicorn/app/wsgiapp.py", line 58, in run
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
  File "/usr/local/lib/python3.7/site-packages/gunicorn/app/base.py", line 228, in run
    super().run()
  File "/usr/local/lib/python3.7/site-packages/gunicorn/app/base.py", line 72, in run
    Arbiter(self).run()
  File "/usr/local/lib/python3.7/site-packages/gunicorn/arbiter.py", line 202, in run
    self.manage_workers()
  File "/usr/local/lib/python3.7/site-packages/gunicorn/arbiter.py", line 545, in manage_workers
    self.spawn_workers()
  File "/usr/local/lib/python3.7/site-packages/gunicorn/arbiter.py", line 616, in spawn_workers
    self.spawn_worker()
  File "/usr/local/lib/python3.7/site-packages/gunicorn/arbiter.py", line 583, in spawn_worker
    worker.init_process()
  File "/usr/local/lib/python3.7/site-packages/uvicorn/workers.py", line 61, in init_process
    super(UvicornWorker, self).init_process()
  File "/usr/local/lib/python3.7/site-packages/gunicorn/workers/base.py", line 140, in init_process
    self.run()
  File "/usr/local/lib/python3.7/site-packages/uvicorn/workers.py", line 70, in run
    loop.run_until_complete(server.serve(sockets=self.sockets))
  File "/usr/local/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py", line 385, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/usr/local/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/fastapi/applications.py", line 171, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/starlette/applications.py", line 102, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.7/site-packages/starlette/middleware/cors.py", line 78, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.7/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.7/site-packages/starlette/routing.py", line 550, in __call__
    await route.handle(scope, receive, send)
Tildy answered 14/7, 2020 at 12:14 Comment(8)
Can you explain what extra data you are trying to log?Presumably
its this line: format=[%(levelname)s]: '{"remote_ip":"%(h)s","session_id":"%({X-Session-Id}i)s","status":"%(s)s","request_method":"%(m)s","request_path":"%(U)s","request_querystring":"%(q)s","request_timetaken":"%(D)s","response_length":"%(B)s", "remote_addr": "%(h)s"}' , i think its self-explanatory and follows the gunicorn doucmentationTildy
@ArakkalAbu It seems that the docs.gunicorn.org/en/stable/settings.html#access-log-format setting does not work when set via --access-logformat STRING, so I tried to define it in a logging config file. However there are a lot of wrappers involved - see here github.com/aio-libs/aiohttp/issues/705Tildy
seems like gunicorn issuePresumably
@ArakkalAbu yes i also believe it is some kind of a bug. I even tried adding "access_log_format = ..." to the log config but this didnt helpTildy
Have you tried common python logger formats? instead of gunicorn's? Something like '%(asctime)s - %(message)s' ?Presumably
Did this approach end up working out?Soiree
@Tildy the gunicorn.access logger is only used by the Gunicorn worker classes. Uvicorn workers don't use it at all, instead they use uvicorn.access. So any Gunicorn config for access logging will have no effect by default. You need to configure the uvicorn.access logger instead, e.g. in one of the Gunicorn server hooks that runs in the worker upon startup.Cartridge
T
1

Our solution was a customer logger written in python that is referenced in a logging.conf file

logging.conf

[loggers]
keys=root, gunicorn.error, gunicorn.access,uvicorn.error,uvicorn.access

[handlers]
keys=console, error_file, access_file, accesscustom

[formatters]
keys=generic, access, AccessFormatter

[logger_root]
level=INFO
handlers=console
propagate=1

[logger_gunicorn.error]
level=INFO
handlers=error_file
propagate=0
qualname=gunicorn.error

[logger_gunicorn.access]
level=INFO
handlers=accesscustom
propagate=0
qualname=gunicorn.access

[logger_uvicorn.error]
level=INFO
handlers=error_file
propagate=0
qualname=uvicorn.error

[logger_uvicorn.access]
level=INFO
handlers=accesscustom
propagate=0
qualname=uvicorn.access

[handler_console]
class=StreamHandler
formatter=generic
args=(sys.stdout, )

[handler_error_file]
class=StreamHandler
formatter=generic
args=(sys.stdout, )

[handler_access_file]
class=StreamHandler
formatter=access
args=(sys.stdout, )

[handler_accesscustom]
class=StreamHandler
formatter=AccessFormatter
args=(sys.stdout, )

[formatter_generic]
format=%(levelname)s: %(message)s
datefmt=%Y-%m-%dT%H:%M:%S
class=uvicorn.logging.DefaultFormatter

[formatter_access]
format=%(levelname)s: %(message)s
datefmt=%Y-%m-%dT%H:%M:%S
class=customlogger.CustomFormatter

[formatter_AccessFormatter]
format={"event":"access_log","ip":"%(h)s","status":"%(s)s","method":"%(m)s","path":"%(U)s","referer":"%(f)s","x_session_id":"%(x-session-id)s","x_google_id":"%(x-google-id)s","x_server_time":"%(x-server-time)s","agent":"%(a)s"}
datefmt=%Y-%m-%dT%H:%M:%S
class=customlogger.CustomFormatter

customlogger.py


import base64
import binascii
import http
import logging
import os
import sys
import time
from copy import copy
from datetime import datetime
from pprint import pprint
import click

TRACE_LOG_LEVEL = 5


class ColourizedFormatter(logging.Formatter):
    """
    A custom log formatter class that:
    * Outputs the LOG_LEVEL with an appropriate color.
    * If a log call includes an `extras={"color_message": ...}` it will be used
      for formatting the output, instead of the plain text message.
    """

    level_name_colors = {
        TRACE_LOG_LEVEL: lambda level_name: click.style(str(level_name), fg="blue"),
        logging.DEBUG: lambda level_name: click.style(str(level_name), fg="cyan"),
        logging.INFO: lambda level_name: click.style(str(level_name), fg="green"),
        logging.WARNING: lambda level_name: click.style(str(level_name), fg="yellow"),
        logging.ERROR: lambda level_name: click.style(str(level_name), fg="red"),
        logging.CRITICAL: lambda level_name: click.style(
            str(level_name), fg="bright_red"
        ),
    }

    def __init__(self, fmt=None, datefmt=None, style="%", use_colors=None):
        if use_colors in (True, False):
            self.use_colors = use_colors
        else:
            self.use_colors = sys.stdout.isatty()
        super().__init__(fmt=fmt, datefmt=datefmt, style=style)

    def color_level_name(self, level_name, level_no):
        default = lambda level_name: str(level_name)
        func = self.level_name_colors.get(level_no, default)
        return func(level_name)

    def should_use_colors(self):
        return True

    def formatMessage(self, record):
        recordcopy = copy(record)
        levelname = recordcopy.levelname
        seperator = " " * (8 - len(recordcopy.levelname))
        if self.use_colors:
            levelname = self.color_level_name(levelname, recordcopy.levelno)
            if "color_message" in recordcopy.__dict__:
                recordcopy.msg = recordcopy.__dict__["color_message"]
                recordcopy.__dict__["message"] = recordcopy.getMessage()
        recordcopy.__dict__["levelprefix"] = levelname + ":" + seperator
        return super().formatMessage(recordcopy)


class DefaultFormatter(ColourizedFormatter):
    def should_use_colors(self):
        return sys.stderr.isatty()


class AccessFormatter(ColourizedFormatter):
    status_code_colours = {
        1: lambda code: click.style(str(code), fg="bright_white"),
        2: lambda code: click.style(str(code), fg="green"),
        3: lambda code: click.style(str(code), fg="yellow"),
        4: lambda code: click.style(str(code), fg="red"),
        5: lambda code: click.style(str(code), fg="bright_red"),
    }

    def get_client_addr(self, scope):
        client = scope.get("client")
        if not client:
            return ""
        return "%s:%d" % (client[0], client[1])

    def get_path(self, scope):
        return scope.get("root_path", "") + scope["path"]

    def get_full_path(self, scope):
        path = scope.get("root_path", "") + scope["path"]
        query_string = scope.get("query_string", b"").decode("ascii")
        if query_string:
            return path + "?" + query_string
        return path

    def get_status_code(self, record):
        status_code = record.__dict__["status_code"]
        try:
            status_phrase = http.HTTPStatus(status_code).phrase
        except ValueError:
            status_phrase = ""
        status_and_phrase = "%s %s" % (status_code, status_phrase)

        if self.use_colors:
            default = lambda code: status_and_phrase
            func = self.status_code_colours.get(status_code // 100, default)
            return func(status_and_phrase)
        return status_and_phrase

    def formatMessage(self, record):
        recordcopy = copy(record)
        scope = recordcopy.__dict__["scope"]
        method = scope["method"]
        path = self.get_path(scope)
        full_path = self.get_full_path(scope)
        client_addr = self.get_client_addr(scope)
        status_code = self.get_status_code(recordcopy)
        http_version = scope["http_version"]
        request_line = "%s %s HTTP/%s" % (method, full_path, http_version)
        if self.use_colors:
            request_line = click.style(request_line, bold=True)
        recordcopy.__dict__.update(
            {
                "method": method,
                "path": path,
                "full_path": full_path,
                "client_addr": client_addr,
                "request_line": request_line,
                "status_code": status_code,
                "http_version": http_version,
            }
        )
        return super().formatMessage(recordcopy)


class SafeAtoms(dict):

    def __init__(self, atoms):
        dict.__init__(self)
        for key, value in atoms.items():
            if isinstance(value, str):
                self[key] = value.replace('"', '\\"')
            else:
                self[key] = value

    def __getitem__(self, k):
        if k.startswith("{"):
            kl = k.lower()
            if kl in self:
                return super().__getitem__(kl)
            else:
                return "-"
        if k in self:
            return super().__getitem__(k)
        else:
            return '-'


class CustomFormatter(AccessFormatter):
    atoms_wrapper_class = SafeAtoms

    def now(self):
        """ return date in Apache Common Log Format """
        return time.strftime('[%d/%b/%Y:%H:%M:%S %z]')

    def _get_user(self, environ):
        user = None
        http_auth = environ.get("HTTP_AUTHORIZATION")
        if http_auth and http_auth.lower().startswith('basic'):
            auth = http_auth.split(" ", 1)
            if len(auth) == 2:
                try:
                    # b64decode doesn't accept unicode in Python < 3.3
                    # so we need to convert it to a byte string
                    auth = base64.b64decode(auth[1].strip().encode('utf-8'))
                    # b64decode returns a byte string
                    auth = auth.decode('utf-8')
                    auth = auth.split(":", 1)
                except (TypeError, binascii.Error, UnicodeDecodeError) as exc:
                    self.debug("Couldn't get username: %s", exc)
                    return user
                if len(auth) == 2:
                    user = auth[0]
        return user

    def atoms(self, environ, request_time, scope, statuscode, created):
        headers = dict(scope.get('headers',[('-','-')]))
        response_headers = dict(scope.get('response_headers',[('-','-')]))
        atoms = {
            'h': scope.get("client", ('-', ''))[0],
            'l': '-',
            's': statuscode,
            'u': self._get_user(environ) or '-',
            't': created,
            'm': str(scope.get("method", "-")),
            'U': scope.get("path", "-"),
            'q': scope.get("query_string", "-").decode("utf-8"),
            'H': str(scope.get("type", "-")),
            'f': headers.get(b"referer", b"-").decode("utf-8"),
            'a': headers.get(b"user-agent", b"-").decode("utf-8"),
            'x-session-id': headers.get(b"x-session-id", b"-").decode("utf-8"),
            'x-google-id': headers.get(b"x-google-id", b"-").decode("utf-8"),  
            'x-server-time': response_headers.get(b"x-server-time", b"").decode("utf-8"),   
            'p': "<%s>" % os.getpid()
        }

        return atoms

    def formatMessage(self, record):
        recordcopy = copy(record)
        scope = recordcopy.__dict__["scope"]
        #pprint(vars(recordcopy))
        safe_atoms = self.atoms_wrapper_class(
            self.atoms(os.environ, datetime.now(), scope, recordcopy.status_code, recordcopy.created)
        )
        recordcopy.__dict__.update(safe_atoms)

        # pprint(vars(os.environ))
        return super().formatMessage(recordcopy)
Tildy answered 12/11, 2020 at 16:23 Comment(1)
Are you also using a middleware so you can give the scope?Queenie
S
10

I found very useful information here https://github.com/tiangolo/fastapi/issues/1508

I needed to add the request datetime , and the solution that I implemented was:

@app.on_event("startup")
async def startup_event():
    logger = logging.getLogger("uvicorn.access")
    console_formatter = uvicorn.logging.ColourizedFormatter(
        "{asctime} {levelprefix} : {message}",
        style="{", use_colors=True)
    logger.handlers[0].setFormatter(console_formatter)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Shorter answered 20/8, 2021 at 15:57 Comment(0)
O
6

I'm also using the FastAPI - Uvicorn - Gunicorn stack.

To modify the format of the uvicorn logging, I first inspected its current configuration :

>>> from pprint import pprint
>>> import uvicorn.config
>>> pprint(uvicorn.config.LOGGING_CONFIG)

{'disable_existing_loggers': False,
 'formatters': {'access': {'()': 'uvicorn.logging.AccessFormatter',
                           'fmt': '%(levelprefix)s %(client_addr)s - '
                                  '"%(request_line)s" %(status_code)s'},
                'default': {'()': 'uvicorn.logging.DefaultFormatter',
                            'fmt': '%(levelprefix)s %(message)s',
                            'use_colors': None}},
 'handlers': {'access': {'class': 'logging.StreamHandler',
                         'formatter': 'access',
                         'stream': 'ext://sys.stdout'},
              'default': {'class': 'logging.StreamHandler',
                          'formatter': 'default',
                          'stream': 'ext://sys.stderr'}},
 'loggers': {'uvicorn': {'handlers': ['default'], 'level': 'INFO'},
             'uvicorn.access': {'handlers': ['access'],
                                'level': 'INFO',
                                'propagate': False},
             'uvicorn.error': {'level': 'INFO'}},
 'version': 1}

And I created my own logging configuration based on the default configuration of uvicorn. I added the date/time of the log, and my own custom logger :

import logging

LOGGER_NAME = "myapp"

log_config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        'access': {
            '()': 'uvicorn.logging.AccessFormatter',
            'fmt': '%(levelprefix)s %(asctime)s - %(client_addr)s - "%(request_line)s" %(status_code)s',
            "datefmt": "%Y-%m-%d %H:%M:%S",
            "use_colors": True
        },
        "default": {
            "()": "uvicorn.logging.DefaultFormatter",
            "fmt": "%(levelprefix)s %(asctime)s - %(message)s",
            "datefmt": "%Y-%m-%d %H:%M:%S",
            "use_colors": True
        },
    },
    "handlers": {
        'access': {
            'class': 'logging.StreamHandler',
            'formatter': 'access',
            'stream': 'ext://sys.stdout'
        },
        "default": {
            "formatter": "default",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stderr",
        },
    },
    "loggers": {
        LOGGER_NAME: {
            "handlers": ["default"],
            "level": "DEBUG",
            "propagate": False
        },
        "uvicorn": {
            "handlers": ["default"],
            "level": "DEBUG",
            "propagate": True
        },
        'uvicorn.access': {
            'handlers': ['access'],
            'level': 'INFO',
            'propagate': False
        },
        'uvicorn.error': {
            'level': 'INFO',
            'propagate': False
        }
    },
}


def get_logger():
    return logging.getLogger(LOGGER_NAME)

Then in my main.py file, where I defined app = FastAPI(...), I configure the logging just after my imports lines with :

logging.config.dictConfig(log_config)

And I do custom logging in my app by using the custom logger that I defined :

logger = get_logger()
logger.info("Hello World!")
Oliana answered 19/7, 2022 at 13:1 Comment(1)
This is the way! Note to the reader: you can further extend this by creating a custom Formatter class (extending logger.Formatter), implement format method how you like it and then reference your custom formatted class in this JSON.Vacuity
T
1

Our solution was a customer logger written in python that is referenced in a logging.conf file

logging.conf

[loggers]
keys=root, gunicorn.error, gunicorn.access,uvicorn.error,uvicorn.access

[handlers]
keys=console, error_file, access_file, accesscustom

[formatters]
keys=generic, access, AccessFormatter

[logger_root]
level=INFO
handlers=console
propagate=1

[logger_gunicorn.error]
level=INFO
handlers=error_file
propagate=0
qualname=gunicorn.error

[logger_gunicorn.access]
level=INFO
handlers=accesscustom
propagate=0
qualname=gunicorn.access

[logger_uvicorn.error]
level=INFO
handlers=error_file
propagate=0
qualname=uvicorn.error

[logger_uvicorn.access]
level=INFO
handlers=accesscustom
propagate=0
qualname=uvicorn.access

[handler_console]
class=StreamHandler
formatter=generic
args=(sys.stdout, )

[handler_error_file]
class=StreamHandler
formatter=generic
args=(sys.stdout, )

[handler_access_file]
class=StreamHandler
formatter=access
args=(sys.stdout, )

[handler_accesscustom]
class=StreamHandler
formatter=AccessFormatter
args=(sys.stdout, )

[formatter_generic]
format=%(levelname)s: %(message)s
datefmt=%Y-%m-%dT%H:%M:%S
class=uvicorn.logging.DefaultFormatter

[formatter_access]
format=%(levelname)s: %(message)s
datefmt=%Y-%m-%dT%H:%M:%S
class=customlogger.CustomFormatter

[formatter_AccessFormatter]
format={"event":"access_log","ip":"%(h)s","status":"%(s)s","method":"%(m)s","path":"%(U)s","referer":"%(f)s","x_session_id":"%(x-session-id)s","x_google_id":"%(x-google-id)s","x_server_time":"%(x-server-time)s","agent":"%(a)s"}
datefmt=%Y-%m-%dT%H:%M:%S
class=customlogger.CustomFormatter

customlogger.py


import base64
import binascii
import http
import logging
import os
import sys
import time
from copy import copy
from datetime import datetime
from pprint import pprint
import click

TRACE_LOG_LEVEL = 5


class ColourizedFormatter(logging.Formatter):
    """
    A custom log formatter class that:
    * Outputs the LOG_LEVEL with an appropriate color.
    * If a log call includes an `extras={"color_message": ...}` it will be used
      for formatting the output, instead of the plain text message.
    """

    level_name_colors = {
        TRACE_LOG_LEVEL: lambda level_name: click.style(str(level_name), fg="blue"),
        logging.DEBUG: lambda level_name: click.style(str(level_name), fg="cyan"),
        logging.INFO: lambda level_name: click.style(str(level_name), fg="green"),
        logging.WARNING: lambda level_name: click.style(str(level_name), fg="yellow"),
        logging.ERROR: lambda level_name: click.style(str(level_name), fg="red"),
        logging.CRITICAL: lambda level_name: click.style(
            str(level_name), fg="bright_red"
        ),
    }

    def __init__(self, fmt=None, datefmt=None, style="%", use_colors=None):
        if use_colors in (True, False):
            self.use_colors = use_colors
        else:
            self.use_colors = sys.stdout.isatty()
        super().__init__(fmt=fmt, datefmt=datefmt, style=style)

    def color_level_name(self, level_name, level_no):
        default = lambda level_name: str(level_name)
        func = self.level_name_colors.get(level_no, default)
        return func(level_name)

    def should_use_colors(self):
        return True

    def formatMessage(self, record):
        recordcopy = copy(record)
        levelname = recordcopy.levelname
        seperator = " " * (8 - len(recordcopy.levelname))
        if self.use_colors:
            levelname = self.color_level_name(levelname, recordcopy.levelno)
            if "color_message" in recordcopy.__dict__:
                recordcopy.msg = recordcopy.__dict__["color_message"]
                recordcopy.__dict__["message"] = recordcopy.getMessage()
        recordcopy.__dict__["levelprefix"] = levelname + ":" + seperator
        return super().formatMessage(recordcopy)


class DefaultFormatter(ColourizedFormatter):
    def should_use_colors(self):
        return sys.stderr.isatty()


class AccessFormatter(ColourizedFormatter):
    status_code_colours = {
        1: lambda code: click.style(str(code), fg="bright_white"),
        2: lambda code: click.style(str(code), fg="green"),
        3: lambda code: click.style(str(code), fg="yellow"),
        4: lambda code: click.style(str(code), fg="red"),
        5: lambda code: click.style(str(code), fg="bright_red"),
    }

    def get_client_addr(self, scope):
        client = scope.get("client")
        if not client:
            return ""
        return "%s:%d" % (client[0], client[1])

    def get_path(self, scope):
        return scope.get("root_path", "") + scope["path"]

    def get_full_path(self, scope):
        path = scope.get("root_path", "") + scope["path"]
        query_string = scope.get("query_string", b"").decode("ascii")
        if query_string:
            return path + "?" + query_string
        return path

    def get_status_code(self, record):
        status_code = record.__dict__["status_code"]
        try:
            status_phrase = http.HTTPStatus(status_code).phrase
        except ValueError:
            status_phrase = ""
        status_and_phrase = "%s %s" % (status_code, status_phrase)

        if self.use_colors:
            default = lambda code: status_and_phrase
            func = self.status_code_colours.get(status_code // 100, default)
            return func(status_and_phrase)
        return status_and_phrase

    def formatMessage(self, record):
        recordcopy = copy(record)
        scope = recordcopy.__dict__["scope"]
        method = scope["method"]
        path = self.get_path(scope)
        full_path = self.get_full_path(scope)
        client_addr = self.get_client_addr(scope)
        status_code = self.get_status_code(recordcopy)
        http_version = scope["http_version"]
        request_line = "%s %s HTTP/%s" % (method, full_path, http_version)
        if self.use_colors:
            request_line = click.style(request_line, bold=True)
        recordcopy.__dict__.update(
            {
                "method": method,
                "path": path,
                "full_path": full_path,
                "client_addr": client_addr,
                "request_line": request_line,
                "status_code": status_code,
                "http_version": http_version,
            }
        )
        return super().formatMessage(recordcopy)


class SafeAtoms(dict):

    def __init__(self, atoms):
        dict.__init__(self)
        for key, value in atoms.items():
            if isinstance(value, str):
                self[key] = value.replace('"', '\\"')
            else:
                self[key] = value

    def __getitem__(self, k):
        if k.startswith("{"):
            kl = k.lower()
            if kl in self:
                return super().__getitem__(kl)
            else:
                return "-"
        if k in self:
            return super().__getitem__(k)
        else:
            return '-'


class CustomFormatter(AccessFormatter):
    atoms_wrapper_class = SafeAtoms

    def now(self):
        """ return date in Apache Common Log Format """
        return time.strftime('[%d/%b/%Y:%H:%M:%S %z]')

    def _get_user(self, environ):
        user = None
        http_auth = environ.get("HTTP_AUTHORIZATION")
        if http_auth and http_auth.lower().startswith('basic'):
            auth = http_auth.split(" ", 1)
            if len(auth) == 2:
                try:
                    # b64decode doesn't accept unicode in Python < 3.3
                    # so we need to convert it to a byte string
                    auth = base64.b64decode(auth[1].strip().encode('utf-8'))
                    # b64decode returns a byte string
                    auth = auth.decode('utf-8')
                    auth = auth.split(":", 1)
                except (TypeError, binascii.Error, UnicodeDecodeError) as exc:
                    self.debug("Couldn't get username: %s", exc)
                    return user
                if len(auth) == 2:
                    user = auth[0]
        return user

    def atoms(self, environ, request_time, scope, statuscode, created):
        headers = dict(scope.get('headers',[('-','-')]))
        response_headers = dict(scope.get('response_headers',[('-','-')]))
        atoms = {
            'h': scope.get("client", ('-', ''))[0],
            'l': '-',
            's': statuscode,
            'u': self._get_user(environ) or '-',
            't': created,
            'm': str(scope.get("method", "-")),
            'U': scope.get("path", "-"),
            'q': scope.get("query_string", "-").decode("utf-8"),
            'H': str(scope.get("type", "-")),
            'f': headers.get(b"referer", b"-").decode("utf-8"),
            'a': headers.get(b"user-agent", b"-").decode("utf-8"),
            'x-session-id': headers.get(b"x-session-id", b"-").decode("utf-8"),
            'x-google-id': headers.get(b"x-google-id", b"-").decode("utf-8"),  
            'x-server-time': response_headers.get(b"x-server-time", b"").decode("utf-8"),   
            'p': "<%s>" % os.getpid()
        }

        return atoms

    def formatMessage(self, record):
        recordcopy = copy(record)
        scope = recordcopy.__dict__["scope"]
        #pprint(vars(recordcopy))
        safe_atoms = self.atoms_wrapper_class(
            self.atoms(os.environ, datetime.now(), scope, recordcopy.status_code, recordcopy.created)
        )
        recordcopy.__dict__.update(safe_atoms)

        # pprint(vars(os.environ))
        return super().formatMessage(recordcopy)
Tildy answered 12/11, 2020 at 16:23 Comment(1)
Are you also using a middleware so you can give the scope?Queenie

© 2022 - 2024 — McMap. All rights reserved.