risingwave_frontend/optimizer/rule/table_function_to_internal_backfill_progress.rs

use std::rc::Rc;
use std::sync::Arc;

use anyhow::bail;
use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema, is_backfill_table};
use risingwave_common::id::{FragmentId, JobId, TableId};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_expr::aggregate::AggType;
pub use risingwave_pb::expr::agg_call::PbKind as PbAggKind;

use super::prelude::{PlanRef, *};
use crate::TableCatalog;
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::expr::{AggCall, ExprImpl, InputRef, Literal, OrderBy, TableFunctionType};
use crate::optimizer::OptimizerContext;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
    Logical, LogicalAgg, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion,
    LogicalValues, StreamTableScan,
};
use crate::optimizer::rule::{ApplyResult, FallibleRule};
use crate::utils::{Condition, GroupBy};

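/// Rewrites the `InternalBackfillProgress` table function into a plan that
/// reports, per backfilling job, the total backfilled row count and the minimum
/// backfill epoch read from the job's backfill state table. (The SQL surface is
/// presumably an `internal_backfill_progress()` call; the exact name is an
/// assumption, not confirmed by this file.)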
pub struct TableFunctionToInternalBackfillProgressRule {}

impl FallibleRule<Logical> for TableFunctionToInternalBackfillProgressRule {
    fn apply(&self, plan: PlanRef) -> ApplyResult<PlanRef> {
        let logical_table_function: &LogicalTableFunction = plan.as_logical_table_function()?;
        if logical_table_function.table_function.function_type
            != TableFunctionType::InternalBackfillProgress
        {
            return ApplyResult::NotApplicable;
        }

        let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
        let backfilling_tables = get_backfilling_tables(reader);
        let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
        ApplyResult::Ok(plan)
    }
}

impl TableFunctionToInternalBackfillProgressRule {
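    /// Builds the rewritten plan: an empty `LogicalValues` carrying the output
    /// schema when no job is backfilling, otherwise a `LogicalUnion` of one
    /// scan -> agg -> project pipeline per backfill state table.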
    fn build_plan(
        ctx: Rc<OptimizerContext>,
        backfilling_tables: Vec<Arc<TableCatalog>>,
    ) -> anyhow::Result<PlanRef> {
        if backfilling_tables.is_empty() {
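            // No job is backfilling: return an empty relation that still
            // carries the table function's output schema.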
            let fields = vec![
                Field::new("job_id", DataType::Int32),
                Field::new("fragment_id", DataType::Int32),
                Field::new("backfill_state_table_id", DataType::Int32),
                Field::new("current_row_count", DataType::Int64),
                Field::new("min_epoch", DataType::Int64),
            ];
            let plan = LogicalValues::new(vec![], Schema::new(fields), ctx);
            return Ok(plan.into());
        }

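        // Build one scan -> agg -> project pipeline per backfill state table.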
        let mut all_progress = Vec::with_capacity(backfilling_tables.len());
        for table in backfilling_tables {
            let backfill_info = BackfillInfo::new(&table)?;

            let scan = Self::build_scan(ctx.clone(), table);
            let agg = Self::build_agg(&backfill_info, scan)?;
            let project = Self::build_project(&backfill_info, agg)?;

            all_progress.push(project.into());
        }
        Ok(LogicalUnion::new(true, all_progress).into())
    }

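    /// A plain logical scan over the backfill state table.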
    fn build_scan(ctx: Rc<OptimizerContext>, table: Arc<TableCatalog>) -> LogicalScan {
        LogicalScan::create(table, ctx, None)
    }

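    /// Aggregates the state table into a single row (empty group key):
    /// column 0 is `SUM(row_count)`, column 1 is `MIN(epoch)`; `build_project`
    /// relies on this ordering.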
    fn build_agg(backfill_info: &BackfillInfo, scan: LogicalScan) -> anyhow::Result<PlanRef> {
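        // Some backfill state tables have no epoch column; fall back to a NULL
        // literal so `MIN(epoch)` yields NULL for them.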
        let epoch_expr = match backfill_info.epoch_column_index {
            Some(epoch_column_index) => ExprImpl::InputRef(Box::new(InputRef {
                index: epoch_column_index,
                data_type: DataType::Int64,
            })),
            None => ExprImpl::Literal(Box::new(Literal::new(None, DataType::Int64))),
        };
        let aggregated_min_epoch = ExprImpl::AggCall(Box::new(AggCall::new(
            AggType::Builtin(PbAggKind::Min),
            vec![epoch_expr],
            false,
            OrderBy::any(),
            Condition::true_cond(),
            vec![],
        )?));
        let aggregated_current_row_count = ExprImpl::AggCall(Box::new(AggCall::new(
            AggType::Builtin(PbAggKind::Sum),
            vec![ExprImpl::InputRef(Box::new(InputRef {
                index: backfill_info.row_count_column_index,
                data_type: DataType::Int64,
            }))],
            false,
            OrderBy::any(),
            Condition::true_cond(),
            vec![],
        )?));
        let select_exprs = vec![aggregated_current_row_count, aggregated_min_epoch];
        let group_by = GroupBy::GroupKey(vec![]);
        let (agg, _, _) = LogicalAgg::create(select_exprs, group_by, None, scan.into())?;
        Ok(agg)
    }

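    /// Projects the aggregate output into the table function's schema: constant
    /// job/fragment/table id literals followed by the row count and minimum epoch.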
    fn build_project(backfill_info: &BackfillInfo, agg: PlanRef) -> anyhow::Result<LogicalProject> {
        let job_id_expr = Self::build_u32_expr(backfill_info.job_id.as_raw_id());
        let fragment_id_expr = Self::build_u32_expr(backfill_info.fragment_id.as_raw_id());
        let table_id_expr = Self::build_u32_expr(backfill_info.table_id.as_raw_id());

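        // The state table stores one row count per vnode, so the `SUM` above is
        // the job's total. `sum(int64)` yields Decimal, hence the explicit cast
        // back to Int64 to match the output schema.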
        let current_count_per_vnode = ExprImpl::InputRef(Box::new(InputRef {
            index: 0,
            data_type: DataType::Decimal,
        }))
        .cast_explicit(&DataType::Int64)?;
        let min_epoch = ExprImpl::InputRef(Box::new(InputRef {
            index: 1,
            data_type: DataType::Int64,
        }));

        Ok(LogicalProject::new(
            agg,
            vec![
                job_id_expr,
                fragment_id_expr,
                table_id_expr,
                current_count_per_vnode,
                min_epoch,
            ],
        ))
    }

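    /// Wraps a raw `u32` id in an `Int32` literal (`id as i32` reinterprets the
    /// bits) to match the id columns of the output schema.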
    fn build_u32_expr(id: u32) -> ExprImpl {
        ExprImpl::Literal(Box::new(Literal::new(
            Some(ScalarImpl::Int32(id as i32)),
            DataType::Int32,
        )))
    }
}

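/// Collects the catalog's backfilling internal tables whose name identifies them
/// as backfill state tables.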
fn get_backfilling_tables(reader: CatalogReadGuard) -> Vec<Arc<TableCatalog>> {
    reader
        .iter_backfilling_internal_tables()
        .filter(|table| is_backfill_table(&table.name))
        .cloned()
        .collect_vec()
}

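/// Boxed constructor, as used when registering rules with the optimizer.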
impl TableFunctionToInternalBackfillProgressRule {
    pub fn create() -> BoxedRule {
        Box::new(TableFunctionToInternalBackfillProgressRule {})
    }
}

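/// Per-table metadata for building the progress plan: the owning job and
/// fragment, the state table id, and the positions of the progress columns.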
struct BackfillInfo {
    job_id: JobId,
    fragment_id: FragmentId,
    table_id: TableId,
    row_count_column_index: usize,
    epoch_column_index: Option<usize>,
}

impl BackfillInfo {
    fn new(table: &TableCatalog) -> anyhow::Result<Self> {
        let Some(job_id) = table.job_id else {
            bail!("`job_id` not set for backfill table");
        };
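        // The row-count column is mandatory; the epoch column may be absent, in
        // which case `build_agg` substitutes a NULL literal.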
        let Some(row_count_column_index) = table
            .columns
            .iter()
            .position(|c| c.name() == StreamTableScan::ROW_COUNT_COLUMN_NAME)
        else {
            bail!(
                "`{}` column not found in backfill table",
                StreamTableScan::ROW_COUNT_COLUMN_NAME
            );
        };
        let epoch_column_index = table
            .columns
            .iter()
            .position(|c| c.name() == StreamTableScan::EPOCH_COLUMN_NAME);
        let fragment_id = table.fragment_id;
        let table_id = table.id;

        Ok(Self {
            job_id,
            fragment_id,
            table_id,
            row_count_column_index,
            epoch_column_index,
        })
    }
}