前言:
前两天一同事说 遇到一个比较奇妙的问题,说是线程下信号居然无效… 哈哈,我一听注册的信号无效,我内心就知道是怎么个一回事了。 后来经过我的快速排查,也证实了我的想法,确实由于慢系统调用引起signal无效。 在python里,这类场景常见于线程做join阻塞等待的时候。
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新。 http://xiaorui.cc/?p=4694
通常我们在使在一些动态语言里,怎么用线程? 拿python来说,一般来说是把线程对象放到一个容器结构里,然后join。 join的逻辑又是什么? 就是拿线程锁,一个线程如果是存活的,他是不会释放锁的。 别搞混了,这个线程锁,不是pyhton gil解释器锁。
thread join 是 拿锁,等待拿锁这个操作属于慢系统调用….
如果main thread主线程执行一个慢系统调用,那么signal是不能被执行的,虽然signal在底层是可以接收,信号本身也是异步的,他要被执行注册的信号处理函数,必须要调入函数调用栈,那要进入谁的调用栈? 是 main thread的… 但由于main thread 处在慢系统调用,无法被中断… 所以,无效…
慢速系统调用 vs 快速系统调用
快速系统调用,被信号中断后,会自动重启,应用层无感知.
慢速系统调用会出错返回(-1),errno=EINTR。 像 read, recv,锁 都属于慢系统调用操作的。
当你触发kill信号时,会发现这么一个情况… 下面是strace的追踪结果.
futex(0x207b630, FUTEX_WAIT_PRIVATE, 0, NULL) = ? ERESTARTSYS (To be restarted) --- SIGINT (Interrupt) @ 0 (0) --- rt_sigreturn(0xffffffff) = -1 EINTR (Interrupted system call) futex(0x207b630, FUTEX_WAIT_PRIVATE, 0, NULL
注意几个点:
只有main_thread才可以接收信号, 其他线程接收信号不做传递,引起crash。
python time.sleep是基于select timeout实现的,select属于快系统调用。
SA_RESTART的重启也是需要main_thread释放慢系统调用.
怎么处理被信号中断的系统调用?
在执行一个系统调用时可能会被信号中断,继而去执行 信号注册的处理函数,系统调用被信号中断后,如何跳转到上次的系统调用?
有两种方法可以恢复被中断的系统调用:
一个是 手动循环处理
另一个是 自动 SA_RESTART 处理。
当碰到EINTR错误的时候,有一些可以重启的系统调用要进行重启,而对于有一些系统调用是不能够重启的。例如:accept、read、write、select、和open之类的函数来说,是可以进行重启的。不过对于套接字编程中的connect函数我们是不能重启的,若connect函数返回一个EINTR错误的时候,我们不能再次调用它,否则将立即返回一个错误。针对connect不能重启的处理方法是,必须调用select来等待连接完成。
确切说,什么是重启 !!!
一些IO系统调用执行时,如 read 等待输入期间,如果收到一个信号,系统将中断read, 转而执行信号处理函数. 当信号处理返回后, 系统遇到了一个问题: 是重新开始这个系统调用, 还是让系统调用失败?早期UNIX系统的做法是, 中断系统调用,并让系统调用失败, 比如read返回 -1, 同时设置 errno 为EINTR中断了的系统调用是没有完成的调用,它的失败是临时性的,如果再次调用则可能成功,这并不是真正的失败,所以要对这种情况进行处理.
所以,我们在开发socket服务时,针对EINTR的操作进行手动重试,比如下面的伪代码:
while ((r = read (fd, buf, len)) < 0 && errno == EINTR) : do_something
在python里已经抽象了EINTR错误的异常类型,伪代码如下:
while True: try: data = file.read(size) break except InterruptedError: continue
还有一个自动处理方法,就是Signal 的 SO_RESTART 参数,他是自动修复被中断的系统调用。 当然了,有一些系统调用不能重启。
In [1]: import signal In [2]: signal.siginterrupt? Docstring: siginterrupt(sig, flag) -> None change system call restart behaviour: if flag is False, system calls will be restarted when interrupted by signal sig, else system calls will be interrupted.
对于复杂的服务,建议使用手动处理信号中断重启。
如何解决慢系统调用信号处理无效的问题 ?
解决慢系统调用无法跳转signal函数的方法有这么几种:
第一个,在signal里配置sa_flags 的SA_RESTART 选项, 有些系统调用无效, 比如 connect.
第二个,加入逻辑超时判断 (也是基于第三种)
第三个,尽量避免在main_thread执行慢系统调用.
我这里三种方法都有代码的说明,但在threading join的场景下,适用于第二种和第三种的方法。
# xiaorui.cc import threading, signal, time, os RUNNING = True threads = [] def monitoring(tid, itemId=None, threshold=None): global RUNNING while(RUNNING): print ("PID=", os.getpid(), ";id=", tid) time.sleep(2) print ("Thread stopped:", tid) def handler(signum, frame): print ("Signal is received:" + str(signum)) global RUNNING RUNNING=False #global threads if __name__ == '__main__': signal.signal(signal.SIGUSR1, handler) signal.signal(signal.SIGUSR2, handler) signal.signal(signal.SIGALRM, handler) signal.signal(signal.SIGINT, handler) signal.signal(signal.SIGQUIT, handler) #signal.siginterrupt(signal.SIGINT, False) # 第一种 print ("Starting all threads...") thread1 = threading.Thread(target=monitoring, args=(1,), kwargs={'itemId':'1', 'threshold':60}) thread1.start() threads.append(thread1) thread2 = threading.Thread(target=monitoring, args=(2,), kwargs={'itemId':'2', 'threshold':60}) thread2.start() threads.append(thread2) # 第三种方法 #while(RUNNING): # print "Main program is sleeping." # time.sleep(30) # 第二种 c = set() while 1: if len(c) == len(threads): break for thread in threads: if thread.is_alive(): thread.join(timeout=1) else: c.add(thread) print ("All threads stopped.")
下面是正常的strace信息:
# xiaorui.cc rt_sigaction(SIGINT, {SIG_DFL, [], SA_RESTORER, 0x3df860f710}, {0x4e4a30, [], SA_RESTORER, 0x3df860f710}, 8) = 0 rt_sigaction(SIGQUIT, {SIG_DFL, [], SA_RESTORER, 0x3df860f710}, {0x4e4a30, [], SA_RESTORER, 0x3df860f710}, 8) = 0 rt_sigaction(SIGUSR1, {SIG_DFL, [], SA_RESTORER, 0x3df860f710}, {0x4e4a30, [], SA_RESTORER, 0x3df860f710}, 8) = 0 rt_sigaction(SIGUSR2, {SIG_DFL, [], SA_RESTORER, 0x3df860f710}, {0x4e4a30, [], SA_RESTORER, 0x3df860f710}, 8) = 0 rt_sigaction(SIGALRM, {SIG_DFL, [], SA_RESTORER, 0x3df860f710}, {0x4e4a30, [], SA_RESTORER, 0x3df860f710}, 8) = 0exit_group(0) = ?
思路很简单,就在join里加入超时,让main thread执行非慢系统调用方法。
这里简单说提下 threading join的实现… 如果加入超时选项,那么在一个超时的时间区间内,他会while sleep 500微妙的速度,不断的去尝试拿锁. 先前我们有说过,python sleep是通过select实现的,select不是慢系统调用。 非阻塞拿锁是通过trylock实现。
# xiaorui.cc def join(self, timeout=None): if not self.__initialized: raise RuntimeError("Thread.__init__() not called") if not self.__started.is_set(): raise RuntimeError("cannot join thread before it is started") if self is current_thread(): raise RuntimeError("cannot join current thread") if __debug__: if not self.__stopped: self._note("%s.join(): waiting until thread stops", self) self.__block.acquire() try: if timeout is None: while not self.__stopped: self.__block.wait() if __debug__: self._note("%s.join(): thread stopped", self) else: deadline = _time() + timeout while not self.__stopped: delay = deadline - _time() if delay <= 0: if __debug__: self._note("%s.join(): timed out", self) break self.__block.wait(delay) else: if __debug__: self._note("%s.join(): thread stopped", self) finally: self.__block.release() def wait(self)... delay = 0.0005 # 500 us -> initial delay of 1 ms while True: gotit = waiter.acquire(0) if gotit: break remaining = endtime - _time() if remaining <= 0: break delay = min(delay * 2, remaining, .05)
总结,不要在main_thread里做一些慢系统调用。 如果不可避免使用慢系统系统,请加入SA_RESTART重启,能避免一些异常的问题。
END….