Python自带的多进程库 multiprocessing 可实现多进程。我想用这些短例子示范如何优雅地用多线程。中文网络上,有些人只是翻译了旧版的 Python官网的多进程文档。而我这篇文章会额外讲一讲下方加粗部分的内容。
- 创建进程 Process,fork直接继承资源,所以初始化更快,spawn只继承必要的资源,所以更省内存,「程序的入口」 if name == main
- 进程池 Pool,Pool只能接受一个参数,但有办法传入多个
- 管道通信 Pipe,最基本的功能,运行速度快
- 队列通信 Queue,有最常用的功能,运行速度稍慢
- 共享内存 Manager Value,Python3.9 新特性 真正的共享内存 shared_memory
如下所示,中文网络上一些讲Python多进程的文章,很多重要的东西没讲(毕竟只是翻译了Python官网的多进程旧版文档)。上方的加粗部分他们没讲,但是这是做多进程总需要知道的内容。
目录(请挑选感兴趣的看,没必要全看)
- 多线程与多进程的区别
- 全局锁与多进程
- 子进程 Process
- 进程池 Pool
- 管道 Pipe
- 队列 Queue
- 共享内存 Manager
我为何写【在Python中优雅地用多进程】?
1. 多线程与多进程的区别
多线程 threading: 一个人有与异性聊天和看剧两件事要做。单线程的她可以看完剧再去聊天,但这样子可能就没人陪她聊天了「哼,发消息不回」。我们把她看成一个CPU核心,为她开起多线程——先看一会剧,偶尔看看新消息,在两件事(线程)间来回切换。多线程:单个CPU核心可以同时做几件事,不至于卡在某一步傻等着。
用处:爬取网站信息(爬虫),等待多个用户输入
多进程 processing: 一个人有很多砖需要搬,他领取手套、推车各种物资(向系统申请了资源)然后开始搬砖。然而他身边有很多人,我们让这些人去帮他!(一核有难,八核围观)。于是他们做了分工,砖很快就搬完了。多进程让多个CPU核心可以一起做事,不至于只有一人干活而其他人傻站着。
用处:进行高性能计算。只有多进程方案设计合理,才能加速计算。
2. 全局锁与多进程
为何在Python里用多进程这么麻烦? 因为Python的线程是操作系统线程,因此要有Python全局解释器锁。一个python解释器进程内有一条主线程,以及多条用户程序的执行线程。即使在多核CPU平台上,由于GIL的存在,所以禁止多线程的并行执行。——来自百度百科词条 全局解释器锁。发展历程:
- Python全局锁。Python 3.2的时候更新过GIL。在我小时候,由于Python GIL的存在(全局解释器锁 Global Interpreter Lock) ,此时Python无法靠自己实现多进程
- 外部多进程通信。Python3.5。在2015年,要么用Python调用C语言(如Numpy此类用其他语言在底层实现多进程的第三方库),要么需要在外部代码(MPI 2015)
- 内置多进程通信。Python 3.6 才让 multiprocessing逐渐发展成一个能用的Python内置多进程库,可以进行进程间的通信,以及有限的内存共享
- 共享内存。Python 3.9 在2020年增加了新特性 shared_memory
3. 子进程 Process
多进程的主进程一定要写在程序入口 if name =='main': 内部
def function1(id): # 这里是子进程
print(f'id {id}')
def run__process(): # 这里是主进程
from multiprocessing import Process
process = [mp.Process(target=function1, args=(1,)),
mp.Process(target=function1, args=(2,)), ]
[p.start() for p in process] # 开启了两个进程
[p.join() for p in process] # 等待两个进程依次结束
# run__process() # 主线程不建议写在 if外部。由于这里的例子很简单,你强行这么做可能不会报错
if __name__ =='__main__':
run__process() # 正确做法:主线程只能写在 if内部
尽管在这个简单的例子里,把主进程run__process()写在程序入口if外部不会有报错。但是你最好还是按我要求去做。详细解释的内容过长,我写在→「Python程序入口有重要功能(多线程)而非编程习惯」
上面的例子只是用Process开启了多进程,不涉及进程通信。当我准备把一个串行任务编排成多进程时,我还需要多进程通信。进程池Pool可以让主程序获得子进程的计算结果(不太灵活,适合简单任务),管道Pipe 队列Queue 等等 可以让进程之间进行通信(足够灵活)。共享值 Value 共享数组 Array 共享内容 shared_memory(Python 3.6 Python3.9 的新特性,还不太成熟)下面开讲。
Python多进程可以选择两种创建进程的方式,spawn 与 fork。分支创建:fork会直接复制一份自己给子进程运行,并把自己所有资源的handle 都让子进程继承,因而创建速度很快,但更占用内存资源。分产创建:spawn只会把必要的资源的handle 交给子进程,因此创建速度稍慢。详细解释请看 Stack OverFlow multiprocessing fork vs spawn 。(分产spawn 是我自己随便翻译的,有更好的翻译请推荐。我绝不把handle 翻译成句柄)
multiprocessing.set_start_method('spawn') # default on WinOS or MacOS
multiprocessing.set_start_method('fork') # default on Linux (UnixOS)
请注意:我说 分支fork 在初始化创建多进程的时候比 分产spawn 快,而不是说高性能计算会比较快。通常高性能计算需要让程序运行很久,因此为了节省内存以及进程安全,我建议选择 spawn。
4. 进程池 Pool
几乎Python多进程代码都需要你明明白白地调用Process。而进程池Pool 会自动帮我们管理子进程。Python的Pool 不方便传入多个参数,我这里提供两个解决思路:
思路1:函数 func2 需要传入多个参数,现在把它改成一个参数,无论你直接让args作为一个元组tuple、词典dict、类class都可以
思路2:使用 function.partial Passing multiple parameters to pool.map() function in Python。这个不灵活的方法固定了其他参数,且需要导入Python的内置库,我不推荐
import time
def func2(args): # multiple parameters (arguments)
# x, y = args
x = args[0] # write in this way, easier to locate errors
y = args[1] # write in this way, easier to locate errors
time.sleep(1) # pretend it is a time-consuming operation
return x - y
def run__pool(): # main process
from multiprocessing import Pool
cpu_worker_num = 3
process_args = [(1, 1), (9, 9), (4, 4), (3, 3), ]
print(f'| inputs: {process_args}')
start_time = time.time()
with Pool(cpu_worker_num) as p:
outputs = p.map(func2, process_args)
print(f'| outputs: {outputs} TimeUsed: {time.time() - start_time:.1f} \n')
'''Another way (I don't recommend)
Using 'functions.partial'. See https://stackoverflow.com/a/25553970/9293137
from functools import partial
# from functools import partial
# pool.map(partial(f, a, b), iterable)
'''
if __name__ =='__main__':
run__pool()
5. 管道 Pipe
顾名思义,管道Pipe 有两端,因而 main_conn, child_conn = Pipe() ,管道的两端可以放在主进程或子进程内,我在实验中没发现主管道口main_conn 和子管道口child_conn 的区别。两端可以同时放进去东西,放进去的对象都经过了深拷贝:用 conn.send()在一端放入,用 conn.recv() 另一端取出,管道的两端可以同时给多个进程。conn是 connect的缩写。
import time
def func_pipe1(conn, p_id):
print(p_id)
time.sleep(0.1)
conn.send(f'{p_id}_send1')
print(p_id, 'send1')
time.sleep(0.1)
conn.send(f'{p_id}_send2')
print(p_id, 'send2')
time.sleep(0.1)
rec = conn.recv()
print(p_id, 'recv', rec)
time.sleep(0.1)
rec = conn.recv()
print(p_id, 'recv', rec)
def func_pipe2(conn, p_id):
print(p_id)
time.sleep(0.1)
conn.send(p_id)
print(p_id, 'send')
time.sleep(0.1)
rec = conn.recv()
print(p_id, 'recv', rec)
def run__pipe():
from multiprocessing import Process, Pipe
conn1, conn2 = Pipe()
process = [Process(target=func_pipe1, args=(conn1, 'I1')),
Process(target=func_pipe2, args=(conn2, 'I2')),
Process(target=func_pipe2, args=(conn2, 'I3')), ]
[p.start() for p in process]
print('| Main', 'send')
conn1.send(None)
print('| Main', conn2.recv())
[p.join() for p in process]
if __name__ =='__main__':
run__pipe()
如果追求运行更快,那么最好使用管道Pipe而非下面介绍的队列Queue,详细请移步Python pipes and queues performance ↓: https://forums.raspberrypi.com/viewtopic.php?t=141576
So yes, pipes are faster than queues - but only by 1.5 to 2 times, what did surprise me was that Python 3 is MUCH slower than Python 2 - most other tests I have done have been a bit up and down (as long as it is Python 3.4 - Python 3.2 seems to be a bit of a dog - especially for memory usage).
我小时候曾经用到Python多线程队列功能写过一个实际例子 ↓,若追求极致性能,还可以把里面的Queue改为Pipe。
Pipe还有 duplex参数 和 poll() 方法 需要了解。默认情况下 duplex==True,若不开启双向管道,那么传数据的方向只能 conn1 ← conn2 。conn2.poll()==True 意味着可以马上使用 conn2.recv() 拿到传过来的数据。conn2.poll(n) 会让它等待n秒钟再进行查询。
from multiprocessing import Pipe
conn1, conn2 = Pipe(duplex=True) # 开启双向管道,管道两端都能存取数据。默认开启
#
conn1.send('A')
print(conn1.poll()) # 会print出 False,因为没有东西等待conn1去接收
print(conn2.poll()) # 会print出 True ,因为conn1 send 了个 'A' 等着conn2 去接收
print(conn2.recv(), conn2.poll(2)) # 会等待2秒钟再开始查询,然后print出 'A False'
尽管我下面的例子不会报错,但这是因为它过于简单,没有真的开多线程去跑,也没有写在程序入口的if内部。很多时候 Pipe运行会快一点,但是它的功能太少了,得用 Queue。最明显的一个区别是:
conn1, conn2 = multiprocessing.Pipe() # 管道有两端,某一端放入的东西,只能在另一端拿到
queue = multiprocessing.Queue() # 队列只有一个,放进去的东西可以在任何地方拿到。
6. 队列 Queue
可以 import queue 调用Python内置的队列,在多线程里也有队列 from multiprocessing import Queue。下面提及的都是多线程的队列。
队列Queue 的功能与前面的管道Pipe非常相似:无论主进程或子进程,都能访问到队列,放进去的对象都经过了深拷贝。不同的是:管道Pipe只有两个断开,而队列Queue 有基本的队列属性,更加灵活,详细请移步Stack Overflow Multiprocessing - Pipe vs Queue。
def func1(i):
time.sleep(1)
print(f'args {i}')
def run__queue():
from multiprocessing import Process, Queue
queue = Queue(maxsize=4) # the following attribute can call in anywhere
queue.put(True)
queue.put([0, None, object]) # you can put deepcopy thing
queue.qsize() # the length of queue
print(queue.get()) # First In First Out
print(queue.get()) # First In First Out
queue.qsize() # the length of queue
process = [Process(target=func1, args=(queue,)),
Process(target=func1, args=(queue,)), ]
[p.start() for p in process]
[p.join() for p in process]
if __name__ =='__main__':
run__queue()
除了上面提及的 Python多线程,读取多个(海康\大华)网络摄像头的视频流: https://zhuanlan.zhihu.com/p/38136322 ,我自己写的开源的强化学习库:小雅 ElegantRL 也使用了 Queue 进行多CPU多GPU训练,为了提速,我已经把Queue 改为 Pipe。
7. 共享内存 Manager
为了在Python里面实现多进程通信,上面提及的 Pipe Queue 把需要通信的信息从内存里深拷贝了一份给其他线程使用(需要分发的线程越多,其占用的内存越多)。而共享内存会由解释器负责维护一块共享内存(而不用深拷贝),这块内存每个进程都能读取到,读写的时候遵守管理(因此不要以为用了共享内存就一定变快)。
Manager可以创建一块共享的内存区域,但是存入其中的数据需要按照特定的格式,Value可以保存数值,Array可以保存数组,如下。这里不推荐认为自己写代码能力弱的人尝试。下面这里例子来自Python官网的Document。
# https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing%20array#multiprocessing.Array
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', b'hello world', lock=lock)
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print(n.value)
print(x.value)
print(s.value)
print([(a.x, a.y) for a in A])