risingwave_frontend/optimizer/rule/table_function_to_internal_source_backfill_progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16use std::sync::Arc;
17
18use anyhow::bail;
19use itertools::Itertools;
20use risingwave_common::catalog::{Field, Schema, is_source_backfill_table};
21use risingwave_common::types::{DataType, ScalarImpl};
22
23use super::{ApplyResult, BoxedRule, FallibleRule};
24use crate::TableCatalog;
25use crate::catalog::catalog_service::CatalogReadGuard;
26use crate::expr::{ExprImpl, InputRef, Literal, TableFunctionType};
27use crate::optimizer::plan_node::generic::GenericPlanRef;
28use crate::optimizer::plan_node::{
29    LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
30    StreamSourceScan,
31};
32use crate::optimizer::{OptimizerContext, PlanRef};
33
/// Rewrites the `internal_source_backfill_progress()` table function into a
/// plan that scans the state tables of all source backfill nodes.
///
/// The resulting relation reports backfill progress keyed by the backfill
/// node's fragment id and partition id.
pub struct TableFunctionToInternalSourceBackfillProgressRule;
39impl FallibleRule for TableFunctionToInternalSourceBackfillProgressRule {
40    fn apply(&self, plan: PlanRef) -> ApplyResult {
41        let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
42        if logical_table_function.table_function.function_type
43            != TableFunctionType::InternalSourceBackfillProgress
44        {
45            return ApplyResult::NotApplicable;
46        }
47
48        let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
49        let backfilling_tables = get_source_backfilling_tables(reader);
50        let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
51        ApplyResult::Ok(plan)
52    }
53}
54
55impl TableFunctionToInternalSourceBackfillProgressRule {
56    fn build_plan(
57        ctx: Rc<OptimizerContext>,
58        backfilling_tables: Vec<Arc<TableCatalog>>,
59    ) -> anyhow::Result<PlanRef> {
60        if backfilling_tables.is_empty() {
61            let fields = vec![
62                Field::new("job_id", DataType::Int32),
63                Field::new("fragment_id", DataType::Int32),
64                Field::new("backfill_state_table_id", DataType::Int32),
65                Field::new("backfill_progress", DataType::Jsonb),
66            ];
67            let plan = LogicalValues::new(vec![], Schema::new(fields), ctx.clone());
68            return Ok(plan.into());
69        }
70
71        let mut all_progress = Vec::with_capacity(backfilling_tables.len());
72        for table in backfilling_tables {
73            let backfill_info = SourceBackfillInfo::new(&table)?;
74
75            let scan = Self::build_scan(ctx.clone(), table);
76            let project = Self::build_project(&backfill_info, scan.into())?;
77
78            all_progress.push(project.into());
79        }
80        Ok(LogicalUnion::new(true, all_progress).into())
81    }
82
83    fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
84        LogicalScan::create(
85            table.name.clone(),
86            table,
87            vec![],
88            ctx.clone(),
89            None,
90            Default::default(),
91        )
92    }
93
94    fn build_project(
95        backfill_info: &SourceBackfillInfo,
96        scan: PlanRef,
97    ) -> anyhow::Result<LogicalProject> {
98        let job_id_expr = Self::build_u32_expr(backfill_info.job_id);
99        let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id);
100        let table_id_expr = Self::build_u32_expr(backfill_info.table_id);
101
102        let backfill_progress = ExprImpl::InputRef(Box::new(InputRef {
103            index: backfill_info.backfill_progress_column_index,
104            data_type: DataType::Jsonb,
105        }));
106
107        Ok(LogicalProject::new(
108            scan,
109            vec![
110                job_id_expr,
111                fragment_id_expr,
112                table_id_expr,
113                backfill_progress,
114            ],
115        ))
116    }
117
118    fn build_u32_expr(id: u32) -> ExprImpl {
119        ExprImpl::Literal(Box::new(Literal::new(
120            Some(ScalarImpl::Int32(id as i32)),
121            DataType::Int32,
122        )))
123    }
124}
125
126fn get_source_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
127    reader
128        .iter_backfilling_internal_tables()
129        .filter(|table| is_source_backfill_table(&table.name))
130        .cloned()
131        .collect_vec()
132}
133
134impl TableFunctionToInternalSourceBackfillProgressRule {
135    pub fn create() -> BoxedRule {
136        Box::new(TableFunctionToInternalSourceBackfillProgressRule {})
137    }
138}
139
/// Metadata extracted from a source backfill state table's catalog entry,
/// used to build one output row of `internal_source_backfill_progress()`.
struct SourceBackfillInfo {
    // Id of the streaming job this backfill belongs to (from `TableCatalog::job_id`).
    job_id: u32,
    // Id of the fragment running the backfill node.
    fragment_id: u32,
    // Id of the backfill state table itself.
    table_id: u32,
    // Index of the JSONB progress column within the state table's columns.
    backfill_progress_column_index: usize,
}
146
147impl SourceBackfillInfo {
148    fn new(table: &TableCatalog) -> anyhow::Result<Self> {
149        let Some(job_id) = table.job_id.map(|id| id.table_id) else {
150            bail!("`job_id` column not found in source backfill table catalog");
151        };
152        let Some(backfill_progress_column_index) = table
153            .columns
154            .iter()
155            .position(|c| c.name() == StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME)
156        else {
157            bail!(
158                "`{}` column not found in source backfill state table schema",
159                StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME
160            );
161        };
162        let fragment_id = table.fragment_id;
163        let table_id = table.id.table_id;
164
165        Ok(Self {
166            job_id,
167            fragment_id,
168            table_id,
169            backfill_progress_column_index,
170        })
171    }
172}