tvm.tir.schedule
TensorIR 调度 API 的命名空间。
- class tvm.tir.schedule.BlockScope
一个对象对应于 sref 树中的每个块 sref,它跟踪块之间的生产者-消费者依赖关系。
术语表
块作用域:sref 树的连续子树,以每个块 sref 为根,其组成部分为
作用域根:一个块 sref
内部 sref:循环 sref
作用域叶节点:块 sref
子块:作用域根或特定内部 sref 下的作用域叶节点块
- get_deps_by_src(block: StmtSRef) List[Dependency]
获取所有 src 为目标 `block` 的依赖项。
- 参数:
block (StmtSRef) – 被查询的块
- 返回:
blocks – 依赖项
- 返回类型:
List[Dependency]
- get_deps_by_dst(block: StmtSRef) List[Dependency]
获取所有 dst 为目标 block 的依赖项。
- 参数:
block (StmtSRef) – 被查询的块
- 返回:
blocks – 依赖项
- 返回类型:
List[Dependency]
- class tvm.tir.schedule.Dependency
一个元组 (src, dst, kind) 表示特定类型的依赖关系。例如,(A, B, kRAW) 表示块 B 依赖于块 A,依赖关系类型为读后写 (read-after-write),这意味着块 B 读取块 A 写入的结果。
- class tvm.tir.schedule.DepKind(value)
依赖关系类型。
- RAW
读后写依赖关系
- 类型:
int = 0
- WAW
写后写依赖关系
- 类型:
int = 1
- WAR
写后读依赖关系。TensorIR 暂时不支持。
- 类型:
int = 2
- OPAQUE
不透明依赖关系
- 类型:
int = 3
- class tvm.tir.schedule.StmtSRef
一个对象,指代 TensorIR 中可调度的元素,也称为 “sref”。
术语表 - 块 sref:指向 TensorIR 块的 StmtSref。 - 循环 sref:指向 TensorIR for 循环的 StmtSRef。 - 父 sref:sref 的父 sref 是指向其祖先在 TensorIR AST 上最近的可调度语句的块/循环 sref。 - 根 sref:指向根块的 Sref。除了根 sref 之外,每个 sref 都有且只有一个父 sref。 - Sref 树:sref 的父子关系形成的树,由 TensorIR AST 唯一确定。
- class tvm.tir.schedule.Instruction(kind: InstructionKind, inputs: List[Any], attrs: List[Any], outputs: List[Any])
调度指令,每个指令对应一个调度原语
- kind
指令的类型
- 类型:
- inputs
指令的输入随机变量,每个元素的类型可以是以下之一: - BlockRV - LoopRV - ExprRV - float - int - str - None
- 类型:
List[INPUT_RV_TYPE]
- attrs
指令的属性。与运算符的属性类似,指令的属性是指令所需的任意常量元数据。例如,在 GetBlock 中要检索的块的名称。
- 类型:
List[ATTR_TYPE]
- outputs
指令的输出随机变量,每个元素的类型可以是以下之一: - BlockRV - LoopRV - ExprRV,仅原子变量,不会是常量或复合 PrimExpr
- 类型:
List[OUTPUT_RV_TYPE]
- class tvm.tir.schedule.InstructionKind
指令的类型,例如 Split、Reorder 等。除了名称之外,每种类型的指令都有其自身的属性,包括:1) 一个布尔值,指示指令是否为纯指令,即在调度状态中不更改任何内容;2) 一个将指令应用于 TensorIR 调度的仿函数;3) 一个将指令转换为 Python 语法语句的仿函数;4) 一个将其属性序列化为 JSON 的仿函数;5) 一个从 JSON 反序列化其属性的仿函数。
与 tvm.ir.op 不同,InstructionKind 不支持非结构化属性,主要是因为目前还没有用例来添加任何其他属性。
注意
仿函数属性目前未在 Python 端公开
- property is_pure: bool
指示指令是否为纯指令,即单独删除它不会改变调度状态。例如,指令 GetBlock 是纯指令,因为它不更改任何内容,而 ComputeInline 不是,因为删除它会导致不同的结果调度。
- 返回:
pure – 指示指令是否为纯指令的布尔标志
- 返回类型:
- static get(name: str) InstructionKind
使用名称检索 InstructionKind
- 参数:
name (str) – InstructionKind 的注册名称
- 返回:
kind – 检索到的 InstructionKind
- 返回类型:
- class tvm.tir.schedule.BlockRV
指代块的随机变量
- class tvm.tir.schedule.LoopRV
指代循环的随机变量
- class tvm.tir.schedule.Schedule(mod: PrimFunc | IRModule, *, seed: int | None = None, debug_mask: str | int = 'none', error_render_level: str = 'detail', enable_check: bool = True)
面向用户的调度类
调度是一组变换,它们更改计算的顺序,但保留计算的语义。 调度的一些示例:1) 将循环拆分为两个;2) 重新排序两个循环;3) 将特定缓冲区的计算内联到其消费者中
调度类存储辅助信息,以便正确有效地进行调度。
教程链接:https://tvm.apache.org/docs/tutorials/language/schedule_primitives.html
- property state: ScheduleState
返回当前调度类中的 ScheduleState
- work_on(func_name: str) None
指示调度处理 IRModule 中的函数。
默认情况下,调度处理名称为 “main” 的函数,或者 IRModule 中唯一的函数(如果只有一个函数)。如果 IRModule 中有多个函数,并且它们的名称都不是 “main”,则用户将必须调用此方法来显式指定要处理的函数。
如果未指定 func_name,则此语法糖函数将引导 GetBlock 方法。
- 参数:
func_name (str) – 要处理的函数的名称。
- copy() Schedule
返回调度的副本,包括状态和符号表,* 保证 * 1) SRef 树完全重建;* 2) 正在调度的 IRModule 未被触及;* 3) 所有随机变量在副本中都有效,指向相应的 sref * 重建
- 返回:
copy – 调度的新副本
- 返回类型:
- get(rand_var_or_sref: PrimExpr | BlockRV | LoopRV | StmtSRef) int | Block | For | None
返回值: - BlockRV 评估到的对应 Block; - LoopRV 评估到的对应 For; - ExprRV 评估到的对应整数; - 块 sref 指向的对应 Block; - 循环 sref 指向的对应 For;
- get_sref(rand_var_or_stmt: BlockRV | LoopRV | Block | For) StmtSRef | None
返回给定对象的对应 sref:1) LoopRV 2) BlockRV 3) Block 4) For
- sample_categorical(candidates: List[int], probs: List[float], decision: int | None = None) PrimExpr
根据概率分布采样一个整数
- sample_perfect_tile(loop: LoopRV, n: int, max_innermost_factor: int = 16, decision: List[int] | None = None) List[PrimExpr]
采样因子以完美分块特定的循环
- sample_partitioned_tile(loop: LoopRV, n: int, partition_pos: int = 0, innerpart_factor: int = 1, decision: List[int] | None = None) List[PrimExpr]
采样因子以对特定循环进行分区平铺
- get_block(name: str, func_name: str | None = None) BlockRV
使用名称检索特定函数中的代码块
默认情况下,如果未指定 func_name,调度器将在当前“正在处理”的函数中搜索代码块。要切换要处理的函数,请在使用此方法之前使用 work_on。
- get_output_blocks(scope_block: BlockRV | str) List[BlockRV]
获取给定作用域内的输出代码块列表。输出代码块是至少有一个缓冲区被写入,但未在 PrimFunc 中分配的代码块
- merge(*loops: List[LoopRV]) LoopRV
将一系列循环合并为一个循环。其 LCA 下的循环需要:1) 在同一作用域下。 2) 不能有注解或线程绑定。 3) 以 0 开始,并具有相同的范围和相同的嵌套深度。 4) 从目标循环到其 LCA,内层循环必须是外层循环的唯一子循环。
示例
在应用合并之前,在 TensorIR 中,IR 为
@T.prim_func def before_merge(a: T.handle, b: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) C = T.match_buffer(c, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = A[vi, vj] * 2.0
创建调度器并执行融合
sch = tir.Schedule(before_fuse) i1, _ = sch.get_loops(sch.get_block("B")) i2, _ = sch.get_loops(sch.get_block("C")) sch.merge(i1, i2) print(sch.mod["main"].script())
应用融合后,IR 变为
@T.prim_func def after_fuse(a: T.handle, b: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) C = T.match_buffer(c, (128, 128)) # the 2 loops are merged into 1 for i_m in range(128): for j in range(128): with T.block("B"): vi, vj = T.axis.remap("SS", [i_m, j]) T.reads(A[vi, vj]) T.writes(B[vi, vj]) B[vi, vj] = A[vi, vj] * T.float32(2) for j in range(128): with T.block("C"): vi, vj = T.axis.remap("SS", [i_m, j]) T.reads(A[vi, vj]) T.writes(C[vi, vj]) C[vi, vj] = A[vi, vj] * T.float32(2)
- fuse(*loops: List[LoopRV], preserve_unit_iters: bool = True) LoopRV
将一系列连续循环融合为一个循环。它需要:1) 循环不能有注解或线程绑定。 2) 第 (i+1) 个循环必须是第 i 个循环的唯一子循环。 3) 所有循环必须以 0 开始。 4) 要融合的循环的域不能依赖于另一个要融合的循环。
示例
在应用融合之前,在 TensorIR 中,IR 为
@T.prim_func def before_fuse(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建调度器并执行融合
sch = tir.Schedule(before_fuse) i, j = sch.get_loops(sch.get_block("B")) sch.fuse(i, j) print(sch.mod["main"].script())
应用融合后,IR 变为
@T.prim_func def after_fuse(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) # the 2 loops are fused into 1 for i_j_fused in T.serial(0, 16384): with T.block("B"): vi = T.axis.S(128, T.floordiv(i_j_fused, 128)) vj = T.axis.S(128, T.floormod(i_j_fused, 128)) B[vi, vj] = A[vi, vj] * 2.0
- split(loop: LoopRV, factors: List[int | PrimExpr | None], preserve_unit_iters: bool = True, disable_predication: bool = False) List[LoopRV]
将一个循环拆分为一系列连续的循环。它需要:1) 循环不能有注解或线程绑定。 2) 循环必须以 0 开始。可以添加谓词以确保循环总数保持不变。在 factors 中,最多可以有一个因子为 None,它将被自动推断。
- 参数:
- 返回:
split_loops – 拆分后的新循环
- 返回类型:
List[LoopRV]
示例
在拆分之前,在 TensorIR 中,IR 为
@T.prim_func def before_split(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建调度器并执行拆分
sch = tir.Schedule(before_split) i, j = sch.get_loops(sch.get_block("B")) sch.split(i, factors=[2, 64]) print(sch.mod["main"].script())
应用拆分后,IR 变为
@T.prim_func def after_split(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) # the original loop is split into 2 loops for i0, i1, j in T.grid(2, 64, 128): with T.block("B"): vi = T.axis.S(128, i0 * 64 + i1) vj = T.axis.S(128, j) B[vi, vj] = A[vi, vj] * 2.0
- loop_partition(loop: LoopRV, factors: List[int | PrimExpr | None], preserve_unit_iters: bool = True) List[LoopRV]
将一个循环分区为一系列连续的循环。它需要:1) 循环不能有注解或线程绑定。可以添加谓词以确保循环总数保持不变。在 factors 中,最多可以有一个因子为 None,它将被自动推断。
- 参数:
- 返回:
partition_loops – 分区后的新循环
- 返回类型:
List[LoopRV]
示例
在分区之前,在 TensorIR 中,IR 为
@T.prim_func def before_partition(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建调度器并执行分区
sch = tir.Schedule(before_partition) i, j = sch.get_loops(sch.get_block("B")) sch.partition(i, factors=[2, 64]) print(sch.mod["main"].script())
应用分区后,IR 变为
def after_partition(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) # the original loop is partition into 3 loops with T.block("root"): T.reads() T.writes() with T.block("B_i_common"): T.reads() T.writes() with T.block("B_i0_partition"): T.reads() T.writes() for i0, j in T.grid(2, 128): with T.block("B_i0"): vi, vj = T.axis.remap("SS", [i0, j]) T.reads(A[0:2, 0:128]) T.writes(B[0:2, 0:128]) B[vi, vj] = A[vi, vj] * T.float32(2) with T.block("B_i1_partition"): T.reads() T.writes() for i1 in range(2, 66): for j in range(128): with T.block("B_i1"): vi, vj = T.axis.remap("SS", [i1, j]) T.reads(A[2:66, 0:128]) T.writes(B[2:66, 0:128]) B[vi, vj] = A[vi, vj] * T.float32(2) with T.block("B_partition_2"): T.reads() T.writes() for i2 in range(66, 128): for j in range(128): with T.block("B_i2"): vi, vj = T.axis.remap("SS", [i2, j]) T.reads(A[66:128, 0:128]) T.writes(B[66:128, 0:128]) B[vi, vj] = A[vi, vj] * T.float32(2)
- reorder(*ordered_loops: List[LoopRV]) None
重新排序一系列循环。它不要求循环是连续的。它需要:1) 循环在同一链中。这意味着:循环可以排序为 [l_1, l_2, … , l_n],其中 l_i 是 l_{i+1} 的祖先,并且在 l_1 和 l_n 之间只有单分支循环(这也表明它们在同一作用域下)。 2) 重新排序后,外层循环的域不能依赖于任何内层循环。 3) 对于循环嵌套下的每个代码块,其代码块绑定必须是仿射的,并且代码块变量必须是数据并行或规约。 4) 参数中不允许重复循环。
- 参数:
*ordered_loops (List[LoopRV]) – 新顺序的循环
示例
在重新排序之前,在 TensorIR 中,IR 为
@T.prim_func def before_reorder(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建调度器并执行重新排序
sch = tir.Schedule(before_reorder) i, j = sch.get_loops(sch.get_block("B")) sch.reorder(j, i) print(sch.mod["main"].script())
应用重新排序后,IR 变为
@T.prim_func def after_reorder(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) # Here j and i are reordered for j, i in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
- reorder_block_iter_var(block: BlockRV, new_order: List[int]) None
重新排序给定代码块内的迭代变量。
示例
在 reorder_block_iter_var 之前,在 TensorIR 中,IR 为
@T.prim_func def matmul( A: T.Buffer((128, 128), "float32"), B: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32"), ) -> None: for i, j, k in T.grid(128, 128, 128): with T.block("C"): vi, vj, vk = T.axis.remap("SSR", [i, j, k]) with T.init(): C[vi, vj] = 0.0 C[vi, vj] = C[vi, vj] + A[vi, vk] * B[vj, vk]
创建调度器并执行 reorder_block_iter_var
sch = tir.Schedule(matmul) C = sch.get_block("C") sch.reorder_block_iter_var(C, [2, 1, 0])
应用 reorder_block_iter_var 后,IR 变为
@T.prim_func def matmul_after_reorder_block_iter_var( A: T.Buffer((128, 128), "float32"), B: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32"), ): for i, j, k in T.grid(128, 128, 128): with T.block("C"): vk, vj, vi = T.axis.remap("RSS", [k, j, i]) T.reads(A[vi, vk], B[vj, vk]) T.writes(C[vi, vj]) with T.init(): C[vi, vj] = T.float32(0) C[vi, vj] = C[vi, vj] + A[vi, vk] * B[vj, vk]
另请参阅
- add_unit_loop(block_or_loop: LoopRV | BlockRV) LoopRV
在特定代码块或循环之上创建一个新的单位循环。
示例
在 add_unit_loop 之前,在 TensorIR 中,IR 是
@T.prim_func def before_add_unit_loop( A: T.Buffer((), "int32"), B: T.Buffer((), "int32"), C: T.Buffer((), "int32"), ) -> None: with T.block("C"): vi = T.axis.spatial(1, 0) C[()] = A[()] + B[()]
创建 schedule 并执行 add-unit-loop
sch = tir.Schedule(before_add_unit_loop) sch.add_unit_loop(sch.get_block("C")) print(sch.mod["main"].script())
应用 add-unit-loop 之后,IR 变为
@T.prim_func def after_add_unit_loop( A: T.Buffer((), "int32"), B: T.Buffer((), "int32"), C: T.Buffer((), "int32"), ) -> None: for u in T.serial(1): with T.block("C"): vi = T.axis.spatial(1, 0) C[()] = A[()] + B[()]
- parallel(loop: LoopRV) None
并行化输入循环。它需要:1) 循环所在的 scope block 应具有 stage-pipeline 属性 2) 循环下的所有 block 都是 complete block 或 reduction block,并具有 affine 绑定 3) 对于循环下的每个 block,该循环只能包含在 data-parallel block iters 的绑定中
- 参数:
loop (LoopRV) – 要并行化的循环
示例
在 parallel 之前,在 TensorIR 中,IR 是
@T.prim_func def before_parallel(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建 schedule 并执行 parallel
sch = tir.Schedule(before_parallel) i, j = sch.get_loops(sch.get_block("B")) sch.parallel(i)
应用 parallel 之后,IR 变为
@T.prim_func def after_parallel(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i in T.parallel(0, 128): for j in T.serial(0, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
- vectorize(loop: LoopRV) None
向量化输入循环。它需要:1) 循环所在的 scope block 应具有 stage-pipeline 属性 2) 循环下的所有 block 都是 complete block 或 reduction block,并具有 affine 绑定 3) 对于循环下的每个 block,该循环只能包含在 data-parallel block iters 的绑定中
- 参数:
loop (LoopRV) – 要向量化的循环
示例
在 vectorize 之前,在 TensorIR 中,IR 是
@T.prim_func def before_vectorize(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建 schedule 并执行 vectorize
sch = tir.Schedule(before_vectorize) i, j = sch.get_loops(sch.get_block("B")) sch.vectorize(j)
应用 vectorize 之后,IR 变为
@T.prim_func def after_vectorize(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i in T.serial(0, 128): for j in T.vectorized(0, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
- bind(loop: LoopRV, thread_axis: str) None
将输入循环绑定到给定的线程轴。它需要:1) 循环所在的 scope block 应具有 stage-pipeline 属性 2) 循环下的所有 block 都是 complete block 或 reduction block,并具有 affine 绑定 3) 对于循环下的每个 block,如果线程轴以 “threadIdx” 开头,则循环只能包含在 data-parallel block iter 和 reduction block iters 的绑定中。否则,循环只能包含在 data-parallel block iters 的绑定中
- 参数:
示例
在 bind 之前,在 TensorIR 中,IR 是
@T.prim_func def before_bind(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建 schedule 并执行 bind
sch = tir.Schedule(before_bind) i, j = sch.get_loops(sch.get_block("B")) sch.bind(i, "blockIdx.x") sch.bind(j, "threadIdx.x")
应用 bind 之后,IR 变为
@T.prim_func def after_bind(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i in T.thread_binding(0, 128, thread = "blockIdx.x"): for j in T.thread_binding(0, 128, thread = "threadIdx.x"): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
- unroll(loop: LoopRV) None
展开输入循环。它没有任何要求
- 参数:
loop (LoopRV) – 要展开的循环
示例
在 unroll 之前,在 TensorIR 中,IR 是
@T.prim_func def before_unroll(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建 schedule 并执行 unroll
sch = tir.Schedule(before_unroll) i, j = sch.get_loops(sch.get_block("B")) sch.unroll(i)
应用 unroll 之后,IR 变为
@T.prim_func def after_unroll(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i in T.unroll(0, 128): for j in T.serial(0, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
- cache_read(block: BlockRV | str, read_buffer_index: int | str | Buffer, storage_scope: str, consumer_blocks: List[BlockRV | str] | None = None) BlockRV
创建一个 block,它将 buffer 区域读取到读缓存中。它需要
在 scope 中最多只有一个 block 写入 buffer。
Scope block 具有 stage-pipeline 属性。
- 参数:
- 返回:
cached_block – 缓存阶段的 block
- 返回类型:
示例
在 cache_read 之前,在 TensorIR 中,IR 是
@T.prim_func def before_cache_read(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建 schedule 并执行 cache_read
sch = tir.Schedule(before_cache_read) block_b = sch.get_block("B") sch.cache_read(block_b, 0, "local") print(sch.mod["main"].script())
应用 cache_read 之后,IR 变为
@T.prim_func def after_cache_read(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) A_local = T.alloc_buffer((128, 128), scope="local") for i, j in T.grid(128, 128): with T.block("A_local"): vi, vj = T.axis.remap("SS", [i, j]) A_local[vi, vj] = A[vi, vj] for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A_local[vi, vj] * 2.0
- cache_write(block: BlockRV | str, write_buffer_index: int | str | Buffer, storage_scope: str, consumer_blocks: List[BlockRV | str] | None = None) BlockRV
创建一个 block,它将 buffer 区域读取到写缓存中。它需要
在 scope 中只有一个 block 写入 buffer。
Scope block 具有 stage-pipeline 属性。
- 参数:
- 返回:
cached_block – 缓存阶段的 block
- 返回类型:
示例
在 cache_write 之前,在 TensorIR 中,IR 是
@T.prim_func def before_cache_write(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建 schedule 并执行 cache_write
sch = tir.Schedule(before_cache_write) block_b = sch.get_block("B") sch.cache_write(block_b, 0, "local") print(sch.mod["main"].script())
应用 cache_write 之后,IR 变为
@T.prim_func def after_cache_write(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) B_local = T.alloc_buffer((128, 128), scope="local") for i, j in T.grid(128, 128): with T.block("A_local"): vi, vj = T.axis.remap("SS", [i, j]) B_local[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = B_local[vi, vj]
- reindex_cache_read(block: BlockRV | str, read_buffer_index: int, storage_scope: str, index_map: IndexMap | Callable) BlockRV
创建一个 block,它使用索引映射指定的自定义索引将 buffer 区域读取到读缓存中。buffer 的读取区域必须是单点。
缓存阶段 block 遵循 block 中循环和 block itervars 的原始顺序。如果 block itervar 未出现在 buffer 访问区域中,则它及其对应的循环变量将被省略。然后,用户可以使用 transform_block_layout 原语来重新排序缓存读取/写入 block 的 block itervars 和周围的循环。
与 cache_read 不同,reindex_cache_read 仅支持单个 consumer,当有多个 consumer 时,请使用 cache_read。
- 参数:
- 返回:
cached_block – 缓存阶段的 block
- 返回类型:
示例
在 reindex_cache_read 之前,在 TensorIR 中,IR 是
@T.prim_func def before_reindex_cache_read(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建 schedule 并执行 reindex_cache_read
sch = tir.Schedule(before_cache_read) block_b = sch.get_block("B") sch.reindex_cache_read(block_b, 0, "local", lambda vi, vj: (vj, vi)) print(sch.mod["main"].script())
应用 reindex_cache_read 之后,IR 变为
@T.prim_func def after_reindex_cache_read(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) A_local = T.alloc_buffer((128, 128), scope="local") for i, j in T.grid(128, 128): with T.block("A_local"): vi, vj = T.axis.remap("SS", [i, j]) A_local[vj, vi] = A[vi, vj] for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A_local[vj, vi] * 2.0
- reindex_cache_write(block: BlockRV | str, write_buffer_index: int, storage_scope: str, index_map: Callable | IndexMap) BlockRV
创建一个 block,它使用索引映射指定的自定义索引将 buffer 区域读取到写缓存中。buffer 的写入区域必须是单点。
缓存阶段 block 遵循 block 中循环和 block itervars 的原始顺序。如果 block itervar 未出现在 buffer 访问区域中,则它及其对应的循环变量将被省略。然后,用户可以使用 transform_block_layout 原语来重新排序缓存读取/写入 block 的 block itervars 和周围的循环。
与 cache_write 不同,reindex_cache_write 仅支持单个 consumer,当有多个 consumer 时,请使用 cache_write。
- 参数:
- 返回:
cached_block – 缓存阶段的 block
- 返回类型:
示例
在 reindex_cache_write 之前,在 TensorIR 中,IR 是
@T.prim_func def before_reindex_cache_write(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建 schedule 并执行 reindex_cache_write
sch = tir.Schedule(before_cache_write) block_b = sch.get_block("B") sch.reindex_cache_write(block_b, 0, "local", lambda vi, vj: (vi // 2, vi % 2, vj)) print(sch.mod["main"].script())
应用 reindex_cache_write 之后,IR 变为
@T.prim_func def after_cache_write(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (64, 2, 128)) B_local = T.alloc_buffer((128, 128), scope="local") for i, j in T.grid(128, 128): with T.block("A_local"): vi, vj = T.axis.remap("SS", [i, j]) B_local[vi % 2, vi // 2, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = B_local[vi % 2, vi // 2, vj]
- cache_inplace(block: BlockRV | str, read_buffer_index: int | str | Buffer, storage_scope: str) List[BlockRV]
创建读取和写入 buffer 区域到缓存 block 的 block。它要求目标 block 同时读取和写入目标 buffer。主要用于 inplace 操作。
- 参数:
- 返回:
cached_blocks – 缓存阶段的 block,先读缓存,后写缓存
- 返回类型:
List[BlockRV]
示例
在 cache_inplace 之前,在 TensorIR 中,IR 是
@T.prim_func def before_cache_inplace(data_io: T.Buffer((64), "int32")): for i0 in T.serial(1): with T.block("A"): T.reads(data_io[:64]) T.writes(data_io[:64]) T.evaluate(T.call_extern("call_impl", data_io.data, dtype=""))
创建 schedule 并执行 cache_inplace
sch = tir.Schedule(before_cache_inplace) block_a = sch.get_block("A") sch.cache_inplace(block_a, 0, "local") print(sch.mod["main"].script())
应用 cache_inplace 之后,IR 变为
@T.prim_func def cache_inplace(data_io: T.Buffer(64, "int32")) -> None: data_io_local = T.alloc_buffer([64], dtype="int32", scope="local") for i0 in T.serial(1): for ax0 in T.serial(64): with T.block("data_io_local"): v0 = T.axis.spatial(64, ax0) T.reads(data_io[v0]) T.writes(data_io_local[v0]) data_io_local[v0] = data_io[v0] with T.block("A"): T.reads(data_io_local[0 : 64]) T.writes(data_io_local[0 : 64]) T.evaluate(T.call_extern("call_impl", data_io_local.data, dtype="")) for ax0 in T.serial(64): with T.block("data_io_local"): v0 = T.axis.spatial(64, ax0) T.reads(data_io_local[v0]) T.writes(data_io[v0]) data_io[v0] = data_io_local[v0]
- cache_index(block: BlockRV | str, storage_scope: str, cse_thresh: int = 0) List[BlockRV]
创建一个 block 以缓存预先计算的索引,供以后使用。如果没有索引计算,则保持不变。
- 参数:
- 返回:
cached_blocks – 写入缓存 buffer 的阶段的 block
- 返回类型:
List[BlockRV]
示例
在 cache_inplace 之前,在 TensorIR 中,IR 是
@T.prim_func def resize(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (1, 3, 40, 40)) B = T.match_buffer(b, (1, 3, 80, 80)) for i0, i1, i2, i3 in T.grid(1, 3, 80, 80): with T.block("A"): n, c, vi, vj = T.axis.remap("SSSS", [i0, i1, i2, i3]) B[n, c, vi, vj] = A[n, c, vi//4 + vj//4, vj//2]
创建 schedule 并执行 cache_index
sch = tir.Schedule(resize) block_a = sch.get_block("A") sch.cache_index(block_a, "global", 1) print(sch.mod["main"].script())
应用 cache_index 之后,IR 变为
@T.prim_func def resize_cache_index( A: T.Buffer((1, 3, 40, 40), "float32"), B: T.Buffer((1, 3, 80, 80), "float32") ) -> None: index_var_0 = T.alloc_buffer([80, 80], dtype="int32", strides=[1]) index_var_1 = T.alloc_buffer([80], dtype="int32", strides=[1]) for ax0, ax1 in T.grid(80, 80): with T.block("index_0"): v0 = T.axis.spatial(80, ax0) v1 = T.axis.spatial(80, ax1) T.reads() T.writes(index_var_0[v0, v1]) index_var_0[v0, v1] = v0 // 4 + v1 // 4 for ax0 in T.serial(80): with T.block("index_1"): v0 = T.axis.spatial(80, ax0) T.reads() T.writes(index_var_1[v0]) index_var_1[v0] = v0 // 2 for i0, i1, i2, i3 in T.grid(1, 3, 80, 80): with T.block("A"): n, c, vi, vj = T.axis.remap("SSSS", [i0, i1, i2, i3]) T.reads(A[n, c, vi // 4 + vj // 4, vj // 2]) T.writes(B[n, c, vi, vj]) B[n, c, vi, vj] = A[n, c, index_var_0[vi, vj], index_var_1[vj]]
- reindex(block: BlockRV | str, buffer: Tuple[str, int] | str | Buffer) BlockRV
创建一个 block,它通过重新索引将 buffer 区域读取/写入到读/写缓存中。缓存的布局将与读取/写入 buffer 的 block 的迭代器相同。它需要:1) 只有一个 block 读取/写入目标 buffer 2) 在 block 中只有一个 buffer 加载/存储此 buffer
- 参数:
block (Union[BlockRV, str]) – 访问目标 buffer 的 block。如果为字符串,则必须唯一标识一个 block。
buffer (Union[Tuple[str,int], Buffer, str]) –
要转换的 buffer,或如何标识要转换的 buffer 的规范。
如果 buffer 是
(str,int)
的元组,则第一个项应为 “read” 或 “write”,第二个项是 block 的读取或写入区域的索引。如果 buffer 是字符串,则它是 buffer 的名称,该名称必须存在于 block 的读取/写入中。此外,block 的读取/写入可能不包含多个具有此名称的 buffer。
如果 buffer 是 Buffer 对象,则它必须存在于 block 的读取/写入中。
- 返回:
reindex_block – reindex 阶段的 block
- 返回类型:
示例
在 reindex 之前,在 TensorIR 中,IR 是
@T.prim_func def before_reindex( A: T.Buffer((128, 128), "float32"), B: T.Buffer((128, 128), "float32") ) -> None: for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vj, vi] * 2.0
创建 schedule 并执行 reindex
sch = tir.Schedule(before_reindex) block = sch.get_block("B") sch.reindex(block, ("read", 0))
应用 reindex 之后,IR 变为
@T.prim_func def after_reindex( A: T.Buffer((128, 128), "float32"), B: T.Buffer((128, 128), "float32") ) -> None: A_reindex = T.alloc_buffer((128, 128), "float32") for i, j in T.grid(128, 128): with T.block("A_reindex"): vi, vj = T.axis.remap("SS", [i, j]) A_reindex[vi, vj] = A[vj, vi] for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A_reindex[vi, vj] * 2.0
- compute_at(block: BlockRV | str, loop: LoopRV, preserve_unit_loops: bool = False, index: int = 0) None
Compute-At。将 producer block 移动到特定循环下,并重新生成由 block 引起的循环,以便 producer block 生成的 buffer 区域可以覆盖给定循环下其 consumer block 消耗的区域。它需要
block 和 loop 位于同一 scope 下,loop 不是 block 的祖先
Scope block 具有 stage-pipeline 属性
3) scope block 的子树(给定 block 所在的子树)满足紧凑数据流条件。即,scope block 子树中的所有 block 必须是 complete block 或 reduction block
4) block 不是相对于 scope block 的输出 block,即 block 写入的 buffer 在 scope block 下分配
block 的所有 consumer 都位于给定循环下
- 参数:
示例
在 compute-at 之前,在 TensorIR 中,IR 是
@T.prim_func def before_compute_at(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128), "float32") B = T.alloc_buffer((128, 128), "float32") C = T.match_buffer(c, (128, 128), "float32") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
创建调度并执行 compute-at
sch = tir.Schedule(before_compute_at) block = sch.get_block("B") loop, _ = sch.get_loops(sch.get_block("C")) sch.compute_at(block, loop, preserve_unit_loops=False) print(sch.mod["main"].script())
应用 compute-at 后,IR 变为
@T.prim_func def after_compute_at(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128), "float32") B = T.alloc_buffer((128, 128), "float32") C = T.match_buffer(c, (128, 128), "float32") for i in T.serial(0, 128): for j in T.serial(0, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for j in T.serial(0, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
- reverse_compute_at(block: BlockRV | str, loop: LoopRV, preserve_unit_loops: bool = False, index: int = -1) None
反向 Compute-At。将消费者块移动到特定循环下,并重新生成由该块引起的循环,以便消费者块消耗的缓冲区区域可以覆盖给定循环下其生产者块产生的区域。它要求:
block 和 loop 位于同一 scope 下,loop 不是 block 的祖先
Scope block 具有 stage-pipeline 属性
3) scope block 的子树(给定 block 所在的子树)满足紧凑数据流条件。即,scope block 子树中的所有 block 必须是 complete block 或 reduction block
该块的所有生产者都在给定的循环下。
- 参数:
示例
在应用反向 compute-at 之前,在 TensorIR 中,IR 是
@T.prim_func def before_reverse_compute_at(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128), "float32") B = T.alloc_buffer((128, 128), "float32") C = T.match_buffer(c, (128, 128), "float32") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
创建调度并执行反向 compute-at
sch = tir.Schedule(before_reverse_compute_at) block = sch.get_block("C") loop, _ = sch.get_loops(sch.get_block("B")) sch.reverse_compute_at(block, loop, preserve_unit_loops=False) print(sch.mod["main"].script())
应用反向 compute-at 后,IR 变为
@T.prim_func def after_reverse_compute_at(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128), "float32") B = T.alloc_buffer((128, 128), "float32") C = T.match_buffer(c, (128, 128), "float32") for i in T.serial(0, 128): for j in T.serial(0, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for j in T.serial(0, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
- compute_inline(block: BlockRV | str) None
将一个块内联到其消费者中。它要求:
该块是一个完整的非根块,只产生一个缓冲区。
该块不能是作用域中唯一的叶子节点。
块的主体必须是 BufferStore 语句,形式为
A[i, j, k, ...] = ...
,其中 LHS 的索引都是不同的原子变量,并且语句中不允许出现索引变量以外的变量。
示例
在应用 compute-inline 之前,在 TensorIR 中,IR 是
@T.prim_func def before_inline(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.alloc_buffer((128, 128)) C = T.match_buffer(c, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
创建调度并执行 compute-inline
sch = tir.Schedule(before_inline) sch.compute_inline(sch.get_block("B")) print(sch.mod["main"].script())
应用 compute-inline 后,IR 变为
@T.prim_func def after_inline(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128)) C = T.match_buffer(c, (128, 128)) for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = A[vi, vj] * 2.0 + 1.0
- reverse_compute_inline(block: BlockRV | str) None
将一个块内联到其唯一的生产者中。它要求:
该块是一个完整的非根块,只产生和消耗一个缓冲区。
该块不能是作用域中唯一的叶子节点。
该块的唯一生产者是一个写后读生产者和一个完整的非根块。
块的主体必须是 BufferStore 语句,形式为
B[f(i, j, k, ...)] = g(i, j, k, A[i, j, k, ...] ...)
,其中 RHS 上每个 BufferLoad 的索引都是不同的原子变量,并且语句中不允许出现索引变量以外的变量。
示例
在应用反向 compute-inline 之前,在 TensorIR 中,IR 是
@T.prim_func def before_inline(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.alloc_buffer((128, 128)) C = T.match_buffer(c, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
创建调度并执行反向 compute-inline
sch = tir.Schedule(before_inline) sch.reverse_compute_inline(sch.get_block("C")) print(sch.mod["main"].script())
应用反向 compute-inline 后,IR 变为
@T.prim_func def after_inline(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128)) C = T.match_buffer(c, (128, 128)) for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = A[vi, vj] * 2.0 + 1.0
- decompose_reduction(block: BlockRV | str, loop: LoopRV) BlockRV
将一个规约块分解为两个独立的块。
初始化块,从规约块的初始化语句翻译而来;
更新块,是原始块,但不包含初始化语句。
初始化块插入在给定循环之前。
该调度原语要求:
输入块是一个规约块。
输入循环是该块的祖先循环。
输入循环不低于所有与规约块变量相关的循环。
- 参数:
- 返回:
init_block – 初始化块
- 返回类型:
示例
在应用 decompose-reduction 之前,在 TensorIR 中,IR 是
@T.prim_func def before_decompose(a: ty.handle, c: ty.handle) -> None: A = tir.match_buffer(a, [128, 128]) B = tir.match_buffer(b, [128, 128]) C = tir.match_buffer(c, [128, 128]) for i, j, k in tir.grid(128, 128, 128): with tir.block([128, 128, tir.reduce_axis(0, 128)], "C") as [vi, vj, vk]: with tir.init(): C[vi, vj] = 0.0 C[vi, vj] = C[vi, vj] + A[vi, vk] * B[vj, vk]
创建调度并使用指定的循环执行 decompose-reduction
sch = tir.Schedule(before_decompose) C = sch.get_block("C") i, j, k = sch.get_loops(C) sch.decompose_reduction(C, i) print(sch.mod["main"].script())
应用 decompose-reduction 后,IR 变为
@T.prim_func def after_decompose(a: ty.handle, c: ty.handle) -> None: A = tir.match_buffer(a, [128, 128]) B = tir.match_buffer(b, [128, 128]) C = tir.match_buffer(c, [128, 128]) for i in tir.serial(128): for j in tir.serial(128): with tir.block([128, 128]) as [vi, vj]: C[vi, vj] = 0.0 for i, j, k in tir.grid(128, 128, 128): with tir.block([128, 128, tir.reduce_axis(0, 128)], "C") as [vi, vj, vk]: C[vi, vj] = C[vi, vj] + A[vi, vk] * B[vj, vk]
- rfactor(loop: LoopRV, factor_axis: int) BlockRV
通过指定的循环分解一个结合性的规约块。
结合性规约不能直接并行化,因为它会导致累积期间潜在的竞争条件。或者,可以在循环上分解规约,步骤如下:- 步骤 1:将规约均匀切片成 n 个独立的块,其中 n 是循环范围;- 步骤 2:分别计算这些块,并将结果写入 n 个中间缓冲区;- 步骤 3:将 n 个独立的缓冲区累积到结果缓冲区中。请注意,上面的步骤 2 为并行化引入了机会。
RFactor 是一种调度原语,它实现上述转换:给定一个写入缓冲区 B 的块,它会分解一个范围为 n 的循环。
例如,下面的伪代码累积 B[i] = sum(A[i, : , : ])
for i in range(128): # loop i is a data parallel loop for j in range(128): # loop j is a reduction loop for k in range(128): # loop k is a reduction loop B[i] = B[i] + A[i, j, k]
假设 RFactor 应用于最内层循环 k 且 factor_axis = 1。然后 RFactor 会创建一个中间缓冲区和两个块。
1. 中间缓冲区,或“rf-buffer”是一个秩为 ndim(B) + 1,大小为 size(B) * n 的缓冲区,其形状从 shape(B) 扩展而来,方法是在 factor_axis 指定的位置添加一个大小为 n 的轴。例如,
shape(B) = [1, 2, 3], factor_axis = 0 => shape(B_rf) = [n, 1, 2, 3]
shape(B) = [1, 2, 3], factor_axis = 1 => shape(B_rf) = [1, n, 2, 3]
shape(B) = [1, 2, 3], factor_axis = 2 => shape(B_rf) = [1, 2, n, 3]
shape(B) = [1, 2, 3], factor_axis = 3 => shape(B_rf) = [1, 2, 3, n]
2. rfactor 块,或 “rf-block”,是一个写入 rf-buffer 的块,但不累积循环 k,即循环 k 从规约循环转换为数据并行循环。在我们的示例中,rf-block 是
B_rf = np.zeros((128, 128)) # the rf-buffer for k in range(128): # loop k is converted to a data parallel loop for i in range(128): # loop i is a data parallel loop (unchanged) for j in range(128): # loop j is a reduction loop (unchanged) B_rf[i, k] = B_rf[i, k] + A[i, j, k]
3. 回写块,或 wb-block,是一个将 rf-buffer 累积到结果缓冲区中的块。除了用于累积的循环 k 外,所有规约循环都被移除。在我们的示例中,wb-block 是
for i in range(128): # loop i is a data parallel loop (unchanged) # loop j is removed because it is a reduction loop for k in range(128): # loop k is a reduction loop (unchanged) B[i] = B[i] + B_rf[i, k]
- 参数:
- 返回:
rf_block – 计算每个切片的部分结果的块(即,如上图所示的第一个块)
- 返回类型:
示例
在应用 rfactor 之前,在 TensorIR 中,IR 是
@T.prim_func def before_rfactor(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128, 128)) B = T.match_buffer(b, (128,)) for ii, i, j in T.grid(128, 128, 128): with T.block("B"): vii, vi, vj = T.axis.remap("SRR", [ii, i, j]) with T.init(): B[vii] = 0.0 B[vii] = B[vii] + A[vii, vi, vj]
创建调度并执行 rfactor
sch = tir.Schedule(before_rfactor) _, _, k = sch.get_loops(sch.get_block("B")) sch.rfactor(k, 0) print(sch.mod["main"].script())
应用 rfactor 后,IR 变为
@T.prim_func def after_rfactor(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, [128, 128, 128]) B = T.match_buffer(b, [128]) B_rf = T.alloc_buffer([128, 128]) for i2, ii, i in T.grid(128, 128, 128): with T.block("B_rf"): vi2, vii, vi = T.axis.remap("SSR", [i2, ii, i]) with T.init(): B_rf[vi2, vii] = 0.0 B_rf[vi2, vii] = (B_rf[vi2, vii] + A[vii, vi, vi2]) for ii, i2 in T.grid(128, 128): with T.block("B"): vii, vi2 = T.axis.remap("SR", [ii, i2]) with T.init(): B[vii] = 0.0 B[vii] = B[vii] + B_rf[vi2, vii]
注意
Rfactor 要求:1) loop 只有一个子块,并且它是一个规约块;2) loop 是一个规约循环,即循环变量仅绑定到块绑定中的规约变量;3) loop 没有被并行化、向量化、展开或绑定到任何线程轴;4) loop 所在的块作用域是一个分阶段流水线;5) 规约块外部的最外层循环应将规约块作为其第一个子块;6) 最外层规约循环应只有一个子块;7) 在某些规约循环下,不应出现未绑定到块绑定中任何规约或数据并行变量的单元范围循环;8) 规约块应仅写入一个缓冲区,并且其 init 和 body 都是简单的 BufferStore,并且该模式被注册为结合性规约器。预定义的模式包括:加法、乘法、最小值和最大值;9) 块顶部的每个循环不能同时绑定到数据并行和规约块绑定;10) factor_axis 应在范围 [-ndim(B) - 1, ndim(B)] 内,其中 B 是规约块写入的缓冲区。负索引根据 numpy 约定进行归一化。
- storage_align(block: BlockRV | str, buffer_index: int, axis: int, factor: int, offset: int) None
为特定维度设置对齐要求,使得 stride[axis] == k * factor + offset,其中 k 为某个整数。这对于设置内存布局以获得更友好的内存访问模式很有用。例如,我们可以将对齐设置为 factor=2, offset=1,以避免 GPU 共享内存中较高维度上的线程访问发生 bank 冲突。
- 参数:
示例
在应用 storage_align 之前,在 TensorIR 中,IR 是
@T.prim_func def before_storage_align(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.alloc_buffer((128, 128)) C = T.match_buffer(c, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
创建调度并执行 storage_align
sch = tir.Schedule(before_storage_align) sch.storage_align(sch.get_block("B"), buffer_index=0, axis=0, factor=128, offset=1) print(sch.mod["main"].script())
应用 storage_align 后,IR 变为
@T.prim_func def after_storage_align(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.alloc_buffer((128, 128)) C = T.match_buffer(c, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): T.block_attr({"buffer_dim_align": [[[0, 128, 1]]]}) vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
在 lowering 过程之后,缓冲区 B 的 strides 将为 [129, 1]。
注意
Storage_align 要求缓冲区是通过 alloc_buffer 定义的中间缓冲区。
- set_scope(block: BlockRV | str, buffer_index: int | str | Buffer, storage_scope: str) None
设置缓冲区的存储作用域,其中缓冲区由块和写入索引指定。
- 参数:
示例
在应用 set_scope 之前,在 TensorIR 中,IR 是
@T.prim_func def before_set_scope( A: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32") ) -> None: B = T.alloc_buffer((128, 128), dtype="float32") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
创建调度并执行 set_scope
sch = tir.Schedule(before_set_scope) sch.set_scope(sch.get_block("B"), buffer_index=0, storage_scope="shared") print(sch.mod["main"].script())
应用 set_scope 后,IR 变为
@T.prim_func def after_set_scope( A: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32") ) -> None: B_shared = T.alloc_buffer([128, 128], dtype="float32", scope="shared") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B_shared[vi, vj] = A[vi, vj] * T.float32(2) for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B_shared[vi, vj] + T.float32(1)
注意
set_scope 要求缓冲区是通过 alloc_buffer 定义的中间缓冲区。
- unsafe_set_dtype(block: BlockRV | str, buffer_index: int, dtype: str) None
设置缓冲区的数据类型,其中缓冲区由块和写入索引指定。
此调度原语是不安全的,并且可能由于类型转换而更改程序的正确性,请谨慎使用。
- 参数:
示例
在应用 unsafe_set_dtype 之前,在 TensorIR 中,IR 是
@T.prim_func def before_set_dtype( A: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32") ) -> None: B = T.alloc_buffer((128, 128), dtype="float32") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j] C[vi, vj] = B[vi, vj] + 1.0
创建调度并执行 unsafe_set_dtype
sch = tir.Schedule(before_set_dtype) sch.unsafe_set_dtype("B", buffer_index=0, dtype="float16") print(sch.mod["main"].script())
应用 set_dtype 后,IR 变为
@T.prim_func def after_set_dtype( A: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32") ) -> None: B = T.alloc_buffer((128, 128), dtype="float16") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = T.cast(A[vi, vj] * 2.0, "float16") for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j] C[vi, vj] = T.cast(B[vi, vj], "float32") + 1.0
注意
unsafe_set_dtype 要求缓冲区是通过 alloc_buffer 定义的中间缓冲区。
- blockize(target: LoopRV | List[BlockRV], preserve_unit_iters: bool = True) BlockRV
将多个块或以特定循环为根的子树转换为一个块。
- 参数:
- 返回:
result – 新块。
- 返回类型:
示例
在应用 blockize 之前,在 TensorIR 中,IR 是
@T.prim_func def before_blockize( A: T.Buffer((128, 128), "float32"), B: T.Buffer((128, 128), "float32") ) -> None: for i_0, j_0, i_1, j_1 in T.grid(8, 8, 16, 16): with T.block("B"): vi = T.axis.spatial(128, i_0 * 16 + i_1) vj = T.axis.spatial(128, j_0 * 16 + j_1) T.reads(A[vi, vj]) T.writes(B[vi, vj]) B[vi, vj] = A[vi, vj] * T.float32(2)
创建调度并执行 set_scope
sch = tir.Schedule(before_blockize) B = sch.get_block("B") _, _, i1, _ = sch.get_loops(B) sch.blockize(i1) print(sch.mod["main"].script())
应用 blockize 后,IR 变为
@T.prim_func def after_blockize( A: T.Buffer((128, 128), "float32"), B: T.Buffer((128, 128), "float32") )-> None: for i_0, j_0 in T.grid(8, 8): with T.block("B_o"): vio, vjo = T.axis.remap("SS", [i_0, j_0]) T.reads(A[vio * 16 : vio * 16 + 16, vjo * 16 : vjo * 16 + 16]) T.writes(B[vio * 16 : vio * 16 + 16, vjo * 16 : vjo * 16 + 16]) for i_1, j_1 in T.grid(16, 16): with T.block("B"): vi, vj = T.axis.remap("SS", [i_1, j_1]) T.reads(A[vio * 16 + vi, vjo * 16 + vj]) T.writes(B[vio * 16 + vi, vjo * 16 + vj]) B[vio * 16 + vi, vjo * 16 + vj] = A[vio * 16 + vi, vjo * 16 + vj] * T.float32(2)
注意
blockize 要求在给定的循环下恰好有一个块,并且该块的绑定可以被以给定循环开始的循环表示的子空间整除。
- tensorize(block_or_loop: BlockRV | LoopRV, tensor_intrin: str, preserve_unit_iters: bool = True) None
使用张量内在函数张量化循环封闭的计算。
- 参数:
示例
在应用 tensorize 之前,在 TensorIR 中,IR 是
@T.prim_func def before_tensorize( A: T.Buffer((128, 128), "float32"), B: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32"), ) -> None: # body # with T.block("root") for i_0, j_0, k_0, i_1, j_1, k_1 in T.grid(8, 8, 8, 16, 16, 16): with T.block("update"): vi = T.axis.spatial(128, i_0 * 16 + i_1) vj = T.axis.spatial(128, j_0 * 16 + j_1) vk = T.axis.reduce(128, k_0 * 16 + k_1) T.reads(C[vi, vj], A[vi, vk], B[vj, vk]) T.writes(C[vi, vj]) C[vi, vj] = C[vi, vj] + A[vi, vk] * B[vj, vk]
声明并注册张量内在函数
@T.prim_func def mma_desc(a: T.handle, b: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (16, 16), align=128, offset_factor=1) B = T.match_buffer(b, (16, 16), align=128, offset_factor=1) C = T.match_buffer(c, (16, 16), align=128, offset_factor=1) with T.block("root"): T.reads(C[0 : 16, 0 : 16], A[0 : 16, 0 : 16], B[0 : 16, 0 : 16]) T.writes(C[0 : 16, 0 : 16]) for i, j, k in T.grid(16, 16, 16): with T.block("update"): vi, vj, vk = T.axis.remap("SSR", [i, j, k]) C[vi, vj] = C[vi, vj] + A[vi, vk] * B[vj, vk] @T.prim_func def mma_intrin(a: T.handle, b: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (16, 16), align=128, offset_factor=1) B = T.match_buffer(b, (16, 16), align=128, offset_factor=1) C = T.match_buffer(c, (16, 16), align=128, offset_factor=1) with T.block("root"): T.reads(C[0 : 16, 0 : 16], A[0 : 16, 0 : 16], B[0 : 16, 0 : 16]) T.writes(C[0 : 16, 0 : 16]) T.evaluate( T.tvm_mma_sync( C.data, C.elem_offset // 256, A.data, A.elem_offset // 256, B.data, B.elem_offset // 256, C.data, C.elem_offset // 256, dtype="handle", ) ) tir.TensorIntrin.register("test_mma_intrin", mma_desc, mma_intrin)
创建调度并执行 tensorize
sch = tir.Schedule(before_tensorize) update = sch.get_block("update") _, _, _, i1, _, _ = sch.get_loops(update) sch.tensorize(i1, "test_mma_intrin") print(sch.mod["main"].script())
应用 tensorize 后,IR 变为
@T.prim_func def after_tensorize( A: T.Buffer((128, 128), "float32"), B: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32"), ) -> None: # body # with T.block("root") for i_0, j_0, k_0 in T.grid(8, 8, 8): with T.block("update_o"): vio, vjo, vko = T.axis.remap("SSR", [i_0, j_0, k_0]) T.reads( C[vio * 16 : vio * 16 + 16, vjo * 16 : vjo * 16 + 16], A[vio * 16 : vio * 16 + 16, vko * 16 : vko * 16 + 16], B[vjo * 16 : vjo * 16 + 16, vko * 16 : vko * 16 + 16], ) T.writes(C[vio * 16 : vio * 16 + 16, vjo * 16 : vjo * 16 + 16]) A_1 = T.match_buffer( A[vio * 16 : vio * 16 + 16, vko * 16 : vko * 16 + 16], [16, 16], dtype="float32", offset_factor=1, ) B_1 = T.match_buffer( B[vjo * 16 : vjo * 16 + 16, vko * 16 : vko * 16 + 16], [16, 16], dtype="float32", offset_factor=1, ) C_1 = T.match_buffer( C[vio * 16 : vio * 16 + 16, vjo * 16 : vjo * 16 + 16], [16, 16], dtype="float32", offset_factor=1, ) T.evaluate( T.tvm_mma_sync( C_1.data, C_1.elem_offset // 256, A_1.data, A_1.elem_offset // 256, B_1.data, B_1.elem_offset // 256, C_1.data, C_1.elem_offset // 256, dtype="handle", ) )
- annotate(block_or_loop: BlockRV | LoopRV, ann_key: str, ann_val: str | int | float | PrimExpr | List[str | int | float | PrimExpr] | Dict[str, str | int | float | PrimExpr | List[str | int | float | PrimExpr]]) None
使用键值对注释块/循环。
- 参数:
示例
在应用 annotate 之前,在 TensorIR 中,IR 是
@T.prim_func def before_annotate(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建调度并执行 annotate
sch = tir.Schedule(before_annotate) sch.annotate(sch.get_block("B"), "ann_key", "ann_value") print(sch.mod["main"].script())
应用 annotate 后,IR 变为
@T.prim_func def after_annotate(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) T.block_attr({"ann_key", "ann_value"}) B[vi, vj] = A[vi, vj] * 2.0
- unannotate(block_or_loop: BlockRV | LoopRV, ann_key: str) None
移除块/循环中键为 ann_key 的注释。
示例
在应用 unannotate 之前,在 TensorIR 中,IR 是
@T.prim_func def before_unannotate(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) T.block_attr({"ann_key", "ann_value"}) B[vi, vj] = A[vi, vj] * 2.0
创建调度并执行 annotate
sch = tir.Schedule(before_unannotate) sch.unannotate(sch.get_block("B"), "ann_key") print(sch.mod["main"].script())
应用 unannotate 后,IR 变为
@T.prim_func def after_unannotate(a: T.handle, b: T.handle) -> None: A = T.match_buffer(a, (128, 128)) B = T.match_buffer(b, (128, 128)) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
- transform_layout(block: BlockRV | str, buffer: Tuple[str, int] | str | Buffer, index_map: IndexMap | Callable, pad_value: int | float | PrimExpr | IndexMap | Callable | None = None, *, assume_injective_transform: bool = False) None
将 IndexMap 表示的转换应用于缓冲区。
- 参数:
block (Union[BlockRV, str]) – 访问目标 buffer 的 block。如果为字符串,则必须唯一标识一个 block。
buffer (Union[Tuple[str,int], Buffer, str]) –
要转换的 buffer,或如何标识要转换的 buffer 的规范。
如果 buffer 是
(str,int)
的元组,则第一个项应为 “read” 或 “write”,第二个项是 block 的读取或写入区域的索引。如果 buffer 是字符串,则它是 buffer 的名称,该名称必须存在于 block 的读取/写入中。此外,block 的读取/写入可能不包含多个具有此名称的 buffer。
如果 buffer 是 Buffer 对象,则它必须存在于 block 的读取/写入中。
index_map (Union[IndexMap, Callable]) –
要应用的转换。
如果 index_map 是可调用对象,并且返回的列表包含 IndexMap.AXIS_SEPARATOR,则除了 TransformLayout 原语之外,还将调用 SetAxisSeparators 原语。
pad_value (可选[Union[int, float, PrimExpr, IndexMap, Callable]]) –
用于转换引入的任何填充的值。如果 schedule 包含指定缓冲区的生产者块,则 pad 值将尽可能作为生产者块的一部分写入,否则在生产者块之后写入。否则,如果缓冲区是输入,将插入一个注解块来说明填充包含已知值。
pad 值可能不包含 BufferLoad 的实例,除非它从正在转换的缓冲区加载值(例如,创建填充由重复元素组成的循环缓冲区)。
注意:如果应用于输入缓冲区,则调用范围负责确保 pad_value 存在。代数简化、分支消除和其他优化可能会假设满足此前提条件,并可能导致返回不正确的结果。
如果为 None,则转换可能不会引入填充。
如果为 int、float 或 PrimExpr,则转换是填充中存在的特定值。
如果为 IndexMap 或 Callable,则转换是填充中存在的、以转换后的索引表示的值。
assume_injective_transform (bool) – 如果设置为 true,则 schedule 原语将假定 index_map 是单射的,并跳过检查映射索引的重叠。这对于分析未覆盖的复杂 index_map 可能很有用。调用者有责任确保索引映射是单射的,否则,schedule 的正确性无法保证。
示例
在 transform_layout 之前,在 TensorIR 中,IR 是
@T.prim_func def before_transform_layout(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128), "float32") B = T.alloc_buffer((128, 128), "float32") C = T.match_buffer(c, (128, 128), "float32") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
创建 schedule 并执行 transform_layout
sch = tir.Schedule(before_storage_align) sch.transform_layout(sch.get_block("B"), buffer=("write",0), index_map=lambda m, n: (m // 16, n // 16, m % 16, n % 16)) print(sch.mod["main"].script())
应用 transform_layout 后,IR 变为
@T.prim_func def two_elementwise_transformed_intermediate_buffer(a: T.handle, c: T.handle) -> None: A = T.match_buffer(a, (128, 128), "float32") B = T.alloc_buffer((8, 8, 16, 16), "float32") C = T.match_buffer(c, (128, 128), "float32") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi // 16, vj // 16, vi % 16, vj % 16] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi // 16, vj // 16, vi % 16, vj % 16] + 1.0
- transform_block_layout(block: BlockRV | str, index_map: IndexMap | Callable) None
应用 IndexMap 表示的转换到块
示例
在 transform_block_layout 之前,在 TensorIR 中,IR 是
@T.prim_func def before_transform_block_layout( A: T.Buffer((16, 16), "float32"), B: T.Buffer((16, 16), "float32") ) -> None: for i, j in T.grid(16, 16): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0
创建 schedule 并执行 transform_block_layout
sch = tir.Schedule(before_transform_block_layout) sch.transform_block_layout(sch.get_block("B"), lambda i, j: (i * 16 + j,)) print(sch.mod["main"].script())
应用 transform_block_layout 后,IR 变为
@T.prim_func def after_transform_block_layout( A: T.Buffer((16, 16), "float32"), B: T.Buffer((16, 16), "float32") ) -> None: for i in range(256): with T.block("B"): vi, = T.axis.remap("S", [i]) B[vi // 16, vi % 16] = A[vi // 16, vi % 16] * 2.0
- set_axis_separator(block: BlockRV | str, buffer: Tuple[str, int] | str | Buffer, axis_separators: List[int] | None) None
设置缓冲区的轴分隔符,其中缓冲区由块和读取或写入索引指定。
- 参数:
block (Union[BlockRV, str]) – 访问目标 buffer 的 block。如果为字符串,则必须唯一标识一个 block。
buffer (Union[Tuple[str,int], Buffer, str]) –
要转换的 buffer,或如何标识要转换的 buffer 的规范。
如果 buffer 是
(str,int)
的元组,则第一个项应为 “read” 或 “write”,第二个项是 block 的读取或写入区域的索引。如果 buffer 是字符串,则它是 buffer 的名称,该名称必须存在于 block 的读取/写入中。此外,block 的读取/写入可能不包含多个具有此名称的 buffer。
如果 buffer 是 Buffer 对象,则它必须存在于 block 的读取/写入中。
axis_separators (可选[List[int]]) – 轴分隔符。
示例
在 set_axis_separator 之前,在 TensorIR 中,IR 是
@T.prim_func def before_set_axis_separator( A: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32") ) -> None: B = T.alloc_buffer((128, 128), dtype="float32") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
创建 schedule 并执行 set_axis_separator
sch = tir.Schedule(before_set_axis_separator) sch.set_axis_separators(sch.get_block("B"), buffer=("write", 0), axis_separators=[1]) print(sch.mod["main"].script())
应用 set_axis_separator 后,IR 变为
@T.prim_func def after_set_axis_separators( A: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32") ) -> None: B = T.alloc_buffer([128, 128], dtype="float32", axis_separators=[1]) for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * T.float32(2) for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + T.float32(1)
- decompose_padding(block: BlockRV | str, loop: LoopRV) BlockRV
将填充计算模式的块分解为两个单独的块。
将常量填充值填充到完整写入区域的块;
将界内值填充到填充谓词为 true 的区域的块。
填充值填充块插入在给定循环的正前方。
该调度原语要求:
输入块是一个完整的块。
输入循环是该块的祖先循环。
输入块是一个匹配填充模式的块。
- 参数:
- 返回:
pad_value_block – 填充常量填充值的块。
- 返回类型:
示例
在 decompose-padding 之前,在 TensorIR 中,IR 是
@T.prim_func def before_decompose(x: T.Buffer(128, "int32"), y: T.Buffer(140, "int32")): for i in range(140): with T.block("block"): vi = T.axis.remap("S", [i]) y[vi] = T.if_then_else(vi >= 6 and vi < 134, x[vi - 6], 0, dtype="int32")
创建 schedule 并使用指定的循环执行 decompose-padding
sch = tir.Schedule(before_decompose, debug_mask="all") block = sch.get_block("block") sch.decompose_padding(block, sch.get_loops(block)[0]) print(sch.mod["main].script())
应用 decompose-padding 后,IR 变为
@T.prim_func def after_decompose(x: T.Buffer(128, "int32"), y: T.Buffer(140, "int32")): for i in T.serial(140): with T.block("block_pad_const"): vi = T.axis.spatial(140, i) y[vi] = 0 for i in T.serial(128): with T.block("block"): vi = T.axis.spatial(128, i) y[vi + 6] = x[vi]
- pad_einsum(block: BlockRV | str, padding: List[int]) None
填充 Einsum 的计算。
在具有简单绑定的块上,此原语通过给定的填充因子来填充块的迭代域,例如,当填充因子为 16 时,127 -> 128,132 -> 144。将生成额外的生产者和消费者填充块,以避免越界缓冲区访问。
Einsum 模式意味着缓冲区访问上的所有索引要么是常量(例如 B[0]),要么是变量(例如 B[i]),而不是复合表达式(例如 B[i + 1])。
示例
在应用 pad-einsum 之前,在 TensorIR 中,IR 是
@T.prim_func def before_pad_einsum( A: T.Buffer((127, 127), "float32"), B: T.Buffer((127, 127), "float32"), C: T.Buffer((127, 127), "float32"), ) -> None: for i0, i1, i2 in T.grid(127, 127, 127): with T.block("C_shared"): i, j, k = T.axis.remap("SSR", [i0, i1, i2]) with T.init(): C[i, j] = T.float32(0) C[i, j] = C[i, j] + A[i, k] * B[k, j]
创建 schedule 并使用指定的块执行 pad-einsum
sch = tir.Schedule(before_pad_einsum, debug_mask="all") block = sch.get_block("C_shared") sch.pad_einsum(block, [32, 32, 32]) print(sch.mod["main"].script())
应用 decompose-padding 后,IR 变为
@T.prim_func def main( A: T.Buffer((127, 127), "float32"), B: T.Buffer((127, 127), "float32"), C: T.Buffer((127, 127), "float32"), ): # with T.block("root"): A_pad = T.alloc_buffer((128, 128)) B_pad = T.alloc_buffer((128, 128)) C_pad = T.alloc_buffer((128, 128)) for i0, i1 in T.grid(128, 128): with T.block("A_pad"): v0, v1 = T.axis.remap("SS", [i0, i1]) A_pad[v0, v1] = T.if_then_else( v0 < 127 and v1 < 127, A[v0, v1], T.float32(0), ) for i0, i1 in T.grid(128, 128): with T.block("B_pad"): v0, v1 = T.axis.remap("SS", [i0, i1]) B_pad[v0, v1] = T.if_then_else( v0 < 127 and v1 < 127, B[v0, v1], T.float32(0), ) for i0, i1, i2 in T.grid(128, 128, 128): with T.block("C_shared"): i, j, k = T.axis.remap("SSR", [i0, i1, i2]) with T.init(): C_pad[i, j] = T.float32(0) C_pad[i, j] = C_pad[i, j] + A_pad[i, k] * B_pad[k, j] for i0, i1 in T.grid(127, 127): with T.block("C_pad"): v0, v1 = T.axis.remap("SS", [i0, i1]) C[v0, v1] = C_pad[v0, v1]
- rolling_buffer(block: BlockRV | str, write_buffer_index: int) None
通过滚动缓冲计算目标缓冲区,选择在块的祖先循环中出现的具有正边界重叠的最外层可滚动轴作为 rolling axis,沿滚动维度折叠和循环化缓冲区,附加块谓词以避免重新计算重叠元素。它需要
该块不是输出块,并且只有 RAW 依赖项。
要成为通过 alloc_buffer 定义的中间缓冲区。
3) 缓冲区的生产者和消费者的 LCA 是 for 循环,通常,缓冲区的生产者和消费者通过 compute_at 级联。
4) 缓冲区的访问区域至少有一个维度包含正边界重叠。
示例
在 rolling_buffer 之前,在 TensorIR 中,IR 是
@T.prim_func def before_rolling_buffer( A: T.Buffer((12, 12), "int8"), C: T.Buffer((8, 8), "int8") ) -> None: # body # with T.block("root") B = T.alloc_buffer([10, 10], dtype="int8") for i0, i1 in T.grid(2, 2): for ax0, ax1, ax2, ax3 in T.grid(6, 6, 3, 3): with T.block("B"): ax0_1 = T.axis.spatial(10, i0 * 4 + ax0) ax1_1 = T.axis.spatial(10, i1 * 4 + ax1) rv0, rv1 = T.axis.remap("RR", [ax2, ax3]) B[ax0_1, ax1_1] = T.max( B[ax0_1, ax1_1], A[ax0_1 + rv0, ax1_1 + rv1] ) for ax0, ax1, ax2, ax3 in T.grid(4, 4, 3, 3): with T.block("C"): ax0_1 = T.axis.spatial(8, i0 * 4 + ax0) ax1_1 = T.axis.spatial(8, i1 * 4 + ax1) rv0, rv1 = T.axis.remap("RR", [ax2, ax3]) C[ax0_1, ax1_1] = T.max( C[ax0_1, ax1_1], B[ax0_1 + rv0, ax1_1 + rv1] )
创建 schedule 并执行 rolling_buffer
sch = tir.Schedule(before_rolling_buffer) sch.rolling_buffer(sch.get_block("B"), write_buffer_index=0) print(sch.mod["main"].script())
应用 rolling_buffer 后,IR 变为
@T.prim_func def after_rolling_buffer( A: T.Buffer((12, 12), "int8"), C: T.Buffer((8, 8), "int8") ) -> None: # body # with T.block("root") B = T.alloc_buffer([6, 10], dtype="int8") for i0, i1 in T.grid(2, 2): for ax0, ax1, ax2, ax3 in T.grid(6, 6, 3, 3): with T.block("B"): T.where((i0 < 1 or 2 <= ax0) and (i1 < 1 or 2 <= ax1)) ax0_1 = T.axis.spatial(10, i0 * 4 + ax0) ax1_1 = T.axis.spatial(10, i1 * 4 + ax1) rv0, rv1 = T.axis.remap("RR", [ax2, ax3]) B[ax0_1 % 6, ax1_1] = T.max( B[ax0_1 % 6, ax1_1], A[ax0_1 + rv0, ax1_1 + rv1] ) for ax0, ax1, ax2, ax3 in T.grid(4, 4, 3, 3): with T.block("C"): ax0_1 = T.axis.spatial(8, i0 * 4 + ax0) ax1_1 = T.axis.spatial(8, i1 * 4 + ax1) rv0, rv1 = T.axis.remap("RR", [ax2, ax3]) C[ax0_1, ax1_1] = T.max( C[ax0_1, ax1_1], B[ax0_1 % 6 + rv0, ax1_1 + rv1] )
注意
目标缓冲区的消费者块的 region_cover 属性将变为 false。
- unsafe_hide_buffer_access(block: BlockRV, buf_type: str, buf_index_array: List[int]) None
隐藏给定块中的某些缓冲区访问。这是一个不安全的 schedule 原语。
- 参数:
注意
此 schedule 原语是不安全的,并且可能导致依赖性分析失败。unsafe_hide_buffer_access 的一个用例是隐藏对索引缓冲区的缓冲区访问(例如,在稀疏计算中),以便我们可以进一步张量化该块(出现在读/写区域中的索引缓冲区可能会使 tensorize 原语中的模式匹配失败,并且隐藏对这些缓冲区的访问可以解决该问题)。
- annotate_buffer_access(block: BlockRV, buffer_index: int, buf_type: str, gen_new_ranges: Callable) None
注解块的读取或写入区域
- 参数:
示例
为一个缓冲区注解 2D 读取区域。在 annotate_buffer_access 之前,在 TensorIR 中,IR 是
@T.prim_func def before_annotate_buffer_access( A: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32") ) -> None: B = T.alloc_buffer((128, 128), "float32") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
创建 schedule 并执行 annotate_buffer_access
sch = tir.Schedule(before_annotate_buffer_access) block = sch.get_block("B") sch.annotate_buffer_access(block, 0, "read", lambda vi, vj: ((vi - 1, vi + 1), (vj - 1, vj + 1))) print(sch.mod["main"].script())
应用 annotate_buffer_access 后,IR 变为
@T.prim_func def after_annotate_buffer_access( A: T.Buffer((128, 128), "float32"), C: T.Buffer((128, 128), "float32") ) -> None: B = T.alloc_buffer((128, 128), "float32") for i, j in T.grid(128, 128): with T.block("B"): vi, vj = T.axis.remap("SS", [i, j]) T.reads(A[vi - 1:vi + 1, vj - 1:vj + 1]) T.writes(B[vi, vj]) T.block_attr({"explicit_read_region": 0}) B[vi, vj] = A[vi, vj] * 2.0 for i, j in T.grid(128, 128): with T.block("C"): vi, vj = T.axis.remap("SS", [i, j]) C[vi, vj] = B[vi, vj] + 1.0
这会将块 “B” 中缓冲区 A(索引 0)的读取区域注解为 [vi-1:vi+1, vj-1:vj+1],对于块迭代域中的每个 (vi, vj)。
注意
此函数允许手动指定读取或写入区域,这在编译器无法准确推断访问模式的情况下很有用,例如复杂的数据相关访问。它覆盖了指定缓冲区的自动推断区域。该函数向块添加一个注解,指示已为给定索引处的缓冲区提供了显式区域。此注解在 CompactBufferAllocation pass 中使用,以尊重手动指定的区域,而不是依赖自动推断。
使用此功能时应谨慎,因为不正确的注解可能会导致不正确的代码生成或运行时错误。务必确保指定的区域覆盖块对给定缓冲区执行的所有实际读取或写入操作。
- exception tvm.tir.schedule.ScheduleError
在 TensorIR scheduling 期间发生的错误。
- class tvm.tir.schedule.ScheduleDebugMask(value)
debug_mask 标志在 ScheduleState 类中的位掩码。
如果 debug_mask 标志的某个位为 on,则将执行相应的验证 pass。例如,如果 (debug_mask & VERIFY_SREF_TREE) != 0,则将在每个 schedule 指令之后验证 sref 树的正确性。
- VERIFY_SREF_TREE
验证 sref 树的正确性
- 类型:
int = 1
- VERIFY_CACHED_FLAGS
验证 affine_binding、region_cover 和 stage_pipeline 的正确性
- 类型:
int = 2
- class tvm.tir.schedule.ScheduleState(mod: PrimFunc | IRModule, *, debug_mask: str | int = 'none', enable_check: bool = True)
scheduling 的状态,它公开了一个 Replace 方法,作为所有 scheduling 原语操作 TensorIR 的主要手段。
数据结构包含以下信息 1) 正在 scheduling 的 AST (mod) 2) 可 scheduling 语句的 sref 树(由 srefs 指示)3) 每个块范围的依赖性信息 (block_info) 4) 从 AST 节点到 sref 树中节点的反向映射 (get_sref) 5) 调试标志,如果设置,则启用额外的检查 (debug_mask) 6) 启用检查标志,如果为 False,则禁用某些先决条件检查。
- 参数:
- get_block_scope(block_sref: StmtSRef) BlockScope
获取与块 sref 对应的 BlockScope
- replace(src_sref: StmtSRef, tgt_stmt: Block | For | BlockRealize, block_sref_reuse: Dict[Block, Block] | None = None) None
将 AST 的一部分(由 src_sref 指向)替换为特定的语句 tgt_stmt,并相应地维护 sref 树。当 ScheduleState 仅持有对 IRModule 和 IR 节点的副本时,Replace 将尝试尽可能地执行写时复制。
仅允许 3 种类型的替换:从 src_sref->stmt 到 tgt_stmt。1) Block -> Block 2) Loop -> Loop 3) Loop -> BlockRealize
- 参数:
注意
循环 sref 的重用会根据循环变量的重用自动检测。
- class tvm.tir.schedule.Trace(insts: List[Instruction], decisions: Dict[Instruction, Any])
scheduling 程序的执行跟踪。
一个跟踪有两个部分:1) 到目前为止调用的指令 2) 对这些指令做出的随机决策(如果有)
跟踪可以序列化为:1) 可往返 JSON 格式:可以保存到文件并加载回来 2) Python 语法:允许用户复制粘贴跟踪以重现 scheduling 过程
通过重新应用其所有指令(可能根据其决策)可以将跟踪应用于 TensorIR schedule。如果抽样指令没有其相应的决策,则会调用重新抽样;否则,将相应地重用现有决策。
- insts
到目前为止在程序执行中调用的指令
- 类型:
List[Instruction]
- decisions
对这些指令做出的随机决策
- 类型:
Dict[Instruction, DECISION_TYPE]
- get_decision(inst: Instruction) Any | None
检索对特定指令做出的决策
- 参数:
insts (Instruction) – 要检索其决策的指令
- 返回:
decision – 相应的决策;如果指令上没有做出决策,则为 None
- 返回类型:
Optional[DECISION_TYPE]
- append(inst: Instruction, decision: Any | None = None) None
将新指令附加到跟踪
- 参数:
insts (Instruction) – 要附加的新指令
decision (Optional[DECISION_TYPE] = None) – 关于此指令做出的随机决策
- pop() Instruction | None
移除最后一个指令,以及在该指令上做出的任何决策(如果有)
- 返回:
popped_inst – 返回移除的指令;如果跟踪为空,则返回 NullOpt
- 返回类型:
- apply_to_schedule(sch: Schedule, remove_postproc: bool, decision_provider: Callable[[Instruction, List[Any], List[Any], Any], Any] | None = None) None
将跟踪应用于 TensorIR 调度
- as_json(remove_postproc: bool = False) Any
将跟踪序列化为 JSON 风格的对象
- 参数:
remove_postproc (bool = False) – 如果后处理指令被移除
- 返回:
json – JSON 风格的对象
- 返回类型:
JSON_TYPE
- as_python(remove_postproc: bool = False) List[str]
将跟踪序列化为一系列 python 语句
- 参数:
remove_postproc (bool = False) – 如果后处理指令被移除
- 返回:
py_stmts – 一系列 python 语句
- 返回类型:
List[str]
- with_decision(inst: Instruction, decision: Any, remove_postproc: bool) Trace
创建一个新跟踪,其中的指令的决策已更改,假设此指令存在于结果跟踪中
- 参数:
inst (Instruction) – 要更改其决策的指令
decision (DECISION_TYPE) – 要更改为的决策
remove_postproc (bool) – 如果后处理指令被移除
- 返回:
trace – 决策已更改的新跟踪
- 返回类型: