多线程
关于什么是多线程,可以参考《基于Java的后端开发入门:7.多线程 [1/2]》 ,这里不赘述。
threading
在Python3中,通过threading
模块提供线程的功能。
原来的thread
模块已废弃,但是threading
模块中有个Thread
类(T大写),这个类是模块中最主要的线程类,这两个要分清楚。
threading
的常用方法和属性:
current_thread()
:返回当前线程。
active_count()
:返回当前活跃的线程数。
get_ident()
:返回当前线程ID
enumerate()
:返回当前活动Thread对象列表
main_thread()
:返回主Thread对象
settrace(func)
:为所有线程设置一个trace函数
setprofile(func)
:所有线程设置一个profile函数
stack_size([size])
:返回新创建线程栈大小;或为后续创建的线程设定栈大小为size
TIMEOUT_MAX
:Lock.acquire()
、RLock.acquire()
、Condition.wait()
允许的最大超时时间
示例代码:
1 2 3 4 5 6 7 import threadingprint(threading.current_thread()) print(threading.active_count()) print(threading.get_ident()) print(threading.enumerate()) print(threading.main_thread())
运行结果:
1 2 3 4 5 <_MainThread(MainThread, started 140704355178688)> 1 140704355178688 [<_MainThread(MainThread, started 140704355178688)>] <_MainThread(MainThread, started 140704355178688)>
threading模块包含下面的类:
Thread
:基本线程类。
Lock
:互斥锁。
RLock
:可重入锁,使单一进程再次获得已持有的锁(递归锁)。
Condition
:条件锁,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值。
Semaphore
:信号锁,为线程间共享的有限资源提供一个"计数器",如果没有可用资源则会被阻塞。
Event
:事件锁,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活。
Timer
:一种计时器。
Barrier
:必须达到指定数量的线程后才可以继续执行。
创建线程
有两种方式来创建线程:
继承threading.Thread
类,并重写run()
方法。
实例化threading.Thread
对象的时候,将线程要执行的任务函数作为参数传入线程。
继承threading.Thread
类,并重写run()
方法。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import threadingimport timeclass MyThread (threading.Thread) : def __init__ (self, thread_name) : super(MyThread, self).__init__(name=thread_name,msg=msg) def run (self) : time.sleep(1 ) print("%s正在运行中......" % self.name) if __name__ == '__main__' : for i in range(10 ): MyThread("thread-" + str(i)).start()
运行结果:
1 2 3 4 5 6 7 8 9 10 thread-0正在运行中......thread-2正在运行中...... thread-3正在运行中......thread-7正在运行中...... thread-6正在运行中...... thread-8正在运行中...... thread-5正在运行中...... thread-1正在运行中...... thread-9正在运行中...... thread-4正在运行中......
实例化threading.Thread
对象的时候,将线程要执行的任务函数作为参数传入线程。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 import threadingimport timedef show (arg) : time.sleep(1 ) print('thread ' + str(arg) + " running...." ) if __name__ == '__main__' : for i in range(10 ): t = threading.Thread(target=show, args=(i,)) t.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 thread 3 running....thread 1 running.... thread 2 running.... thread 0 running.... thread 5 running.... thread 7 running.... thread 9 running.... thread 4 running....thread 8 running.... thread 6 running....
我们还可以点开看一下threading.Thread
的构造方法,参数如下:
group
:暂时无用,未来功能的预留参数。
target
:执行的目标任务名。
args
:以元组的方式给执行任务传参。
kwargs
:以字典方式给执行任务传参。
name
:线程名,一般不用设置。
threading.Thread
的常用方法和属性:
start()
:启动线程,等待CPU调度。
run()
:线程被CPU调度后自动执行的方法。
getName()
、setName()
、name
:用于获取和设置线程的名称。
setDaemon()
:被标记为守护线程的线程,在主线程结束的时候,也会结束运行。
ident
:获取线程的ID。
is_alive()
:判断线程是否是激活的(alive)。
isDaemon()
:判断现场是否为守护线程。
join([timeout])
:只有这个线程死亡了,其他线程才可以执行。参数timeout
是一个数值类型,表示超时时间,如果未提供该参数,那么其他线程将一直堵塞到被调线程结束。
传递参数
示例代码:
1 2 3 4 5 6 7 8 9 10 import threadingdef p (msg) : print(msg) if __name__ == '__main__' : pt = threading.Thread(target=p, args=('信息' ,)) pt.start()
运行结果:
解释说明:我们是以元组的方式传递参数,如果定义的元组只有一个数据,那么这个数据后面也要添加逗号。
特别的,如果我们继承threading.Thread
类,并重写run()
方法,这时候怎么传递参数呢?
可以参考《基于Java的后端开发入门:7.多线程 [1/2]》 ,用成员变量的方式。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import threadingclass MyThread (threading.Thread) : def __init__ (self, thread_name, msg) : super(MyThread, self).__init__(name=thread_name) self.msg = msg def run (self) : print(self.name) print(self.msg) if __name__ == '__main__' : MyThread("thread-zi" , '信息' ).start()
运行结果:
线程控制
join
如果没有join,主线程会立即结束。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import timeimport threadingdef waiting () : print('start waiting:' , time.strftime('%H:%M:%S' )) time.sleep(3 ) print('stop waiting' , time.strftime('%H:%M:%S' )) t = threading.Thread(target=waiting) t.start() time.sleep(1 ) print('start join' ) print('end join' )
运行结果:
1 2 3 4 start waiting: 13:53:30 start join end join stop waiting 13:53:33
如果有了join,会将主线程阻塞,直到子线程结束后继续运行。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import timeimport threadingdef waiting () : print('start waiting:' , time.strftime('%H:%M:%S' )) time.sleep(3 ) print('stop waiting' , time.strftime('%H:%M:%S' )) t = threading.Thread(target=waiting) t.start() time.sleep(1 ) print('start join' ) t.join() print('end join' )
运行结果:
1 2 3 4 start waiting: 13:55:39 start join stop waiting 13:55:42 end join
setDaemon
如果没有setDaemon,或者设置为False
。当主线程结束后,子线程会继续运行。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import threadingimport timedef run () : print(threading.current_thread().getName() + "开始工作" ) time.sleep(2 ) print("子线程工作完毕" ) for i in range(3 ): t = threading.Thread(target=run, name="thread-" + str(i)) t.start() time.sleep(1 ) print("主线程结束了!" ) print(threading.active_count())
运行结果:
1 2 3 4 5 6 7 8 thread-0开始工作 thread-1开始工作 thread-2开始工作 主线程结束了! 4 子线程工作完毕 子线程工作完毕 子线程工作完毕
如果设置了setDaemon,且值为True
,在主线程结束的时候,也会结束运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import threadingimport timedef run () : print(threading.current_thread().getName() + "开始工作" ) time.sleep(2 ) print("子线程工作完毕" ) for i in range(3 ): t = threading.Thread(target=run, name="thread-" + str(i)) t.setDaemon(True ) t.start() time.sleep(1 ) print("主线程结束了!" ) print(threading.active_count())
运行结果:
1 2 3 4 5 thread-0开始工作thread-1开始工作 thread-2开始工作 主线程结束了! 4
线程锁
线程安全
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import threadingimport timenumber = 0 def plus () : global number for _ in range(1000000 ): number = number + 1 print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number)) for i in range(2 ): t = threading.Thread(target=plus) t.start() time.sleep(5 ) print("主线程执行完毕后,number = " , number)
运行结果:
1 2 3 子线程Thread-1运算结束后,number = 1152193 子线程Thread-2运算结束后,number = 1231093 主线程执行完毕后,number = 1231093
结果并不等于2000000,可以很明显地看出脏数据的情况。这是因为两个线程在运行过程中,CPU随机调度,你算一会我算一会,在没有对number进行保护的情况下,就发生了数据错误。
如果想获得正确结果,可以使用join()方法,让多线程变成顺序执行。示例代码:
1 2 3 4 for i in range(2 ): t = threading.Thread(target=plus) t.start() t.join()
但,这么做,其实是让多线程变成了单线程,属于因噎废食的做法。正确的做法是使用线程锁。
threading模块中定义了几种线程锁类,分别是:
Lock:互斥锁
RLock:可重入锁
Semaphore:信号
Event:事件
Condition:条件
Barrier:阻碍
Lock(互斥锁)
Lock(互斥锁),是一种独占锁,同一时刻只有一个线程可以访问共享的数据。
使用很简单,初始化锁对象,然后将锁当做参数传递给任务函数,在任务中加锁,使用后释放锁。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import threadingimport timenumber = 0 lock = threading.Lock() def plus (lk) : global number for _ in range(1000000 ): lk.acquire() number += 1 lk.release() print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number)) if __name__ == '__main__' : for i in range(2 ): t = threading.Thread(target=plus, args=(lock,)) t.start() time.sleep(5 ) print("主线程执行完毕后,number = " , number)
运行结果:
1 2 3 子线程Thread-1运算结束后,number = 1887869 子线程Thread-2运算结束后,number = 2000000 主线程执行完毕后,number = 2000000
RLock
的使用方法和Lock
一模一样,只不过它支持重入锁,其特点有:
锁对象内部维护着一个Lock
和一个counter
对象,counter
对象记录了acquire
的次数。
同一个线程中,RLock.acquire()
可以被多次调用,利用该特性,可以解决部分死锁问题。
Semaphore(信号)
类名:BoundedSemaphore
。
这种锁允许一定数量的线程同时更改数据, 不是互斥锁 。比如地铁安检,排队人很多,工作人员只允许一定数量的人进入安检区,其它的人继续排队。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import timeimport threadingdef run (n, se) : se.acquire() print("run the thread: %s" % n) time.sleep(5 ) se.release() semaphore = threading.BoundedSemaphore(5 ) for i in range(20 ): t = threading.Thread(target=run, args=(i, semaphore)) t.start()
Event(事件线程锁)
类名:Event
。
Event(事件线程锁)的运行机制:全局定义了一个Flag,如果Flag的值为False,那么当程序执行wait()方法时就会阻塞,如果Flag值为True,线程不再阻塞。这种锁,类似交通红绿灯(默认是红灯),它属于在红灯的时候一次性阻挡所有线程,在绿灯的时候,一次性放行所有排队中的线程。
主要方法如下:
set()
:调用set()方法会将Flag设置为True。
wait()
:调用wait()方法将等待"红绿灯"信号。
clear()
:调用clear()方法会将Flag设置为False。
is_set()
:判断当前是否"绿灯放行"状态
下面是一个模拟红绿灯,然后汽车通行的例子,示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import threadingimport timeevent = threading.Event() def lighter () : green_time = 5 red_time = 5 event.set() while True : print("绿灯亮..." ) time.sleep(green_time) event.clear() print("红灯亮..." ) time.sleep(red_time) event.set() def run (name) : while True : if event.is_set(): print("[%s]通过..." % name) time.sleep(1 ) else : print("[%s]停下..." % name) event.wait() print("[%s]继续..." % name) if __name__ == '__main__' : light = threading.Thread(target=lighter, ) light.start() for name in ['奔驰' , '宝马' , '奥迪' ]: car = threading.Thread(target=run, args=(name,)) car.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 绿灯亮...[奔驰]通过... [宝马]通过... [奥迪]通过... [宝马]通过...[奥迪]通过...[奔驰]通过... [奔驰]通过... [奥迪]通过... [宝马]通过... [奔驰]通过...[奥迪]通过... [宝马]通过... [奥迪]通过... [奔驰]通过... [宝马]通过... 红灯亮... [奔驰]停下... [奥迪]停下... [宝马]停下... 绿灯亮... [奔驰]继续... [奔驰]通过... [宝马]继续... [奥迪]继续... [奥迪]通过... [宝马]通过... [奥迪]通过...
Condition(条件锁)
类名:Condition
。
主要方法如下:
acquire()
:加锁。
release()
:解锁。
wait([timeout])
:使线程进入Condition的等待池,等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
notify()
:从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()
尝试获得锁。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
notifyAll()
:通知等待池中所有的线程,这些线程都将尝试获得锁。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import threadingimport timenum = 0 con = threading.Condition() class Foo (threading.Thread) : def __init__ (self, name, action) : super(Foo, self).__init__() self.name = name self.action = action def run (self) : global num con.acquire() print("%s 开始执行..." % self.name) while True : if self.action == "add" : num += 1 elif self.action == 'reduce' : num -= 1 else : exit(1 ) print("num当前为:" , num) time.sleep(1 ) if num == 5 or num == 0 : print("%s 暂停执行" % self.name) con.notify() con.wait() print("%s 开始执行..." % self.name) con.release() if __name__ == '__main__' : a = Foo("线程A" , 'add' ) b = Foo("线程B" , 'reduce' ) a.start() b.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 线程A 开始执行... num当前为: 1 num当前为: 2 num当前为: 3 num当前为: 4 num当前为: 5 线程A 暂停执行 线程B 开始执行... num当前为: 4 num当前为: 3 num当前为: 2 num当前为: 1 num当前为: 0 线程B 暂停执行 线程A 开始执行... num当前为: 1 num当前为: 2 num当前为: 3 num当前为: 4 num当前为: 5 线程A 暂停执行 线程B 开始执行... num当前为: 4 num当前为: 3 num当前为: 2 num当前为: 1 num当前为: 0
Timer(定时器)
概述
Timer是Thread的子类,是一个定时器功能的类,可以在指定秒之后执行某个方法。
使用方法
1 timer = threading.Timer(interval, function, args=None , kwargs=None )
参数:
interval
:经过多少秒调用函数,单位秒。
不断调用则在目标函数末尾调用该方法。
function
:调用的目标函数。
args=None
:传递到目标函数的位置参数。
kwargs=None
:传递到目标函数的关键字参数。
例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import threadingimport timeexec_count = 0 def start () : print("hello world" , exec_count) def heart_beat () : print(time.strftime('%Y-%m-%d %H:%M:%S' )) global exec_count exec_count += 1 if exec_count < 15 : start() threading.Timer(5 , heart_beat).start() if __name__ == '__main__' : heart_beat()
通过with语句使用线程锁
所有的线程锁都有一个加锁和释放锁的动作,非常类似文件的打开和关闭。在加锁后,如果线程执行过程中出现异常或者错误,没有正常的释放锁,那么其他的线程会造到致命性的影响。通过with上下文管理器,可以确保锁被正常释放。
其格式如下:
这相当于:
1 2 3 4 5 some_lock.acquire() try : finally : some_lock.release()
全局解释器锁(GIL)
什么是全局解释器锁(GIL)
在大多数环境中,单核CPU情况下,本质上某一时刻只能有一个线程被执行。多核CPU时则可以支持多个线程同时执行。
但是在Python中,无论CPU有多少核,同时只能执行一个线程。
这是由于GIL的存在导致的。
GIL的全称是Global Interpreter Lock,全局解释器锁,是Python设计之初为了数据安全所做的决定。Python中的某个线程想要执行,必须先拿到GIL。可以把GIL看作是执行任务的"通行证",并且在一个Python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。
不过,GIL只在CPython解释器中才有,在PyPy和JPython中没有GIL。
但是,正如我们在《5.Python [1/2]》 的开头所说的,CPython,是官方的解释器。
也正因为这个特性,Python针对不同类型的任务,多线程执行效率是不同的:
Python下的多线程对CPU密集型任务并不友好。
Python的多线程对IO密集型任务比较友好。
如果是双线程的CPU呢?
一个刁钻的问题,如果是那种四核八线程的那种CPU。
Python可以用两个线程吗?
不能。
对于四核八线程的CPU,在软件上就理解成是八核CPU。
如何充分利用多核CPU
根据上文的讨论,如何在Python中想要充分利用多核CPU?
多进程
这一每个进程有各自独立的GIL,互不干扰,这样就可以真正意义上的并行执行。
那么,怎么同时启用多个进程呢?
同时跑多个Python应用,就有多进程了。
(虽然在下文我们会讨论"多进程",但是那种方法在Windows系统中不支持。)
还有一种方法,协程。
我们也会在下文讨论。
最好的方法:多进程+协程
线程池
在Python中,没有内置的较好的线程池模块,需要自己实现或使用第三方模块。
如下的线程池,可以参考:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 """ 一个基于thread和queue的线程池,以任务为队列元素,动态创建线程,重复利用线程, 通过close和terminate方法关闭线程池。 """ import queueimport threadingimport contextlibimport timeStopEvent = object() def callback (status, result) : """ 根据需要进行的回调函数,默认不执行。 :param status: action函数的执行状态 :param result: action函数的返回值 :return: """ pass def action (thread_name, arg) : """ 真实的任务定义在这个函数里 :param thread_name: 执行该方法的线程名 :param arg: 该函数需要的参数 :return: """ time.sleep(0.1 ) print("第%s个任务调用了线程 %s,并打印了这条信息!" % (arg + 1 , thread_name)) class ThreadPool : def __init__ (self, max_num, max_task_num=None) : """ 初始化线程池 :param max_num: 线程池最大线程数量 :param max_task_num: 任务队列长度 """ if max_task_num: self.q = queue.Queue(max_task_num) else : self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def put (self, func, args, callback=None) : """ 往任务队列里放入一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数 1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) :return: 如果线程池已经终止,则返回True否则None """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread (self) : """ 创建一个线程 """ t = threading.Thread(target=self.call) t.start() def call (self) : """ 循环去获取任务函数并执行任务函数。在正常情况下,每个线程都保存生存状态, 直到获取线程终止的flag。 """ current_thread = threading.currentThread().getName() self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try : result = func(current_thread, *arguments) success = True except Exception as e: result = None success = False if callback is not None : try : callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else : event = self.q.get() else : self.generate_list.remove(current_thread) def close (self) : """ 执行完所有的任务后,让所有线程都停止的方法 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate (self) : """ 在任务执行过程中,终止线程,提前退出。 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) @contextlib.contextmanager def worker_state (self, state_list, worker_thread) : """ 用于记录空闲的线程,或从空闲列表中取出线程处理任务 """ state_list.append(worker_thread) try : yield finally : state_list.remove(worker_thread) if __name__ == '__main__' : pool = ThreadPool(5 ) for i in range(100 ): pool.put(action, (i,), callback) time.sleep(3 ) print("-" * 50 ) print("\033[32;0m任务停止之前线程池中有%s个线程,空闲的线程有%s个!\033[0m" % (len(pool.generate_list), len(pool.free_list))) pool.close() print("任务执行完毕,正常退出!" )
协程
根据上文的讨论,我们知道:
对于CPU计算密集型任务由于GIL的存在通常使用多进程来实现。
对于IO密集型任务可以通过多线程,让线程在执行IO任务时让出GIL,从而实现表面上的并发。
但,对于IO密集型任务我们还有一种更好的选择:协程。
协程,也称微线程,Coroutine
,是运行在单线程中的"并发",协程相比多线程的一大优势就是省去了多线程之间的切换开销,获得了更高的运行效率。
什么是协程
协程的切换不同于线程切换,是由程序自身控制的,没有切换的开销。协程不需要多线程的锁机制,因为都是在同一个线程中运行,所以没有同时访问数据的问题,执行效率比多线程高很多。
我们可以这么简单的理解:
进程/线程:操作系统提供的一种并发处理任务的能力。
协程:程序员通过高超的代码能力,在代码执行流程中人为的实现多任务并发,是单个线程内的任务调度技巧。
下面举一个例子,甲,乙两个工人模拟两个工作任务交替进行,在单线程内实现了类似多线程的功能。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 rt = True def task1 () : while True : print("<甲>工作了一段时间....." ) print("<甲>需要休息,线程不切换,换<乙>" ) task2() def task2 () : global rt if rt: rt = False task1() print("<乙>工作了一段时间....." ) print("<乙>需要休息,线程不切换,换<甲>" ) if __name__ == '__main__' : task2()
运行结果:
1 2 3 4 5 6 7 <甲>工作了一段时间..... <甲>需要休息,线程不切换,换<乙> <乙>工作了一段时间..... <乙>需要休息,线程不切换,换<甲> <甲>工作了一段时间..... <甲>需要休息,线程不切换,换<乙> <乙>工作了一段时间.....
实现
在Python中有多种方式可以实现协程:
greenlet,是一个第三方模块,用于实现协程代码(Gevent协程就是基于greenlet实现)
yield,生成器,借助生成器的特点也可以实现协程代码。
asyncio,在Python3.4中引入的模块用于编写协程代码。
async & awiat,在Python3.5中引入的两个关键字,结合asyncio模块可以更方便的编写协程代码。
greenlet
greentlet
是一个第三方模块。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from greenlet import greenletdef func1 () : print(1 ) gr2.switch() print(2 ) gr2.switch() def func2 () : print(3 ) gr1.switch() print(4 ) gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch()
运行结果:
yield
基于Python的生成器的yield
和yield form
关键字实现协程代码。
一个函数中如果存在了yield
,就是一个生成器函数,返回的就是一个生成器。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def func1 () : yield 1 yield from func2() yield 2 def func2 () : yield 3 yield 4 f1 = func1() i = 1 for item in f1: print('第%s次遍历' % i) i = i + 1 print(item)
运行结果:
1 2 3 4 5 6 7 8 第1次遍历 1 第2次遍历 3 第3次遍历 4 第4次遍历 2
解释说明:
f1 = func1()
,即执行了一个生成器函数func1()
,返回了一个生成器f1
。
for item in f1:
对生成器进行遍历:
第一次遍历,yield 1
,返回1
。
第二次遍历,yield from func2()
,从func2()
进行yield
,yield 3
,返回3
。
第三次遍历,yield 4
,返回4
。
第四次遍历,yield 2
,返回2
。
asyncio
asyncio
,是3.4及的内置模块。
基于asyncio
实现的协程,相比上文的,优势在于,遇到IO耗时操作自动切换。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncio@asyncio.coroutine def func1 () : print(1 ) yield from asyncio.sleep(2 ) print(2 ) @asyncio.coroutine def func2 () : print(3 ) yield from asyncio.sleep(2 ) print(4 ) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
运行结果:
1 2 3 4 5 6 7 8 /Users/kaka/Documents/1.py:5: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead def func1(): /Users/kaka/Documents/1.py:13: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead def func2(): 1 3 2 4
解释说明:
@asyncio.coroutine
,是一个装饰器,可以将一个普通的函数,装饰成协程函数。
其它的代码是执行协程函数用的,具体我们会在下文进行讨论。
async 和 awit
正如上文的运行结果,从3.8开始,@asyncio.coroutine
就已经被弃用了。
我们用async
和awit
实现协程。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import asyncioasync def func1 () : print(1 ) await asyncio.sleep(2 ) print(2 ) async def func2 () : print(3 ) await asyncio.sleep(2 ) print(4 ) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
例子
我们举一个例子,下载三张图片。在下载第一张图片,发HTTP请求,这时候进入了一个IO等待,这时候会切换,去下第二张图片,即发第二个HTTP请求。
该例子需要使用第三方模块aiohttp
。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import aiohttpimport asyncioasync def fetch (session, url) : print("发送请求:" , url) async with session.get(url, verify_ssl=False ) as response: content = await response.content.read() file_name = url.rsplit('/' )[-1 ] with open(file_name, mode='wb' ) as file_object: file_object.write(content) print("下载完成:" , file_name) async def main () : async with aiohttp.ClientSession() as session: url_list = [ 'https://www.kakawanyifan.com/-/1/09/07/000.jpg' , 'https://www.kakawanyifan.com/-/1/09/07/001.png' , 'https://www.kakawanyifan.com/-/1/09/07/002.png' ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] await asyncio.wait(tasks) if __name__ == '__main__' : asyncio.run(main())
运行结果:
1 2 3 4 5 6 发送请求: https://www.kakawanyifan.com/-/1/09/07/000.jpg 发送请求: https://www.kakawanyifan.com/-/1/09/07/001.png 发送请求: https://www.kakawanyifan.com/-/1/09/07/002.png 下载完成: 001.png 下载完成: 002.png 下载完成: 000.jpg
详解
我们对asyncio
模块、async
关键字和await
关键字进行更详细的讲解。
事件循环
事件循环,可以把他当做是一个死循环,这个死循环在周期性的运行并执行一些任务,但会在特定条件下终止。
用伪代码描述如下:
1 2 3 4 5 6 7 8 任务列表 = [ 任务1, 任务2, 任务3,... ] while True: 可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回 for 就绪任务 in 可执行的任务列表: 执行已就绪的任务 for 已完成的任务 in 已完成的任务列表: 在任务列表中移除 已完成的任务 如果 任务列表 中的任务都已完成,则终止循环
在编写程序时候可以通过如下代码来获取和创建事件循环:
1 2 3 4 5 import asyncioloop = asyncio.get_event_loop() loop.run_until_complete(【任务】)
快速入门
协程对象
协程函数
:定义形式为async def
的函数。
协程对象
:调用协程函数
所返回的对象。
注意:调用协程函数时,函数内部代码不会执行,只是会返回一个协程对象。
如果想要执行协程函数的内部代码,需要事件循环
和协程对象
配合才能实现。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import asyncioasync def func () : print("协程内部代码" ) result = func() loop = asyncio.get_event_loop() loop.run_until_complete(result)
asyncio.run
asyncio.run(result)
,这种方法也可以。asyncio.run
是在3.7版本中加入的:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import asyncioasync def func () : print("协程内部代码" ) result = func() asyncio.run(result)
特别的,我们可以点进run
方法,查看其源码。会发现是在run
中实现了asyncio.get_event_loop
和run_until_complete
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def run (main, *, debug=False) : if events._get_running_loop() is not None : raise RuntimeError( "asyncio.run() cannot be called from a running event loop" ) if not coroutines.iscoroutine(main): raise ValueError("a coroutine was expected, got {!r}" .format(main)) loop = events.new_event_loop() try : events.set_event_loop(loop) loop.set_debug(debug) return loop.run_until_complete(main) finally : try : _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) finally : events.set_event_loop(None ) loop.close()
await
语法格式:await 【可等待对象】
【可等待的对象】
有三种,协程对象、Future、Task对象。
作用:等待对象的值,得到结果后往下走。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import asyncioasync def others () : print("start" ) await asyncio.sleep(2 ) print('end' ) return '返回值' async def func () : print("执行协程函数内部代码" ) response1 = await others() print("IO请求结束,结果为:" , response1) response2 = await others() print("IO请求结束,结果为:" , response2) asyncio.run(func())
运行结果:
1 2 3 4 5 6 7 执行协程函数内部代码 start end IO请求结束,结果为: 返回值 start end IO请求结束,结果为: 返回值
await
是一个只能在协程函数中使用的关键字,用于遇到IO操作时挂起当前协程(任务),当前协程(任务)挂起过程中事件循环可以去执行其他的协程(任务),当前协程IO处理完成时,可以再次切换回来执行await之后的代码。
Task对象
asyncio.create_task
在我们上文的代码中,有这么一行asyncio.create_task(协程对象)
,其作用是创建Task对象,并且立即添加到事件循环的任务列表中,等待被调度执行。
asyncio.create_task()
是在3.7中被加入。在3.7之前,可以改用低层级的asyncio.ensure_future()
或者loop.create_task()
。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import asyncioasync def func () : print(1 ) await asyncio.sleep(2 ) print(2 ) return "返回值" async def main () : print("main开始" ) task1 = asyncio.create_task(func()) task2 = asyncio.create_task(func()) await asyncio.sleep(5 ) print("main结束" ) ret1 = await task1 ret2 = await task2 print(ret1, ret2) asyncio.run(main())
运行结果:
1 2 3 4 5 6 7 main开始 1 1 2 2 main结束 返回值 返回值
特别的,如果我们改成这种方式呢?
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import asyncioasync def func () : print(1 ) await asyncio.sleep(2 ) print(2 ) return "返回值" if __name__ == '__main__' : print("main开始" ) task1 = asyncio.create_task(func()) task2 = asyncio.create_task(func()) print("main结束" ) ret1 = task1 ret2 = task2 print(ret1, ret2)
运行结果:
1 RuntimeError: no running event loop
no running event loop
,我们讨论过,asyncio.run
中实现了asyncio.get_event_loop
和run_until_complete
。
也就是说asyncio.run(main())
创建了一个event loop
,然后asyncio.create_task(func())
会将Task对象加入其event loop
。
asyncio.wait
上文,我们有这里两行代码:
1 2 ret1 = await task1 ret2 = await task2·
如果我们的Task很多的话,要写很多遍,所以我们可以将其放在一个列表中。但是,我们不能直接await 一个列表,需要用asyncio.wait
进行封装。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import asyncioasync def func () : print(1 ) await asyncio.sleep(2 ) print(2 ) return "返回值" async def main () : print("main开始" ) task_list = [ asyncio.create_task(func(), name="n1" ), asyncio.create_task(func(), name="n2" ) ] await asyncio.sleep(5 ) print("main结束" ) done, pending = await asyncio.wait(task_list, timeout=None ) print(done, pending) asyncio.run(main())
运行结果:
1 2 3 4 5 6 7 main开始 1 1 2 2 main结束 {<Task finished name='n1' coro=<func() done, defined at /Users/kaka/Documents/auto-work/1.py:4> result='返回值'>, <Task finished name='n2' coro=<func() done, defined at /Users/kaka/Documents/auto-work/1.py:4> result='返回值'>} set()
asyncio.wait
内部会对列表中的每个协程对象执行asyncio.ensure_future
从而封装为Task对象,所以在和asyncio.wait
配合使用时,task_list
的值为[func(),func()]
也是可以的。所以,我们还可以这么写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncioasync def func () : print("执行协程函数内部代码" ) response = await asyncio.sleep(2 ) print("IO请求结束,结果为:" , response) coroutine_list = [func(), func()] done, pending = asyncio.run(asyncio.wait(coroutine_list)) print(done) print(pending)
asyncio.Future对象
Task继承asyncio.Future
,Task对象内部的await结果的处理,是基于asyncio.Future
来的,具体是成员变量_state
。
通常我们不会直接用到这个对象,而是直接使用Task对象来完成任务的并和状态的追踪。
该代码不会结束,会一直等下去。
1 2 3 4 5 6 7 8 9 10 11 12 13 import asyncioasync def main () : loop = asyncio.get_running_loop() fut = loop.create_future() await fut asyncio.run(main())
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import asyncioasync def set_after (fut) : await asyncio.sleep(2 ) fut.set_result("666" ) async def main () : loop = asyncio.get_running_loop() fut = loop.create_future() await loop.create_task(set_after(fut)) data = await fut print(data) asyncio.run(main())
运行结果:
特别的,我们可以看set_result
的源码,self._state = _FINISHED
,对成员变量_state
进行了赋值。
1 2 3 4 5 6 7 8 9 10 11 def set_result (self, result) : """Mark the future done and set its result. If the future is already done when this method is called, raises InvalidStateError. """ if self._state != _PENDING: raise exceptions.InvalidStateError(f'{self._state} : {self!r} ' ) self._result = result self._state = _FINISHED self.__schedule_callbacks()
uvloop
uvloop是一个第三方的包,是asyncio
中的事件循环的替代方案,替换后可以使得asyncio性能提高。
在项目中想要使用uvloop替换asyncio的事件循环也非常简单,只要在代码中这么做:
1 2 3 import asyncioimport uvloopasyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
其他的代码与上文的代码一致,内部的事件循环自动化会变为uvloop
。
多进程
多进程不支持Windows
Python中的multiprocess
提供了Process
类,实现进程相关的功能。
但是它基于fork机制,想要在Windows中运行,只能使用if __name__ == '__main__':
的方式。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import osimport multiprocessingdef foo (i) : print("这里是 " , multiprocessing.current_process().name) print('模块名称:' , __name__) print('父进程 ID:' , os.getppid()) print('当前子进程 ID:' , os.getpid()) print() if __name__ == '__main__' : for i in range(5 ): p = multiprocessing.Process(target=foo, args=(i,)) p.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 这里是 Process-1 模块名称: __mp_main__ 父进程 ID: 11053 当前子进程 ID: 11056 这里是 Process-3 模块名称: __mp_main__ 父进程 ID: 11053 当前子进程 ID: 11058 这里是 Process-5 模块名称: __mp_main__ 父进程 ID: 11053 当前子进程 ID: 11060 这里是 Process-2 模块名称: __mp_main__ 父进程 ID: 11053 当前子进程 ID: 11057 这里是 Process-4 模块名称: __mp_main__ 父进程 ID: 11053 当前子进程 ID: 11059
进程间的数据共享
进程间的数据共享,即进程通信。
现象
每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的。
我们尝试用一个全局列表来实现进程间的数据共享,可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Processlis = [] def foo (i) : lis.append(i) print("This is Process " , i, " and lis is " , lis, " and lis.address is " , id(lis)) if __name__ == '__main__' : for i in range(5 ): p = Process(target=foo, args=(i,)) p.start() print("The end of list_1:" , lis)
运行结果:
1 2 3 4 5 6 The end of list_1: [] This is Process 0 and lis is [0] and lis.address is 140520551137856 This is Process 1 and lis is [1] and lis.address is 140208487580224 This is Process 3 and lis is [3] and lis.address is 140253920281152 This is Process 2 and lis is [2] and lis.address is 140627111625280 This is Process 4 and lis is [4] and lis.address is 140435884917312
共享数据不支持MacOS
想要在进程之间进行数据共享可以使用Queues
、Array
和Manager
这三个multiprocess
模块提供的类,但是根据实测,在MacOS上不支持。
使用Array共享数据
入门案例
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from multiprocessing import Processfrom multiprocessing import Arraydef func (i_val, temp_val) : temp_val[0 ] += 100 print("进程%s " % i_val, ' 修改数组第一个元素后 -->' , temp_val[0 ]) if __name__ == '__main__' : temp = Array('i' , [1 , 2 , 3 , 4 ]) for i in range(10 ): p = Process(target=func, args=(i, temp)) p.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 进程0 修改数组第一个元素后 --> 101 进程3 修改数组第一个元素后 --> 201 进程7 修改数组第一个元素后 --> 401 进程2 修改数组第一个元素后 --> 301 进程6 修改数组第一个元素后 --> 501 进程1 修改数组第一个元素后 --> 601 进程9 修改数组第一个元素后 --> 701 进程4 修改数组第一个元素后 --> 801 进程8 修改数组第一个元素后 --> 901 进程5 修改数组第一个元素后 --> 1001
注意!Array('i', [1, 2, 3, 4])
。其中i
的含义是说,这个Array中都是int类型。
必须预先指定元素,或者指定数组的长度
Array内的元素可以预先指定,也可以只指定数组的长度。
例如,指定数组长度,Array('i', 5)
。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from multiprocessing import Processfrom multiprocessing import Arraydef func (i_val, temp_val) : temp_val[0 ] += 1 print("进程%s " % i_val, ' 修改数组第一个元素后 -->' , temp_val[0 ]) if __name__ == '__main__' : temp = Array('i' , 5 ) for i in range(10 ): p = Process(target=func, args=(i, temp)) p.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 进程1 修改数组第一个元素后 --> 1 进程2 修改数组第一个元素后 --> 2 进程0 修改数组第一个元素后 --> 3 进程4 修改数组第一个元素后 --> 4 进程3 修改数组第一个元素后 --> 5 进程6 修改数组第一个元素后 --> 6 进程5 修改数组第一个元素后 --> 7 进程8 修改数组第一个元素后 --> 8 进程9 修改数组第一个元素后 --> 9 进程7 修改数组第一个元素后 --> 10
但是,不能既不预先指定元素,又不指定数组的大小。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from multiprocessing import Processfrom multiprocessing import Arraydef func (i_val, temp_val) : temp_val[0 ] += 1 print("进程%s " % i_val, ' 修改数组第一个元素后 -->' , temp_val[0 ]) if __name__ == '__main__' : temp = Array('i' ) for i in range(10 ): p = Process(target=func, args=(i, temp)) p.start()
运行结果:
1 2 3 4 Traceback (most recent call last): File "/home/kaka/PycharmProjects/pythonProject/main.py", line 11, in <module> temp = Array('i') TypeError: BaseContext.Array() missing 1 required positional argument: 'size_or_initializer'
Arrya中元素的类型
除了用i
,表示Array数组中的内容必须是int类型。还有其他很多选项。
c
: ctypes.c_char
u
: ctypes.c_wchar
b
: ctypes.c_byte
B
: ctypes.c_ubyte
h
: ctypes.c_short
H
: ctypes.c_ushort
i
: ctypes.c_int
I
: ctypes.c_uint
l
: ctypes.c_long
L
: ctypes.c_ulong
f
: ctypes.c_float
d
: ctypes.c_double
使用Queue类共享数据
入门案例
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from multiprocessing import Processfrom multiprocessing import Queuedef func (i_val, q_val) : ret = q_val.get() print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i_val, ret, i_val)) q_val.put(i_val) if __name__ == "__main__" : lis = Queue(20 ) lis.put(0 ) for i in range(10 ): p = Process(target=func, args=(i, lis,)) p.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 进程1从队列里获取了一个0,然后又向队列里放入了一个1 进程7从队列里获取了一个1,然后又向队列里放入了一个7 进程3从队列里获取了一个7,然后又向队列里放入了一个3 进程5从队列里获取了一个3,然后又向队列里放入了一个5 进程9从队列里获取了一个5,然后又向队列里放入了一个9 进程2从队列里获取了一个9,然后又向队列里放入了一个2 进程0从队列里获取了一个2,然后又向队列里放入了一个0 进程4从队列里获取了一个0,然后又向队列里放入了一个4 进程6从队列里获取了一个4,然后又向队列里放入了一个6 进程8从队列里获取了一个6,然后又向队列里放入了一个8
multiprocessing.queues.Queue
有些资料会介绍另一个Queue
,位于multiprocessing.queues.Queue
。
位置不同,使用方法略有不同。
multiprocessing.Queue
:Queue(20)
multiprocessing.queues.Queue
:Queue(20, ctx=multiprocessing)
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import multiprocessingfrom multiprocessing import Processfrom multiprocessing.queues import Queuedef func (i_val, q_val) : ret = q_val.get() print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i_val, ret, i_val)) q_val.put(i_val) if __name__ == "__main__" : lis = Queue(20 , ctx=multiprocessing) lis.put(0 ) for i in range(10 ): p = Process(target=func, args=(i, lis,)) p.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 进程0从队列里获取了一个0,然后又向队列里放入了一个0 进程2从队列里获取了一个0,然后又向队列里放入了一个2 进程3从队列里获取了一个2,然后又向队列里放入了一个3 进程1从队列里获取了一个3,然后又向队列里放入了一个1 进程6从队列里获取了一个1,然后又向队列里放入了一个6 进程4从队列里获取了一个6,然后又向队列里放入了一个4 进程5从队列里获取了一个4,然后又向队列里放入了一个5 进程7从队列里获取了一个5,然后又向队列里放入了一个7 进程9从队列里获取了一个7,然后又向队列里放入了一个9 进程8从队列里获取了一个9,然后又向队列里放入了一个8
使用Manager共享数据
入门案例
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Processfrom multiprocessing import Managerdef func (i_val, dic_val) : dic_val[100 + i_val] = 100 + i_val print(dic_val.items()) if __name__ == '__main__' : dic = Manager().dict() for i in range(10 ): p = Process(target=func, args=(i, dic)) p.start() p.join()
运行结果:
1 2 3 4 5 6 7 8 9 10 [(100, 100)] [(100, 100), (101, 101)] [(100, 100), (101, 101), (102, 102)] [(100, 100), (101, 101), (102, 102), (103, 103)] [(100, 100), (101, 101), (102, 102), (103, 103), (104, 104)] [(100, 100), (101, 101), (102, 102), (103, 103), (104, 104), (105, 105)] [(100, 100), (101, 101), (102, 102), (103, 103), (104, 104), (105, 105), (106, 106)] [(100, 100), (101, 101), (102, 102), (103, 103), (104, 104), (105, 105), (106, 106), (107, 107)] [(100, 100), (101, 101), (102, 102), (103, 103), (104, 104), (105, 105), (106, 106), (107, 107), (108, 108)] [(100, 100), (101, 101), (102, 102), (103, 103), (104, 104), (105, 105), (106, 106), (107, 107), (108, 108), (109, 109)]
原因分析
注意,上文的例子中,有一行代码p.join()
,如果我们把这行代码注释呢?
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Processfrom multiprocessing import Managerdef func (i_val, dic_val) : dic_val[100 + i_val] = 100 + i_val print(dic_val.items()) if __name__ == '__main__' : dic = Manager().dict() for i in range(10 ): p = Process(target=func, args=(i, dic)) p.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 /home/kaka/PycharmProjects/pythonProject/venv/bin/python /home/kaka/PycharmProjects/pythonProject/main.py [(100, 100)] [(100, 100), (104, 104)] [(100, 100), (104, 104), (103, 103)] [(100, 100), (104, 104), (103, 103), (102, 102)] [(100, 100), (104, 104), (103, 103), (102, 102), (101, 101)] [(100, 100), (104, 104), (103, 103), (102, 102), (101, 101), (106, 106)] [(100, 100), (104, 104), (103, 103), (102, 102), (101, 101), (106, 106), (105, 105)] [(100, 100), (104, 104), (103, 103), (102, 102), (101, 101), (106, 106), (105, 105), (107, 107)] Process Process-10: Process Process-11: Traceback (most recent call last): Traceback (most recent call last): File "/usr/lib/python3.10/multiprocessing/managers.py", line 810, in _callmethod conn = self._tls.connection AttributeError: 'ForkAwareLocal' object has no attribute 'connection' During handling of the above exception, another exception occurred: Traceback (most recent call last): 【部分运行结果略】 ConnectionResetError: [Errno 104] Connection reset by peer File "/usr/lib/python3.10/multiprocessing/managers.py", line 810, in _callmethod conn = self._tls.connection AttributeError: 'ForkAwareLocal' object has no attribute 'connection' During handling of the above exception, another exception occurred: Traceback (most recent call last): 【部分运行结果略】 ConnectionResetError: [Errno 104] Connection reset by peer
报错了!
为什么会报错?
p.join()
的作用是什么?
进程中的join()
方法,我们没讨论过。但是线程中的join()
方法我们讨论过,只有这个线程死亡了,其他线程才可以执行。
例如,主线程调用子线程的.join()
方法,那么主线程会阻塞,直到子线程结束后继续运行。
上文,我们主进程会不等子进程结束,就立马结束。
构造方法Manager()
返回的manager对象会提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。
其他支持
Manager的对象支持:list
、dict
、Namespace
、Lock
、RLock
、Semaphore
、BoundedSemaphore
、Condition
、Event
、Barrier
、Queue
、Value
、Array
等多种格式。
现在,我们来做一个事情,用多进程,从1加到100。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 import multiprocessingdef add_to_value (addend, value) : value.value += addend with multiprocessing.Manager() as manager: value = manager.Value(float, 0.0 ) with multiprocessing.Pool(2 ) as pool: pool.starmap(add_to_value, [(float(i), value) for i in range(100 )]) print(value.value)
运行结果:
结果不对!
这个肯定是出现了进程不安全,加锁就够了。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import multiprocessingdef add_to_value (addend, value, lock) : with lock: value.value += addend if __name__ == '__main__' : with multiprocessing.Manager() as manager: lock = manager.Lock() value = manager.Value(float, 0.0 ) with multiprocessing.Pool(2 ) as pool: pool.starmap(add_to_value, [(float(i), value, lock) for i in range(100 )]) print(value.value)
运行结果:
注意两段代码的add_to_value(addend, value, lock)
这个方法。
进程锁
为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。
与threading
类似,在multiprocessing
里也有同名的锁类RLock
、Lock
、Event
、Condition
和Semaphore
,连用法都是一样样的。
根据实测,进程锁不支持
MacOS。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import timefrom multiprocessing import Arrayfrom multiprocessing import Processfrom multiprocessing import RLockdef func (i_val, lis_val, lc_val) : lc_val.acquire() lis_val[0 ] = lis_val[0 ] - 1 time.sleep(1 ) print('say hi' , lis_val[0 ]) lc_val.release() if __name__ == "__main__" : array = Array('i' , 1 ) array[0 ] = 10 lock = RLock() for i in range(10 ): p = Process(target=func, args=(i, array, lock)) p.start()
运行结果:
1 2 3 4 5 6 7 8 9 10 say hi 9 say hi 8 say hi 7 say hi 6 say hi 5 say hi 4 say hi 3 say hi 2 say hi 1 say hi 0
进程池Pool类
进程启动的开销很大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。
根据实测,进程池Pool类支持
MacOS。
Python内置了一个进程池
1 from multiprocessing import Pool
进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中常用的方法:
apply()
,申请进程(同步执行,即串行)
apply_async()
,申请进程(异步执行,即并行)
terminate()
,立刻关闭进程池
join()
,主进程等待所有子进程执行完毕。必须在close()
或terminate()
之后。
close()
,等待所有进程结束后,才关闭进程池。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from multiprocessing import Poolimport timedef func (args) : time.sleep(1 ) print("正在执行进程 " , args) if __name__ == '__main__' : p = Pool(5 ) for i in range(30 ): p.apply_async(func=func, args=(i,)) p.close() p.join()
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 正在执行进程 1 正在执行进程 0 正在执行进程 2 正在执行进程 4 正在执行进程 3 【部分运行结果略】 正在执行进程 27 正在执行进程 26 正在执行进程 25 正在执行进程 28 正在执行进程 29
子进程结果汇总
多进程,最常见的一个应用。就是将一个任务拆分多个子任务,交由不同的进程去处理,最后把结果汇总完成后,主进程继续。
在这里举一个例子。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import randomimport timeimport multiprocessingdef worker (name_worker, q_worker) : t = 0 for index in range(10 ): print(name_worker + " " + str(index)) x = random.randint(1 , 3 ) t += x time.sleep(x * 0.1 ) q_worker.put({name_worker: t}) q = multiprocessing.Queue() jobs = [] for i in range(10 ): p = multiprocessing.Process(target=worker, args=(str(i), q)) jobs.append(p) p.start() for p in jobs: p.join() results = [q.get() for j in jobs] print(results)
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 1 0 2 0 5 0 0 0 【部分运行结果略】 9 8 3 9 7 9 9 9 2 9 [{'6': 15}, {'1': 18}, {'4': 19}, {'8': 19}, {'5': 22}, {'0': 22}, {'2': 23}, {'9': 23}, {'3': 24}, {'7': 24}]
joblib
在实际工作中,我们一般不会手动实现一个多进程,而是直接用第三方的包joblib
。
除了基本的多进程功能,joblib
还对磁盘的读取和大型Numpy数组进行了优化。
而且!Linux、MacOS和Windows,三个系统都支持。
关于joblib
,可以参考《未分类【计算机】:Kaggle中技术问题的解决方案》 进行讨论。
进程和线程的区别
进程是CPU资源分配中的最小单元
线程是被CPU调度的最小单元
一个进程中可能会有多个线程
附录:迭代器和生成器
上文在协程部分讨论了yield
,yield
的另一个用途是生成器。
在这里补充讨论一下迭代器和生成器。
迭代器
迭代是Python最强大的功能之一,是访问集合元素的一种方式。
迭代器是一个可以记住遍历的位置的对象。
迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不会后退。
迭代器有两个基本的方法:iter() 和 next()。
字符串,列表或元组对象都可用于创建迭代器:
示例代码:
1 2 3 4 5 6 list = [1 , 2 , 3 , 4 ] it = iter(list) print(next(it)) print(next(it))
运行结果:
迭代器对象可以使用常规for语句进行遍历:
示例代码:
1 2 3 4 5 list = [1 , 2 , 3 , 4 ] it = iter(list) for x in it: print(x)
运行结果:
也可以使用next()
函数:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 import syslist = [1 , 2 , 3 , 4 ] it = iter(list) while True : try : print(next(it)) except StopIteration: sys.exit()
运行结果:
创建迭代器
把一个类作为一个迭代器使用需要在类中实现两个方法__iter__()
与__next__()
。
__iter__()
方法返回一个特殊的迭代器对象,这个迭代器对象实现了__next__()
方法并通过StopIteration
异常标识迭代的完成。
__next__()
方法会返回下一个迭代器对象。
创建一个返回数字的迭代器,初始值为 1,逐步递增 1:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class MyNumbers : def __iter__ (self) : self.a = 1 return self def __next__ (self) : x = self.a self.a += 1 return x myclass = MyNumbers() myiter = iter(myclass) print(next(myiter)) print(next(myiter)) print(next(myiter)) print(next(myiter)) print(next(myiter))
运行结果:
StopIteration
StopIteration
异常用于标识迭代的完成,防止出现无限循环的情况,在__next__()
方法中我们可以设置在完成指定循环次数后触发StopIteration
异常来结束迭代。
在20次迭代后停止执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class MyNumbers : def __iter__ (self) : self.a = 1 return self def __next__ (self) : if self.a <= 20 : x = self.a self.a += 1 return x else : raise StopIteration myclass = MyNumbers() myiter = iter(myclass) for x in myiter: print(x)
生成器
在Python中,使用了yield的函数被称为生成器(generator)。
yield
是一个关键字,用于定义生成器函数,生成器函数是一种特殊的函数,可以在迭代过程中逐步产生值,而不是一次性返回所有结果。
和普通函数不同的是,生成器是一个返回迭代器的函数,只能用于迭代操作,更简单点理解生成器就是一个迭代器。
当在生成器函数中使用yield
语句时,函数的执行将会暂停,并将yield
后面的表达式作为当前迭代的值返回。
然后,每次调用生成器的next()
方法或使用for
循环进行迭代时,函数会从上次暂停的地方继续执行,直到再次遇到yield
语句。
这样,生成器函数可以逐步产生值,而不需要一次性计算并返回所有结果。
调用一个生成器函数,返回的是一个迭代器对象。
下面是一个简单的示例,展示了生成器函数的使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def countdown (n) : while n > 0 : yield n n -= 1 generator = countdown(5 ) print(next(generator)) print(next(generator)) print(next(generator)) for value in generator: print(value)
解释说明:
countdown
函数是一个生成器函数。使用yield
语句逐步产生从n
到1
的倒数数字。在每次调用yield
语句时,函数会返回当前的倒数值,并在下一次调用时从上次暂停的地方继续执行。
通过创建生成器对象并使用next()
函数或for
循环迭代生成器,我们可以逐步获取生成器函数产生的值。
在上文,我们首先使用next()
函数获取前三个倒数值,然后通过for
循环获取剩下的两个倒数值。
生成器函数的优势是可以按需生成值,避免一次性生成大量数据并占用大量内存。此外,生成器还可以与其他迭代工具(如for循环)无缝配合使用,提供简洁和高效的迭代方式。
小结
共同点:
两者都可以用来遍历数据,都可以用在for
循环中。
不同点:
实现方式:
迭代器通常通过类实现,需要手动实现__iter__
和__next__
方法。
生成器可以通过函数或生成器表达式实现,使用yield
关键字。
内存效率:
生成器因为其惰性求值的特性,在处理大数据量时更加高效。
使用场景:
迭代器适用于遍历现有的数据集合。
生成器更适用于生成新的数据流,尤其是当这些数据流是无限的或是基于一些复杂计算产生的。