夜间模式暗黑模式
字体
阴影
滤镜
圆角
主题色
Python学习笔记14-多线程

多线程

实际上python的多线程不适合计算密集型应用, 因为实际上同一时间实际上只有一个线程会被执行, Python多线程更适用于I/O密集型应用, 当Python执行I/O时, 可以同时进行数据处理

_thread模块

Python2的thread模块, 在Python中改名为_thread模块, 是一个比较底层的多线程模块

  • thread模块方法:
    • start_new_thread(fun, args, [kwargs=None]): 执行新线程并返回线程编号, 使用给定的参数执行函数fun, 其中args必须为元组, 而函数的名称参数使用字典kwargs来指定
    • allocate_lock(): 分配一个LockType对象
    • exit(): 结束线程(抛出SystemExit异常, 如果没有被捕获将静默结束线程)
  • LockType对象方法:
    • acquire(waitflag=1, timeout=-1): 请求锁, 返回布尔值表示获取成功与否, 如果锁正在使用, 则等待其释放, 当waitflag为0的时候, 表示只有当可以立即获取锁而无需等待时才会获取到锁; 而timeout则表示在最多等待的秒数(浮点数), 为负数表示无限等待
    • locked(): 返回锁对象状态, 如果被某个线程使用返回True
    • release(): 释放锁
import _thread as thread
from time import sleep, ctime

loops = [4, 2]

def loop(nloop, nsec, lock):
    print('start loop {0} at: {1}'.format(nloop, ctime()))
    sleep(nsec)
    print('loop {} done at; {}'.format(nloop, ctime()))
    lock.release()

def main():
    print('starting at: {}'.format(ctime()))
    locks = []
    nloops = range(len(loops))

    for i in nloops:
        lock = thread.allocate_lock()
        lock.acquire()
        locks.append(lock)

    for i in nloops:
        print(thread.start_new_thread(loop, (i, loops[i], locks[i])))

    for i in nloops:
        while locks[i].locked(): pass

    print('all done at: {}'.format(ctime()))


if __name__ == '__main__': main()

threading模块

threading模块函数

  • active_count()和enumerate(): 返回存活的线程个数
  • current_thread(): 返回现在所在线程的Thread对象
  • get_ident(): 返回当前线程的标识符(一个整数)
  • main_thread(): 返回主线程的Thread对象

Thread对象

  • 构造函数:threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, deamon=None)
  • start(): 开始线程的活动
  • run(): 线程的活动, 如果通过派生Thread方式创建线程, run()函数需要重写, 否则run()函数调用target
  • join(): 对Thread对象调用join(), 将阻塞直到该Thread线程结束(也就是说会一直等待线程完成才会执行接下去的语句)
  • name: 线程名字
  • ident: 线程标识符
  • is_alive(): 线程是否存活
  • daemon: 是否为守护线程, 如果是守护线程, 则即使主线程结束, 该线程也会继续执行, 主线程会等待所有非守护线程结束才会结束, 线程默认继承父线程的守护线程模式
  • 创建Thread实例, 并将函数传给他
import threading
from time import sleep, ctime

loops = [4, 2]


def loop(nloop, nsec):
    print('start loop {0} at: {1}'.format(nloop, ctime()))
    sleep(nsec)
    print('loop {} done at; {}'.format(nloop, ctime()))
    

def main():
    print('starting at: {}'.format(ctime()))
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]), name=loop.__name__)
        threads.append(t)
        
    for i in nloops:
        threads[i].start()
        
    for i in nloops:
        threads[i].join()

    print('all done at: {}'.format(ctime()))


if __name__ == '__main__': main()

  • 创建Thread实例, 并将可调用的类实例传给它
import threading
from time import sleep, ctime

loops = [4, 2]

class ThreadFunc():
    def __init__(self, func, args, name=''):
        self.name = name
        self.func = func
        self.args = args

    def __call__(self):
        self.func(*self.args)


def loop(nloop, nsec):
    print('start loop {0} at: {1}'.format(nloop, ctime()))
    sleep(nsec)
    print('loop {} done at; {}'.format(nloop, ctime()))
    

def main():
    print('starting at: {}'.format(ctime()))
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
        threads.append(t)
        
    for i in nloops:
        threads[i].start()
        
    for i in nloops:
        threads[i].join()

    print('all done at: {}'.format(ctime()))


if __name__ == '__main__': main()

  • 派生Thread子类
import threading
from time import sleep, ctime

loops = [4, 2]


class MyThread(threading.Thread):
    def __init__(self, func, args, name=''):
        super().__init__()
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        self.func(*self.args)

    
def loop(nloop, nsec):
    print('start loop {0} at: {1}'.format(nloop, ctime()))
    sleep(nsec)
    print('loop {} done at; {}'.format(nloop, ctime()))
    

def main():
    print('starting at: {}'.format(ctime()))
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)
        
    for i in nloops:
        threads[i].start()
        
    for i in nloops:
        threads[i].join()

    print('all done at: {}'.format(ctime()))


if __name__ == '__main__': main()

Lock对象

同_thread模块, 有acquire()和release()方法

还有RLock对象, 与Lock对象不同的是, 它可以被同一线程acquire多次, 被acquire多次的锁必须被release同样的次数才会被释放

Condition

  • 构造函数: Condition(lock=None): 可以给它传入一个Lock或RLock实例, 否则默认自动创建一个RLock实例

  • acquire(): 请求锁

  • release(): 释放锁

  • wait(): 释放锁并阻塞直到其他线程调用notify_all或者notify唤醒, 唤醒后将再次请求锁(必须保证当前已获得锁, 否则抛出RuntimeError, 下面三个函数也是)

  • wait_for(pre, timeout=None): 等同while not predicate(): cv.wait()

  • notify(n=1): 唤醒其它正在等待的线程, 默认唤醒一个(按先入先出), 不会释放锁

  • notify_all(): 唤醒全部

  • Condition支持上下文控制语句(with)

  • 生产者消费者问题:

    import threading
    from time import sleep, ctime
    from random import randrange
    
    con = threading.Condition()
    items = []
    max_item = 5
    now = 1
    
    
    class Consumer():
        def __call__(self):
            while True:
                with con:
                    while len(items) == 0:
                        print('waiting...')
                        con.wait()
                    x = items.pop()
                    print('consume', x)
                    sleep(randrange(3)/10)
    
    class Producer():
        def __call__(self):
            global now
            while True:
                with con:
                    while len(items) == max_item:
                        print('p waiting...')
                        con.wait()
                    items.append(now)
                    print('produce', now)
                    now += 1
                    con.notify()
                    sleep(randrange(3)/10)
    
    threading.Thread(target=Consumer()).start()
    threading.Thread(target=Producer()).start()
    

Semaphore

信号量对象, 通过管理一个内部的计数器来实现同步, 两个函数acquire和release相当于PV操作

  • 构造函数threading.Semaphore(value=1)
  • acquire(blocking=True, timeout=None): 如果计数器值>0, 则将其减一然后返回True回到主线程继续执行, 否则阻塞直到计数器大于0, 如果参数blocking=False, 则不会阻塞而是直接返回false
  • release(): 将计数器+1
  • threading.BoundSemaphore(value=1): 信号量类, 与Semaphore区别是BoundSemaphore计数器的值不能大于初始值否则ValueError

Event

管理一个标记flag, flag初始为False

  • is_set(): 返回内部flag值(True或False)
  • set(): 将flag设置为True并且激活阻塞线程
  • clear(): 将flag设置为False
  • wait(timeout=None): 阻塞指导flag为True

Timer

计时器, 等待指定的时长后后执行

  • threading.Timer(interval, function, args=None, kwargs=None): interval秒后执行function
  • cancel(): 取消, 前提是还没有执行

Barrier

如其名, 像一个栅栏, 直到指定个数的线程都在wait后, 同时释放这些进程

可以被多次使用, 每次使用结束后进入broken状态, 需要reset后才能再次使用

  • threading.Barrier(parties, action=None, timeout=None): parties为线程个数(释放所有线程需要达到的线程个数), 可选参数action是一个可执行的函数或对象, 当线程被释放时, 会有一个线程调用它
  • wait(timeout=None): 阻塞直到有达到数量要求, 返回一个数字, 从0到parties-1, 每个线程的返回值都不相同, 可以用来指定单独一个线程执行某个特殊人物
  • reset(): 将Barrier回复为初始状态, 如果还有线程在阻塞, 线程会收到BrokenBarrierError异常
  • abort(): 使得对象进入broken状态, 所有正在等待的线程会收到BrokenBarrierError异常
  • parties: 线程数
  • n_waiting: 阻塞线程数
  • broken: 如果为True则为broken状态

queue模块

提供了几个队列, Queue(先进先出), LifoQueue(后进先出), PriorityQueue(优先队列), 用来简化类似生产者消费者问题的进程同步问题

  • get(block=True, timeout = None): 出队队首元素, 当队列为空, 如果block=True则阻塞直到队列不为空, 否则抛出Empty异常
  • put(item, block=True, timeout=None): 元素入队, 如果超过队列容量, 当block=True时阻塞直到可以入队, 否则抛出Full异常
  • task_done(): get()后调用, 表示任务完成, 用于join命令, 如果task_done调用次数大于所有入队元素个数, 抛出ValueError
  • join(): 阻塞主线程直到所有队列中的元素都处理完成(调用了同样次数的task_done)

暂无评论

发送评论


				
上一篇
下一篇