Back

Python线程池使用

介绍

从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类。

相比 threading 等模块,该模块通过 submit 返回的是一个 future 对象,它是一个未来可期的对象,通过它可以获悉线程的状态主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值:

1.主线程可以获取某一个线程(或者任务的)的状态,以及返回值。

2.当一个线程完成的时候,主线程能够立即知道。

3.让多线程和多进程的编码接口一致。

基本使用

from concurrent.futures import ThreadPoolExecutor
import time


def get_page(url):
    time.sleep(url)
    return url


with ThreadPoolExecutor(max_workers=5) as t:  # 创建一个最大容纳数量为5的线程池
    task1 = t.submit(get_page, 1)
    task2 = t.submit(get_page, 2)  # 通过submit提交执行的函数到线程池中
    task3 = t.submit(get_page, 3)

    print(f"task1: {task1.done()}")  # 通过done来判断线程是否完成
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")

    time.sleep(2.5)
    print(f"task1: {task1.done()}")
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")
    print(task1.result())  # 通过result来获取返回值

>>> task1: False
    task2: False
    task3: False 
    ...
    task1: True
    task2: True
    task3: False

Api

as_completed

concurrent.futures.as_completed(fs, timeout=None)

返回一个生成器在迭代过程中会阻塞

直到线程完成或者异常时,返回一个被set_result的Future对象

此方法的返回顺序为哪个线程先失败/完成就返回

from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def get_page(url):
    time.sleep(url)
    return url


with ThreadPoolExecutor(max_workers=5) as t:  # 创建一个最大容纳数量为5的线程池
    tasks = [t.submit(get_page, page) for page in range(1, 5)]
    
    for future in as_completed(tasks):
        result = future.result()
        print(result)

>>> 1
    2
    3
    4

wait

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

fs: 执行的序列

timeout: 等待的最大时间,如果超过这个时间即使线程未执行完成也将返回

return_when: 表示wait返回结果的条件,默认为 ALL_COMPLETED 全部执行完成再返回

  • FIRST_COMPLETED
函数将在任意可等待对象结束或取消时返回。
  • FIRST_EXCEPTION
函数将在任意可等待对象因引发异常而结束时返回。
当没有引发任何异常时它就相当于 ALL_COMPLETED。
  • ALL_COMPLETED
函数将在所有可等待对象结束或取消时返回。
from concurrent.futures import ThreadPoolExecutor, wait
import time


def get_page(url):
    time.sleep(url)
    return url


with ThreadPoolExecutor(max_workers=5) as t:  # 创建一个最大容纳数量为5的线程池
    tasks = [t.submit(get_page, page) for page in range(1, 5)]

    a, b = wait(tasks)
    print(a)
    print(b)

>>> {<Future at 0x1c071fb1f28 state=finished returned int>, <Future at 0x1c071fb1d68 state=finished returned int>, <Future at 0x1c071f9fd68 state=finished returned int>, <Future at 0x1c071d78278 state=finished returned int>} 
    set()

map

*concurrent.futures.Executor.map(fn, iterables, timeout=None)

fn: 第一个参数 fn 是需要线程执行的函数

*iterables: 第二个参数接受一个可迭代对象

timeout: 第三个参数 timeout 跟 wait() 的 timeout 一样,但由于 map 是返回线程执行的结果,如果 timeout小于线程执行时间会抛异常 TimeoutError

from concurrent.futures import ThreadPoolExecutor
import time


def get_page(url):
    time.sleep(url)
    return url


URLS = [url for url in range(1, 4)]

with ThreadPoolExecutor(max_workers=5) as executor:  # 创建一个最大容纳数量为5的线程池
    for result in executor.map(get_page, URLS):
        print(result)

>>> 1
    2
    3

回调函数

回调函数(add_done_callback)是在调用线程完成后再调用的

from concurrent.futures import ThreadPoolExecutor, wait
import threading
import time


def get_page(url):
    time.sleep(url)
    return url


def call_back(worker):
    print(f'tid: {threading.current_thread().ident}', worker.result())


with ThreadPoolExecutor() as t:
    tasks = []
    for page in range(1, 5):
        task = t.submit(get_page, url=page)
        task.add_done_callback(call_back)
        tasks.append(task)
    wait(tasks)

>>> tid: 6392  1
    tid: 14936 2
    tid: 12516 3
    tid: 10524 4

异常处理

  • 通过添加回调函数的方法处理异常
import logging


def executor_callback(worker):
    logging.info(f'finished')
    worker_exception = worker.exception()
    if worker_exception:
        logging.exception(worker_exception)

备注

  • 一定使用with关键字处理线程池,在某些情况下线程池可能不能自动回收线程资源,with可以避免内存持续增长等情况
comments powered by Disqus
Built with Hugo
Theme Stack designed by Jimmy