Blog/技术探讨

内核探究|让大表 Join 快 10 倍:YMatrix Runtime Filter 的实现与架构解析

2025-12-12 · 王勇
#技术探讨

前言

在 YMatrix Runtime Filter 的系列文章中,我们曾在第一篇《大表 Join 总卡壳?YMatrix Runtime Filter:给查询装个“智能预筛器”》介绍了 YMatrix 的 Runtime Filter 技术,该技术能够解决大表 Join 查询慢的问题,提升查询性能。

而此篇文章,将围绕 YMatrix 中 RuntimeFilter 的核心设计、关键实现及性能验证展开,揭示其如何为复杂查询带来质的飞跃。

作者|YMatrix 研发经理 王勇

01 概述

Join 是 SQL 查询的基本算子,也是执行代价较高的算子之一。以经常用于性能比较的 Benchmark TPC-H 来看,它的 22 条查询中有 20 条包含多表关联。Join 的主要代价体现在对大表逐条匹配。Runtime Filter 技术可以提前对大表进行过滤,如果最终的匹配结果也较小,则可以大幅缩减执行时间。

在我们的系统中(在实现了向量化和节点内并行的情况下),TPC-H 的第 17 条查询仍然可以得到 10 倍的性能提升。在 YMatrix 里实现 RuntimeFilter 需要关注如下问题:

RuntimeFilter 增加了过滤代价来节省匹配代价,其效果和过滤的速度以及匹配速度直接相关。在我们的实践中,由于向量化对于 Scan 有了很大的优化,需要在 BloomFilter 过滤时做较多优化,才能体现出 RuntimeFilter 的效果。

全局的 Runtime Filter(跨节点)是必要的。当 Scan 的速度足够快时(比如使用了向量化加速),仅在本地提前过滤产生的效果有限。全局 RF 能将所有节点的局部 BloomFilter 合在一起,大大提高了降低了扫描的通过率,减少了网络传输的数据量,从而得到更好的加速效果。

内存的随机访问是瓶颈。当使用多个函数在较大的内存范围内进行查找时,内存访问的代价变得突出,需要控制 BloomFilter 的大小和 hash 函数的个数。

本文以 BloomFilter 为例,介绍在 YMatrix 系统中如何实现 RuntimeFilter。具体要点包括:

  1. 改造式的生成计划。Runtime Filter 分为两部分:生成端和消费端,出于最大性能的考虑,消费端可能在一个较远的 Slice 中,因为改造式的计划更为灵活。

  2. interconnect 支持双向通信。YMatrix 仅支持单向数据通信,需要增加反向数据通道,使得 BloomFilter 可以逐级推送到消费端。

  3. 跨多个 Motion 的递归下推。本着尽早过滤的原则,Runtime Filter 需要穿透多层 Join,直至推送到最早具有过滤条件的 Scan 节点。

02 样例

对于 HashJoin,执行器会先扫描小表(inner table)建立 hashtable,然后遍历大表(outer table),计算 hash 值并根据 join 条件进行匹配。前者称为 build 阶段,后者称为 probe 阶段。

下面是一条根据 TPC-H Q17 简化的关联查询,part 是维表,lineitem 是事实表,它们通过外键 partkey 关联。过滤后的 inner table 有 100 条数据,而 key 的基数很高,假定是 2000000。因为两表都只需要保留 partkey 相同集合,因此关联谓词可以在 build 阶段转化成一个过滤条件,提前应用在 B 表的扫描阶段。这个用于过滤的集合是运行时才确定的,这也是 Runtime Filter 的名字由来。

SELECT 
    sum(l_extendedprice) / 7.0 AS avg_yearly 
FROM 
    :lineitem_tbl, 
    :part_tbl 
WHERE 
    p_partkey = l_partkey 
    AND p_brand = 'Brand#42'
    AND p_container = 'SM CAN'
    AND l_quantity < ( 
        SELECT 
            0.2::float8 * avg(l_quantity) 
        FROM 
            :lineitem_tbl 
        WHERE 
            l_partkey = p_partkey); 

按照现有的计划,执行器会先对 L(lineitem) 表按 partkey 聚集,然后和 P(part) 表关联,缩小范围后再和 L 表,对缩小后的订单条目按另一个字段聚集。由于 partkey 的 cardinality 高达 2000000,聚集的代价很高,总的执行时间为 22 s。显然 P 表上的条件能对 L 表的数据有很大的缩减。通过查询改写,如果将 P 表上的条件下推到子查询内,执行时间将缩减到 6s 多。如果应用 Runtime Filter 技术,则可以在不改变计划的情况下实现查询的加速。

下面是 TPC-H 100 倍数据集上的原型结果。这里实现了 Local/Global 两种形式的 filter,以 slice3 为例,原本需要扫描 2443 万行数据,经过 filter 下推后,实际 join 只需要处理 23693 行数据。在没有额外任何优化的情况下,查询时间缩短到 2.6s,性能提升 8 倍以上。

---------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=2401.348..2401.349 rows=1 loops=1)
   ->  Gather Motion 32:1  (slice1; segments: 32)  (actual time=2045.266..2401.308 rows=32 loops=1)
         ->  Partial Aggregate  (actual time=2396.007..2398.357 rows=1 loops=1)
               ->  Hash Join   (actual time=1019.164..2397.972 rows=2360 loops=1)
                     Hash Cond: (lineitem_mars2.l_partkey = part_mars2.p_partkey)
                     Join Filter: ((lineitem_mars2.l_quantity)::double precision < (('0.2'::double precision * (avg(lineitem_mars2_1.l_quantity))::double precision)))
                     Rows Removed by Join Filter: 23466
                     ->  Parallel Custom Scan (MxVAppend) (actual time=2.779..1362.451 rows=25532 loops=1)
                           ->  Parallel Custom Scan (MxVScan) on tpch2.lineitem_mars2_1_prt_4  (actual time=1.980..854.854 rows=11746 loops=1)
                           ......
                           ->  Parallel Custom Scan (MxVScan) on tpch2.lineitem_mars2_1_prt_others   (actual time=1.376..1.568 rows=0 loops=1)
                     ->  Hash  (actual time=1014.443..1027.434 rows=20428 loops=1)
                           Buckets: 262144  Batches: 1  Memory Usage: 3086kB
                           ->  Broadcast Motion 32:32  (slice2; segments: 32) (actual time=996.181..1012.355 rows=20428 loops=1)
                                 ->  Hash Join  (cost=1762757.68..3146080.83 rows=4153 width=16) (actual time=967.616..986.445 rows=699 loops=1)
                                       Hash Cond: (lineitem_mars2_1.l_partkey = part_mars2.p_partkey)
                                       ->  HashAggregate  (actual time=959.479..982.002 rows=815 loops=1)
                                             Group Key: lineitem_mars2_1.l_partkey
                                             ->  Custom Scan (MxVMotion) Redistribute Motion 32:32  (slice3; segments: 32)  (actual time=806.672..964.408 rows=20992 loops=1)
                                                   Hash Key: lineitem_mars2_1.l_partkey
                                                   ->  Parallel Custom Scan (MxVAppend)  (cost=0.00..767136.52 rows=24438080 width=6) (actual time=88.944..969.585 rows=23693 loops=1)
                                                         ->  Parallel Custom Scan (MxVScan) on tpch2.lineitem_mars2_1_prt_4 lineitem_mars2_1_prt_4_1   (actual time=2.882..572.608 rows=11731 loops=1)
                                                         ......
                                                         ->  Parallel Custom Scan (MxVScan) on tpch2.lineitem_mars2_1_prt_others lineitem_mars2_1_prt_others_1 (actual time=1.180..1.625 rows=0 loops=1)
                                       ->  Hash  (cost=2772.24..2772.24 rows=4153 width=4) (actual time=1.139..1.649 rows=699 loops=1)
                                             Buckets: 262144  Batches: 1  Memory Usage: 2073kB
                                             ->  Custom Scan (MxVMotion) Redistribute Motion 32:32  (slice4; segments: 32) (actual time=0.093..0.438 rows=699 loops=1)
                                                   Hash Key: part_mars2.p_partkey
                                                   ->  Parallel Custom Scan (MxVScan) on tpch2.part_mars2  (actual time=2.820..7.206 rows=2506 loops=1)
                                                         Filter: ((part_mars2.p_brand = 'Brand#42'::text) AND (part_mars2.p_container = 'SM CAN'::text))
                                                         Rows Removed by Filter: 99494

03 方案选型

决策时间的选择

由于 Join 的结果重度依赖于两表基数等统计信息的准确性,和选择比的准确估算,比较容易误判。尤其在多层 Join 的情况下,输入和输出的记录数都难以准确估计,容易导致 Runtime 的使用效果适得其反。根据 Runtime Filter 的决策时间,方案分为计划阶段和运行时两种。

计划阶段。生成执行物理计划的时候,可以通过规则或代价计算判别某个节点是否应该使用RF,并作为执行计划的一部分。需要 Runtime Filter 的节点在开始执行时就知道需要并获取 Runtime Filter,然后再执行 Scan。这种同步方式可以尽早利用 Runtime Filter,获得最大的收益,且和当前的火山模型可以无缝匹配,只需增加协议,无需改变通信模式。缺点包括:

需要改造生成计划,甚至参与 CBO 运算。

误判的可能性大,且误判的影响更大。误判的来源包括统计信息不准,缺少运行时信息等,而一旦误判会引起无谓的等待时间,额外的下推开销等。

完全运行时。仅在内表构建完成后,根据真实的信息来判定是否推送 Runtime Filter 给相应的 Scan 节点。接收端,异步接收并应用 Runtime Filter,无须等待。优点是对计划无侵入,按需推送。但这需要自行计算 filter 信息,比如有多少个发送端等。缺点是现有的火山模型和 interconnect 机制均缺少异步的下推通道和接入机制,不容易控制接收方的等待,需要解决逐级下推等问题。而异步通信机制对于 interconnect 协议侵入过大,改变火山模型代价很高,对系统稳定性影响很大。

鉴于这两者缺点都过于明显,本文采取将两者结合的第三种策略:计划阶段标记,动态取舍。 计划阶段通过计算代价,初步确定是否要采用 Runtime Filter。如果需要,则生成所需要的列名及一致的 filter 信息等。确切地,只在物理计划生成后,复用新增的向量化计划改造钩子,在相应的计划节点添加相应信息。 在生成 filter 过程中,源端如果判断启用 filter 会带来负作用,比如 filter 相比原来估计的要大小多,则可以发送“止损”消息。接收端得知 filter 无效,即不再等待和处理 Runtime Filter. 而 Scan 节点在运行阶段,也可以通过实际统计的选择比判断是否带来收益,否则就取消该过滤器。

计划生成

虽然把 Runtime Filter 纳入 CBO 生成物理计划的阶段能生成更优的计划,但带来较高的复杂度。如果代码估计有偏差,对整体计划的影响也较大。相关研究工作也表明,这种方式带来的计划改善相对有限。

本文采取直接改造最终计划的方式,在物理路径上判断是否采用 RuntimeFilter,然后在生成计划时加上相关信息。

04 实现

判断是否启用RF

可以基于规则和基于代价两种方式来判别启用 Runtime Filter 带来的好处。其中规则包括:

probe 端的数据量的大小。如果 probe 端的数据量过小,即便被过滤很多的数据,其性能提升也无法弥补 bloom filter 的额外开销。

bloom filter 的大小。bloom filter 的大小由输入的数量和 fpp(错误率)决定,并和输入的数量成正比。当 bloom filter 太大,不仅会增大网络传输的数据,也会增大内存占用,需要将其限制在一定范围内。

过滤比例。当生成的 bloom filter 的过滤比例太小时,将其下推到 Join 的 probe 端不仅不会起到任何的效果,而且会因为计算 bloom filter 增加开销。

代价模型的方法则是要和预期收益相比,当收益显著大于代价时才会生成 Runtime Filter hint。代价可以简要地表示为启用代价和运行代价,而启用代价主要是生成和传输代价。运行代价和原始数据量及过滤器类型有关。这里粗略列出估计模式。

C_rf = C_rf_startup + C_rf_Runtime
C_rf_startup = C_rf_build + C_rf_net
C_rf_rutime = C_rf_tuple * tuplenum
C_rf_build = C_rf_build[i] * tuplenum_left
C_rf_net = c_net_startup + C_rf_size * net_cost_unit

其中 c_net_startup 和网络类型(UDP/TCP)相关。C_rf_tuple 指 filter 单条记录的代价,和类型相关。例如,bloom_filter 的 hash 函数计算开销较大,且和 memory size 有关。具体参数可以简单度量,并结合 cpu/disk 的代价而确定,保持可比较性。 预期收益主要是减少后续 Join 或 agg 输出而节省的成本。因此是 HashCostEstimated|AggCostEstimate * tuplenum。而上述代价可以参考Hash/agg节点的代价。 实践表明,在 TPC-H 和 TPC-DS 中,规则的代价已经较好,基本没有“跷跷板效应”。

生成计划

经过物理优化后,在钩子中嵌入“路径修饰”的处理。它利用上述判别标准,对整体路径增加 Runtime Filter 的 Hint。经过逻辑优化后,物理阶段的路径已经生成了相对友好的条件,比如各种子连接转换成了等值连接。Join 表达式也通过等价类得到了较优的结果。 路径修饰从 Join 节点开始,通过分析 Join 条件得到是否需要提供哪些节点提供 Runtime Filter,而哪些节点需要接收的信息及下推等策略的信息。下文称修改过程为装饰器。

生成和接收节点

装饰器从计划的根节点开始遍历,遇到 HashJoin 节点就开始根据左、右节点的统计信息判断是否需要启用 Runtime Filter。对于右节点,要一直找到扫描节点并判断条件是否适用。由于一个 segment 可能起多个 parallel worker,因此 Runtime Filter 的接收以进程为单位。如果存在 Append 节点,则 Append 节点在执行时需要等待并接收 filter,子节点共享这一 filter。否则 就将 VScan(向量化 Scan) 作为接收节点。 如果存在多层 HashJoin,则上层 Join 生成的 filter 可以用于下层 Join 的左表或右表。在不穿透 motion 的情况下,需要识别下层 Join 是否适用Runtime Filter。

修饰信息

建议的 Runtime Filter 类型,及其参数,便于运行时给出生成更精准的 filter。以 Q17 为例,包括内表的预估大小,原表大小,是否为 MCV 值等,用以估算外表的筛选率。

filter 参数。分析 Join 条件,分析 Scan 时所需要的条件。

是否需要网络传输。如果跨不同 Slice 则需要网络传输。

预估的筛选率。当存在其它过滤条件时,用于确定是否优先执行 Runtime Filter。

下推协议

Runtime Filter 均由 HashJoin 节点发起,根据左表的数据生成 filter。根据是否要穿透 motion 分为 local/global 两种。 local Runtimer 指在一个进程内 filter,由生成节点发起,遍历执行树传递对接收节点。 Global Runtime Filter 由于跨了进程,需要通过网络发给接收端。由于计划期已经知道哪些节点需要 Runtime Filter,因此只需要增加“有限等待”的协议即可,不需要异步的双向通信和异步执行模型。Global Runtime Filter 需要网络层作如下增强。

针对 UDP 实现双向的通信,需要在 Transport 层增加 receiver->sender 的拆包组包协议,在 MotionLayer 层增加传输定制消息的能力。确保此类消息在正常的通信流程中可忽略。

UDP 连接信息尽早建立。当前 UDP 模式下第一个发送包充当了连接信息的沟通的作用,也就是仅当右表发送第一个 tuple 后,receive 才能发送消息。需要在 MotionSender 第一次被执行时即发送连接信息。而发送端要确认消息发送成功。

增加等待协议。接收者在开始操作时先等待 filter 的到达,如果超过“止损”时间,则直接进入到后续执行,放弃该 filter。

生命周期管理

RuntimeFilter 生成了额外的运行时信息,且有可能跨进程使用。它遵循“所有者管理“的模式:对于 local 模式,生成者是管理者,即 HashJoin节点负责生成和释放。对于 Global 模式,由接收者生成 filter 和释放,即 Append或Scan节点。

效果

启用 RuntimeFilter 后,TPC-H S100 的执行时间可以从 100s 降到80s,其中 Q17 可以提升 10 倍。