# multiprocessing(上)
**概念:**仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。
重点强调:进程没有任何共享状态,进程修改的数据,改动仅限于该进程内,但是通过一些特殊的方法,可以实现进程之间数据的共享。
Process模块**(创建进程的第一种方式)**:一个创建进程的模块,借助这个模块,就可以完成进程的创建。
Process([group [, target [, name [, args [, kwargs]]]]]) #由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) #强调: #1. 需要使用关键字的方式来指定参数 #2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号 # 当前文件名称为test.py from multiprocessing import Process def func(): print(12345) if __name__ == '__main__': #windows下才需要写这个,这和系统创建进程的机制有关系,不用深究,记着windows下要写就好啦(因为windows是通过import导入模块,所以如果不区分那么会无限导入,而mac、linux则是直接copy导入文件的方式,首先我运行当前这个test.py文件,运行这个文件的程序,那么就产生了进程,这个进程我们称为主进程 p = Process(target=func,) #将函数注册到一个进程中,p是一个进程对象,此时还没有启动进程,只是创建了一个进程对象。并且func是不加括号的,因为加上括号这个函数就直接运行了对吧。 p.start() #告诉操作系统,给我开启一个进程,func这个函数就被我们新开的这个进程执行了,而这个进程是我主进程运行过程中创建出来的,所以称这个新创建的进程为主进程的子进程,而主进程又可以称为这个新进程的父进程。 而这个子进程中执行的程序,相当于将现在这个test.py文件中的程序copy到一个你看不到的python文件中去执行了,就相当于当前这个文件,被另外一个py文件import过去并执行了。 # start并不是直接就去执行了,我们知道进程有三个状态,进程会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,并且在这个三个状态之间不断的转换,等待cpu执行时间片到了。 print('*' * 10) #这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,我们称为异步 -------------------------------------------------------- import os import time from multiprocessing import Process def func(a,b,c): time.sleep(1) print(a,b,c,os.getpid(),os.getppid()) #1 2 3 9936 9592 if __name__ == '__main__': # windows操作系统下开启子进程子进程中的代码是通过import这种方式被导入到子进程中的 print('主 :', os.getpid()) #主 : 9592 Process(target=func,args=(1,2,3)).start() -------------------------------------------------------- import time import os #os.getpid() 获取自己进程的ID号 #os.getppid() 获取自己进程的父进程的ID号 from multiprocessing import Process def func(): print('aaaa') time.sleep(1) print('子进程>>',os.getpid()) # 8064 print('该子进程的父进程>>',os.getppid()) # 3552 print(12345) if __name__ == '__main__': p = Process(target=func,) #首先我运行当前这个文件,运行的这个文件的程序,那么就产生了主进程 p.start() print('*' * 10) print('父进程>>',os.getpid()) #3552 print('父进程的父进程>>',os.getppid()) #10804 #将上个例题的函数下加 print()后会打印两次,因为子进程会加载主程序的函数, #1、也就是说在window是系统上,python通过import导入的形式创建子进程,如果不写if __name__ 语句,则会无限导入 #2、if 上面的语句在子进程创建时,会在执行一遍 def func(): print('aaaa') time.sleep(1) print('子进程>>',os.getpid()) print('该子进程的父进程>>',os.getppid()) print(12345) print('太白老司机~~~~') #如果我在这里加了一个打印,你会发现运行结果中会出现两次打印出来的太白老司机,因为我们在主进程中开了一个子进程,子进程中的程序相当于import的主进程中的程序,那么import的时候会不会执行你import的那个文件的程序啊,前面学的,是会执行的,所以出现了两次打印 #其实是因为windows开起进程的机制决定的,在linux下是不存在这个效果的,因为windows使用的是process方法来开启进程,他就会拿到主进程中的所有程序,而linux下只是去执行我子进程中注册的那个函数,不会执行别的程序,这也是为什么在windows下要加上执行程序的时候, #要加上if __name__ == '__main__':,否则会出现子进程中运行的时候还开启子进程,那就出现无限循环的创建进程了,就报错了
我们可以打开windows的任务管理器查看pycharm的pid的进程号:
一个主进程运行完了之后,我们把pycharm关了,但是子进程还没有执行结束,那么子进程还存在吗?这要看你的进程是如何配置的,如果说我们没有配置说我主进程结束,子进程要跟着结束,那么主进程结束的时候,子进程是不会跟着结束的,他会自己执行完,如果我设定的是主进程结束,子进程必须跟着结束,那么就不会出现单独的子进程(孤儿进程)了,具体如何设置,看下面的守护进程的讲解。比如说,我们将来启动项目的时候,可能通过cmd来启动,那么我cmd关闭了你的项目就会关闭吗,不会的,因为你的项目不能停止对外的服务,对吧,但是所有的孤儿进程会随着关机,内存释放。
Process类中各方法的介绍:
1、 p.start():启动进程,并调用该子进程中的p.run() 2、 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 。 3、 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 4、 p.is_alive():如果p仍然运行,返回True 5、 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程。 6、start terminate join分别是什么类型? start\terminate 异步非阻塞 join 同步阻塞
Process类中自带封装的各属性的介绍:
1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程(代码终止,而不是主进程终止,因为主进程要进行垃圾回收 子进程)终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 2 p.name:进程的名称,这些属性是在Process类中的init方法中实现的 3 p.pid:进程的pid 4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
join方法的列子:让主进程加上join的地方等待(也就是阻塞住),等待子进程执行完之后,再继续往下执行我的主进程,好多时候,我们主进程需要子进程的执行结果,所以必须要等待。join感觉就像是将子进程和主进程拼接起来一样,将异步改为同步执行。
from multiprocessing import Process def func(x,y): #前面的省略 print(x) time.sleep(1) print(y) if __name__ == '__main__': p = Process(target=func,args=('姑娘','来玩啊!')) p.start() print('我这里是异步的啊!') #这里相对于子进程还是异步的 p.join() #只有在join的地方才会阻塞住,将子进程和主进程之间的异步改为同步 print('父进程执行结束!') #打印结果: 我这里是异步的啊! 姑娘 来玩啊! 父进程执行结束! ----------------------------------------------------------------- #第二种传参 def func(a, b, c): print("函数func的参数分别是>>>", a, b, c) print("子进程执行完毕!") if __name__ == '__main__': p = Process(target=func, kwargs={"a":"参数一", "b":"参数二", "c":"参数三"}) #传输位置参数 p.start() time.sleep(1) print("主进程执行完毕!") # 执行结果: # 函数func的参数分别是>>> 参数一 参数二 参数三 # 子进程执行完毕! # 主进程执行完毕! -------------------------------开启多个进程------------------------ #怎么样开启多个进程呢?for循环。并且我有个需求就是说,所有的子进程异步执行,然后所有的子进程全部执行完之后,我再执行主进程,怎么搞?看代码 def func(x,y): print(x) # time.sleep(1) #进程切换:如果没有这个时间间隔,那么你会发现func执行结果是打印一个x然后一个y,再打印一个x一个y,不会出现打印多个x然后打印y的情况,因为两个打印距离太近了而且执行的也非常快,但是如果你这段程序运行慢的话,你就会发现进程之间的切换了。 print(y) if __name__ == '__main__': p_list= [] for i in range(10): p = Process(target=func,args=('姑娘%s'%i,'来玩啊!')) p_list.append(p) p.start() [ap.join() for ap in p_list] #4、这是解决办法,前提是我们的子进程全部都已经去执行了,那么我在一次给所有正在执行的子进程加上join,那么主进程就需要等着所有子进程执行结束才会继续执行自己的程序了,并且保障了所有子进程是异步执行的。 # p.join()放在for内: 如果加到for循环里面,那么所有子进程包括父进程就全部变为同步了,因为for循环也是主进程的,循环第一次的时候,一个进程去执行了,然后这个进程就join住了,那么for循环就不会继续执行了,等着第一个子进程执行结束才会继续执行for循环去创建第二个子进程。 #2、如果我不想这样的,也就是我想所有的子进程是异步的,然后所有的子进程执行完了再执行主进程p.join()在for外部:如果这样写的话,多次运行之后,你会发现会出现主进程的程序比一些子进程先执行完,因为我们p.join()是对最后一个子进程进行了join,也就是说如果这最后一个子进程先于其他子进程执行完,那么主进程就会去执行,而此时如果还有一些子进程没有执行完,而主进程执行完了,那么就会先打印主进程的内容了,这个cpu调度进程的机制有关系,因为我们的电脑可能只有4个cpu,我的子进程加上住进程有11个,虽然我for循环是按顺序起进程的,但是操作系统一定会按照顺序给你执行你的进程吗,答案是不会的,操作系统会按照自己的算法来分配进程给cpu去执行,这里也解释了我们打印出来的子进程中的内容也是没有固定顺序的原因,因为打印结果也需要调用cpu,可以理解成进程在争抢cpu,如果同学你想问这是什么算法,这就要去研究操作系统啦。那我们的想所有子进程异步执行,然后再执行主进程的这个需求怎么解决啊 print('不要钱~~~~~~~~~~~~~~~~!') #列:1、同时对一个文件进行写操作 2、同时创建多个文件 import time import os import re from multiprocessing import Process #例1、多进程同时对一个文件进行写操作 def func(x,y,i): with open(x,'a',encoding='utf-8') as f: print('当前进程%s拿到的文件的光标位置>>%s'%(os.getpid(),f.tell())) f.write(y) #多进程同时创建多个文件 # def func(x, y): # with open(x, 'w', encoding='utf-8') as f: # f.write(y) if __name__ == '__main__': p_list= [] for i in range(10): p = Process(target=func,args=('can_do_girl_lists.txt','姑娘%s'%i,i)) p_list.append(p) p.start() [ap.join() for ap in p_list] #这就是个for循环,只不过用列表生成式的形式写的 with open('can_do_girl_lists.txt','r',encoding='utf-8') as f: data = f.read() all_num = re.findall('\d+',data) #打开文件,统计一下里面有多少个数据,每个数据都有个数字,所以re匹配一下就行了 print('>>>>>',all_num,'.....%s'%(len(all_num))) #print([i in in os.walk(r'你的文件夹路径')]) print('不要钱~~~~~~~~~~~~~~~~!') ------------------------------------------------------------- #例2、terminate关闭子进程 def func(): print("子进程开始执行!") time.sleep(2) print("子进程执行完毕!") if __name__ == '__main__': p = Process(target=func,) p.start() p.terminate() # 给操作系统发送一个关闭进程p1的信号,让操作系统去关闭它 time.sleep(1) """由于操作系统关闭子进程的过程需要做许多事情(如回收资源),这是需要耗费一定时间的, 如果在给操作系统发出关闭信号后(p1.terminate())立刻判断子进程是否还活着,结果是不准 确的,此时操作系统正在关闭子进程,所以我们要等待一定时间才可以得到正确的判断结果.""" print("子进程是否还活着>>>", p.is_alive()) print("主进程执行完毕!") ----------------------------------------------------------------- #例3、进程对象的其他方法一:terminate,is_alive class Piao(Process): def __init__(self,name): self.name=name #如果这个self.name 在这里定义了,会被后面的init中的覆盖(其为name赋值类名piao-1) super().__init__() def run(self): print('%s is 打飞机' %self.name) # s = input('???') #别忘了再pycharm下子进程中不能input输入,会报错EOFError: EOF when reading a line,因为子进程中没有像我们主进程这样的在pycharm下的控制台可以输入东西的地方 time.sleep(2) print('%s is 打飞机结束' %self.name) if __name__ == '__main__': p1=Piao('太白') p1.start() time.sleep(5) p1.terminate()#关闭进程,不会立即关闭,有个等着操作系统去关闭这个进程的时间,所以is_alive立刻查看的结果可能还是存活,但是稍微等一会,就被关掉了 print(p1.is_alive()) #结果为True print('等会。。。。') time.sleep(1) print(p1.is_alive()) #结果为False ---------------------------------------------------------------------- #例4、Process类中自带的self.name 在父类的init函数在已存在 class Piao(Process): def __init__(self,name): self.name=name super().__init__() #Process的__init__方法会执行self.name=Piao-1,覆盖egon #所以加到这里,会覆盖我们的self.name=name #为我们开启的进程设置名字的做法 # super().__init__() # 这种情况不会影响我们的名字,不会被覆盖 # self.name=name def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) if __name__=='__main__': p=Piao('egon') p.start() #会自动执行run里面得方法 print('开始') print(p.pid) #查看pid #开始 #15148 #Piao-1 is piaoing #Piao-1 is piao end ---------------------------------------------------------------- #例5,子进程的import过程 from multiprocessing import Process import time def func(): print('ok') print(1) #1 第一次打印主程序 打印 1 if __name__=='__main__': p=Process(target=func) #2 开始导入主模块,import 所以还会执行 打印 1,3 p.start() #3 打印 ok a=2 time.sleep(2) print('a',a) #4打印 a ,2 #如果外部调用a # print(a) # 报错,子进程没有a print('3') #5 打印3
在windows中Process()必须放到# if
__name__
=='__main__'
:下由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)无限import。 这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。
# 进程创建的第二种方法:(继承)
class MyProcess(Process): #自己写一个类,继承Process类 #我们通过init方法可以传参数,如果只写一个run方法,那么没法传参数,因为创建对象的是传参就是在init方法里面,面向对象的时候,我们是不是学过 def __init__(self,person): #之前我只能使用start的方法 super().__init__() self.person=person def run(self): print(os.getpid()) print(self.pid) print(self.pid) print('%s 正在聊天' %self.person) # def start(self): # #如果你非要写一个start方法,可以这样写,并且在run方法前后,可以写一些其他的逻辑 # self.run() if __name__ == '__main__': p1=MyProcess('Jedan') p2=MyProcess('太白') p3=MyProcess('alexDSB') p1.start() #start内部会自动调用run方法,这是在写了start方法后 p2.start() # p2.run() #当注释掉p2.join(),执行这一句时,太白只能打印池pid,后面的都是None,因为覆盖了父类的run p3.start() p1.join() p2.join() p3.join() #13328 #13328 #13328 #Jedan 正在聊天 #4052 #4052 #4052 #太白 正在聊天 #4556 #4556 #4556 #alexDSB 正在聊天
进程的内存空间是隔离的
import time from multiprocessing import Process global_num = 100 def func(): # 子进程 global global_num global_num = 0 print("子进程的全局变量>>>", global_num) if __name__ == '__main__': p1 = Process(target=func,) p1.start() time.sleep(1) # 等待子进程执行结束 print("主进程的全局变量>>>", global_num) # 执行结果: # 子进程的全局变量>>> 0 # 主进程的全局变量>>> 100 # 得出结论: 进程之间是空间隔离的,不共享资源,(相当于复制一份走)
子进程中不能使用input:如果在子进程中存在input,由于输入台只显示在主进程中,子进程是没有输入台的,于是系统会自动报错.所以不要在子进程中出现input。
僵尸进程和孤儿进程:
- 僵尸进程(有害):任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每个子进程在结束时都要经过的阶段。如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。 如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。
- 孤儿进程(无害):一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了init进程身上,init进程就好像是一个民政局,专门负责处理孤儿进程的善后工作。每当出现一个孤儿进程的时候,内核就把孤 儿进程的父进程设置为init,而init进程会循环地wait()它的已经退出的子进程。这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init进程就会代表党和政府出面处理它的一切善后工作。因此孤儿进程并不会有什么危害。
守护进程
- 使用平常的方法时,子进程是不会随着主进程的结束而结束,只有当主进程和子进程全部执行完毕后,程序才会结束.但是,如果我们的需求是: 主进程执行结束,由该主进程创建的子进程必须跟着结束. 这时,我们就需要用到守护进程了
- 主进程创建守护进程:
- 其一: 守护进程会在主进程代码执行结束后就终止。
- 其二: 守护进程内无法再开启子进程,否则抛出异常: AssertionError: daemonic processes are not allowed to have children。
- 需要注意的是: 进程之间是相互独立的,主进程代码运行结束,守护进程随机终止。
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print("这个人的ID号是:%s" % os.getpid()) print("这个人的名字是:%s" % self.name) time.sleep(3) if __name__ == '__main__': p=Myprocess('李华') p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行 p.start() # time.sleep(1) # 在sleep时linux下查看进程id对应的进程ps -ef|grep id print('主进程执行完毕!') #这种情况子进程都还没进行就结束了,就打印'主程序……' -------------------------------------------------------------- #具体使用自定义的进程类来处理事情: import time from multiprocessing import Process def func(person): print('%s test ok'%person) class Myprocess(Process): def __init__(self,func,person): self.func = func self.person = person super().__init__(target=self.func,args=(self.person,)) #这里args必须是元组 if __name__ == '__main__': p = Myprocess(func,'wang') # 这里func传递的是一个函数对象过去 p.start() time.sleep(3)
总结:
1、主进程和子进程互不干扰 2、主进程执行完毕之后程序不会结束,会等待所有的子进程结束之后才结束 3、为什么主进程要等待子进程结束之后才结束? 因为主进程要负责给子进程回收一些系统资源 4、守护进程 : 是一个子进程,守护的是主进程 结束条件 : 主进程的代码结束,守护进程也结束 5、进程 主进程的代码结束,守护进程结束 主进程要回收守护进程(子进程)的资源 主进程等待其他所有子进程结束 主进程回收所有子进程的资源
实例:使用原生socket模块写一个实现多人聊天的功能?
#服务端
from socket import *
from multiprocessing import Process
def talk(conn,client_addr):
while True:
try:
msg=conn.recv(1024)
print('客户端消息>>',msg)
if not msg:break
conn.send(msg.upper())
#在这里有同学可能会想,我能不能在这里写input来自己输入内容和客户端进行对话?朋友,是这样的,按说是可以的,但是需要什么呢?需要你像我们用pycharm的是一样下面有一个输入内容的控制台,当我们的子进程去执行的时候,我们是没有地方可以显示能够让你输入内容的控制台的,所以你没办法输入,就会给你报错。
except Exception:
break
if __name__ == '__main__': #windows下start进程一定要写到这下面
server = socket(AF_INET, SOCK_STREAM)
# server.setsockopt(SOL_SOCKET, SO_REUSEADDR,1) # 如果你将如果你将bind这些代码写到if __name__ == '__main__'这行代码的上面,那么地址重用必须要有,因为我们知道windows创建的子进程是对整个当前文件的内容进行的copy,前面说了就像import,如果你开启了子进程,那么子进程是会执行bind的,那么你的主进程bind了这个ip和端口,子进程在进行bind的时候就会报错。
server.bind(('127.0.0.1', 8080))
#有同学可能还会想,我为什么多个进程就可以连接一个server段的一个ip和端口了呢,我记得当时说tcp的socket的时候,我是不能在你这个ip和端口被连接的情况下再连接你的啊,这里是因为当时我们就是一个进程,一个进程里面是只能一个连接的,多进程是可以多连接的,这和进程之间是单独的内存空间有关系,先这样记住他,好吗?
server.listen()
while True:
conn,client_addr=server.accept()
p=Process(target=talk,args=(conn,client_addr))
p.start()
#客服端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
← 线程 multiprocessing(中) →