로깅 요리책

저자

Vinay Sajip <vinay_sajip at red-dove dot com>

이 페이지는 과거에 유용했던 로깅 관련 조리법을 많이 포함하고 있습니다.

여러 모듈에서 로깅 사용하기

logging.getLogger('someLogger') 를 여러 번 호출하면 같은 로거 객체에 대한 참조가 반환됩니다. 같은 모듈 내에서뿐만 아니라, 같은 파이썬 인터프리터 프로세스에 있는 한, 여러 모듈에서도 마찬가지입니다. 참조가 같은 객체를 가리킨다는 것에 더해, 응용 프로그램 코드는 하나의 모듈에서 부모 로거를 정의 및 구성하고 별도의 모듈에서 자식 로거를 생성 (구성하지 않음) 할 수 있으며, 자식에 대한 모든 로거 호출은 부모로 전달됩니다. 다음은 메인 모듈입니다:

import logging
import auxiliary_module

# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)

logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')

다음은 보조 모듈입니다:

import logging

# create logger
module_logger = logging.getLogger('spam_application.auxiliary')

class Auxiliary:
    def __init__(self):
        self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
        self.logger.info('creating an instance of Auxiliary')

    def do_something(self):
        self.logger.info('doing something')
        a = 1 + 1
        self.logger.info('done doing something')

def some_function():
    module_logger.info('received a call to "some_function"')

출력은 이렇게 됩니다:

2005-03-23 23:47:11,663 - spam_application - INFO -
   creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
   creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
   created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
   calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
   doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
   done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
   finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
   calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
   received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
   done with auxiliary_module.some_function()

여러 스레드에서 로깅 하기

여러 스레드에서 로깅 하는데 특별한 노력이 필요하지는 않습니다. 다음 예제에서는 메인 (최초) 스레드와 다른 스레드에서의 로깅을 보여줍니다:

import logging
import threading
import time

def worker(arg):
    while not arg['stop']:
        logging.debug('Hi from myfunc')
        time.sleep(0.5)

def main():
    logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
    info = {'stop': False}
    thread = threading.Thread(target=worker, args=(info,))
    thread.start()
    while True:
        try:
            logging.debug('Hello from main')
            time.sleep(0.75)
        except KeyboardInterrupt:
            info['stop'] = True
            break
    thread.join()

if __name__ == '__main__':
    main()

실행하면 스크립트는 다음과 같이 인쇄합니다:

   0 Thread-1 Hi from myfunc
   3 MainThread Hello from main
 505 Thread-1 Hi from myfunc
 755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc

예상대로 로그 출력이 산재해 있음을 볼 수 있습니다. 물론, 이 방법은 여기에 표시된 것보다 많은 스레드에서도 작동합니다.

다중 처리기 및 포매터

로거는 일반 파이썬 객체입니다. addHandler() 메서드에는 추가할 수 있는 처리기의 수에 대한 최소 또는 최대 할당량이 없습니다. 때로는 응용 프로그램이 모든 심각도의 모든 메시지를 텍스트 파일에 기록하는 동시에, 에러 또는 그 이상을 콘솔에 기록하는 것이 유용 할 수 있습니다. 이렇게 설정하려면, 적절한 처리기를 구성하기만 하면 됩니다. 응용 프로그램 코드의 로깅 호출은 변경되지 않습니다. 다음은 앞의 간단한 모듈 기반 구성 예제를 약간 수정 한 것입니다:

import logging

logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)

# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')

‘응용 프로그램’ 코드는 여러 처리기에 신경 쓰지 않습니다. 변경된 것은 fh 라는 새로운 처리기의 추가 및 구성뿐입니다.

중요도가 높거나 낮은 필터를 사용하여 새 처리기를 만드는 기능은 응용 프로그램을 작성하고 테스트할 때 매우 유용합니다. 디버깅을 위해 많은 print 문을 사용하는 대신에 logger.debug 를 사용하십시오: 나중에 삭제하거나 주석 처리해야 할 print 문과 달리, logger.debug 문은 소스 코드에서 그대로 유지될 수 있고, 그들을 다시 필요로 할 때까지 휴면 상태로 남아 있습니다. 그때, 필요한 유일한 변경은 로거 또는 처리기의 심각도 수준을 DEBUG로 수정하는 것입니다.

여러 대상으로 로깅 하기

다른 메시지 포맷으로 다른 상황에서 콘솔과 파일에 기록하려고 한다고 가정 해 봅시다. DEBUG 이상 수준의 메시지를 파일에 기록하고, 수준 INFO 이상인 메시지를 콘솔에 기록하려고 한다고 가정 해보십시오. 또한, 타임스탬프가 파일에는 포함되어야 하지만, 콘솔 메시지에는 없어야 한다고 가정합시다. 이렇게 하면 됩니다:

import logging

# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M',
                    filename='/temp/myapp.log',
                    filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

실행하면 콘솔에는 다음과 같이 출력됩니다.

root        : INFO     Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO     How quickly daft jumping zebras vex.
myapp.area2 : WARNING  Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR    The five boxing wizards jump quickly.

파일에는 이렇게 기록됩니다.

10-22 22:19 root         INFO     Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1  DEBUG    Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1  INFO     How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2  WARNING  Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2  ERROR    The five boxing wizards jump quickly.

보시다시피 DEBUG 메시지는 파일에만 표시됩니다. 다른 메시지는 두 목적지로 전송됩니다.

이 예제는 콘솔과 파일 처리기를 사용하지만, 여러분이 선택하는 처리기의 수나 조합에 제약이 없습니다.

구성 서버 예제

다음은 로깅 구성 서버를 사용하는 모듈의 예입니다:

import logging
import logging.config
import time
import os

# read initial config file
logging.config.fileConfig('logging.conf')

# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()

logger = logging.getLogger('simpleExample')

try:
    # loop through logging calls to see the difference
    # new configurations make, until Ctrl+C is pressed
    while True:
        logger.debug('debug message')
        logger.info('info message')
        logger.warning('warn message')
        logger.error('error message')
        logger.critical('critical message')
        time.sleep(5)
except KeyboardInterrupt:
    # cleanup
    logging.config.stopListening()
    t.join()

다음은 파일 이름을 받아서, 그 파일을 새 로깅 구성으로 (이진 인코딩된 길이를 적절하게 앞에 붙여서) 서버로 보내는 스크립트입니다:

#!/usr/bin/env python
import socket, sys, struct

with open(sys.argv[1], 'rb') as f:
    data_to_send = f.read()

HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')

블록 하는 처리기 다루기

때로는 로깅 처리기가 로깅 중인 스레드를 블록 하지 않고 작업을 수행해야 하는 경우가 있습니다. 이것은 웹 응용 프로그램에서 흔히 나타나는 요구사항이지만, 물론 다른 시나리오에서도 발생합니다.

흔히 느린 행동을 보이는 범인은 SMTPHandler 입니다: 개발자의 통제 밖에 있는 여러 가지 이유로, 이메일을 보내는 데 오랜 시간이 걸릴 수 있습니다 (예를 들어, 잘 동작하지 않는 메일 또는 네트워크 인프라). 그러나 거의 모든 네트워크 기반 처리기는 블록 할 수 있습니다. SocketHandler 작업도 너무 느린 DNS 질의를 이면에서 수행 할 수 있습니다 (그리고 이 질의는 여러분의 통제 밖에 있는, 파이썬 계층 아래의 소켓 라이브러리 코드 깊숙이 있을 수 있습니다).

한 가지 해결책은 두 부분으로 된 접근법을 사용하는 것입니다. 첫 번째 부분에서는, 성능이 중요한 스레드에서 액세스하는 로거에 QueueHandler 만 붙입니다. 그들은 단순히 큐에 씁니다. 충분한 용량으로 큐의 크기를 조정하거나, 크기의 상한이 없도록 초기화 할 수 있습니다. 큐에 대한 쓰기는 일반적으로 신속하게 받아들여지지만, 코드에서 예방책으로 queue.Full 예외를 잡아야 할 것입니다. 코드에 성능이 중요한 스레드가 있는 라이브러리 개발자인 경우, 코드를 사용할 다른 개발자의 이익을 위해 이것을 (여러분의 로거에 QueueHandlers 만 붙이라는 제안과 함께) 문서로 만들어야 합니다.

해결책의 두 번째 부분은 QueueListener며, 이는 QueueHandler 에 상응하여 설계되었습니다. QueueListener 는 매우 간단합니다: 큐와 처리기를 넘겨주면 QueueHandlers (또는 LogRecords 의 다른 소스)에서 보낸 LogRecord를 큐에서 수신하는 내부 스레드를 시작합니다. LogRecords 는 큐에서 제거되고 처리를 위해 처리기로 전달됩니다.

별도의 QueueListener 클래스를 사용하면 같은 인스턴스를 사용하여 여러 개의 QueueHandlers 를 처리할 수 있다는 장점이 있습니다. 이것은 특별한 이점 없이 처리기당 하나의 스레드를 먹게 되는 기존의 처리기 클래스의 스레드 버전을 만드는 것보다 자원 친화적입니다.

이 두 클래스를 사용하는 예제는 다음과 같습니다 (임포트 생략):

que = queue.Queue(-1)  # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()

실행하면, 다음과 같은 결과를 만듭니다:

MainThread: Look out!

버전 3.5에서 변경: 파이썬 3.5 이전 버전에서는, QueueListener 는 항상 큐에서 받은 모든 메시지를 초기화될 때 제공된 모든 처리기로 전달했습니다. (이것은 큐가 채워질 때 수준 필터링이 모두 반대편에서 행해졌다고 가정했기 때문입니다.) 3.5 이후부터, 이 동작은 키워드 인자 respect_handler_level=True 를 리스너의 생성자에 전달함으로써 변경될 수 있습니다. 이렇게 할 때, 리스너는 각 메시지의 수준을 처리기의 수준과 비교하여, 적절한 메시지만 처리기에 전달되도록 합니다.

네트워크에서 로깅 이벤트 보내고 받기

네트워크를 통해 로깅 이벤트를 보내고, 받는 쪽에서 처리하려고 한다고 합시다. 이렇게 하는 간단한 방법은 SocketHandler 인스턴스를 보내는 쪽의 루트 로거에 연결하는 것입니다:

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

수신 측에서는 socketserver 모듈을 사용하여 수신기를 구성할 수 있습니다. 기본적인 작업 예제는 다음과 같습니다:

import pickle
import logging
import logging.handlers
import socketserver
import struct


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Simple TCP socket-based logging receiver suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                self.handle_request()
            abort = self.abort

def main():
    logging.basicConfig(
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    tcpserver = LogRecordSocketReceiver()
    print('About to start TCP server...')
    tcpserver.serve_until_stopped()

if __name__ == '__main__':
    main()

먼저 서버를 실행한 다음 클라이언트를 실행합니다. 클라이언트 쪽에서는 콘솔에 아무것도 인쇄되지 않습니다. 서버 측에서 다음과 같은 내용이 보여야 합니다.:

About to start TCP server...
   59 root            INFO     Jackdaws love my big sphinx of quartz.
   59 myapp.area1     DEBUG    Quick zephyrs blow, vexing daft Jim.
   69 myapp.area1     INFO     How quickly daft jumping zebras vex.
   69 myapp.area2     WARNING  Jail zesty vixen who grabbed pay from quack.
   69 myapp.area2     ERROR    The five boxing wizards jump quickly.

일부 시나리오에서는 피클이 몇 가지 보안 문제를 수반함에 유의하십시오. 이 문제가 중요하면, makePickle() 메서드를 재정의하고 거기서 여러분의 대안을 구현해서 다른 직렬화를 사용하는 한편, 위의 스크립트가 그 직렬화를 사용하도록 수정하십시오.

로그 출력에 문맥 정보 추가

로깅 호출에 전달된 매개 변수 외에도 로깅 출력에 문맥 정보가 포함되기 원하는 경우가 있습니다. 예를 들어, 네트워크 응용 프로그램에서, (원격 클라이언트의 사용자 이름 또는 IP 주소와 같은) 클라이언트별 정보를 로그에 기록하는 것이 바람직 할 수 있습니다. 이를 달성하기 위해 extra 매개 변수를 사용할 수는 있지만, 이러한 방식으로 정보를 전달하는 것이 항상 편리하지는 않습니다. 연결마다 Logger 인스턴스를 만들고 싶을지 모르지만, 이러한 인스턴스는 가비지 수집되지 않기 때문에 좋지 않습니다. Logger 인스턴스의 수가 응용 프로그램 로깅에 사용하고자 하는 세분성 수준에 의존적일 때 이것이 실제로 문제가 되지는 않지만, Logger 인스턴스의 수가 실질적으로 무제한이 되면 관리하기 어려울 수 있습니다.

문맥 정보 전달에 LoggerAdapters 사용하기

로깅 이벤트 정보와 함께 출력되는 문맥 정보를 전달하는 쉬운 방법은 LoggerAdapter 클래스를 사용하는 것입니다. 이 클래스는 Logger처럼 보이도록 설계되어 있어서, debug(), info(), warning(), error(), exception(), critical()log()를 호출할 수 있습니다. 이 메서드들은 Logger 에 있는 것과 똑같은 서명을 가지므로, 두 형의 인스턴스를 같은 의미로 사용할 수 있습니다.

LoggerAdapter 의 인스턴스를 생성할 때, Logger 인스턴스와 문맥 정보가 포함된 딕셔너리류 객체를 전달합니다. LoggerAdapter 의 인스턴스에서 로깅 메서드 중 하나를 호출하면, 생성자에 전달된 하위 Logger 인스턴스에 호출을 위임하고, 이 호출에 문맥 정보를 전달하도록 배치합니다. 다음은 LoggerAdapter 코드에서 발췌한 내용입니다:

def debug(self, msg, *args, **kwargs):
    """
    Delegate a debug call to the underlying logger, after adding
    contextual information from this adapter instance.
    """
    msg, kwargs = self.process(msg, kwargs)
    self.logger.debug(msg, *args, **kwargs)

LoggerAdapterprocess() 메서드는 문맥 정보가 로그 출력에 추가되는 곳입니다. 로깅 호출의 메시지 및 키워드 인자를 받아서, 하부 로거에 대한 호출에서 사용될 (대체로) 수정된 버전을 돌려줍니다. 이 메서드의 기본 구현은 메시지는 그대로 두고, 키워드 인자에 생성자로 전달된 딕셔너리류 객체를 값으로 갖는 ‘extra’ 키를 삽입합니다. 물론, 어댑터에 대한 호출에서 ‘extra’ 키워드 인자를 전달한 경우 자동으로 덮어씁니다.

‘extra’를 사용하는 장점은, 딕셔너리류 객체에 들어있는 값이 LogRecord 인스턴스의 __dict__에 병합되어, 키에 대해 알고 있는 Formatter 인스턴스로 사용자 정의된 문자열을 사용할 수 있게 된다는 것입니다. 다른 방법이 필요한 경우, 가령 메시지 문자열의 앞이나 뒤에 문맥 정보를 덧붙이려는 경우, LoggerAdapter 의 서브 클래스를 만들고, 필요한 작업을 수행하기 위해 process() 를 재정의해야 합니다. 다음은 간단한 예제입니다:

class CustomAdapter(logging.LoggerAdapter):
    """
    This example adapter expects the passed in dict-like object to have a
    'connid' key, whose value in brackets is prepended to the log message.
    """
    def process(self, msg, kwargs):
        return '[%s] %s' % (self.extra['connid'], msg), kwargs

이런 식으로 사용할 수 있습니다:

logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})

그러면 어댑터에 로그 하는 모든 이벤트는 로그 메시지 앞에 some_conn_id 값이 붙습니다.

딕셔너리 이외의 객체를 사용하여 문맥 정보 전달하기

실제 딕셔너리를 LoggerAdapter 에 전달할 필요는 없습니다 - 로깅에 딕셔너리처럼 보일 수 있도록, __getitem____iter__ 를 구현하는 클래스의 인스턴스를 전달할 수 있습니다. 값을 동적으로 생성하려는 경우 (반면에 딕셔너리에 들어있는 값은 바뀌지 않습니다) 유용합니다.

문맥 정보 전달에 필터 사용하기

사용자 정의 Filter를 사용하여 로그 출력에 문맥 정보를 추가할 수도 있습니다. Filter 인스턴스는 전달된 LogRecords 를 수정할 수 있는데, 어트리뷰트를 추가해서 적절한 포맷 문자열이나 필요하다면 사용자 정의 Formatter를 사용해서 출력되도록 할 수 있습니다.

예를 들어 웹 응용 프로그램에서, 처리 중인 요청(또는 적어도 그것의 흥미로운 부분)을 스레드 로컬 (threading.local) 변수에 저장한 다음, Filter 에서 액세스해서, 요청에서 온 정보를 - 원격 IP 주소와 원격 사용자의 사용자 이름이라고 합시다 - 위의 LoggerAdapter 예제에서와같이 어트리뷰트 이름 ‘ip’와 ‘user’를 사용하여 LogRecord 에 추가할 수 있습니다. 이 경우 같은 포맷 문자열을 사용하여 위에 표시된 것과 비슷한 출력을 얻을 수 있습니다. 다음은 스크립트 예입니다:

import logging
from random import choice

class ContextFilter(logging.Filter):
    """
    This is a filter which injects contextual information into the log.

    Rather than use actual contextual information, we just use random
    data in this demo.
    """

    USERS = ['jim', 'fred', 'sheila']
    IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']

    def filter(self, record):

        record.ip = choice(ContextFilter.IPS)
        record.user = choice(ContextFilter.USERS)
        return True

if __name__ == '__main__':
    levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
    a1 = logging.getLogger('a.b.c')
    a2 = logging.getLogger('d.e.f')

    f = ContextFilter()
    a1.addFilter(f)
    a2.addFilter(f)
    a1.debug('A debug message')
    a1.info('An info message with %s', 'some parameters')
    for x in range(10):
        lvl = choice(levels)
        lvlname = logging.getLevelName(lvl)
        a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')

실행하면 다음과 같은 결과가 나옵니다:

2010-09-06 22:38:15,292 a.b.c DEBUG    IP: 123.231.231.123 User: fred     A debug message
2010-09-06 22:38:15,300 a.b.c INFO     IP: 192.168.0.1     User: sheila   An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 127.0.0.1       User: jim      A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 127.0.0.1       User: sheila   A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 123.231.231.123 User: fred     A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1     User: jim      A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 192.168.0.1     User: jim      A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR    IP: 127.0.0.1       User: sheila   A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG    IP: 123.231.231.123 User: fred     A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO     IP: 123.231.231.123 User: fred     A message at INFO level with 2 parameters

여러 프로세스에서 단일 파일에 로깅 하기

logging 이 스레드-안전하고, 단일 프로세스의 여러 스레드에서 단일 파일로 로깅 하는 것이 지원되지만, 파이썬에서 여러 프로세스가 단일 파일에 액세스하는 것을 직렬화하는 표준적인 방법이 없으므로, 여러 프로세스에서 단일 파일로 로깅 하는 것은 지원되지 않습니다. 여러 프로세스에서 하나의 파일에 로그 해야 하는 경우, 이 작업을 수행하는 한 가지 방법은 모든 프로세스가 로그를 SocketHandler 에 기록하고, 소켓에서 읽어서 파일로 로그 하는 소켓 서버를 구현하는 별도의 프로세스를 사용하는 것입니다. (원한다면, 기존 프로세스 중 하나에서 한 스레드가 이 기능을 전담하도록 할 수 있습니다.) 이 섹션에서 이 접근법을 더 자세하게 설명하고, 여러분의 응용 프로그램에 적용하기 위한 출발점으로 사용할 수 있는 작동하는 소켓 수신기를 제공합니다.

multiprocessing 모듈을 포함하는 최신 버전의 파이썬을 사용하고 있다면, 이 모듈의 Lock 클래스를 사용하는 독자적인 처리기를 작성하여 여러 프로세스에서 파일에 액세스하는 것을 직렬화 할 수 있습니다. 기존 FileHandler 와 서브 클래스들은, 앞으로는 가능할 수 있지만, 현재 multiprocessing을 사용하지 않습니다. 현재 multiprocessing 모듈이 모든 플랫폼에서 작동하는 록 기능을 제공하지는 않는다는 것에 유의하십시오 (https://bugs.python.org/issue3770 를 참조하세요).

또는, QueueQueueHandler 를 사용하여, 모든 로깅 이벤트를 다중 프로세스 응용 프로그램의 프로세스 중 하나에 보낼 수 있습니다. 다음 예제 스크립트는 이렇게 하는 방법을 보여줍니다; 예제에서 별도의 리스너 프로세스가 다른 프로세스가 보낸 이벤트를 수신하고 자체 로깅 구성에 따라 이벤트를 기록합니다. 이 예제가 한 가지 방법만을 보여 주지만 (예를 들어, 별도의 리스너 프로세스 대신 리스너 스레드를 사용할 수도 있습니다 – 구현은 비슷할 것입니다), 리스너와 응용 프로그램의 다른 프로세스가 완전히 다른 로깅 구성을 사용하도록 허용하고, 여러분 자신의 특별한 요구 사항을 충족하는 코드의 기초로 사용할 수 있습니다:

# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing

# Next two import lines for this demo only
from random import choice, random
import time

#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()

위의 스크립트 변형은 로깅을 메인 프로세스의 별도의 스레드에서 유지합니다:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    d = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'ERROR',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

이 변형은 특정 로거에 대한 구성을 적용하는 방법을 보여줍니다 - 예를 들어, foo 로거는 foo 서브 시스템의 모든 이벤트를 mplog-foo.log 파일에 저장하는 특별한 처리기를 갖고 있습니다. 이것은 메인 프로세스의 로깅 시스템이 ( 로깅 이벤트가 작업자 프로세스에서 만들어졌다 하더라도) 메시지를 적절한 대상으로 전달하는 데 사용됩니다.

Using concurrent.futures.ProcessPoolExecutor

If you want to use concurrent.futures.ProcessPoolExecutor to start your worker processes, you need to create the queue slightly differently. Instead of

queue = multiprocessing.Queue(-1)

you should use

queue = multiprocessing.Manager().Queue(-1)  # also works with the examples above

and you can then replace the worker creation from this:

workers = []
for i in range(10):
    worker = multiprocessing.Process(target=worker_process,
                                     args=(queue, worker_configurer))
    workers.append(worker)
    worker.start()
for w in workers:
    w.join()

to this (remembering to first import concurrent.futures):

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for i in range(10):
        executor.submit(worker_process, queue, worker_configurer)

파일 회전 사용하기

때로는 로그 파일이 특정 크기까지 커지도록 한 다음, 새 파일을 열고 그곳에 로그에 기록하려고 할 수 있습니다. 이 파일들을 특정 수만 유지하고, 그 수 만큼의 파일이 만들어지면, 파일을 회전시켜 파일의 개수와 크기 모두 제한되도록 하고 싶을 수 있습니다. 이 사용 패턴을 위해, logging 패키지는 RotatingFileHandler 를 제공합니다:

import glob
import logging
import logging.handlers

LOG_FILENAME = 'logging_rotatingfile_example.out'

# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)

# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
              LOG_FILENAME, maxBytes=20, backupCount=5)

my_logger.addHandler(handler)

# Log some messages
for i in range(20):
    my_logger.debug('i = %d' % i)

# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)

for filename in logfiles:
    print(filename)

결과는 6개의 파일이어야 하고, 각기 응용 프로그램에 대한 로그 기록의 일부입니다:

logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5

가장 최근의 파일은 항상 logging_rotatingfile_example.out 이며, 크기 제한에 도달할 때마다 접미사 .1 이 붙은 이름으로 변경됩니다. 기존 백업 파일 각각의 이름이 변경되어 접미사가 증가하고 ( .1.2 가 되는 등) .6 파일이 지워집니다.

분명, 이 예제는 로그 길이를 극단적으로 작게 설정합니다. maxBytes 를 적절한 값으로 설정하고 싶을 겁니다.

대체 포매팅 스타일 사용하기

logging이 파이썬 표준 라이브러리에 추가되었을 때, 가변 내용으로 메시지를 포맷하는 유일한 방법은 %-포매팅 방법을 사용하는 것이었습니다. 그 이후로, 파이썬은 두 개의 새로운 포매팅 접근법을 얻었습니다: string.Template(파이썬 2.4에 추가됨)과 str.format()(파이썬 2.6에 추가됨).

로깅은 (3.2부터) 이 두 가지 추가 포매팅 스타일에 대해 개선된 지원을 제공합니다. Formatter 클래스는 style 이라는 추가적인 키워드 매개 변수를 취하도록 개선되었습니다. 기본값은 '%' 이지만, 다른 두 가지 포매팅 스타일에 해당하는 '{''$' 를 사용할 수 있습니다. (여러분이 기대하듯이) 이전 버전과의 호환성은 기본적으로 유지되지만, style 매개변수를 명시적으로 지정하면 str.format() 또는 string.Template 과 함께 작동하는 포맷 문자열을 지정할 수 있습니다. 다음은 가능성을 보여주기 위한 예제 콘솔 세션입니다:

>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
...                        style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG    This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
...                        style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>

로그로 최종 출력하기 위해 로깅 메시지를 포매팅하는 것은 개별 로깅 메시지가 구성되는 방식과 완전히 별개입니다. 개별 메시지에는 다음과 같이 %-포매팅을 사용할 수 있습니다:

>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>

로깅 호출(logger.debug(), logger.info() 등)은 실제 로깅 메시지 자체를 위해서는 위치 매개 변수만을 취하고, 키워드 매개 변수는 실제 로깅 호출을 어떻게 다뤄야 하는지를 지정하는 옵션을 결정하는 용도로만 사용됩니다 (예를 들어, 트레이스백 정보를 로그 해야 할지를 가리키는 exc_info 키워드 매개 변수나 로그에 추가되는 문맥 정보를 나타내는 extra 키워드 매개 변수). 그래서 여러분은 str.format() 또는 string.Template 문법을 사용하여 직접 로깅 호출을 할 수 없습니다, 내부적으로 logging 패키지가 %-포매팅을 사용하여 포맷 문자열과 변수 인자를 병합하기 때문입니다. 이전 버전과의 호환성을 유지하는 동안은 이 상황이 바뀌지 않을 것입니다. 기존 코드에 있는 모든 로깅 호출이 %-포맷 문자열을 사용하기 때문입니다.

그러나 {}- 및 $- 포매팅을 사용하여 개별 로그 메시지를 구성하는 방법이 있습니다. 메시지의 경우, 메시지 포맷 문자열로 임의의 객체를 사용할 수 있으며, logging 패키지는 실제 형식 문자열을 얻기 위해 그 객체에 대해 str() 을 호출한다는 것을 상기하십시오. 다음 두 가지 클래스를 고려하십시오:

class BraceMessage:
    def __init__(self, fmt, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

이 중 하나를 포맷 문자열 대신 사용하면, {}- 또는 $-포매팅을 사용하여 포맷된 로그 출력의 “%(message)s”, “{message}” 또는 “$message” 자리에 나타나는 실제 “message” 부분을 만들 수 있습니다. 어떤 것을 로그 하고 싶을 때마다 클래스 이름을 사용하는 것은 다소 꼴사납지만, __(두 개의 밑줄 —- gettext.gettext() 나 그 형제들의 동의어/별칭으로 사용되는 _ 과 혼동하지 마세요)와 같은 별칭을 사용하면 꽤 쓸만합니다.

위의 클래스가 파이썬에 포함되어 있지는 않지만, 아주 쉽게 여러분의 코드에 복사하여 붙여넣을 수 있습니다. 다음과 같이 사용될 수 있습니다 (wherever 라는 모듈에서 선언되었다고 가정합니다):

>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
...       point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

위의 예제는 print() 를 사용하여 포매팅이 어떻게 작동하는지 보여주고 있지만, 물론 이 접근법으로 실제 로깅 할 때는 logger.debug() 나 그와 유사한 것을 사용해야 합니다.

한 가지 지적할 점은, 이 접근법이 성능상으로 큰 문제가 없다는 것입니다: 실제 포매팅은 로깅 호출을 할 때가 아니라 로깅 된 메시지를 실제로 처리기가 로그로 출력할 때 (그리고 실제로 그렇게 될 때만) 발생합니다. 그래서 여러분이 실수할 수도 있을 특이함은 괄호가 포맷 문자열과 인자들을 모두 감싼다는 것뿐입니다. __ 표기법이 단지 XXXMessage 클래스 중 하나에 대한 생성자 호출의 편의 문법이기 때문입니다.

원한다면, LoggerAdapter 를 사용하여 다음 예제와 같이 위와 유사한 효과를 얻을 수 있습니다:

import logging

class Message(object):
    def __init__(self, fmt, args):
        self.fmt = fmt
        self.args = args

    def __str__(self):
        return self.fmt.format(*self.args)

class StyleAdapter(logging.LoggerAdapter):
    def __init__(self, logger, extra=None):
        super(StyleAdapter, self).__init__(logger, extra or {})

    def log(self, level, msg, *args, **kwargs):
        if self.isEnabledFor(level):
            msg, kwargs = self.process(msg, kwargs)
            self.logger._log(level, Message(msg, args), (), **kwargs)

logger = StyleAdapter(logging.getLogger(__name__))

def main():
    logger.debug('Hello, {}', 'world!')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    main()

이 스크립트는 파이썬 3.2 이상에서 실행될 때 Hello, world! 라는 메시지를 기록해야 합니다.

사용자 정의 LogRecord

모든 로깅 이벤트는 LogRecord 인스턴스로 표현됩니다. 이벤트가 로그 되고 로거 수준에 의해 필터링 되지 않으면, LogRecord 가 생성되고 이벤트에 대한 정보로 채워진 다음 해당 로거(와 그 조상들, 계층 상위로의 전파가 비활성화된 지점의 로거까지)의 처리기로 전달됩니다. 파이썬 3.2 이전에는, 이 생성이 일어나는 곳이 두 곳밖에 없었습니다:

  • Logger.makeRecord(), 이벤트 로깅의 일반적인 프로세스에서 호출됩니다. 인스턴스를 생성하기 위해 LogRecord 를 직접 호출합니다.

  • makeLogRecord(), LogRecord에 추가될 어트리뷰트를 포함하는 딕셔너리와 함께 호출됩니다. 보통 적절한 딕셔너리가 네트워크를 통해 (예를 들어, SocketHandler 를 통해 피클 형태로, 또는 HTTPHandler 를 통해 JSON 형식으로) 수신될 때 호출됩니다.

이것은 보통 LogRecord 로 특별한 것을 할 필요가 있다면, 다음 중 하나를 해야 한다는 것을 의미합니다.

  • Logger.makeRecord() 를 재정의하는 자신만의 Logger 서브 클래스를 만들고, 관심 있는 로거의 인스턴스가 만들어지기 전에 setLoggerClass() 를 사용하여 설정하십시오.

  • 로거나 처리기에 Filter를 추가해서 filter() 메서드가 호출될 때 필요한 특별한 조작을 하십시오.

첫 번째 접근법은 여러 라이브러리가 서로 다른 일을 하고 싶어 하는 시나리오에서는 다루기 힘들 것입니다. 각자 자신의 Logger 서브 클래스를 설정하려고 시도할 것이고, 마지막 것이 이기게 될 것입니다.

두 번째 접근법은 많은 경우에 합리적으로 잘 작동하지만, LogRecord 의 특별한 서브 클래스를 사용할 수는 없습니다. 라이브러리 개발자는 로거에 적절한 필터를 설정할 수 있지만, 새로운 로거를 도입할 때마다 이를 수행해야 한다는 것을 기억해야 합니다. 이를 고려하지 않는다면 새 패키지나 모듈을 추가하고 모듈 수준에서 단순히 다음과 같이 합니다:

logger = logging.getLogger(__name__)

이것은 아마도 고려해야 할 많은 것 중 하나일 뿐입니다. 개발자는 자신의 최상위 로거에 첨부된 NullHandler 에도 필터를 추가 할 수 있지만, 응용 프로그램 개발자가 하위 수준 라이브러리 로거에 처리기를 연결하면 호출되지 않습니다 — 그래서 그 처리기로부터의 출력은 라이브러리 개발자의 의도를 반영하지 못합니다.

파이썬 3.2 이상에서는, LogRecord 생성이 사용자가 지정할 수 있는 팩토리를 통해 수행됩니다. 팩토리는 setLogRecordFactory() 로 설정할 수 있고, getLogRecordFactory() 로 조회할 수 있는 콜러블입니다. 팩토리는 LogRecord 생성자와 같은 서명으로 호출되고, 기본 설정은 LogRecord 입니다.

이 방법을 사용하면 사용자 정의 팩토리가 LogRecord 생성의 모든 측면을 제어 할 수 있습니다. 예를 들어, 서브 클래스를 반환하거나, 다음과 같은 방법으로 생성된 레코드에 어트리뷰트를 추가 할 수 있습니다:

old_factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
    record = old_factory(*args, **kwargs)
    record.custom_attribute = 0xdecafbad
    return record

logging.setLogRecordFactory(record_factory)

이 패턴은 서로 다른 라이브러리가 팩토리를 체인으로 연결할 수 있도록 하며, 서로의 어트리뷰트를 덮어쓰거나 의도하지 않게 표준으로 제공된 어트리뷰트를 덮어쓰지 않는 한, 놀랄 일은 없어야 합니다. 그러나 체인의 각 고리는 모든 로깅 작업에 실행시간 오버헤드를 추가하므로, Filter를 사용해서 원하는 결과를 얻을 수 없을 때만 이 기법을 사용해야 합니다.

QueueHandler 서브 클래스 만들기 - ZeroMQ 예제

QueueHandler 서브 클래스를 사용하여 다른 유형의 큐에 메시지를 보낼 수 있습니다, 예를 들어 ZeroMQ ‘publish’ 소켓. 아래 예제에서, 소켓은 별도로 생성되어 처리기로 (‘queue’로) 전달됩니다:

import zmq   # using pyzmq, the Python binding for ZeroMQ
import json  # for serializing records portably

ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB)  # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556')        # or wherever

class ZeroMQSocketHandler(QueueHandler):
    def enqueue(self, record):
        self.queue.send_json(record.__dict__)


handler = ZeroMQSocketHandler(sock)

물론 구성하는 다른 방법이 있습니다. 예를 들어 처리기가 소켓을 만드는데 필요한 데이터를 전달하는 것입니다:

class ZeroMQSocketHandler(QueueHandler):
    def __init__(self, uri, socktype=zmq.PUB, ctx=None):
        self.ctx = ctx or zmq.Context()
        socket = zmq.Socket(self.ctx, socktype)
        socket.bind(uri)
        super().__init__(socket)

    def enqueue(self, record):
        self.queue.send_json(record.__dict__)

    def close(self):
        self.queue.close()

QueueListener 서브 클래스 만들기 - ZeroMQ 예제

다른 유형의 큐에서 메시지를 받기 위해 QueueListener 의 서브 클래스를 만들 수도 있습니다, 예를 들어 ZeroMQ ‘subscribe’ 소켓. 다음은 그 예입니다:

class ZeroMQSocketListener(QueueListener):
    def __init__(self, uri, *handlers, **kwargs):
        self.ctx = kwargs.get('ctx') or zmq.Context()
        socket = zmq.Socket(self.ctx, zmq.SUB)
        socket.setsockopt_string(zmq.SUBSCRIBE, '')  # subscribe to everything
        socket.connect(uri)
        super().__init__(socket, *handlers, **kwargs)

    def dequeue(self):
        msg = self.queue.recv_json()
        return logging.makeLogRecord(msg)

더 보기

모듈 logging

logging 모듈에 대한 API 레퍼런스

모듈 logging.config

logging 모듈용 구성 API.

모듈 logging.handlers

logging 모듈에 포함된 유용한 처리기.

기초 로깅 자습서

고급 로깅 자습서

딕셔너리 기반 구성의 예

다음은 로깅 구성 딕셔너리의 예입니다 - 장고 프로젝트 설명서 에서 가져왔습니다. 이 딕셔너리를 dictConfig() 로 전달하여 구성을 적용합니다:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
        },
        'simple': {
            'format': '%(levelname)s %(message)s'
        },
    },
    'filters': {
        'special': {
            '()': 'project.logging.SpecialFilter',
            'foo': 'bar',
        }
    },
    'handlers': {
        'null': {
            'level':'DEBUG',
            'class':'django.utils.log.NullHandler',
        },
        'console':{
            'level':'DEBUG',
            'class':'logging.StreamHandler',
            'formatter': 'simple'
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'filters': ['special']
        }
    },
    'loggers': {
        'django': {
            'handlers':['null'],
            'propagate': True,
            'level':'INFO',
        },
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myproject.custom': {
            'handlers': ['console', 'mail_admins'],
            'level': 'INFO',
            'filters': ['special']
        }
    }
}

이 구성에 대한 더 자세한 정보는 장고 설명서의 관련 섹션 을 참조하세요.

rotator와 namer를 사용해서 로그 회전 처리하기

다음 코드 조각에 namer 와 rotator를 정의하는 예가 있는데, 로그 파일을 zlib 기반으로 압축합니다:

def namer(name):
    return name + ".gz"

def rotator(source, dest):
    with open(source, "rb") as sf:
        data = sf.read()
        compressed = zlib.compress(data, 9)
        with open(dest, "wb") as df:
            df.write(compressed)
    os.remove(source)

rh = logging.handlers.RotatingFileHandler(...)
rh.rotator = rotator
rh.namer = namer

이것은 “진짜” .gz 파일이 아닙니다. 단순히 압축된 데이터일 뿐이고, 실제 gzip 파일에서 찾을 수 있는 “컨테이너” 가 없습니다. 이 코드 조각은 단지 설명을 위한 것일 뿐입니다.

좀 더 정교한 multiprocessing 예제

다음 동작하는 예제에서는 구성 파일을 사용하여 로깅을 multiprocessing과 함께 사용하는 방법을 보여줍니다. 구성은 매우 간단하지만, 실제 multiprocessing 시나리오에서 더 복잡한 구성을 구현할 수 있음을 예시합니다.

이 예에서, 주 프로세스는 리스너 프로세스와 몇 개의 작업자 프로세스를 생성합니다. 주 프로세스, 리스너 및 작업자를 위한 세 가지 구성이 있습니다 (모든 작업자는 같은 구성을 공유합니다). 주 프로세스에서의 로깅, 작업자가 QueueHandler에 로그 하는 방법, 그리고 리스너가 QueueListener 및 더욱 복잡한 로깅 구성을 구현하고 큐를 통해 수신한 이벤트를 구성에서 지정된 처리기로 전달하도록 배치하는 는 방법을 볼 수 있습니다. 이러한 구성은 설명을 위한 것이지만, 이 예제를 여러분 자신의 시나리오에 적용할 수 있어야 합니다.

스크립트는 다음과 같습니다 - 독스트링과 주석이 어떻게 작동하는지 잘 설명하기를 바랍니다:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time

class MyHandler:
    """
    A simple handler for logging events. It runs in the listener process and
    dispatches events to loggers based on the name in the received record,
    which then get dispatched, by the logging system, to the handlers
    configured for those loggers.
    """
    def handle(self, record):
        logger = logging.getLogger(record.name)
        # The process name is transformed just to show that it's the listener
        # doing the logging to files and console
        record.processName = '%s (for %s)' % (current_process().name, record.processName)
        logger.handle(record)

def listener_process(q, stop_event, config):
    """
    This could be done in the main process, but is just done in a separate
    process for illustrative purposes.

    This initialises logging according to the specified configuration,
    starts the listener and waits for the main process to signal completion
    via the event. The listener is then stopped, and the process exits.
    """
    logging.config.dictConfig(config)
    listener = logging.handlers.QueueListener(q, MyHandler())
    listener.start()
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    stop_event.wait()
    listener.stop()

def worker_process(config):
    """
    A number of these are spawned for the purpose of illustration. In
    practice, they could be a heterogeneous bunch of processes rather than
    ones which are identical to each other.

    This initialises logging according to the specified configuration,
    and logs a hundred messages with random levels to randomly selected
    loggers.

    A small sleep is added to allow other processes a chance to run. This
    is not strictly needed, but it mixes the output from the different
    processes a bit more than if it's left out.
    """
    logging.config.dictConfig(config)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)
        time.sleep(0.01)

def main():
    q = Queue()
    # The main process gets a simple configuration which prints to the console.
    config_initial = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
            },
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console']
        },
    }
    # The worker process configuration is just a QueueHandler attached to the
    # root logger, which allows all messages to be sent to the queue.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_worker = {
        'version': 1,
        'disable_existing_loggers': True,
        'handlers': {
            'queue': {
                'class': 'logging.handlers.QueueHandler',
                'queue': q,
            },
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['queue']
        },
    }
    # The listener process configuration shows that the full flexibility of
    # logging configuration is available to dispatch events to handlers however
    # you want.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_listener = {
        'version': 1,
        'disable_existing_loggers': True,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            },
            'simple': {
                'class': 'logging.Formatter',
                'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
                'formatter': 'simple',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'ERROR',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    # Log some initial events, just to show that logging in the parent works
    # normally.
    logging.config.dictConfig(config_initial)
    logger = logging.getLogger('setup')
    logger.info('About to create workers ...')
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1),
                     args=(config_worker,))
        workers.append(wp)
        wp.start()
        logger.info('Started worker: %s', wp.name)
    logger.info('About to create listener ...')
    stop_event = Event()
    lp = Process(target=listener_process, name='listener',
                 args=(q, stop_event, config_listener))
    lp.start()
    logger.info('Started listener')
    # We now hang around for the workers to finish their work.
    for wp in workers:
        wp.join()
    # Workers all done, listening can now stop.
    # Logging in the parent still works normally.
    logger.info('Telling listener to stop ...')
    stop_event.set()
    lp.join()
    logger.info('All done.')

if __name__ == '__main__':
    main()

SysLogHandler로 전송된 메시지에 BOM 삽입하기

RFC 5424 는 유니코드 메시지가 다음 구조를 갖는 바이트들로 syslog 데몬에 전송되어야 함을 요구합니다: 선택적인 순수 ASCII 구성 요소, 그 뒤를 이어 UTF-8 바이트 순서 표식 (BOM), 그 뒤를 이어 UTF-8으로 인코딩된 유니코드. (이 규격의 관련 절 을 참조하십시오.)

파이썬 3.1에서, BOM을 메시지에 삽입하는 코드가 SysLogHandler 에 추가되었지만, 유감스럽게도, BOM이 메시지의 시작 부분에 나타나서 그 앞에 순수 ASCII 구성 요소를 허락하지 않도록 잘못 구현되었습니다.

이 동작이 잘못됨에 따라, 잘못된 BOM 삽입 코드가 파이썬 3.2.4 이상에서 제거되었습니다. 그러나, 올바른 코드로 대체되지는 않았고, BOM을 포함하고, 그 앞에 순수 ASCII 시퀀스, 그 뒤에 UTF-8으로 인코딩된 임의의 유니코드로 구성된 RFC 5424-호환 메시지를 생성하려는 경우 다음과 같이 해야 합니다:

  1. Formatter 인스턴스를 SysLogHandler 인스턴스에 다음과 같은 포맷 문자열과 함께 첨부하십시오:

    'ASCII section\ufeffUnicode section'
    

    유니코드 코드 포인트 U+FEFF는, UTF-8을 사용하여 인코딩될 때, UTF-8 BOM으로 인코딩됩니다 – 바이트열 b'\xef\xbb\xbf'.

  2. ASCII section을 원하는 자리 표시기로 바꾸십시오. 그러나 치환 후 나타나는 데이터가 항상 ASCII임미 보장되어야 합니다 (그렇게 되면, UTF-8 인코딩 이후에는 변경되지 않은 채로 유지됩니다).

  3. Unidcode section을 원하는 자리 표시기로 바꾸십시오; 치환 후 나타나는 데이터에 ASCII 범위를 벗어나는 문자가 포함되어 있어도 괜찮습니다 – UTF-8을 사용하여 인코딩됩니다.

포맷된 된 메시지는 SysLogHandler 에 의해 UTF-8 인코딩을 사용하여 인코딩*됩니다*. 위의 규칙을 따르는 경우, RFC 5424-호환 메시지를 생성할 수 있어야 합니다. 그렇지 않으면, logging이 불평하지 않을 수도 있지만, 메시지가 RFC 5424와 호환되지 않고 syslog 데몬이 불평 할 수 있습니다.

구조적 로깅 구현

대부분의 로깅 메시지는 사람이 읽을 수 있도록 만들어졌기 때문에 쉽게 기계에서 파싱 할 수 없지만, 프로그램에서 (복잡한 정규식을 사용하지 않고도) 구문 분석할 수 있는 구조화된 포맷으로 메시지를 출력하려는 상황이 있을 수 있습니다. 이것은 logging 패키지를 사용하여 쉽게 달성 할 수 있습니다. 이것이 달성될 수 있는 여러 가지 방법이 있지만, 다음은 JSON을 사용하여 기계가 파싱할 수 있는 방식으로 이벤트를 직렬화하는 간단한 접근법입니다:

import json
import logging

class StructuredMessage(object):
    def __init__(self, message, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        return '%s >>> %s' % (self.message, json.dumps(self.kwargs))

_ = StructuredMessage   # optional, to improve readability

logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))

위의 스크립트가 실행되면 다음과 같이 인쇄됩니다:

message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}

항목의 순서는 사용된 파이썬 버전에 따라 다를 수 있습니다.

좀 더 특별한 처리가 필요한 경우, 다음 예제와 같이 사용자 정의 JSON 인코더를 사용할 수 있습니다:

from __future__ import unicode_literals

import json
import logging

# This next bit is to ensure the script runs unchanged on 2.x and 3.x
try:
    unicode
except NameError:
    unicode = str

class Encoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, set):
            return tuple(o)
        elif isinstance(o, unicode):
            return o.encode('unicode_escape').decode('ascii')
        return super(Encoder, self).default(o)

class StructuredMessage(object):
    def __init__(self, message, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        s = Encoder().encode(self.kwargs)
        return '%s >>> %s' % (self.message, s)

_ = StructuredMessage   # optional, to improve readability

def main():
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))

if __name__ == '__main__':
    main()

위의 스크립트를 실행하면 다음과 같이 인쇄합니다:

message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}

항목의 순서는 사용된 파이썬 버전에 따라 다를 수 있습니다.

dictConfig()로 처리기를 사용자 정의하기

특정 상황에서 로깅 처리기를 사용자 정의하고 싶을 때가 있고, dictConfig()를 사용하고 있다면 서브 클래스를 만들지 않고도 이 작업을 수행 할 수 있습니다. 예를 들어, 로그 파일의 소유권을 설정하고 싶다고 합시다. POSIX에서, shutil.chown() 을 사용하면 쉽게 할 수 있지만, 표준 라이브러리의 파일 처리기는 내장된 지원을 제공하지 않습니다. 다음과 같은 일반 함수를 사용하여 처리기 생성을 사용자 정의 할 수 있습니다:

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

그런 다음, dictConfig() 에 전달되는 로깅 구성에서, 이 함수를 호출하여 로깅 처리기를 생성하도록 지정할 수 있습니다:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

이 예제에서는 단지 예를 들기 위해 pulse 라는 사용자와 그룹을 사용하여 소유권을 설정합니다. 작동하는 스크립트 chowntest.py 로 정리하면:

import logging, logging.config, os, shutil

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')

이것을 실행하기 위해서는, 아마도 root 로 실행해야 할 것입니다:

$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log

이 예제는 shutil.chown() 이 등장한 파이썬 3.3을 사용합니다. 이 접근법은 dictConfig()를 지원하는 모든 파이썬 버전에서 작동합니다 - 파이썬 2.7, 3.2 이상. 3.3 이전 버전의 경우, (예를 들어) os.chown() 을 사용하여 실제 소유권 변경을 구현해야 합니다.

실제로는, 처리기 생성 함수가 프로젝트 어딘가에 있는 유틸리티 모듈에 있을 수 있습니다. 구성에 있는 다음과 같은 줄 대신:

'()': owned_file_handler,

예를 들면 이렇게 쓸 수 있습니다:

'()': 'ext://project.util.owned_file_handler',

여기서 project.util 은 함수가 있는 패키지의 실제 이름으로 바꿀 수 있습니다. 위의 작업 스크립트에서 'ext://__main__.owned_file_handler' 를 사용해도 됩니다. 여기서, 실제 콜러블은 ext:// 스펙으로부터 dictConfig() 에 의해 결정됩니다.

이 예제는 희망하건대 다른 형태의 파일 변경을 - 예를 들어 특정 POSIX 권한 비트 설정 - 같은 방법으로 (os.chmod() 를 사용해서) 구현하는 방법도 알려줍니다.

물론 이 접근법은 FileHandler 이외의 처리기 유형으로도 확장될 수 있습니다 - 예를 들어, 회전 파일 처리기 중 하나 또는 다른 유형의 처리기 모두.

응용 프로그램 전체에서 특정 포맷 스타일 사용하기

파이썬 3.2에서, Formatterstyle 키워드 매개변수를 얻었는데, 이전 버전과의 호환성을 위해 % 를 기본값으로 사용하면서 { 또는 $ 를 지정하면 str.format()string.Template 에 의해 지원되는 포매팅 접근법을 사용할 수 있도록 합니다. 이것은 로그 되는 최종 출력으로 로깅 메시지를 포매팅하는 것과 관계된 것이고, 개별 로깅 메시지가 만들어지는 방법과는 무관함에 주의하십시오.

로깅 호출(debug(), info() 등)은 실제 로깅 메시지 자체를 위해서는 위치 매개 변수만을 취하고, 키워드 매개 변수는 실제 로깅 호출을 어떻게 다뤄야 하는지를 지정하는 옵션을 결정하는 용도로만 사용됩니다 (예를 들어, 트레이스백 정보를 로그 해야 할지를 가리키는 exc_info 키워드 매개 변수나 로그에 추가되는 문맥 정보를 나타내는 extra 키워드 매개 변수). 그래서 여러분은 str.format() 또는 string.Template 문법을 사용하여 직접 로깅 호출을 할 수 없습니다, 내부적으로 logging 패키지가 %-포매팅을 사용하여 포맷 문자열과 변수 인자를 병합하기 때문입니다. 이전 버전과의 호환성을 유지하는 동안은 이 상황이 바뀌지 않을 것입니다. 기존 코드에 있는 모든 로깅 호출이 %-포맷 문자열을 사용하기 때문입니다.

포맷 스타일을 특정 로거와 연관시키는 제안이 있었지만, 이전 버전과의 호환성 문제가 있는데, 기존 코드가 그 로거 이름으로 %-포매팅을 사용할 수 있기 때문입니다.

제삼자 라이브러리와 여러분의 코드 간에 상호 운용이 가능하도록 로깅 하려면, 개별 로깅 호출 수준에서 포매팅을 결정해야 합니다. 이렇게 할 때 대체 포매팅 스타일을 수용 할 수 있는 몇 가지 길이 열립니다.

LogRecord 팩토리 사용

파이썬 3.2에서, 위에서 언급 한 Formatter 변경 사항과 함께, logging 패키지는 setLogRecordFactory() 함수를 사용하여 사용자가 자신의 LogRecord 서브 클래스를 설정할 수 있는 기능을 얻었습니다. 이것을 사용하면, 원하는 일을 하도록 getMessage() 메서드를 재정의하는 여러분 자신의 LogRecord 서브 클래스를 설정할 수 있습니다. 이 메서드의 베이스 클래스 구현이 msg % args 포매팅이 일어나는 곳이며, 여러분이 대체 포매팅으로 치환할 수 있는 곳입니다; 그러나, 모든 포매팅 스타일을 지원하면서 다른 코드와의 상호 운용성을 보장하기 위해 %-포매팅을 기본값으로 사용하도록 주의해야 합니다. 또한, 베이스 구현과 마찬가지로 str(self.msg) 를 호출하도록 주의해야 합니다.

자세한 정보는 setLogRecordFactory()LogRecord 에 대한 레퍼런스 설명서를 참조하십시오.

사용자 정의 메시지 객체 사용

{}- 및 $-포매팅을 사용하여 개별 로그 메시지를 작성할 수 있는 또 다른, 아마도 더 간단한 방법이 있습니다. (임의의 객체를 메시지로 사용하기에서) 로깅 할 때 임의의 객체를 메시지 포맷 문자열로 사용할 수 있고, logging 패키지는 그 객체에 대해 str() 을 호출하여 실제 형식 문자열을 얻는다고 했던 것을 기억하실 수 있을 겁니다. 다음 두 클래스를 생각해봅시다:

class BraceMessage(object):
    def __init__(self, fmt, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage(object):
    def __init__(self, fmt, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

이 중 하나를 포맷 문자열 대신 사용하면, {}- 또는 $-포매팅을 사용하여 포맷된 로그 출력의 “%(message)s”, “{message}” 또는 “$message” 자리에 나타나는 실제 “message” 부분을 만들 수 있습니다. 어떤 것을 로그 하고 싶을 때마다 클래스 이름을 사용하는 것이 다소 꼴사납다면, 메시지에 M 이나 _ 과 같은 별칭을 사용해서 더 쓸만하게 만들 수 있습니다 (또는 지역화에 _ 를 사용하고 있다면, 아마도 __).

이 접근법의 예가 아래에 나와 있습니다. 먼저, str.format() 를 사용하는 포매팅입니다:

>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)

두 번째로, string.Template 를 사용하는 포매팅입니다:

>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

한 가지 지적할 점은, 이 접근법이 성능상으로 큰 문제가 없다는 것입니다: 실제 포매팅은 로깅 호출을 할 때가 아니라 로깅 된 메시지를 실제로 처리기가 로그로 출력할 때 (그리고 실제로 그렇게 될 때만) 발생합니다. 그래서 여러분이 실수할 수도 있을 특이함은 괄호가 포맷 문자열과 인자들을 모두 감싼다는 것뿐입니다. __ 표기법이 단지 XXXMessage 클래스 중 하나에 대한 생성자 호출의 편의 문법이기 때문입니다.

dictConfig()로 필터 구성하기

dictConfig() 를 사용하여 필터를 구성할 수 있습니다. 하지만 처음에는 어떻게 해야 할지 명확하지 않을 수 있습니다 (그래서 이 조리법을 제공합니다). Filter 가 표준 라이브러리에 포함된 유일한 필터 클래스이고, 많은 요구 사항을 충족시키지는 않을 것이기 때문에 (오직 베이스 클래스로 제공됩니다), 일반적으로 filter() 메서드를 재정의하는 여러분 자신의 Filter 서브 클래스를 정의할 필요가 있습니다. 이렇게 하려면, 필터를 생성하는 데 사용될 콜러블을 필터의 구성 딕셔너리에 () 키로 지정하십시오 (클래스가 가장 분명하지만 Filter 인스턴스를 반환하는 콜러블은 모두 가능합니다). 다음은 완전한 예입니다:

import logging
import logging.config
import sys

class MyFilter(logging.Filter):
    def __init__(self, param=None):
        self.param = param

    def filter(self, record):
        if self.param is None:
            allow = True
        else:
            allow = self.param not in record.msg
        if allow:
            record.msg = 'changed: ' + record.msg
        return allow

LOGGING = {
    'version': 1,
    'filters': {
        'myfilter': {
            '()': MyFilter,
            'param': 'noshow',
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'filters': ['myfilter']
        }
    },
    'root': {
        'level': 'DEBUG',
        'handlers': ['console']
    },
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.debug('hello')
    logging.debug('hello - noshow')

이 예제는 인스턴스를 만드는 콜러블로 키워드 매개 변수 형식으로 구성 데이터를 전달하는 방법을 보여줍니다. 실행하면, 위의 스크립트는 다음을 인쇄합니다:

changed: hello

필터가 구성된 대로 작동하고 있음을 보여줍니다.

주목해야 할 몇 가지 추가 사항:

  • 구성에서 직접 참조할 수 없는 경우 (예를 들어, 다른 모듈에 있고 구성 딕셔너리가 있는 곳에서 직접 임포트 할 수 없는 경우), 외부 객체에 대한 액세스 에 설명된 대로 ext://... 형식을 사용할 수 있습니다. 예를 들어, 위의 예에서 MyFilter 대신 'ext://__main__.MyFilter' 를 사용할 수 있습니다.

  • 필터뿐만 아니라, 이 기술을 사용자 정의 처리기 및 포매터를 구성하는데 사용할 수도 있습니다. logging이 구성에서 사용자 정의 객체를 어떻게 지원하는지에 대한 더 많은 정보는 사용자 정의 객체 를 보시고, 위의 다른 요리책 조리법 dictConfig()로 처리기를 사용자 정의하기 도 보십시오.

사용자 정의된 예외 포매팅

예외 포매팅을 사용자 정의하고 싶을 때가 있습니다 - 논쟁의 여지는 있지만, 예외 정보가 포함된 경우에도 이벤트 당 정확히 한 줄이 기록되기 원한다고 합시다. 다음 예제처럼, 사용자 정의 포매터 클래스를 사용할 수 있습니다:

import logging

class OneLineExceptionFormatter(logging.Formatter):
    def formatException(self, exc_info):
        """
        Format an exception so that it prints on a single line.
        """
        result = super(OneLineExceptionFormatter, self).formatException(exc_info)
        return repr(result)  # or format into one line however you want to

    def format(self, record):
        s = super(OneLineExceptionFormatter, self).format(record)
        if record.exc_text:
            s = s.replace('\n', '') + '|'
        return s

def configure_logging():
    fh = logging.FileHandler('output.txt', 'w')
    f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
                                  '%d/%m/%Y %H:%M:%S')
    fh.setFormatter(f)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(fh)

def main():
    configure_logging()
    logging.info('Sample message')
    try:
        x = 1 / 0
    except ZeroDivisionError as e:
        logging.exception('ZeroDivisionError: %s', e)

if __name__ == '__main__':
    main()

실행하면, 정확하게 두 줄의 파일이 생성됩니다:

28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n  File "logtest7.py", line 30, in main\n    x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|

위의 처리는 단순하지만, 예외 정보를 원하는 대로 포맷하는 방법을 알려줍니다. traceback 모듈은 더욱 전문화된 요구에 도움이 될 수 있습니다.

로깅 메시지 말하기

로깅 메시지를 보여주는 대신 들려주는 것이 바람직한 상황이 있을 수 있습니다. 여러분의 시스템에 텍스트-음성 변환 (TTS) 기능이 있다면 쉽습니다, 파이썬 바인딩이 없어도 됩니다. 대부분의 TTS 시스템에는 실행할 수 있는 명령행 프로그램이 있으며, 이것을 subprocess 를 사용하여 처리기에서 호출 할 수 있습니다. 여기서 TTS 명령행 프로그램이 사용자와 상호 작용하거나, 완료하는 데 오랜 시간이 걸릴 것으로 기대되지 않으며, 로그 되는 메시지의 빈도가 메시지로 사용자를 압도할 정도로 높지 않으며, 메시지는 동시에 처리되지 않고 한 번에 하나씩 읽어도 된다고 가정합니다. 아래의 예제 구현은 다음 메시지가 처리되기 전에 하나의 메시지를 다 읽을 때까지 대기하고, 이 때문에 다른 처리기가 대기 상태로 유지될 수 있습니다. 다음은 espeak TTS 패키지가 사용 가능하다고 가정하는 접근법을 보여주는 간단한 예입니다:

import logging
import subprocess
import sys

class TTSHandler(logging.Handler):
    def emit(self, record):
        msg = self.format(record)
        # Speak slowly in a female English voice
        cmd = ['espeak', '-s150', '-ven+f3', msg]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        # wait for the program to finish
        p.communicate()

def configure_logging():
    h = TTSHandler()
    root = logging.getLogger()
    root.addHandler(h)
    # the default formatter just returns the message
    root.setLevel(logging.DEBUG)

def main():
    logging.info('Hello')
    logging.debug('Goodbye')

if __name__ == '__main__':
    configure_logging()
    sys.exit(main())

실행하면, 이 스크립트는 여성 음성으로 “Hello”와 “Goodbye”를 차례대로 말합니다.

물론 위의 접근법은 다른 TTS 시스템과 명령행에서 실행되는 외부 프로그램을 통해 메시지를 처리 할 수 있는 전혀 다른 시스템에도 적용될 수 있습니다.

로깅 메시지를 버퍼링하고 조건부 출력하기

임시 영역에 메시지를 기록하고 특정 조건이 발생할 때만 메시지를 출력하려는 상황이 있을 수 있습니다. 예를 들어, 함수에서 디버그 이벤트를 로깅 하기를 원할 수 있습니다. 함수가 에러 없이 완료되면 수집된 디버그 정보로 로그를 어지럽히고 싶지 않지만, 에러가 있으면 에러뿐만 아니라 모든 디버그 정보를 출력하고 싶습니다.

다음은 로깅이 이러한 방식으로 작동하기 원하는 함수에 데코레이터를 사용하여 이를 수행할 방법을 보여주는 예제입니다. logging.handlers.MemoryHandler 를 사용하는데, 어떤 상황이 발생할 때까지 로그 된 이벤트를 버퍼링할 수 있도록 하고, 때가 되면 버퍼링 된 이벤트들이 flush 됩니다 - 처리를 위해 다른 처리기(target 처리기)로 전달됩니다. 기본적으로, MemoryHandler 는 버퍼가 다 차거나 수준이 지정된 임계값보다 크거나 같은 이벤트가 발생하면 플러시 됩니다. 사용자 정의 플러시 동작을 원할 경우, 이 조리법을 MemoryHandler 의 더 특수한 서브 클래스와 함께 사용할 수 있습니다.

예제 스크립트에는 간단한 함수 foo 가 있는데, 모든 로그 수준을 순회하면서, 어떤 수준으로 로그 할지를 sys.stderr 에 쓴 다음, 그 수준으로 실제 메시지를 로깅 합니다. 매개 변수를 foo 에 전달할 수 있는데, 참이면 ERROR 및 CRITICAL 수준으로 로그 합니다 - 그렇지 않으면 DEBUG, INFO 및 WARNING 수준에서만 로그 합니다.

The script just arranges to decorate foo with a decorator which will do the conditional logging that’s required. The decorator takes a logger as a parameter and attaches a memory handler for the duration of the call to the decorated function. The decorator can be additionally parameterised using a target handler, a level at which flushing should occur, and a capacity for the buffer (number of records buffered). These default to a StreamHandler which writes to sys.stderr, logging.ERROR and 100 respectively.

스크립트는 다음과 같습니다:

import logging
from logging.handlers import MemoryHandler
import sys

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
    if target_handler is None:
        target_handler = logging.StreamHandler()
    if flush_level is None:
        flush_level = logging.ERROR
    if capacity is None:
        capacity = 100
    handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)

    def decorator(fn):
        def wrapper(*args, **kwargs):
            logger.addHandler(handler)
            try:
                return fn(*args, **kwargs)
            except Exception:
                logger.exception('call failed')
                raise
            finally:
                super(MemoryHandler, handler).flush()
                logger.removeHandler(handler)
        return wrapper

    return decorator

def write_line(s):
    sys.stderr.write('%s\n' % s)

def foo(fail=False):
    write_line('about to log at DEBUG ...')
    logger.debug('Actually logged at DEBUG')
    write_line('about to log at INFO ...')
    logger.info('Actually logged at INFO')
    write_line('about to log at WARNING ...')
    logger.warning('Actually logged at WARNING')
    if fail:
        write_line('about to log at ERROR ...')
        logger.error('Actually logged at ERROR')
        write_line('about to log at CRITICAL ...')
        logger.critical('Actually logged at CRITICAL')
    return fail

decorated_foo = log_if_errors(logger)(foo)

if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    write_line('Calling undecorated foo with False')
    assert not foo(False)
    write_line('Calling undecorated foo with True')
    assert foo(True)
    write_line('Calling decorated foo with False')
    assert not decorated_foo(False)
    write_line('Calling decorated foo with True')
    assert decorated_foo(True)

이 스크립트를 실행하면 다음과 같은 출력이 나타납니다.:

Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL

보시다시피, 실제 로깅 출력은 심각도가 ERROR 이상인 이벤트가 기록될 때만 발생하지만, 이 경우 심각도가 낮은 이전 이벤트도 기록됩니다.

물론 전통적인 데코레이션 수단을 쓸 수 있습니다.:

@log_if_errors(logger)
def foo(fail=False):
    ...

구성을 통해 UTC(GMT)로 시간을 포맷하기

때로는 UTC를 사용하여 시간을 포맷하고 싶습니다. 아래에 표시된 UTCFormatter 와 같은 클래스를 사용할 수 있습니다:

import logging
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

이제 Formatter 대신 코드에서 UTCFormatter 를 사용할 수 있습니다. 구성을 통해 이를 수행하려면, 다음에 나오는 완전한 예제에 의해 설명된 접근법으로 dictConfig() API를 사용할 수 있습니다:

import logging
import logging.config
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'utc': {
            '()': UTCFormatter,
            'format': '%(asctime)s %(message)s',
        },
        'local': {
            'format': '%(asctime)s %(message)s',
        }
    },
    'handlers': {
        'console1': {
            'class': 'logging.StreamHandler',
            'formatter': 'utc',
        },
        'console2': {
            'class': 'logging.StreamHandler',
            'formatter': 'local',
        },
    },
    'root': {
        'handlers': ['console1', 'console2'],
   }
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.warning('The local time is %s', time.asctime())

이 스크립트를 실행하면, 다음과 같은 내용을 인쇄합니다:

2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015

시간이 한 처리기에서는 UTC로, 다른 처리기에서는 지역 시간으로 포맷되는 것을 보여줍니다.

선택적 로깅을 위해 컨텍스트 관리자 사용하기

로깅 구성을 일시적으로 변경하고 무언가를 한 후에 되돌리는 것이 유용할 때가 있습니다. 이를 위해, 컨텍스트 관리자는 로깅 컨텍스트를 저장하고 복원하는 가장 분명한 방법입니다. 다음은 그러한 컨텍스트 관리자의 간단한 예입니다. 컨텍스트 관리자의 범위 안에서 선택적으로 로깅 수준을 변경하고 로깅 처리기를 추가 할 수 있습니다:

import logging
import sys

class LoggingContext(object):
    def __init__(self, logger, level=None, handler=None, close=True):
        self.logger = logger
        self.level = level
        self.handler = handler
        self.close = close

    def __enter__(self):
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)
        if self.handler:
            self.logger.addHandler(self.handler)

    def __exit__(self, et, ev, tb):
        if self.level is not None:
            self.logger.setLevel(self.old_level)
        if self.handler:
            self.logger.removeHandler(self.handler)
        if self.handler and self.close:
            self.handler.close()
        # implicit return of None => don't swallow exceptions

수준 값을 지정하면, 로거의 수준은 컨텍스트 관리자가 적용되는 with 블록의 범위 안에서 해당 값으로 설정됩니다. 처리기를 지정하면, 블록 진입 시 로거에 추가되고 블록에서 빠져나갈 때 제거됩니다. 블록을 빠져나갈 때 처리기를 닫도록 관리자에게 요청할 수도 있습니다 - 더는 처리기가 필요하지 않으면 이렇게 할 수 있습니다.

작동 원리를 보여주기 위해, 다음 코드 블록을 위에 추가 할 수 있습니다:

if __name__ == '__main__':
    logger = logging.getLogger('foo')
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
    logger.info('1. This should appear just once on stderr.')
    logger.debug('2. This should not appear.')
    with LoggingContext(logger, level=logging.DEBUG):
        logger.debug('3. This should appear once on stderr.')
    logger.debug('4. This should not appear.')
    h = logging.StreamHandler(sys.stdout)
    with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
        logger.debug('5. This should appear twice - once on stderr and once on stdout.')
    logger.info('6. This should appear just once on stderr.')
    logger.debug('7. This should not appear.')

우리는 초기에 로거 수준을 INFO 로 설정합니다. 그래서 메시지 #1은 나타나고 메시지 #2는 나타나지 않습니다. 그다음에 with 블록에서 수준을 DEBUG 로 임시 변경하면, 메시지 #3이 나타납니다. 블록이 종료되면 로거 수준이 INFO 로 복원되므로, 메시지 #4가 표시되지 않습니다. 그다음 with 블록에서 수준을 다시 DEBUG 로 다시 설정하지만, sys.stdout 으로 쓰는 처리기도 추가합니다. 따라서 메시지 #5는 콘솔에 두 번 표시됩니다 (stderr 를 통해 한 번, stdout 을 통해 한 번). with 문장이 완료된 후에 상태는 이전과 같으므로, (메시지 #1처럼) 메시지 #6이 나타나고, (메시지 #2처럼) 메시지 #7은 보이지 않습니다.

이렇게 만든 스크립트를 실행하면, 결과는 다음과 같습니다:

$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

다시 실행하면서 stderr/dev/null 로 리디렉트하면, 다음과 같이 stdout 으로 출력된 메시지만 나타납니다:

$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.

다시 한번, 하지만 stdout/dev/null 로 리디렉트하면, 이렇게 됩니다:

$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

이 경우, stdout 에 인쇄된 메시지 #5는 예상대로 나타나지 않습니다.

물론 여기서 설명한 방법을 일반화 할 수 있습니다. 예를 들어 로깅 필터를 임시로 첨부 할 수 있습니다. 위의 코드는 파이썬 2와 파이썬 3에서 모두 작동합니다.

A CLI application starter template

Here’s an example which shows how you can:

  • Use a logging level based on command-line arguments

  • Dispatch to multiple subcommands in separate files, all logging at the same level in a consistent way

  • Make use of simple, minimal configuration

Suppose we have a command-line application whose job is to stop, start or restart some services. This could be organised for the purposes of illustration as a file app.py that is the main script for the application, with individual commands implemented in start.py, stop.py and restart.py. Suppose further that we want to control the verbosity of the application via a command-line argument, defaulting to logging.INFO. Here’s one way that app.py could be written:

import argparse
import importlib
import logging
import os
import sys

def main(args=None):
    scriptname = os.path.basename(__file__)
    parser = argparse.ArgumentParser(scriptname)
    levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    parser.add_argument('--log-level', default='INFO', choices=levels)
    subparsers = parser.add_subparsers(dest='command',
                                       help='Available commands:')
    start_cmd = subparsers.add_parser('start', help='Start a service')
    start_cmd.add_argument('name', metavar='NAME',
                           help='Name of service to start')
    stop_cmd = subparsers.add_parser('stop',
                                     help='Stop one or more services')
    stop_cmd.add_argument('names', metavar='NAME', nargs='+',
                          help='Name of service to stop')
    restart_cmd = subparsers.add_parser('restart',
                                        help='Restart one or more services')
    restart_cmd.add_argument('names', metavar='NAME', nargs='+',
                             help='Name of service to restart')
    options = parser.parse_args()
    # the code to dispatch commands could all be in this file. For the purposes
    # of illustration only, we implement each command in a separate module.
    try:
        mod = importlib.import_module(options.command)
        cmd = getattr(mod, 'command')
    except (ImportError, AttributeError):
        print('Unable to find the code for command \'%s\'' % options.command)
        return 1
    # Could get fancy here and load configuration from file or dictionary
    logging.basicConfig(level=options.log_level,
                        format='%(levelname)s %(name)s %(message)s')
    cmd(options)

if __name__ == '__main__':
    sys.exit(main())

And the start, stop and restart commands can be implemented in separate modules, like so for starting:

# start.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    logger.debug('About to start %s', options.name)
    # actually do the command processing here ...
    logger.info('Started the \'%s\' service.', options.name)

and thus for stopping:

# stop.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to stop %s', services)
    # actually do the command processing here ...
    logger.info('Stopped the %s service%s.', services, plural)

and similarly for restarting:

# restart.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to restart %s', services)
    # actually do the command processing here ...
    logger.info('Restarted the %s service%s.', services, plural)

If we run this application with the default log level, we get output like this:

$ python app.py start foo
INFO start Started the 'foo' service.

$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

The first word is the logging level, and the second word is the module or package name of the place where the event was logged.

If we change the logging level, then we can change the information sent to the log. For example, if we want more information:

$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.

$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

And if we want less:

$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz

In this case, the commands don’t print anything to the console, since nothing at WARNING level or above is logged by them.

A Qt GUI for logging

A question that comes up from time to time is about how to log to a GUI application. The Qt framework is a popular cross-platform UI framework with Python bindings using PySide2 or PyQt5 libraries.

The following example shows how to log to a Qt GUI. This introduces a simple QtHandler class which takes a callable, which should be a slot in the main thread that does GUI updates. A worker thread is also created to show how you can log to the GUI from both the UI itself (via a button for manual logging) as well as a worker thread doing work in the background (here, just logging messages at random levels with random short delays in between).

The worker thread is implemented using Qt’s QThread class rather than the threading module, as there are circumstances where one has to use QThread, which offers better integration with other Qt components.

The code should work with recent releases of either PySide2 or PyQt5. You should be able to adapt the approach to earlier versions of Qt. Please refer to the comments in the code snippet for more detailed information.

import datetime
import logging
import random
import sys
import time

# Deal with minor differences between PySide2 and PyQt5
try:
    from PySide2 import QtCore, QtGui, QtWidgets
    Signal = QtCore.Signal
    Slot = QtCore.Slot
except ImportError:
    from PyQt5 import QtCore, QtGui, QtWidgets
    Signal = QtCore.pyqtSignal
    Slot = QtCore.pyqtSlot


logger = logging.getLogger(__name__)


#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
    signal = Signal(str, logging.LogRecord)

#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
    def __init__(self, slotfunc, *args, **kwargs):
        super(QtHandler, self).__init__(*args, **kwargs)
        self.signaller = Signaller()
        self.signaller.signal.connect(slotfunc)

    def emit(self, record):
        s = self.format(record)
        self.signaller.signal.emit(s, record)

#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
    return QtCore.QThread.currentThread().objectName()


#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
          logging.CRITICAL)

#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
    @Slot()
    def start(self):
        extra = {'qThreadName': ctname() }
        logger.debug('Started work', extra=extra)
        i = 1
        # Let the thread run until interrupted. This allows reasonably clean
        # thread termination.
        while not QtCore.QThread.currentThread().isInterruptionRequested():
            delay = 0.5 + random.random() * 2
            time.sleep(delay)
            level = random.choice(LEVELS)
            logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
            i += 1

#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):

    COLORS = {
        logging.DEBUG: 'black',
        logging.INFO: 'blue',
        logging.WARNING: 'orange',
        logging.ERROR: 'red',
        logging.CRITICAL: 'purple',
    }

    def __init__(self, app):
        super(Window, self).__init__()
        self.app = app
        self.textedit = te = QtWidgets.QPlainTextEdit(self)
        # Set whatever the default monospace font is for the platform
        f = QtGui.QFont('nosuchfont')
        f.setStyleHint(f.Monospace)
        te.setFont(f)
        te.setReadOnly(True)
        PB = QtWidgets.QPushButton
        self.work_button = PB('Start background work', self)
        self.log_button = PB('Log a message at a random level', self)
        self.clear_button = PB('Clear log window', self)
        self.handler = h = QtHandler(self.update_status)
        # Remember to use qThreadName rather than threadName in the format string.
        fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
        formatter = logging.Formatter(fs)
        h.setFormatter(formatter)
        logger.addHandler(h)
        # Set up to terminate the QThread when we exit
        app.aboutToQuit.connect(self.force_quit)

        # Lay out all the widgets
        layout = QtWidgets.QVBoxLayout(self)
        layout.addWidget(te)
        layout.addWidget(self.work_button)
        layout.addWidget(self.log_button)
        layout.addWidget(self.clear_button)
        self.setFixedSize(900, 400)

        # Connect the non-worker slots and signals
        self.log_button.clicked.connect(self.manual_update)
        self.clear_button.clicked.connect(self.clear_display)

        # Start a new worker thread and connect the slots for the worker
        self.start_thread()
        self.work_button.clicked.connect(self.worker.start)
        # Once started, the button should be disabled
        self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))

    def start_thread(self):
        self.worker = Worker()
        self.worker_thread = QtCore.QThread()
        self.worker.setObjectName('Worker')
        self.worker_thread.setObjectName('WorkerThread')  # for qThreadName
        self.worker.moveToThread(self.worker_thread)
        # This will start an event loop in the worker thread
        self.worker_thread.start()

    def kill_thread(self):
        # Just tell the worker to stop, then tell it to quit and wait for that
        # to happen
        self.worker_thread.requestInterruption()
        if self.worker_thread.isRunning():
            self.worker_thread.quit()
            self.worker_thread.wait()
        else:
            print('worker has already exited.')

    def force_quit(self):
        # For use when the window is closed
        if self.worker_thread.isRunning():
            self.kill_thread()

    # The functions below update the UI and run in the main thread because
    # that's where the slots are set up

    @Slot(str, logging.LogRecord)
    def update_status(self, status, record):
        color = self.COLORS.get(record.levelno, 'black')
        s = '<pre><font color="%s">%s</font></pre>' % (color, status)
        self.textedit.appendHtml(s)

    @Slot()
    def manual_update(self):
        # This function uses the formatted message passed in, but also uses
        # information from the record to format the message in an appropriate
        # color according to its severity (level).
        level = random.choice(LEVELS)
        extra = {'qThreadName': ctname() }
        logger.log(level, 'Manually logged!', extra=extra)

    @Slot()
    def clear_display(self):
        self.textedit.clear()


def main():
    QtCore.QThread.currentThread().setObjectName('MainThread')
    logging.getLogger().setLevel(logging.DEBUG)
    app = QtWidgets.QApplication(sys.argv)
    example = Window(app)
    example.show()
    sys.exit(app.exec_())

if __name__=='__main__':
    main()