risingwave_frontend/optimizer/rule/
table_function_to_internal_backfill_progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16use std::sync::Arc;
17
18use anyhow::bail;
19use itertools::Itertools;
20use risingwave_common::catalog::{Field, Schema, is_backfill_table};
21use risingwave_common::id::{FragmentId, JobId, TableId};
22use risingwave_common::types::{DataType, ScalarImpl};
23use risingwave_expr::aggregate::AggType;
24pub use risingwave_pb::expr::agg_call::PbKind as PbAggKind;
25
26use super::prelude::{PlanRef, *};
27use crate::TableCatalog;
28use crate::catalog::catalog_service::CatalogReadGuard;
29use crate::expr::{AggCall, ExprImpl, InputRef, Literal, OrderBy, TableFunctionType};
30use crate::optimizer::OptimizerContext;
31use crate::optimizer::plan_node::generic::GenericPlanRef;
32use crate::optimizer::plan_node::{
33    Logical, LogicalAgg, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion,
34    LogicalValues, StreamTableScan,
35};
36use crate::optimizer::rule::{ApplyResult, FallibleRule};
37use crate::utils::{Condition, GroupBy};
38
/// Optimizer rule that rewrites the `internal_backfill_progress()` table function
/// into a plan graph scanning the state tables of the backfill nodes.
///
/// The rewritten plan reports backfill progress per backfill state table, and every
/// output row carries the job id, fragment id and state table id it belongs to.
pub struct TableFunctionToInternalBackfillProgressRule {}
43impl FallibleRule<Logical> for TableFunctionToInternalBackfillProgressRule {
44    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
45        let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
46        if logical_table_function.table_function.function_type
47            != TableFunctionType::InternalBackfillProgress
48        {
49            return ApplyResult::NotApplicable;
50        }
51
52        let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
53        let backfilling_tables = get_backfilling_tables(reader);
54        let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
55        ApplyResult::Ok(plan)
56    }
57}
58
59impl TableFunctionToInternalBackfillProgressRule {
60    fn build_plan(
61        ctx: Rc<OptimizerContext>,
62        backfilling_tables: Vec<Arc<TableCatalog>>,
63    ) -> anyhow::Result<PlanRef> {
64        if backfilling_tables.is_empty() {
65            let fields = vec![
66                Field::new("job_id", DataType::Int32),
67                Field::new("fragment_id", DataType::Int32),
68                Field::new("backfill_state_table_id", DataType::Int32),
69                Field::new("current_row_count", DataType::Int64),
70                Field::new("min_epoch", DataType::Int64),
71            ];
72            let plan = LogicalValues::new(vec![], Schema::new(fields), ctx);
73            return Ok(plan.into());
74        }
75
76        let mut all_progress = Vec::with_capacity(backfilling_tables.len());
77        for table in backfilling_tables {
78            let backfill_info = BackfillInfo::new(&table)?;
79
80            let scan = Self::build_scan(ctx.clone(), table);
81            let agg = Self::build_agg(&backfill_info, scan)?;
82            let project = Self::build_project(&backfill_info, agg)?;
83
84            all_progress.push(project.into());
85        }
86        Ok(LogicalUnion::new(true, all_progress).into())
87    }
88
89    fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
90        LogicalScan::create(table, ctx, None)
91    }
92
93    fn build_agg(backfill_info: &BackfillInfo, scan: LogicalScan) -> anyhow::Result<PlanRef> {
94        let epoch_expr = match backfill_info.epoch_column_index {
95            Some(epoch_column_index) => ExprImpl::InputRef(Box::new(InputRef {
96                index: epoch_column_index,
97                data_type: DataType::Int64,
98            })),
99            None => ExprImpl::Literal(Box::new(Literal::new(None, DataType::Int64))),
100        };
101        let aggregated_min_epoch = ExprImpl::AggCall(Box::new(AggCall::new(
102            AggType::Builtin(PbAggKind::Min),
103            vec![epoch_expr],
104            false,
105            OrderBy::any(),
106            Condition::true_cond(),
107            vec![],
108        )?));
109        let aggregated_current_row_count = ExprImpl::AggCall(Box::new(AggCall::new(
110            AggType::Builtin(PbAggKind::Sum),
111            vec![ExprImpl::InputRef(Box::new(InputRef {
112                index: backfill_info.row_count_column_index,
113                data_type: DataType::Int64,
114            }))],
115            false,
116            OrderBy::any(),
117            Condition::true_cond(),
118            vec![],
119        )?));
120        let select_exprs = vec![aggregated_current_row_count, aggregated_min_epoch];
121        let group_by = GroupBy::GroupKey(vec![]);
122        let (agg, _, _) = LogicalAgg::create(select_exprs, group_by, None, scan.into())?;
123        Ok(agg)
124    }
125
126    fn build_project(backfill_info: &BackfillInfo, agg: PlanRef) -> anyhow::Result<LogicalProject> {
127        let job_id_expr = Self::build_u32_expr(backfill_info.job_id.as_raw_id());
128        let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id.as_raw_id());
129        let table_id_expr = Self::build_u32_expr(backfill_info.table_id.as_raw_id());
130
131        let current_count_per_vnode = ExprImpl::InputRef(Box::new(InputRef {
132            index: 0,
133            data_type: DataType::Decimal,
134        }))
135        .cast_explicit(&DataType::Int64)?;
136        let min_epoch = ExprImpl::InputRef(Box::new(InputRef {
137            index: 1,
138            data_type: DataType::Int64,
139        }));
140
141        Ok(LogicalProject::new(
142            agg,
143            vec![
144                job_id_expr,
145                fragment_id_expr,
146                table_id_expr,
147                current_count_per_vnode,
148                min_epoch,
149            ],
150        ))
151    }
152
153    fn build_u32_expr(id: u32) -> ExprImpl {
154        ExprImpl::Literal(Box::new(Literal::new(
155            Some(ScalarImpl::Int32(id as i32)),
156            DataType::Int32,
157        )))
158    }
159}
160
161fn get_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
162    reader
163        .iter_backfilling_internal_tables()
164        .filter(|table| is_backfill_table(&table.name))
165        .cloned()
166        .collect_vec()
167}
168
169impl TableFunctionToInternalBackfillProgressRule {
170    pub fn create() -> BoxedRule {
171        Box::new(TableFunctionToInternalBackfillProgressRule {})
172    }
173}
174
175struct BackfillInfo {
176    job_id: JobId,
177    fragment_id: FragmentId,
178    table_id: TableId,
179    row_count_column_index: usize,
180    epoch_column_index: Option<usize>,
181}
182
183impl BackfillInfo {
184    fn new(table: &TableCatalog) -> anyhow::Result<Self> {
185        let Some(job_id) = table.job_id else {
186            bail!("`job_id` column not found in backfill table");
187        };
188        let Some(row_count_column_index) = table
189            .columns
190            .iter()
191            .position(|c| c.name() == StreamTableScan::ROW_COUNT_COLUMN_NAME)
192        else {
193            bail!(
194                "`{}` column not found in backfill table",
195                StreamTableScan::ROW_COUNT_COLUMN_NAME
196            );
197        };
198        let epoch_column_index = table
199            .columns
200            .iter()
201            .position(|c| c.name() == StreamTableScan::EPOCH_COLUMN_NAME);
202        let fragment_id = table.fragment_id;
203        let table_id = table.id;
204
205        Ok(Self {
206            job_id,
207            fragment_id,
208            table_id,
209            row_count_column_index,
210            epoch_column_index,
211        })
212    }
213}