# 常用模块(二)
# hmac模块
实现了标准的Hmac算法
HMAC的一个典型应用是用在“挑战/响应”(Challenge/Response)身份认证中(应用场景):
客户端向服务器发出一个验证请求
服务器接到此请求后生成一个随机数并通过网络传输给客户端(此为挑战)
客户端将收到的随机数提供给ePass,由ePass使用该随机数与存储在ePass中的密钥进行HMAC-MD5运算并得到一个结果作为认证证据传给服务器(此为响应)。
与此同时,服务器也使用该随机数与存储在服务器数据库中的该客户密钥进行HMAC-MD5运算,如果服务器的运算结果与客户端传回的响应结果相同,则认为客户端是一个合法用户
安全性:
HMAC算法更象是一种加密算法,它引入了密钥,其安全性已经不完全依赖于所使用的HASH算法
使用的密钥是双方事先约定的,第三方不可能知道。能够得到的信息只有作为“挑战”的随机数和作为“响应”的HMAC结果,无法根据这两个数据推算出密钥。由于不知道密钥,所以无法仿造出一致的响应。
HMAC与一般的加密重要的区别在于它具有“瞬时”性,即认证只在当时有效
代码实现
#服务端 from socket import * import hmac,os secret_key=b'Jedan has a big key!' #密钥 def conn_auth(conn): print('开始验证新链接的合法性') msg=os.urandom(32) #生成一个32字节的随机字符串 conn.sendall(msg) h=hmac.new(secret_key,msg) #第一个参数是key密钥,第二个参数是随机字符串,第三个参数默认为md5加密方式 digest=h.digest() #生成ascii格式的密文,还可以生成十六进制的 h.hexdigest() respone=conn.recv(len(digest)) return hmac.compare_digest(respone,digest) #进行比较 def data_handler(conn,bufsize=1024): if not conn_auth(conn): print('该链接不合法,关闭') conn.close() return print('链接合法,开始通信') while True: data=conn.recv(bufsize) if not data:break conn.sendall(data.upper()) def server_handler(ip_port,bufsize,backlog=5): tcp_socket_server=socket(AF_INET,SOCK_STREAM) tcp_socket_server.bind(ip_port) tcp_socket_server.listen(backlog) while True: conn,addr=tcp_socket_server.accept() print('新连接[%s:%s]' %(addr[0],addr[1])) data_handler(conn,bufsize) if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 server_handler(ip_port,bufsize) #客服端 from socket import * import hmac,os secret_key=b'Jedan has a big key!' def conn_auth(conn): msg=conn.recv(32) h=hmac.new(secret_key,msg) digest=h.digest() conn.sendall(digest) def client_handler(ip_port,bufsize=1024): tcp_socket_client=socket(AF_INET,SOCK_STREAM) tcp_socket_client.connect(ip_port) conn_auth(tcp_socket_client) while True: data=input('>>: ').strip() if not data:continue if data == 'quit':break tcp_socket_client.sendall(data.encode('utf-8')) respone=tcp_socket_client.recv(bufsize) print(respone.decode('utf-8')) tcp_socket_client.close() if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 client_handler(ip_port,bufsize)
# struct模块
python中的struct模块就提供了这样的机制,该模块的主要作用就是对python基本类型值与用python字符串格式表示的C struct类型间的转化
准确地讲,Python没有专门处理字节的数据类型。但由于
b'str'
可以表示字节,所以,字节数组=二进制str。而在C语言中,我们可以很方便地用struct、union来处理字节,以及字节和int,float的转换。在Python中,比方说要把一个32位无符号整数变成字节,也就是4个长度的
bytes
,你得配合位运算符这么写:n = 10240099 b1 = (n & 0xff000000) >> 24 #>> 和 <<都是位运算,对二进制数进行移位操作,表示向右移动24位,并保留后面的 b2 = (n & 0xff0000) >> 16 b3 = (n & 0xff00) >> 8 b4 = n & 0xff print( n & 0xff0000) #10223616,所以 0xff0000表示保留多少为二进制 print(b1,b2,b3) # 0 156 64 print(bytes([b1, b2, b3, b4])) #b'\x00\x9c@c'
上面的方法比较哦麻烦,而且不能准换浮点,所以python为我们提供了struct模块用于解决bytes和其他二进制数据类型的转换
基本用法pack和unpack:
#struct的pack函数把任意数据类型变成bytes: import struct struct.pack('>I', 10240099) #b'\x00\x9c@c' #pack的第一个参数是处理指令,'>I'的意思是:>表示字节顺序是big-endian,也就是网络序,I表示4字节无符号整数。后面的参数个数要和处理指令一致。 #unpack把bytes变成相应的数据类型: struct.unpack('>IH', b'\xf0\xf0\xf0\xf0\x80\x80') #(4042322160, 32896) #根据>IH的说明,后面的bytes依次变为I:4字节无符号整数和H:2字节无符号整数
# itertools模块
介绍:它是用于一些可迭代对象的处理工具,在做面试题时非常简单
1、
combinations(iterable, r)
:将可迭代对象中每r个元素按序进行组合,排列组合#例题:输入一个字符串,输出该字符串的所有组合,例如输入:'1,2,3',则输出为1,2,3,12,13,23,123 from itertools import combinations inp='1,2,3,4' nums=inp.split(',') ret_1=[''.join(nums),] for i in range(1,len(nums)): ret = combinations(nums,i) # print(map(lambda x:x,ret)) list(map(lambda x:ret_1.append(''.join(x)),ret)) print(ret_1) #源码 def combinations(iterable, r): # combinations('ABCD', 2) --> AB AC AD BC BD CD # combinations(range(4), 3) --> 012 013 023 123 pool = tuple(iterable) n = len(pool) if r > n: return indices = list(range(r)) # 索引 yield tuple(pool[i] for i in indices) # 最初返回一个字符长度的 while True: for i in reversed(range(r)): if indices[i] != i + n - r: #为了防止r=n break else: return indices[i] += 1 for j in range(i+1, r): indices[j] = indices[j-1] + 1 yield tuple(pool[i] for i in indices) #示例 >>> from itertools import combinations >>> comb = combinations('abcd', 3) >>> for i in comb: ... print(i) ... ('a', 'b', 'c') ('a', 'b', 'd') ('a', 'c', 'd') ('b', 'c', 'd')
2、
combinations_with_replacement(iterable, r)
按照顺序从可迭代对象中取r个元素进行组合,允许使用重复的元素,功能相当于:#源码分析 def combinations_with_replacement(iterable, r): # combinations_with_replacement('ABC', 2) --> AA AB AC BB BC CC pool = tuple(iterable) n = len(pool) if not n and r: return indices = [0] * r yield tuple(pool[i] for i in indices) while True: for i in reversed(range(r)): if indices[i] != n - 1: break else: return indices[i:] = [indices[i] + 1] * (r - i) yield tuple(pool[i] for i in indices) #示例 >>> from itertools import combinations_with_replacement >>> a = combinations_with_replacement('abc', 2) >>> for i in a: ... print(i) ... ('a', 'a') ('a', 'b') ('a', 'c') ('b', 'b') ('b', 'c') ('c', 'c')
3、accumulate(iterable[, func]) 将一个二元操作的函数作用于一个可迭代对象上,每次循环计算时,函数的两个参数一个是可迭代对象中当前值,另一个是上次计算得到的结果。函数的返回值是一个由每次计算得到的结果组成的可迭代对象。也就是迭代对象的每个值作用于这个函数func(两个参数),返回一个,留一个等待辖有一个的操作,类似于reduce()
#源码实现 def accumulate(iterable, func=operator.add): 'Return running totals' # accumulate([1,2,3,4,5]) --> 1 3 6 10 15 # accumulate([1,2,3,4,5], operator.mul) --> 1 2 6 24 120 it = iter(iterable) try: total = next(it) except StopIteration: return yield total for element in it: total = func(total, element) yield total #二元操作的函数如下: min(): 计算最小值 max(): 计算最大值 operator.mul(): 叠乘 operator.add(): 叠加 #示例 >>> data = [3, 4, 6, 2, 1, 9, 0, 7, 5, 8] >>> list(accumulate(data, operator.mul)) # 叠乘 [3, 12, 72, 144, 144, 1296, 0, 0, 0, 0] >>> list(accumulate(data, max)) # 计算最大值 [3, 4, 6, 6, 6, 9, 9, 9, 9, 9] # 将上一个结果乘1.25,然后加上下一个迭代值 >>> cashflows = [1000, -90, -90, -90, -90] >>> list(accumulate(cashflows, lambda bal, pmt: bal*1.05 + pmt)) [1000, 960.0, 918.0, 873.9000000000001, 827.5950000000001] # 这个示例相当于单目迭代运算 >>> logistic_map = lambda x, _: r * x * (1 - x) >>> r = 3.8 >>> x0 = 0.4 >>> inputs = repeat(x0, 36) # 初始化值 >>> [format(x, '.2f') for x in accumulate(inputs, logistic_map)] ['0.40', '0.91', '0.30', '0.81', '0.60', '0.92', '0.29', '0.79', '0.63', '0.88', '0.39', '0.90', '0.33', '0.84', '0.52', '0.95', '0.18', '0.57', '0.93', '0.25', '0.71', '0.79', '0.63', '0.88', '0.39', '0.91', '0.32', '0.83', '0.54', '0.95', '0.20', '0.60', '0.91', '0.30', '0.80', '0.60']
chain(*itables)
将多个可迭代对象进行合并,相当于如下代码:def chain(*iterables): # chain('ABC', 'DEF') --> A B C D E F for it in iterables: for element in it: yield element #示例 >>> from itertools import chain >>> chain([1, 2, 3], [4, 5, 6]) <itertools.chain object at 0x7f751ad90b70> >>> a = chain([1, 2, 3], [4, 5, 6]) >>> for i in a: ... print(i) ... 1 2 3 4 5 6
compress(data, selectors)
筛选根据slectors,它为True和False组成的列表或def compress(data, selectors): # compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F return (d for d, s in zip(data, selectors) if s) #示例 >>> from itertools import compress >>> a = compress('abcdef', [1, 0, 1, 0, 1, 1]) >>> for i in a: ... print(i) ... a c e f
还有很多工具……
# subprocess 模块
我们学过的让系统中运行命令的方式:
os.system('命令')
我们将写一个使用系统命令的模块:
服务端:
#server服务器端 import socket import subprocess sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() conn,addr = sk.accept() while 1: cmd = conn.recv(1024).decode('utf-8') r = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) stdout = r.stdout.read() stderr = r.stderr.read() if stderr: conn.send(stderr)#错误结果或不可执行的命令 else: conn.send(stdout)#正确结果 conn.close() sk.close()
客户端
#client客户端 import socket sk = socket.socket() sk.connect_ex(('127.0.0.1',8080)) while 1: cmd = input('请输入一个命令>>>') sk.send(cmd.encode('utf-8')) result = sk.recv(102400).decode('gbk') print(result) sk.close()
注意编码格式
import subprocess def get_file_content(filePath): # cmd = f"ffmpeg -y -i {filePath} -acodec pcm_s16le -f s16le -ac 1 -ar 16000 {filePath}.pcm" cmd = 'dir' r = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,) stdout = r.stdout.read().decode('gbk') print(stdout) stderr = r.stderr.read() ret = r.communicate() #0xc7 get_file_content(' ')
# 依赖包打包工具
项目开发的时候,总是要搭建和部署环境,这时,就需要一个python第三方包的list,一般叫做requirements.txt。如果项目使用virtualenv环境,直接使用pip freeze即可,但是如果项目的依赖没有维护,就会很麻烦,这时我们就需要一个叫pipreqs的工具,可以帮助我们自动生成requirements.txt文件。
# 使用pip freeze
$ pip freeze > requirements.txt这种方式配合virtualenv 才好使,否则把整个环境中的包都列出来了。
pipreqs 这个工具的好处是可以通过对项目目录的扫描,自动发现使用了那些类库,自动生成依赖清单。缺点是可能会有些偏差,需要检查并自己调整下。
安装:
pip3 install pipreqs
用法:
pipreqs ./ #在项目的根目录下使用 #如果是Windows系统,会报编码错误 (UnicodeDecodeError: 'gbk' codec can't decode byte 0xa8 in position 24: illegal multibyte sequence) 使用时,指定编码格式 pipreqs ./ --encoding=utf8
生成requirements.txt 文件后,可以根据这个文件下载所有的依赖
用法:pip install -r requriements.txt #即可
- 附加:
详细用法:
pipreqs [options] <path>
选项:
--use-local仅使用本地包信息而不是查询PyPI
--pypi-server <url>使用自定义PyPi服务器
--proxy <url>使用Proxy,参数将传递给请求库。你也可以设置
终端中的环境参数:
$ export HTTP_PROXY =“http://10.10.1.10:3128”
$ export HTTPS_PROXY =“https://10.10.1.10:1080”
--debug打印调试信息
--ignore <dirs> ...忽略额外的目录
--encoding <charset>使用编码参数打开文件
--savepath <file>保存给定文件中的需求列表
--print输出标准输出中的需求列表
--force覆盖现有的requirements.txt
--diff <file>将requirements.txt中的模块与项目导入进行比较。
--clean <file>通过删除未在项目中导入的模块来清理requirements.txt。
# pip 常用命令
pip的用法其实跟linux的yum很像,它可以帮我们安装python所需要的环境包,并且可以包解决依赖关系 1、列出已安装的包 pip list 2、安装要安装的包 pip install xxx 3、 安装特定版本 pip install django==1.1.5 pip freeze > requestment.txt 4、 从导出的python环境中安装所需要的包 pip install -r requestment.txt 5、 卸载导出的python环境中的包 pip uninstall -r requestment.txt 6、 升级包 pip install -U <包名> 7、 显示包所在的目录 pip show -f <包名> 8、 搜索包 pip search <搜索关键字> 9、 查询可升级的包 pip list -o 10、 下载包而不安装 pip install <包名> -d <目录> 或 pip install -d <目录> -r requirements.txt 11、更换国内pypi镜像 阿里:https://mirrors.aliyun.com/pypi/simple 豆瓣:http://pypi.douban.com/simple 中国科学技术大学:http://pypi.mirrors.ustc.edu.cn/simple/ pip install <包名> -i http://pypi.v2ex.com/simple
← 常用模块 asyncio 模块 →