risingwave_frontend/optimizer/rule/table_function_to_internal_backfill_progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::rc::Rc;
16use std::sync::Arc;
17
18use anyhow::bail;
19use itertools::Itertools;
20use risingwave_common::catalog::{Field, Schema, is_backfill_table};
21use risingwave_common::types::{DataType, ScalarImpl};
22use risingwave_expr::aggregate::AggType;
23pub use risingwave_pb::expr::agg_call::PbKind as PbAggKind;
24
25use super::prelude::{PlanRef, *};
26use crate::TableCatalog;
27use crate::catalog::catalog_service::CatalogReadGuard;
28use crate::expr::{AggCall, ExprImpl, InputRef, Literal, OrderBy, TableFunctionType};
29use crate::optimizer::OptimizerContext;
30use crate::optimizer::plan_node::generic::GenericPlanRef;
31use crate::optimizer::plan_node::{
32    Logical, LogicalAgg, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion,
33    LogicalValues, StreamTableScan,
34};
35use crate::optimizer::rule::{ApplyResult, FallibleRule};
36use crate::utils::{Condition, GroupBy};
37
38/// Transform the `internal_backfill_progress()` table function
39/// into a plan graph which will scan the state tables of backfill nodes.
40/// It will return the progress of the backfills, partitioned by the backfill node's fragment id.
41pub struct TableFunctionToInternalBackfillProgressRule {}
42impl FallibleRule<Logical> for TableFunctionToInternalBackfillProgressRule {
43    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
44        let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
45        if logical_table_function.table_function.function_type
46            != TableFunctionType::InternalBackfillProgress
47        {
48            return ApplyResult::NotApplicable;
49        }
50
51        let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
52        let backfilling_tables = get_backfilling_tables(reader);
53        let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
54        ApplyResult::Ok(plan)
55    }
56}
57
58impl TableFunctionToInternalBackfillProgressRule {
59    fn build_plan(
60        ctx: Rc<OptimizerContext>,
61        backfilling_tables: Vec<Arc<TableCatalog>>,
62    ) -> anyhow::Result<PlanRef> {
63        if backfilling_tables.is_empty() {
64            let fields = vec![
65                Field::new("job_id", DataType::Int32),
66                Field::new("fragment_id", DataType::Int32),
67                Field::new("backfill_state_table_id", DataType::Int32),
68                Field::new("current_row_count", DataType::Int64),
69                Field::new("min_epoch", DataType::Int64),
70            ];
71            let plan = LogicalValues::new(vec![], Schema::new(fields), ctx.clone());
72            return Ok(plan.into());
73        }
74
75        let mut all_progress = Vec::with_capacity(backfilling_tables.len());
76        for table in backfilling_tables {
77            let backfill_info = BackfillInfo::new(&table)?;
78
79            let scan = Self::build_scan(ctx.clone(), table);
80            let agg = Self::build_agg(&backfill_info, scan)?;
81            let project = Self::build_project(&backfill_info, agg)?;
82
83            all_progress.push(project.into());
84        }
85        Ok(LogicalUnion::new(true, all_progress).into())
86    }
87
88    fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
89        LogicalScan::create(table, ctx.clone(), None)
90    }
91
92    fn build_agg(backfill_info: &BackfillInfo, scan: LogicalScan) -> anyhow::Result<PlanRef> {
93        let epoch_expr = match backfill_info.epoch_column_index {
94            Some(epoch_column_index) => ExprImpl::InputRef(Box::new(InputRef {
95                index: epoch_column_index,
96                data_type: DataType::Int64,
97            })),
98            None => ExprImpl::Literal(Box::new(Literal::new(None, DataType::Int64))),
99        };
100        let aggregated_min_epoch = ExprImpl::AggCall(Box::new(AggCall::new(
101            AggType::Builtin(PbAggKind::Min),
102            vec![epoch_expr],
103            false,
104            OrderBy::any(),
105            Condition::true_cond(),
106            vec![],
107        )?));
108        let aggregated_current_row_count = ExprImpl::AggCall(Box::new(AggCall::new(
109            AggType::Builtin(PbAggKind::Sum),
110            vec![ExprImpl::InputRef(Box::new(InputRef {
111                index: backfill_info.row_count_column_index,
112                data_type: DataType::Int64,
113            }))],
114            false,
115            OrderBy::any(),
116            Condition::true_cond(),
117            vec![],
118        )?));
119        let select_exprs = vec![aggregated_current_row_count, aggregated_min_epoch];
120        let group_by = GroupBy::GroupKey(vec![]);
121        let (agg, _, _) = LogicalAgg::create(select_exprs, group_by, None, scan.into())?;
122        Ok(agg)
123    }
124
125    fn build_project(backfill_info: &BackfillInfo, agg: PlanRef) -> anyhow::Result<LogicalProject> {
126        let job_id_expr = Self::build_u32_expr(backfill_info.job_id);
127        let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id);
128        let table_id_expr = Self::build_u32_expr(backfill_info.table_id);
129
130        let current_count_per_vnode = ExprImpl::InputRef(Box::new(InputRef {
131            index: 0,
132            data_type: DataType::Decimal,
133        }))
134        .cast_explicit(&DataType::Int64)?;
135        let min_epoch = ExprImpl::InputRef(Box::new(InputRef {
136            index: 1,
137            data_type: DataType::Int64,
138        }));
139
140        Ok(LogicalProject::new(
141            agg,
142            vec![
143                job_id_expr,
144                fragment_id_expr,
145                table_id_expr,
146                current_count_per_vnode,
147                min_epoch,
148            ],
149        ))
150    }
151
152    fn build_u32_expr(id: u32) -> ExprImpl {
153        ExprImpl::Literal(Box::new(Literal::new(
154            Some(ScalarImpl::Int32(id as i32)),
155            DataType::Int32,
156        )))
157    }
158}
159
160fn get_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
161    reader
162        .iter_backfilling_internal_tables()
163        .filter(|table| is_backfill_table(&table.name))
164        .cloned()
165        .collect_vec()
166}
167
168impl TableFunctionToInternalBackfillProgressRule {
169    pub fn create() -> BoxedRule {
170        Box::new(TableFunctionToInternalBackfillProgressRule {})
171    }
172}
173
174struct BackfillInfo {
175    job_id: u32,
176    fragment_id: u32,
177    table_id: u32,
178    row_count_column_index: usize,
179    epoch_column_index: Option<usize>,
180}
181
182impl BackfillInfo {
183    fn new(table: &TableCatalog) -> anyhow::Result<Self> {
184        let Some(job_id) = table.job_id.map(|id| id.table_id) else {
185            bail!("`job_id` column not found in backfill table");
186        };
187        let Some(row_count_column_index) = table
188            .columns
189            .iter()
190            .position(|c| c.name() == StreamTableScan::ROW_COUNT_COLUMN_NAME)
191        else {
192            bail!(
193                "`{}` column not found in backfill table",
194                StreamTableScan::ROW_COUNT_COLUMN_NAME
195            );
196        };
197        let epoch_column_index = table
198            .columns
199            .iter()
200            .position(|c| c.name() == StreamTableScan::EPOCH_COLUMN_NAME);
201        let fragment_id = table.fragment_id;
202        let table_id = table.id.table_id;
203
204        Ok(Self {
205            job_id,
206            fragment_id,
207            table_id,
208            row_count_column_index,
209            epoch_column_index,
210        })
211    }
212}