risingwave_frontend/optimizer/
backfill_order_strategy.rs1use risingwave_pb::common::Uint32Vector;
16use risingwave_pb::id::RelationId;
17use risingwave_pb::stream_plan::BackfillOrder;
18use risingwave_sqlparser::ast::BackfillOrderStrategy;
19
20use crate::error::Result;
21use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
22use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
23use crate::optimizer::plan_node::StreamPlanRef;
24use crate::session::SessionImpl;
25
26pub mod auto {
27 use std::collections::{HashMap, HashSet};
28
29 use risingwave_pb::id::RelationId;
30
31 use crate::optimizer::backfill_order_strategy::common::has_cycle;
32 use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
33 use crate::session::SessionImpl;
34
35 #[derive(Debug)]
36 pub(super) enum BackfillTreeNode {
37 Join {
38 lhs: Box<BackfillTreeNode>,
39 rhs: Box<BackfillTreeNode>,
40 },
41 Scan {
42 id: RelationId,
43 },
44 Union {
45 children: Vec<BackfillTreeNode>,
46 },
47 Ignored,
48 }
49
50 fn plan_graph_to_backfill_tree(
52 session: &SessionImpl,
53 plan: StreamPlanRef,
54 ) -> Option<BackfillTreeNode> {
55 match plan.node_type() {
56 StreamPlanNodeType::StreamHashJoin => {
57 assert_eq!(plan.inputs().len(), 2);
58 let mut inputs = plan.inputs().into_iter();
59 let l = inputs.next().unwrap();
60 let r = inputs.next().unwrap();
61 Some(BackfillTreeNode::Join {
62 lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
63 rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
64 })
65 }
66 StreamPlanNodeType::StreamTableScan => {
67 let table_scan = plan.as_stream_table_scan().expect("table scan");
68 let relation_id = table_scan.core().table_catalog.id().as_relation_id();
69 Some(BackfillTreeNode::Scan { id: relation_id })
70 }
71 StreamPlanNodeType::StreamSourceScan => {
72 let source_scan = plan.as_stream_source_scan().expect("source scan");
73 let relation_id = source_scan.source_catalog().id.as_relation_id();
74 Some(BackfillTreeNode::Scan { id: relation_id })
75 }
76 StreamPlanNodeType::StreamUnion => {
77 let inputs = plan.inputs();
78 let mut children = Vec::with_capacity(inputs.len());
79 for child in inputs {
80 let subtree = plan_graph_to_backfill_tree(session, child)?;
81 if matches!(subtree, BackfillTreeNode::Ignored) {
82 continue;
83 }
84 children.push(subtree);
85 }
86 Some(BackfillTreeNode::Union { children })
87 }
88 node_type => {
89 let inputs = plan.inputs();
90 match inputs.len() {
91 0 => Some(BackfillTreeNode::Ignored),
92 1 => {
93 let mut inputs = inputs.into_iter();
94 let child = inputs.next().unwrap();
95 plan_graph_to_backfill_tree(session, child)
96 }
97 _ => {
98 session.notice_to_user(format!(
99 "Backfill order strategy is not supported for {:?}",
100 node_type
101 ));
102 None
103 }
104 }
105 }
106 }
107 }
108
109 fn fold_backfill_tree_to_partial_order(
163 tree: BackfillTreeNode,
164 ) -> HashMap<RelationId, HashSet<RelationId>> {
165 let mut order: HashMap<RelationId, HashSet<RelationId>> = HashMap::new();
166
167 fn traverse_backfill_tree(
170 tree: BackfillTreeNode,
171 order: &mut HashMap<RelationId, HashSet<RelationId>>,
172 is_leftmost_child: bool,
173 mut prior_terminal_nodes: HashSet<RelationId>,
174 ) -> HashSet<RelationId> {
175 match tree {
176 BackfillTreeNode::Ignored => HashSet::new(),
177 BackfillTreeNode::Scan { id } => {
178 if is_leftmost_child {
179 for prior_terminal_node in prior_terminal_nodes {
180 order.entry(prior_terminal_node).or_default().insert(id);
181 }
182 }
183 HashSet::from([id])
184 }
185 BackfillTreeNode::Union { children } => {
186 let mut terminal_nodes = HashSet::new();
187 for child in children {
188 let child_terminal_nodes = traverse_backfill_tree(
189 child,
190 order,
191 is_leftmost_child,
192 prior_terminal_nodes.clone(),
193 );
194 terminal_nodes.extend(child_terminal_nodes);
195 }
196 terminal_nodes
197 }
198 BackfillTreeNode::Join { lhs, rhs } => {
199 let rhs_terminal_nodes =
200 traverse_backfill_tree(*rhs, order, false, HashSet::new());
201 prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
202 traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
203 }
204 }
205 }
206
207 traverse_backfill_tree(tree, &mut order, false, HashSet::new());
208
209 order
210 }
211
212 pub(super) fn plan_auto_strategy(
213 session: &SessionImpl,
214 plan: StreamPlanRef,
215 ) -> HashMap<RelationId, HashSet<RelationId>> {
216 if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
217 let order = fold_backfill_tree_to_partial_order(tree);
218 if has_cycle(&order) {
219 tracing::warn!(?order, "Backfill order strategy has a cycle");
220 session.notice_to_user("Backfill order strategy has a cycle");
221 return Default::default();
222 }
223 return order;
224 }
225 Default::default()
226 }
227}
228
229mod fixed {
230 use std::collections::{HashMap, HashSet};
231
232 use risingwave_common::bail;
233 use risingwave_pb::id::RelationId;
234 use risingwave_sqlparser::ast::ObjectName;
235
236 use crate::error::Result;
237 use crate::optimizer::backfill_order_strategy::common::{
238 bind_backfill_relation_id_by_name, has_cycle,
239 };
240 use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
241 use crate::session::SessionImpl;
242
243 fn collect_scanned_relation_ids(plan: StreamPlanRef) -> HashSet<RelationId> {
245 let mut relation_ids = HashSet::new();
246
247 fn visit(plan: StreamPlanRef, relation_ids: &mut HashSet<RelationId>) {
248 match plan.node_type() {
249 StreamPlanNodeType::StreamTableScan => {
250 let table_scan = plan.as_stream_table_scan().expect("table scan");
251 let relation_id = table_scan.core().table_catalog.id().as_relation_id();
252 relation_ids.insert(relation_id);
253 }
254 StreamPlanNodeType::StreamSourceScan => {
255 let source_scan = plan.as_stream_source_scan().expect("source scan");
256 let relation_id = source_scan.source_catalog().id.as_relation_id();
257 relation_ids.insert(relation_id);
258 }
259 _ => {}
260 }
261
262 for child in plan.inputs() {
264 visit(child, relation_ids);
265 }
266 }
267
268 visit(plan, &mut relation_ids);
269 relation_ids
270 }
271
272 pub(super) fn plan_fixed_strategy(
273 session: &SessionImpl,
274 orders: Vec<(ObjectName, ObjectName)>,
275 plan: StreamPlanRef,
276 ) -> Result<HashMap<RelationId, HashSet<RelationId>>> {
277 let scanned_relation_ids = collect_scanned_relation_ids(plan);
279
280 let mut order: HashMap<RelationId, HashSet<RelationId>> = HashMap::new();
281 for (start_name, end_name) in orders {
282 let start_relation_id = bind_backfill_relation_id_by_name(session, start_name.clone())?;
283 let end_relation_id = bind_backfill_relation_id_by_name(session, end_name.clone())?;
284
285 if !scanned_relation_ids.contains(&start_relation_id) {
287 bail!(
288 "Table or source '{}' specified in backfill_order is not used in the query",
289 start_name
290 );
291 }
292 if !scanned_relation_ids.contains(&end_relation_id) {
293 bail!(
294 "Table or source '{}' specified in backfill_order is not used in the query",
295 end_name
296 );
297 }
298
299 order
300 .entry(start_relation_id)
301 .or_default()
302 .insert(end_relation_id);
303 }
304 if has_cycle(&order) {
305 bail!("Backfill order strategy has a cycle");
306 }
307 Ok(order)
308 }
309}
310
311mod common {
312 use std::collections::{HashMap, HashSet};
313
314 use risingwave_pb::id::RelationId;
315 use risingwave_sqlparser::ast::ObjectName;
316
317 use crate::Binder;
318 use crate::catalog::CatalogError;
319 use crate::catalog::root_catalog::SchemaPath;
320 use crate::catalog::schema_catalog::SchemaCatalog;
321 use crate::error::Result;
322 use crate::session::SessionImpl;
323
324 pub(super) fn has_cycle(order: &HashMap<RelationId, HashSet<RelationId>>) -> bool {
326 fn dfs(
327 node: RelationId,
328 order: &HashMap<RelationId, HashSet<RelationId>>,
329 visited: &mut HashSet<RelationId>,
330 stack: &mut HashSet<RelationId>,
331 ) -> bool {
332 if stack.contains(&node) {
333 return true; }
335
336 if visited.insert(node) {
337 stack.insert(node);
338 if let Some(downstreams) = order.get(&node) {
339 for neighbor in downstreams {
340 if dfs(*neighbor, order, visited, stack) {
341 return true;
342 }
343 }
344 }
345 stack.remove(&node);
346 }
347 false
348 }
349
350 let mut visited = HashSet::new();
351 let mut stack = HashSet::new();
352 for &start in order.keys() {
353 if dfs(start, order, &mut visited, &mut stack) {
354 return true;
355 }
356 }
357
358 false
359 }
360
361 pub(super) fn bind_backfill_relation_id_by_name(
362 session: &SessionImpl,
363 name: ObjectName,
364 ) -> Result<RelationId> {
365 let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(&name)?;
366 let db_name = db_name.unwrap_or(session.database());
367
368 let reader = session.env().catalog_reader().read_guard();
369
370 match schema_name {
371 Some(name) => {
372 let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
373 bind_table(schema_catalog, &rel_name)
374 }
375 None => {
376 let search_path = session.config().search_path();
377 let user_name = session.user_name();
378 let schema_path = SchemaPath::Path(&search_path, &user_name);
379 let result: crate::error::Result<Option<(RelationId, &str)>> = schema_path
380 .try_find(|schema_name| {
381 if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
382 && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
383 {
384 Ok(Some(relation_id))
385 } else {
386 Ok(None)
387 }
388 });
389 if let Some((relation_id, _schema_name)) = result? {
390 return Ok(relation_id);
391 }
392 Err(CatalogError::not_found("table", &rel_name).into())
393 }
394 }
395 }
396
397 fn bind_table(
398 schema_catalog: &SchemaCatalog,
399 name: &String,
400 ) -> crate::error::Result<RelationId> {
401 if let Some(table) = schema_catalog.get_created_table_by_name(name) {
402 Ok(table.id().as_relation_id())
403 } else if let Some(source) = schema_catalog.get_source_by_name(name) {
404 Ok(source.id.as_relation_id())
405 } else {
406 Err(CatalogError::not_found("table or source", name).into())
407 }
408 }
409}
410
411pub mod display {
412 use risingwave_pb::stream_plan::BackfillOrder;
413
414 use crate::session::SessionImpl;
415
416 fn get_table_name(session: &SessionImpl, id: u32) -> crate::error::Result<String> {
417 let catalog_reader = session.env().catalog_reader().read_guard();
418 let table_catalog = catalog_reader.get_any_table_by_id(id.into())?;
419 let table_name = table_catalog.name();
420 let db_id = table_catalog.database_id;
421 let schema_id = table_catalog.schema_id;
422 let schema_catalog = catalog_reader.get_schema_by_id(db_id, schema_id)?;
423 let schema_name = schema_catalog.name();
424 let name = format!("{}.{}", schema_name, table_name);
425 Ok(name)
426 }
427
428 pub(super) fn print_backfill_order_in_dot_format(
429 session: &SessionImpl,
430 order: BackfillOrder,
431 ) -> crate::error::Result<String> {
432 let mut result = String::new();
433 result.push_str("digraph G {\n");
434 let mut edges = vec![];
437 for (start, end) in order.order {
438 let start_name = get_table_name(session, start.as_raw_id())?;
439 for end in end.data {
440 let end_name = get_table_name(session, end)?;
441 edges.push(format!(" \"{}\" -> \"{}\";\n", start_name, end_name));
442 }
443 }
444 edges.sort();
445 for edge in edges {
446 result.push_str(&edge);
447 }
448 result.push_str("}\n");
449 Ok(result)
450 }
451}
452
453pub fn plan_backfill_order(
462 session: &SessionImpl,
463 backfill_order_strategy: BackfillOrderStrategy,
464 plan: StreamPlanRef,
465) -> Result<BackfillOrder> {
466 let order = match backfill_order_strategy {
467 BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
468 BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
469 BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders, plan)?,
470 };
471 Ok(BackfillOrder {
472 order: order
473 .into_iter()
474 .map(|(relation_id, dependencies)| {
475 (
476 relation_id,
477 Uint32Vector {
478 data: dependencies
479 .into_iter()
480 .map(RelationId::as_raw_id)
481 .collect(),
482 },
483 )
484 })
485 .collect(),
486 })
487}
488
489pub fn explain_backfill_order_in_dot_format(
491 session: &SessionImpl,
492 backfill_order_strategy: BackfillOrderStrategy,
493 plan: StreamPlanRef,
494) -> Result<String> {
495 let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
496 let dot_formatted_backfill_order = display::print_backfill_order_in_dot_format(session, order)?;
497 Ok(dot_formatted_backfill_order)
498}