risingwave_frontend/optimizer/rule/
table_function_to_internal_backfill_progress.rs1use std::rc::Rc;
16use std::sync::Arc;
17
18use anyhow::bail;
19use itertools::Itertools;
20use risingwave_common::catalog::{Field, Schema, is_backfill_table};
21use risingwave_common::types::{DataType, ScalarImpl};
22use risingwave_expr::aggregate::AggType;
23pub use risingwave_pb::expr::agg_call::PbKind as PbAggKind;
24
25use super::{ApplyResult, BoxedRule, FallibleRule};
26use crate::TableCatalog;
27use crate::catalog::catalog_service::CatalogReadGuard;
28use crate::expr::{AggCall, ExprImpl, InputRef, Literal, OrderBy, TableFunctionType};
29use crate::optimizer::plan_node::generic::GenericPlanRef;
30use crate::optimizer::plan_node::{
31 LogicalAgg, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
32};
33use crate::optimizer::{OptimizerContext, PlanRef};
34use crate::utils::{Condition, GroupBy};
35
/// Optimizer rule that expands the `InternalBackfillProgress` table function into a
/// concrete plan over the backfill state tables known to the catalog.
pub struct TableFunctionToInternalBackfillProgressRule {}
40impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
41 fn apply(&self, plan: PlanRef) -> ApplyResult {
42 let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
43 if logical_table_function.table_function.function_type
44 != TableFunctionType::InternalBackfillProgress
45 {
46 return ApplyResult::NotApplicable;
47 }
48
49 let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
50 let backfilling_tables = get_backfilling_tables(reader);
52 let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
53 ApplyResult::Ok(plan)
54 }
55}
56
57impl TableFunctionToInternalBackfillProgressRule {
58 fn build_plan(
59 ctx: Rc<OptimizerContext>,
60 backfilling_tables: Vec<Arc<TableCatalog>>,
61 ) -> anyhow::Result<PlanRef> {
62 if backfilling_tables.is_empty() {
63 let fields = vec![
64 Field::new("job_id", DataType::Int32),
65 Field::new("fragment_id", DataType::Int32),
66 Field::new("backfill_state_table_id", DataType::Int32),
67 Field::new("current_row_count", DataType::Int64),
68 Field::new("min_epoch", DataType::Int64),
69 ];
70 let plan = LogicalValues::new(vec![], Schema::new(fields), ctx.clone());
71 return Ok(plan.into());
72 }
73
74 let mut all_progress = Vec::with_capacity(backfilling_tables.len());
75 for table in backfilling_tables {
76 let backfill_info = BackfillInfo::new(&table)?;
77
78 let scan = Self::build_scan(ctx.clone(), table);
79 let agg = Self::build_agg(&backfill_info, scan)?;
80 let project = Self::build_project(&backfill_info, agg)?;
81
82 all_progress.push(project.into());
83 }
84 Ok(LogicalUnion::new(true, all_progress).into())
85 }
86
87 fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
88 LogicalScan::create(
89 table.name.clone(),
90 table,
91 vec![],
92 ctx.clone(),
93 None,
94 Default::default(),
95 )
96 }
97
98 fn build_agg(backfill_info: &BackfillInfo, scan: LogicalScan) -> anyhow::Result<PlanRef> {
99 let epoch_expr = match backfill_info.epoch_column_index {
100 Some(epoch_column_index) => ExprImpl::InputRef(Box::new(InputRef {
101 index: epoch_column_index,
102 data_type: DataType::Int64,
103 })),
104 None => ExprImpl::Literal(Box::new(Literal::new(None, DataType::Int64))),
105 };
106 let aggregated_min_epoch = ExprImpl::AggCall(Box::new(AggCall::new(
107 AggType::Builtin(PbAggKind::Min),
108 vec![epoch_expr],
109 false,
110 OrderBy::any(),
111 Condition::true_cond(),
112 vec![],
113 )?));
114 let aggregated_current_row_count = ExprImpl::AggCall(Box::new(AggCall::new(
115 AggType::Builtin(PbAggKind::Sum),
116 vec![ExprImpl::InputRef(Box::new(InputRef {
117 index: backfill_info.row_count_column_index,
118 data_type: DataType::Int64,
119 }))],
120 false,
121 OrderBy::any(),
122 Condition::true_cond(),
123 vec![],
124 )?));
125 let select_exprs = vec![aggregated_current_row_count, aggregated_min_epoch];
126 let group_by = GroupBy::GroupKey(vec![]);
127 let (agg, _, _) = LogicalAgg::create(select_exprs, group_by, None, scan.into())?;
128 Ok(agg)
129 }
130
131 fn build_project(backfill_info: &BackfillInfo, agg: PlanRef) -> anyhow::Result<LogicalProject> {
132 let job_id_expr = Self::build_u32_expr(backfill_info.job_id);
133 let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id);
134 let table_id_expr = Self::build_u32_expr(backfill_info.table_id);
135
136 let current_count_per_vnode = ExprImpl::InputRef(Box::new(InputRef {
137 index: 0,
138 data_type: DataType::Decimal,
139 }))
140 .cast_explicit(DataType::Int64)?;
141 let min_epoch = ExprImpl::InputRef(Box::new(InputRef {
142 index: 1,
143 data_type: DataType::Int64,
144 }));
145
146 Ok(LogicalProject::new(
147 agg,
148 vec![
149 job_id_expr,
150 fragment_id_expr,
151 table_id_expr,
152 current_count_per_vnode,
153 min_epoch,
154 ],
155 ))
156 }
157
158 fn build_u32_expr(id: u32) -> ExprImpl {
159 ExprImpl::Literal(Box::new(Literal::new(
160 Some(ScalarImpl::Int32(id as i32)),
161 DataType::Int32,
162 )))
163 }
164}
165
166fn get_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
167 reader
168 .iter_backfilling_internal_tables()
169 .filter(|table| is_backfill_table(&table.name))
170 .cloned()
171 .collect_vec()
172}
173
174impl TableFunctionToInternalBackfillProgressRule {
175 pub fn create() -> BoxedRule {
176 Box::new(TableFunctionToInternalBackfillProgressRule {})
177 }
178}
179
/// Metadata extracted from one backfill state table's catalog entry, used to build that
/// table's progress row.
struct BackfillInfo {
    /// Owning job id, taken from `TableCatalog::job_id`.
    job_id: u32,
    /// `TableCatalog::fragment_id` of the state table.
    fragment_id: u32,
    /// Id of the state table itself (`TableCatalog::id`).
    table_id: u32,
    /// Position of the `row_count` column within the table's columns.
    row_count_column_index: usize,
    /// Position of the `epoch` column, or `None` when the table has no such column.
    epoch_column_index: Option<usize>,
}
187
188impl BackfillInfo {
189 fn new(table: &TableCatalog) -> anyhow::Result<Self> {
190 let Some(job_id) = table.job_id.map(|id| id.table_id) else {
191 bail!("`job_id` column not found in backfill table");
192 };
193 let Some(row_count_column_index) =
194 table.columns.iter().position(|c| c.name() == "row_count")
195 else {
196 bail!("`row_count` column not found in backfill table");
197 };
198 let epoch_column_index = table.columns.iter().position(|c| c.name() == "epoch");
199 let fragment_id = table.fragment_id;
200 let table_id = table.id.table_id;
201
202 Ok(Self {
203 job_id,
204 fragment_id,
205 table_id,
206 row_count_column_index,
207 epoch_column_index,
208 })
209 }
210}