signal --- 設定非同步事件的處理函式

原始碼:Lib/signal.py


本模組提供於 Python 中使用訊號處理程式的機制。

一般規則

signal.signal() 函式允許定義自訂的處理程式,會在收到訊號時執行。我們安裝了少數的預設處理程式:SIGPIPE 會被忽略 (所以管道和 socket 上的寫入錯誤可以當作一般的 Python 例外報告),而 SIGINT(如果父行程沒有改變它的話)會被轉換成 KeyboardInterrupt 例外。

特定訊號的處理程式一旦被設定,就會一直被安裝,直到被明確地重設為止 (不管底層的實作為何,Python 皆模擬出 BSD 風格的介面),但 SIGCHLD 的處理程式除外,它會跟隨底層的實作。

在 WebAssembly 平台上,訊號是模擬出來的,故行為不同。有幾個函式和訊號在這些平台上是不可用的。

Python 訊號處理程式的執行

Python 訊號處理程式不會在低階(C 語言)訊號處理程式中執行。相反地,低階訊號處理程式會設定一個旗標,告訴虛擬機在稍後執行相對應的 Python 訊號處理程式(例如在下一個 bytecode 指令)。這會有一些後果:

  • 捕捉像 SIGFPESIGSEGV 這類由 C 程式碼中無效操作所引起的同步錯誤是沒有意義的。Python 將從訊號處理程式中回傳到 C 程式碼,而 C 程式碼很可能再次引發相同的訊號,導致 Python 明顯假當機 (hang)。從 Python 3.3 開始,你可以使用 faulthandler 模組來報告同步錯誤。

  • 純粹以 C 實作的長時間計算(例如在大量文字上的正規表示式比對)可能會不間斷地運行任意長度的時間而不考慮收到的任何訊號。當計算完成時,Python 訊號處理程式會被呼叫。

  • 如果處理程式引發例外,就會在主執行緒中「憑空」產生例外。請參閱下面的說明

訊號和執行緒

Python 訊號處理程式總是在主直譯器的主 Python 執行緒中執行,即使訊號是在另一個執行緒中接收到的。這意味著訊號不能用來做為執行緒間通訊的方式。你可以使用 threading 模組的同步原語 (synchronization primitive) 來代替。

此外,只有主直譯器的主執行緒才被允許設定新的訊號處理程式。

模組內容

在 3.5 版的變更: 下面列出的訊號 (SIG*)、處理器(SIG_DFLSIG_IGN)和訊號遮罩 (sigmask)(SIG_BLOCKSIG_UNBLOCKSIG_SETMASK)的相關常數被轉換成 enumsSignalsHandlersSigmasks)。getsignal()pthread_sigmask()sigpending()sigwait() 函式會回傳可被人類閱讀的枚舉作為 Signals 物件。

訊號模組定義了三個枚舉:

class signal.Signals

SIG* 常數和 CTRL_* 常數的 enum.IntEnum 集合。

在 3.5 版被加入.

class signal.Handlers

SIG_DFLSIG_IGN 常數的 enum.IntEnum 集合。

在 3.5 版被加入.

class signal.Sigmasks

SIG_BLOCKSIG_UNBLOCKSIG_SETMASK 常數的 enum.IntEnum 集合。

Availability: Unix.

更多資訊請見 sigprocmask(2)pthread_sigmask(3) 線上手冊。

在 3.5 版被加入.

signal 模組中定義的變數有:

signal.SIG_DFL

這是兩種標準訊號處理選項之一;它會簡單地執行訊號的預設功能。例如,在大多數系統上,SIGQUIT 的預設動作是轉儲 (dump) 核心並退出,而 SIGCHLD 的預設動作是直接忽略。

signal.SIG_IGN

這是另一個標準的訊號處理程式,會直接忽略給定的訊號。

signal.SIGABRT

來自 abort(3) 的中止訊號。

signal.SIGALRM

來自 alarm(2) 的計時器訊號。

Availability: Unix.

signal.SIGBREAK

從鍵盤中斷 (CTRL + BREAK)。

Availability: Windows.

signal.SIGBUS

匯流排錯誤(記憶體存取不良)。

Availability: Unix.

signal.SIGCHLD

子行程停止或終止。

Availability: Unix.

signal.SIGCLD

SIGCHLD 的別名。

Availability: not macOS.

signal.SIGCONT

如果目前行程是被停止的,則繼續運行

Availability: Unix.

signal.SIGFPE

浮點運算例外。例如除以零。

也參考

ZeroDivisionError 會在除法或模運算 (modulo operation) 的第二個引數為零時引發。

signal.SIGHUP

偵測到控制終端機掛斷 (hangup) 或控制行程死亡。

Availability: Unix.

signal.SIGILL

非法指令。

signal.SIGINT

從鍵盤中斷 (CTRL + C)。

預設動作是引發 KeyboardInterrupt

signal.SIGKILL

殺死訊號。

它無法被捕捉、阻擋或忽略。

Availability: Unix.

signal.SIGPIPE

管道中斷 (broken pipe):寫到沒有讀取器 (reader) 的管道。

預設動作是忽略訊號。

Availability: Unix.

signal.SIGSEGV

記憶體區段錯誤 (segmentation fault):無效記憶體參照。

signal.SIGSTKFLT

輔助處理器 (coprocessor) 上的堆疊錯誤 (stack fault)。Linux 核心不會引發此訊號:它只能在使用者空間 (user space) 中引發。

Availability: Linux.

在訊號可用的架構上。請參閱 signal(7) 線上手冊以取得更多資訊。

在 3.11 版被加入.

signal.SIGTERM

終止訊號。

signal.SIGUSR1

使用者定義訊號 1。

Availability: Unix.

signal.SIGUSR2

使用者定義訊號 2。

Availability: Unix.

signal.SIGWINCH

視窗調整大小訊號。

Availability: Unix.

SIG*

所有的訊號編號都是以符號定義的。例如,掛斷訊號被定義為 signal.SIGHUP;變數名稱與 C 程式中使用的名稱相同,可在 <signal.h> 中找到。Unix 線上手冊 'signal()' 列出了現有的訊號(在某些系統上是 signal(2),在其他系統上是在 signal(7) 中)。請注意,並非所有系統都會定義同一套訊號名稱;只有那些由系統所定義的名稱才會由這個模組定義。

signal.CTRL_C_EVENT

Ctrl+C 擊鍵 (keystroke) 事件相對應的訊號。此訊號只能與 os.kill() 搭配使用。

Availability: Windows.

在 3.2 版被加入.

signal.CTRL_BREAK_EVENT

Ctrl+Break 擊鍵事件相對應的訊號。此訊號只能與 os.kill() 搭配使用。

Availability: Windows.

在 3.2 版被加入.

signal.NSIG

比最高編號訊號的編號多一。使用 valid_signals() 來取得有效的訊號編號。

signal.ITIMER_REAL

即時減少間隔計時器 (interval timer),並在到期時送出 SIGALRM

signal.ITIMER_VIRTUAL

僅在執行行程時遞減間隔計時器,並在到期時傳送 SIGVTALRM。

signal.ITIMER_PROF

當行程執行或系統代表行程執行時,都會減少間隔計時器。與 ITIMER_VIRTUAL 配合,這個計時器通常用來分析 (profile) 應用程式在使用者空間與核心空間 (kernel space) 所花費的時間。SIGPROF 會在到期時傳送。

signal.SIG_BLOCK

pthread_sigmask()how 參數的可能值,表示訊號將被阻檔。

在 3.3 版被加入.

signal.SIG_UNBLOCK

pthread_sigmask()how 參數的可能值,表示訊號將被解除阻檔。

在 3.3 版被加入.

signal.SIG_SETMASK

pthread_sigmask()how 參數的可能值,表示訊號遮罩 (signal mask) 要被取代。

在 3.3 版被加入.

signal 模組定義了一個例外:

exception signal.ItimerError

setitimer()getitimer() 底層實作發生錯誤時引發訊號。如果傳給 setitimer() 的是無效的間隔計時器或負數時間,則預期會發生此錯誤。這個錯誤是 OSError 的子型別。

在 3.3 版被加入: 此錯誤過去是 IOError 的子型別,現在是 OSError 的別名。

signal 模組定義了下列函式:

signal.alarm(time)

如果 time 非零,則此函式會要求在 time 秒後傳送 SIGALRM 訊號給該行程。任何先前排程 (scheduled) 的警報都會被取消(任何時候都只能排程一個警報)。回傳值是先前設定的警報原本再等多久就會被傳送的秒數。如果 time 為零,則不會去排程任何警報,且已排程的警報會被取消。如果回傳值為零,則代表目前未排程任何警報。

Availability: Unix.

更多資訊請見 alarm(2) 線上手冊。

signal.getsignal(signalnum)

回傳訊號 signalnum 的目前訊號處理程式。回傳值可以是一個可呼叫的 Python 物件,或是特殊值 signal.SIG_IGNsignal.SIG_DFLNone 之一。這裡的 signal.SIG_IGN 表示訊號先前被忽略,signal.SIG_DFL 表示訊號先前使用預設的處理方式,而 None 表示先前的訊號處理程式並未從 Python 安裝。

signal.strsignal(signalnum)

回傳訊號 signalnum 的描述,例如 SIGINT 的 "Interrupt"。如果 signalnum 沒有描述,則回傳 None。如果 signalnum 無效則會引發 ValueError

在 3.8 版被加入.

signal.valid_signals()

回傳此平台上的有效訊號編號集合。如果某些訊號被系統保留作為內部使用,則此值可能小於 range(1, NSIG)

在 3.8 版被加入.

signal.pause()

使行程休眠,直到接收到訊號;然後呼叫適當的處理程式。不會回傳任何東西。

Availability: Unix.

更多資訊請見 signal(2) 線上手冊。

也請見 sigwait()sigwaitinfo()sigtimedwait()sigpending()

signal.raise_signal(signum)

傳送訊號至呼叫的行程。不會回傳任何東西。

在 3.8 版被加入.

signal.pidfd_send_signal(pidfd, sig, siginfo=None, flags=0)

傳送訊號 sig 到檔案描述器 pidfd 所指的行程。Python 目前不支援 siginfo 參數;它必須是 Noneflags 引數是提供給未來的擴充;目前沒有定義旗標值。

更多資訊請見 pidfd_send_signal(2) 線上手冊。

Availability: Linux >= 5.1, Android >= build-time API level 31

在 3.9 版被加入.

signal.pthread_kill(thread_id, signalnum)

將訊號 signalnum 傳送給與呼叫者在同一行程中的另一個執行緒 thread_id。目標執行緒能執行任何程式碼(無論為 Python 與否)。但是,如果目標執行緒正執行 Python 直譯器,Python 訊號處理程式將會由主直譯器的主執行緒來執行。因此,向特定 Python 執行緒發送訊號的唯一意義是強制執行中的系統呼叫以 InterruptedError 方式失敗。

使用 threading.get_ident()threading.Thread 物件的 ident 屬性來取得 thread_id 的適當值。

如果 signalnum 為 0,則不會傳送訊號,但仍會執行錯誤檢查;這可用來檢查目標執行緒是否仍在執行。

引發一個附帶引數 thread_idsignalnum稽核事件 signal.pthread_kill

Availability: Unix.

更多資訊請見 pthread_kill(3) 線上手冊。

另請參閱 os.kill()

在 3.3 版被加入.

signal.pthread_sigmask(how, mask)

擷取和/或變更呼叫執行緒的訊號遮罩。訊號遮罩是目前阻擋呼叫者傳送的訊號集合。將舊的訊號遮罩作為一組訊號集合回傳。

呼叫的行為取決於 how 的值,如下所示。

  • SIG_BLOCK:被阻檔的訊號集合是目前訊號集合與 mask 引數的聯集。

  • SIG_UNBLOCK:將 mask 中的訊號從目前阻檔的訊號集合中移除。嘗試為未被阻檔的訊號解除阻檔是允許的。

  • SIG_SETMASK:將被阻檔的訊號集合設定為 mask 引數。

mask 是一組訊號編號(例如 {signal.SIGINT, signal.SIGTERM})的集合。使用 valid_signals() 來取得包含所有訊號的完整遮罩。

例如,signal.pthread_sigmask(signal.SIG_BLOCK, []) 會讀取呼叫執行緒的訊號遮罩。

SIGKILLSIGSTOP 不能被阻檔。

Availability: Unix.

更多資訊請見 sigprocmask(2)pthread_sigmask(3) 線上手冊。

另請參閱 pause()sigpending()sigwait()

在 3.3 版被加入.

signal.setitimer(which, seconds, interval=0.0)

Sets given interval timer (one of signal.ITIMER_REAL, signal.ITIMER_VIRTUAL or signal.ITIMER_PROF) specified by which to fire after seconds (float is accepted, different from alarm()) and after that every interval seconds (if interval is non-zero). The interval timer specified by which can be cleared by setting seconds to zero.

When an interval timer fires, a signal is sent to the process. The signal sent is dependent on the timer being used; signal.ITIMER_REAL will deliver SIGALRM, signal.ITIMER_VIRTUAL sends SIGVTALRM, and signal.ITIMER_PROF will deliver SIGPROF.

The old values are returned as a tuple: (delay, interval).

Attempting to pass an invalid interval timer will cause an ItimerError.

Availability: Unix.

signal.getitimer(which)

Returns current value of a given interval timer specified by which.

Availability: Unix.

signal.set_wakeup_fd(fd, *, warn_on_full_buffer=True)

Set the wakeup file descriptor to fd. When a signal is received, the signal number is written as a single byte into the fd. This can be used by a library to wakeup a poll or select call, allowing the signal to be fully processed.

The old wakeup fd is returned (or -1 if file descriptor wakeup was not enabled). If fd is -1, file descriptor wakeup is disabled. If not -1, fd must be non-blocking. It is up to the library to remove any bytes from fd before calling poll or select again.

When threads are enabled, this function can only be called from the main thread of the main interpreter; attempting to call it from other threads will cause a ValueError exception to be raised.

There are two common ways to use this function. In both approaches, you use the fd to wake up when a signal arrives, but then they differ in how they determine which signal or signals have arrived.

In the first approach, we read the data out of the fd's buffer, and the byte values give you the signal numbers. This is simple, but in rare cases it can run into a problem: generally the fd will have a limited amount of buffer space, and if too many signals arrive too quickly, then the buffer may become full, and some signals may be lost. If you use this approach, then you should set warn_on_full_buffer=True, which will at least cause a warning to be printed to stderr when signals are lost.

In the second approach, we use the wakeup fd only for wakeups, and ignore the actual byte values. In this case, all we care about is whether the fd's buffer is empty or non-empty; a full buffer doesn't indicate a problem at all. If you use this approach, then you should set warn_on_full_buffer=False, so that your users are not confused by spurious warning messages.

在 3.5 版的變更: On Windows, the function now also supports socket handles.

在 3.7 版的變更: 新增 warn_on_full_buffer 參數。

signal.siginterrupt(signalnum, flag)

Change system call restart behaviour: if flag is False, system calls will be restarted when interrupted by signal signalnum, otherwise system calls will be interrupted. Returns nothing.

Availability: Unix.

更多資訊請見 siginterrupt(3) 手冊頁。

Note that installing a signal handler with signal() will reset the restart behaviour to interruptible by implicitly calling siginterrupt() with a true flag value for the given signal.

signal.signal(signalnum, handler)

Set the handler for signal signalnum to the function handler. handler can be a callable Python object taking two arguments (see below), or one of the special values signal.SIG_IGN or signal.SIG_DFL. The previous signal handler will be returned (see the description of getsignal() above). (See the Unix man page signal(2) for further information.)

When threads are enabled, this function can only be called from the main thread of the main interpreter; attempting to call it from other threads will cause a ValueError exception to be raised.

The handler is called with two arguments: the signal number and the current stack frame (None or a frame object; for a description of frame objects, see the description in the type hierarchy or see the attribute descriptions in the inspect module).

On Windows, signal() can only be called with SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, SIGTERM, or SIGBREAK. A ValueError will be raised in any other case. Note that not all systems define the same set of signal names; an AttributeError will be raised if a signal name is not defined as SIG* module level constant.

signal.sigpending()

Examine the set of signals that are pending for delivery to the calling thread (i.e., the signals which have been raised while blocked). Return the set of the pending signals.

Availability: Unix.

更多資訊請見 sigpending(2) 手冊頁。

另請參閱 pause()pthread_sigmask()sigwait()

在 3.3 版被加入.

signal.sigwait(sigset)

Suspend execution of the calling thread until the delivery of one of the signals specified in the signal set sigset. The function accepts the signal (removes it from the pending list of signals), and returns the signal number.

Availability: Unix.

更多資訊請見 sigwait(3) 手冊頁。

See also pause(), pthread_sigmask(), sigpending(), sigwaitinfo() and sigtimedwait().

在 3.3 版被加入.

signal.sigwaitinfo(sigset)

Suspend execution of the calling thread until the delivery of one of the signals specified in the signal set sigset. The function accepts the signal and removes it from the pending list of signals. If one of the signals in sigset is already pending for the calling thread, the function will return immediately with information about that signal. The signal handler is not called for the delivered signal. The function raises an InterruptedError if it is interrupted by a signal that is not in sigset.

The return value is an object representing the data contained in the siginfo_t structure, namely: si_signo, si_code, si_errno, si_pid, si_uid, si_status, si_band.

Availability: Unix.

更多資訊請見 sigwaitinfo(2) 手冊頁。

另請參閱 pause()sigwait()sigtimedwait()

在 3.3 版被加入.

在 3.5 版的變更: The function is now retried if interrupted by a signal not in sigset and the signal handler does not raise an exception (see PEP 475 for the rationale).

signal.sigtimedwait(sigset, timeout)

Like sigwaitinfo(), but takes an additional timeout argument specifying a timeout. If timeout is specified as 0, a poll is performed. Returns None if a timeout occurs.

Availability: Unix.

更多資訊請見 sigtimedwait(2) 手冊頁。

另請參閱 pause()sigwait()sigwaitinfo()

在 3.3 版被加入.

在 3.5 版的變更: The function is now retried with the recomputed timeout if interrupted by a signal not in sigset and the signal handler does not raise an exception (see PEP 475 for the rationale).

範例

Here is a minimal example program. It uses the alarm() function to limit the time spent waiting to open a file; this is useful if the file is for a serial device that may not be turned on, which would normally cause the os.open() to hang indefinitely. The solution is to set a 5-second alarm before opening the file; if the operation takes too long, the alarm signal will be sent, and the handler raises an exception.

import signal, os

def handler(signum, frame):
    signame = signal.Signals(signum).name
    print(f'Signal handler called with signal {signame} ({signum})')
    raise OSError("Couldn't open device!")

# Set the signal handler and a 5-second alarm
signal.signal(signal.SIGALRM, handler)
signal.alarm(5)

# This open() may hang indefinitely
fd = os.open('/dev/ttyS0', os.O_RDWR)

signal.alarm(0)          # Disable the alarm

Note on SIGPIPE

Piping output of your program to tools like head(1) will cause a SIGPIPE signal to be sent to your process when the receiver of its standard output closes early. This results in an exception like BrokenPipeError: [Errno 32] Broken pipe. To handle this case, wrap your entry point to catch this exception as follows:

import os
import sys

def main():
    try:
        # simulate large output (your code replaces this loop)
        for x in range(10000):
            print("y")
        # flush output here to force SIGPIPE to be triggered
        # while inside this try block.
        sys.stdout.flush()
    except BrokenPipeError:
        # Python flushes standard streams on exit; redirect remaining output
        # to devnull to avoid another BrokenPipeError at shutdown
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(1)  # Python exits with error code 1 on EPIPE

if __name__ == '__main__':
    main()

Do not set SIGPIPE's disposition to SIG_DFL in order to avoid BrokenPipeError. Doing that would cause your program to exit unexpectedly whenever any socket connection is interrupted while your program is still writing to it.

Note on Signal Handlers and Exceptions

If a signal handler raises an exception, the exception will be propagated to the main thread and may be raised after any bytecode instruction. Most notably, a KeyboardInterrupt may appear at any point during execution. Most Python code, including the standard library, cannot be made robust against this, and so a KeyboardInterrupt (or any other exception resulting from a signal handler) may on rare occasions put the program in an unexpected state.

To illustrate this issue, consider the following code:

class SpamContext:
    def __init__(self):
        self.lock = threading.Lock()

    def __enter__(self):
        # If KeyboardInterrupt occurs here, everything is fine
        self.lock.acquire()
        # If KeyboardInterrupt occurs here, __exit__ will not be called
        ...
        # KeyboardInterrupt could occur just before the function returns

    def __exit__(self, exc_type, exc_val, exc_tb):
        ...
        self.lock.release()

For many programs, especially those that merely want to exit on KeyboardInterrupt, this is not a problem, but applications that are complex or require high reliability should avoid raising exceptions from signal handlers. They should also avoid catching KeyboardInterrupt as a means of gracefully shutting down. Instead, they should install their own SIGINT handler. Below is an example of an HTTP server that avoids KeyboardInterrupt:

import signal
import socket
from selectors import DefaultSelector, EVENT_READ
from http.server import HTTPServer, SimpleHTTPRequestHandler

interrupt_read, interrupt_write = socket.socketpair()

def handler(signum, frame):
    print('Signal handler called with signal', signum)
    interrupt_write.send(b'\0')
signal.signal(signal.SIGINT, handler)

def serve_forever(httpd):
    sel = DefaultSelector()
    sel.register(interrupt_read, EVENT_READ)
    sel.register(httpd, EVENT_READ)

    while True:
        for key, _ in sel.select():
            if key.fileobj == interrupt_read:
                interrupt_read.recv(1)
                return
            if key.fileobj == httpd:
                httpd.handle_request()

print("Serving on port 8000")
httpd = HTTPServer(('', 8000), SimpleHTTPRequestHandler)
serve_forever(httpd)
print("Shutdown...")