多进程:https://docs.python.org/zh-cn/3.7/library/multiprocessing.html?highlight=multiprocessing#module-multiprocessing
import os from multiprocessing import Process def func(s): # 输出传入的参数,当前子进程的进程ID,当前进程的父进程ID print(s, os.getpid(), os.getppid()) # 注意:此处的if __name__ == '__main__'语句不能少 if __name__ == '__main__': # 打印当前进程的进程ID print(os.getpid()) print('main process start...') # 创建进程对象 p = Process(target=func, args=('hello', )) # 生成一个进程,并开始运行新的进程 p.start() # 等待子进程运行完毕 p.join() print('main process end!')
# 区分父子进程 import os import time res = os.fork() print(f'res == {res}') if res == 0: print(f'我是子进程,我的pid是:{os.getpid()}我的父进程id是:{os.getppid()}') else: print(f'我是父进程,我的pid是: {os.getpid()}') # fork()运行时,会有2个返回值,返回值为大于0时,此进程为父进程,且返回的数字为子进程的PID;当返回值为0时,此进程为子进程。 # 注意:父进程结束时,子进程并不会随父进程立刻结束。同样,父进程不会等待子进程执行完。 # 注意:os.fork()无法在windows上运行。
# 参数 # multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}) # - group:分组,实际上很少使用 # - target:表示调用对象,你可以传入方法的名字 # - name:别名,相当于给这个进程取一个名字 # - args:表示被调用对象的位置参数元组,比如target是函数a,他有两个参数m,n,那么args就传入(m, n)即可 # - kwargs:表示调用对象的字典 from multiprocessing import Process def f(name): print(f'hello {name}') if __name__ == '__main__': p = Process(target=f, args=('john',)) p.start() p.join() # join([timeout]) # 如果可选参数 timeout 是 None (默认值),则该方法将阻塞, # 直到调用 join() 方法的进程终止。如果 timeout 是一个正数, # 它最多会阻塞 timeout 秒。 # 请注意,如果进程终止或方法超时,则该方法返回 None 。 # 检查进程的 exitcode 以确定它是否终止。 # 一个进程可以合并多次。 # 进程无法并入自身,因为这会导致死锁。 # 尝试在启动进程之前合并进程是错误的。
import time from multiprocessing import Process import os def run(): print("子进程开启") time.sleep(2) print("子进程结束") if __name__ == "__main__": print("父进程启动") p = Process(target=run) p.start() # p.join() print("父进程结束") # # 输出结果 # 父进程启动 # 父进程结束 # 子进程开启 # 子进程结束
# 显示所涉及的各个进程ID,这是一个扩展示例 from multiprocessing import Process import os import multiprocessing def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main') p = Process(target=f, args=('bob',)) p.start() for p in multiprocessing.active_children(): print(f'子进程名称: {p.name} id: { str(p.pid) }' ) print('进程结束') print(f'CPU核心数量: { str(multiprocessing.cpu_count()) }') p.join()
# multiprocessing.Process的run()方法 import os import time from multiprocessing import Process class NewProcess(Process): #继承Process类创建一个新类 def __init__(self,num): self.num = num super().__init__() def run(self): #重写Process类中的run方法. while True: print(f'我是进程 {self.num} , 我的pid是: {os.getpid()}') time.sleep(1) for i in range(2): p = NewProcess(i) p.start() # 当不给Process指定target时,会默认调用Process类里的run()方法。 # 这和指定target效果是一样的,只是将函数封装进类之后便于理解和调用。
# 全局变量在多个进程中不能共享 # 在子进程中修改全局变量对父进程中的全局变量没有影响。 # 因为父进程在创建子进程时对全局变量做了一个备份, # 父进程中的全局变量与子进程的全局变量完全是不同的两个变量。 # 全局变量在多个进程中不能共享 from multiprocessing import Process from time import sleep num = 100 def run(): print("子进程开始") global num num += 1 print(f"子进程num:{num}" ) print("子进程结束") if __name__ == "__main__": print("父进程开始") p = Process(target=run) p.start() p.join() # 在子进程中修改全局变量对父进程中的全局变量没有影响 print("父进程结束。num:%s" % num) # # 输出结果 # 父进程开始 # 子进程开始 # 子进程num:101 # 子进程结束 # 父进程结束。num:100
# multiprocessing 支持进程之间的两种通信通道 # 队列 # 来自官方文档的一个简单demo # Queue 类是一个近似 queue.Queue 的克隆 # 现在有这样一个需求:我们有两个进程,一个进程负责写(write)一个进程负责读(read)。 # 当写的进程写完某部分以后要把数据交给读的进程进行使用 # write()将写完的数据交给队列,再由队列交给read() from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join() # 队列是线程和进程安全的
from multiprocessing import Process, Queue import os, time def write(q): print("启动Write子进程:%s" % os.getpid()) for i in ["A", "B", "C", "D"]: q.put(i) # 写入队列 time.sleep(1) print("结束Write子进程:%s" % os.getpid()) def read(q): print("启动Write子进程:%s" % os.getpid()) while True: # 阻塞,等待获取write的值 value = q.get(True) print(value) print("结束Write子进程:%s" % os.getpid()) # 不会执行 if __name__ == "__main__": # 父进程创建队列,并传递给子进程 q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() # pr进程是一个死循环,无法等待其结束,只能强行结束 # (写进程结束了,所以读进程也可以结束了) pr.terminate() print("父进程结束") # 输出结果: # 启动Write子进程:29564 # 启动Write子进程:22852 # A # B # C # D # 结束Write子进程:22852 # 父进程结束
# 管道 # 官方文档 # Pipe() 函数返回一个由管道连接的连接对象,默认情况下是双工(双向) from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join() # 返回的两个连接对象 Pipe() 表示管道的两端。 # 每个连接对象都有 send() 和 recv() 方法(相互之间的)。 # 请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端, # 则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。
# 在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此 # 如果你真的需要使用一些共享数据,那么 multiprocessing 提供了两种方法 # 共享内存 shared memory # 可以使用 Value 或 Array 将数据存储在共享内存映射中 # 这里的Array和numpy中的不同,它只能是一维的,不能是多维的。 # 同样和Value 一样,需要定义数据形式,否则会报错 from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) # 将打印 # 3.1415927 # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9] # 创建 num 和 arr 时使用的 'd' 和 'i' # 参数是 array 模块使用的类型的 typecode : 'd' 表示双精度浮点数, 'i' 表示有符号整数。 # 这些共享对象将是进程和线程安全的。
# 进程锁Lock # 不加进程锁 # 让我们看看没有加进程锁时会产生什么样的结果。 import multiprocessing as mp import time def job(v, num): for _ in range(5): time.sleep(0.1) # 暂停0.1秒,让输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value, end="|") def multicore(): v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v,1)) p2 = mp.Process(target=job, args=(v,3)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore() # 在上面的代码中,我们定义了一个共享变量v,两个进程都可以对它进行操作。 # 在job()中我们想让v每隔0.1秒输出一次累加num的结果, # 但是在两个进程p1和p2 中设定了不同的累加值。 # 所以接下来让我们来看下这两个进程是否会出现冲突。 # 结论:进程1和进程2在相互抢着使用共享内存v。
# 加进程锁 # 为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。 import multiprocessing as mp import time # 在job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占 def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.1) v.value += num # 获取共享内存 print(v.value, end="|") l.release() # 释放 def multicore(): l = mp.Lock() # 定义一个进程锁 v = mp.Value('i', 0) # 定义共享内存 # 进程锁的信息传入各个进程中 p1 = mp.Process(target=job, args=(v,1,l)) p2 = mp.Process(target=job, args=(v,3,l)) p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore() # 运行一下,让我们看看是否还会出现抢占资源的情况 # 显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行 # 在某些特定的场景下要共享string类型,方式如下: from ctypes import c_char_p str_val = mp.Value(c_char_p, b"Hello World")
# 加进程锁 # 为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。 import multiprocessing as mp import time # 在job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占 def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.1) v.value += num # 获取共享内存 print(v.value, end="|") l.release() # 释放 def multicore(): l = mp.Lock() # 定义一个进程锁 v = mp.Value('i', 0) # 定义共享内存 # 进程锁的信息传入各个进程中 p1 = mp.Process(target=job, args=(v,1,l)) p2 = mp.Process(target=job, args=(v,3,l)) p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore() # 运行一下,让我们看看是否还会出现抢占资源的情况 # 显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行 # 在某些特定的场景下要共享string类型,方式如下: from ctypes import c_char_p str_val = mp.Value(c_char_p, b"Hello World")
# Pool 类表示一个工作进程池 # 如果要启动大量的子进程,可以用进程池的方式批量创建子进程 from multiprocessing.pool import Pool from time import sleep, time import random import os def run(name): print("%s子进程开始,进程ID:%d" % (name, os.getpid())) start = time() sleep(random.choice([1, 2, 3, 4])) end = time() print("%s子进程结束,进程ID:%d。耗时0.2%f" % (name, os.getpid(), end-start)) if __name__ == "__main__": print("父进程开始") # 创建多个进程,表示可以同时执行的进程数量。默认大小是CPU的核心数 p = Pool(4) for i in range(10): # 创建进程,放入进程池统一管理 p.apply_async(run, args=(i,)) # 如果我们用的是进程池,在调用join()之前必须要先close(),并且在close()之后不能再继续往进程池添加新的进程 p.close() # 进程池对象调用join,会等待进程吃中所有的子进程结束完毕再去结束父进程 p.join() print("父进程结束。") p.terminate() # Pool(8):创建多个进程,表示可以同时执行的进程数量。默认大小是CPU的核心数果。 # close():如果我们用的是进程池,在调用join()之前必须要先close(),并且在close()之后不能再继续往进程池添加新的进程 # join():进程池对象调用join,会等待进程池中所有的子进程结束完毕再去结束父进程 # terminate():一旦运行到此步,不管任务是否完成,立即终止。
from multiprocessing import Pool import time def f(x): return x*x if __name__ == '__main__': with Pool(processes=4) as pool: # start 4 worker processes result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]" it = pool.imap(f, range(10)) print(next(it)) # prints "0" print(next(it)) # prints "1" print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow result = pool.apply_async(time.sleep, (10,)) print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
# join dead lock from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get() # 交换最后两行可以修复这个问题(或者直接删掉 p.join())
# process vs thread import multiprocessing as mp def job(q): res = 0 for i in range(1000000): res += i+i**2+i**3 q.put(res) # queue # 多核 def multicore(): q = mp.Queue() p1 = mp.Process(target=job, args=(q,)) p2 = mp.Process(target=job, args=(q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print('multicore:',res1 + res2) # 创建多线程mutithread # 接下来创建多线程程序,创建多线程和多进程有很多相似的地方。 # 首先import threading然后定义multithread()完成同样的任务 import threading as td def multithread(): q = mp.Queue() # thread可放入process同样的queue中 t1 = td.Thread(target=job, args=(q,)) t2 = td.Thread(target=job, args=(q,)) t1.start() t2.start() t1.join() t2.join() res1 = q.get() res2 = q.get() print('multithread:', res1 + res2) # 创建普通函数 def normal(): res = 0 for _ in range(2): for i in range(1000000): res += i + i**2 + i**3 print('normal:', res) # 在上面例子中我们建立了两个进程或线程,均对job()进行了两次运算, # 所以在normal()中我们也让它循环两次 # 运行时间 import time if __name__ == '__main__': st = time.time() normal() st1 = time.time() print('normal time:', st1 - st) multithread() st2 = time.time() print('multithread time:', st2 - st1) multicore() print('multicore time:', time.time() - st2) # 普通/多线程/多进程的运行时间分别是1.41,1.47和0.75秒。 # 我们发现多核/多进程最快,说明在同时间运行了多个任务。 # 而多线程的运行时间居然比什么都不做的程序还要慢一点, # 说明多线程还是有一定的短板的(GIL)。