// risingwave_frontend/optimizer/rule/table_function_to_internal_source_backfill_progress.rs

use std::rc::Rc;
16use std::sync::Arc;
17
18use anyhow::bail;
19use itertools::Itertools;
20use risingwave_common::catalog::{Field, Schema, is_source_backfill_table};
21use risingwave_common::id::{FragmentId, JobId, TableId};
22use risingwave_common::types::{DataType, ScalarImpl};
23
24use super::prelude::{PlanRef, *};
25use crate::TableCatalog;
26use crate::catalog::catalog_service::CatalogReadGuard;
27use crate::expr::{ExprImpl, InputRef, Literal, TableFunctionType};
28use crate::optimizer::OptimizerContext;
29use crate::optimizer::plan_node::generic::GenericPlanRef;
30use crate::optimizer::plan_node::{
31 Logical, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
32 StreamSourceScan,
33};
34use crate::optimizer::rule::{ApplyResult, FallibleRule};
35
/// Rewrites the `internal_source_backfill_progress()` table function into a
/// plan that scans every source-backfill state table and unions the per-table
/// progress rows (job id, fragment id, state table id, partition id, progress).
pub struct TableFunctionToInternalSourceBackfillProgressRule {}
41impl FallibleRule<Logical> for TableFunctionToInternalSourceBackfillProgressRule {
42 fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
43 let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
44 if logical_table_function.table_function.function_type
45 != TableFunctionType::InternalSourceBackfillProgress
46 {
47 return ApplyResult::NotApplicable;
48 }
49
50 let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
51 let backfilling_tables = get_source_backfilling_tables(reader);
52 let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
53 ApplyResult::Ok(plan)
54 }
55}
56
57impl TableFunctionToInternalSourceBackfillProgressRule {
58 fn build_plan(
59 ctx: Rc<OptimizerContext>,
60 backfilling_tables: Vec<Arc<TableCatalog>>,
61 ) -> anyhow::Result<PlanRef> {
62 if backfilling_tables.is_empty() {
63 let fields = vec![
64 Field::new("job_id", DataType::Int32),
65 Field::new("fragment_id", DataType::Int32),
66 Field::new("backfill_state_table_id", DataType::Int32),
67 Field::new("partition_id", DataType::Varchar),
68 Field::new("backfill_progress", DataType::Jsonb),
69 ];
70 let plan = LogicalValues::new(vec![], Schema::new(fields), ctx);
71 return Ok(plan.into());
72 }
73
74 let mut all_progress = Vec::with_capacity(backfilling_tables.len());
75 for table in backfilling_tables {
76 let backfill_info = SourceBackfillInfo::new(&table)?;
77
78 let scan = Self::build_scan(ctx.clone(), table);
79 let project = Self::build_project(&backfill_info, scan.into())?;
80
81 all_progress.push(project.into());
82 }
83 Ok(LogicalUnion::new(true, all_progress).into())
84 }
85
86 fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
87 LogicalScan::create(table, ctx, None)
88 }
89
90 fn build_project(
91 backfill_info: &SourceBackfillInfo,
92 scan: PlanRef,
93 ) -> anyhow::Result<LogicalProject> {
94 let job_id_expr = Self::build_u32_expr(backfill_info.job_id.as_raw_id());
95 let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id.as_raw_id());
96 let table_id_expr = Self::build_u32_expr(backfill_info.table_id.as_raw_id());
97
98 let partition_id = ExprImpl::InputRef(Box::new(InputRef {
99 index: backfill_info.partition_id_column_index,
100 data_type: DataType::Varchar,
101 }));
102
103 let backfill_progress = ExprImpl::InputRef(Box::new(InputRef {
104 index: backfill_info.backfill_progress_column_index,
105 data_type: DataType::Jsonb,
106 }));
107
108 Ok(LogicalProject::new(
109 scan,
110 vec![
111 job_id_expr,
112 fragment_id_expr,
113 table_id_expr,
114 partition_id,
115 backfill_progress,
116 ],
117 ))
118 }
119
120 fn build_u32_expr(id: u32) -> ExprImpl {
121 ExprImpl::Literal(Box::new(Literal::new(
122 Some(ScalarImpl::Int32(id as i32)),
123 DataType::Int32,
124 )))
125 }
126}
127
128fn get_source_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
129 reader
130 .iter_backfilling_internal_tables()
131 .filter(|table| is_source_backfill_table(&table.name))
132 .cloned()
133 .collect_vec()
134}
135
136impl TableFunctionToInternalSourceBackfillProgressRule {
137 pub fn create() -> BoxedRule {
138 Box::new(TableFunctionToInternalSourceBackfillProgressRule {})
139 }
140}
141
/// Metadata extracted from a source-backfill state table's catalog, used to
/// project its rows into the `internal_source_backfill_progress` output.
struct SourceBackfillInfo {
    /// Streaming job the backfill belongs to (from `TableCatalog::job_id`).
    job_id: JobId,
    /// Fragment executing the backfill (from `TableCatalog::fragment_id`).
    fragment_id: FragmentId,
    /// Id of the backfill state table itself (from `TableCatalog::id`).
    table_id: TableId,
    /// Position of the partition-id column in the state table's schema.
    partition_id_column_index: usize,
    /// Position of the backfill-progress column in the state table's schema.
    backfill_progress_column_index: usize,
}
149
150impl SourceBackfillInfo {
151 fn new(table: &TableCatalog) -> anyhow::Result<Self> {
152 let Some(job_id) = table.job_id else {
153 bail!("`job_id` column not found in source backfill table catalog");
154 };
155 let Some(backfill_progress_column_index) = table
156 .columns
157 .iter()
158 .position(|c| c.name() == StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME)
159 else {
160 bail!(
161 "`{}` column not found in source backfill state table schema",
162 StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME
163 );
164 };
165 let Some(partition_id_column_index) = table
166 .columns
167 .iter()
168 .position(|c| c.name() == StreamSourceScan::PARTITION_ID_COLUMN_NAME)
169 else {
170 bail!(
171 "`{}` column not found in source backfill state table schema",
172 StreamSourceScan::PARTITION_ID_COLUMN_NAME
173 );
174 };
175 let fragment_id = table.fragment_id;
176 let table_id = table.id;
177
178 Ok(Self {
179 job_id,
180 fragment_id,
181 table_id,
182 partition_id_column_index,
183 backfill_progress_column_index,
184 })
185 }
186}