// risingwave_frontend/optimizer/rule/table_function_to_internal_source_backfill_progress.rs

use std::rc::Rc;
use std::sync::Arc;

use anyhow::bail;
use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema, is_source_backfill_table};
use risingwave_common::types::{DataType, ScalarImpl};

use super::{ApplyResult, BoxedRule, FallibleRule};
use crate::TableCatalog;
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::expr::{ExprImpl, InputRef, Literal, TableFunctionType};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
    LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
    StreamSourceScan,
};
use crate::optimizer::{OptimizerContext, PlanRef};
/// Rule that rewrites the `internal_source_backfill_progress()` table function
/// into a plan reading progress rows from the source backfill state tables.
pub struct TableFunctionToInternalSourceBackfillProgressRule {}
39impl FallibleRule for TableFunctionToInternalSourceBackfillProgressRule {
40 fn apply(&self, plan: PlanRef) -> ApplyResult {
41 let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
42 if logical_table_function.table_function.function_type
43 != TableFunctionType::InternalSourceBackfillProgress
44 {
45 return ApplyResult::NotApplicable;
46 }
47
48 let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
49 let backfilling_tables = get_source_backfilling_tables(reader);
50 let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
51 ApplyResult::Ok(plan)
52 }
53}
55impl TableFunctionToInternalSourceBackfillProgressRule {
56 fn build_plan(
57 ctx: Rc<OptimizerContext>,
58 backfilling_tables: Vec<Arc<TableCatalog>>,
59 ) -> anyhow::Result<PlanRef> {
60 if backfilling_tables.is_empty() {
61 let fields = vec![
62 Field::new("job_id", DataType::Int32),
63 Field::new("fragment_id", DataType::Int32),
64 Field::new("backfill_state_table_id", DataType::Int32),
65 Field::new("backfill_progress", DataType::Jsonb),
66 ];
67 let plan = LogicalValues::new(vec![], Schema::new(fields), ctx.clone());
68 return Ok(plan.into());
69 }
70
71 let mut all_progress = Vec::with_capacity(backfilling_tables.len());
72 for table in backfilling_tables {
73 let backfill_info = SourceBackfillInfo::new(&table)?;
74
75 let scan = Self::build_scan(ctx.clone(), table);
76 let project = Self::build_project(&backfill_info, scan.into())?;
77
78 all_progress.push(project.into());
79 }
80 Ok(LogicalUnion::new(true, all_progress).into())
81 }
82
83 fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
84 LogicalScan::create(
85 table.name.clone(),
86 table,
87 vec![],
88 ctx.clone(),
89 None,
90 Default::default(),
91 )
92 }
93
94 fn build_project(
95 backfill_info: &SourceBackfillInfo,
96 scan: PlanRef,
97 ) -> anyhow::Result<LogicalProject> {
98 let job_id_expr = Self::build_u32_expr(backfill_info.job_id);
99 let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id);
100 let table_id_expr = Self::build_u32_expr(backfill_info.table_id);
101
102 let backfill_progress = ExprImpl::InputRef(Box::new(InputRef {
103 index: backfill_info.backfill_progress_column_index,
104 data_type: DataType::Jsonb,
105 }));
106
107 Ok(LogicalProject::new(
108 scan,
109 vec![
110 job_id_expr,
111 fragment_id_expr,
112 table_id_expr,
113 backfill_progress,
114 ],
115 ))
116 }
117
118 fn build_u32_expr(id: u32) -> ExprImpl {
119 ExprImpl::Literal(Box::new(Literal::new(
120 Some(ScalarImpl::Int32(id as i32)),
121 DataType::Int32,
122 )))
123 }
124}
126fn get_source_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
127 reader
128 .iter_backfilling_internal_tables()
129 .filter(|table| is_source_backfill_table(&table.name))
130 .cloned()
131 .collect_vec()
132}
134impl TableFunctionToInternalSourceBackfillProgressRule {
135 pub fn create() -> BoxedRule {
136 Box::new(TableFunctionToInternalSourceBackfillProgressRule {})
137 }
138}
/// Per-table metadata needed to project a backfill-progress row out of a
/// source backfill state table scan.
struct SourceBackfillInfo {
    // Id of the streaming job this backfill belongs to (from `table.job_id`).
    job_id: u32,
    // Id of the fragment running the backfill (from `table.fragment_id`).
    fragment_id: u32,
    // Id of the backfill state table itself (from `table.id`).
    table_id: u32,
    // Position of the JSONB progress column within the state table's columns.
    backfill_progress_column_index: usize,
}
147impl SourceBackfillInfo {
148 fn new(table: &TableCatalog) -> anyhow::Result<Self> {
149 let Some(job_id) = table.job_id.map(|id| id.table_id) else {
150 bail!("`job_id` column not found in source backfill table catalog");
151 };
152 let Some(backfill_progress_column_index) = table
153 .columns
154 .iter()
155 .position(|c| c.name() == StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME)
156 else {
157 bail!(
158 "`{}` column not found in source backfill state table schema",
159 StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME
160 );
161 };
162 let fragment_id = table.fragment_id;
163 let table_id = table.id.table_id;
164
165 Ok(Self {
166 job_id,
167 fragment_id,
168 table_id,
169 backfill_progress_column_index,
170 })
171 }
172}