多线程
实际上python的多线程不适合计算密集型应用, 因为实际上同一时间实际上只有一个线程会被执行, Python多线程更适用于I/O密集型应用, 当Python执行I/O时, 可以同时进行数据处理
_thread模块
Python2的thread模块, 在Python中改名为_thread模块, 是一个比较底层的多线程模块
- thread模块方法:
start_new_thread(fun, args, [kwargs=None])
: 执行新线程并返回线程编号, 使用给定的参数执行函数fun, 其中args
必须为元组, 而函数的名称参数使用字典kwargs
来指定allocate_lock()
: 分配一个LockType
对象exit()
: 结束线程(抛出SystemExit
异常, 如果没有被捕获将静默结束线程)
- LockType对象方法:
acquire(waitflag=1, timeout=-1)
: 请求锁, 返回布尔值表示获取成功与否, 如果锁正在使用, 则等待其释放, 当waitflag为0的时候, 表示只有当可以立即获取锁而无需等待时才会获取到锁; 而timeout则表示在最多等待的秒数(浮点数), 为负数表示无限等待locked()
: 返回锁对象状态, 如果被某个线程使用返回True
release()
: 释放锁
import _thread as thread
from time import sleep, ctime
loops = [4, 2]
def loop(nloop, nsec, lock):
print('start loop {0} at: {1}'.format(nloop, ctime()))
sleep(nsec)
print('loop {} done at; {}'.format(nloop, ctime()))
lock.release()
def main():
print('starting at: {}'.format(ctime()))
locks = []
nloops = range(len(loops))
for i in nloops:
lock = thread.allocate_lock()
lock.acquire()
locks.append(lock)
for i in nloops:
print(thread.start_new_thread(loop, (i, loops[i], locks[i])))
for i in nloops:
while locks[i].locked(): pass
print('all done at: {}'.format(ctime()))
if __name__ == '__main__': main()
threading模块
threading模块函数
- active_count()和enumerate(): 返回存活的线程个数
- current_thread(): 返回现在所在线程的Thread对象
- get_ident(): 返回当前线程的标识符(一个整数)
- main_thread(): 返回主线程的Thread对象
Thread对象
- 构造函数:
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, deamon=None)
- start(): 开始线程的活动
- run(): 线程的活动, 如果通过派生Thread方式创建线程, run()函数需要重写, 否则run()函数调用target
- join(): 对Thread对象调用join(), 将阻塞直到该Thread线程结束(也就是说会一直等待线程完成才会执行接下去的语句)
- name: 线程名字
- ident: 线程标识符
- is_alive(): 线程是否存活
- daemon: 是否为守护线程, 如果是守护线程, 则即使主线程结束, 该线程也会继续执行, 主线程会等待所有非守护线程结束才会结束, 线程默认继承父线程的守护线程模式
- 创建Thread实例, 并将函数传给他
import threading
from time import sleep, ctime
loops = [4, 2]
def loop(nloop, nsec):
print('start loop {0} at: {1}'.format(nloop, ctime()))
sleep(nsec)
print('loop {} done at; {}'.format(nloop, ctime()))
def main():
print('starting at: {}'.format(ctime()))
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=loop, args=(i, loops[i]), name=loop.__name__)
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print('all done at: {}'.format(ctime()))
if __name__ == '__main__': main()
- 创建Thread实例, 并将可调用的类实例传给它
import threading
from time import sleep, ctime
loops = [4, 2]
class ThreadFunc():
def __init__(self, func, args, name=''):
self.name = name
self.func = func
self.args = args
def __call__(self):
self.func(*self.args)
def loop(nloop, nsec):
print('start loop {0} at: {1}'.format(nloop, ctime()))
sleep(nsec)
print('loop {} done at; {}'.format(nloop, ctime()))
def main():
print('starting at: {}'.format(ctime()))
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print('all done at: {}'.format(ctime()))
if __name__ == '__main__': main()
- 派生Thread子类
import threading
from time import sleep, ctime
loops = [4, 2]
class MyThread(threading.Thread):
def __init__(self, func, args, name=''):
super().__init__()
self.name = name
self.func = func
self.args = args
def run(self):
self.func(*self.args)
def loop(nloop, nsec):
print('start loop {0} at: {1}'.format(nloop, ctime()))
sleep(nsec)
print('loop {} done at; {}'.format(nloop, ctime()))
def main():
print('starting at: {}'.format(ctime()))
threads = []
nloops = range(len(loops))
for i in nloops:
t = MyThread(loop, (i, loops[i]), loop.__name__)
threads.append(t)
for i in nloops:
threads[i].start()
for i in nloops:
threads[i].join()
print('all done at: {}'.format(ctime()))
if __name__ == '__main__': main()
Lock对象
同_thread模块, 有acquire()和release()方法
还有RLock对象, 与Lock对象不同的是, 它可以被同一线程acquire多次, 被acquire多次的锁必须被release同样的次数才会被释放
Condition
-
构造函数: Condition(lock=None): 可以给它传入一个Lock或RLock实例, 否则默认自动创建一个RLock实例
-
acquire(): 请求锁
-
release(): 释放锁
-
wait(): 释放锁并阻塞直到其他线程调用notify_all或者notify唤醒, 唤醒后将再次请求锁(必须保证当前已获得锁, 否则抛出RuntimeError, 下面三个函数也是)
-
wait_for(pre, timeout=None): 等同
while not predicate(): cv.wait()
-
notify(n=1): 唤醒其它正在等待的线程, 默认唤醒一个(按先入先出), 不会释放锁
-
notify_all(): 唤醒全部
-
Condition支持上下文控制语句(with)
-
生产者消费者问题:
import threading from time import sleep, ctime from random import randrange con = threading.Condition() items = [] max_item = 5 now = 1 class Consumer(): def __call__(self): while True: with con: while len(items) == 0: print('waiting...') con.wait() x = items.pop() print('consume', x) sleep(randrange(3)/10) class Producer(): def __call__(self): global now while True: with con: while len(items) == max_item: print('p waiting...') con.wait() items.append(now) print('produce', now) now += 1 con.notify() sleep(randrange(3)/10) threading.Thread(target=Consumer()).start() threading.Thread(target=Producer()).start()
Semaphore
信号量对象, 通过管理一个内部的计数器来实现同步, 两个函数acquire和release相当于PV操作
- 构造函数threading.Semaphore(value=1)
- acquire(blocking=True, timeout=None): 如果计数器值>0, 则将其减一然后返回True回到主线程继续执行, 否则阻塞直到计数器大于0, 如果参数blocking=False, 则不会阻塞而是直接返回false
- release(): 将计数器+1
- threading.BoundSemaphore(value=1): 信号量类, 与Semaphore区别是BoundSemaphore计数器的值不能大于初始值否则ValueError
Event
管理一个标记flag, flag初始为False
- is_set(): 返回内部flag值(True或False)
- set(): 将flag设置为True并且激活阻塞线程
- clear(): 将flag设置为False
- wait(timeout=None): 阻塞指导flag为True
Timer
计时器, 等待指定的时长后后执行
- threading.Timer(interval, function, args=None, kwargs=None): interval秒后执行function
- cancel(): 取消, 前提是还没有执行
Barrier
如其名, 像一个栅栏, 直到指定个数的线程都在wait后, 同时释放这些进程
可以被多次使用, 每次使用结束后进入broken状态, 需要reset后才能再次使用
- threading.Barrier(parties, action=None, timeout=None): parties为线程个数(释放所有线程需要达到的线程个数), 可选参数action是一个可执行的函数或对象, 当线程被释放时, 会有一个线程调用它
- wait(timeout=None): 阻塞直到有达到数量要求, 返回一个数字, 从0到parties-1, 每个线程的返回值都不相同, 可以用来指定单独一个线程执行某个特殊人物
- reset(): 将Barrier回复为初始状态, 如果还有线程在阻塞, 线程会收到BrokenBarrierError异常
- abort(): 使得对象进入broken状态, 所有正在等待的线程会收到BrokenBarrierError异常
- parties: 线程数
- n_waiting: 阻塞线程数
- broken: 如果为True则为broken状态
queue模块
提供了几个队列, Queue(先进先出), LifoQueue(后进先出), PriorityQueue(优先队列), 用来简化类似生产者消费者问题的进程同步问题
- get(block=True, timeout = None): 出队队首元素, 当队列为空, 如果block=True则阻塞直到队列不为空, 否则抛出Empty异常
- put(item, block=True, timeout=None): 元素入队, 如果超过队列容量, 当block=True时阻塞直到可以入队, 否则抛出Full异常
- task_done(): get()后调用, 表示任务完成, 用于join命令, 如果task_done调用次数大于所有入队元素个数, 抛出ValueError
- join(): 阻塞主线程直到所有队列中的元素都处理完成(调用了同样次数的task_done)