# multiprocessing(下)

# 数据共享

展望未来,基于消息传递的并发编程是大势所趋即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。

进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题,应该尽量避免使用本节所讲的共享数据的方式,以后我们会尝试使用数据库来解决进程之间的数据共享问题。

进程之间数据共享的模块之一Manager模块。

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

多进程共同去处理共享数据的时候,就和我们多进程同时去操作一个文件中的数据是一样的,不加锁就会出现错误的结果,进程不安全的,所以也需要加锁。

from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)   #0,加了锁后
#100减100次1这么慢? 不是减操作造成的 而是开启进程 管理进程 销毁进程拖慢了程序的执行速度

 为什么在这里出现了数据不安全的现象?
# 什么情况下会出现数据不安全 : Manager类当中对字典\列表  += -= *= /=
# 如何解决 : 加锁        
  • 总结一下:进程之间通信:队列、管道、数据共享也算

    • 下面要讲的信号量和事件也相当于锁,也是全局的,所有进程都能拿到这些锁的状态,进程之间这些锁啊信号量啊事件啊等等的通信,其实底层还是socekt,只不过是基于文件的socket通信,而不是跟上面的数据共享啊空间共享啊之类的机制,我们之前学的是基于网络的socket通信,还记得socket的两个家族吗,一个文件的一个网络的,所以将来如果说这些锁之类的报错,可能你看到的就是类似于socket的错误,简单知道一下就可以啦~~~

      工作中常用的是锁,信号量和事件不常用,但是信号量和事件面试的时候会问到,你能知道就行啦

# 信息量(进程中的,也有线程的)

  • 信息量Semaphore介绍

    互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。异步(在go中也是如此)
    假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
    
    实现:
    信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
    信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
    
  • 比如大保健:提前设定好,一个房间只有4个床(计数器现在为4),那么同时只能四个人进来,谁先来的谁先占一个床(acquire,计数器减1),4个床满了之后(计数器为0了),第五个人就要等着,等其中一个人出来(release,计数器加1),他就去占用那个床了。

        from multiprocessing import Process,Semaphore
        import time,random
        
        def go_ktv(sem,user):
            sem.acquire()
            print('%s 占到一间ktv小屋' %user)
            time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同
            sem.release()
        
        if __name__ == '__main__':
            sem=Semaphore(4)
            p_l=[]
            for i in range(13):
                p=Process(target=go_ktv,args=(sem,'user%s' %i,))
                p.start()
                p_l.append(p)
        
            for i in p_l:
                i.join()
            print('============》')
        
            -----------------------------------------------------------------
        #模拟6个人去ktv唱歌
        import time
        import random
        from multiprocessing import Process,Semaphore
        def ktv(i):
            print('person %s 进来唱歌了'%i)
            time.sleep(random.randint(1,5))
            print('person %s 从ktv出去了'%i)
        if __name__ == '__main__':
            for i in range(6):  # 模拟6个人
                Process(target=ktv,args=(i,)).start()  #结果发现有问题,6个人一拥而入,没有次序乱了
          ---------------------------------------------------------------------
        
        #使用semaphore来设置一次进去的人数,KTV 4个人
        import time
        import random
        from multiprocessing import Process,Semaphore
         
        def ktv(i,sem):
            sem.acquire()  #取得锁
            print('person %s 进来唱歌了'%i)
            time.sleep(random.randint(1,5))
            print('person %s 从ktv出去了'%i)
            sem.release()  #释放锁
         
        if __name__ == '__main__':
            sem = Semaphore(4)  #初始化信号量,数量为4
            for i in range(6):  # 模拟6个人
                Process(target=ktv,args=(i,sem)).start()
                
        #在同一时间,最多有4个人进去
        acquire()是一个阻塞行为,信号量和锁有点类似,那么它们之间的区别在于:
        信号量,相当于计数器:它是锁+计数器,调用acquire() 计数器-1
        当计数器到 0 时,再调用 acquire() 就会阻塞,直到其他线程来调用release()
        调用release() 计数器+1   
           
    

# 事件

# 进程池

  • 进程池介绍:

    为什么要有进程池?进程池的概念。
    
    	在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?
    
    	在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
    
  • multiprocessing中的Pool模块(后面我们将另一个模块实现concrrent.future)

    #语法:Pool([numprocess  [,initializer [, initargs]]]):创建进程池
    
    #参数说明:
    	1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
    	2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
    	3 initargs:是要传给initializer的参数组
    
    #主要方法:
    p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
    
    p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
       
    p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    
    P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
    
    #其他参数
    方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
    obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
    obj.ready():如果调用完成,返回True
    obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
    obj.wait([timeout]):等待结果变为可用。
    obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
    
    #代码示例:
    import time
    from multiprocessing import Pool
    def fc(i):
        time.sleep(0.5)
        print('func%s'%i)
     
    if __name__ == '__main__':
        p = Pool(5)
        p.apply(func=fc,args=(1,))   #func1
        
    #示例二
    import time
    from multiprocessing import Pool
    def fc(i):
        time.sleep(0.5)
        print('func%s'%i)
     
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(5):
            p.apply(func=fc,args=(1,))  # 同步调用
            #ret = p.apply_async(func=fc, args=(1,))  # 异步调用
     #结果
    func1
    func1
    func1
    func1
    func1
    
    #示例三,可以实现回调函数
    import time
    import random
    from multiprocessing import Pool
    def fc(i):
        print('func%s' % i)
        time.sleep(random.randint(1, 3))
        return i ** 2
     
    if __name__ == '__main__':
        p = Pool(5) # 创建拥有5个进程数量的进程池
        ret_1 = []
        for i in range(5):
            #p.apply(func=fc,args=(1,))  # 同步调用
            ret = p.apply_async(func=fc, args=(i,))  # 异步调用
            ret_1.append(ret)
        for ret in ret_1:print(ret.get())  # 打印返回结果
            
     #结果:
    func0
    func1
    func2
    func3
    func4
    
    0
    1
    4
    9
    16
    

# 死锁(进程)

  • 在多道程序系统中,由于多个进程的并发执行,改善了系统资源的利用率并提高了系统的处理能力。然而,多个进程的并发执行也带来了新的问题——死锁。所谓死锁是指多个进程因竞争资源而造成的一种僵局,若无外力作用,这些进程都将无法向前推进。

  • 出现的原因:

  • 代码示例

    from threading import Thread,Lock
    import time
    mutexA=Lock()
    mutexB=Lock()
    
    class MyThread(Thread):
        def run(self):
            self.func1()
            self.func2()
        def func1(self):
            mutexA.acquire()
            print('\033[41m%s 拿到A锁\033[0m' %self.name)
    
            mutexB.acquire()
            print('\033[42m%s 拿到B锁\033[0m' %self.name)
            mutexB.release()
            mutexA.release()
    
        def func2(self):
            mutexB.acquire()
            print('\033[43m%s 拿到B锁\033[0m' %self.name)
            time.sleep(2)
    
            mutexA.acquire()
            print('\033[44m%s 拿到A锁\033[0m' %self.name)
            mutexA.release()
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(5):
            t=MyThread()
            t.start()