示例
CUDA 内置目标弃用通知
Numba 中内置的 CUDA 目标已被弃用,未来的开发已移至 NVIDIA numba-cuda 包。请参阅 内置 CUDA 目标弃用和维护状态。
向量加法
此示例使用 Numba 创建设备上数组和向量加法内核;这是学习如何使用 Numba 编写 GPU 内核的热身。我们将从一些必要的导入开始
test_ex_vecadd
在 numba/cuda/tests/doc_examples/test_vecadd.py
1import numpy as np
2from numba import cuda
以下函数是内核。请注意,它是由 Python 变量定义的,类型未指定。当内核启动时,Numba 将检查在运行时传递的参数类型,并为它们生成一个专门的 CUDA 内核。
请注意,Numba 内核不返回值,必须将任何输出写入作为参数传入的数组中(这类似于 CUDA C/C++ 内核需要具有 void
返回类型的要求)。在这里,我们传入 c
以便将结果写入其中。
test_ex_vecadd
在 numba/cuda/tests/doc_examples/test_vecadd.py
1@cuda.jit
2def f(a, b, c):
3 # like threadIdx.x + (blockIdx.x * blockDim.x)
4 tid = cuda.grid(1)
5 size = len(c)
6
7 if tid < size:
8 c[tid] = a[tid] + b[tid]
cuda.to_device()
可用于创建数组的设备端副本。cuda.device_array_like()
创建一个与现有数组具有相同形状和类型的未初始化数组。在这里,我们传输两个向量并创建一个空向量来保存我们的结果
test_ex_vecadd
在 numba/cuda/tests/doc_examples/test_vecadd.py
1N = 100000
2a = cuda.to_device(np.random.random(N))
3b = cuda.to_device(np.random.random(N))
4c = cuda.device_array_like(a)
调用 forall()
会生成一个带有 1D 网格的适当启动配置(请参阅 内核调用),适用于给定数据大小,并且通常是启动内核最简单的方式
test_ex_vecadd
在 numba/cuda/tests/doc_examples/test_vecadd.py
1f.forall(len(a))(a, b, c)
2print(c.copy_to_host())
这将打印
[0.73548323 1.32061059 0.12582968 ... 1.25925809 1.49335059 1.59315414]
也可以使用下标语法手动配置网格。以下示例启动了一个具有足够线程的网格,以操作每个向量元素
test_ex_vecadd
在 numba/cuda/tests/doc_examples/test_vecadd.py
1# Enough threads per block for several warps per block
2nthreads = 256
3# Enough blocks to cover the entire vector depending on its length
4nblocks = (len(a) // nthreads) + 1
5f[nblocks, nthreads](a, b, c)
6print(c.copy_to_host())
这也将打印
[0.73548323 1.32061059 0.12582968 ... 1.25925809 1.49335059 1.59315414]
一维热方程
此示例解决了一维拉普拉斯方程,用于一组特定的初始条件和边界条件。关于拉普拉斯方程的全面讨论超出了本文档的范围,但可以说它描述了热量如何随时间通过物体传播。它通过两种方式离散化问题
域被划分为一个点网格,每个点都具有单独的温度。
时间被划分为离散的时间间隔,并按顺序向前推进。
然后,应用以下假设:一个点经过一段时间后的温度是与其直接相邻点的温度的加权平均值。直观地讲,如果域中的所有点都非常热,而中间的单个点非常冷,随着时间的推移,热点会导致冷点升温,而冷点会导致周围的热点略微冷却。简而言之,热量会在物体中扩散。
我们可以使用 Numba 内核实现此模拟。让我们从简单的开始,假设我们有一个一维对象,我们将其表示为一个值数组。数组中元素的位置是对象内点的位置,元素的值表示温度。
test_ex_laplace
在 numba/cuda/tests/doc_examples/test_laplace.py
1import numpy as np
2from numba import cuda
这里有一些初始设置。让我们将物体中心的一个点变得非常热。
test_ex_laplace
在 numba/cuda/tests/doc_examples/test_laplace.py
1# Use an odd problem size.
2# This is so there can be an element truly in the "middle" for symmetry.
3size = 1001
4data = np.zeros(size)
5
6# Middle element is made very hot
7data[500] = 10000
8buf_0 = cuda.to_device(data)
9
10# This extra array is used for synchronization purposes
11buf_1 = cuda.device_array_like(buf_0)
12
13niter = 10000
问题的初始状态可以可视化为
在我们的内核中,每个线程将负责在所需时间步数的循环中管理单个元素的温度更新。内核如下。请注意,此处使用了协作组同步,并且在每次迭代中交换了两个缓冲区以避免竞态条件。有关详细信息,请参阅 numba.cuda.cg.this_grid()
。
test_ex_laplace
在 numba/cuda/tests/doc_examples/test_laplace.py
1@cuda.jit
2def solve_heat_equation(buf_0, buf_1, timesteps, k):
3 i = cuda.grid(1)
4
5 # Don't continue if our index is outside the domain
6 if i >= len(buf_0):
7 return
8
9 # Prepare to do a grid-wide synchronization later
10 grid = cuda.cg.this_grid()
11
12 for step in range(timesteps):
13 # Select the buffer from the previous timestep
14 if (step % 2) == 0:
15 data = buf_0
16 next_data = buf_1
17 else:
18 data = buf_1
19 next_data = buf_0
20
21 # Get the current temperature associated with this point
22 curr_temp = data[i]
23
24 # Apply formula from finite difference equation
25 if i == 0:
26 # Left wall is held at T = 0
27 next_temp = curr_temp + k * (data[i + 1] - (2 * curr_temp))
28 elif i == len(data) - 1:
29 # Right wall is held at T = 0
30 next_temp = curr_temp + k * (data[i - 1] - (2 * curr_temp))
31 else:
32 # Interior points are a weighted average of their neighbors
33 next_temp = curr_temp + k * (
34 data[i - 1] - (2 * curr_temp) + data[i + 1]
35 )
36
37 # Write new value to the next buffer
38 next_data[i] = next_temp
39
40 # Wait for every thread to write before moving on
41 grid.sync()
调用内核
test_ex_laplace
在 numba/cuda/tests/doc_examples/test_laplace.py
1solve_heat_equation.forall(len(data))(
2 buf_0, buf_1, niter, 0.25
3)
绘制最终数据显示一个弧线,该弧线在物体最初炽热的地方最高,并逐渐向温度固定为零的边缘倾斜至零。在无限时间的极限下,该弧线将完全变平。
将点击数据划分为会话
商业分析中一个常见的问题是将在线平台用户的活动分组为会话,这被称为“会话化”(sessionization)。其理念是,用户通常会浏览网站并执行各种操作(点击某物、填写表格等),这些操作以离散组的形式进行。也许客户上午花了一些时间购物,然后晚上又购物——通常,企业有兴趣将这些时段视为与其服务的独立交互,这便产生了以某种约定方式通过编程将活动拆分的问题。
在这里,我们将演示如何编写一个 Numba 内核来解决这个问题。我们将从包含两个字段的数据开始:让 user_id
表示对应单个客户的唯一 ID,并让 action_time
表示在服务上执行某个未知操作的时间。目前,我们假设只有一种操作类型,所以只需要知道它发生的时间。
我们的目标是创建一个名为 session_id
的新列,其中包含对应于唯一会话的标签。我们将把会话之间的边界定义为两次点击之间至少间隔一小时。
test_ex_sessionize
在 numba/cuda/tests/doc_examples/test_sessionize.py
1import numpy as np
2from numba import cuda
3
4# Set the timeout to one hour
5session_timeout = np.int64(np.timedelta64("3600", "s"))
这是一个使用 Numba 的解决方案
test_ex_sessionize
在 numba/cuda/tests/doc_examples/test_sessionize.py
1@cuda.jit
2def sessionize(user_id, timestamp, results):
3 gid = cuda.grid(1)
4 size = len(user_id)
5
6 if gid >= size:
7 return
8
9 # Determine session boundaries
10 is_first_datapoint = gid == 0
11 if not is_first_datapoint:
12 new_user = user_id[gid] != user_id[gid - 1]
13 timed_out = (
14 timestamp[gid] - timestamp[gid - 1] > session_timeout
15 )
16 is_sess_boundary = new_user or timed_out
17 else:
18 is_sess_boundary = True
19
20 # Determine session labels
21 if is_sess_boundary:
22 # This thread marks the start of a session
23 results[gid] = gid
24
25 # Make sure all session boundaries are written
26 # before populating the session id
27 grid = cuda.cg.this_grid()
28 grid.sync()
29
30 look_ahead = 1
31 # Check elements 'forward' of this one
32 # until a new session boundary is found
33 while results[gid + look_ahead] == 0:
34 results[gid + look_ahead] = gid
35 look_ahead += 1
36 # Avoid out-of-bounds accesses by the last thread
37 if gid + look_ahead == size - 1:
38 results[gid + look_ahead] = gid
39 break
让我们生成一些数据并尝试运行内核
test_ex_sessionize
在 numba/cuda/tests/doc_examples/test_sessionize.py
1# Generate data
2ids = cuda.to_device(
3 np.array(
4 [
5 1, 1, 1, 1, 1, 1,
6 2, 2, 2,
7 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
8 4, 4, 4, 4, 4, 4, 4, 4, 4,
9 ]
10 )
11)
12sec = cuda.to_device(
13 np.array(
14 [
15 1, 2, 3, 5000, 5001, 5002, 1,
16 2, 3, 1, 2, 5000, 5001, 10000,
17 10001, 10002, 10003, 15000, 150001,
18 1, 5000, 50001, 15000, 20000,
19 25000, 25001, 25002, 25003,
20 ],
21 dtype="datetime64[ns]",
22 ).astype(
23 "int64"
24 ) # Cast to int64 for compatibility
25)
26# Create a vector to hold the results
27results = cuda.to_device(np.zeros(len(ids)))
如上所示,内核成功地将第一个用户 ID 的前三个数据点与后三个数据点分开,并且整个过程中都看到了类似的模式。
JIT 函数 CPU-GPU 兼容性
此示例演示了如何使用 numba.jit
对函数进行 JIT 编译以在 CPU 上运行,同时使其可在 CUDA 内核内部使用。这对于将工作流从 CPU 迁移到 GPU 的用户非常有用,因为他们可以直接重用潜在的业务逻辑,而无需进行较少的代码更改。
以下是一个示例函数
test_ex_cpu_gpu_compat
在 numba/cuda/tests/doc_examples/test_cpu_gpu_compat.py
1@numba.jit
2def business_logic(x, y, z):
3 return 4 * z * (2 * x - (4 * y) / 2 * pi)
函数 business_logic
可以在 CPU 上以编译形式独立运行
test_ex_cpu_gpu_compat
在 numba/cuda/tests/doc_examples/test_cpu_gpu_compat.py
1print(business_logic(1, 2, 3)) # -126.79644737231007
它也可以在 GPU 内核内部直接按线程重用。例如,可以生成一些向量来表示 x
、y
和 z
test_ex_cpu_gpu_compat
在 numba/cuda/tests/doc_examples/test_cpu_gpu_compat.py
1X = cuda.to_device([1, 10, 234])
2Y = cuda.to_device([2, 2, 4014])
3Z = cuda.to_device([3, 14, 2211])
4results = cuda.to_device([0.0, 0.0, 0.0])
以及一个引用了该修饰函数的 Numba 内核
test_ex_cpu_gpu_compat
在 numba/cuda/tests/doc_examples/test_cpu_gpu_compat.py
1@cuda.jit
2def f(res, xarr, yarr, zarr):
3 tid = cuda.grid(1)
4 if tid < len(xarr):
5 # The function decorated with numba.jit may be directly reused
6 res[tid] = business_logic(xarr[tid], yarr[tid], zarr[tid])
这个内核可以以正常方式调用
test_ex_cpu_gpu_compat
在 numba/cuda/tests/doc_examples/test_cpu_gpu_compat.py
1f.forall(len(X))(results, X, Y, Z)
2print(results)
3# [-126.79644737231007, 416.28324559588634, -218912930.2987788]
蒙特卡洛积分
此示例展示了如何使用 Numba 通过在 GPU 上快速生成随机数来近似定积分的值。蒙特卡洛积分数学机制的详细描述超出了本示例的范围,但可以简要描述为一种平均过程,其中曲线下的面积是通过对由其函数值形成的许多矩形取平均值来近似的。
此外,此示例展示了如何使用 cuda.reduce()
API 在 Numba 中执行规约。
test_ex_montecarlo
在 numba/cuda/tests/doc_examples/test_montecarlo.py
1import numba
2import numpy as np
3from numba import cuda
4from numba.cuda.random import (
5 create_xoroshiro128p_states,
6 xoroshiro128p_uniform_float32,
7)
让我们创建一个变量来控制抽取的样本数量
test_ex_montecarlo
在 numba/cuda/tests/doc_examples/test_montecarlo.py
1# number of samples, higher will lead to a more accurate answer
2nsamps = 1000000
以下内核实现了主要的积分例程
test_ex_montecarlo
在 numba/cuda/tests/doc_examples/test_montecarlo.py
1@cuda.jit
2def mc_integrator_kernel(out, rng_states, lower_lim, upper_lim):
3 """
4 kernel to draw random samples and evaluate the function to
5 be integrated at those sample values
6 """
7 size = len(out)
8
9 gid = cuda.grid(1)
10 if gid < size:
11 # draw a sample between 0 and 1 on this thread
12 samp = xoroshiro128p_uniform_float32(rng_states, gid)
13
14 # normalize this sample to the limit range
15 samp = samp * (upper_lim - lower_lim) + lower_lim
16
17 # evaluate the function to be
18 # integrated at the normalized
19 # value of the sample
20 y = func(samp)
21 out[gid] = y
这个便利函数调用内核,执行一些预处理和后处理步骤。请注意,它使用了 Numba 的规约 API 来对数组求和并计算最终结果
test_ex_montecarlo
在 numba/cuda/tests/doc_examples/test_montecarlo.py
1@cuda.reduce
2def sum_reduce(a, b):
3 return a + b
4
5def mc_integrate(lower_lim, upper_lim, nsamps):
6 """
7 approximate the definite integral of `func` from
8 `lower_lim` to `upper_lim`
9 """
10 out = cuda.to_device(np.zeros(nsamps, dtype="float32"))
11 rng_states = create_xoroshiro128p_states(nsamps, seed=42)
12
13 # jit the function for use in CUDA kernels
14
15 mc_integrator_kernel.forall(nsamps)(
16 out, rng_states, lower_lim, upper_lim
17 )
18 # normalization factor to convert
19 # to the average: (b - a)/(N - 1)
20 factor = (upper_lim - lower_lim) / (nsamps - 1)
21
22 return sum_reduce(out) * factor
我们现在可以使用 mc_integrate
来计算此函数在两个限制之间的定积分
test_ex_montecarlo
在 numba/cuda/tests/doc_examples/test_montecarlo.py
1# define a function to integrate
2@numba.jit
3def func(x):
4 return 1.0 / x
5
6mc_integrate(1, 2, nsamps) # array(0.6929643, dtype=float32)
7mc_integrate(2, 3, nsamps) # array(0.4054021, dtype=float32)
矩阵乘法
首先,导入此示例所需的模块
test_ex_matmul
在 numba/cuda/tests/doc_examples/test_matmul.py
1from numba import cuda, float32
2import numpy as np
3import math
这是一个使用 CUDA 内核实现的朴素矩阵乘法
test_ex_matmul
在 numba/cuda/tests/doc_examples/test_matmul.py
1@cuda.jit
2def matmul(A, B, C):
3 """Perform square matrix multiplication of C = A * B."""
4 i, j = cuda.grid(2)
5 if i < C.shape[0] and j < C.shape[1]:
6 tmp = 0.
7 for k in range(A.shape[1]):
8 tmp += A[i, k] * B[k, j]
9 C[i, j] = tmp
此函数的一个用法示例如下
test_ex_matmul
在 numba/cuda/tests/doc_examples/test_matmul.py
1x_h = np.arange(16).reshape([4, 4])
2y_h = np.ones([4, 4])
3z_h = np.zeros([4, 4])
4
5x_d = cuda.to_device(x_h)
6y_d = cuda.to_device(y_h)
7z_d = cuda.to_device(z_h)
8
9threadsperblock = (16, 16)
10blockspergrid_x = math.ceil(z_h.shape[0] / threadsperblock[0])
11blockspergrid_y = math.ceil(z_h.shape[1] / threadsperblock[1])
12blockspergrid = (blockspergrid_x, blockspergrid_y)
13
14matmul[blockspergrid, threadsperblock](x_d, y_d, z_d)
15z_h = z_d.copy_to_host()
16print(z_h)
17print(x_h @ y_h)
此实现直观易懂,但性能不佳,因为相同的矩阵元素会多次从设备内存中加载,这很慢(有些设备可能有透明数据缓存,但它们可能不足以一次性容纳所有输入)。
如果我们使用分块算法来减少对设备内存的访问,它会更快。CUDA 提供快速的 共享内存,供块中的线程协作计算任务。以下实现了使用共享内存的平方矩阵乘法的更快版本
test_ex_matmul
在 numba/cuda/tests/doc_examples/test_matmul.py
1# Controls threads per block and shared memory usage.
2# The computation will be done on blocks of TPBxTPB elements.
3# TPB should not be larger than 32 in this example
4TPB = 16
5
6@cuda.jit
7def fast_matmul(A, B, C):
8 """
9 Perform matrix multiplication of C = A * B using CUDA shared memory.
10
11 Reference: https://stackoverflow.com/a/64198479/13697228 by @RobertCrovella
12 """
13 # Define an array in the shared memory
14 # The size and type of the arrays must be known at compile time
15 sA = cuda.shared.array(shape=(TPB, TPB), dtype=float32)
16 sB = cuda.shared.array(shape=(TPB, TPB), dtype=float32)
17
18 x, y = cuda.grid(2)
19
20 tx = cuda.threadIdx.x
21 ty = cuda.threadIdx.y
22 bpg = cuda.gridDim.x # blocks per grid
23
24 # Each thread computes one element in the result matrix.
25 # The dot product is chunked into dot products of TPB-long vectors.
26 tmp = float32(0.)
27 for i in range(bpg):
28 # Preload data into shared memory
29 sA[ty, tx] = 0
30 sB[ty, tx] = 0
31 if y < A.shape[0] and (tx + i * TPB) < A.shape[1]:
32 sA[ty, tx] = A[y, tx + i * TPB]
33 if x < B.shape[1] and (ty + i * TPB) < B.shape[0]:
34 sB[ty, tx] = B[ty + i * TPB, x]
35
36 # Wait until all threads finish preloading
37 cuda.syncthreads()
38
39 # Computes partial product on the shared memory
40 for j in range(TPB):
41 tmp += sA[ty, j] * sB[j, tx]
42
43 # Wait until all threads finish computing
44 cuda.syncthreads()
45 if y < C.shape[0] and x < C.shape[1]:
46 C[y, x] = tmp
由于共享内存是有限的资源,代码会从输入数组中一次预加载一小块。然后,它调用 syncthreads()
,等待所有线程完成预加载,然后才在共享内存上进行计算。计算完成后,它再次同步,以确保所有线程都已处理完共享内存中的数据,然后才在下一次循环迭代中覆盖它。
函数 fast_matmul
的一个用法示例如下
test_ex_matmul
在 numba/cuda/tests/doc_examples/test_matmul.py
1x_h = np.arange(16).reshape([4, 4])
2y_h = np.ones([4, 4])
3z_h = np.zeros([4, 4])
4
5x_d = cuda.to_device(x_h)
6y_d = cuda.to_device(y_h)
7z_d = cuda.to_device(z_h)
8
9threadsperblock = (TPB, TPB)
10blockspergrid_x = math.ceil(z_h.shape[0] / threadsperblock[0])
11blockspergrid_y = math.ceil(z_h.shape[1] / threadsperblock[1])
12blockspergrid = (blockspergrid_x, blockspergrid_y)
13
14fast_matmul[blockspergrid, threadsperblock](x_d, y_d, z_d)
15z_h = z_d.copy_to_host()
16print(z_h)
17print(x_h @ y_h)
这通过了 CUDA 内存检查测试,这有助于调试。运行上述代码会产生以下输出
$ python fast_matmul.py
[[ 6. 6. 6. 6.]
[22. 22. 22. 22.]
[38. 38. 38. 38.]
[54. 54. 54. 54.]]
[[ 6. 6. 6. 6.]
[22. 22. 22. 22.]
[38. 38. 38. 38.]
[54. 54. 54. 54.]]
注意
对于 CUDA 中的高性能矩阵乘法,另请参阅 CuPy 实现。
此处概述的方法通过调整 blockspergrid
变量可以推广到非方阵乘法,如下所示
再次,以下是一个用法示例
test_ex_matmul
在 numba/cuda/tests/doc_examples/test_matmul.py
1x_h = np.arange(115).reshape([5, 23])
2y_h = np.ones([23, 7])
3z_h = np.zeros([5, 7])
4
5x_d = cuda.to_device(x_h)
6y_d = cuda.to_device(y_h)
7z_d = cuda.to_device(z_h)
8
9threadsperblock = (TPB, TPB)
10grid_y_max = max(x_h.shape[0], y_h.shape[0])
11grid_x_max = max(x_h.shape[1], y_h.shape[1])
12blockspergrid_x = math.ceil(grid_x_max / threadsperblock[0])
13blockspergrid_y = math.ceil(grid_y_max / threadsperblock[1])
14blockspergrid = (blockspergrid_x, blockspergrid_y)
15
16fast_matmul[blockspergrid, threadsperblock](x_d, y_d, z_d)
17z_h = z_d.copy_to_host()
18print(z_h)
19print(x_h @ y_h)
以及相应的输出
$ python nonsquare_matmul.py
[[ 253. 253. 253. 253. 253. 253. 253.]
[ 782. 782. 782. 782. 782. 782. 782.]
[1311. 1311. 1311. 1311. 1311. 1311. 1311.]
[1840. 1840. 1840. 1840. 1840. 1840. 1840.]
[2369. 2369. 2369. 2369. 2369. 2369. 2369.]]
[[ 253. 253. 253. 253. 253. 253. 253.]
[ 782. 782. 782. 782. 782. 782. 782.]
[1311. 1311. 1311. 1311. 1311. 1311. 1311.]
[1840. 1840. 1840. 1840. 1840. 1840. 1840.]
[2369. 2369. 2369. 2369. 2369. 2369. 2369.]]
调用 NumPy UFunc
CUDA 目标中支持的 UFuncs(请参阅 NumPy 支持)可以在内核内部调用,但输出数组必须作为位置参数传入。以下示例演示了遵循此模式在内核内部调用 np.sin()
的方法
test_ex_cuda_ufunc_call
在 numba/cuda/tests/doc_examples/test_ufunc.py
1import numpy as np
2from numba import cuda
3
4# A kernel calling a ufunc (sin, in this case)
5@cuda.jit
6def f(r, x):
7 # Compute sin(x) with result written to r
8 np.sin(x, r)
9
10# Declare input and output arrays
11x = np.arange(10, dtype=np.float32) - 5
12r = np.zeros_like(x)
13
14# Launch kernel that calls the ufunc
15f[1, 1](r, x)
16
17# A quick sanity check demonstrating equality of the sine computed by
18# the sin ufunc inside the kernel, and NumPy's sin ufunc
19np.testing.assert_allclose(r, np.sin(x))