Python threading模块

线程有2种调用方式,如下:

直接调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import time
def sayhi(num): #定义每个线程要运行的函数
    print("running on number:%s" %num)
    time.sleep(3)
if __name__ == '__main__':
    t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
    t1.start() #启动线程
    t2.start() #启动另一个线程
    print(t1.getName()) #获取线程名
    print(t2.getName())

继承式调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num
    def run(self):#定义每个线程要运行的函数
        print("running on number:%s" %self.num)
        time.sleep(3)
if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

Join & Daemon

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it’s okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you’d have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import time
import threading
def run(n):
    print('[%s]------running----\n' % n)
    time.sleep(2)
    print('--done--')
def main():
    for in range(5):
        = threading.Thread(target=run,args=[i,])
        t.start()
        t.join(1)
        print('starting thread', t.getName())
= threading.Thread(target=main,args=[])
m.setDaemon(True#将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
m.start()
m.join(timeout=2)
print("---main thread done----")

 

Note:Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.

 

 

 

线程锁(互斥锁Mutex)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import threading
def addNum():
    global num #在每个线程中都获取这个全局变量
    print('--get num:',num )
    time.sleep(1)
    num  -=1 #对此公共变量进行-1操作
num = 100  #设定一个共享变量
thread_list = []
for in range(100):
    = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for in thread_list: #等待所有线程执行完毕
    t.join()
print('final num:', num )

正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。

*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁

加锁版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
import threading
def addNum():
    global num #在每个线程中都获取这个全局变量
    print('--get num:',num )
    time.sleep(1)
    lock.acquire() #修改数据前加锁
    num  -=1 #对此公共变量进行-1操作
    lock.release() #修改后释放
num = 100  #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for in range(100):
    = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for in thread_list: #等待所有线程执行完毕
    t.join()
print('final num:', num )

 

GIL VS Lock 

机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 注意啦,这里的lock是用户级的lock,跟那个GIL没关系 ,具体我们通过下图来看一下+配合我现场讲给大家,就明白了。

那你又问了, 既然用户程序已经自己有锁了,那为什么C python还需要GIL呢?加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,  这可以说是Python早期版本的遗留问题。

 

 

 

RLock(递归锁)

说白了就是在一个大锁中还要再包含子锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading,time
def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num +=1
    lock.release()
    return num
def run2():
    print("grab the second part data")
    lock.acquire()
    global  num2
    num2+=1
    lock.release()
    return num2
def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res,res2)
if __name__ == '__main__':
    num,num2 = 0,0
    lock = threading.RLock()
    for in range(10):
        = threading.Thread(target=run3)
        t.start()
while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num,num2)

 

Semaphore(信号量)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading,time
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" %n)
    semaphore.release()
if __name__ == '__main__':
    num= 0
    semaphore  = threading.BoundedSemaphore(5#最多允许5个线程同时运行
    for in range(20):
        = threading.Thread(target=run,args=(i,))
        t.start()
while threading.active_count() != 1:
    pass #print threading.active_count()
else:
    print('----all threads done---')
    print(num)

 

Timer

This class represents an action that should be run only after a certain amount of time has passed

Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling thecancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.

1
2
3
4
5
def hello():
    print("hello, world")
= Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed

 

 

Events

An event is a simple synchronization object;

the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading,time
import random
def light():
    if not event.isSet():
        event.set() #wait就不阻塞 #绿灯状态
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count <13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count <20:
            if event.isSet():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set() #打开绿灯
        time.sleep(1)
        count +=1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if  event.isSet(): #绿灯
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for in range(3):
        = threading.Thread(target=car,args=(i,))
        t.start()

这里还有一个event使用的例子,员工进公司门要刷卡, 我们这里设置一个线程是“门”, 再设置几个线程为“员工”,员工看到门没打开,就刷卡,刷完卡,门开了,员工就可以通过。

复制代码
 1 #_*_coding:utf-8_*_
 2 __author__ = 'Alex Li'
 3 import threading
 4 import time
 5 import random
 6 
 7 def door():
 8     door_open_time_counter = 0
 9     while True:
10         if door_swiping_event.is_set():
11             print("\033[32;1mdoor opening....\033[0m")
12             door_open_time_counter +=1
13 
14         else:
15             print("\033[31;1mdoor closed...., swipe to open.\033[0m")
16             door_open_time_counter = 0 #清空计时器
17             door_swiping_event.wait()
18 
19 
20         if door_open_time_counter > 3:#门开了已经3s了,该关了
21             door_swiping_event.clear()
22 
23         time.sleep(0.5)
24 
25 
26 def staff(n):
27 
28     print("staff [%s] is comming..." % n )
29     while True:
30         if door_swiping_event.is_set():
31             print("\033[34;1mdoor is opened, passing.....\033[0m")
32             break
33         else:
34             print("staff [%s] sees door got closed, swipping the card....." % n)
35             print(door_swiping_event.set())
36             door_swiping_event.set()
37             print("after set ",door_swiping_event.set())
38         time.sleep(0.5)
39 door_swiping_event  = threading.Event() #设置事件
40 
41 
42 door_thread = threading.Thread(target=door)
43 door_thread.start()
44 
45 
46 
47 for i in range(5):
48     p = threading.Thread(target=staff,args=(i,))
49     time.sleep(random.randrange(3))
50     p.start()
复制代码

发表评论