# 常用模块(二)

# hmac模块

实现了标准的Hmac算法

  • HMAC的一个典型应用是用在“挑战/响应”(Challenge/Response)身份认证中(应用场景):

      1. 客户端向服务器发出一个验证请求

      2. 服务器接到此请求后生成一个随机数并通过网络传输给客户端(此为挑战)

      3. 客户端将收到的随机数提供给ePass,由ePass使用该随机数存储在ePass中的密钥进行HMAC-MD5运算并得到一个结果作为认证证据传给服务器(此为响应)。

      4. 与此同时,服务器也使用该随机数存储在服务器数据库中的该客户密钥进行HMAC-MD5运算,如果服务器的运算结果与客户端传回的响应结果相同,则认为客户端是一个合法用户

  • 安全性

    • HMAC算法更象是一种加密算法,它引入了密钥,其安全性已经不完全依赖于所使用的HASH算法

      1. 使用的密钥是双方事先约定的,第三方不可能知道。能够得到的信息只有作为“挑战”的随机数和作为“响应”的HMAC结果,无法根据这两个数据推算出密钥。由于不知道密钥,所以无法仿造出一致的响应。

      2. HMAC与一般的加密重要的区别在于它具有“瞬时”性,即认证只在当时有效

  • 代码实现

    #服务端
    from socket import *
    import hmac,os
    secret_key=b'Jedan has a big key!'  #密钥
    def conn_auth(conn):
        print('开始验证新链接的合法性')
        msg=os.urandom(32)     #生成一个32字节的随机字符串
        conn.sendall(msg)
        h=hmac.new(secret_key,msg) #第一个参数是key密钥,第二个参数是随机字符串,第三个参数默认为md5加密方式
        digest=h.digest()    #生成ascii格式的密文,还可以生成十六进制的 h.hexdigest()
        respone=conn.recv(len(digest))
        return hmac.compare_digest(respone,digest)  #进行比较
    
    def data_handler(conn,bufsize=1024):
        if not conn_auth(conn):
            print('该链接不合法,关闭')
            conn.close()
            return
        print('链接合法,开始通信')
        while True:
            data=conn.recv(bufsize)
            if not data:break
            conn.sendall(data.upper())
    
    def server_handler(ip_port,bufsize,backlog=5):
        tcp_socket_server=socket(AF_INET,SOCK_STREAM)
        tcp_socket_server.bind(ip_port)
        tcp_socket_server.listen(backlog)
        while True:
            conn,addr=tcp_socket_server.accept()
            print('新连接[%s:%s]' %(addr[0],addr[1]))
            data_handler(conn,bufsize)
    
    if __name__ == '__main__':
        ip_port=('127.0.0.1',9999)
        bufsize=1024
        server_handler(ip_port,bufsize)
     
    #客服端
    from socket import *
    import hmac,os
    secret_key=b'Jedan has a big key!'
    def conn_auth(conn):
        msg=conn.recv(32)
        h=hmac.new(secret_key,msg)
        digest=h.digest()
        conn.sendall(digest)
    
    def client_handler(ip_port,bufsize=1024):
        tcp_socket_client=socket(AF_INET,SOCK_STREAM)
        tcp_socket_client.connect(ip_port)
        conn_auth(tcp_socket_client)
    
        while True:
            data=input('>>: ').strip()
            if not data:continue
            if data == 'quit':break
            tcp_socket_client.sendall(data.encode('utf-8'))
            respone=tcp_socket_client.recv(bufsize)
            print(respone.decode('utf-8'))
        tcp_socket_client.close()
    
    if __name__ == '__main__':
        ip_port=('127.0.0.1',9999)
        bufsize=1024
        client_handler(ip_port,bufsize)
    

# struct模块

  • python中的struct模块就提供了这样的机制,该模块的主要作用就是对python基本类型值与用python字符串格式表示的C struct类型间的转化

  • 准确地讲,Python没有专门处理字节的数据类型。但由于b'str'可以表示字节,所以,字节数组=二进制str。而在C语言中,我们可以很方便地用struct、union来处理字节,以及字节和int,float的转换。

    在Python中,比方说要把一个32位无符号整数变成字节,也就是4个长度的bytes,你得配合位运算符这么写:

    n = 10240099
    b1 = (n & 0xff000000) >> 24  #>> 和 <<都是位运算,对二进制数进行移位操作,表示向右移动24位,并保留后面的
    b2 = (n & 0xff0000) >> 16
    b3 = (n & 0xff00) >> 8
    b4 = n & 0xff
    print( n & 0xff0000)  #10223616,所以 0xff0000表示保留多少为二进制
    print(b1,b2,b3)  # 0 156 64
    print(bytes([b1, b2, b3, b4])) #b'\x00\x9c@c'
    
  • 上面的方法比较哦麻烦,而且不能准换浮点,所以python为我们提供了struct模块用于解决bytes和其他二进制数据类型的转换

    • 基本用法pack和unpack:

      #struct的pack函数把任意数据类型变成bytes:
      import struct
      struct.pack('>I', 10240099)   #b'\x00\x9c@c'
      
      #pack的第一个参数是处理指令,'>I'的意思是:>表示字节顺序是big-endian,也就是网络序,I表示4字节无符号整数。后面的参数个数要和处理指令一致。
      
      #unpack把bytes变成相应的数据类型:
      struct.unpack('>IH', b'\xf0\xf0\xf0\xf0\x80\x80')  #(4042322160, 32896)
      
      #根据>IH的说明,后面的bytes依次变为I:4字节无符号整数和H:2字节无符号整数
      

# itertools模块

  • 介绍:它是用于一些可迭代对象的处理工具,在做面试题时非常简单

  • 1、combinations(iterable, r) :将可迭代对象中每r个元素按序进行组合,排列组合

    #例题:输入一个字符串,输出该字符串的所有组合,例如输入:'1,2,3',则输出为1,2,3,12,13,23,123
    from itertools import combinations
    inp='1,2,3,4'
    nums=inp.split(',')
    ret_1=[''.join(nums),]
    for i in range(1,len(nums)):
        ret = combinations(nums,i)
        # print(map(lambda x:x,ret))
        list(map(lambda x:ret_1.append(''.join(x)),ret))
    
    print(ret_1)
    
    #源码
    def combinations(iterable, r):
        # combinations('ABCD', 2) --> AB AC AD BC BD CD
        # combinations(range(4), 3) --> 012 013 023 123
        pool = tuple(iterable)
        n = len(pool)
        if r > n:
            return
        indices = list(range(r)) # 索引
        yield tuple(pool[i] for i in indices)   # 最初返回一个字符长度的
        while True:
            for i in reversed(range(r)):
                if indices[i] != i + n - r:  #为了防止r=n
                    break
            else:
                return
            indices[i] += 1
            for j in range(i+1, r):
                indices[j] = indices[j-1] + 1
            yield tuple(pool[i] for i in indices)
            
     #示例
    >>> from itertools import combinations
    >>> comb = combinations('abcd', 3)
    >>> for i in comb:
    ...     print(i)
    ... 
    ('a', 'b', 'c')
    ('a', 'b', 'd')
    ('a', 'c', 'd')
    ('b', 'c', 'd')
    
  • 2、combinations_with_replacement(iterable, r) 按照顺序从可迭代对象中取r个元素进行组合,允许使用重复的元素,功能相当于:

    #源码分析
    def combinations_with_replacement(iterable, r):
        # combinations_with_replacement('ABC', 2) --> AA AB AC BB BC CC
        pool = tuple(iterable)
        n = len(pool)
        if not n and r:
            return
        indices = [0] * r
        yield tuple(pool[i] for i in indices)
        while True:
            for i in reversed(range(r)):
                if indices[i] != n - 1:
                    break
            else:
                return
            indices[i:] = [indices[i] + 1] * (r - i)
            yield tuple(pool[i] for i in indices)
            
     #示例
    >>> from itertools import combinations_with_replacement
    >>> a = combinations_with_replacement('abc', 2)
    >>> for i in a:
    ...     print(i)
    ... 
    ('a', 'a')
    ('a', 'b')
    ('a', 'c')
    ('b', 'b')
    ('b', 'c')
    ('c', 'c')
    
  • 3、accumulate(iterable[, func]) 将一个二元操作的函数作用于一个可迭代对象上,每次循环计算时,函数的两个参数一个是可迭代对象中当前值,另一个是上次计算得到的结果。函数的返回值是一个由每次计算得到的结果组成的可迭代对象。也就是迭代对象的每个值作用于这个函数func(两个参数),返回一个,留一个等待辖有一个的操作,类似于reduce()

    #源码实现
    def accumulate(iterable, func=operator.add):
        'Return running totals'
        # accumulate([1,2,3,4,5]) --> 1 3 6 10 15
        # accumulate([1,2,3,4,5], operator.mul) --> 1 2 6 24 120
        it = iter(iterable)
        try:
            total = next(it)
        except StopIteration:
            return
        yield total
        for element in it:
            total = func(total, element)
            yield total
            
    #二元操作的函数如下:
    min(): 计算最小值
    max(): 计算最大值
    operator.mul(): 叠乘
    operator.add(): 叠加
     
    #示例
    >>> data = [3, 4, 6, 2, 1, 9, 0, 7, 5, 8]
    >>> list(accumulate(data, operator.mul))     # 叠乘
    [3, 12, 72, 144, 144, 1296, 0, 0, 0, 0]
    >>> list(accumulate(data, max))              # 计算最大值
    [3, 4, 6, 6, 6, 9, 9, 9, 9, 9]
    # 将上一个结果乘1.25,然后加上下一个迭代值
    >>> cashflows = [1000, -90, -90, -90, -90]
    >>> list(accumulate(cashflows, lambda bal, pmt: bal*1.05 + pmt))
    [1000, 960.0, 918.0, 873.9000000000001, 827.5950000000001]
    # 这个示例相当于单目迭代运算
    >>> logistic_map = lambda x, _:  r * x * (1 - x)
    >>> r = 3.8
    >>> x0 = 0.4
    >>> inputs = repeat(x0, 36)     # 初始化值
    >>> [format(x, '.2f') for x in accumulate(inputs, logistic_map)]
    ['0.40', '0.91', '0.30', '0.81', '0.60', '0.92', '0.29', '0.79', '0.63',
     '0.88', '0.39', '0.90', '0.33', '0.84', '0.52', '0.95', '0.18', '0.57',
     '0.93', '0.25', '0.71', '0.79', '0.63', '0.88', '0.39', '0.91', '0.32',
     '0.83', '0.54', '0.95', '0.20', '0.60', '0.91', '0.30', '0.80', '0.60']
    
  • chain(*itables) 将多个可迭代对象进行合并,相当于如下代码:

    def chain(*iterables):
        # chain('ABC', 'DEF') --> A B C D E F
        for it in iterables:
            for element in it:
                yield element
                
    #示例
    >>> from itertools import chain
    >>> chain([1, 2, 3], [4, 5, 6])
    <itertools.chain object at 0x7f751ad90b70>
    >>> a = chain([1, 2, 3], [4, 5, 6])
    >>> for i in a:
    ...     print(i)
    ... 
    1
    2
    3
    4
    5
    6
    
  • compress(data, selectors) 筛选根据slectors,它为True和False组成的列表或

    def compress(data, selectors):
        # compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F
        return (d for d, s in zip(data, selectors) if s)
    
    #示例
    >>> from itertools import compress
    >>> a = compress('abcdef', [1, 0, 1, 0, 1, 1])
    >>> for i in a:
    ...     print(i)
    ... 
    a
    c
    e
    f
    
  • 还有很多工具……

# subprocess 模块

  • 我们学过的让系统中运行命令的方式:os.system('命令')

  • 我们将写一个使用系统命令的模块:

  • 服务端:

     #server服务器端
    import socket
    import subprocess
    
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    
    conn,addr = sk.accept()
    while 1:
        cmd = conn.recv(1024).decode('utf-8')
        r = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        stdout = r.stdout.read()
        stderr = r.stderr.read()
        if stderr:
            conn.send(stderr)#错误结果或不可执行的命令
        else:
            conn.send(stdout)#正确结果
    
    conn.close()
    sk.close()
    
  • 客户端

     #client客户端
    import socket
    sk = socket.socket()
    
    sk.connect_ex(('127.0.0.1',8080))
    while 1:
        cmd = input('请输入一个命令>>>')
        sk.send(cmd.encode('utf-8'))
    
        result = sk.recv(102400).decode('gbk')
    
        print(result)
    
    
    sk.close()
    
  • 注意编码格式

    import subprocess
    
    def get_file_content(filePath):
        # cmd = f"ffmpeg -y  -i {filePath} -acodec pcm_s16le -f s16le -ac 1 -ar 16000 {filePath}.pcm"
        cmd = 'dir'
        r = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,)
        stdout = r.stdout.read().decode('gbk')
        print(stdout)
        stderr = r.stderr.read()
        ret = r.communicate()
    
    
    #0xc7
    
    get_file_content(' ')
    

# 依赖包打包工具

项目开发的时候,总是要搭建和部署环境,这时,就需要一个python第三方包的list,一般叫做requirements.txt。如果项目使用virtualenv环境,直接使用pip freeze即可,但是如果项目的依赖没有维护,就会很麻烦,这时我们就需要一个叫pipreqs的工具,可以帮助我们自动生成requirements.txt文件。

  • # 使用pip freeze
    $ pip freeze > requirements.txt这种方式配合virtualenv 才好使,否则把整个环境中的包都列出来了。
    

    pipreqs 这个工具的好处是可以通过对项目目录的扫描,自动发现使用了那些类库,自动生成依赖清单。缺点是可能会有些偏差,需要检查并自己调整下。

  • 安装:

    pip3 install pipreqs
    
  • 用法:

    pipreqs ./     #在项目的根目录下使用
    
    	#如果是Windows系统,会报编码错误 (UnicodeDecodeError: 'gbk' codec can't decode byte 0xa8 in position 24: illegal multibyte sequence) 使用时,指定编码格式      
    
    pipreqs ./ --encoding=utf8 
    

    生成requirements.txt 文件后,可以根据这个文件下载所有的依赖

    用法:pip install -r requriements.txt #即可
    

  • 附加:
详细用法:
pipreqs [options] <path>

选项:
--use-local仅使用本地包信息而不是查询PyPI
--pypi-server <url>使用自定义PyPi服务器
--proxy <url>使用Proxy,参数将传递给请求库。你也可以设置
终端中的环境参数:
$ export HTTP_PROXY =“http://10.10.1.10:3128”
$ export HTTPS_PROXY =“https://10.10.1.10:1080”
--debug打印调试信息
--ignore <dirs> ...忽略额外的目录
--encoding <charset>使用编码参数打开文件
--savepath <file>保存给定文件中的需求列表
--print输出标准输出中的需求列表
--force覆盖现有的requirements.txt
--diff <file>将requirements.txt中的模块与项目导入进行比较。
--clean <file>通过删除未在项目中导入的模块来清理requirements.txt。
  • # pip 常用命令

    pip的用法其实跟linux的yum很像,它可以帮我们安装python所需要的环境包,并且可以包解决依赖关系
    
    1、列出已安装的包 
    pip list
    
    2、安装要安装的包
    pip install xxx
    
    3、 安装特定版本
    pip install django==1.1.5
    pip freeze > requestment.txt
    
    4、 从导出的python环境中安装所需要的包
    pip install -r requestment.txt
    
    5、 卸载导出的python环境中的包
    pip uninstall -r requestment.txt
    
    6、 升级包 
    pip install -U <包名>
    
    7、 显示包所在的目录
    pip show -f <包名>
    
    8、 搜索包
    pip search <搜索关键字>
    
    9、 查询可升级的包 
    pip list -o
    
    10、 下载包而不安装 
    pip install <包名> -d <目录> 或 pip install -d <目录> -r requirements.txt
    
    11、更换国内pypi镜像 
    阿里:https://mirrors.aliyun.com/pypi/simple 
    豆瓣:http://pypi.douban.com/simple 
    中国科学技术大学:http://pypi.mirrors.ustc.edu.cn/simple/ 
    
    pip install <包名> -i http://pypi.v2ex.com/simple