risingwave_frontend/optimizer/rule/
table_function_to_internal_backfill_progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16use std::sync::Arc;
17
18use anyhow::bail;
19use itertools::Itertools;
20use risingwave_common::catalog::{Field, Schema, is_backfill_table};
21use risingwave_common::types::{DataType, ScalarImpl};
22use risingwave_expr::aggregate::AggType;
23pub use risingwave_pb::expr::agg_call::PbKind as PbAggKind;
24
25use super::{ApplyResult, BoxedRule, FallibleRule};
26use crate::TableCatalog;
27use crate::catalog::catalog_service::CatalogReadGuard;
28use crate::expr::{AggCall, ExprImpl, InputRef, Literal, OrderBy, TableFunctionType};
29use crate::optimizer::plan_node::generic::GenericPlanRef;
30use crate::optimizer::plan_node::{
31    LogicalAgg, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
32};
33use crate::optimizer::{OptimizerContext, PlanRef};
34use crate::utils::{Condition, GroupBy};
35
/// Optimizer rule that rewrites the `internal_backfill_progress()` table function
/// into a plan which scans the state tables of backfill nodes.
///
/// One sub-plan is produced per backfill state table and the sub-plans are combined
/// with a `UNION ALL`. Each sub-plan yields a single row with the columns
/// `job_id`, `fragment_id`, `backfill_state_table_id`, `current_row_count`
/// and `min_epoch`.
pub struct TableFunctionToInternalBackfillProgressRule {}
40impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
41    fn apply(&self, plan: PlanRef) -> ApplyResult {
42        let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
43        if logical_table_function.table_function.function_type
44            != TableFunctionType::InternalBackfillProgress
45        {
46            return ApplyResult::NotApplicable;
47        }
48
49        let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
50        // TODO(kwannoel): Make sure it reads from source tables as well.
51        let backfilling_tables = get_backfilling_tables(reader);
52        let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
53        ApplyResult::Ok(plan)
54    }
55}
56
57impl TableFunctionToInternalBackfillProgressRule {
58    fn build_plan(
59        ctx: Rc<OptimizerContext>,
60        backfilling_tables: Vec<Arc<TableCatalog>>,
61    ) -> anyhow::Result<PlanRef> {
62        if backfilling_tables.is_empty() {
63            let fields = vec![
64                Field::new("job_id", DataType::Int32),
65                Field::new("fragment_id", DataType::Int32),
66                Field::new("backfill_state_table_id", DataType::Int32),
67                Field::new("current_row_count", DataType::Int64),
68                Field::new("min_epoch", DataType::Int64),
69            ];
70            let plan = LogicalValues::new(vec![], Schema::new(fields), ctx.clone());
71            return Ok(plan.into());
72        }
73
74        let mut all_progress = Vec::with_capacity(backfilling_tables.len());
75        for table in backfilling_tables {
76            let backfill_info = BackfillInfo::new(&table)?;
77
78            let scan = Self::build_scan(ctx.clone(), table);
79            let agg = Self::build_agg(&backfill_info, scan)?;
80            let project = Self::build_project(&backfill_info, agg)?;
81
82            all_progress.push(project.into());
83        }
84        Ok(LogicalUnion::new(true, all_progress).into())
85    }
86
87    fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
88        LogicalScan::create(
89            table.name.clone(),
90            table,
91            vec![],
92            ctx.clone(),
93            None,
94            Default::default(),
95        )
96    }
97
98    fn build_agg(backfill_info: &BackfillInfo, scan: LogicalScan) -> anyhow::Result<PlanRef> {
99        let epoch_expr = match backfill_info.epoch_column_index {
100            Some(epoch_column_index) => ExprImpl::InputRef(Box::new(InputRef {
101                index: epoch_column_index,
102                data_type: DataType::Int64,
103            })),
104            None => ExprImpl::Literal(Box::new(Literal::new(None, DataType::Int64))),
105        };
106        let aggregated_min_epoch = ExprImpl::AggCall(Box::new(AggCall::new(
107            AggType::Builtin(PbAggKind::Min),
108            vec![epoch_expr],
109            false,
110            OrderBy::any(),
111            Condition::true_cond(),
112            vec![],
113        )?));
114        let aggregated_current_row_count = ExprImpl::AggCall(Box::new(AggCall::new(
115            AggType::Builtin(PbAggKind::Sum),
116            vec![ExprImpl::InputRef(Box::new(InputRef {
117                index: backfill_info.row_count_column_index,
118                data_type: DataType::Int64,
119            }))],
120            false,
121            OrderBy::any(),
122            Condition::true_cond(),
123            vec![],
124        )?));
125        let select_exprs = vec![aggregated_current_row_count, aggregated_min_epoch];
126        let group_by = GroupBy::GroupKey(vec![]);
127        let (agg, _, _) = LogicalAgg::create(select_exprs, group_by, None, scan.into())?;
128        Ok(agg)
129    }
130
131    fn build_project(backfill_info: &BackfillInfo, agg: PlanRef) -> anyhow::Result<LogicalProject> {
132        let job_id_expr = Self::build_u32_expr(backfill_info.job_id);
133        let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id);
134        let table_id_expr = Self::build_u32_expr(backfill_info.table_id);
135
136        let current_count_per_vnode = ExprImpl::InputRef(Box::new(InputRef {
137            index: 0,
138            data_type: DataType::Decimal,
139        }))
140        .cast_explicit(DataType::Int64)?;
141        let min_epoch = ExprImpl::InputRef(Box::new(InputRef {
142            index: 1,
143            data_type: DataType::Int64,
144        }));
145
146        Ok(LogicalProject::new(
147            agg,
148            vec![
149                job_id_expr,
150                fragment_id_expr,
151                table_id_expr,
152                current_count_per_vnode,
153                min_epoch,
154            ],
155        ))
156    }
157
158    fn build_u32_expr(id: u32) -> ExprImpl {
159        ExprImpl::Literal(Box::new(Literal::new(
160            Some(ScalarImpl::Int32(id as i32)),
161            DataType::Int32,
162        )))
163    }
164}
165
166fn get_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
167    reader
168        .iter_backfilling_internal_tables()
169        .filter(|table| is_backfill_table(&table.name))
170        .cloned()
171        .collect_vec()
172}
173
174impl TableFunctionToInternalBackfillProgressRule {
175    pub fn create() -> BoxedRule {
176        Box::new(TableFunctionToInternalBackfillProgressRule {})
177    }
178}
179
/// Metadata extracted from a backfill state table's catalog entry, used to
/// build the per-table progress sub-plan.
struct BackfillInfo {
    /// Id of the job recorded on the table catalog (`table.job_id`).
    job_id: u32,
    /// Fragment id recorded on the table catalog.
    fragment_id: u32,
    /// Id of the backfill state table itself.
    table_id: u32,
    /// Position of the `row_count` column within the table's columns.
    row_count_column_index: usize,
    /// Position of the `epoch` column, if the table has one.
    epoch_column_index: Option<usize>,
}
187
188impl BackfillInfo {
189    fn new(table: &TableCatalog) -> anyhow::Result<Self> {
190        let Some(job_id) = table.job_id.map(|id| id.table_id) else {
191            bail!("`job_id` column not found in backfill table");
192        };
193        let Some(row_count_column_index) =
194            table.columns.iter().position(|c| c.name() == "row_count")
195        else {
196            bail!("`row_count` column not found in backfill table");
197        };
198        let epoch_column_index = table.columns.iter().position(|c| c.name() == "epoch");
199        let fragment_id = table.fragment_id;
200        let table_id = table.id.table_id;
201
202        Ok(Self {
203            job_id,
204            fragment_id,
205            table_id,
206            row_count_column_index,
207            epoch_column_index,
208        })
209    }
210}