// risingwave_frontend/optimizer/rule/table_function_to_internal_source_backfill_progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::rc::Rc;
16use std::sync::Arc;
17
18use anyhow::bail;
19use itertools::Itertools;
20use risingwave_common::catalog::{Field, Schema, is_source_backfill_table};
21use risingwave_common::types::{DataType, ScalarImpl};
22
23use super::prelude::{PlanRef, *};
24use crate::TableCatalog;
25use crate::catalog::catalog_service::CatalogReadGuard;
26use crate::expr::{ExprImpl, InputRef, Literal, TableFunctionType};
27use crate::optimizer::OptimizerContext;
28use crate::optimizer::plan_node::generic::GenericPlanRef;
29use crate::optimizer::plan_node::{
30    Logical, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
31    StreamSourceScan,
32};
33use crate::optimizer::rule::{ApplyResult, FallibleRule};
34
/// Transform the `internal_source_backfill_progress()` table function
/// into a plan graph which will scan the state tables of source backfill nodes.
/// It will return the progress of the source backfills,
/// partitioned by the backfill node's fragment id and partition id.
///
/// Output columns (see `build_plan`): `job_id`, `fragment_id`,
/// `backfill_state_table_id` (all `Int32`) and `backfill_progress` (`Jsonb`).
pub struct TableFunctionToInternalSourceBackfillProgressRule {}
40impl FallibleRule<Logical> for TableFunctionToInternalSourceBackfillProgressRule {
41    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
42        let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
43        if logical_table_function.table_function.function_type
44            != TableFunctionType::InternalSourceBackfillProgress
45        {
46            return ApplyResult::NotApplicable;
47        }
48
49        let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
50        let backfilling_tables = get_source_backfilling_tables(reader);
51        let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
52        ApplyResult::Ok(plan)
53    }
54}
55
56impl TableFunctionToInternalSourceBackfillProgressRule {
57    fn build_plan(
58        ctx: Rc<OptimizerContext>,
59        backfilling_tables: Vec<Arc<TableCatalog>>,
60    ) -> anyhow::Result<PlanRef> {
61        if backfilling_tables.is_empty() {
62            let fields = vec![
63                Field::new("job_id", DataType::Int32),
64                Field::new("fragment_id", DataType::Int32),
65                Field::new("backfill_state_table_id", DataType::Int32),
66                Field::new("backfill_progress", DataType::Jsonb),
67            ];
68            let plan = LogicalValues::new(vec![], Schema::new(fields), ctx.clone());
69            return Ok(plan.into());
70        }
71
72        let mut all_progress = Vec::with_capacity(backfilling_tables.len());
73        for table in backfilling_tables {
74            let backfill_info = SourceBackfillInfo::new(&table)?;
75
76            let scan = Self::build_scan(ctx.clone(), table);
77            let project = Self::build_project(&backfill_info, scan.into())?;
78
79            all_progress.push(project.into());
80        }
81        Ok(LogicalUnion::new(true, all_progress).into())
82    }
83
84    fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
85        LogicalScan::create(table, ctx.clone(), None)
86    }
87
88    fn build_project(
89        backfill_info: &SourceBackfillInfo,
90        scan: PlanRef,
91    ) -> anyhow::Result<LogicalProject> {
92        let job_id_expr = Self::build_u32_expr(backfill_info.job_id);
93        let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id);
94        let table_id_expr = Self::build_u32_expr(backfill_info.table_id);
95
96        let backfill_progress = ExprImpl::InputRef(Box::new(InputRef {
97            index: backfill_info.backfill_progress_column_index,
98            data_type: DataType::Jsonb,
99        }));
100
101        Ok(LogicalProject::new(
102            scan,
103            vec![
104                job_id_expr,
105                fragment_id_expr,
106                table_id_expr,
107                backfill_progress,
108            ],
109        ))
110    }
111
112    fn build_u32_expr(id: u32) -> ExprImpl {
113        ExprImpl::Literal(Box::new(Literal::new(
114            Some(ScalarImpl::Int32(id as i32)),
115            DataType::Int32,
116        )))
117    }
118}
119
120fn get_source_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
121    reader
122        .iter_backfilling_internal_tables()
123        .filter(|table| is_source_backfill_table(&table.name))
124        .cloned()
125        .collect_vec()
126}
127
128impl TableFunctionToInternalSourceBackfillProgressRule {
129    pub fn create() -> BoxedRule {
130        Box::new(TableFunctionToInternalSourceBackfillProgressRule {})
131    }
132}
133
/// Metadata extracted from a source backfill state table catalog,
/// used to build the per-table projection.
struct SourceBackfillInfo {
    // Id of the streaming job that owns this backfill (from `table.job_id`).
    job_id: u32,
    // Fragment id of the backfill node (from `table.fragment_id`).
    fragment_id: u32,
    // Id of the backfill state table itself (from `table.id`).
    table_id: u32,
    // Position of the backfill-progress column within the state table schema.
    backfill_progress_column_index: usize,
}
140
141impl SourceBackfillInfo {
142    fn new(table: &TableCatalog) -> anyhow::Result<Self> {
143        let Some(job_id) = table.job_id.map(|id| id.table_id) else {
144            bail!("`job_id` column not found in source backfill table catalog");
145        };
146        let Some(backfill_progress_column_index) = table
147            .columns
148            .iter()
149            .position(|c| c.name() == StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME)
150        else {
151            bail!(
152                "`{}` column not found in source backfill state table schema",
153                StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME
154            );
155        };
156        let fragment_id = table.fragment_id;
157        let table_id = table.id.table_id;
158
159        Ok(Self {
160            job_id,
161            fragment_id,
162            table_id,
163            backfill_progress_column_index,
164        })
165    }
166}