Main code (Python 2):

```python
from multiprocessing import Pool
import gevent
from gevent import monkey; monkey.patch_all()  # patch_all() also patches thread by default
import requests

def test1():
    for i in range(10000):
        req = requests.get("http://www.163.com/")  # was `rep`, which made the print below raise NameError
        print "test1", req.status_code

def test2():
    for i in range(10000):
        req = requests.get("http://www.163.com/")
        print "test2", req.status_code

def coroutine():
    # run both request loops as greenlets inside one worker process
    gevent.joinall([
        gevent.spawn(test1),
        gevent.spawn(test2),
    ])

if __name__ == "__main__":
    p = Pool()
    for i in range(4):
        p.apply_async(coroutine, args=())
    p.close()
    p.join()
```
strace output:
```
open("/dev/shm/9VRonB", O_RDWR|O_CREAT|O_EXCL, 0600) = 8
write(8, "\1\0\0\0\0\0\0\0\200\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 32) = 32
mmap(NULL, 32, PROT_READ|PROT_WRITE, MAP_SHARED, 8, 0) = 0x7f7c82fc7000
link("/dev/shm/9VRonB", "/dev/shm/sem.mp29419-10437161768614121538") = 0
fstat(8, {st_mode=S_IFREG|0600, st_size=32, ...}) = 0
unlink("/dev/shm/9VRonB") = 0
close(8) = 0
unlink("/dev/shm/sem.mp29419-10437161768614121538") = 0
open("/sys/devices/system/cpu/online", O_RDONLY|O_CLOEXEC) = 8
read(8, "0-3\n", 8192) = 4
close(8) = 0
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f7c82fac9d0) = 29428
rt_sigaction(SIGCHLD, {0x7f7c8138da20, ~[KILL STOP RTMIN RT_1], SA_RESTORER|SA_RESTART, 0x7f7c82b9d390}, NULL, 8) = 0
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f7c82fac9d0) = 29429
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f7c82fac9d0) = 29430
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f7c82fac9d0) = 29431
clock_gettime(CLOCK_MONOTONIC, {10471, 134885769}) = 0
read(6,
```
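After forking the four workers, the parent stays blocked in read(6, ...). (strace was run without -f, so this is the trace of the parent process 29419 only.)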
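Then ps shows that all the python ceshi.py processes are in interruptible sleep (state S). The four [python] entries 28668-28671 are zombies (Z); they appear to be the workers of an earlier run of the same script (28661).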
```
wan    9710 0.0 0.0  15964   1076 pts/2 S+ 12:30 0:00 grep --color=auto python
root  22237 0.0 3.0 363416 115696 ?     Sl 10:01 0:02 /usr/bin/python /usr/bin/ubuntu-kylin-software-center-daemon
wan   22325 0.0 0.9 555788  37472 ?     Sl 10:01 0:00 python indicator-china-weather.py
root  28661 0.0 0.5  63904  19828 pts/8 S  11:22 0:00 python ceshi.py
root  28668 0.0 0.0      0      0 pts/8 Z  11:22 0:00 [python]
root  28669 0.0 0.0      0      0 pts/8 Z  11:22 0:00 [python]
root  28670 0.0 0.0      0      0 pts/8 Z  11:22 0:00 [python]
root  28671 0.0 0.0      0      0 pts/8 Z  11:22 0:00 [python]
root  29417 0.0 0.0   5208   1284 pts/8 S+ 11:26 0:00 strace python ceshi.py
root  29419 0.0 0.5  63648  19856 pts/8 S+ 11:26 0:00 python ceshi.py
root  29428 0.0 0.4  63904  15840 pts/8 S+ 11:26 0:00 python ceshi.py
root  29429 0.0 0.4  63904  15840 pts/8 S+ 11:26 0:00 python ceshi.py
root  29430 0.0 0.4  63904  15748 pts/8 S+ 11:26 0:00 python ceshi.py
root  29431 0.0 0.4  63904  15760 pts/8 S+ 11:26 0:00 python ceshi.py
```
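The STAT letter that ps prints comes from the kernel. On Linux it is the third field of /proc/<pid>/stat (R running, S interruptible sleep, Z zombie, and so on). The sketch below reads it directly. It assumes a Linux /proc, and `proc_state` is a hypothetical helper name, not part of any library.

```python
import os


def proc_state(pid="self"):
    """Return the one-letter state that ps shows in STAT (R, S, D, Z, T, ...).

    Linux-only sketch: /proc/<pid>/stat looks like "pid (comm) state ...".
    comm may itself contain spaces or ')', so split on the *last* ')'.
    """
    with open(f"/proc/{pid}/stat") as f:
        data = f.read()
    rest = data.rpartition(")")[2]
    return rest.split()[0]


if __name__ == "__main__":
    import time

    # The calling process is running while it reads its own stat file.
    print("self:", proc_state())

    # A child that has exited but has not been reaped yet is a zombie (Z),
    # like the [python] entries 28668-28671 in the ps output.
    child = os.fork()
    if child == 0:
        os._exit(0)
    time.sleep(0.5)
    print("child:", proc_state(child))
    os.waitpid(child, 0)  # reap the child so it does not linger
```

Reading another user's process may need root, just as the author ran these tools as root.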
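Next, check what fd 6 points to and which process holds that pipe. The listing below is for process 28661, from the earlier run: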
```
root@wan-ThinkPad-E450c:/proc/28661/fd# ls -l
total 0
lrwx------ 1 root root 64 Jul 25 11:29 0 -> /dev/pts/8
lrwx------ 1 root root 64 Jul 25 11:29 1 -> /dev/pts/8
lrwx------ 1 root root 64 Jul 25 11:29 11 -> anon_inode:[eventpoll]
lr-x------ 1 root root 64 Jul 25 11:29 12 -> pipe:[245222]
l-wx------ 1 root root 64 Jul 25 11:29 13 -> pipe:[245222]
lrwx------ 1 root root 64 Jul 25 11:28 2 -> /dev/pts/8
lr-x------ 1 root root 64 Jul 25 11:29 3 -> pipe:[246153]
l-wx------ 1 root root 64 Jul 25 11:29 4 -> pipe:[246153]
lr-x------ 1 root root 64 Jul 25 11:29 5 -> /dev/urandom
lr-x------ 1 root root 64 Jul 25 11:29 6 -> pipe:[246154]
l-wx------ 1 root root 64 Jul 25 11:29 7 -> pipe:[246154]
root@wan-ThinkPad-E450c:/proc/28661/fd# lsof | grep 246154
lsof: WARNING: can't stat() fuse.gvfsd-fuse file system /run/user/1000/gvfs
      Output information may be incomplete.
python    28661 root    6r  FIFO  0,10  0t0  246154 pipe
python    28661 root    7w  FIFO  0,10  0t0  246154 pipe
```
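The same inspection can be scripted. Each entry in /proc/<pid>/fd is a symlink, and for a pipe the target reads `pipe:[INODE]`, so the read and write ends of one pipe (fds 6 and 7 above) share an inode number. Below is a minimal Linux-only sketch; `pipe_fds` is a hypothetical helper name.

```python
import os
import re

# readlink() on /proc/<pid>/fd/<n> yields targets such as "pipe:[246154]"
PIPE_RE = re.compile(r"^pipe:\[(\d+)\]$")


def pipe_fds(pid="self"):
    """Map each pipe inode to the sorted fds of `pid` that refer to it."""
    fd_dir = f"/proc/{pid}/fd"
    pipes = {}
    for name in os.listdir(fd_dir):
        try:
            target = os.readlink(os.path.join(fd_dir, name))
        except FileNotFoundError:
            # e.g. the fd listdir() itself used, already closed again
            continue
        m = PIPE_RE.match(target)
        if m:
            pipes.setdefault(int(m.group(1)), []).append(int(name))
    for fds in pipes.values():
        fds.sort()
    return pipes


if __name__ == "__main__":
    r, w = os.pipe()
    # Both ends of the new pipe show up under one inode, like fds 6/7 -> pipe:[246154].
    print(pipe_fds())
```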
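So the whole program is blocked on a pipe. Where does this code use a pipe? In multiprocessing.Pool: the pool talks to its workers through queues (SimpleQueue), and SimpleQueue is built on a pipe. Was this pipe affected by the monkey patch? Looking at gevent's monkey module, patch_thread carries the following note: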
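You can confirm that a SimpleQueue sits on top of an ordinary pipe. This Python 3 sketch relies on a CPython implementation detail: SimpleQueue stores its two Connection ends in the private attributes `_reader` and `_writer`. Those names are not a public API and could change. `queue_pipe_fds` is a hypothetical helper.

```python
import multiprocessing
import os
import stat


def queue_pipe_fds(q):
    """Return (read_fd, write_fd) behind a multiprocessing.SimpleQueue.

    Uses CPython's private _reader/_writer attributes (implementation detail).
    """
    return q._reader.fileno(), q._writer.fileno()


if __name__ == "__main__":
    q = multiprocessing.SimpleQueue()
    r, w = queue_pipe_fds(q)
    # On Unix both ends come from os.pipe(), i.e. they are FIFOs like fd 6/7 above.
    print(r, w, stat.S_ISFIFO(os.fstat(r).st_mode), stat.S_ISFIFO(os.fstat(w).st_mode))
    q.put("hello")
    print(q.get())
```

A get() on this queue ends in a plain blocking read on the reader fd, which is exactly the read(6, ...) seen in strace.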
```python
def patch_thread(threading=True, _threading_local=True, Event=False, logging=True,
                 existing_locks=True,
                 _warnings=None):
    """
    Replace the standard :mod:`thread` module to make it greenlet-based.

    - If *threading* is true (the default), also patch ``threading``.
    - If *_threading_local* is true (the default), also patch ``_threading_local.local``.
    - If *logging* is True (the default), also patch locks taken if the logging module has
      been configured.
    - If *existing_locks* is True (the default), and the process is still single threaded,
      make sure than any :class:`threading.RLock` (and, under Python 3, :class:`importlib._bootstrap._ModuleLock`)
      instances that are currently locked can be properly unlocked.

    .. caution::
        Monkey-patching :mod:`thread` and using
        :class:`multiprocessing.Queue` or
        :class:`concurrent.futures.ProcessPoolExecutor` (which uses a
        ``Queue``) will hang the process.

    .. versionchanged:: 1.1b1
        Add *logging* and *existing_locks* params.
    """
    # XXX: Simplify
    # pylint:disable=too-many-branches,too-many-locals

    # Description of the hang:
    # There is an incompatibility with patching 'thread' and the 'multiprocessing' module:
    # The problem is that multiprocessing.queues.Queue uses a half-duplex multiprocessing.Pipe,
    # which is implemented with os.pipe() and _multiprocessing.Connection. os.pipe isn't patched
    # by gevent, as it returns just a fileno. _multiprocessing.Connection is an internal implementation
    # class implemented in C, which exposes a 'poll(timeout)' method; under the covers, this issues a
    # (blocking) select() call: hence the need for a real thread. Except for that method, we could
    # almost replace Connection with gevent.fileobject.SocketAdapter, plus a trivial
    # patch to os.pipe (below). Sigh, so close. (With a little work, we could replicate that method)

    # import os
    # import fcntl
    # os_pipe = os.pipe
    # def _pipe():
    #   r, w = os_pipe()
    #   fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
    #   fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
    #   return r, w
    # os.pipe = _pipe

    # The 'threading' module copies some attributes from the
    # thread module the first time it is imported. If we patch 'thread'
    # before that happens, then we store the wrong values in 'saved',
    # So if we're going to patch threading, we either need to import it
    # before we patch thread, or manually clean up the attributes that
    # are in trouble. The latter is tricky because of the different names
    # on different versions.
    # ... (rest of the function omitted)
```
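The commented-out `_pipe()` in that source hints at the alternative: make both pipe ends non-blocking, so that a read on an empty pipe returns EAGAIN immediately and a cooperative scheduler can wait for readiness with select/epoll instead of letting one blocking read freeze the whole OS thread. Below is a runnable Python 3 sketch of that idea (not gevent code). `os.set_blocking()` toggles O_NONBLOCK while keeping other flags; the `fcntl` call in the comment overwrites them. `nonblocking_pipe` and `try_read` are hypothetical names.

```python
import os
import select


def nonblocking_pipe():
    """Create a pipe whose ends never block (Unix, Python 3.5+)."""
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)
    return r, w


def try_read(fd, n=1024):
    """Return available bytes, or None instead of putting the caller to sleep."""
    try:
        return os.read(fd, n)
    except BlockingIOError:
        # EAGAIN: a blocking fd would sleep here, like the read(6, ...) in strace
        return None


if __name__ == "__main__":
    r, w = nonblocking_pipe()
    print(try_read(r))                       # None: nothing written yet
    print(select.select([r], [], [], 0)[0])  # empty list: not readable yet
    os.write(w, b"result")
    print(select.select([r], [], [], 0)[0])  # now contains r: a scheduler could wake the reader
    print(try_read(r))                       # b'result'
```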
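After turning off the thread patch (for example with monkey.patch_all(thread=False)), the program runs normally again. This matches the comment above: with thread patched, Pool's internal helper threads (such as the task handler and result handler) become greenlets that share a single OS thread. As soon as one of them blocks on the pipe inside _multiprocessing.Connection, nothing else in that process gets to run.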
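A note on how multiprocessing and threading queues share data, since they differ a lot. Threads live in one address space, so a threading queue (Queue.Queue) is simply an in-memory deque guarded by a lock and condition variables; no mmap is involved. A multiprocessing queue has to move data between processes, so it uses a pipe to buffer the pickled items, a lock to keep concurrent readers and writers consistent, and a semaphore for signalling (in multiprocessing.Queue it bounds the queue size, so put() waits while the queue is full). These locks and semaphores are POSIX semaphores, and the /dev/shm/sem.mp* file and the mmap at the top of the strace output come from creating one of them. A process queue is therefore far more expensive than thread synchronization: every item is pickled, written through the pipe, read back and unpickled, under cross-process locks. That is also one reason database connection pools are not shared between processes.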
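The cost difference shows up even within a single process. A thread queue hands back the very same object. A multiprocessing queue pickles it, pushes the bytes through its pipe, and unpickles a copy, and it cannot carry anything that does not pickle. This is a minimal Python 3 sketch; `roundtrip` is a hypothetical helper, and both queues are used in one process only to keep the example small.

```python
import multiprocessing
import pickle
import queue


def roundtrip(q, obj):
    """Put obj on q and take it back out in the same process."""
    q.put(obj)
    return q.get()


if __name__ == "__main__":
    data = {"rows": list(range(5))}

    # Thread queue: shared memory plus locks, so the same object comes back.
    tq = queue.Queue()
    print(roundtrip(tq, data) is data)  # True

    # Process queue: pickled, written to the pipe, read and unpickled,
    # so what comes back is an equal copy, not the same object.
    pq = multiprocessing.SimpleQueue()
    back = roundtrip(pq, data)
    print(back == data, back is data)  # True False

    # Objects that cannot be pickled cannot cross a process queue at all.
    try:
        pq.put(lambda: None)
    except (pickle.PicklingError, AttributeError, TypeError) as exc:
        print("not picklable:", type(exc).__name__)
```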