// risingwave_frontend/optimizer/rule/table_function_to_internal_source_backfill_progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::rc::Rc;
use std::sync::Arc;

use anyhow::bail;
use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema, is_source_backfill_table};
use risingwave_common::id::{FragmentId, JobId, TableId};
use risingwave_common::types::{DataType, ScalarImpl};

use super::prelude::{PlanRef, *};
use crate::TableCatalog;
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::expr::{ExprImpl, InputRef, Literal, TableFunctionType};
use crate::optimizer::OptimizerContext;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
    Logical, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
    StreamSourceScan,
};
use crate::optimizer::rule::{ApplyResult, FallibleRule};
35
/// Transform the `internal_source_backfill_progress()` table function
/// into a plan graph which will scan the state tables of source backfill nodes.
/// It will return the progress of the source backfills,
/// partitioned by the backfill node's fragment id and partition id.
///
/// Output schema: (job_id, fragment_id, backfill_state_table_id, partition_id,
/// backfill_progress) — see `build_plan` below.
pub struct TableFunctionToInternalSourceBackfillProgressRule {}
41impl FallibleRule<Logical> for TableFunctionToInternalSourceBackfillProgressRule {
42    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
43        let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
44        if logical_table_function.table_function.function_type
45            != TableFunctionType::InternalSourceBackfillProgress
46        {
47            return ApplyResult::NotApplicable;
48        }
49
50        let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
51        let backfilling_tables = get_source_backfilling_tables(reader);
52        let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
53        ApplyResult::Ok(plan)
54    }
55}
56
57impl TableFunctionToInternalSourceBackfillProgressRule {
58    fn build_plan(
59        ctx: Rc<OptimizerContext>,
60        backfilling_tables: Vec<Arc<TableCatalog>>,
61    ) -> anyhow::Result<PlanRef> {
62        if backfilling_tables.is_empty() {
63            let fields = vec![
64                Field::new("job_id", DataType::Int32),
65                Field::new("fragment_id", DataType::Int32),
66                Field::new("backfill_state_table_id", DataType::Int32),
67                Field::new("partition_id", DataType::Varchar),
68                Field::new("backfill_progress", DataType::Jsonb),
69            ];
70            let plan = LogicalValues::new(vec![], Schema::new(fields), ctx);
71            return Ok(plan.into());
72        }
73
74        let mut all_progress = Vec::with_capacity(backfilling_tables.len());
75        for table in backfilling_tables {
76            let backfill_info = SourceBackfillInfo::new(&table)?;
77
78            let scan = Self::build_scan(ctx.clone(), table);
79            let project = Self::build_project(&backfill_info, scan.into())?;
80
81            all_progress.push(project.into());
82        }
83        Ok(LogicalUnion::new(true, all_progress).into())
84    }
85
86    fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
87        LogicalScan::create(table, ctx, None)
88    }
89
90    fn build_project(
91        backfill_info: &SourceBackfillInfo,
92        scan: PlanRef,
93    ) -> anyhow::Result<LogicalProject> {
94        let job_id_expr = Self::build_u32_expr(backfill_info.job_id.as_raw_id());
95        let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id.as_raw_id());
96        let table_id_expr = Self::build_u32_expr(backfill_info.table_id.as_raw_id());
97
98        let partition_id = ExprImpl::InputRef(Box::new(InputRef {
99            index: backfill_info.partition_id_column_index,
100            data_type: DataType::Varchar,
101        }));
102
103        let backfill_progress = ExprImpl::InputRef(Box::new(InputRef {
104            index: backfill_info.backfill_progress_column_index,
105            data_type: DataType::Jsonb,
106        }));
107
108        Ok(LogicalProject::new(
109            scan,
110            vec![
111                job_id_expr,
112                fragment_id_expr,
113                table_id_expr,
114                partition_id,
115                backfill_progress,
116            ],
117        ))
118    }
119
120    fn build_u32_expr(id: u32) -> ExprImpl {
121        ExprImpl::Literal(Box::new(Literal::new(
122            Some(ScalarImpl::Int32(id as i32)),
123            DataType::Int32,
124        )))
125    }
126}
127
128fn get_source_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
129    reader
130        .iter_backfilling_internal_tables()
131        .filter(|table| is_source_backfill_table(&table.name))
132        .cloned()
133        .collect_vec()
134}
135
136impl TableFunctionToInternalSourceBackfillProgressRule {
137    pub fn create() -> BoxedRule {
138        Box::new(TableFunctionToInternalSourceBackfillProgressRule {})
139    }
140}
141
/// Metadata extracted from a source backfill state table's catalog entry,
/// used to build the projection over its scan.
struct SourceBackfillInfo {
    // Id of the streaming job that owns this backfill.
    job_id: JobId,
    // Fragment containing the source backfill executor.
    fragment_id: FragmentId,
    // Id of the backfill state table itself.
    table_id: TableId,
    // Index of the partition id (Varchar) column in the state table schema.
    partition_id_column_index: usize,
    // Index of the backfill progress (Jsonb) column in the state table schema.
    backfill_progress_column_index: usize,
}
149
150impl SourceBackfillInfo {
151    fn new(table: &TableCatalog) -> anyhow::Result<Self> {
152        let Some(job_id) = table.job_id else {
153            bail!("`job_id` column not found in source backfill table catalog");
154        };
155        let Some(backfill_progress_column_index) = table
156            .columns
157            .iter()
158            .position(|c| c.name() == StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME)
159        else {
160            bail!(
161                "`{}` column not found in source backfill state table schema",
162                StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME
163            );
164        };
165        let Some(partition_id_column_index) = table
166            .columns
167            .iter()
168            .position(|c| c.name() == StreamSourceScan::PARTITION_ID_COLUMN_NAME)
169        else {
170            bail!(
171                "`{}` column not found in source backfill state table schema",
172                StreamSourceScan::PARTITION_ID_COLUMN_NAME
173            );
174        };
175        let fragment_id = table.fragment_id;
176        let table_id = table.id;
177
178        Ok(Self {
179            job_id,
180            fragment_id,
181            table_id,
182            partition_id_column_index,
183            backfill_progress_column_index,
184        })
185    }
186}