使用 @jit
进行自动并行化
为 parallel 选项设置 jit()
可启用一个 Numba 转换过程,该过程尝试对函数(的一部分)进行自动并行化和执行其他优化。目前,此功能仅在 CPU 上有效。
用户定义函数中的某些操作,例如将标量值添加到数组,已知具有并行语义。用户程序可能包含许多此类操作,虽然每个操作都可以单独并行化,但这种方法通常由于缓存行为不佳而导致性能不佳。相反,通过自动并行化,Numba 尝试识别用户程序中的此类操作,并将相邻的操作融合在一起,以形成一个或多个自动并行运行的内核。整个过程完全自动化,无需修改用户程序,这与 Numba 的 vectorize()
或 guvectorize()
机制形成对比,在后一种机制中,需要手动创建并行内核。
支持的操作
在本节中,我们列出了所有具有并行语义并尝试进行并行化的数组操作。
所有 Numba 数组操作,这些操作受 案例研究:数组表达式 支持,包括 Numpy 数组之间以及数组与标量之间的常见算术函数,以及 Numpy ufuncs。它们通常被称为 逐元素 或 逐点 数组操作
一元运算符:
+
-
~
二元运算符:
+
-
*
/
/?
%
|
>>
^
<<
&
**
//
比较运算符:
==
!=
<
<=
>
>=
在 nopython 模式 中受支持的 Numpy ufuncs。
通过
vectorize()
用户定义的DUFunc
。
Numpy 归约函数
sum
,prod
,min
,max
,argmin
和argmax
。此外,数组数学函数mean
,var
和std
。Numpy 数组创建函数
zeros
,ones
,arange
,linspace
,以及几种随机函数 (rand, randn, ranf, random_sample, sample, random, standard_normal, chisquare, weibull, power, geometric, exponential, poisson, rayleigh, normal, uniform, beta, binomial, f, gamma, lognormal, laplace, randint, triangular)。Numpy
dot
函数在矩阵和向量之间,或两个向量之间。在所有其他情况下,使用 Numba 的默认实现。上述操作也支持多维数组,条件是操作数具有匹配的维度和大小。不支持具有混合维度或大小的数组之间 Numpy 广播的完整语义,也不支持跨选定维度的归约。
数组赋值,其中目标是使用切片或布尔数组进行的数组选择,并且分配的值是标量或另一个切片范围或位数组被推断为兼容的选择。
functools
的reduce
运算符支持对一维 Numpy 数组指定并行归约,但初始值参数是强制性的。
显式并行循环
代码转换过程的另一个功能(当 parallel=True
时)是支持显式并行循环。可以使用 Numba 的 prange
代替 range
来指定一个循环可以并行化。用户需要确保循环没有跨迭代依赖,除了受支持的归约。
如果循环体内通过支持的二元函数/运算符使用其先前值更新变量,则会自动推断出归约。支持以下函数/运算符:+=
, +
, -=
, -
, *=
, *
, /=
, /
, max()
, min()
。对于受支持的运算符(即,不是 max
和 min
函数),归约的初始值会自动推断出来。请注意,不支持 //=
运算符,因为在一般情况下,结果取决于除数应用的顺序。但是,如果所有除数都是整数,则程序员可能能够将 //=
归约重写为 *=
归约,然后在并行区域之后进行一次地板除法,其中除数是累积积。对于 max
和 min
函数,归约变量在进入 prange
循环之前应保存恒等值。以这种方式进行的归约支持标量和任意维度的数组。
以下示例演示了一个带有归约的并行循环(A
是一个一维 Numpy 数组)
from numba import njit, prange
@njit(parallel=True)
def prange_test(A):
s = 0
# Without "parallel=True" in the jit-decorator
# the prange statement is equivalent to range
for i in prange(A.shape[0]):
s += A[i]
return s
以下示例演示了对一个二维数组进行乘积归约
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def two_d_array_reduction_prod(n):
shp = (13, 17)
result1 = 2 * np.ones(shp, np.int_)
tmp = 2 * np.ones_like(result1)
for i in prange(n):
result1 *= tmp
return result1
注意
当使用 Python 的 range
创建循环时,Numba 将归纳变量类型化为有符号整数。当 parallel=False
时,Numba 的 prange
也是如此。然而,对于 parallel=True
,如果范围被识别为严格正数,则归纳变量的类型将是 uint64
。uint64
归纳变量的影响通常在涉及它和有符号整数的操作中最明显。根据 Numba 的类型强制规则,这种情况下操作通常会产生浮点结果类型。
注意
只有具有单个入口块和单个出口块的 prange 循环才能被转换成并行运行。循环中的异常控制流,例如断言,可能生成多个出口块,导致循环无法并行运行。如果出现这种情况,Numba 将发出警告,指出哪个循环无法并行化。
然而,需要注意的是,当归约到数组的切片或元素中时,如果切片或索引指定的元素同时被多个并行线程写入,编译器可能无法检测到此类情况,此时就会发生竞态条件。
以下示例演示了这样一种情况,其中并行 for 循环执行中的竞态条件导致返回不正确的值
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def prange_wrong_result(x):
n = x.shape[0]
y = np.zeros(4)
for i in prange(n):
# accumulating into the same element of `y` from different
# parallel iterations of the loop results in a race condition
y[:] += x[i]
return y
如下例所示,其中累加元素是显式指定的
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def prange_wrong_result(x):
n = x.shape[0]
y = np.zeros(4)
for i in prange(n):
# accumulating into the same element of `y` from different
# parallel iterations of the loop results in a race condition
y[i % 4] += x[i]
return y
而执行整个数组归约则没有问题
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def prange_ok_result_whole_arr(x):
n = x.shape[0]
y = np.zeros(4)
for i in prange(n):
y += x[i]
return y
在并行归约循环外部创建切片引用也是如此
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def prange_ok_result_outer_slice(x):
n = x.shape[0]
y = np.zeros(4)
z = y[:]
for i in prange(n):
z += x[i]
return y
示例
在本节中,我们举例说明此功能如何帮助并行化逻辑回归
@numba.jit(nopython=True, parallel=True)
def logistic_regression(Y, X, w, iterations):
for i in range(iterations):
w -= np.dot(((1.0 / (1.0 + np.exp(-Y * np.dot(X, w))) - 1.0) * Y), X)
return w
我们不讨论算法的细节,而是关注此程序在自动并行化下的行为
输入
Y
是一个大小为N
的向量,X
是一个N x D
矩阵,而w
是一个大小为D
的向量。函数体是一个迭代循环,它更新变量
w
。循环体由一系列向量和矩阵操作组成。内部的
dot
操作生成一个大小为N
的向量,然后是一系列算术操作,要么在标量和大小为N
的向量之间,要么在两个大小都为N
的向量之间。外部的
dot
操作生成一个大小为D
的向量,然后对变量w
进行原地数组减法。通过自动并行化,所有生成大小为
N
的数组的操作都会被融合在一起,形成一个单独的并行内核。这包括内部的dot
操作及其后的所有逐点数组操作。外部的
dot
操作生成一个不同维度的结果数组,并且没有与上述内核融合。
在这里,利用并行硬件唯一需要做的就是为 parallel 选项设置 jit()
,无需修改 logistic_regression
函数本身。如果我们要使用 guvectorize()
提供一个等效的并行实现,这将需要进行全面的修改,重写代码以提取可以并行化的内核计算,这既繁琐又具有挑战性。
不支持的操作
本节包含一个不完全列表,列出了常见但目前不支持的功能
修改列表不是线程安全的
对容器类型(即列表、集合和字典)的并发写入操作在
prange
并行区域中不是线程安全的,例如@njit(parallel=True) def invalid(): z = [] for i in prange(10000): z.append(i) return z
上述操作很可能导致数据损坏或访问冲突,因为容器在修改时需要线程安全,但此功能尚未实现。
归纳变量不与线程 ID 相关联
使用基于
prange
循环产生的归纳变量结合get_num_threads
作为确保安全写入预设大小容器的方法是无效的,例如@njit(parallel=True) def invalid(): n = get_num_threads() z = [0 for _ in range(n)] for i in prange(100): z[i % n] += i return z
上述方法有时可能看似有效,但这纯属偶然。无法保证哪些索引分配给哪些执行线程,也无法保证循环迭代的执行顺序。
诊断
注意
目前,并非所有并行转换和函数都可以通过代码生成过程进行跟踪。有时可能会缺少有关某些循环或转换的诊断信息。
为 parallel 选项设置 jit()
可以生成关于自动并行化装饰代码所进行的转换的诊断信息。可以通过两种方式访问此信息,第一种是通过设置环境变量 NUMBA_PARALLEL_DIAGNOSTICS
,第二种是通过调用 parallel_diagnostics()
,两种方法都提供相同的信息并打印到 STDOUT
。诊断信息的详细程度由一个整数参数控制,其值在 1 到 4 之间(含),1 表示最不详细,4 表示最详细。例如
@njit(parallel=True)
def test(x):
n = x.shape[0]
a = np.sin(x)
b = np.cos(a * a)
acc = 0
for i in prange(n - 2):
for j in prange(n - 1):
acc += b[i] + b[j + 1]
return acc
test(np.arange(10))
test.parallel_diagnostics(level=4)
产生
================================================================================
======= Parallel Accelerator Optimizing: Function test, example.py (4) =======
================================================================================
Parallel loop listing for Function test, example.py (4)
--------------------------------------|loop #ID
@njit(parallel=True) |
def test(x): |
n = x.shape[0] |
a = np.sin(x)---------------------| #0
b = np.cos(a * a)-----------------| #1
acc = 0 |
for i in prange(n - 2):-----------| #3
for j in prange(n - 1):-------| #2
acc += b[i] + b[j + 1] |
return acc |
--------------------------------- Fusing loops ---------------------------------
Attempting fusion of parallel loops (combines loops with similar properties)...
Trying to fuse loops #0 and #1:
- fusion succeeded: parallel for-loop #1 is fused into for-loop #0.
Trying to fuse loops #0 and #3:
- fusion failed: loop dimension mismatched in axis 0. slice(0, x_size0.1, 1)
!= slice(0, $40.4, 1)
----------------------------- Before Optimization ------------------------------
Parallel region 0:
+--0 (parallel)
+--1 (parallel)
Parallel region 1:
+--3 (parallel)
+--2 (parallel)
--------------------------------------------------------------------------------
------------------------------ After Optimization ------------------------------
Parallel region 0:
+--0 (parallel, fused with loop(s): 1)
Parallel region 1:
+--3 (parallel)
+--2 (serial)
Parallel region 0 (loop #0) had 1 loop(s) fused.
Parallel region 1 (loop #3) had 0 loop(s) fused and 1 loop(s) serialized as part
of the larger parallel loop (#3).
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
---------------------------Loop invariant code motion---------------------------
Instruction hoisting:
loop #0:
Failed to hoist the following:
dependency: $arg_out_var.10 = getitem(value=x, index=$parfor__index_5.99)
dependency: $0.6.11 = getattr(value=$0.5, attr=sin)
dependency: $expr_out_var.9 = call $0.6.11($arg_out_var.10, func=$0.6.11, args=[Var($arg_out_var.10, example.py (7))], kws=(), vararg=None)
dependency: $arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9
dependency: $0.10.20 = getattr(value=$0.9, attr=cos)
dependency: $expr_out_var.16 = call $0.10.20($arg_out_var.17, func=$0.10.20, args=[Var($arg_out_var.17, example.py (8))], kws=(), vararg=None)
loop #3:
Has the following hoisted:
$const58.3 = const(int, 1)
$58.4 = _n_23 - $const58.3
--------------------------------------------------------------------------------
为了帮助不熟悉在使用 parallel 选项时所进行的转换的用户,并有助于理解后续章节,提供了以下定义
- 循环融合
循环融合 是一种技术,通过该技术,具有等效边界的循环在某些条件下可以组合,以产生一个具有更大循环体的循环(旨在改善数据局部性)。
- 循环序列化
当任意数量的由
prange
驱动的循环存在于另一个由prange
驱动的循环内部时,会发生循环序列化。在这种情况下,所有prange
循环中最外层的循环并行执行,任何内部的prange
循环(无论是嵌套的还是其他的)都被视为标准的range
循环。本质上,嵌套并行不会发生。
- 循环不变代码外提
循环不变代码外提 是一种优化技术,它分析循环以查找可以在不改变循环执行结果的情况下移出循环体的语句,然后将这些语句“提升”到循环之外,以节省重复计算。
- 分配外提
分配外提是循环不变代码外提的一种特殊情况,由于某些常见的 NumPy 分配方法的设计,这种优化成为可能。这种技术的解释最好通过一个例子来说明
@njit(parallel=True) def test(n): for i in prange(n): temp = np.zeros((50, 50)) # <--- Allocate a temporary array with np.zeros() for j in range(50): temp[j, j] = i # ...do something with temp
在内部,这大致转换为以下形式
@njit(parallel=True) def test(n): for i in prange(n): temp = np.empty((50, 50)) # <--- np.zeros() is rewritten as np.empty() temp[:] = 0 # <--- and then a zero initialisation for j in range(50): temp[j, j] = i # ...do something with temp
然后在外提之后
@njit(parallel=True) def test(n): temp = np.empty((50, 50)) # <--- allocation is hoisted as a loop invariant as `np.empty` is considered pure for i in prange(n): temp[:] = 0 # <--- this remains as assignment is a side effect for j in range(50): temp[j, j] = i # ...do something with temp
可以看出,
np.zeros
分配被分成一个分配和一个赋值,然后该分配被从i
循环中外提,这会产生更高效的代码,因为分配只发生一次。
并行诊断报告部分
报告分为以下几个部分
- 代码注释
这是第一部分,包含装饰函数的源代码,其中识别并列举了具有并行语义的循环。源代码右侧的
loop #ID
列与已识别的并行循环对齐。从示例来看,#0
是np.sin
,#1
是np.cos
,#2
和#3
是prange()
Parallel loop listing for Function test, example.py (4) --------------------------------------|loop #ID @njit(parallel=True) | def test(x): | n = x.shape[0] | a = np.sin(x)---------------------| #0 b = np.cos(a * a)-----------------| #1 acc = 0 | for i in prange(n - 2):-----------| #3 for j in prange(n - 1):-------| #2 acc += b[i] + b[j + 1] | return acc |
值得注意的是,循环 ID 是按照它们被发现的顺序枚举的,这不一定与源代码中存在的顺序相同。此外,还应注意并行转换使用静态计数器进行循环 ID 索引。因此,由于内部优化/转换使用相同的计数器,这些优化/转换对用户是不可见的,所以循环 ID 索引可能不会从 0 开始。
- 融合循环
本节描述了尝试融合已发现循环的情况,并指出哪些成功,哪些失败。如果融合失败,会给出原因(例如依赖于其他数据)。从示例来看
--------------------------------- Fusing loops --------------------------------- Attempting fusion of parallel loops (combines loops with similar properties)... Trying to fuse loops #0 and #1: - fusion succeeded: parallel for-loop #1 is fused into for-loop #0. Trying to fuse loops #0 and #3: - fusion failed: loop dimension mismatched in axis 0. slice(0, x_size0.1, 1) != slice(0, $40.4, 1)
可以看出,尝试了循环
#0
和#1
的融合,并且成功了(两者都基于x
的相同维度)。在#0
和#1
成功融合之后,尝试在#0
(现在包括已融合的#1
循环)和#3
之间进行融合。这次融合失败了,因为存在循环维度不匹配,#0
的大小是x.shape
,而#3
的大小是x.shape[0] - 2
。
- 优化前
本节展示了代码中并行区域的结构,在任何优化发生之前,但循环与其最终并行区域相关联(这是为了使优化前/后的输出可以直接比较)。如果存在无法融合的循环,则可能存在多个并行区域,在这种情况下,每个区域内的代码将并行执行,但每个并行区域将按顺序运行。从示例来看
Parallel region 0: +--0 (parallel) +--1 (parallel) Parallel region 1: +--3 (parallel) +--2 (parallel)
正如 融合循环 部分所暗示的,代码中必然存在两个并行区域。第一个包含循环
#0
和#1
,第二个包含#3
和#2
,所有循环都标记为parallel
,因为尚未进行优化。
- 优化后
本节展示了代码中并行区域在优化后的结构。同样,并行区域与其相应的循环一起被列举出来,但这次会注明哪些循环被融合或序列化,并给出总结。从示例来看
Parallel region 0: +--0 (parallel, fused with loop(s): 1) Parallel region 1: +--3 (parallel) +--2 (serial) Parallel region 0 (loop #0) had 1 loop(s) fused. Parallel region 1 (loop #3) had 0 loop(s) fused and 1 loop(s) serialized as part of the larger parallel loop (#3).
可以看出并行区域 0 包含循环
#0
并且,如 融合循环 部分所示,循环#1
已融合到循环#0
中。还可以注意到并行区域 1 包含循环#3
并且循环#2
(内部的prange()
)已序列化以便在循环#3
的主体中执行。
- 循环不变代码外提
本节展示了每个循环在优化后发生的情况
未能外提的指令以及失败原因(依赖/不纯)。
已外提的指令。
可能发生的任何分配外提。
从示例来看
Instruction hoisting: loop #0: Failed to hoist the following: dependency: $arg_out_var.10 = getitem(value=x, index=$parfor__index_5.99) dependency: $0.6.11 = getattr(value=$0.5, attr=sin) dependency: $expr_out_var.9 = call $0.6.11($arg_out_var.10, func=$0.6.11, args=[Var($arg_out_var.10, example.py (7))], kws=(), vararg=None) dependency: $arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9 dependency: $0.10.20 = getattr(value=$0.9, attr=cos) dependency: $expr_out_var.16 = call $0.10.20($arg_out_var.17, func=$0.10.20, args=[Var($arg_out_var.17, example.py (8))], kws=(), vararg=None) loop #3: Has the following hoisted: $const58.3 = const(int, 1) $58.4 = _n_23 - $const58.3
首先需要注意的是,此信息面向高级用户,因为它涉及正在转换的函数的 Numba IR。例如,示例源代码中的表达式
a * a
在 IR 中部分转换为表达式$arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9
,这显然不能从loop #0
中外提,因为它不是循环不变的!而在loop #3
中,表达式$const58.3 =
const(int, 1)
来自源代码b[j + 1]
,数字1
显然是常量,因此可以从循环中外提。
调度
默认情况下,Numba 将并行区域的迭代分成大致相等大小的块,并将一个这样的块分配给每个配置的线程。(参见 设置线程数)。这种调度方法等同于 OpenMP 的静态调度,没有指定块大小,并且适用于每个迭代所需工作量几乎恒定的情况。相反,如果每个迭代所需的工作量(如下面的 prange
循环所示)显著变化,则这种静态调度方法可能导致负载不平衡和更长的执行时间。
test_unbalanced_example
的 numba/tests/doc_examples/test_parallel_chunksize.py
1from numba import (njit,
2 prange,
3 )
4import numpy as np
5
6@njit(parallel=True)
7def func1():
8 n = 100
9 vals = np.empty(n)
10 # The work in each iteration of the following prange
11 # loop is proportional to its index.
12 for i in prange(n):
13 cur = i + 1
14 for j in range(i):
15 if cur % 2 == 0:
16 cur //= 2
17 else:
18 cur = cur * 3 + 1
19 vals[i] = cur
20 return vals
21
22result = func1()
在这种情况下,Numba 提供了一种机制来控制并行区域的迭代次数(即块大小)分配到每个块中。然后 Numba 计算所需块的数量,该数量等于迭代次数除以块大小,并截断为最接近的整数。所有这些块的大小都大致相等。然后 Numba 将一个这样的块分配给上面配置的每个线程,当一个线程完成一个块时,Numba 将下一个可用块分配给该线程。这种调度方法类似于 OpenMP 的动态调度选项,带有指定的块大小。(请注意,Numba 仅在底层 Numba 线程后端(线程层)也能够进行动态调度的情况下,才能支持并行区域的动态调度。目前,只有 tbb
后端能够进行动态调度,因此如果希望从这种块大小选择机制中获得任何性能优势,则需要 tbb
后端。)为了最大限度地减少执行时间,程序员必须选择一个块大小,以平衡较小块大小带来的更好负载均衡和较大块大小带来的更少调度开销。有关块大小内部实现的更多详细信息,请参阅 并行块大小详细信息。
一个块中并行区域的迭代次数存储为线程局部变量,并且可以使用 numba.set_parallel_chunksize()
设置。此函数接受一个整数参数,其值必须大于或等于 0。值为 0 是默认值,它指示 Numba 使用上述静态调度方法。大于 0 的值指示 Numba 在上述动态调度方法中使用该值作为块大小。numba.set_parallel_chunksize()
返回块大小的先前值。此线程局部变量的当前值用作该线程调用的所有后续并行区域的块大小。但是,在进入并行区域时,Numba 将执行该并行区域的每个线程的块大小线程局部变量重置为默认值 0,因为任何嵌套并行区域都不太可能需要相同的块大小。如果同一个线程用于执行顺序区域和并行区域,则该线程的块大小变量在并行区域开始时设置为 0,并在退出并行区域时恢复其原始值。此行为在下面的示例中的 func1
中得到了证明,即 prange
并行区域内报告的块大小为 0,但并行区域外为 4。请注意,如果 prange
由于任何原因(例如,设置 parallel=False
)未并行执行,则非并行 prange 内报告的块大小将报告为 4。这种行为对于程序员来说可能最初是反直觉的,因为它与其他语言中线程局部变量的典型行为不同。如果程序员希望为可能启动的任何嵌套并行区域指定块大小,则可以在执行并行区域的线程中使用本节中描述的块大小 API。可以通过调用 numba.get_parallel_chunksize()
获取并行块大小的当前值。这两个函数都可以从标准 Python 中使用,也可以从 Numba JIT 编译函数中使用,如下所示。对 func1
的两次调用都将以块大小 4 执行,而 func2
将使用块大小 8。
test_chunksize_manual
的 numba/tests/doc_examples/test_parallel_chunksize.py
1from numba import (njit,
2 prange,
3 set_parallel_chunksize,
4 get_parallel_chunksize,
5 )
6
7@njit(parallel=True)
8def func1(n):
9 acc = 0
10 print(get_parallel_chunksize()) # Will print 4.
11 for i in prange(n):
12 print(get_parallel_chunksize()) # Will print 0.
13 acc += i
14 print(get_parallel_chunksize()) # Will print 4.
15 return acc
16
17@njit(parallel=True)
18def func2(n):
19 acc = 0
20 # This version gets the previous chunksize explicitly.
21 old_chunksize = get_parallel_chunksize()
22 set_parallel_chunksize(8)
23 for i in prange(n):
24 acc += i
25 set_parallel_chunksize(old_chunksize)
26 return acc
27
28# This version saves the previous chunksize as returned
29# by set_parallel_chunksize.
30old_chunksize = set_parallel_chunksize(4)
31result1 = func1(12)
32result2 = func2(12)
33result3 = func1(12)
34set_parallel_chunksize(old_chunksize)
由于这种保存和恢复的惯用语非常常见,Numba 提供了 parallel_chunksize()
with 语句上下文管理器来简化此惯用语。如下所示,此 with 语句可以从标准 Python 和 Numba JIT 编译函数中调用。与其他 Numba 上下文管理器一样,请注意,在作为 Numba JIT 编译函数一部分的上下文管理块中不支持引发异常。
test_chunksize_with
的 numba/tests/doc_examples/test_parallel_chunksize.py
1from numba import njit, prange, parallel_chunksize
2
3@njit(parallel=True)
4def func1(n):
5 acc = 0
6 for i in prange(n):
7 acc += i
8 return acc
9
10@njit(parallel=True)
11def func2(n):
12 acc = 0
13 with parallel_chunksize(8):
14 for i in prange(n):
15 acc += i
16 return acc
17
18with parallel_chunksize(4):
19 result1 = func1(12)
20 result2 = func2(12)
21 result3 = func1(12)
请注意,这些设置块大小的函数仅对使用 parallel 选项的 Numba 自动并行化有效。块大小的指定对 vectorize()
装饰器或 guvectorize()
装饰器没有影响。