# 多任务协程爬虫

​ ​ 其实爬虫的本质就是client发请求批量获取server的响应数据,如果我们有多个url待爬取,只用一个线程且采用串行的方式执行,那只能等待爬取一个结束后才能继续下一个,效率会非常低。

​ 需要强调的是:对于单线程下串行N个任务,并不完全等同于低效,如果这N个任务都是纯计算的任务,那么该线程对cpu的利用率仍然会很高,之所以单线程下串行多个爬虫任务低效,是因为爬虫任务是明显的IO密集型(阻塞)程序。那么该如何提高爬取性能呢?

  • # 分析处理

    同步调用:即提交一个任务后就在原地等待任务结束,等到拿到任务的结果后再继续下一行代码,效率低下

    import requests
    def parse_page(res):
        print('解析 %s' %(len(res)))
    
    def get_page(url):
        print('下载 %s' %url)
        response=requests.get(url)
        if response.status_code == 200:
            return response.text
    
    urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
    for url in urls:
        res=get_page(url) #调用一个任务,就在原地等待任务结束拿到结果后才继续往后执行
        parse_page(res)
    
  • 我们也可以使用Flask写一个服务端,每个网址停留2s,我们可以通过爬虫的串行和并行比较时间

    from multiprocessing.dummy import Pool
    import time,requests
    
    #爬取网址
    urls = [
        'http://127.0.0.1:5000/bobo',
        'http://127.0.0.1:5000/jay',
        'http://127.0.0.1:5000/tom',
    ]
    start_time  = time.time()
    
    # 同步爬取时耗
    # 同时向这3个网址发请求
    for url in urls:
         page_text = requests.get(url).text
    
    #总耗时: 6.025481224060059
    print('总耗时:', time.time() - start_time)
    
    --------------------------------------------
    # 开启四个线程池
    pool = Pool(4)
    
    def get_request(url):
        return requests.get(url).text
    
    response_list = pool.map(get_request,urls)
    #['i am bobo!!!', 'Hello jay', 'Hello tom']
    print(response_list)
    
    #总耗时: 2.0233120918273926
    print('总耗时:', time.time() - start_time)
    
  • 解决同步调用方案之多线程/多进程

    • 好处:在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。
    • 弊端:开启多进程或都线程的方式,我们是无法无限制地开启多进程或多线程的:在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。
  • 解决同步调用方案之线程/进程池

    • 好处:很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。可以很好的降低系统开销。
    • 弊端:“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。
# 案例1:基于multiprocessing.dummy线程池爬取梨视频的视频信息
import requests
import random
from lxml import etree


import re
from fake_useragent import UserAgent
# 安装fake-useragent库:pip install fake-useragent
# 导入线程池模块
from multiprocessing.dummy import Pool

# 实例化线程池对象
pool = Pool()
url = 'http://www.pearvideo.com/category_1'
# 随机产生UA
ua = UserAgent().random
headers = {
    'User-Agent': ua
}
# 获取首页页面数据
page_text = requests.get(url=url, headers=headers).text
# 对获取的首页页面数据中的相关视频详情链接进行解析
tree = etree.HTML(page_text)
li_list = tree.xpath('//div[@id="listvideoList"]/ul/li')

detail_urls = []  # 存储二级页面的url
for li in li_list:
    detail_url = 'http://www.pearvideo.com/' + li.xpath('./div/a/@href')[0]
    title = li.xpath('.//div[@class="vervideo-title"]/text()')[0]
    detail_urls.append(detail_url)

vedio_urls = []  # 存储视频的url

def save(data):
    fileName = str(random.randint(1, 10000)) + '.mp4'
    with open(fileName, 'wb') as fp:
        fp.write(data)
        print(fileName + '已存储')

for url in detail_urls:
    page_text = requests.get(url=url, headers=headers).text
    vedio_url = re.findall('srcUrl="(.*?)"', page_text, re.S)[0]
    vedio_urls.append(vedio_url)
    # 使用线程池进行视频数据下载
    func_request = lambda link: requests.get(url=link, headers=headers).content
    video_data_list = pool.map(func_request, vedio_urls)
    # 使用线程池进行视频数据保存
    func_saveData = lambda data: save(data)
    pool.map(func_saveData, video_data_list)


pool.close()
pool.join()
  • 总结:对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

# 终结处理方案

​ 上述无论哪种解决方案其实没有解决一个性能相关的问题:IO阻塞,无论是多进程还是多线程,在遇到IO阻塞时都会被操作系统强行剥夺走CPU的执行权限,程序的执行效率因此就降低了下来。解决这一问题的关键在于,我们自己从应用程序级别检测IO阻塞然后切换到我们自己程序的其他任务执行,这样把我们程序的IO降到最低,我们的程序处于就绪态就会增多,以此来迷惑操作系统,操作系统便以为我们的程序是IO比较少的程序,从而会尽可能多的分配CPU给我们,这样也就达到了提升程序执行效率的目的。

# asyncio 模块

  • 在python3.4之后新增了asyncio模块,可以帮我们检测IO(只能是网络IO【HTTP连接就是网络IO操作】),实现应用程序级别的切换(异步IO)。注意:asyncio只能发tcp级别的请求,不能发http协议。

  • 异步IO:所谓「异步 IO」,就是你发起一个 网络IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

    • 实现方式:单线程+协程实现异步IO操作

# 异步协程用法

​ 接下来让我们来了解下协程的实现,从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便。首先我们需要了解下面几个概念:(让我想起了Sanic)

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。(比作一个无底的容器,任务在里面进行调度)

  • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。

  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。

  • future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

  • 另外我们还需要了解 async/await 关键字,它是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

  • 单线程+多任务异步协程(推荐) 总结:

        - 协程:对象.可以把协程当做是一个特殊的函数.如果一个函数的定义被async关键字所修饰.该特殊的函数被调用后函数内部的程序语句不会被立即执行,而是会返回一个协程对象.
        
        - 任务对象(task):所谓的任务对象就是对协程对象的进一步封装.在任务对象中可以实现显示协程对象的运行状况.任务对象最终是需要被注册到事件循环对象中.
            - 绑定回调:回调函数是绑定给任务对象,只有当任务对象对应的特殊函数被执行完毕后,回调函数才会被执行
        - 事件循环对象:无限循环的对象.也可以把其当成是某一种容器.该容器中需要放置多个任务对象(就是一组待执行的代码块).
            - 异步的体现:当事件循环开启后,该对象会安装顺序执行每一个任务对象,
                当一个任务对象发生了阻塞事件循环是不会等待,而是直接执行下一个任务对象
        - await:挂起的操作.交出cpu的使用权
    
  • 代码示例1: 单独的协程对象是并不会执行的

    import asyncio
    import time
    
    # 通过async 将函数进行修饰,返回一个协程对象
    async def get_request(url):
        print('正在请求',url)
        time.sleep(2)
        print('请求结束',url)
    
    c= get_request('www.baidu.com')
    print(c)  #<coroutine object get_request at 0x000002C603449990>
    
  • 代码示例2:既然协程不能执行,那么我们可不可以试试将其直接注册到事件循环中,(跳过task)

    import asyncio
    import time
    
    # 通过async 将函数进行修饰,返回一个协程对象
    async def get_request(url):
        print('正在请求',url)
        time.sleep(2)
        print('请求结束',url)
    
    c= get_request('www.baidu.com')
    print(c) 
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(c)
    print('ok!')
    
    # <coroutine object get_request at 0x00000299B38798E0>
    # 正在请求 www.baidu.com
    # 请求结束 www.baidu.com
    # ok!
    

    ​ 首先我们引入了 asyncio 这个包,这样我们才可以使用 async 和 await,然后我们使用 async 定义了一个 get_request() 方法,方法接收一个数字参数,方法执行之后会打印这个数字。随后我们直接调用了这个方法,然而这个方法并没有执行,而是返回了一个 coroutine 协程对象。随后我们使用 get_event_loop() 方法创建了一个事件循环 loop,并调用了 loop 对象的 run_until_complete() 方法将协程注册到事件循环 loop 中,然后启动。最后我们才看到了 get_request() 方法打印了输出结果。可见,async 定义的方法就会变成一个无法直接执行的 coroutine 对象,必须将其注册到事件循环中才可以执行


    ​ 上文我们还提到了 task,它是对 coroutine 对象的进一步封装,它里面相比 coroutine 对象多了运行状态,比如 running、finished 等,我们可以用这些状态来获取协程对象的执行情况。在上面的例子中,当我们将 coroutine 对象传递给 run_until_complete() 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象,我们也可以显式地进行声明,如下所示

  • 代码示例3:执行task封装的协程对象,的返回值

    import asyncio
    import time
    
    # 通过async 将函数进行修饰,返回一个协程对象
    async def get_request(url):
        print('正在请求',url)
        time.sleep(2)
        print('请求结束',url)
        return '协程的返回值'
    
    
    #创建一个协程对象
    c = get_request('www.baidu.com')
    print('创建一个协程对象')
    
    #创建一个事件循环
    loop = asyncio.get_event_loop()
    
    #创建一个任务对象,将协程对象的c进行封装, create_task用于创建task
    task = loop.create_task(c)
    print('Befor Task:',task)
    
    #这种方式也能绑定回调,下面我们可通过ensure_future绑定
    #task.add_done_callback()
    
    # 将事件对象添加到事件循环中,并执行
    loop.run_until_complete(task)
    
    print('After Task:',task)
    
    # 创建一个协程对象
    # Befor Task: <Task pending coro=<get_request() running at C:/Users/wanglixing/Desktop/文件夹/test/任务对象.py:5>>
    # 正在请求 www.baidu.com
    # 请求结束 www.baidu.com
    # After Task: <Task finished coro=<get_request() done, defined at C:/Users/wanglixing/Desktop/文件夹/test/任务对象.py:5> result='协程的返回值'>
    
    通过上面的运行结果我们可以知道,task会为协程绑定运行状态:挂起、运行、结束.../运行位置/以及返回值
    

    ​ 我们定义了 loop 对象之后,接着调用了它的 create_task() 方法将 coroutine 对象转化为了 task 对象,随后我们打印输出一下,发现它是 pending 状态。接着我们将 task 对象添加到事件循环中得到执行,随后我们再打印输出一下 task 对象,发现它的状态就变成了 finished,同时还可以看到其 result 变成了 '字符串',也就是我们定义的 get_request() 方法的返回结果。

    另外定义 task 对象还有一种方式,就是直接通过 asyncio 的 ensure_future() 方法,返回结果也是 task 对象,这样的话我们就可以不借助于 loop 来定义,即使我们还没有声明 loop 也可以提前定义好 task 对象,写法如下:

    import asyncio
    import time
    
    # 通过async 将函数进行修饰,返回一个协程对象
    async def get_request(url):
        print('正在请求',url)
        time.sleep(2)
        print('请求结束',url)
        return '协程的返回值'
    
    
    #创建一个协程对象
    c = get_request('www.baidu.com')
    print('创建一个协程对象')
    
    #创建一个事件循环
    loop = asyncio.get_event_loop()
    
    #创建一个任务对象,将协程对象的c进行封装,使用的的是ensure_future
    task = asyncio.ensure_future(c)
    print('Befor Task:',task)
    
    
    # 将事件对象添加到事件循环中,并执行
    loop.run_until_complete(task)
    
    print('After Task:',task)
    
    # 创建一个协程对象
    # Befor Task: <Task pending coro=<get_request() running at C:/Users/wanglixing/Desktop/文件夹/test/任务对象.py:5>>
    # 正在请求 www.baidu.com
    # 请求结束 www.baidu.com
    # After Task: <Task finished coro=<get_request() done, defined at C:/Users/wanglixing/Desktop/文件夹/test/任务对象.py:5> result='协程的返回值'>
    
    
  • 绑定回调:也可以为某个 task 绑定一个回调方法,来看下面的例子

    import asyncio
    import time
    start = time.time()
    # 通过async 将函数进行修饰,返回一个协程对象
    async def get_request(url):
        print('正在请求',url)
        time.sleep(2)
        print('请求结束',url)
        return '协程的返回值'
    
    #创建一个协程对象
    c = get_request('www.baidu.com')
    
    #创建一个事件循环
    loop = asyncio.get_event_loop()
    #创建一个任务对象,将协程对象的c进行封装
    task = asyncio.ensure_future(c)
    print('Befor Task:',task)
    
    # 回调函数,必须有一个参数接受task
    def callback(task):
        print('i am callback')
        # .result() 函数返回task的返回值
        print(task.result())
    
    # 绑定回调函数,task执行完了后执行
    task.add_done_callback(callback)
    
    # 将事件对象添加到事件循环中,并执行
    loop.run_until_complete(task)   #执行task后执行回调
    print('After Task:',task)
    print('执行时间:',time.time()-start)
    
    # 正在请求 www.baidu.com
    # 请求结束 www.baidu.com
    # i am callback
    # 协程的返回值
    # After Task: <Task finished coro=<get_request() done, defined at C:/Users/wanglixing/Desktop/文件夹/test/任务对象.py:5> result='协程的返回值'>
    #执行时间: 2.001148223876953
    

    1567171493322

    • 在这里我们定义了一个 request() 方法,请求了百度,返回状态码,但是这个方法里面我们没有任何 print() 语句。随后我们定义了一个 callback() 方法,这个方法接收一个参数,是 task 对象,然后调用 print() 方法打印了 task 对象的结果。这样我们就定义好了一个 coroutine 对象和一个回调方法,我们现在希望的效果是,当 coroutine 对象执行完毕之后,就去执行声明的 callback() 方法。那么它们二者怎样关联起来呢?很简单,只需要调用 add_done_callback() 方法即可,我们将 callback() 方法传递给了封装好的 task 对象,这样当 task 执行完毕之后就可以调用 callback() 方法了,同时 task 对象还会作为参数传递给 callback() 方法,调用 task 对象的 result() 方法就可以获取返回结果了。

      实际上不用回调方法,直接在 task 运行完毕之后也可以直接调用 result() 方法获取结果,运行结果是一样的:

      import asyncio
      import requests
       
      async def request():
          url = 'https://www.baidu.com'
          status = requests.get(url).status_code
          return status
       
      coroutine = request()
      task = asyncio.ensure_future(coroutine)
      print('Task:', task)
       
      loop = asyncio.get_event_loop()
      loop.run_until_complete(task)
      print('Task:', task)
      print('Task Result:', task.result())
      

# 多任务异步协程

​ 上面的例子我们只执行了一次请求,如果我们想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait() 方法即可执行。

  • 错误示例:

    import asyncio
    from time import sleep
    import time
    
    urls = [
        'http://127.0.0.1:5000/bobo',
        'http://127.0.0.1:5000/jay',
        'http://127.0.0.1:5000/tom',
    ]
    
    async def get_request(url):
        print('正在请求',url)
        time.sleep(2)
        print('请求结束',url)
        return '协程的返回值'
    
    tasks = []
    for url in urls:
        # 每次循环创建一个协程
        c = get_request(url)
        task=asyncio.ensure_future(c)
        tasks.append(task)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(tasks)  
    
     #TypeError: An asyncio.Future, a coroutine or an awaitable is required,当某一个事件发生阻塞时程序不会主动切换,所以报错,得手动挂起
    
    • 使用asynic中的wait函数自动封装里面的阻塞函数

      import asyncio
      from time import sleep
      import time
      
      urls = [
          'http://127.0.0.1:5000/bobo',
          'http://127.0.0.1:5000/jay',
          'http://127.0.0.1:5000/tom',
      ]
      
      start = time.time()
      async def get_request(url):
          print('正在请求',url)
          time.sleep(2)  #在待执行函数的代码块中更不可以出现不支持异步模块的代码
          print('请求结束',url)
          return '协程的返回值'
      
      tasks = []
      for url in urls:
          # 每次循环创建一个协程
          c = get_request(url)
          task=asyncio.ensure_future(c)
          tasks.append(task)
      
      loop = asyncio.get_event_loop()
      loop.run_until_complete(asyncio.wait(tasks))  #自动检测任务对象的阻塞
      print(time.time()-start)  #6.002898693084717,并没有异步,因为函数内部中有阻塞,sleep阻塞
      
      -------------------------------------------------------------------
       #使用asyncio.sleep 将会避免阻塞,主动挂起
      import asyncio
      from time import sleep
      import time
      
      urls = [
          'http://127.0.0.1:5000/bobo',
          'http://127.0.0.1:5000/jay',
          'http://127.0.0.1:5000/tom',
      ]
      
      start = time.time()
      async def get_request(url):
          print('正在请求',url)
          asyncio.sleep(2)  # 没有挂起,会直接忽略这一步i/o操作
          print('请求结束',url)
          return '协程的返回值'
      
      tasks = []
      for url in urls:
          # 每次循环创建一个协程
          c = get_request(url)
          task=asyncio.ensure_future(c)
          tasks.append(task)
      
      loop = asyncio.get_event_loop()
      loop.run_until_complete(asyncio.wait(tasks))
      print(time.time()-start) #0.0
      
      
      ------------------------------------------------------------
       # asyncio.wait() 会主动捕获await 修饰的阻塞,注意不是有了await,就会异步,必须时asyncio能识别的阻塞,如: await sleep(2) 就不能异步:
      import asyncio
      from time import sleep
      import time
      
      urls = [
          'http://127.0.0.1:5000/bobo',
          'http://127.0.0.1:5000/jay',
          'http://127.0.0.1:5000/tom',
      ]
      
      start = time.time()
      async def get_request(url):
          print('正在请求',url)
          await asyncio.sleep(2)
          print('请求结束',url)
          return '协程的返回值'
      
      tasks = []
      for url in urls:
          # 每次循环创建一个协程
          c = get_request(url)
          task=asyncio.ensure_future(c)
          tasks.append(task)
      
      loop = asyncio.get_event_loop()
      loop.run_until_complete(asyncio.wait(tasks))
      print(time.time()-start)  #2.0   
      

      总结:在待执行代码块中不可以出现不支持异步操作的代码;在该函数中内部如果有阻塞操作必须使用await关键字主动挂起(反问:request.get等操作能被asyncio捕获?asyncio认识它?)

  • 验证await的用法:

    我们还是使用之前我们建好的flask服务器,使用下面代码测试

    import asyncio
    import requests
    import time
    start = time.time()
     
    async def request():
        url = 'http://127.0.0.1:5000'
        print('Waiting for', url)
        response = requests.get(url)
        print('Get response from', url, 'Result:', response.text)
     
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
     
    end = time.time()
    print('Cost time:', end - start) #10,发现并没减少,因为没有使用await挂起
    

    ​ 要实现异步,接下来我们再了解一下 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。所以,我们可能会将代码中的 request() 方法改成如下的样子:

    async def request():
        url = 'http://127.0.0.1:5000'
        print('Waiting for', url)
        response = await requests.get(url)
        print('Get response from', url, 'Result:', response.text)
        
     #仅仅是在 requests 前面加了一个 await,然而执行以下代码,会得到如下报错:TypeError: object Response can't be used in 'await' expression,不识别这个阻塞事件
    
    这次它遇到 await 方法确实挂起了,也等待了,但是最后却报了这么个错,这个错误的意思是 requests 返回的 Response 对象不能和 await 一起使用,为什么呢?因为根据官方文档说明,await 后面的对象必须是如下格式之一:
    	1、A native coroutine object returned from a native coroutine function,一个原生 coroutine 对象。
    
    	2、A generator-based coroutine object returned from a function decorated with types.coroutine(),一个由 types.coroutine() 修饰的生成器,这个生成器可以返回 coroutine 对象。
    
    	3、An object with an await__ method returning an iterator,一个包含 __await 方法的对象返回的一个迭代器。
    

    我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了。

  • 总结多任务异步协程:

    - 注意事项:
        - 1.将多个任务对象存储到一个列表中,然后将该列表注册到事件循环中.在注册的过程中,该列表需要被
            wait方法进行处理.
        - 2.在任务对象对应的特殊函数内部的实现中,不可以出现不支持异步模块的代码,否则就会中断整个的
            异步效果.并且,在该函数内部每一组阻塞的操作都必须使用await关键字进行修饰.
        - 3.requests模块对应的代码不可以出现在特殊函数内部,因为requests是一个不支持异步的模块
    

# aiohttp

官网 :Async HTTP client/server for asyncio and Python, 基于HTTP,专门为asynico和python搭桥的,即支持异步操作的网路请求模块

环境安装:pip install aiohttp

  • 简单使用

    import aiohttp
    import asyncio
    import requests
    import time
    
    start = time.time()
    
    urls = [
        #bobo url对应的视图函数返回的是一个html页面,我们可以进行解析
        'http://127.0.0.1:5000/bobo',
        'http://127.0.0.1:5000/bobo',
        'http://127.0.0.1:5000/bobo',
    ]
    
    async def request(url):
        async with aiohttp.ClientSession() as s:
            async with s.get(url) as response:
                # response.read() 返回的是byte类型,其他的和request类似
                page_text = await response.text()
                return page_text
        # 细节:在每一个with前面加上async,在每一步的阻塞操作前加上await
    
    from lxml import html
    etree = html.etree
    def parse(task):
        page_text = task.result() # 拿到返回结果
        tree = etree.HTML(page_text)#python3.5之后不再lxml不再支持etree,所以需要使用这种方式使用
        name = tree.xpath('//p/text()')[0]
        print(name)
    
    tasks = []
    for url in urls:
        c = request(url)
        task = asyncio.ensure_future(c)
        task.add_done_callback(parse) # 添加回调函数,对返回的html文件进行解析
        tasks.append(task)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    end = time.time()
    print('Cost time:', end - start)
    
    # i am bobo!!!
    # i am bobo!!!
    # i am bobo!!!
    # Cost time: 2.031763792037964