This tutorial describes how to do reduction in TVM. Associative reduction operators such as sum/max/min are typical building blocks of linear algebra operations.

from __future__ import absolute_import, print_function

import tvm
import tvm.testing
from tvm import te
import numpy as np

In NumPy semantics, computing the sum of rows can be written as B = numpy.sum(A, axis=1).

The following lines describe the row-sum operation. To create a reduction formula, declare a reduction axis with te.reduce_axis, which takes the range of the reduction. te.sum takes the expression to be reduced together with the reduction axis, and computes the sum of the values over all k in the declared range. The equivalent C code is as follows:

for (int i = 0; i < n; ++i) {
  B[i] = 0;
  for (int k = 0; k < m; ++k) {
    B[i] = B[i] + A[i][k];
  }
}
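
In TE, the same computation is declared as follows; the schedules below assume these definitions of A and B, whose shapes and reduction over k match the lowered IR printed later in this section:

n = te.var("n")
m = te.var("m")
A = te.placeholder((n, m), name="A")
k = te.reduce_axis((0, m), name="k")
B = te.compute((n,), lambda i: te.sum(A[i, k], axis=k), name="B")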

There are several ways to schedule a reduction. First, print the IR code of the default schedule:

s = te.create_schedule(B.op)
print(tvm.lower(s, [A, B], simple_mode=True))
@main = primfn(A_1: handle, B_1: handle) -> ()
  attr = {"from_legacy_te_schedule": True, "global_symbol": "main", "tir.noalias": True}
  buffers = {A: Buffer(A_2: Pointer(float32), float32, [(stride: int32*n: int32)], [], type="auto"),
             B: Buffer(B_2: Pointer(float32), float32, [(stride_1: int32*n)], [], type="auto")}
  buffer_map = {A_1: A, B_1: B}
  preflattened_buffer_map = {A_1: A_3: Buffer(A_2, float32, [n, m: int32], [stride, stride_2: int32], type="auto"), B_1: B_3: Buffer(B_2, float32, [n], [stride_1], type="auto")} {
  for (i: int32, 0, n) {
    B[(i*stride_1)] = 0f32
    for (k: int32, 0, m) {
      B[(i*stride_1)] = (B[(i*stride_1)] + A[((i*stride) + (k*stride_2))])
    }
  }
}
The IR code is quite similar to the C code; the reduction axis behaves like a normal axis and can be split.

The following code splits both the row axis of B and the reduction axis by different factors, yielding a nested reduction.

ko, ki = s[B].split(B.op.reduce_axis[0], factor=16)
xo, xi = s[B].split(B.op.axis[0], factor=32)
print(tvm.lower(s, [A, B], simple_mode=True))
@main = primfn(A_1: handle, B_1: handle) -> ()
  attr = {"from_legacy_te_schedule": True, "global_symbol": "main", "tir.noalias": True}
  buffers = {A: Buffer(A_2: Pointer(float32), float32, [(stride: int32*n: int32)], [], type="auto"),
             B: Buffer(B_2: Pointer(float32), float32, [(stride_1: int32*n)], [], type="auto")}
  buffer_map = {A_1: A, B_1: B}
  preflattened_buffer_map = {A_1: A_3: Buffer(A_2, float32, [n, m: int32], [stride, stride_2: int32], type="auto"), B_1: B_3: Buffer(B_2, float32, [n], [stride_1], type="auto")} {
  for (i.outer: int32, 0, floordiv((n + 31), 32)) {
    for (i.inner: int32, 0, 32) {
      if @tir.likely((((i.outer*32) + i.inner) < n), dtype=bool) {
        B[(((i.outer*32) + i.inner)*stride_1)] = 0f32
      }
      for (k.outer: int32, 0, floordiv((m + 15), 16)) {
        for (k.inner: int32, 0, 16) {
          if @tir.likely((((i.outer*32) + i.inner) < n), dtype=bool) {
            if @tir.likely((((k.outer*16) + k.inner) < m), dtype=bool) {
              B[(((i.outer*32) + i.inner)*stride_1)] = (B[(((i.outer*32) + i.inner)*stride_1)] + A[((((i.outer*32) + i.inner)*stride) + (((k.outer*16) + k.inner)*stride_2))])
            }
          }
        }
      }
    }
  }
}

To build a GPU kernel, bind the rows of B to GPU threads:

s[B].bind(xo, te.thread_axis("blockIdx.x"))
s[B].bind(xi, te.thread_axis("threadIdx.x"))
print(tvm.lower(s, [A, B], simple_mode=True))
@main = primfn(A_1: handle, B_1: handle) -> ()
  attr = {"from_legacy_te_schedule": True, "global_symbol": "main", "tir.noalias": True}
  buffers = {A: Buffer(A_2: Pointer(float32), float32, [(stride: int32*n: int32)], [], type="auto"),
             B: Buffer(B_2: Pointer(float32), float32, [(stride_1: int32*n)], [], type="auto")}
  buffer_map = {A_1: A, B_1: B}
  preflattened_buffer_map = {A_1: A_3: Buffer(A_2, float32, [n, m: int32], [stride, stride_2: int32], type="auto"), B_1: B_3: Buffer(B_2, float32, [n], [stride_1], type="auto")} {
  attr [IterVar(blockIdx.x: int32, (nullptr), "ThreadIndex", "blockIdx.x")] "thread_extent" = floordiv((n + 31), 32);
  attr [IterVar(threadIdx.x: int32, (nullptr), "ThreadIndex", "threadIdx.x")] "thread_extent" = 32 {
    if @tir.likely((((blockIdx.x*32) + threadIdx.x) < n), dtype=bool) {
      B[(((blockIdx.x*32) + threadIdx.x)*stride_1)] = 0f32
    }
    for (k.outer: int32, 0, floordiv((m + 15), 16)) {
      for (k.inner: int32, 0, 16) {
        if @tir.likely((((blockIdx.x*32) + threadIdx.x) < n), dtype=bool) {
          if @tir.likely((((k.outer*16) + k.inner) < m), dtype=bool) {
            B[(((blockIdx.x*32) + threadIdx.x)*stride_1)] = (B[(((blockIdx.x*32) + threadIdx.x)*stride_1)] + A[((((blockIdx.x*32) + threadIdx.x)*stride) + (((k.outer*16) + k.inner)*stride_2))])
          }
        }
      }
    }
  }
}

When building a reduction, we cannot simply parallelize over the reduction axis. Instead, we need to partition the reduction, store the local reduction results in a temporary array, and then do a reduction over the temporary array.
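
The following NumPy sketch illustrates the idea (illustration only; the 8x64 shape and the 4x16 split of the reduction axis are arbitrary choices):

x = np.random.uniform(size=(8, 64)).astype("float32")
partial = x.reshape(8, 4, 16).sum(axis=1)  # local reductions into a temporary array, shape (8, 16)
total = partial.sum(axis=1)                # reduce the temporary array to get the final result
np.testing.assert_allclose(total, x.sum(axis=1), rtol=1e-5)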

The rfactor primitive performs this rewriting of the computation. In the following schedule, the result of B is written to a temporary result B.rf, and the factored dimension becomes the first dimension of B.rf.

s = te.create_schedule(B.op)
ko, ki = s[B].split(B.op.reduce_axis[0], factor=16)
BF = s.rfactor(B, ki)
print(tvm.lower(s, [A, B], simple_mode=True))
@main = primfn(A_1: handle, B_1: handle) -> ()
  attr = {"from_legacy_te_schedule": True, "global_symbol": "main", "tir.noalias": True}
  buffers = {A: Buffer(A_2: Pointer(float32), float32, [(stride: int32*n: int32)], [], type="auto"),
             B: Buffer(B_2: Pointer(float32), float32, [(stride_1: int32*n)], [], type="auto")}
  buffer_map = {A_1: A, B_1: B}
  preflattened_buffer_map = {A_1: A_3: Buffer(A_2, float32, [n, m: int32], [stride, stride_2: int32], type="auto"), B_1: B_3: Buffer(B_2, float32, [n], [stride_1], type="auto")} {
  allocate(B.rf: Pointer(global float32), float32, [(n*16)]), storage_scope = global {
    for (k.inner: int32, 0, 16) {
      for (i: int32, 0, n) {
        B.rf_1: Buffer(B.rf, float32, [(16*n)], [])[((k.inner*n) + i)] = 0f32
        for (k.outer: int32, 0, floordiv((m + 15), 16)) {
          if @tir.likely((((k.outer*16) + k.inner) < m), dtype=bool) {
            B.rf_1[((k.inner*n) + i)] = (B.rf_1[((k.inner*n) + i)] + A[((i*stride) + (((k.outer*16) + k.inner)*stride_2))])
          }
        }
      }
    }
    for (ax0: int32, 0, n) {
      B[(ax0*stride_1)] = 0f32
      for (k.inner.v: int32, 0, 16) {
        B[(ax0*stride_1)] = (B[(ax0*stride_1)] + B.rf_1[((k.inner.v*n) + ax0)])
      }
    }
  }
}

The scheduled operator of B is rewritten to be a sum over the first axis of the reduced result of B.rf.

print(s[B].op.body)
[reduce(combiner=comm_reducer(result=[(x + y)], lhs=[x], rhs=[y], identity_element=[0f]), source=[B.rf[k.inner.v, ax0]], init=[], axis=[iter_var(k.inner.v, range(min=0, ext=16))], where=(bool)1, value_index=0)]

Next we can parallelize over the factored axis. Here the reduction axis of B is marked as a thread; TVM allows a reduction axis to be marked as a thread if it is the only reduction axis and cross-thread reduction is possible on the device.

BF can also be computed directly at the reduction axis. The final generated kernel distributes the rows across blockIdx.x and threadIdx.y and the columns across threadIdx.x, and finally performs a cross-thread reduction over threadIdx.x.

xo, xi = s[B].split(s[B].op.axis[0], factor=32)
s[B].bind(xo, te.thread_axis("blockIdx.x"))
s[B].bind(xi, te.thread_axis("threadIdx.y"))
tx = te.thread_axis("threadIdx.x")
s[B].bind(s[B].op.reduce_axis[0], tx)
s[BF].compute_at(s[B], s[B].op.reduce_axis[0])
s[B].set_store_predicate(tx.var.equal(0))
fcuda = tvm.build(s, [A, B], "cuda")
print(fcuda.imported_modules[0].get_source())
[generated CUDA kernel source, abridged: per-thread partial sums are accumulated into B_rf over the k_outer loops under the guard "if (((((int)blockIdx.x) * 32) + ((int)threadIdx.y)) < n)", then a cross-thread reduction over threadIdx.x combines them using red_buf0, t0, and a mask computed from __activemask()]

Verify the correctness of the result kernel by comparing it against NumPy.

nn = 128
dev = tvm.cuda(0)
a = tvm.nd.array(np.random.uniform(size=(nn, nn)).astype(A.dtype), dev)
b = tvm.nd.array(np.zeros(nn, dtype=B.dtype), dev)
fcuda(a, b)
tvm.testing.assert_allclose(b.numpy(), np.sum(a.numpy(), axis=1), rtol=1e-4)

In TVM, a convolution can be described with a simple 2-D reduction. Here is an example with filter size = [3, 3] and stride = [1, 1].

n = te.var("n")Input = te.placeholder((n, n), name="Input")Filter = te.placeholder((3, 3), name="Filter")di = te.reduce_axis((0, 3), name="di")dj = te.reduce_axis((0, 3), name="dj")Output = te.compute(    (n - 2, n - 2),    lambda i, j: te.sum(Input[i + di, j + dj] * Filter[di, dj], axis=[di, dj]),    name="Output",)s = te.create_schedule(Output.op)print(tvm.lower(s, [Input, Filter, Output], simple_mode=True))
@main = primfn(Input_1: handle, Filter_1: handle, Output_1: handle) -> ()
  attr = {"from_legacy_te_schedule": True, "global_symbol": "main", "tir.noalias": True}
  buffers = {Input: Buffer(Input_2: Pointer(float32), float32, [(stride: int32*n: int32)], [], type="auto"),
             Filter: Buffer(Filter_2: Pointer(float32), float32, [9], []),
             Output: Buffer(Output_2: Pointer(float32), float32, [((n - 2)*(n - 2))], [])}
  buffer_map = {Input_1: Input, Filter_1: Filter, Output_1: Output}
  preflattened_buffer_map = {Input_1: Input_3: Buffer(Input_2, float32, [n, n], [stride, stride_1: int32], type="auto"), Filter_1: Filter_3: Buffer(Filter_2, float32, [3, 3], []), Output_1: Output_3: Buffer(Output_2, float32, [(n - 2), (n - 2)], [])} {
  for (i: int32, 0, (n - 2)) {
    for (j: int32, 0, (n - 2)) {
      Output[((i*(n - 2)) + j)] = 0f32
      for (di: int32, 0, 3) {
        for (dj: int32, 0, 3) {
          Output[((i*(n - 2)) + j)] = (Output[((i*(n - 2)) + j)] + (Input[(((i + di)*stride) + ((j + dj)*stride_1))]*Filter[((di*3) + dj)]))
        }
      }
    }
  }
}

Besides built-in reduction operations such as te.sum, te.min, and te.max, commutative reduction operations can also be defined via te.comm_reducer.

n = te.var("n")m = te.var("m")product = te.comm_reducer(lambda x, y: x * y, lambda t: tvm.tir.const(1, dtype=t), name="product")A = te.placeholder((n, m), name="A")k = te.reduce_axis((0, m), name="k")B = te.compute((n,), lambda i: product(A[i, k], axis=k), name="B")

In summary:

    Describe the reduction with reduce_axis.

    Use rfactor to factor out an axis when parallelism is needed.

    Define new reduction operations with te.comm_reducer.