avatar


4.多线程、协程、多进程

多线程

关于什么是多线程,可以参考《基于Java的后端开发入门:7.多线程 [1/2]》,这里不赘述。

threading

在Python3中,通过threading模块提供线程的功能。

原来的thread模块已废弃,但是threading模块中有个Thread类(T大写),这个类是模块中最主要的线程类,这两个要分清楚。

threading的常用方法和属性:

  • current_thread():返回当前线程。
  • active_count():返回当前活跃的线程数。
  • get_ident():返回当前线程ID
  • enumerate():返回当前活动Thread对象列表
  • main_thread():返回主Thread对象
  • settrace(func):为所有线程设置一个trace函数
  • setprofile(func):所有线程设置一个profile函数
  • stack_size([size]):返回新创建线程栈大小;或为后续创建的线程设定栈大小为size
  • TIMEOUT_MAXLock.acquire()RLock.acquire()Condition.wait()允许的最大超时时间

示例代码:

1
2
3
4
5
6
7
import threading

print(threading.current_thread())
print(threading.active_count())
print(threading.get_ident())
print(threading.enumerate())
print(threading.main_thread())

运行结果:

1
2
3
4
5
<_MainThread(MainThread, started 140704355178688)>
1
140704355178688
[<_MainThread(MainThread, started 140704355178688)>]
<_MainThread(MainThread, started 140704355178688)>

threading模块包含下面的类:

  • Thread:基本线程类。
  • Lock:互斥锁。
  • RLock:可重入锁,使单一进程再次获得已持有的锁(递归锁)。
  • Condition:条件锁,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值。
  • Semaphore:信号锁,为线程间共享的有限资源提供一个"计数器",如果没有可用资源则会被阻塞。
  • Event:事件锁,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活。
  • Timer:一种计时器。
  • Barrier:必须达到指定数量的线程后才可以继续执行。

创建线程

有两种方式来创建线程:

  1. 继承threading.Thread类,并重写run()方法。
  2. 实例化threading.Thread对象的时候,将线程要执行的任务函数作为参数传入线程。

继承threading.Thread类,并重写run()方法。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading
import time


class MyThread(threading.Thread):
def __init__(self, thread_name):
# 注意:一定要显式的调用父类的初始化函数。
super(MyThread, self).__init__(name=thread_name,msg=msg)

def run(self):
time.sleep(1)
print("%s正在运行中......" % self.name)


if __name__ == '__main__':
for i in range(10):
MyThread("thread-" + str(i)).start()

运行结果:

1
2
3
4
5
6
7
8
9
10
thread-0正在运行中......thread-2正在运行中......
thread-3正在运行中......thread-7正在运行中......
thread-6正在运行中......
thread-8正在运行中......


thread-5正在运行中......
thread-1正在运行中......
thread-9正在运行中......
thread-4正在运行中......

实例化threading.Thread对象的时候,将线程要执行的任务函数作为参数传入线程。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
import threading
import time


def show(arg):
time.sleep(1)
print('thread ' + str(arg) + " running....")


if __name__ == '__main__':
for i in range(10):
t = threading.Thread(target=show, args=(i,))
t.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
thread 3 running....thread 1 running....
thread 2 running....
thread 0 running....
thread 5 running....
thread 7 running....
thread 9 running....
thread 4 running....thread 8 running....


thread 6 running....

我们还可以点开看一下threading.Thread的构造方法,参数如下:

  • group:暂时无用,未来功能的预留参数。
  • target:执行的目标任务名。
  • args:以元组的方式给执行任务传参。
  • kwargs:以字典方式给执行任务传参。
  • name:线程名,一般不用设置。

threading.Thread的常用方法和属性:

  • start():启动线程,等待CPU调度。
  • run():线程被CPU调度后自动执行的方法。
  • getName()setName()name:用于获取和设置线程的名称。
  • setDaemon():被标记为守护线程的线程,在主线程结束的时候,也会结束运行。
  • ident:获取线程的ID。
  • is_alive():判断线程是否是激活的(alive)。
  • isDaemon():判断现场是否为守护线程。
  • join([timeout]):只有这个线程死亡了,其他线程才可以执行。参数timeout是一个数值类型,表示超时时间,如果未提供该参数,那么其他线程将一直堵塞到被调线程结束。

传递参数

示例代码:

1
2
3
4
5
6
7
8
9
10
import threading


def p(msg):
print(msg)


if __name__ == '__main__':
pt = threading.Thread(target=p, args=('信息',))
pt.start()

运行结果:

1
信息

解释说明:我们是以元组的方式传递参数,如果定义的元组只有一个数据,那么这个数据后面也要添加逗号。

特别的,如果我们继承threading.Thread类,并重写run()方法,这时候怎么传递参数呢?
可以参考《基于Java的后端开发入门:7.多线程 [1/2]》,用成员变量的方式。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading


class MyThread(threading.Thread):
def __init__(self, thread_name, msg):
# 注意:一定要显式的调用父类的初始化函数。
super(MyThread, self).__init__(name=thread_name)
self.msg = msg

def run(self):
print(self.name)
print(self.msg)


if __name__ == '__main__':
MyThread("thread-zi", '信息').start()

运行结果:

1
2
thread-zi
信息

线程控制

join

如果没有join,主线程会立即结束。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
import threading


def waiting():
print('start waiting:', time.strftime('%H:%M:%S'))
time.sleep(3)
print('stop waiting', time.strftime('%H:%M:%S'))


t = threading.Thread(target=waiting)
t.start()
# 确保线程t已经启动
time.sleep(1)
print('start join')
# 将一直堵塞,直到t运行结束。
# t.join()
print('end join')

运行结果:

1
2
3
4
start waiting: 13:53:30
start join
end join
stop waiting 13:53:33

如果有了join,会将主线程阻塞,直到子线程结束后继续运行。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
import threading


def waiting():
print('start waiting:', time.strftime('%H:%M:%S'))
time.sleep(3)
print('stop waiting', time.strftime('%H:%M:%S'))


t = threading.Thread(target=waiting)
t.start()
# 确保线程t已经启动
time.sleep(1)
print('start join')
# 将一直堵塞,直到t运行结束。
t.join()
print('end join')

运行结果:

1
2
3
4
start waiting: 13:55:39
start join
stop waiting 13:55:42
end join

setDaemon

如果没有setDaemon,或者设置为False。当主线程结束后,子线程会继续运行。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import time


def run():
print(threading.current_thread().getName() + "开始工作")
time.sleep(2)
print("子线程工作完毕")


for i in range(3):
t = threading.Thread(target=run, name="thread-" + str(i))
# t.setDaemon(True)
t.start()

time.sleep(1)
print("主线程结束了!")
print(threading.active_count())

运行结果:

1
2
3
4
5
6
7
8
thread-0开始工作
thread-1开始工作
thread-2开始工作
主线程结束了!
4
子线程工作完毕
子线程工作完毕
子线程工作完毕

如果设置了setDaemon,且值为True,在主线程结束的时候,也会结束运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import time


def run():
print(threading.current_thread().getName() + "开始工作")
time.sleep(2)
print("子线程工作完毕")


for i in range(3):
t = threading.Thread(target=run, name="thread-" + str(i))
t.setDaemon(True)
t.start()

time.sleep(1)
print("主线程结束了!")
print(threading.active_count())

运行结果:

1
2
3
4
5
thread-0开始工作thread-1开始工作

thread-2开始工作
主线程结束了!
4

线程锁

线程安全

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

number = 0


def plus():
# global声明此处的number是外面的全局变量number
global number
# 进行一个大数级别的循环加一运算
for _ in range(1000000):
number = number + 1
print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))


# 用2个子线程,就可以观察到脏数据
for i in range(2):
t = threading.Thread(target=plus)
t.start()

time.sleep(5)
print("主线程执行完毕后,number = ", number)

运行结果:

1
2
3
子线程Thread-1运算结束后,number = 1152193
子线程Thread-2运算结束后,number = 1231093
主线程执行完毕后,number = 1231093

结果并不等于2000000,可以很明显地看出脏数据的情况。这是因为两个线程在运行过程中,CPU随机调度,你算一会我算一会,在没有对number进行保护的情况下,就发生了数据错误。

如果想获得正确结果,可以使用join()方法,让多线程变成顺序执行。示例代码:

1
2
3
4
for i in range(2):
t = threading.Thread(target=plus)
t.start()
t.join()

但,这么做,其实是让多线程变成了单线程,属于因噎废食的做法。正确的做法是使用线程锁。

threading模块中定义了几种线程锁类,分别是:

  • Lock:互斥锁
  • RLock:可重入锁
  • Semaphore:信号
  • Event:事件
  • Condition:条件
  • Barrier:阻碍

Lock(互斥锁)

Lock(互斥锁),是一种独占锁,同一时刻只有一个线程可以访问共享的数据。

使用很简单,初始化锁对象,然后将锁当做参数传递给任务函数,在任务中加锁,使用后释放锁。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import threading
import time

number = 0
lock = threading.Lock()


def plus(lk):
# global声明此处的number是外面的全局变量number
global number
# 进行一个大数级别的循环加一运算
for _ in range(1000000):
# 开始加锁
lk.acquire()
number += 1
lk.release()
print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
# 释放锁,让别的线程也可以访问number


if __name__ == '__main__':
# 用2个子线程,就可以观察到脏数据
for i in range(2):
# 需要把锁当做参数传递给plus函数
t = threading.Thread(target=plus, args=(lock,))
t.start()

time.sleep(5)
print("主线程执行完毕后,number = ", number)

运行结果:

1
2
3
子线程Thread-1运算结束后,number = 1887869
子线程Thread-2运算结束后,number = 2000000
主线程执行完毕后,number = 2000000

RLock的使用方法和Lock一模一样,只不过它支持重入锁,其特点有:

  • 锁对象内部维护着一个Lock和一个counter对象,counter对象记录了acquire的次数。
  • 同一个线程中,RLock.acquire()可以被多次调用,利用该特性,可以解决部分死锁问题。

Semaphore(信号)

类名:BoundedSemaphore
这种锁允许一定数量的线程同时更改数据, 不是互斥锁。比如地铁安检,排队人很多,工作人员只允许一定数量的人进入安检区,其它的人继续排队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time
import threading


def run(n, se):
se.acquire()
print("run the thread: %s" % n)
time.sleep(5)
se.release()


# 设置允许5个线程同时运行
semaphore = threading.BoundedSemaphore(5)
for i in range(20):
t = threading.Thread(target=run, args=(i, semaphore))
t.start()

Event(事件线程锁)

类名:Event

Event(事件线程锁)的运行机制:全局定义了一个Flag,如果Flag的值为False,那么当程序执行wait()方法时就会阻塞,如果Flag值为True,线程不再阻塞。这种锁,类似交通红绿灯(默认是红灯),它属于在红灯的时候一次性阻挡所有线程,在绿灯的时候,一次性放行所有排队中的线程。

主要方法如下:

  • set():调用set()方法会将Flag设置为True。
  • wait():调用wait()方法将等待"红绿灯"信号。
  • clear():调用clear()方法会将Flag设置为False。
  • is_set():判断当前是否"绿灯放行"状态

下面是一个模拟红绿灯,然后汽车通行的例子,示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import threading
import time

event = threading.Event()


def lighter():
# 绿灯时间
green_time = 5
# 红灯时间
red_time = 5
# 初始设为绿灯
event.set()
while True:
print("绿灯亮...")
time.sleep(green_time)
event.clear()
print("红灯亮...")
time.sleep(red_time)
event.set()


def run(name):
while True:
# 判断当前是否"放行"状态
if event.is_set():
print("[%s]通过..." % name)
time.sleep(1)
else:
print("[%s]停下..." % name)
event.wait()
print("[%s]继续..." % name)


if __name__ == '__main__':

light = threading.Thread(target=lighter, )
light.start()

for name in ['奔驰', '宝马', '奥迪']:
car = threading.Thread(target=run, args=(name,))
car.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
绿灯亮...[奔驰]通过...
[宝马]通过...

[奥迪]通过...
[宝马]通过...[奥迪]通过...[奔驰]通过...


[奔驰]通过...
[奥迪]通过...
[宝马]通过...
[奔驰]通过...[奥迪]通过...

[宝马]通过...
[奥迪]通过...
[奔驰]通过...
[宝马]通过...
红灯亮...
[奔驰]停下...
[奥迪]停下...
[宝马]停下...
绿灯亮...
[奔驰]继续...
[奔驰]通过...
[宝马]继续...
[奥迪]继续...
[奥迪]通过...
[宝马]通过...
[奥迪]通过...

Condition(条件锁)

类名:Condition

主要方法如下:

  • acquire():加锁。
  • release():解锁。
  • wait([timeout]):使线程进入Condition的等待池,等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
  • notify():从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
  • notifyAll():通知等待池中所有的线程,这些线程都将尝试获得锁。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
import time

num = 0
con = threading.Condition()


class Foo(threading.Thread):

def __init__(self, name, action):
super(Foo, self).__init__()
self.name = name
self.action = action

def run(self):
global num
con.acquire()
print("%s 开始执行..." % self.name)
while True:
if self.action == "add":
num += 1
elif self.action == 'reduce':
num -= 1
else:
exit(1)
print("num当前为:", num)
time.sleep(1)
if num == 5 or num == 0:
print("%s 暂停执行" % self.name)
con.notify()
con.wait()
print("%s 开始执行..." % self.name)
con.release()


if __name__ == '__main__':
a = Foo("线程A", 'add')
b = Foo("线程B", 'reduce')
a.start()
b.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
线程A 开始执行...
num当前为: 1
num当前为: 2
num当前为: 3
num当前为: 4
num当前为: 5
线程A 暂停执行
线程B 开始执行...
num当前为: 4
num当前为: 3
num当前为: 2
num当前为: 1
num当前为: 0
线程B 暂停执行
线程A 开始执行...
num当前为: 1
num当前为: 2
num当前为: 3
num当前为: 4
num当前为: 5
线程A 暂停执行
线程B 开始执行...
num当前为: 4
num当前为: 3
num当前为: 2
num当前为: 1
num当前为: 0

Timer(定时器)

概述

Timer是Thread的子类,是一个定时器功能的类,可以在指定秒之后执行某个方法。

使用方法

1
timer = threading.Timer(interval, function, args=None, kwargs=None)

参数:

  • interval:经过多少秒调用函数,单位秒。
    不断调用则在目标函数末尾调用该方法。
  • function:调用的目标函数。
  • args=None:传递到目标函数的位置参数。
  • kwargs=None:传递到目标函数的关键字参数。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time

exec_count = 0

def start():
print("hello world", exec_count)


def heart_beat():
print(time.strftime('%Y-%m-%d %H:%M:%S'))
global exec_count
exec_count += 1
# 执行15次后停止定时器
if exec_count < 15:
start()
threading.Timer(5, heart_beat).start()


if __name__ == '__main__':
heart_beat()

通过with语句使用线程锁

所有的线程锁都有一个加锁和释放锁的动作,非常类似文件的打开和关闭。在加锁后,如果线程执行过程中出现异常或者错误,没有正常的释放锁,那么其他的线程会造到致命性的影响。通过with上下文管理器,可以确保锁被正常释放。

其格式如下:

1
2
with some_lock:
# 执行任务...

这相当于:

1
2
3
4
5
some_lock.acquire()
try:
# 执行任务..
finally:
some_lock.release()

全局解释器锁(GIL)

什么是全局解释器锁(GIL)

在大多数环境中,单核CPU情况下,本质上某一时刻只能有一个线程被执行。多核CPU时则可以支持多个线程同时执行。
但是在Python中,无论CPU有多少核,同时只能执行一个线程。

围观

这是由于GIL的存在导致的。
GIL的全称是Global Interpreter Lock,全局解释器锁,是Python设计之初为了数据安全所做的决定。Python中的某个线程想要执行,必须先拿到GIL。可以把GIL看作是执行任务的"通行证",并且在一个Python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。

不过,GIL只在CPython解释器中才有,在PyPy和JPython中没有GIL。
但是,正如我们在《5.Python [1/2]》的开头所说的,CPython,是官方的解释器。

也正因为这个特性,Python针对不同类型的任务,多线程执行效率是不同的:

  • Python下的多线程对CPU密集型任务并不友好。
  • Python的多线程对IO密集型任务比较友好。

如果是双线程的CPU呢?

一个刁钻的问题,如果是那种四核八线程的那种CPU。
Python可以用两个线程吗?
不能。
对于四核八线程的CPU,在软件上就理解成是八核CPU。

如何充分利用多核CPU

根据上文的讨论,如何在Python中想要充分利用多核CPU?
多进程
这一每个进程有各自独立的GIL,互不干扰,这样就可以真正意义上的并行执行。

那么,怎么同时启用多个进程呢?
同时跑多个Python应用,就有多进程了。
(虽然在下文我们会讨论"多进程",但是那种方法在Windows系统中不支持。)

还有一种方法,协程。
我们也会在下文讨论。

最好的方法:多进程+协程

线程池

在Python中,没有内置的较好的线程池模块,需要自己实现或使用第三方模块。

如下的线程池,可以参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
一个基于thread和queue的线程池,以任务为队列元素,动态创建线程,重复利用线程,
通过close和terminate方法关闭线程池。
"""
import queue
import threading
import contextlib
import time

# 创建空对象,用于停止线程
StopEvent = object()


def callback(status, result):
"""
根据需要进行的回调函数,默认不执行。
:param status: action函数的执行状态
:param result: action函数的返回值
:return:
"""
pass


def action(thread_name, arg):
"""
真实的任务定义在这个函数里
:param thread_name: 执行该方法的线程名
:param arg: 该函数需要的参数
:return:
"""
# 模拟该函数执行了0.1秒
time.sleep(0.1)
print("第%s个任务调用了线程 %s,并打印了这条信息!" % (arg + 1, thread_name))


class ThreadPool:

def __init__(self, max_num, max_task_num=None):
"""
初始化线程池
:param max_num: 线程池最大线程数量
:param max_task_num: 任务队列长度
"""
# 如果提供了最大任务数的参数,则将队列的最大元素个数设置为这个值。
if max_task_num:
self.q = queue.Queue(max_task_num)
# 默认队列可接受无限多个的任务
else:
self.q = queue.Queue()
# 设置线程池最多可实例化的线程数
self.max_num = max_num
# 任务取消标识
self.cancel = False
# 任务中断标识
self.terminal = False
# 已实例化的线程列表
self.generate_list = []
# 处于空闲状态的线程列表
self.free_list = []

def put(self, func, args, callback=None):
"""
往任务队列里放入一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数
1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
# 先判断标识,看看任务是否取消了
if self.cancel:
return
# 如果没有空闲的线程,并且已创建的线程的数量小于预定义的最大线程数,则创建新线程。
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
# 构造任务参数元组,分别是调用的函数,该函数的参数,回调函数。
w = (func, args, callback,)
# 将任务放入队列
self.q.put(w)

def generate_thread(self):
"""
创建一个线程
"""
# 每个线程都执行call方法
t = threading.Thread(target=self.call)
t.start()

def call(self):
"""
循环去获取任务函数并执行任务函数。在正常情况下,每个线程都保存生存状态, 直到获取线程终止的flag。
"""
# 获取当前线程的名字
current_thread = threading.currentThread().getName()
# 将当前线程的名字加入已实例化的线程列表中
self.generate_list.append(current_thread)
# 从任务队列中获取一个任务
event = self.q.get()
# 让获取的任务不是终止线程的标识对象时
while event != StopEvent:
# 解析任务中封装的三个参数
func, arguments, callback = event
# 抓取异常,防止线程因为异常退出
try:
# 正常执行任务函数
result = func(current_thread, *arguments)
success = True
except Exception as e:
# 当任务执行过程中弹出异常
result = None
success = False
# 如果有指定的回调函数
if callback is not None:
# 执行回调函数,并抓取异常
try:
callback(success, result)
except Exception as e:
pass
# 当某个线程正常执行完一个任务时,先执行worker_state方法
with self.worker_state(self.free_list, current_thread):
# 如果强制关闭线程的flag开启,则传入一个StopEvent元素
if self.terminal:
event = StopEvent
# 否则获取一个正常的任务,并回调worker_state方法的yield语句
else:
# 从这里开始又是一个正常的任务循环
event = self.q.get()
else:
# 一旦发现任务是个终止线程的标识元素,将线程从已创建线程列表中删除
self.generate_list.remove(current_thread)

def close(self):
"""
执行完所有的任务后,让所有线程都停止的方法
"""
# 设置flag
self.cancel = True
# 计算已创建线程列表中线程的个数,
# 然后往任务队列里推送相同数量的终止线程的标识元素
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1

def terminate(self):
"""
在任务执行过程中,终止线程,提前退出。
"""
self.terminal = True
# 强制性的停止线程
while self.generate_list:
self.q.put(StopEvent)

# 该装饰器用于上下文管理
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录空闲的线程,或从空闲列表中取出线程处理任务
"""
# 将当前线程,添加到空闲线程列表中
state_list.append(worker_thread)
# 捕获异常
try:
# 在此等待
yield
finally:
# 将线程从空闲列表中移除
state_list.remove(worker_thread)


# 调用方式
if __name__ == '__main__':
# 创建一个最多包含5个线程的线程池
pool = ThreadPool(5)
# 创建100个任务,让线程池进行处理
for i in range(100):
pool.put(action, (i,), callback)
# 等待一定时间,让线程执行任务
time.sleep(3)
print("-" * 50)
print("\033[32;0m任务停止之前线程池中有%s个线程,空闲的线程有%s个!\033[0m"
% (len(pool.generate_list), len(pool.free_list)))
# 正常关闭线程池
pool.close()
print("任务执行完毕,正常退出!")
# 强制关闭线程池
# pool.terminate()
# print("强制停止任务!")

协程

根据上文的讨论,我们知道:

  • 对于CPU计算密集型任务由于GIL的存在通常使用多进程来实现。
  • 对于IO密集型任务可以通过多线程,让线程在执行IO任务时让出GIL,从而实现表面上的并发。

但,对于IO密集型任务我们还有一种更好的选择:协程。
协程,也称微线程,Coroutine,是运行在单线程中的"并发",协程相比多线程的一大优势就是省去了多线程之间的切换开销,获得了更高的运行效率。

什么是协程

协程的切换不同于线程切换,是由程序自身控制的,没有切换的开销。协程不需要多线程的锁机制,因为都是在同一个线程中运行,所以没有同时访问数据的问题,执行效率比多线程高很多。

我们可以这么简单的理解:

  • 进程/线程:操作系统提供的一种并发处理任务的能力。
  • 协程:程序员通过高超的代码能力,在代码执行流程中人为的实现多任务并发,是单个线程内的任务调度技巧。

下面举一个例子,甲,乙两个工人模拟两个工作任务交替进行,在单线程内实现了类似多线程的功能。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
rt = True


def task1():
while True:
print("<甲>工作了一段时间.....")
print("<甲>需要休息,线程不切换,换<乙>")
task2()


def task2():
global rt
if rt:
rt = False
task1()
print("<乙>工作了一段时间.....")
print("<乙>需要休息,线程不切换,换<甲>")


if __name__ == '__main__':
task2()

运行结果:

1
2
3
4
5
6
7
<甲>工作了一段时间.....
<甲>需要休息,线程不切换,换<乙>
<乙>工作了一段时间.....
<乙>需要休息,线程不切换,换<甲>
<甲>工作了一段时间.....
<甲>需要休息,线程不切换,换<乙>
<乙>工作了一段时间.....

实现

在Python中有多种方式可以实现协程:

  • greenlet,是一个第三方模块,用于实现协程代码(Gevent协程就是基于greenlet实现)
  • yield,生成器,借助生成器的特点也可以实现协程代码。
  • asyncio,在Python3.4中引入的模块用于编写协程代码。
  • async & awiat,在Python3.5中引入的两个关键字,结合asyncio模块可以更方便的编写协程代码。

greenlet

greentlet是一个第三方模块。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from greenlet import greenlet


def func1():
# 第2步:输出 1
print(1)
# 第3步:切换到 func2 函数
gr2.switch()
# 第6步:输出 2
print(2)
# 第7步:切换到 func2 函数,从上一次执行的位置继续向后执行
gr2.switch()


def func2():
# 第4步:输出 3
print(3)
# 第5步:切换到 func1 函数,从上一次执行的位置继续向后执行
gr1.switch()
# 第8步:输出 4
print(4)


gr1 = greenlet(func1)
gr2 = greenlet(func2)
# 第1步:去执行 func1 函数
gr1.switch()

运行结果:

1
2
3
4
1
3
2
4

yield

基于Python的生成器的yieldyield form关键字实现协程代码。

一个函数中如果存在了yield,就是一个生成器函数,返回的就是一个生成器。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def func1():
yield 1
yield from func2()
yield 2


def func2():
yield 3
yield 4


f1 = func1()
i = 1
for item in f1:
print('第%s次遍历' % i)
i = i + 1
print(item)

运行结果:

1
2
3
4
5
6
7
8
第1次遍历
1
第2次遍历
3
第3次遍历
4
第4次遍历
2

解释说明:

  • f1 = func1(),即执行了一个生成器函数func1(),返回了一个生成器f1
  • for item in f1:对生成器进行遍历:
    • 第一次遍历,yield 1,返回1
    • 第二次遍历,yield from func2(),从func2()进行yieldyield 3,返回3
    • 第三次遍历,yield 4,返回4
    • 第四次遍历,yield 2,返回2

asyncio

asyncio,是3.4及的内置模块。
基于asyncio实现的协程,相比上文的,优势在于,遇到IO耗时操作自动切换。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio


@asyncio.coroutine
def func1():
print(1)
# 遇到IO耗时操作,自动化切换到tasks中的其他任务
yield from asyncio.sleep(2)
print(2)


@asyncio.coroutine
def func2():
print(3)
# 遇到IO耗时操作,自动化切换到tasks中的其他任务
yield from asyncio.sleep(2)
print(4)


tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

运行结果:

1
2
3
4
5
6
7
8
/Users/kaka/Documents/1.py:5: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
def func1():
/Users/kaka/Documents/1.py:13: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
def func2():
1
3
2
4

解释说明:

  • @asyncio.coroutine,是一个装饰器,可以将一个普通的函数,装饰成协程函数。
  • 其它的代码是执行协程函数用的,具体我们会在下文进行讨论。

async 和 awit

正如上文的运行结果,从3.8开始,@asyncio.coroutine就已经被弃用了。
我们用asyncawit实现协程。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio


async def func1():
print(1)
# 遇到IO耗时操作,自动化切换到tasks中的其他任务
await asyncio.sleep(2)
print(2)


async def func2():
print(3)
# 遇到IO耗时操作,自动化切换到tasks中的其他任务
await asyncio.sleep(2)
print(4)


tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

例子

我们举一个例子,下载三张图片。在下载第一张图片,发HTTP请求,这时候进入了一个IO等待,这时候会切换,去下第二张图片,即发第二个HTTP请求。

该例子需要使用第三方模块aiohttp

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import aiohttp
import asyncio


async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('/')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)
print("下载完成:", file_name)


async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www.kakawanyifan.com/-/1/09/07/000.jpg',
'https://www.kakawanyifan.com/-/1/09/07/001.png',
'https://www.kakawanyifan.com/-/1/09/07/002.png'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)


if __name__ == '__main__':
asyncio.run(main())

运行结果:

1
2
3
4
5
6
发送请求: https://www.kakawanyifan.com/-/1/09/07/000.jpg
发送请求: https://www.kakawanyifan.com/-/1/09/07/001.png
发送请求: https://www.kakawanyifan.com/-/1/09/07/002.png
下载完成: 001.png
下载完成: 002.png
下载完成: 000.jpg

详解

我们对asyncio模块、async关键字和await关键字进行更详细的讲解。

事件循环

事件循环,可以把他当做是一个死循环,这个死循环在周期性的运行并执行一些任务,但会在特定条件下终止。

用伪代码描述如下:

1
2
3
4
5
6
7
8
任务列表 = [ 任务1, 任务2, 任务3,... ]
while True:
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
for 就绪任务 in 可执行的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除 已完成的任务
如果 任务列表 中的任务都已完成,则终止循环

在编写程序时候可以通过如下代码来获取和创建事件循环:

1
2
3
4
5
import asyncio
# 生成一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(【任务】)

《基于Java的后端开发入门:6.网络编程》,我们讨论过NIO,其思路比较接近。

快速入门

协程对象

  • 协程函数:定义形式为async def的函数。
    1
    2
    async def func():
    pass
  • 协程对象:调用协程函数所返回的对象。
    1
    result = func()
    注意:调用协程函数时,函数内部代码不会执行,只是会返回一个协程对象。

如果想要执行协程函数的内部代码,需要事件循环协程对象配合才能实现。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio


async def func():
print("协程内部代码")


# 调用协程函数,返回一个协程对象。
result = func()

# 创建一个事件循环
loop = asyncio.get_event_loop()
# 将协程当做任务提交到事件循环的任务列表中,协程执行完成之后终止。
loop.run_until_complete(result)

asyncio.run

asyncio.run(result),这种方法也可以。asyncio.run是在3.7版本中加入的:
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio


async def func():
print("协程内部代码")


# 调用协程函数,返回一个协程对象。
result = func()

# 创建一个事件循环
# 将协程当做任务提交到事件循环的任务列表中,协程执行完成之后终止。
# asyncio.run 函数在 Python 3.7 中加入 asyncio 模块,
asyncio.run(result)

特别的,我们可以点进run方法,查看其源码。会发现是在run中实现了asyncio.get_event_looprun_until_complete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def run(main, *, debug=False):
if events._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")

if not coroutines.iscoroutine(main):
raise ValueError("a coroutine was expected, got {!r}".format(main))

loop = events.new_event_loop()
try:
events.set_event_loop(loop)
loop.set_debug(debug)
return loop.run_until_complete(main)
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
events.set_event_loop(None)
loop.close()

await

  • 语法格式:await 【可等待对象】
    【可等待的对象】有三种,协程对象、Future、Task对象。
  • 作用:等待对象的值,得到结果后往下走。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio


async def others():
print("start")
await asyncio.sleep(2)
print('end')
return '返回值'


async def func():
print("执行协程函数内部代码")
# 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
response1 = await others()
print("IO请求结束,结果为:", response1)
response2 = await others()
print("IO请求结束,结果为:", response2)


asyncio.run(func())

运行结果:

1
2
3
4
5
6
7
执行协程函数内部代码
start
end
IO请求结束,结果为: 返回值
start
end
IO请求结束,结果为: 返回值

await是一个只能在协程函数中使用的关键字,用于遇到IO操作时挂起当前协程(任务),当前协程(任务)挂起过程中事件循环可以去执行其他的协程(任务),当前协程IO处理完成时,可以再次切换回来执行await之后的代码。

Task对象

asyncio.create_task

在我们上文的代码中,有这么一行asyncio.create_task(协程对象),其作用是创建Task对象,并且立即添加到事件循环的任务列表中,等待被调度执行。
asyncio.create_task()是在3.7中被加入。在3.7之前,可以改用低层级的asyncio.ensure_future()或者loop.create_task()

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio


async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"


async def main():
print("main开始")
# 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
task1 = asyncio.create_task(func())
# 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
task2 = asyncio.create_task(func())
await asyncio.sleep(5)
print("main结束")
# 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
# 此处的await是等待相对应的协程全都执行完毕并获取结果
ret1 = await task1
ret2 = await task2
print(ret1, ret2)


asyncio.run(main())

运行结果:

1
2
3
4
5
6
7
main开始
1
1
2
2
main结束
返回值 返回值

特别的,如果我们改成这种方式呢?
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio


async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"


if __name__ == '__main__':
print("main开始")
# 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
task1 = asyncio.create_task(func())
# 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
task2 = asyncio.create_task(func())
print("main结束")
# 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
# 此处的await是等待相对应的协程全都执行完毕并获取结果
ret1 = task1
ret2 = task2
print(ret1, ret2)

运行结果:

1
RuntimeError: no running event loop

no running event loop,我们讨论过,asyncio.run中实现了asyncio.get_event_looprun_until_complete
也就是说asyncio.run(main())创建了一个event loop,然后asyncio.create_task(func())会将Task对象加入其event loop

asyncio.wait

上文,我们有这里两行代码:

1
2
ret1 = await task1
ret2 = await task2·

如果我们的Task很多的话,要写很多遍,所以我们可以将其放在一个列表中。但是,我们不能直接await 一个列表,需要用asyncio.wait进行封装。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio


async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"


async def main():
print("main开始")
# 创建协程,将协程封装到Task对象中并添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
# 在调用
task_list = [
asyncio.create_task(func(), name="n1"),
asyncio.create_task(func(), name="n2")
]
await asyncio.sleep(5)
print("main结束")
# 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
# 此处的await是等待所有协程执行完毕,并将所有协程的返回值保存到done
# 如果设置了timeout值,则意味着此处最多等待的秒,完成的协程返回值写入到done中,未完成则写到pending中。
done, pending = await asyncio.wait(task_list, timeout=None)
print(done, pending)


asyncio.run(main())

运行结果:

1
2
3
4
5
6
7
main开始
1
1
2
2
main结束
{<Task finished name='n1' coro=<func() done, defined at /Users/kaka/Documents/auto-work/1.py:4> result='返回值'>, <Task finished name='n2' coro=<func() done, defined at /Users/kaka/Documents/auto-work/1.py:4> result='返回值'>} set()

asyncio.wait内部会对列表中的每个协程对象执行asyncio.ensure_future从而封装为Task对象,所以在和asyncio.wait配合使用时,task_list的值为[func(),func()]也是可以的。所以,我们还可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio


async def func():
print("执行协程函数内部代码")
# 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。
response = await asyncio.sleep(2)
print("IO请求结束,结果为:", response)


coroutine_list = [func(), func()]
# 错误:coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ]
# 此处不能直接 asyncio.create_task,因为将Task立即加入到事件循环的任务列表,
# 但此时事件循环还未创建,所以会报错。
# 使用asyncio.wait将列表封装为一个协程,并调用asyncio.run实现执行两个协程
# asyncio.wait内部会对列表中的每个协程执行ensure_future,封装为Task对象。
done, pending = asyncio.run(asyncio.wait(coroutine_list))
print(done)
print(pending)

asyncio.Future对象

Task继承asyncio.Future,Task对象内部的await结果的处理,是基于asyncio.Future来的,具体是成员变量_state
通常我们不会直接用到这个对象,而是直接使用Task对象来完成任务的并和状态的追踪。

该代码不会结束,会一直等下去。

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio


async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# # 创建一个任务(Future对象),这个任务什么都不干。
fut = loop.create_future()
# 等待任务最终结果(Future对象),没有结果则会一直等下去。
await fut


asyncio.run(main())

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio


async def set_after(fut):
await asyncio.sleep(2)
fut.set_result("666")


async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
fut = loop.create_future()
# 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
# 即手动设置future任务的最终结果,那么fut就可以结束了。
await loop.create_task(set_after(fut))
# 等待 Future对象获取 最终结果,否则一直等下去
data = await fut
print(data)


asyncio.run(main())

运行结果:

1
666

特别的,我们可以看set_result的源码,self._state = _FINISHED,对成员变量_state进行了赋值。

1
2
3
4
5
6
7
8
9
10
11
def set_result(self, result):
"""Mark the future done and set its result.

If the future is already done when this method is called, raises
InvalidStateError.
"""
if self._state != _PENDING:
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()

uvloop

uvloop是一个第三方的包,是asyncio中的事件循环的替代方案,替换后可以使得asyncio性能提高。

在项目中想要使用uvloop替换asyncio的事件循环也非常简单,只要在代码中这么做:

1
2
3
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

其他的代码与上文的代码一致,内部的事件循环自动化会变为uvloop

多进程

多进程不支持Windows

Python中的multiprocess提供了Process类,实现进程相关的功能。
但是它基于fork机制,想要在Windows中运行,只能使用if __name__ == '__main__':的方式。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import os
import multiprocessing


def foo(i):
# 同样的参数传递方法
print("这里是 ", multiprocessing.current_process().name)
print('模块名称:', __name__)
# 获取父进程ID
print('父进程 ID:', os.getppid())
# 获取自己的进程ID
print('当前子进程 ID:', os.getpid())
print()


if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=foo, args=(i,))
p.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
这里是  Process-1
模块名称: __mp_main__
父进程 ID: 11053
当前子进程 ID: 11056

这里是 Process-3
模块名称: __mp_main__
父进程 ID: 11053
当前子进程 ID: 11058

这里是 Process-5
模块名称: __mp_main__
父进程 ID: 11053
当前子进程 ID: 11060

这里是 Process-2
模块名称: __mp_main__
父进程 ID: 11053
当前子进程 ID: 11057

这里是 Process-4
模块名称: __mp_main__
父进程 ID: 11053
当前子进程 ID: 11059

进程间的数据共享

进程间的数据共享,即进程通信。

现象

每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的。
我们尝试用一个全局列表来实现进程间的数据共享,可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process

lis = []


def foo(i):
lis.append(i)
print("This is Process ", i, " and lis is ", lis, " and lis.address is ", id(lis))


if __name__ == '__main__':
for i in range(5):
p = Process(target=foo, args=(i,))
p.start()
print("The end of list_1:", lis)

运行结果:

1
2
3
4
5
6
The end of list_1: []
This is Process 0 and lis is [0] and lis.address is 140520551137856
This is Process 1 and lis is [1] and lis.address is 140208487580224
This is Process 3 and lis is [3] and lis.address is 140253920281152
This is Process 2 and lis is [2] and lis.address is 140627111625280
This is Process 4 and lis is [4] and lis.address is 140435884917312

共享数据不支持MacOS

想要在进程之间进行数据共享可以使用QueuesArrayManager这三个multiprocess模块提供的类,但是根据实测,在MacOS上不支持。

使用Array共享数据

入门案例

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
from multiprocessing import Array


def func(i_val, temp_val):
temp_val[0] += 100
print("进程%s " % i_val, ' 修改数组第一个元素后 -->', temp_val[0])


if __name__ == '__main__':
temp = Array('i', [1, 2, 3, 4])
for i in range(10):
p = Process(target=func, args=(i, temp))
p.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
进程0   修改数组第一个元素后 --> 101
进程3 修改数组第一个元素后 --> 201
进程7 修改数组第一个元素后 --> 401
进程2 修改数组第一个元素后 --> 301
进程6 修改数组第一个元素后 --> 501
进程1 修改数组第一个元素后 --> 601
进程9 修改数组第一个元素后 --> 701
进程4 修改数组第一个元素后 --> 801
进程8 修改数组第一个元素后 --> 901
进程5 修改数组第一个元素后 --> 1001

注意!Array('i', [1, 2, 3, 4])。其中i的含义是说,这个Array中都是int类型。

必须预先指定元素,或者指定数组的长度

Array内的元素可以预先指定,也可以只指定数组的长度。
例如,指定数组长度,Array('i', 5)

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
from multiprocessing import Array


def func(i_val, temp_val):
temp_val[0] += 1
print("进程%s " % i_val, ' 修改数组第一个元素后 -->', temp_val[0])


if __name__ == '__main__':
temp = Array('i', 5)
for i in range(10):
p = Process(target=func, args=(i, temp))
p.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
进程1   修改数组第一个元素后 --> 1
进程2 修改数组第一个元素后 --> 2
进程0 修改数组第一个元素后 --> 3
进程4 修改数组第一个元素后 --> 4
进程3 修改数组第一个元素后 --> 5
进程6 修改数组第一个元素后 --> 6
进程5 修改数组第一个元素后 --> 7
进程8 修改数组第一个元素后 --> 8
进程9 修改数组第一个元素后 --> 9
进程7 修改数组第一个元素后 --> 10

但是,不能既不预先指定元素,又不指定数组的大小。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
from multiprocessing import Array


def func(i_val, temp_val):
temp_val[0] += 1
print("进程%s " % i_val, ' 修改数组第一个元素后 -->', temp_val[0])


if __name__ == '__main__':
temp = Array('i')
for i in range(10):
p = Process(target=func, args=(i, temp))
p.start()

运行结果:

1
2
3
4
Traceback (most recent call last):
File "/home/kaka/PycharmProjects/pythonProject/main.py", line 11, in <module>
temp = Array('i')
TypeError: BaseContext.Array() missing 1 required positional argument: 'size_or_initializer'

Arrya中元素的类型

除了用i,表示Array数组中的内容必须是int类型。还有其他很多选项。

  • c: ctypes.c_char
    u: ctypes.c_wchar
  • b: ctypes.c_byte
    B: ctypes.c_ubyte
  • h: ctypes.c_short
    H: ctypes.c_ushort
  • i: ctypes.c_int
    I: ctypes.c_uint
  • l: ctypes.c_long
    L: ctypes.c_ulong
  • f: ctypes.c_float
  • d: ctypes.c_double

使用Queue类共享数据

入门案例

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process
from multiprocessing import Queue


def func(i_val, q_val):
ret = q_val.get()
print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i_val, ret, i_val))
q_val.put(i_val)


if __name__ == "__main__":
lis = Queue(20)
lis.put(0)
for i in range(10):
p = Process(target=func, args=(i, lis,))
p.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
进程1从队列里获取了一个0,然后又向队列里放入了一个1
进程7从队列里获取了一个1,然后又向队列里放入了一个7
进程3从队列里获取了一个7,然后又向队列里放入了一个3
进程5从队列里获取了一个3,然后又向队列里放入了一个5
进程9从队列里获取了一个5,然后又向队列里放入了一个9
进程2从队列里获取了一个9,然后又向队列里放入了一个2
进程0从队列里获取了一个2,然后又向队列里放入了一个0
进程4从队列里获取了一个0,然后又向队列里放入了一个4
进程6从队列里获取了一个4,然后又向队列里放入了一个6
进程8从队列里获取了一个6,然后又向队列里放入了一个8

multiprocessing.queues.Queue

有些资料会介绍另一个Queue,位于multiprocessing.queues.Queue
位置不同,使用方法略有不同。

  • multiprocessing.QueueQueue(20)
  • multiprocessing.queues.QueueQueue(20, ctx=multiprocessing)

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import multiprocessing
from multiprocessing import Process
from multiprocessing.queues import Queue


def func(i_val, q_val):
ret = q_val.get()
print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i_val, ret, i_val))
q_val.put(i_val)


if __name__ == "__main__":
lis = Queue(20, ctx=multiprocessing)
lis.put(0)
for i in range(10):
p = Process(target=func, args=(i, lis,))
p.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
进程0从队列里获取了一个0,然后又向队列里放入了一个0
进程2从队列里获取了一个0,然后又向队列里放入了一个2
进程3从队列里获取了一个2,然后又向队列里放入了一个3
进程1从队列里获取了一个3,然后又向队列里放入了一个1
进程6从队列里获取了一个1,然后又向队列里放入了一个6
进程4从队列里获取了一个6,然后又向队列里放入了一个4
进程5从队列里获取了一个4,然后又向队列里放入了一个5
进程7从队列里获取了一个5,然后又向队列里放入了一个7
进程9从队列里获取了一个7,然后又向队列里放入了一个9
进程8从队列里获取了一个9,然后又向队列里放入了一个8

使用Manager共享数据

入门案例

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process
from multiprocessing import Manager


def func(i_val, dic_val):
dic_val[100 + i_val] = 100 + i_val
print(dic_val.items())


if __name__ == '__main__':
dic = Manager().dict()
for i in range(10):
p = Process(target=func, args=(i, dic))
p.start()
p.join()

运行结果:

1
2
3
4
5
6
7
8
9
10
[(100, 100)]
[(100, 100), (101, 101)]
[(100, 100), (101, 101), (102, 102)]
[(100, 100), (101, 101), (102, 102), (103, 103)]
[(100, 100), (101, 101), (102, 102), (103, 103), (104, 104)]
[(100, 100), (101, 101), (102, 102), (103, 103), (104, 104), (105, 105)]
[(100, 100), (101, 101), (102, 102), (103, 103), (104, 104), (105, 105), (106, 106)]
[(100, 100), (101, 101), (102, 102), (103, 103), (104, 104), (105, 105), (106, 106), (107, 107)]
[(100, 100), (101, 101), (102, 102), (103, 103), (104, 104), (105, 105), (106, 106), (107, 107), (108, 108)]
[(100, 100), (101, 101), (102, 102), (103, 103), (104, 104), (105, 105), (106, 106), (107, 107), (108, 108), (109, 109)]

原因分析

注意,上文的例子中,有一行代码p.join(),如果我们把这行代码注释呢?

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process
from multiprocessing import Manager


def func(i_val, dic_val):
dic_val[100 + i_val] = 100 + i_val
print(dic_val.items())


if __name__ == '__main__':
dic = Manager().dict()
for i in range(10):
p = Process(target=func, args=(i, dic))
p.start()
# p.join()

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/home/kaka/PycharmProjects/pythonProject/venv/bin/python /home/kaka/PycharmProjects/pythonProject/main.py 
[(100, 100)]
[(100, 100), (104, 104)]
[(100, 100), (104, 104), (103, 103)]
[(100, 100), (104, 104), (103, 103), (102, 102)]
[(100, 100), (104, 104), (103, 103), (102, 102), (101, 101)]
[(100, 100), (104, 104), (103, 103), (102, 102), (101, 101), (106, 106)]
[(100, 100), (104, 104), (103, 103), (102, 102), (101, 101), (106, 106), (105, 105)]
[(100, 100), (104, 104), (103, 103), (102, 102), (101, 101), (106, 106), (105, 105), (107, 107)]
Process Process-10:
Process Process-11:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/lib/python3.10/multiprocessing/managers.py", line 810, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

【部分运行结果略】

ConnectionResetError: [Errno 104] Connection reset by peer
File "/usr/lib/python3.10/multiprocessing/managers.py", line 810, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

【部分运行结果略】

ConnectionResetError: [Errno 104] Connection reset by peer

报错了!
为什么会报错?
p.join()的作用是什么?
进程中的join()方法,我们没讨论过。但是线程中的join()方法我们讨论过,只有这个线程死亡了,其他线程才可以执行。
例如,主线程调用子线程的.join()方法,那么主线程会阻塞,直到子线程结束后继续运行。

上文,我们主进程会不等子进程结束,就立马结束。
构造方法Manager()返回的manager对象会提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。

其他支持

Manager的对象支持:listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray等多种格式。

现在,我们来做一个事情,用多进程,从1加到100。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
import multiprocessing


def add_to_value(addend, value):
value.value += addend


with multiprocessing.Manager() as manager:
value = manager.Value(float, 0.0)
with multiprocessing.Pool(2) as pool:
pool.starmap(add_to_value,
[(float(i), value) for i in range(100)])
print(value.value)

运行结果:

1
3435.0

结果不对!
这个肯定是出现了进程不安全,加锁就够了。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import multiprocessing


def add_to_value(addend, value, lock):
with lock:
value.value += addend


if __name__ == '__main__':
with multiprocessing.Manager() as manager:
lock = manager.Lock()
value = manager.Value(float, 0.0)
with multiprocessing.Pool(2) as pool:
pool.starmap(add_to_value,
[(float(i), value, lock) for i in range(100)])
print(value.value)

运行结果:

1
4950.0

注意两段代码的add_to_value(addend, value, lock)这个方法。

进程锁

为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。
threading类似,在multiprocessing里也有同名的锁类RLockLockEventConditionSemaphore,连用法都是一样样的。

根据实测,进程锁不支持MacOS。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
from multiprocessing import Array
from multiprocessing import Process
from multiprocessing import RLock


def func(i_val, lis_val, lc_val):
lc_val.acquire()
lis_val[0] = lis_val[0] - 1
time.sleep(1)
print('say hi', lis_val[0])
lc_val.release()


if __name__ == "__main__":
array = Array('i', 1)
array[0] = 10
lock = RLock()
for i in range(10):
p = Process(target=func, args=(i, array, lock))
p.start()

运行结果:

1
2
3
4
5
6
7
8
9
10
say hi 9
say hi 8
say hi 7
say hi 6
say hi 5
say hi 4
say hi 3
say hi 2
say hi 1
say hi 0

进程池Pool类

进程启动的开销很大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。

根据实测,进程池Pool类支持MacOS。

Python内置了一个进程池

1
from multiprocessing import Pool

进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中常用的方法:

  1. apply(),申请进程(同步执行,即串行)
  2. apply_async(),申请进程(异步执行,即并行)
  3. terminate(),立刻关闭进程池
  4. join(),主进程等待所有子进程执行完毕。必须在close()terminate()之后。
  5. close(),等待所有进程结束后,才关闭进程池。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Pool
import time


def func(args):
time.sleep(1)
print("正在执行进程 ", args)


if __name__ == '__main__':

# 创建一个包含5个进程的进程池
p = Pool(5)

for i in range(30):
p.apply_async(func=func, args=(i,))

# 等子进程执行完毕后关闭进程池
p.close()
# time.sleep(2)
# 立刻关闭进程池
# p.terminate()
p.join()

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
正在执行进程  1
正在执行进程 0
正在执行进程 2
正在执行进程 4
正在执行进程 3

【部分运行结果略】

正在执行进程 27
正在执行进程 26
正在执行进程 25
正在执行进程 28
正在执行进程 29

子进程结果汇总

多进程,最常见的一个应用。就是将一个任务拆分多个子任务,交由不同的进程去处理,最后把结果汇总完成后,主进程继续。
在这里举一个例子。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import random
import time
import multiprocessing


def worker(name_worker, q_worker):
t = 0
for index in range(10):
print(name_worker + " " + str(index))
x = random.randint(1, 3)
t += x
time.sleep(x * 0.1)
q_worker.put({name_worker: t})


q = multiprocessing.Queue()
jobs = []
for i in range(10):
p = multiprocessing.Process(target=worker, args=(str(i), q))
jobs.append(p)
p.start()

for p in jobs:
p.join()

results = [q.get() for j in jobs]
print(results)

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
1 0
2 0
5 0
0 0

【部分运行结果略】

9 8
3 9
7 9
9 9
2 9
[{'6': 15}, {'1': 18}, {'4': 19}, {'8': 19}, {'5': 22}, {'0': 22}, {'2': 23}, {'9': 23}, {'3': 24}, {'7': 24}]

joblib

在实际工作中,我们一般不会手动实现一个多进程,而是直接用第三方的包joblib

除了基本的多进程功能,joblib还对磁盘的读取和大型Numpy数组进行了优化。

而且!Linux、MacOS和Windows,三个系统都支持。

关于joblib,可以参考《未分类【计算机】:Kaggle中技术问题的解决方案》进行讨论。

进程和线程的区别

  • 进程是CPU资源分配中的最小单元
  • 线程是被CPU调度的最小单元
    一个进程中可能会有多个线程
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/10904
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

评论区