python分步式进程计算(python中如何使用分步式进程计算详解)
python分步式进程计算
python中如何使用分步式进程计算详解前言
在python中使用多进程和多线程都能达到同时运行多个任务,和多进程和多线程的选择上,应该优先选择多进程的方式,因为多进程更加稳定,且对于进程的操作管理也更加方便,但有一点是多进程独有的杀手锏,多进程可以将进程分步到多台机器上跑,假如有很多个任务,一台机器即使开了多进程或者多进程跑起来还是要耗很多时间,那么这时就要想一下可否将任务分配到多台机器上跑,这样可以更快的完成任务。
在分步式进程运算中,进程之前的通信还是依赖于queue,但此时的队列不能直接使用,需要使用multiprocessing.managers.basemanager
进行包装,通过回调以后才能使用,既然是分步式的调用,那么应该有一个服务端和一个客户端,服务端通过网络协议将队列中的信息给各个客户端进行调用,客户端也可以通过队列将结果返回,然后服务端进行结果的收集展示,流程如下
分步式流程
服务端将任务放到 task_queue 中,然后四个客户端通过网络端口从task_queue中获取到任务,然后进行计算,再将结果放到result_queue中,最后服务端统一处理结果。整体的流程比较清晰,只是需要强调,这里的队列不能是原始的队列,需要使用basemanager 进行包装。
先看一下服务端的代码
|
#coding:gbk import time, queue from multiprocessing.managers import basemanager from multiprocessing import freeze_support # 任务个数 task_number = 10 # 定义收发队列 task_queue = queue.queue(task_number) result_queue = queue.queue(task_number) def gettask(): return task_queue def getresult(): return result_queue def test(): # windows下绑定调用接口不能使用lambda,所以只能先定义函数再绑定 basemanager.register( 'get_task' , callable = gettask) basemanager.register( 'get_result' , callable = getresult) # 绑定端口并设置验证码,windows下需要填写ip地址,linux下不填默认为本地 manager = basemanager(address = ( '127.0.0.1' , 5002 ), authkey = b '123' ) # 启动 manager.start() try : # 通过网络获取任务队列和结果队列 task = manager.get_task() result = manager.get_result() # 添加任务 for i in range (task_number): print ( 'put task %d...' % i) task.put(i) # 每秒检测一次是否所有任务都被执行完 while not result.full(): print (task.qsize()) time.sleep( 1 ) for i in range (result.qsize()): ans = result.get() print ( 'task %d is finish , runtime:%d s' % ans) except : print ( 'manager error' ) finally : manager.shutdown() if __name__ = = '__main__' : # windows下多进程可能会炸,添加这句可以缓解 freeze_support() test() |
这里重点说一下 basemanager.register('get_task', callable=gettask)
这行代码,它的意思是注册一个get_task的操作,执行的操作是gettask()
函数,上面定义了gettask()
函数,返回的是task_queue,这也是之前说的不能直接使用queue.queue
,必须要使用通过basemanager的register接口封装过的的队列,下面使用task = manager.get_task()
来获取到这个队列。
|
manager = basemanager(address = ( '127.0.0.1' , 5002 ), authkey = b '123' ) |
这行代码初始了一个manager,它绑定了本机的5002端口,并且在客户端连接的时候需要一个密码:123。
接下来看一下客户端代码。
|
#coding:gbk import time, sys, queue, random from multiprocessing.managers import basemanager basemanager.register( 'get_task' ) basemanager.register( 'get_result' ) conn = basemanager(address = ( '127.0.0.1' , 5002 ), authkey = b '123' ) try : conn.connect() except : print ( '连接失败' ) sys.exit() task = conn.get_task() result = conn.get_result() while not task.empty(): print (task.qsize()) n = task.get(timeout = 1 ) print ( 'run task %d' % n) sleeptime = random.randint( 0 , 3 ) time.sleep(sleeptime) rt = (n, sleeptime) result.put(rt) if __name__ = = '__main__' : pass ; |
这里主要看以下的代码
|
basemanager.register( 'get_task' ) basemanager.register( 'get_result' ) |
这两个是注册函数,和之前的服务端所对应,之前服务端注册了这两个函数,这里才能注册使用,注意这里不能注册服务端没有注册的函数
运行一下,先运行服务端,然后再启两个cmd运行客户端,也可以在局域网中的另外的机器上运行,但是要修改服务端的ip地址
服务端的结果如下
put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
task 0 is finish , runtime:3 s
task 1 is finish , runtime:0 s
task 2 is finish , runtime:2 s
task 4 is finish , runtime:1 s
task 3 is finish , runtime:3 s
task 6 is finish , runtime:1 s
task 7 is finish , runtime:0 s
task 5 is finish , runtime:3 s
task 8 is finish , runtime:2 s
task 9 is finish , runtime:3 s
两个客户端的结果分别如下
客户端1
10
run task 0
9
run task 1
8
run task 2
6
run task 4
5
run task 5
1
run task 9
客户端2
7
run task 3
4
run task 6
3
run task 7
2
run task 8
一起运行的截图如下
结果
由于队列是线程安全的,所以这里不用加锁,在客户端中打印print(task.qsize()) 当前的队列大小,可以看到队列的信息中同步到各个客户端的。
最后还是要多说一句,分步式多进程虽然可以把任务分散到不同的机器上运行,可以处理多任务,但是如果此时服务端挂掉的话,任务就全丢掉了,所以在生产环境下还是考虑使用消息中间件如kafka等。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对开心学习网的支持。
原文链接:https://www.yangyanxing.com/article/fractional_in_python.html
- python try高级用法(python try 异常处理史上最全)
- python使用telnet(python 处理telnet返回的More,以及get想要的那个参数方法)
- python中numpy常用函数(使用Python的SymPy库解决数学运算问题的方法)
- python中读取文件怎么操作(Python实现的读取文件内容并写入其他文件操作示例)
- python中for语句的无限循环(python使用for循环计算0-100的整数的和方法)
- python随机生成时间戳(python时间序列按频率生成日期的方法)
- python yield 使用浅析(yii框架使用分页的方法分析)
- python接口管理系统(基于Python实现用户管理系统)
- python停止执行的代码(python定时检测无响应进程并重启的实例代码)
- python中怎样使用列表的sort方法(详解python中sort排序使用)
- python 微信二维码接口(python实现微信防撤回神器)
- kmp算法怎么用c描述(详解小白之KMP算法及python实现)
- jupyter如何编写python(windows系统中Python多版本与jupyter notebook使用虚拟环境的过程)
- python爬虫怎么设置代理ip(python爬虫简单的添加代理进行访问的实现代码)
- python dict 操作(Python中dict和set的用法讲解)
- python 的常用工具(Python静态类型检查新工具之pyright 使用指南)
- 美国数十万加仑牛奶倒入下水道,贫民区食不果腹,历史再次重演(美国数十万加仑牛奶倒入下水道)
- 美国倒掉数十万加仑牛奶 上热搜第一,这一幕似曾相识(美国倒掉数十万加仑牛奶)
- 深度 倒牛奶 这一幕为何又在美国上演(深度倒牛奶)
- 美国数十万加仑牛奶倒下水道怎么回事 原因曝光令人心痛(美国数十万加仑牛奶倒下水道怎么回事)
- 探索中国神秘文字(探索中国神秘文字)
- 重温《蜗居》 宋思明选中海藻为红颜知己,纯属巧合,与爱无关(宋思明选中海藻为红颜知己)
热门推荐
- css边框属性一览(css背景和边框标签实例详解)
- sql server没有服务器怎么办(SQL Server 2012安装后服务器名称找不到的解决办法)
- sqlserver字符串截取填充(SQL Server实现split函数分割字符串功能及用法示例)
- sql server 2016配置管理(SQL Server 2016 配置 SA 登录教程)
- css文本溢出部分成省略号(CSS文本超出2行就隐藏并且显示省略号)
- dedecms设置轮播图(织梦dedecms网站地图改变生成目录的方法)
- windows连接云服务器的软件(Windows云服务器如何开启ping?)
- pythontime模块有哪些(Python3.5内置模块之time与datetime模块用法实例分析)
- vsftpd详细配置(vsFTPd 服务器初学者指南)
- django删掉模型的字段(django中ORM模型常用的字段的使用方法)
排行榜
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9