risingwave_frontend/optimizer/
backfill_order_strategy.rs1use risingwave_pb::stream_plan::BackfillOrder;
16use risingwave_sqlparser::ast::BackfillOrderStrategy;
17
18use crate::error::Result;
19use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
20use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
21use crate::optimizer::plan_node::StreamPlanRef;
22use crate::session::SessionImpl;
23
/// Raw numeric id of a scanned relation (a table or a source), as obtained via
/// `as_raw_id()` on the relation's catalog id.
type ObjectId = u32;
25
26pub mod auto {
27 use std::collections::{HashMap, HashSet};
28
29 use risingwave_pb::common::Uint32Vector;
30
31 use super::ObjectId;
32 use crate::optimizer::backfill_order_strategy::common::has_cycle;
33 use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
34 use crate::session::SessionImpl;
35
36 #[derive(Debug)]
37 pub(super) enum BackfillTreeNode {
38 Join {
39 lhs: Box<BackfillTreeNode>,
40 rhs: Box<BackfillTreeNode>,
41 },
42 Scan {
43 id: ObjectId,
44 },
45 Union {
46 children: Vec<BackfillTreeNode>,
47 },
48 Ignored,
49 }
50
51 fn plan_graph_to_backfill_tree(
53 session: &SessionImpl,
54 plan: StreamPlanRef,
55 ) -> Option<BackfillTreeNode> {
56 match plan.node_type() {
57 StreamPlanNodeType::StreamHashJoin => {
58 assert_eq!(plan.inputs().len(), 2);
59 let mut inputs = plan.inputs().into_iter();
60 let l = inputs.next().unwrap();
61 let r = inputs.next().unwrap();
62 Some(BackfillTreeNode::Join {
63 lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
64 rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
65 })
66 }
67 StreamPlanNodeType::StreamTableScan => {
68 let table_scan = plan.as_stream_table_scan().expect("table scan");
69 let relation_id = table_scan.core().table_catalog.id().as_raw_id();
70 Some(BackfillTreeNode::Scan { id: relation_id })
71 }
72 StreamPlanNodeType::StreamSourceScan => {
73 let source_scan = plan.as_stream_source_scan().expect("source scan");
74 let relation_id = source_scan.source_catalog().id.as_raw_id();
75 Some(BackfillTreeNode::Scan { id: relation_id })
76 }
77 StreamPlanNodeType::StreamUnion => {
78 let inputs = plan.inputs();
79 let mut children = Vec::with_capacity(inputs.len());
80 for child in inputs {
81 let subtree = plan_graph_to_backfill_tree(session, child)?;
82 if matches!(subtree, BackfillTreeNode::Ignored) {
83 continue;
84 }
85 children.push(subtree);
86 }
87 Some(BackfillTreeNode::Union { children })
88 }
89 node_type => {
90 let inputs = plan.inputs();
91 match inputs.len() {
92 0 => Some(BackfillTreeNode::Ignored),
93 1 => {
94 let mut inputs = inputs.into_iter();
95 let child = inputs.next().unwrap();
96 plan_graph_to_backfill_tree(session, child)
97 }
98 _ => {
99 session.notice_to_user(format!(
100 "Backfill order strategy is not supported for {:?}",
101 node_type
102 ));
103 None
104 }
105 }
106 }
107 }
108 }
109
110 fn fold_backfill_tree_to_partial_order(
164 tree: BackfillTreeNode,
165 ) -> HashMap<ObjectId, Uint32Vector> {
166 let mut order: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
167
168 fn traverse_backfill_tree(
171 tree: BackfillTreeNode,
172 order: &mut HashMap<ObjectId, HashSet<ObjectId>>,
173 is_leftmost_child: bool,
174 mut prior_terminal_nodes: HashSet<ObjectId>,
175 ) -> HashSet<ObjectId> {
176 match tree {
177 BackfillTreeNode::Ignored => HashSet::new(),
178 BackfillTreeNode::Scan { id } => {
179 if is_leftmost_child {
180 for prior_terminal_node in prior_terminal_nodes {
181 order.entry(prior_terminal_node).or_default().insert(id);
182 }
183 }
184 HashSet::from([id])
185 }
186 BackfillTreeNode::Union { children } => {
187 let mut terminal_nodes = HashSet::new();
188 for child in children {
189 let child_terminal_nodes = traverse_backfill_tree(
190 child,
191 order,
192 is_leftmost_child,
193 prior_terminal_nodes.clone(),
194 );
195 terminal_nodes.extend(child_terminal_nodes);
196 }
197 terminal_nodes
198 }
199 BackfillTreeNode::Join { lhs, rhs } => {
200 let rhs_terminal_nodes =
201 traverse_backfill_tree(*rhs, order, false, HashSet::new());
202 prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
203 traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
204 }
205 }
206 }
207
208 traverse_backfill_tree(tree, &mut order, false, HashSet::new());
209
210 order
211 .into_iter()
212 .map(|(k, v)| {
213 let data = v.into_iter().collect();
214 (k, Uint32Vector { data })
215 })
216 .collect()
217 }
218
219 pub(super) fn plan_auto_strategy(
220 session: &SessionImpl,
221 plan: StreamPlanRef,
222 ) -> HashMap<ObjectId, Uint32Vector> {
223 if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
224 let order = fold_backfill_tree_to_partial_order(tree);
225 if has_cycle(&order) {
226 tracing::warn!(?order, "Backfill order strategy has a cycle");
227 session.notice_to_user("Backfill order strategy has a cycle");
228 return Default::default();
229 }
230 return order;
231 }
232 Default::default()
233 }
234}
235
236mod fixed {
237 use std::collections::{HashMap, HashSet};
238
239 use risingwave_common::bail;
240 use risingwave_pb::common::Uint32Vector;
241 use risingwave_sqlparser::ast::ObjectName;
242
243 use super::ObjectId;
244 use crate::error::Result;
245 use crate::optimizer::backfill_order_strategy::common::{
246 bind_backfill_relation_id_by_name, has_cycle,
247 };
248 use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
249 use crate::session::SessionImpl;
250
251 fn collect_scanned_relation_ids(plan: StreamPlanRef) -> HashSet<ObjectId> {
253 let mut relation_ids = HashSet::new();
254
255 fn visit(plan: StreamPlanRef, relation_ids: &mut HashSet<ObjectId>) {
256 match plan.node_type() {
257 StreamPlanNodeType::StreamTableScan => {
258 let table_scan = plan.as_stream_table_scan().expect("table scan");
259 let relation_id = table_scan.core().table_catalog.id().as_raw_id();
260 relation_ids.insert(relation_id);
261 }
262 StreamPlanNodeType::StreamSourceScan => {
263 let source_scan = plan.as_stream_source_scan().expect("source scan");
264 let relation_id = source_scan.source_catalog().id.as_raw_id();
265 relation_ids.insert(relation_id);
266 }
267 _ => {}
268 }
269
270 for child in plan.inputs() {
272 visit(child, relation_ids);
273 }
274 }
275
276 visit(plan, &mut relation_ids);
277 relation_ids
278 }
279
280 pub(super) fn plan_fixed_strategy(
281 session: &SessionImpl,
282 orders: Vec<(ObjectName, ObjectName)>,
283 plan: StreamPlanRef,
284 ) -> Result<HashMap<ObjectId, Uint32Vector>> {
285 let scanned_relation_ids = collect_scanned_relation_ids(plan);
287
288 let mut order: HashMap<ObjectId, Uint32Vector> = HashMap::new();
289 for (start_name, end_name) in orders {
290 let start_relation_id = bind_backfill_relation_id_by_name(session, start_name.clone())?;
291 let end_relation_id = bind_backfill_relation_id_by_name(session, end_name.clone())?;
292
293 if !scanned_relation_ids.contains(&start_relation_id) {
295 bail!(
296 "Table or source '{}' specified in backfill_order is not used in the query",
297 start_name
298 );
299 }
300 if !scanned_relation_ids.contains(&end_relation_id) {
301 bail!(
302 "Table or source '{}' specified in backfill_order is not used in the query",
303 end_name
304 );
305 }
306
307 order
308 .entry(start_relation_id)
309 .or_default()
310 .data
311 .push(end_relation_id);
312 }
313 if has_cycle(&order) {
314 bail!("Backfill order strategy has a cycle");
315 }
316 Ok(order)
317 }
318}
319
320mod common {
321 use std::collections::{HashMap, HashSet};
322
323 use risingwave_pb::common::Uint32Vector;
324 use risingwave_sqlparser::ast::ObjectName;
325
326 use super::ObjectId;
327 use crate::Binder;
328 use crate::catalog::CatalogError;
329 use crate::catalog::root_catalog::SchemaPath;
330 use crate::catalog::schema_catalog::SchemaCatalog;
331 use crate::error::Result;
332 use crate::session::SessionImpl;
333
334 pub(super) fn has_cycle(order: &HashMap<ObjectId, Uint32Vector>) -> bool {
336 fn dfs(
337 node: ObjectId,
338 order: &HashMap<ObjectId, Uint32Vector>,
339 visited: &mut HashSet<ObjectId>,
340 stack: &mut HashSet<ObjectId>,
341 ) -> bool {
342 if stack.contains(&node) {
343 return true; }
345
346 if visited.insert(node) {
347 stack.insert(node);
348 if let Some(downstreams) = order.get(&node) {
349 for neighbor in &downstreams.data {
350 if dfs(*neighbor, order, visited, stack) {
351 return true;
352 }
353 }
354 }
355 stack.remove(&node);
356 }
357 false
358 }
359
360 let mut visited = HashSet::new();
361 let mut stack = HashSet::new();
362 for &start in order.keys() {
363 if dfs(start, order, &mut visited, &mut stack) {
364 return true;
365 }
366 }
367
368 false
369 }
370
371 pub(super) fn bind_backfill_relation_id_by_name(
372 session: &SessionImpl,
373 name: ObjectName,
374 ) -> Result<ObjectId> {
375 let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(&name)?;
376 let db_name = db_name.unwrap_or(session.database());
377
378 let reader = session.env().catalog_reader().read_guard();
379
380 match schema_name {
381 Some(name) => {
382 let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
383 bind_table(schema_catalog, &rel_name)
384 }
385 None => {
386 let search_path = session.config().search_path();
387 let user_name = session.user_name();
388 let schema_path = SchemaPath::Path(&search_path, &user_name);
389 let result: crate::error::Result<Option<(ObjectId, &str)>> =
390 schema_path.try_find(|schema_name| {
391 if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
392 && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
393 {
394 Ok(Some(relation_id))
395 } else {
396 Ok(None)
397 }
398 });
399 if let Some((relation_id, _schema_name)) = result? {
400 return Ok(relation_id);
401 }
402 Err(CatalogError::NotFound("table", rel_name.clone()).into())
403 }
404 }
405 }
406
407 fn bind_table(schema_catalog: &SchemaCatalog, name: &String) -> crate::error::Result<ObjectId> {
408 if let Some(table) = schema_catalog.get_created_table_by_name(name) {
409 Ok(table.id().as_raw_id())
410 } else if let Some(source) = schema_catalog.get_source_by_name(name) {
411 Ok(source.id.as_raw_id())
412 } else {
413 Err(CatalogError::NotFound("table or source", name.to_owned()).into())
414 }
415 }
416}
417
418pub mod display {
419 use risingwave_pb::stream_plan::BackfillOrder;
420
421 use super::ObjectId;
422 use crate::session::SessionImpl;
423
424 fn get_table_name(session: &SessionImpl, id: ObjectId) -> crate::error::Result<String> {
425 let catalog_reader = session.env().catalog_reader().read_guard();
426 let table_catalog = catalog_reader.get_any_table_by_id(id.into())?;
427 let table_name = table_catalog.name();
428 let db_id = table_catalog.database_id;
429 let schema_id = table_catalog.schema_id;
430 let schema_catalog = catalog_reader.get_schema_by_id(db_id, schema_id)?;
431 let schema_name = schema_catalog.name();
432 let name = format!("{}.{}", schema_name, table_name);
433 Ok(name)
434 }
435
436 pub(super) fn print_backfill_order_in_dot_format(
437 session: &SessionImpl,
438 order: BackfillOrder,
439 ) -> crate::error::Result<String> {
440 let mut result = String::new();
441 result.push_str("digraph G {\n");
442 let mut edges = vec![];
445 for (start, end) in order.order {
446 let start_name = get_table_name(session, start)?;
447 for end in end.data {
448 let end_name = get_table_name(session, end)?;
449 edges.push(format!(" \"{}\" -> \"{}\";\n", start_name, end_name));
450 }
451 }
452 edges.sort();
453 for edge in edges {
454 result.push_str(&edge);
455 }
456 result.push_str("}\n");
457 Ok(result)
458 }
459}
460
461pub fn plan_backfill_order(
470 session: &SessionImpl,
471 backfill_order_strategy: BackfillOrderStrategy,
472 plan: StreamPlanRef,
473) -> Result<BackfillOrder> {
474 let order = match backfill_order_strategy {
475 BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
476 BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
477 BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders, plan)?,
478 };
479 Ok(BackfillOrder { order })
480}
481
482pub fn explain_backfill_order_in_dot_format(
484 session: &SessionImpl,
485 backfill_order_strategy: BackfillOrderStrategy,
486 plan: StreamPlanRef,
487) -> Result<String> {
488 let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
489 let dot_formatted_backfill_order = display::print_backfill_order_in_dot_format(session, order)?;
490 Ok(dot_formatted_backfill_order)
491}