risingwave_frontend/optimizer/
backfill_order_strategy.rs1use risingwave_pb::stream_plan::BackfillOrder;
16use risingwave_sqlparser::ast::BackfillOrderStrategy;
17
18use crate::PlanRef;
19use crate::error::Result;
20use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
21use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
22use crate::session::SessionImpl;
23
24pub mod auto {
25 use std::collections::{HashMap, HashSet};
26
27 use risingwave_common::catalog::ObjectId;
28 use risingwave_pb::common::Uint32Vector;
29
30 use crate::PlanRef;
31 use crate::optimizer::PlanNodeType;
32 use crate::optimizer::backfill_order_strategy::common::has_cycle;
33 use crate::session::SessionImpl;
34
35 #[derive(Debug)]
36 pub enum BackfillTreeNode {
37 Join {
38 lhs: Box<BackfillTreeNode>,
39 rhs: Box<BackfillTreeNode>,
40 },
41 Scan {
42 id: ObjectId,
43 },
44 Union {
45 children: Vec<BackfillTreeNode>,
46 },
47 Ignored,
48 }
49
50 fn plan_graph_to_backfill_tree(
52 session: &SessionImpl,
53 plan: PlanRef,
54 ) -> Option<BackfillTreeNode> {
55 match plan.node_type() {
56 PlanNodeType::StreamHashJoin => {
57 assert_eq!(plan.inputs().len(), 2);
58 let mut inputs = plan.inputs().into_iter();
59 let l = inputs.next().unwrap();
60 let r = inputs.next().unwrap();
61 Some(BackfillTreeNode::Join {
62 lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
63 rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
64 })
65 }
66 PlanNodeType::StreamTableScan => {
67 let table_scan = plan.as_stream_table_scan().expect("table scan");
68 let relation_id = table_scan.core().table_catalog.id().into();
69 Some(BackfillTreeNode::Scan { id: relation_id })
70 }
71 PlanNodeType::StreamSourceScan => {
72 let source_scan = plan.as_stream_source_scan().expect("source scan");
73 let relation_id = source_scan.source_catalog().id;
74 Some(BackfillTreeNode::Scan { id: relation_id })
75 }
76 PlanNodeType::StreamUnion => {
77 let inputs = plan.inputs();
78 let mut children = Vec::with_capacity(inputs.len());
79 for child in inputs {
80 let subtree = plan_graph_to_backfill_tree(session, child)?;
81 if matches!(subtree, BackfillTreeNode::Ignored) {
82 continue;
83 }
84 children.push(subtree);
85 }
86 Some(BackfillTreeNode::Union { children })
87 }
88 node_type => {
89 let inputs = plan.inputs();
90 match inputs.len() {
91 0 => Some(BackfillTreeNode::Ignored),
92 1 => {
93 let mut inputs = inputs.into_iter();
94 let child = inputs.next().unwrap();
95 plan_graph_to_backfill_tree(session, child)
96 }
97 _ => {
98 session.notice_to_user(format!(
99 "Backfill order strategy is not supported for {:?}",
100 node_type
101 ));
102 None
103 }
104 }
105 }
106 }
107 }
108
109 fn fold_backfill_tree_to_partial_order(
163 tree: BackfillTreeNode,
164 ) -> HashMap<ObjectId, Uint32Vector> {
165 let mut order: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
166
167 fn traverse_backfill_tree(
170 tree: BackfillTreeNode,
171 order: &mut HashMap<ObjectId, HashSet<ObjectId>>,
172 is_leftmost_child: bool,
173 mut prior_terminal_nodes: HashSet<ObjectId>,
174 ) -> HashSet<ObjectId> {
175 match tree {
176 BackfillTreeNode::Ignored => HashSet::new(),
177 BackfillTreeNode::Scan { id } => {
178 if is_leftmost_child {
179 for prior_terminal_node in prior_terminal_nodes {
180 order.entry(prior_terminal_node).or_default().insert(id);
181 }
182 }
183 HashSet::from([id])
184 }
185 BackfillTreeNode::Union { children } => {
186 let mut terminal_nodes = HashSet::new();
187 for child in children {
188 let child_terminal_nodes = traverse_backfill_tree(
189 child,
190 order,
191 is_leftmost_child,
192 prior_terminal_nodes.clone(),
193 );
194 terminal_nodes.extend(child_terminal_nodes);
195 }
196 terminal_nodes
197 }
198 BackfillTreeNode::Join { lhs, rhs } => {
199 let rhs_terminal_nodes =
200 traverse_backfill_tree(*rhs, order, false, HashSet::new());
201 prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
202 traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
203 }
204 }
205 }
206
207 traverse_backfill_tree(tree, &mut order, false, HashSet::new());
208
209 order
210 .into_iter()
211 .map(|(k, v)| {
212 let data = v.into_iter().collect();
213 (k, Uint32Vector { data })
214 })
215 .collect()
216 }
217
218 pub(super) fn plan_auto_strategy(
219 session: &SessionImpl,
220 plan: PlanRef,
221 ) -> HashMap<ObjectId, Uint32Vector> {
222 if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
223 let order = fold_backfill_tree_to_partial_order(tree);
224 if has_cycle(&order) {
225 tracing::warn!(?order, "Backfill order strategy has a cycle");
226 session.notice_to_user("Backfill order strategy has a cycle");
227 return Default::default();
228 }
229 return order;
230 }
231 Default::default()
232 }
233}
234
235mod fixed {
236 use std::collections::HashMap;
237
238 use risingwave_common::bail;
239 use risingwave_common::catalog::ObjectId;
240 use risingwave_pb::common::Uint32Vector;
241 use risingwave_sqlparser::ast::ObjectName;
242
243 use crate::error::Result;
244 use crate::optimizer::backfill_order_strategy::common::{
245 bind_backfill_relation_id_by_name, has_cycle,
246 };
247 use crate::session::SessionImpl;
248
249 pub(super) fn plan_fixed_strategy(
250 session: &SessionImpl,
251 orders: Vec<(ObjectName, ObjectName)>,
252 ) -> Result<HashMap<ObjectId, Uint32Vector>> {
253 let mut order: HashMap<ObjectId, Uint32Vector> = HashMap::new();
254 for (start_name, end_name) in orders {
255 let start_relation_id = bind_backfill_relation_id_by_name(session, start_name)?;
256 let end_relation_id = bind_backfill_relation_id_by_name(session, end_name)?;
257 order
258 .entry(start_relation_id)
259 .or_default()
260 .data
261 .push(end_relation_id);
262 }
263 if has_cycle(&order) {
264 bail!("Backfill order strategy has a cycle");
265 }
266 Ok(order)
267 }
268}
269
270mod common {
271 use std::collections::{HashMap, HashSet};
272
273 use risingwave_common::catalog::ObjectId;
274 use risingwave_pb::common::Uint32Vector;
275 use risingwave_sqlparser::ast::ObjectName;
276
277 use crate::Binder;
278 use crate::catalog::CatalogError;
279 use crate::catalog::root_catalog::SchemaPath;
280 use crate::catalog::schema_catalog::SchemaCatalog;
281 use crate::error::Result;
282 use crate::session::SessionImpl;
283
284 pub(super) fn has_cycle(order: &HashMap<ObjectId, Uint32Vector>) -> bool {
286 fn dfs(
287 node: ObjectId,
288 order: &HashMap<ObjectId, Uint32Vector>,
289 visited: &mut HashSet<ObjectId>,
290 stack: &mut HashSet<ObjectId>,
291 ) -> bool {
292 if stack.contains(&node) {
293 return true; }
295
296 if visited.insert(node) {
297 stack.insert(node);
298 if let Some(downstreams) = order.get(&node) {
299 for neighbor in &downstreams.data {
300 if dfs(*neighbor, order, visited, stack) {
301 return true;
302 }
303 }
304 }
305 stack.remove(&node);
306 }
307 false
308 }
309
310 let mut visited = HashSet::new();
311 let mut stack = HashSet::new();
312 for &start in order.keys() {
313 if dfs(start, order, &mut visited, &mut stack) {
314 return true;
315 }
316 }
317
318 false
319 }
320
321 pub(super) fn bind_backfill_relation_id_by_name(
322 session: &SessionImpl,
323 name: ObjectName,
324 ) -> Result<ObjectId> {
325 let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(name)?;
326 let db_name = db_name.unwrap_or(session.database());
327
328 let reader = session.env().catalog_reader().read_guard();
329
330 match schema_name {
331 Some(name) => {
332 let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
333 bind_table(schema_catalog, &rel_name)
334 }
335 None => {
336 let search_path = session.config().search_path();
337 let user_name = session.user_name();
338 let schema_path = SchemaPath::Path(&search_path, &user_name);
339 let result: crate::error::Result<Option<(ObjectId, &str)>> =
340 schema_path.try_find(|schema_name| {
341 if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
342 && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
343 {
344 Ok(Some(relation_id))
345 } else {
346 Ok(None)
347 }
348 });
349 if let Some((relation_id, _schema_name)) = result? {
350 return Ok(relation_id);
351 }
352 Err(CatalogError::NotFound("table", rel_name.to_owned()).into())
353 }
354 }
355 }
356
357 fn bind_table(schema_catalog: &SchemaCatalog, name: &String) -> crate::error::Result<ObjectId> {
358 if let Some(table) = schema_catalog.get_created_table_by_name(name) {
359 Ok(table.id().table_id)
360 } else if let Some(source) = schema_catalog.get_source_by_name(name) {
361 Ok(source.id)
362 } else {
363 Err(CatalogError::NotFound("table or source", name.to_owned()).into())
364 }
365 }
366}
367
368pub mod display {
369 use risingwave_common::catalog::ObjectId;
370 use risingwave_pb::stream_plan::BackfillOrder;
371
372 use crate::session::SessionImpl;
373
374 fn get_table_name(session: &SessionImpl, id: ObjectId) -> crate::error::Result<String> {
375 let catalog_reader = session.env().catalog_reader().read_guard();
376 let table_catalog = catalog_reader.get_any_table_by_id(&(id.into()))?;
377 let table_name = table_catalog.name();
378 let db_id = table_catalog.database_id;
379 let schema_id = table_catalog.schema_id;
380 let schema_catalog = catalog_reader.get_schema_by_id(&db_id, &schema_id)?;
381 let schema_name = schema_catalog.name();
382 let name = format!("{}.{}", schema_name, table_name);
383 Ok(name)
384 }
385
386 pub(super) fn print_backfill_order_in_dot_format(
387 session: &SessionImpl,
388 order: BackfillOrder,
389 ) -> crate::error::Result<String> {
390 let mut result = String::new();
391 result.push_str("digraph G {\n");
392 let mut edges = vec![];
395 for (start, end) in order.order {
396 let start_name = get_table_name(session, start)?;
397 for end in end.data {
398 let end_name = get_table_name(session, end)?;
399 edges.push(format!(" \"{}\" -> \"{}\";\n", start_name, end_name));
400 }
401 }
402 edges.sort();
403 for edge in edges {
404 result.push_str(&edge);
405 }
406 result.push_str("}\n");
407 Ok(result)
408 }
409}
410
411pub fn plan_backfill_order(
420 session: &SessionImpl,
421 backfill_order_strategy: BackfillOrderStrategy,
422 plan: PlanRef,
423) -> Result<BackfillOrder> {
424 let order = match backfill_order_strategy {
425 BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
426 BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
427 BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders)?,
428 };
429 Ok(BackfillOrder { order })
430}
431
432pub fn explain_backfill_order_in_dot_format(
434 session: &SessionImpl,
435 backfill_order_strategy: BackfillOrderStrategy,
436 plan: PlanRef,
437) -> Result<String> {
438 let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
439 let dot_formatted_backfill_order =
440 display::print_backfill_order_in_dot_format(session, order.clone())?;
441 Ok(dot_formatted_backfill_order)
442}