// risingwave_frontend/optimizer/rule/table_function_to_internal_source_backfill_progress.rs

use std::rc::Rc;
use std::sync::Arc;

use anyhow::bail;
use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema, is_source_backfill_table};
use risingwave_common::id::{FragmentId, JobId, TableId};
use risingwave_common::types::{DataType, ScalarImpl};

use super::prelude::{PlanRef, *};
use crate::TableCatalog;
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::expr::{ExprImpl, InputRef, Literal, TableFunctionType};
use crate::optimizer::OptimizerContext;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
    Logical, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
    StreamSourceScan,
};
use crate::optimizer::rule::{ApplyResult, FallibleRule};
35
/// Optimizer rule that expands the `internal_source_backfill_progress()` table
/// function into a plan reading progress rows out of the internal source
/// backfill state tables.
pub struct TableFunctionToInternalSourceBackfillProgressRule {}
41impl FallibleRule<Logical> for TableFunctionToInternalSourceBackfillProgressRule {
42 fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
43 let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
44 if logical_table_function.table_function.function_type
45 != TableFunctionType::InternalSourceBackfillProgress
46 {
47 return ApplyResult::NotApplicable;
48 }
49
50 let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
51 let backfilling_tables = get_source_backfilling_tables(reader);
52 let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
53 ApplyResult::Ok(plan)
54 }
55}
56
57impl TableFunctionToInternalSourceBackfillProgressRule {
58 fn build_plan(
59 ctx: Rc<OptimizerContext>,
60 backfilling_tables: Vec<Arc<TableCatalog>>,
61 ) -> anyhow::Result<PlanRef> {
62 if backfilling_tables.is_empty() {
63 let fields = vec![
64 Field::new("job_id", DataType::Int32),
65 Field::new("fragment_id", DataType::Int32),
66 Field::new("backfill_state_table_id", DataType::Int32),
67 Field::new("backfill_progress", DataType::Jsonb),
68 ];
69 let plan = LogicalValues::new(vec![], Schema::new(fields), ctx);
70 return Ok(plan.into());
71 }
72
73 let mut all_progress = Vec::with_capacity(backfilling_tables.len());
74 for table in backfilling_tables {
75 let backfill_info = SourceBackfillInfo::new(&table)?;
76
77 let scan = Self::build_scan(ctx.clone(), table);
78 let project = Self::build_project(&backfill_info, scan.into())?;
79
80 all_progress.push(project.into());
81 }
82 Ok(LogicalUnion::new(true, all_progress).into())
83 }
84
85 fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
86 LogicalScan::create(table, ctx, None)
87 }
88
89 fn build_project(
90 backfill_info: &SourceBackfillInfo,
91 scan: PlanRef,
92 ) -> anyhow::Result<LogicalProject> {
93 let job_id_expr = Self::build_u32_expr(backfill_info.job_id.as_raw_id());
94 let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id.as_raw_id());
95 let table_id_expr = Self::build_u32_expr(backfill_info.table_id.as_raw_id());
96
97 let backfill_progress = ExprImpl::InputRef(Box::new(InputRef {
98 index: backfill_info.backfill_progress_column_index,
99 data_type: DataType::Jsonb,
100 }));
101
102 Ok(LogicalProject::new(
103 scan,
104 vec![
105 job_id_expr,
106 fragment_id_expr,
107 table_id_expr,
108 backfill_progress,
109 ],
110 ))
111 }
112
113 fn build_u32_expr(id: u32) -> ExprImpl {
114 ExprImpl::Literal(Box::new(Literal::new(
115 Some(ScalarImpl::Int32(id as i32)),
116 DataType::Int32,
117 )))
118 }
119}
120
121fn get_source_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
122 reader
123 .iter_backfilling_internal_tables()
124 .filter(|table| is_source_backfill_table(&table.name))
125 .cloned()
126 .collect_vec()
127}
128
129impl TableFunctionToInternalSourceBackfillProgressRule {
130 pub fn create() -> BoxedRule {
131 Box::new(TableFunctionToInternalSourceBackfillProgressRule {})
132 }
133}
134
/// Identifiers extracted from one source backfill state table's catalog entry,
/// used to label that table's rows in the progress output.
struct SourceBackfillInfo {
    /// The streaming job this backfill belongs to.
    job_id: JobId,
    /// The fragment running the source backfill executor.
    fragment_id: FragmentId,
    /// The id of the backfill state table itself.
    table_id: TableId,
    /// Index of the progress (Jsonb) column within the state table's columns.
    backfill_progress_column_index: usize,
}
141
142impl SourceBackfillInfo {
143 fn new(table: &TableCatalog) -> anyhow::Result<Self> {
144 let Some(job_id) = table.job_id else {
145 bail!("`job_id` column not found in source backfill table catalog");
146 };
147 let Some(backfill_progress_column_index) = table
148 .columns
149 .iter()
150 .position(|c| c.name() == StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME)
151 else {
152 bail!(
153 "`{}` column not found in source backfill state table schema",
154 StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME
155 );
156 };
157 let fragment_id = table.fragment_id;
158 let table_id = table.id;
159
160 Ok(Self {
161 job_id,
162 fragment_id,
163 table_id,
164 backfill_progress_column_index,
165 })
166 }
167}