慢系统调用引起的signal无效问题

前言:

前两天一同事说 遇到一个比较奇妙的问题,说是线程下信号居然无效…    哈哈,我一听注册的信号无效,我内心就知道是怎么个一回事了。 后来经过我的快速排查,也证实了我的想法,确实由于慢系统调用引起signal无效。  在python里,这类场景常见于线程做join阻塞等待的时候。


该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新。  http://xiaorui.cc/?p=4694


通常我们在使在一些动态语言里,怎么用线程? 拿python来说,一般来说是把线程对象放到一个容器结构里,然后join。 join的逻辑又是什么? 就是拿线程锁,一个线程如果是存活的,他是不会释放锁的。 别搞混了,这个线程锁,不是pyhton gil解释器锁。

thread join 是 拿锁,等待拿锁这个操作属于慢系统调用….

如果main thread主线程执行一个慢系统调用,那么signal是不能被执行的,虽然signal在底层是可以接收,信号本身也是异步的,他要被执行注册的信号处理函数,必须要调入函数调用栈,那要进入谁的调用栈?  是 main thread的… 但由于main thread 处在慢系统调用,无法被中断… 所以,无效… 

慢速系统调用 vs 快速系统调用

快速系统调用,被信号中断后,会自动重启,应用层无感知.

慢速系统调用会出错返回(-1),errno=EINTR。  像 read, recv,锁 都属于慢系统调用操作的。


当你触发kill信号时,会发现这么一个情况…  下面是strace的追踪结果.

futex(0x207b630, FUTEX_WAIT_PRIVATE, 0, NULL) = ? ERESTARTSYS (To be restarted)
--- SIGINT (Interrupt) @ 0 (0) ---
rt_sigreturn(0xffffffff)                = -1 EINTR (Interrupted system call)
futex(0x207b630, FUTEX_WAIT_PRIVATE, 0, NULL

注意几个点:

只有main_thread才可以接收信号, 其他线程接收信号不做传递,引起crash。

python time.sleep是基于select timeout实现的,select属于快系统调用。

SA_RESTART的重启也是需要main_thread释放慢系统调用.


怎么处理被信号中断的系统调用?


在执行一个系统调用时可能会被信号中断,继而去执行 信号注册的处理函数,系统调用被信号中断后,如何跳转到上次的系统调用? 

有两种方法可以恢复被中断的系统调用:

一个是 手动循环处理

另一个是 自动 SA_RESTART 处理。

当碰到EINTR错误的时候,有一些可以重启的系统调用要进行重启,而对于有一些系统调用是不能够重启的。例如:accept、read、write、select、和open之类的函数来说,是可以进行重启的。不过对于套接字编程中的connect函数我们是不能重启的,若connect函数返回一个EINTR错误的时候,我们不能再次调用它,否则将立即返回一个错误。针对connect不能重启的处理方法是,必须调用select来等待连接完成。

确切说,什么是重启 !!!

一些IO系统调用执行时,如 read 等待输入期间,如果收到一个信号,系统将中断read, 转而执行信号处理函数. 当信号处理返回后, 系统遇到了一个问题: 是重新开始这个系统调用, 还是让系统调用失败?早期UNIX系统的做法是, 中断系统调用,并让系统调用失败, 比如read返回 -1, 同时设置 errno 为EINTR中断了的系统调用是没有完成的调用,它的失败是临时性的,如果再次调用则可能成功,这并不是真正的失败,所以要对这种情况进行处理.

所以,我们在开发socket服务时,针对EINTR的操作进行手动重试,比如下面的伪代码:

while ((r = read (fd, buf, len)) < 0 && errno == EINTR) :
        do_something
        


在python里已经抽象了EINTR错误的异常类型,伪代码如下:

while True:
    try:
        data = file.read(size)
        break
    except InterruptedError:
        continue

还有一个自动处理方法,就是Signal 的 SO_RESTART 参数,他是自动修复被中断的系统调用。 当然了,有一些系统调用不能重启。

In [1]: import signal

In [2]: signal.siginterrupt?
Docstring:
siginterrupt(sig, flag) -> None
change system call restart behaviour: if flag is False, system calls
will be restarted when interrupted by signal sig, else system calls
will be interrupted.


对于复杂的服务,建议使用手动处理信号中断重启。 


如何解决慢系统调用信号处理无效的问题 ?

解决慢系统调用无法跳转signal函数的方法有这么几种:


第一个,在signal里配置sa_flags 的SA_RESTART 选项, 有些系统调用无效, 比如 connect.

第二个,加入逻辑超时判断 (也是基于第三种)

第三个,尽量避免在main_thread执行慢系统调用.


我这里三种方法都有代码的说明,但在threading join的场景下,适用于第二种和第三种的方法。

# xiaorui.cc

import threading, signal, time, os

RUNNING = True
threads = []

def monitoring(tid, itemId=None, threshold=None):
    global RUNNING
    while(RUNNING):
        print ("PID=", os.getpid(), ";id=", tid)
        time.sleep(2)
    print ("Thread stopped:", tid)

def handler(signum, frame):
    print ("Signal is received:" + str(signum))
    global RUNNING
    RUNNING=False
    #global threads

if __name__ == '__main__':
    signal.signal(signal.SIGUSR1, handler)
    signal.signal(signal.SIGUSR2, handler)
    signal.signal(signal.SIGALRM, handler)
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGQUIT, handler)
    #signal.siginterrupt(signal.SIGINT, False)  # 第一种

    print ("Starting all threads...")
    thread1 = threading.Thread(target=monitoring, args=(1,), kwargs={'itemId':'1', 'threshold':60})
    thread1.start()
    threads.append(thread1)

    thread2 = threading.Thread(target=monitoring, args=(2,), kwargs={'itemId':'2', 'threshold':60})
    thread2.start()
    threads.append(thread2)
    # 第三种方法
    #while(RUNNING):
    #    print "Main program is sleeping."
    #    time.sleep(30)

    # 第二种
    c = set()
    while 1:
        if len(c) == len(threads):
            break

        for thread in threads:
            if thread.is_alive():
                thread.join(timeout=1)
            else:
                c.add(thread)

    print ("All threads stopped.")

下面是正常的strace信息:

# xiaorui.cc
rt_sigaction(SIGINT, {SIG_DFL, [], SA_RESTORER, 0x3df860f710}, {0x4e4a30, [], SA_RESTORER, 0x3df860f710}, 8) = 0
rt_sigaction(SIGQUIT, {SIG_DFL, [], SA_RESTORER, 0x3df860f710}, {0x4e4a30, [], SA_RESTORER, 0x3df860f710}, 8) = 0
rt_sigaction(SIGUSR1, {SIG_DFL, [], SA_RESTORER, 0x3df860f710}, {0x4e4a30, [], SA_RESTORER, 0x3df860f710}, 8) = 0
rt_sigaction(SIGUSR2, {SIG_DFL, [], SA_RESTORER, 0x3df860f710}, {0x4e4a30, [], SA_RESTORER, 0x3df860f710}, 8) = 0
rt_sigaction(SIGALRM, {SIG_DFL, [], SA_RESTORER, 0x3df860f710}, {0x4e4a30, [], SA_RESTORER, 0x3df860f710}, 8) = 0exit_group(0)                           = ?


思路很简单,就在join里加入超时,让main thread执行非慢系统调用方法。

这里简单说提下 threading join的实现… 如果加入超时选项,那么在一个超时的时间区间内,他会while sleep 500微妙的速度,不断的去尝试拿锁.  先前我们有说过,python sleep是通过select实现的,select不是慢系统调用。 非阻塞拿锁是通过trylock实现。

# xiaorui.cc
def join(self, timeout=None):
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()

def wait(self)...
     delay = 0.0005 # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)


总结,不要在main_thread里做一些慢系统调用。 如果不可避免使用慢系统系统,请加入SA_RESTART重启,能避免一些异常的问题。  

END….


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc