Python(多进程)

多进程:https://docs.python.org/zh-cn/3.7/library/multiprocessing.html?highlight=multiprocessing#module-multiprocessing

import os
from multiprocessing import Process
def func(s):
    # 输出传入的参数,当前子进程的进程ID,当前进程的父进程ID
    print(s, os.getpid(), os.getppid())
# 注意:此处的if __name__ == '__main__'语句不能少
if __name__ == '__main__':
    # 打印当前进程的进程ID
    print(os.getpid())
    print('main process start...')
    # 创建进程对象
    p = Process(target=func, args=('hello', ))
    # 生成一个进程,并开始运行新的进程
    p.start()
    # 等待子进程运行完毕
    p.join()
    print('main process end!')

 

 

# 区分父子进程
import os
import time

res = os.fork()
print(f'res == {res}')

if res == 0:
    print(f'我是子进程,我的pid是:{os.getpid()}我的父进程id是:{os.getppid()}')
else:
    print(f'我是父进程,我的pid是: {os.getpid()}')


# fork()运行时,会有2个返回值,返回值为大于0时,此进程为父进程,且返回的数字为子进程的PID;当返回值为0时,此进程为子进程。
# 注意:父进程结束时,子进程并不会随父进程立刻结束。同样,父进程不会等待子进程执行完。
# 注意:os.fork()无法在windows上运行。
# 参数
# multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})

# - group:分组,实际上很少使用
# - target:表示调用对象,你可以传入方法的名字
# - name:别名,相当于给这个进程取一个名字
# - args:表示被调用对象的位置参数元组,比如target是函数a,他有两个参数m,n,那么args就传入(m, n)即可
# - kwargs:表示调用对象的字典

from multiprocessing import Process

def f(name):
    print(f'hello {name}')

if __name__ == '__main__':
    p = Process(target=f, args=('john',))
    p.start()
    p.join()
# join([timeout])
# 如果可选参数 timeout 是 None (默认值),则该方法将阻塞,
# 直到调用 join() 方法的进程终止。如果 timeout 是一个正数,
# 它最多会阻塞 timeout 秒。
# 请注意,如果进程终止或方法超时,则该方法返回 None 。
# 检查进程的 exitcode 以确定它是否终止。
# 一个进程可以合并多次。
# 进程无法并入自身,因为这会导致死锁。
# 尝试在启动进程之前合并进程是错误的。
import time
from multiprocessing import Process
import os
def run():
    print("子进程开启")
    time.sleep(2)
    print("子进程结束")


if __name__ == "__main__":
    print("父进程启动")
    p = Process(target=run)
    p.start()
    # p.join()
    print("父进程结束")
# # 输出结果
# 父进程启动
# 父进程结束
# 子进程开启
# 子进程结束
# 显示所涉及的各个进程ID,这是一个扩展示例

from multiprocessing import Process
import os
import multiprocessing

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main')
    p = Process(target=f, args=('bob',))
    p.start()

    for p in multiprocessing.active_children():
        print(f'子进程名称: {p.name}  id: { str(p.pid) }' )
    print('进程结束')
    print(f'CPU核心数量: { str(multiprocessing.cpu_count()) }')
    
    p.join()


# multiprocessing.Process的run()方法
import os
import time
from multiprocessing import Process

class NewProcess(Process): #继承Process类创建一个新类
    def __init__(self,num):
        self.num = num
        super().__init__()

    def run(self):  #重写Process类中的run方法.
        while True:
            print(f'我是进程 {self.num} , 我的pid是: {os.getpid()}')
            time.sleep(1)

for i in range(2):
    p = NewProcess(i)
    p.start()
# 当不给Process指定target时,会默认调用Process类里的run()方法。
# 这和指定target效果是一样的,只是将函数封装进类之后便于理解和调用。
# 全局变量在多个进程中不能共享
# 在子进程中修改全局变量对父进程中的全局变量没有影响。
# 因为父进程在创建子进程时对全局变量做了一个备份,
# 父进程中的全局变量与子进程的全局变量完全是不同的两个变量。
# 全局变量在多个进程中不能共享

from multiprocessing import Process
from time import sleep

num = 100


def run():
    print("子进程开始")
    global num
    num += 1
    print(f"子进程num:{num}" )
    print("子进程结束")


if __name__ == "__main__":
    print("父进程开始")
    p = Process(target=run)
    p.start()
    p.join()
  # 在子进程中修改全局变量对父进程中的全局变量没有影响
    print("父进程结束。num:%s" % num)

# # 输出结果
# 父进程开始
# 子进程开始
# 子进程num:101
# 子进程结束
# 父进程结束。num:100
# multiprocessing 支持进程之间的两种通信通道
# 队列
# 来自官方文档的一个简单demo
# Queue 类是一个近似 queue.Queue 的克隆
# 现在有这样一个需求:我们有两个进程,一个进程负责写(write)一个进程负责读(read)。
# 当写的进程写完某部分以后要把数据交给读的进程进行使用
# write()将写完的数据交给队列,再由队列交给read()

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

# 队列是线程和进程安全的
from multiprocessing import Process, Queue
import os, time

def write(q):
    print("启动Write子进程:%s" % os.getpid())
    for i in ["A", "B", "C", "D"]:
        q.put(i)  # 写入队列
        time.sleep(1)
    print("结束Write子进程:%s" % os.getpid())

def read(q):
    print("启动Write子进程:%s" % os.getpid())
    while True:  # 阻塞,等待获取write的值
        value = q.get(True)
        print(value)
    print("结束Write子进程:%s" % os.getpid())  # 不会执行

if __name__ == "__main__":
    # 父进程创建队列,并传递给子进程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()

    pw.join()
    # pr进程是一个死循环,无法等待其结束,只能强行结束
    # (写进程结束了,所以读进程也可以结束了)
    pr.terminate()
    print("父进程结束")

# 输出结果:
# 启动Write子进程:29564
# 启动Write子进程:22852
# A
# B
# C
# D
# 结束Write子进程:22852
# 父进程结束
# 管道
# 官方文档
# Pipe() 函数返回一个由管道连接的连接对象,默认情况下是双工(双向)
from multiprocessing import Process, Pipe
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
# 返回的两个连接对象 Pipe() 表示管道的两端。
# 每个连接对象都有 send() 和 recv() 方法(相互之间的)。
# 请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,
# 则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。
# 在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此
# 如果你真的需要使用一些共享数据,那么 multiprocessing 提供了两种方法
# 共享内存 shared memory

#  可以使用 Value 或 Array 将数据存储在共享内存映射中
# 这里的Array和numpy中的不同,它只能是一维的,不能是多维的。
# 同样和Value 一样,需要定义数据形式,否则会报错
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

# 将打印
# 3.1415927
# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
# 创建 num 和 arr 时使用的 'd' 和 'i' 
# 参数是 array 模块使用的类型的 typecode : 'd' 表示双精度浮点数, 'i' 表示有符号整数。
# 这些共享对象将是进程和线程安全的。
# 进程锁Lock
# 不加进程锁
# 让我们看看没有加进程锁时会产生什么样的结果。
import multiprocessing as mp
import time

def job(v, num):
    for _ in range(5):
        time.sleep(0.1) # 暂停0.1秒,让输出效果更明显
        v.value += num # v.value获取共享变量值
        print(v.value, end="|")

def multicore():
    v = mp.Value('i', 0) # 定义共享变量
    p1 = mp.Process(target=job, args=(v,1))
    p2 = mp.Process(target=job, args=(v,3)) # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

# 在上面的代码中,我们定义了一个共享变量v,两个进程都可以对它进行操作。 
# 在job()中我们想让v每隔0.1秒输出一次累加num的结果,
# 但是在两个进程p1和p2 中设定了不同的累加值。
# 所以接下来让我们来看下这两个进程是否会出现冲突。

# 结论:进程1和进程2在相互抢着使用共享内存v。
# 加进程锁
# 为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。
import multiprocessing as mp
import time

# 在job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占
def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # 获取共享内存
        print(v.value, end="|")
    l.release() # 释放

def multicore():
    l = mp.Lock() # 定义一个进程锁
    v = mp.Value('i', 0) # 定义共享内存
    # 进程锁的信息传入各个进程中
    p1 = mp.Process(target=job, args=(v,1,l)) 
    p2 = mp.Process(target=job, args=(v,3,l)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

# 运行一下,让我们看看是否还会出现抢占资源的情况
# 显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

# 在某些特定的场景下要共享string类型,方式如下:
from ctypes import c_char_p
str_val = mp.Value(c_char_p, b"Hello World")
# 加进程锁
# 为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。
import multiprocessing as mp
import time

# 在job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占
def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # 获取共享内存
        print(v.value, end="|")
    l.release() # 释放

def multicore():
    l = mp.Lock() # 定义一个进程锁
    v = mp.Value('i', 0) # 定义共享内存
    # 进程锁的信息传入各个进程中
    p1 = mp.Process(target=job, args=(v,1,l)) 
    p2 = mp.Process(target=job, args=(v,3,l)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

# 运行一下,让我们看看是否还会出现抢占资源的情况
# 显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

# 在某些特定的场景下要共享string类型,方式如下:
from ctypes import c_char_p
str_val = mp.Value(c_char_p, b"Hello World")
# Pool 类表示一个工作进程池
# 如果要启动大量的子进程,可以用进程池的方式批量创建子进程
from multiprocessing.pool import Pool
from time import sleep, time
import random
import os

def run(name):
    print("%s子进程开始,进程ID:%d" % (name, os.getpid()))
    start = time()
    sleep(random.choice([1, 2, 3, 4]))
    end = time()
    print("%s子进程结束,进程ID:%d。耗时0.2%f" % (name, os.getpid(), end-start))


if __name__ == "__main__":
    print("父进程开始")
    # 创建多个进程,表示可以同时执行的进程数量。默认大小是CPU的核心数
    p = Pool(4)
    for i in range(10):
        # 创建进程,放入进程池统一管理
        p.apply_async(run, args=(i,))
    # 如果我们用的是进程池,在调用join()之前必须要先close(),并且在close()之后不能再继续往进程池添加新的进程
    p.close()
    # 进程池对象调用join,会等待进程吃中所有的子进程结束完毕再去结束父进程
    p.join()
    print("父进程结束。")
    p.terminate()

# Pool(8):创建多个进程,表示可以同时执行的进程数量。默认大小是CPU的核心数果。
# close():如果我们用的是进程池,在调用join()之前必须要先close(),并且在close()之后不能再继续往进程池添加新的进程
# join():进程池对象调用join,会等待进程池中所有的子进程结束完毕再去结束父进程
# terminate():一旦运行到此步,不管任务是否完成,立即终止。
from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError
# join dead lock
from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

#  交换最后两行可以修复这个问题(或者直接删掉 p.join())
# process vs thread
import multiprocessing as mp

def job(q):
    res = 0
    for i in range(1000000):
        res += i+i**2+i**3
    q.put(res) # queue

# 多核
def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:',res1 + res2)

# 创建多线程mutithread
# 接下来创建多线程程序,创建多线程和多进程有很多相似的地方。
# 首先import threading然后定义multithread()完成同样的任务
import threading as td

def multithread():
    q = mp.Queue() # thread可放入process同样的queue中
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread:', res1 + res2)

# 创建普通函数
def normal():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i + i**2 + i**3
    print('normal:', res)
# 在上面例子中我们建立了两个进程或线程,均对job()进行了两次运算,
# 所以在normal()中我们也让它循环两次
# 运行时间
import time

if __name__ == '__main__':
    st = time.time()
    normal()
    st1 = time.time()
    print('normal time:', st1 - st)
    multithread()
    st2 = time.time()
    print('multithread time:', st2 - st1)
    multicore()
    print('multicore time:', time.time() - st2)

# 普通/多线程/多进程的运行时间分别是1.41,1.47和0.75秒。 
# 我们发现多核/多进程最快,说明在同时间运行了多个任务。 
# 而多线程的运行时间居然比什么都不做的程序还要慢一点,
# 说明多线程还是有一定的短板的(GIL)。