risingwave_frontend/optimizer/rule/
table_function_to_internal_source_backfill_progress.rs1use std::rc::Rc;
16use std::sync::Arc;
17
18use anyhow::bail;
19use itertools::Itertools;
20use risingwave_common::catalog::{Field, Schema, is_source_backfill_table};
21use risingwave_common::types::{DataType, ScalarImpl};
22
23use super::prelude::{PlanRef, *};
24use crate::TableCatalog;
25use crate::catalog::catalog_service::CatalogReadGuard;
26use crate::expr::{ExprImpl, InputRef, Literal, TableFunctionType};
27use crate::optimizer::OptimizerContext;
28use crate::optimizer::plan_node::generic::GenericPlanRef;
29use crate::optimizer::plan_node::{
30 Logical, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
31 StreamSourceScan,
32};
33use crate::optimizer::rule::{ApplyResult, FallibleRule};
34
/// Optimizer rule that replaces a call to the
/// `internal_source_backfill_progress()` table function with a concrete plan
/// reading progress out of the source-backfill internal state tables
/// (see the `FallibleRule` impl below for the rewrite itself).
pub struct TableFunctionToInternalSourceBackfillProgressRule {}
40impl FallibleRule<Logical> for TableFunctionToInternalSourceBackfillProgressRule {
41 fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
42 let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
43 if logical_table_function.table_function.function_type
44 != TableFunctionType::InternalSourceBackfillProgress
45 {
46 return ApplyResult::NotApplicable;
47 }
48
49 let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
50 let backfilling_tables = get_source_backfilling_tables(reader);
51 let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
52 ApplyResult::Ok(plan)
53 }
54}
55
56impl TableFunctionToInternalSourceBackfillProgressRule {
57 fn build_plan(
58 ctx: Rc<OptimizerContext>,
59 backfilling_tables: Vec<Arc<TableCatalog>>,
60 ) -> anyhow::Result<PlanRef> {
61 if backfilling_tables.is_empty() {
62 let fields = vec![
63 Field::new("job_id", DataType::Int32),
64 Field::new("fragment_id", DataType::Int32),
65 Field::new("backfill_state_table_id", DataType::Int32),
66 Field::new("backfill_progress", DataType::Jsonb),
67 ];
68 let plan = LogicalValues::new(vec![], Schema::new(fields), ctx.clone());
69 return Ok(plan.into());
70 }
71
72 let mut all_progress = Vec::with_capacity(backfilling_tables.len());
73 for table in backfilling_tables {
74 let backfill_info = SourceBackfillInfo::new(&table)?;
75
76 let scan = Self::build_scan(ctx.clone(), table);
77 let project = Self::build_project(&backfill_info, scan.into())?;
78
79 all_progress.push(project.into());
80 }
81 Ok(LogicalUnion::new(true, all_progress).into())
82 }
83
84 fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
85 LogicalScan::create(table, ctx.clone(), None)
86 }
87
88 fn build_project(
89 backfill_info: &SourceBackfillInfo,
90 scan: PlanRef,
91 ) -> anyhow::Result<LogicalProject> {
92 let job_id_expr = Self::build_u32_expr(backfill_info.job_id);
93 let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id);
94 let table_id_expr = Self::build_u32_expr(backfill_info.table_id);
95
96 let backfill_progress = ExprImpl::InputRef(Box::new(InputRef {
97 index: backfill_info.backfill_progress_column_index,
98 data_type: DataType::Jsonb,
99 }));
100
101 Ok(LogicalProject::new(
102 scan,
103 vec![
104 job_id_expr,
105 fragment_id_expr,
106 table_id_expr,
107 backfill_progress,
108 ],
109 ))
110 }
111
112 fn build_u32_expr(id: u32) -> ExprImpl {
113 ExprImpl::Literal(Box::new(Literal::new(
114 Some(ScalarImpl::Int32(id as i32)),
115 DataType::Int32,
116 )))
117 }
118}
119
120fn get_source_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
121 reader
122 .iter_backfilling_internal_tables()
123 .filter(|table| is_source_backfill_table(&table.name))
124 .cloned()
125 .collect_vec()
126}
127
128impl TableFunctionToInternalSourceBackfillProgressRule {
129 pub fn create() -> BoxedRule {
130 Box::new(TableFunctionToInternalSourceBackfillProgressRule {})
131 }
132}
133
/// Identifiers extracted from a source-backfill state table's catalog,
/// consumed by `build_project` to form the progress output row.
struct SourceBackfillInfo {
    /// Id of the streaming job owning the backfill (from `TableCatalog::job_id`).
    job_id: u32,
    /// Id of the fragment the state table belongs to (from `TableCatalog::fragment_id`).
    fragment_id: u32,
    /// Id of the backfill state table itself (from `TableCatalog::id`).
    table_id: u32,
    /// Position of the backfill-progress (JSONB) column in the state table's
    /// column list.
    backfill_progress_column_index: usize,
}
140
141impl SourceBackfillInfo {
142 fn new(table: &TableCatalog) -> anyhow::Result<Self> {
143 let Some(job_id) = table.job_id.map(|id| id.table_id) else {
144 bail!("`job_id` column not found in source backfill table catalog");
145 };
146 let Some(backfill_progress_column_index) = table
147 .columns
148 .iter()
149 .position(|c| c.name() == StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME)
150 else {
151 bail!(
152 "`{}` column not found in source backfill state table schema",
153 StreamSourceScan::BACKFILL_PROGRESS_COLUMN_NAME
154 );
155 };
156 let fragment_id = table.fragment_id;
157 let table_id = table.id.table_id;
158
159 Ok(Self {
160 job_id,
161 fragment_id,
162 table_id,
163 backfill_progress_column_index,
164 })
165 }
166}