跳转至

脚本开发 / 线程池 DFF.THREAD

此功能于 3.3.1 版本新增

DataFlux Func 内置了线程池功能DFF.THREAD,用户可以直接使用并由系统自动管理线程池的开启和关闭。

对于一些 IO 密集的处理(如大量 HTTP 请求),可以使用线程池提高处理效率。

DFF.THREAD.set_pool_size(...)

设置线程池大小必须在首次调用DFF.THREAD.submit(...)之前进行

DFF.THREAD.set_pool_size(...)方法用户设置线程池大小(线程池默认大小为5

参数 类型 必须/默认值 说明
pool_size int 5 线程池大小

示例如下:

Python
1
DFF.THREAD.set_pool_size(10)

DFF.THREAD.submit(...)

DFF.THREAD.submit(...)方法用于使用线程池调用一个函数

参数 类型 必须/默认值 说明
fn function 必须 线程执行函数
*args - () 函数位置参数
*kwargs - {} 函数命名参数

返回值为一个字符串,用于标记此函数执行的 Key,可在后续配合DFF.THREAD.get_result(...)获取指定任务的结果。

示例如下:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    # 以下两种调用方式等价
    key = DFF.THREAD.submit(fn, 1)
    print(key)

    key = DFF.THREAD.submit(fn, sleep_time=1)
    print(key)

# 输出:
# thread-result-xxxxx
# thread-result-yyyyy

DFF.THREAD.pop_result(...)

DFF.THREAD.pop_result(...)方法用于从线程池的执行函数结果中弹出一个已完成的结果。

已弹出的函数结果不会再在DFF.THREAD.get_result(...)DFF.THREAD.get_all_results(...)中返回

参数 类型 必须/默认值 说明
wait bool True 是否等待结果(即是否阻塞)

返回值为FuncThreadResult对象。

FuncThreadResult对象可通过.value属性获取函数执行返回值,.error属性获取抛出的错误。

示例如下:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time

def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')

    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    while True:
        result = DFF.THREAD.pop_result()
        if result:
            print(f"{result.value}, {repr(result.error)}")
        else:
            break

# 输出:
# None, Exception('Sleep too long')
# 1, None
# 2, None

DFF.THREAD.get_all_results(...)

DFF.THREAD.get_all_results(...)方法用于获取全部线程执行函数结果。

参数 类型 必须/默认值 说明
wait bool True 是否等待结果(即是否阻塞)

返回值为FuncThreadResult对象列表。

FuncThreadResult对象可通过.value属性获取函数执行返回值,.error属性获取抛出的错误。

示例如下:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time

def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')

    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    # 获取全部结果并遍历
    for result in DFF.THREAD.get_all_results():
        print(f"{result.value}, {repr(result.error)}")

# 输出:
# None, Exception('Sleep too long')
# 2, None
# 1, None

DFF.THREAD.get_result(...)

DFF.THREAD.get_result(...)方法用于获取某个线程执行函数结果

返回值为FuncThreadResult对象。

FuncThreadResult对象可通过.value属性获取函数执行返回值,.error属性获取抛出的错误。

参数 类型 必须/默认值 说明
key str 必须 待获取结果的函数执行 Key(由DFF.THREAD.submit(...)返回)
wait bool True 是否等待结果(即是否阻塞)

示例如下:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    key = DFF.THREAD.submit(fn, 1)

    # 获取指定任务结果
    result = DFF.THREAD.get_result(key=key)
    print(f"{result.value}, {repr(result.error)}")

# 输出:1, None

DFF.THREAD.is_all_finished

DFF.THREAD.is_all_finished属性用于判断当前线程池中函数是否全部执行完毕

示例如下:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    print(DFF.THREAD.is_all_finished)
    time.sleep(3)
    print(DFF.THREAD.is_all_finished)

# 输出:
# False
# True

DFF.THREAD.wait_all_finished(...)

DFF.THREAD.wait_all_finished(...)方法用于判断当前线程池中函数全部执行完毕

示例如下:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    DFF.THREAD.wait_all_finished()
    print('Finished')

# 输出:
# Finished

DFF.THREAD.pop_result(...) VS DFF.THREAD.get_all_results(...)

获取线程池函数执行结果的两种方式略有不同:

  1. DFF.THREAD.pop_result(...)更适合相互之间独立的任务,任意一个任务有结果后立即进入后续处理的场景
  2. DFF.THREAD.get_all_results(...)则适合任务之间存在关联或依赖,需要全部完成后再一起进入后续处理的场景