Source code for signalfd
"""
Simple wrapper around the signalfd syscall.
"""
import errno
import os
import signal
from ._signalfd import ffi as _ffi
from ._signalfd import lib as _lib
__version__ = "0.4.0"
SFD_CLOEXEC = _lib.SFD_CLOEXEC
SFD_NONBLOCK = _lib.SFD_NONBLOCK
SIG_BLOCK = _lib.SIG_BLOCK
SIG_UNBLOCK = _lib.SIG_UNBLOCK
SIG_SETMASK = _lib.SIG_SETMASK
SIGINFO_SIZE = _ffi.sizeof('struct signalfd_siginfo')
ALL_SIGNALS = sorted(getattr(signal, s) for s in dir(signal) if s.startswith('SIG') and '_' not in s)
[docs]class UnknownError(Exception):
pass
def signalfd(fd, signals, flags):
if hasattr(fd, 'fileno'):
fd = fd.fileno()
if not isinstance(fd, int):
raise TypeError('fd: must be an integer.')
if not isinstance(flags, int):
raise TypeError('flags: must be an integer.')
sigmask = _ffi.new('sigset_t[1]')
for sig in signals:
_lib.sigaddset(sigmask, sig)
result = _lib.signalfd(fd, sigmask, flags)
if result < 0:
err = _ffi.errno
if err == errno.EBADF:
raise ValueError("fd: not a valid file descriptor.")
elif err == errno.EINVAL:
if flags & (0xffffffff ^ (SFD_CLOEXEC | SFD_NONBLOCK)):
raise ValueError("flags: mask contains invalid values.")
else:
raise ValueError("fd: not a signalfd.")
elif err == errno.EMFILE:
raise OSError("max system fd limit reached.")
elif err == errno.ENFILE:
raise OSError("max system fd limit reached.")
elif err == errno.ENODEV:
raise OSError("could not mount (internal) anonymous inode device.")
elif err == errno.ENOMEM:
raise MemoryError("insufficient kernel memory available.")
else:
raise UnknownError(err)
return result
def sigprocmask(flags, signals):
if not isinstance(flags, int):
raise TypeError('flags: must be an integer.')
new_sigmask = _ffi.new('sigset_t[1]')
old_sigmask = _ffi.new('sigset_t[1]')
_lib.sigemptyset(new_sigmask)
for sig in signals:
_lib.sigaddset(new_sigmask, sig)
err = _lib.pthread_sigmask(flags, new_sigmask, old_sigmask)
if err:
if err == errno.EINVAL:
raise ValueError("flags: invalid value (not one of SIG_BLOCK, SIG_UNBLOCK or SIG_SETMASK)")
elif err == errno.EFAULT:
raise ValueError("sigmask is not a valid sigset_t")
else:
raise UnknownError(err)
return [s for s in ALL_SIGNALS if _lib.sigismember(old_sigmask, s)]
def read_siginfo(fh):
info = _ffi.new('struct signalfd_siginfo *')
buffer = _ffi.buffer(info)
if hasattr(fh, 'readinto'):
if not fh.readinto(buffer):
raise IOError(errno.EAGAIN, "not enough bytes available")
else:
buffer[:] = os.read(fh, SIGINFO_SIZE)
return info