risingwave_frontend/optimizer/
backfill_order_strategy.rs1use risingwave_pb::stream_plan::BackfillOrder;
16use risingwave_sqlparser::ast::BackfillOrderStrategy;
17
18use crate::error::Result;
19use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
20use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
21use crate::optimizer::plan_node::StreamPlanRef;
22use crate::session::SessionImpl;
23
24pub mod auto {
25 use std::collections::{HashMap, HashSet};
26
27 use risingwave_common::catalog::ObjectId;
28 use risingwave_pb::common::Uint32Vector;
29
30 use crate::optimizer::backfill_order_strategy::common::has_cycle;
31 use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
32 use crate::session::SessionImpl;
33
    /// Reduced view of a stream plan that keeps only the operators relevant for
    /// deriving a backfill order.
    #[derive(Debug)]
    pub enum BackfillTreeNode {
        /// A two-input join; built from a `StreamHashJoin` plan node.
        Join {
            lhs: Box<BackfillTreeNode>,
            rhs: Box<BackfillTreeNode>,
        },
        /// A leaf scanning the table or source identified by `id`.
        Scan {
            id: ObjectId,
        },
        /// A union of subtrees; `Ignored` children are dropped when the tree is built.
        Union {
            children: Vec<BackfillTreeNode>,
        },
        /// A leaf with no inputs that is not a scan; it contributes no relation ids.
        Ignored,
    }
48
49 fn plan_graph_to_backfill_tree(
51 session: &SessionImpl,
52 plan: StreamPlanRef,
53 ) -> Option<BackfillTreeNode> {
54 match plan.node_type() {
55 StreamPlanNodeType::StreamHashJoin => {
56 assert_eq!(plan.inputs().len(), 2);
57 let mut inputs = plan.inputs().into_iter();
58 let l = inputs.next().unwrap();
59 let r = inputs.next().unwrap();
60 Some(BackfillTreeNode::Join {
61 lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
62 rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
63 })
64 }
65 StreamPlanNodeType::StreamTableScan => {
66 let table_scan = plan.as_stream_table_scan().expect("table scan");
67 let relation_id = table_scan.core().table_catalog.id().into();
68 Some(BackfillTreeNode::Scan { id: relation_id })
69 }
70 StreamPlanNodeType::StreamSourceScan => {
71 let source_scan = plan.as_stream_source_scan().expect("source scan");
72 let relation_id = source_scan.source_catalog().id;
73 Some(BackfillTreeNode::Scan { id: relation_id })
74 }
75 StreamPlanNodeType::StreamUnion => {
76 let inputs = plan.inputs();
77 let mut children = Vec::with_capacity(inputs.len());
78 for child in inputs {
79 let subtree = plan_graph_to_backfill_tree(session, child)?;
80 if matches!(subtree, BackfillTreeNode::Ignored) {
81 continue;
82 }
83 children.push(subtree);
84 }
85 Some(BackfillTreeNode::Union { children })
86 }
87 node_type => {
88 let inputs = plan.inputs();
89 match inputs.len() {
90 0 => Some(BackfillTreeNode::Ignored),
91 1 => {
92 let mut inputs = inputs.into_iter();
93 let child = inputs.next().unwrap();
94 plan_graph_to_backfill_tree(session, child)
95 }
96 _ => {
97 session.notice_to_user(format!(
98 "Backfill order strategy is not supported for {:?}",
99 node_type
100 ));
101 None
102 }
103 }
104 }
105 }
106 }
107
108 fn fold_backfill_tree_to_partial_order(
162 tree: BackfillTreeNode,
163 ) -> HashMap<ObjectId, Uint32Vector> {
164 let mut order: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
165
166 fn traverse_backfill_tree(
169 tree: BackfillTreeNode,
170 order: &mut HashMap<ObjectId, HashSet<ObjectId>>,
171 is_leftmost_child: bool,
172 mut prior_terminal_nodes: HashSet<ObjectId>,
173 ) -> HashSet<ObjectId> {
174 match tree {
175 BackfillTreeNode::Ignored => HashSet::new(),
176 BackfillTreeNode::Scan { id } => {
177 if is_leftmost_child {
178 for prior_terminal_node in prior_terminal_nodes {
179 order.entry(prior_terminal_node).or_default().insert(id);
180 }
181 }
182 HashSet::from([id])
183 }
184 BackfillTreeNode::Union { children } => {
185 let mut terminal_nodes = HashSet::new();
186 for child in children {
187 let child_terminal_nodes = traverse_backfill_tree(
188 child,
189 order,
190 is_leftmost_child,
191 prior_terminal_nodes.clone(),
192 );
193 terminal_nodes.extend(child_terminal_nodes);
194 }
195 terminal_nodes
196 }
197 BackfillTreeNode::Join { lhs, rhs } => {
198 let rhs_terminal_nodes =
199 traverse_backfill_tree(*rhs, order, false, HashSet::new());
200 prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
201 traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
202 }
203 }
204 }
205
206 traverse_backfill_tree(tree, &mut order, false, HashSet::new());
207
208 order
209 .into_iter()
210 .map(|(k, v)| {
211 let data = v.into_iter().collect();
212 (k, Uint32Vector { data })
213 })
214 .collect()
215 }
216
217 pub(super) fn plan_auto_strategy(
218 session: &SessionImpl,
219 plan: StreamPlanRef,
220 ) -> HashMap<ObjectId, Uint32Vector> {
221 if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
222 let order = fold_backfill_tree_to_partial_order(tree);
223 if has_cycle(&order) {
224 tracing::warn!(?order, "Backfill order strategy has a cycle");
225 session.notice_to_user("Backfill order strategy has a cycle");
226 return Default::default();
227 }
228 return order;
229 }
230 Default::default()
231 }
232}
233
234mod fixed {
235 use std::collections::{HashMap, HashSet};
236
237 use risingwave_common::bail;
238 use risingwave_common::catalog::ObjectId;
239 use risingwave_pb::common::Uint32Vector;
240 use risingwave_sqlparser::ast::ObjectName;
241
242 use crate::error::Result;
243 use crate::optimizer::backfill_order_strategy::common::{
244 bind_backfill_relation_id_by_name, has_cycle,
245 };
246 use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
247 use crate::session::SessionImpl;
248
249 fn collect_scanned_relation_ids(plan: StreamPlanRef) -> HashSet<ObjectId> {
251 let mut relation_ids = HashSet::new();
252
253 fn visit(plan: StreamPlanRef, relation_ids: &mut HashSet<ObjectId>) {
254 match plan.node_type() {
255 StreamPlanNodeType::StreamTableScan => {
256 let table_scan = plan.as_stream_table_scan().expect("table scan");
257 let relation_id = table_scan.core().table_catalog.id().into();
258 relation_ids.insert(relation_id);
259 }
260 StreamPlanNodeType::StreamSourceScan => {
261 let source_scan = plan.as_stream_source_scan().expect("source scan");
262 let relation_id = source_scan.source_catalog().id;
263 relation_ids.insert(relation_id);
264 }
265 _ => {}
266 }
267
268 for child in plan.inputs() {
270 visit(child, relation_ids);
271 }
272 }
273
274 visit(plan, &mut relation_ids);
275 relation_ids
276 }
277
278 pub(super) fn plan_fixed_strategy(
279 session: &SessionImpl,
280 orders: Vec<(ObjectName, ObjectName)>,
281 plan: StreamPlanRef,
282 ) -> Result<HashMap<ObjectId, Uint32Vector>> {
283 let scanned_relation_ids = collect_scanned_relation_ids(plan);
285
286 let mut order: HashMap<ObjectId, Uint32Vector> = HashMap::new();
287 for (start_name, end_name) in orders {
288 let start_relation_id = bind_backfill_relation_id_by_name(session, start_name.clone())?;
289 let end_relation_id = bind_backfill_relation_id_by_name(session, end_name.clone())?;
290
291 if !scanned_relation_ids.contains(&start_relation_id) {
293 bail!(
294 "Table or source '{}' specified in backfill_order is not used in the query",
295 start_name
296 );
297 }
298 if !scanned_relation_ids.contains(&end_relation_id) {
299 bail!(
300 "Table or source '{}' specified in backfill_order is not used in the query",
301 end_name
302 );
303 }
304
305 order
306 .entry(start_relation_id)
307 .or_default()
308 .data
309 .push(end_relation_id);
310 }
311 if has_cycle(&order) {
312 bail!("Backfill order strategy has a cycle");
313 }
314 Ok(order)
315 }
316}
317
318mod common {
319 use std::collections::{HashMap, HashSet};
320
321 use risingwave_common::catalog::ObjectId;
322 use risingwave_pb::common::Uint32Vector;
323 use risingwave_sqlparser::ast::ObjectName;
324
325 use crate::Binder;
326 use crate::catalog::CatalogError;
327 use crate::catalog::root_catalog::SchemaPath;
328 use crate::catalog::schema_catalog::SchemaCatalog;
329 use crate::error::Result;
330 use crate::session::SessionImpl;
331
332 pub(super) fn has_cycle(order: &HashMap<ObjectId, Uint32Vector>) -> bool {
334 fn dfs(
335 node: ObjectId,
336 order: &HashMap<ObjectId, Uint32Vector>,
337 visited: &mut HashSet<ObjectId>,
338 stack: &mut HashSet<ObjectId>,
339 ) -> bool {
340 if stack.contains(&node) {
341 return true; }
343
344 if visited.insert(node) {
345 stack.insert(node);
346 if let Some(downstreams) = order.get(&node) {
347 for neighbor in &downstreams.data {
348 if dfs(*neighbor, order, visited, stack) {
349 return true;
350 }
351 }
352 }
353 stack.remove(&node);
354 }
355 false
356 }
357
358 let mut visited = HashSet::new();
359 let mut stack = HashSet::new();
360 for &start in order.keys() {
361 if dfs(start, order, &mut visited, &mut stack) {
362 return true;
363 }
364 }
365
366 false
367 }
368
369 pub(super) fn bind_backfill_relation_id_by_name(
370 session: &SessionImpl,
371 name: ObjectName,
372 ) -> Result<ObjectId> {
373 let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(&name)?;
374 let db_name = db_name.unwrap_or(session.database());
375
376 let reader = session.env().catalog_reader().read_guard();
377
378 match schema_name {
379 Some(name) => {
380 let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
381 bind_table(schema_catalog, &rel_name)
382 }
383 None => {
384 let search_path = session.config().search_path();
385 let user_name = session.user_name();
386 let schema_path = SchemaPath::Path(&search_path, &user_name);
387 let result: crate::error::Result<Option<(ObjectId, &str)>> =
388 schema_path.try_find(|schema_name| {
389 if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
390 && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
391 {
392 Ok(Some(relation_id))
393 } else {
394 Ok(None)
395 }
396 });
397 if let Some((relation_id, _schema_name)) = result? {
398 return Ok(relation_id);
399 }
400 Err(CatalogError::NotFound("table", rel_name.clone()).into())
401 }
402 }
403 }
404
405 fn bind_table(schema_catalog: &SchemaCatalog, name: &String) -> crate::error::Result<ObjectId> {
406 if let Some(table) = schema_catalog.get_created_table_by_name(name) {
407 Ok(table.id().as_raw_id())
408 } else if let Some(source) = schema_catalog.get_source_by_name(name) {
409 Ok(source.id)
410 } else {
411 Err(CatalogError::NotFound("table or source", name.to_owned()).into())
412 }
413 }
414}
415
416pub mod display {
417 use risingwave_common::catalog::ObjectId;
418 use risingwave_pb::stream_plan::BackfillOrder;
419
420 use crate::session::SessionImpl;
421
422 fn get_table_name(session: &SessionImpl, id: ObjectId) -> crate::error::Result<String> {
423 let catalog_reader = session.env().catalog_reader().read_guard();
424 let table_catalog = catalog_reader.get_any_table_by_id(&(id.into()))?;
425 let table_name = table_catalog.name();
426 let db_id = table_catalog.database_id;
427 let schema_id = table_catalog.schema_id;
428 let schema_catalog = catalog_reader.get_schema_by_id(db_id, schema_id)?;
429 let schema_name = schema_catalog.name();
430 let name = format!("{}.{}", schema_name, table_name);
431 Ok(name)
432 }
433
434 pub(super) fn print_backfill_order_in_dot_format(
435 session: &SessionImpl,
436 order: BackfillOrder,
437 ) -> crate::error::Result<String> {
438 let mut result = String::new();
439 result.push_str("digraph G {\n");
440 let mut edges = vec![];
443 for (start, end) in order.order {
444 let start_name = get_table_name(session, start)?;
445 for end in end.data {
446 let end_name = get_table_name(session, end)?;
447 edges.push(format!(" \"{}\" -> \"{}\";\n", start_name, end_name));
448 }
449 }
450 edges.sort();
451 for edge in edges {
452 result.push_str(&edge);
453 }
454 result.push_str("}\n");
455 Ok(result)
456 }
457}
458
459pub fn plan_backfill_order(
468 session: &SessionImpl,
469 backfill_order_strategy: BackfillOrderStrategy,
470 plan: StreamPlanRef,
471) -> Result<BackfillOrder> {
472 let order = match backfill_order_strategy {
473 BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
474 BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
475 BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders, plan)?,
476 };
477 Ok(BackfillOrder { order })
478}
479
480pub fn explain_backfill_order_in_dot_format(
482 session: &SessionImpl,
483 backfill_order_strategy: BackfillOrderStrategy,
484 plan: StreamPlanRef,
485) -> Result<String> {
486 let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
487 let dot_formatted_backfill_order = display::print_backfill_order_in_dot_format(session, order)?;
488 Ok(dot_formatted_backfill_order)
489}