risingwave_frontend/optimizer/rule/
table_function_to_internal_backfill_progress.rs1use std::rc::Rc;
16use std::sync::Arc;
17
18use anyhow::bail;
19use itertools::Itertools;
20use risingwave_common::catalog::{Field, Schema, is_backfill_table};
21use risingwave_common::types::{DataType, ScalarImpl};
22use risingwave_expr::aggregate::AggType;
23pub use risingwave_pb::expr::agg_call::PbKind as PbAggKind;
24
25use super::prelude::{PlanRef, *};
26use crate::TableCatalog;
27use crate::catalog::catalog_service::CatalogReadGuard;
28use crate::expr::{AggCall, ExprImpl, InputRef, Literal, OrderBy, TableFunctionType};
29use crate::optimizer::OptimizerContext;
30use crate::optimizer::plan_node::generic::GenericPlanRef;
31use crate::optimizer::plan_node::{
32 Logical, LogicalAgg, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion,
33 LogicalValues, StreamTableScan,
34};
35use crate::optimizer::rule::{ApplyResult, FallibleRule};
36use crate::utils::{Condition, GroupBy};
37
/// Rewrites the `internal_backfill_progress()` table function into a plan that
/// aggregates row counts and epochs from the internal backfill state tables.
pub struct TableFunctionToInternalBackfillProgressRule {}
42impl FallibleRule<Logical> for TableFunctionToInternalBackfillProgressRule {
43 fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
44 let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
45 if logical_table_function.table_function.function_type
46 != TableFunctionType::InternalBackfillProgress
47 {
48 return ApplyResult::NotApplicable;
49 }
50
51 let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
52 let backfilling_tables = get_backfilling_tables(reader);
53 let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
54 ApplyResult::Ok(plan)
55 }
56}
57
58impl TableFunctionToInternalBackfillProgressRule {
59 fn build_plan(
60 ctx: Rc<OptimizerContext>,
61 backfilling_tables: Vec<Arc<TableCatalog>>,
62 ) -> anyhow::Result<PlanRef> {
63 if backfilling_tables.is_empty() {
64 let fields = vec![
65 Field::new("job_id", DataType::Int32),
66 Field::new("fragment_id", DataType::Int32),
67 Field::new("backfill_state_table_id", DataType::Int32),
68 Field::new("current_row_count", DataType::Int64),
69 Field::new("min_epoch", DataType::Int64),
70 ];
71 let plan = LogicalValues::new(vec![], Schema::new(fields), ctx.clone());
72 return Ok(plan.into());
73 }
74
75 let mut all_progress = Vec::with_capacity(backfilling_tables.len());
76 for table in backfilling_tables {
77 let backfill_info = BackfillInfo::new(&table)?;
78
79 let scan = Self::build_scan(ctx.clone(), table);
80 let agg = Self::build_agg(&backfill_info, scan)?;
81 let project = Self::build_project(&backfill_info, agg)?;
82
83 all_progress.push(project.into());
84 }
85 Ok(LogicalUnion::new(true, all_progress).into())
86 }
87
88 fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
89 LogicalScan::create(table, ctx.clone(), None)
90 }
91
92 fn build_agg(backfill_info: &BackfillInfo, scan: LogicalScan) -> anyhow::Result<PlanRef> {
93 let epoch_expr = match backfill_info.epoch_column_index {
94 Some(epoch_column_index) => ExprImpl::InputRef(Box::new(InputRef {
95 index: epoch_column_index,
96 data_type: DataType::Int64,
97 })),
98 None => ExprImpl::Literal(Box::new(Literal::new(None, DataType::Int64))),
99 };
100 let aggregated_min_epoch = ExprImpl::AggCall(Box::new(AggCall::new(
101 AggType::Builtin(PbAggKind::Min),
102 vec![epoch_expr],
103 false,
104 OrderBy::any(),
105 Condition::true_cond(),
106 vec![],
107 )?));
108 let aggregated_current_row_count = ExprImpl::AggCall(Box::new(AggCall::new(
109 AggType::Builtin(PbAggKind::Sum),
110 vec![ExprImpl::InputRef(Box::new(InputRef {
111 index: backfill_info.row_count_column_index,
112 data_type: DataType::Int64,
113 }))],
114 false,
115 OrderBy::any(),
116 Condition::true_cond(),
117 vec![],
118 )?));
119 let select_exprs = vec![aggregated_current_row_count, aggregated_min_epoch];
120 let group_by = GroupBy::GroupKey(vec![]);
121 let (agg, _, _) = LogicalAgg::create(select_exprs, group_by, None, scan.into())?;
122 Ok(agg)
123 }
124
125 fn build_project(backfill_info: &BackfillInfo, agg: PlanRef) -> anyhow::Result<LogicalProject> {
126 let job_id_expr = Self::build_u32_expr(backfill_info.job_id);
127 let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id);
128 let table_id_expr = Self::build_u32_expr(backfill_info.table_id);
129
130 let current_count_per_vnode = ExprImpl::InputRef(Box::new(InputRef {
131 index: 0,
132 data_type: DataType::Decimal,
133 }))
134 .cast_explicit(&DataType::Int64)?;
135 let min_epoch = ExprImpl::InputRef(Box::new(InputRef {
136 index: 1,
137 data_type: DataType::Int64,
138 }));
139
140 Ok(LogicalProject::new(
141 agg,
142 vec![
143 job_id_expr,
144 fragment_id_expr,
145 table_id_expr,
146 current_count_per_vnode,
147 min_epoch,
148 ],
149 ))
150 }
151
152 fn build_u32_expr(id: u32) -> ExprImpl {
153 ExprImpl::Literal(Box::new(Literal::new(
154 Some(ScalarImpl::Int32(id as i32)),
155 DataType::Int32,
156 )))
157 }
158}
159
160fn get_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
161 reader
162 .iter_backfilling_internal_tables()
163 .filter(|table| is_backfill_table(&table.name))
164 .cloned()
165 .collect_vec()
166}
167
168impl TableFunctionToInternalBackfillProgressRule {
169 pub fn create() -> BoxedRule {
170 Box::new(TableFunctionToInternalBackfillProgressRule {})
171 }
172}
173
/// Per-table metadata extracted from a backfill state table's catalog, used to
/// build one progress sub-plan.
struct BackfillInfo {
    // Id of the streaming job this state table belongs to.
    job_id: u32,
    // Id of the fragment performing the backfill.
    fragment_id: u32,
    // Id of the backfill state table itself.
    table_id: u32,
    // Position of the row-count column within the state table's columns.
    row_count_column_index: usize,
    // Position of the epoch column; `None` when the state table has no epoch
    // column (presumably an older table layout — the agg then emits NULL).
    epoch_column_index: Option<usize>,
}
181
182impl BackfillInfo {
183 fn new(table: &TableCatalog) -> anyhow::Result<Self> {
184 let Some(job_id) = table.job_id.map(|id| id.table_id) else {
185 bail!("`job_id` column not found in backfill table");
186 };
187 let Some(row_count_column_index) = table
188 .columns
189 .iter()
190 .position(|c| c.name() == StreamTableScan::ROW_COUNT_COLUMN_NAME)
191 else {
192 bail!(
193 "`{}` column not found in backfill table",
194 StreamTableScan::ROW_COUNT_COLUMN_NAME
195 );
196 };
197 let epoch_column_index = table
198 .columns
199 .iter()
200 .position(|c| c.name() == StreamTableScan::EPOCH_COLUMN_NAME);
201 let fragment_id = table.fragment_id;
202 let table_id = table.id.table_id;
203
204 Ok(Self {
205 job_id,
206 fragment_id,
207 table_id,
208 row_count_column_index,
209 epoch_column_index,
210 })
211 }
212}