Analysis of Problematic Code Using a gevent Coroutine Pool
Published: 2019-06-26



Main code:

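```python
from __future__ import print_function  # lets the same print() calls run on Python 2 and 3

from multiprocessing import Pool
import gevent
from gevent import monkey
monkey.patch_all()  # also patches thread/threading, which turns out to be the problem
import requests


def test1():
    for i in range(10000):
        rep = requests.get("http://www.163.com/")
    print("test1", rep.status_code)  # was `req`, an undefined name


def test2():
    for i in range(10000):
        rep = requests.get("http://www.163.com/")
        print("test2", rep.status_code)  # was `req`, an undefined name


def coroutine():
    # Run both request loops as concurrent greenlets inside one pool worker.
    gevent.joinall([
        gevent.spawn(test1),
        gevent.spawn(test2),
    ])


if __name__ == "__main__":
    p = Pool()  # one worker process per CPU by default
    for i in range(4):
        p.apply_async(coroutine, args=())
    p.close()
    p.join()
```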

strace output:

```
open("/dev/shm/9VRonB", O_RDWR|O_CREAT|O_EXCL, 0600) = 8
write(8, "\1\0\0\0\0\0\0\0\200\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 32) = 32
mmap(NULL, 32, PROT_READ|PROT_WRITE, MAP_SHARED, 8, 0) = 0x7f7c82fc7000
link("/dev/shm/9VRonB", "/dev/shm/sem.mp29419-10437161768614121538") = 0
fstat(8, {st_mode=S_IFREG|0600, st_size=32, ...}) = 0
unlink("/dev/shm/9VRonB")               = 0
close(8)                                = 0
unlink("/dev/shm/sem.mp29419-10437161768614121538") = 0
open("/sys/devices/system/cpu/online", O_RDONLY|O_CLOEXEC) = 8
read(8, "0-3\n", 8192)                  = 4
close(8)                                = 0
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f7c82fac9d0) = 29428
rt_sigaction(SIGCHLD, {0x7f7c8138da20, ~[KILL STOP RTMIN RT_1], SA_RESTORER|SA_RESTART, 0x7f7c82b9d390}, NULL, 8) = 0
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f7c82fac9d0) = 29429
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f7c82fac9d0) = 29430
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f7c82fac9d0) = 29431
clock_gettime(CLOCK_MONOTONIC, {10471, 134885769}) = 0
read(6,
```

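After the fork, the traced parent (29419) blocks in `read(6, ...)` and never returns. The four `clone()` calls are the four pool workers: `Pool()` without an argument starts one worker per CPU, and the preceding read of `/sys/devices/system/cpu/online` returned `0-3`, i.e. four online CPUs.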

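Next, `ps` shows the processes: the parent 29419 and its four workers 29428 to 29431 are all in interruptible sleep (`S`). The parent of an earlier run, 28661, is also sleeping, and the four `[python]` entries 28668 to 28671, apparently its workers, are zombies (`Z`):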

```
wan       9710  0.0  0.0  15964  1076 pts/2    S+   12:30   0:00 grep --color=auto python
root     22237  0.0  3.0 363416 115696 ?       Sl   10:01   0:02 /usr/bin/python /usr/bin/ubuntu-kylin-software-center-daemon
wan      22325  0.0  0.9 555788 37472 ?        Sl   10:01   0:00 python indicator-china-weather.py
root     28661  0.0  0.5  63904 19828 pts/8    S    11:22   0:00 python ceshi.py
root     28668  0.0  0.0      0     0 pts/8    Z    11:22   0:00 [python]
root     28669  0.0  0.0      0     0 pts/8    Z    11:22   0:00 [python]
root     28670  0.0  0.0      0     0 pts/8    Z    11:22   0:00 [python]
root     28671  0.0  0.0      0     0 pts/8    Z    11:22   0:00 [python]
root     29417  0.0  0.0   5208  1284 pts/8    S+   11:26   0:00 strace python ceshi.py
root     29419  0.0  0.5  63648 19856 pts/8    S+   11:26   0:00 python ceshi.py
root     29428  0.0  0.4  63904 15840 pts/8    S+   11:26   0:00 python ceshi.py
root     29429  0.0  0.4  63904 15840 pts/8    S+   11:26   0:00 python ceshi.py
root     29430  0.0  0.4  63904 15748 pts/8    S+   11:26   0:00 python ceshi.py
root     29431  0.0  0.4  63904 15760 pts/8    S+   11:26   0:00 python ceshi.py
```
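The letters in the `STAT` column (`S` for interruptible sleep, `Z` for zombie, a trailing `+` for the foreground process group) come straight from the kernel. A minimal Python 3 sketch, assuming Linux with a readable `/proc`, reads the same state letter directly; `proc_state` is a hypothetical helper, not part of any library:

```python
import os
import subprocess
import sys
import time


def proc_state(pid):
    """Return the one-letter kernel state (R, S, D, Z, ...) of a Linux process."""
    with open(f"/proc/{pid}/stat") as f:
        data = f.read()
    # Field 2 (the command name) is wrapped in parentheses and may itself contain
    # spaces or ')', so the state is the character right after the last ") ".
    return data[data.rindex(")") + 2]


if __name__ == "__main__":
    print(proc_state(os.getpid()))  # R: this process is running while it reads its own stat
    # A child that has exited but has not been reaped with wait() stays a zombie,
    # which is what the [python] entries marked Z above are.
    child = subprocess.Popen([sys.executable, "-c", "pass"])
    time.sleep(0.5)  # give the child time to exit; not a guarantee on a slow machine
    print(proc_state(child.pid))  # normally Z at this point
    child.wait()  # reap it; /proc/<pid> disappears afterwards
```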

Next, check what fd 6 is and who holds it, here in `/proc/28661/fd` (the parent of the earlier run):

```
root@wan-ThinkPad-E450c:/proc/28661/fd# ls -l
total 0
lrwx------ 1 root root 64 Jul 25 11:29 0 -> /dev/pts/8
lrwx------ 1 root root 64 Jul 25 11:29 1 -> /dev/pts/8
lrwx------ 1 root root 64 Jul 25 11:29 11 -> anon_inode:[eventpoll]
lr-x------ 1 root root 64 Jul 25 11:29 12 -> pipe:[245222]
l-wx------ 1 root root 64 Jul 25 11:29 13 -> pipe:[245222]
lrwx------ 1 root root 64 Jul 25 11:28 2 -> /dev/pts/8
lr-x------ 1 root root 64 Jul 25 11:29 3 -> pipe:[246153]
l-wx------ 1 root root 64 Jul 25 11:29 4 -> pipe:[246153]
lr-x------ 1 root root 64 Jul 25 11:29 5 -> /dev/urandom
lr-x------ 1 root root 64 Jul 25 11:29 6 -> pipe:[246154]
l-wx------ 1 root root 64 Jul 25 11:29 7 -> pipe:[246154]
root@wan-ThinkPad-E450c:/proc/28661/fd# lsof |grep 246154
lsof: WARNING: can't stat() fuse.gvfsd-fuse file system /run/user/1000/gvfs
      Output information may be incomplete.
python    28661                   root    6r     FIFO               0,10        0t0     246154 pipe
python    28661                   root    7w     FIFO               0,10        0t0     246154 pipe
```
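Reading that listing by hand: fd 6 (`r`) and fd 7 (`w`) are the two ends of the same pipe, and a `read()` on a pipe only returns end-of-file once every write end is closed. A process that keeps its own write end open and waits for data nobody sends therefore blocks forever. The Python 3 sketch below, assuming Linux `/proc`, reproduces both observations on a fresh pipe; `pipes_of` is a hypothetical helper, and `select()` with a timeout stands in for the blocking `read()` so the demo does not hang:

```python
import os
import select
from collections import defaultdict


def pipes_of(pid="self"):
    """Group a process's pipe fds by pipe inode, like `ls -l /proc/<pid>/fd` (Linux only)."""
    fd_dir = f"/proc/{pid}/fd"
    groups = defaultdict(list)
    for name in os.listdir(fd_dir):
        try:
            target = os.readlink(os.path.join(fd_dir, name))
        except OSError:
            continue  # e.g. the fd listdir() itself used, already closed again
        if target.startswith("pipe:["):
            groups[target].append(int(name))
    return {target: sorted(fds) for target, fds in groups.items()}


if __name__ == "__main__":
    r, w = os.pipe()
    print(pipes_of())  # contains 'pipe:[<inode>]': [r, w], both ends held by this process
    # Nothing was written and our own write end is still open, so read(r) would block
    # forever, like read(6, ...) in the strace. select() with a timeout shows it safely.
    readable, _, _ = select.select([r], [], [], 0.1)
    print(readable)  # []
    # Once every write end is closed, read() returns EOF (b"") instead of blocking.
    os.close(w)
    print(os.read(r, 1))  # b''
    os.close(r)
```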

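So the whole program is blocked on a pipe: fd 6 and fd 7 are the read and write ends of the same pipe (inode 246154), and `lsof` finds no process other than 28661 holding it. Where does this code use a pipe at all? In `multiprocessing.Pool`, which passes tasks and results through queues; its `SimpleQueue` is built on a pipe. Was that pipe affected by gevent's patching? Looking at gevent's `monkey` module, `patch_thread` carries the following docstring and comments: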

```python
def patch_thread(threading=True, _threading_local=True, Event=False, logging=True,
                 existing_locks=True,
                 _warnings=None):
    """
    Replace the standard :mod:`thread` module to make it greenlet-based.

    - If *threading* is true (the default), also patch ``threading``.
    - If *_threading_local* is true (the default), also patch ``_threading_local.local``.
    - If *logging* is True (the default), also patch locks taken if the logging module has
      been configured.
    - If *existing_locks* is True (the default), and the process is still single threaded,
      make sure than any :class:`threading.RLock` (and, under Python 3, :class:`importlib._bootstrap._ModuleLock`)
      instances that are currently locked can be properly unlocked.

    .. caution::
        Monkey-patching :mod:`thread` and using
        :class:`multiprocessing.Queue` or
        :class:`concurrent.futures.ProcessPoolExecutor` (which uses a
        ``Queue``) will hang the process.

    .. versionchanged:: 1.1b1
        Add *logging* and *existing_locks* params.
    """
    # XXX: Simplify
    # pylint:disable=too-many-branches,too-many-locals

    # Description of the hang:
    # There is an incompatibility with patching 'thread' and the 'multiprocessing' module:
    # The problem is that multiprocessing.queues.Queue uses a half-duplex multiprocessing.Pipe,
    # which is implemented with os.pipe() and _multiprocessing.Connection. os.pipe isn't patched
    # by gevent, as it returns just a fileno. _multiprocessing.Connection is an internal implementation
    # class implemented in C, which exposes a 'poll(timeout)' method; under the covers, this issues a
    # (blocking) select() call: hence the need for a real thread. Except for that method, we could
    # almost replace Connection with gevent.fileobject.SocketAdapter, plus a trivial
    # patch to os.pipe (below). Sigh, so close. (With a little work, we could replicate that method)

    # import os
    # import fcntl
    # os_pipe = os.pipe
    # def _pipe():
    #   r, w = os_pipe()
    #   fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
    #   fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
    #   return r, w
    # os.pipe = _pipe

    # The 'threading' module copies some attributes from the
    # thread module the first time it is imported. If we patch 'thread'
    # before that happens, then we store the wrong values in 'saved',
    # So if we're going to patch threading, we either need to import it
    # before we patch thread, or manually clean up the attributes that
    # are in trouble. The latter is tricky because of the different names
    # on different versions.
```
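The comment's claims about `multiprocessing.Pipe` can be checked with the public API. This Python 3 sketch, assuming a Unix system, shows that both ends of `Pipe(duplex=False)` are ordinary OS pipes (what `lsof` reports as `FIFO`) and calls `Connection.poll(timeout)`, the method the comment says waits in a blocking `select()`-style call; `is_fifo` is a hypothetical helper:

```python
import os
import stat
from multiprocessing import Pipe


def is_fifo(fd):
    """True if fd is a pipe/FIFO; lsof shows these as FIFO, /proc as pipe:[inode]."""
    return stat.S_ISFIFO(os.fstat(fd).st_mode)


if __name__ == "__main__":
    # With duplex=False the first Connection can only receive, the second only send.
    reader, writer = Pipe(duplex=False)
    print(is_fifo(reader.fileno()), is_fifo(writer.fileno()))  # True True on Unix
    print(reader.poll(0.1))  # False: waits up to 0.1 s, nothing has been sent yet
    writer.send({"task": 1})  # pickled and written into the pipe
    print(reader.poll(0.1))  # True
    print(reader.recv())     # {'task': 1}
```

While `poll()` or `recv()` waits, the calling OS thread is stuck in the kernel; with real threads only that thread waits, which is why gevent's comment says a real thread is needed.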

After disabling the thread patch (for example with `monkey.patch_all(thread=False)`), the program runs normally again.
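Restoring real threads helps because a blocking `read()` or `select()` then stalls only the thread that issued it. With greenlets, every task shares one OS thread, and the scheduler can only switch away when a call does not block; that is the idea behind the commented-out `_pipe()` in the gevent source. The Python 3 toy below, assuming a Unix system, models this with plain generators instead of gevent; `make_nonblocking_pipe`, `reader`, `writer` and `run` are hypothetical names, and `os.set_blocking(fd, False)` sets the same `O_NONBLOCK` flag as the `fcntl` calls in that comment:

```python
import os
from collections import deque


def make_nonblocking_pipe():
    # Both ends O_NONBLOCK: reading an empty pipe raises BlockingIOError
    # instead of putting the whole OS thread to sleep.
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)
    return r, w


def reader(fd, log):
    while True:
        try:
            data = os.read(fd, 1024)
        except BlockingIOError:
            log.append("reader: no data yet, yielding")
            yield  # let another task run instead of blocking
            continue
        log.append(f"reader: got {data!r}")
        return


def writer(fd, log):
    log.append("writer: sending")
    os.write(fd, b"result")
    yield


def run(tasks):
    """Round-robin scheduler: each task runs until its next `yield`."""
    ready = deque(tasks)
    while ready:
        task = ready.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # task finished, drop it
        ready.append(task)


if __name__ == "__main__":
    log = []
    r, w = make_nonblocking_pipe()
    run([reader(r, log), writer(w, log)])
    print("\n".join(log))
```

If `reader` used a blocking pipe instead, its first `os.read()` would never return, `run` would never reach `writer`, and the program would hang: in miniature, the kind of hang the gevent comment describes.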

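A side note on how `multiprocessing` and `threading` queues share data, because the two differ a lot. Threads share one address space, so the thread queue (`Queue.Queue` in Python 2, `queue.Queue` in Python 3) is simply an in-memory deque guarded by a lock and condition variables; it does not use mmap. (The `mmap` in the strace above belongs to the POSIX named semaphore that `multiprocessing` creates under `/dev/shm`.) The process queue combines a pipe, locks and, in `multiprocessing.Queue`, a semaphore: the pipe carries the pickled items, the locks keep concurrent readers and writers from interleaving, and the semaphore tracks free capacity so that `put()` blocks when the queue is full. A process queue therefore costs far more than thread synchronization, which is also one reason database connection pools are not shared across processes.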
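The cost difference is visible even inside a single process. In this Python 3 sketch (the `roundtrip` helper is hypothetical), `queue.Queue` hands back the very same object, while `multiprocessing.SimpleQueue` pickles it, pushes the bytes through its pipe and unpickles a new copy on `get()`:

```python
import multiprocessing
import queue


def roundtrip(q, item):
    """Put one item into a queue and take it straight back out."""
    q.put(item)
    return q.get()


if __name__ == "__main__":
    payload = {"rows": list(range(5))}
    same = roundtrip(queue.Queue(), payload)
    copied = roundtrip(multiprocessing.SimpleQueue(), payload)
    print(same is payload)    # True: the thread queue only passes a reference
    print(copied is payload)  # False: pickled, sent through the pipe, unpickled
    print(copied == payload)  # True: equal contents, but a new object
```

The copy is what makes process queues work across address spaces, and it is also why every item pays for serialization and pipe I/O.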

Reposted from: https://my.oschina.net/u/2950272/blog/1486814
