risingwave_frontend/optimizer/rule/
table_function_to_internal_source_backfill_progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16use std::sync::Arc;
17
18use anyhow::bail;
19use itertools::Itertools;
20use risingwave_common::catalog::{Field, Schema, is_source_backfill_table};
21use risingwave_common::id::{FragmentId, JobId, TableId};
22use risingwave_common::types::{DataType, ScalarImpl};
23
24use super::prelude::{PlanRef, *};
25use crate::TableCatalog;
26use crate::catalog::catalog_service::CatalogReadGuard;
27use crate::expr::{ExprImpl, InputRef, Literal, TableFunctionType};
28use crate::optimizer::OptimizerContext;
29use crate::optimizer::plan_node::generic::GenericPlanRef;
30use crate::optimizer::plan_node::{
31    Logical, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
32    StreamSourceScan,
33};
34use crate::optimizer::rule::{ApplyResult, FallibleRule};
35
/// Rewrites the `internal_source_backfill_progress()` table function into a
/// plan that scans the state tables of all source backfill nodes.
///
/// The resulting rows report backfill progress keyed by the backfill node's
/// fragment id and partition id.
pub struct TableFunctionToInternalSourceBackfillProgressRule;
41impl FallibleRule<Logical> for TableFunctionToInternalSourceBackfillProgressRule {
42    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
43        let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
44        if logical_table_function.table_function.function_type
45            != TableFunctionType::InternalSourceBackfillProgress
46        {
47            return ApplyResult::NotApplicable;
48        }
49
50        let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
51        let backfilling_tables = get_source_backfilling_tables(reader);
52        let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
53        ApplyResult::Ok(plan)
54    }
55}
56
57impl TableFunctionToInternalSourceBackfillProgressRule {
58    fn build_plan(
59        ctx: Rc<OptimizerContext>,
60        backfilling_tables: Vec<Arc<TableCatalog>>,
61    ) -> anyhow::Result<PlanRef> {
62        if backfilling_tables.is_empty() {
63            let fields = vec![
64                Field::new("job_id", DataType::Int32),
65                Field::new("fragment_id", DataType::Int32),
66                Field::new("backfill_state_table_id", DataType::Int32),
67                Field::new("backfill_progress", DataType::Jsonb),
68            ];
69            let plan = LogicalValues::new(vec![], Schema::new(fields), ctx);
70            return Ok(plan.into());
71        }
72
73        let mut all_progress = Vec::with_capacity(backfilling_tables.len());
74        for table in backfilling_tables {
75            let backfill_info = SourceBackfillInfo::new(&table)?;
76
77            let scan = Self::build_scan(ctx.clone(), table);
78            let project = Self::build_project(&backfill_info, scan.into())?;
79
80            all_progress.push(project.into());
81        }
82        Ok(LogicalUnion::new(true, all_progress).into())
83    }
84
85    fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
86        LogicalScan::create(table, ctx, None)
87    }
88
89    fn build_project(
90        backfill_info: &SourceBackfillInfo,
91        scan: PlanRef,
92    ) -> anyhow::Result<LogicalProject> {
93        let job_id_expr = Self::build_u32_expr(backfill_info.job_id.as_raw_id());
94        let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id.as_raw_id());
95        let table_id_expr = Self::build_u32_expr(backfill_info.table_id.as_raw_id());
96
97        let backfill_progress = ExprImpl::InputRef(Box::new(InputRef {
98            index: backfill_info.backfill_progress_column_index,
99            data_type: DataType::Jsonb,
100        }));
101
102        Ok(LogicalProject::new(
103            scan,
104            vec![
105                job_id_expr,
106                fragment_id_expr,
107                table_id_expr,
108                backfill_progress,
109            ],
110        ))
111    }
112
113    fn build_u32_expr(id: u32) -> ExprImpl {
114        ExprImpl::Literal(Box::new(Literal::new(
115            Some(ScalarImpl::Int32(id as i32)),
116            DataType::Int32,
117        )))
118    }
119}
120
121fn get_source_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
122    reader
123        .iter_backfilling_internal_tables()
124        .filter(|table| is_source_backfill_table(&table.name))
125        .cloned()
126        .collect_vec()
127}
128
129impl TableFunctionToInternalSourceBackfillProgressRule {
130    pub fn create() -> BoxedRule {
131        Box::new(TableFunctionToInternalSourceBackfillProgressRule {})
132    }
133}
134
/// Per-state-table metadata needed to build one branch of the progress plan.
struct SourceBackfillInfo {
    // Id of the streaming job that owns this backfill.
    job_id: JobId,
    // Fragment containing the source backfill node.
    fragment_id: FragmentId,
    // Id of the backfill state table itself.
    table_id: TableId,
    // Position of the backfill-progress column within the state table schema.
    backfill_progress_column_index: usize,
}
141
142impl SourceBackfillInfo {
143    fn new(table: &TableCatalog) -> anyhow::Result<Self> {
144        let Some(job_id) = table.job_id else {
145            bail!("`job_id` column not found in source backfill table catalog");
146        };
147        let Some(backfill_progress_column_index) = table
148            .columns
149            .iter()
150            .position(|c| c.name() == StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME)
151        else {
152            bail!(
153                "`{}` column not found in source backfill state table schema",
154                StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME
155            );
156        };
157        let fragment_id = table.fragment_id;
158        let table_id = table.id;
159
160        Ok(Self {
161            job_id,
162            fragment_id,
163            table_id,
164            backfill_progress_column_index,
165        })
166    }
167}