risingwave_frontend/optimizer/
backfill_order_strategy.rs1use risingwave_pb::stream_plan::BackfillOrder;
16use risingwave_sqlparser::ast::BackfillOrderStrategy;
17
18use crate::error::Result;
19use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
20use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
21use crate::optimizer::plan_node::StreamPlanRef;
22use crate::session::SessionImpl;
23
24pub mod auto {
25 use std::collections::{HashMap, HashSet};
26
27 use risingwave_common::catalog::ObjectId;
28 use risingwave_pb::common::Uint32Vector;
29
30 use crate::optimizer::backfill_order_strategy::common::has_cycle;
31 use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
32 use crate::session::SessionImpl;
33
34 #[derive(Debug)]
35 pub enum BackfillTreeNode {
36 Join {
37 lhs: Box<BackfillTreeNode>,
38 rhs: Box<BackfillTreeNode>,
39 },
40 Scan {
41 id: ObjectId,
42 },
43 Union {
44 children: Vec<BackfillTreeNode>,
45 },
46 Ignored,
47 }
48
49 fn plan_graph_to_backfill_tree(
51 session: &SessionImpl,
52 plan: StreamPlanRef,
53 ) -> Option<BackfillTreeNode> {
54 match plan.node_type() {
55 StreamPlanNodeType::StreamHashJoin => {
56 assert_eq!(plan.inputs().len(), 2);
57 let mut inputs = plan.inputs().into_iter();
58 let l = inputs.next().unwrap();
59 let r = inputs.next().unwrap();
60 Some(BackfillTreeNode::Join {
61 lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
62 rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
63 })
64 }
65 StreamPlanNodeType::StreamTableScan => {
66 let table_scan = plan.as_stream_table_scan().expect("table scan");
67 let relation_id = table_scan.core().table_catalog.id().into();
68 Some(BackfillTreeNode::Scan { id: relation_id })
69 }
70 StreamPlanNodeType::StreamSourceScan => {
71 let source_scan = plan.as_stream_source_scan().expect("source scan");
72 let relation_id = source_scan.source_catalog().id;
73 Some(BackfillTreeNode::Scan { id: relation_id })
74 }
75 StreamPlanNodeType::StreamUnion => {
76 let inputs = plan.inputs();
77 let mut children = Vec::with_capacity(inputs.len());
78 for child in inputs {
79 let subtree = plan_graph_to_backfill_tree(session, child)?;
80 if matches!(subtree, BackfillTreeNode::Ignored) {
81 continue;
82 }
83 children.push(subtree);
84 }
85 Some(BackfillTreeNode::Union { children })
86 }
87 node_type => {
88 let inputs = plan.inputs();
89 match inputs.len() {
90 0 => Some(BackfillTreeNode::Ignored),
91 1 => {
92 let mut inputs = inputs.into_iter();
93 let child = inputs.next().unwrap();
94 plan_graph_to_backfill_tree(session, child)
95 }
96 _ => {
97 session.notice_to_user(format!(
98 "Backfill order strategy is not supported for {:?}",
99 node_type
100 ));
101 None
102 }
103 }
104 }
105 }
106 }
107
108 fn fold_backfill_tree_to_partial_order(
162 tree: BackfillTreeNode,
163 ) -> HashMap<ObjectId, Uint32Vector> {
164 let mut order: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
165
166 fn traverse_backfill_tree(
169 tree: BackfillTreeNode,
170 order: &mut HashMap<ObjectId, HashSet<ObjectId>>,
171 is_leftmost_child: bool,
172 mut prior_terminal_nodes: HashSet<ObjectId>,
173 ) -> HashSet<ObjectId> {
174 match tree {
175 BackfillTreeNode::Ignored => HashSet::new(),
176 BackfillTreeNode::Scan { id } => {
177 if is_leftmost_child {
178 for prior_terminal_node in prior_terminal_nodes {
179 order.entry(prior_terminal_node).or_default().insert(id);
180 }
181 }
182 HashSet::from([id])
183 }
184 BackfillTreeNode::Union { children } => {
185 let mut terminal_nodes = HashSet::new();
186 for child in children {
187 let child_terminal_nodes = traverse_backfill_tree(
188 child,
189 order,
190 is_leftmost_child,
191 prior_terminal_nodes.clone(),
192 );
193 terminal_nodes.extend(child_terminal_nodes);
194 }
195 terminal_nodes
196 }
197 BackfillTreeNode::Join { lhs, rhs } => {
198 let rhs_terminal_nodes =
199 traverse_backfill_tree(*rhs, order, false, HashSet::new());
200 prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
201 traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
202 }
203 }
204 }
205
206 traverse_backfill_tree(tree, &mut order, false, HashSet::new());
207
208 order
209 .into_iter()
210 .map(|(k, v)| {
211 let data = v.into_iter().collect();
212 (k, Uint32Vector { data })
213 })
214 .collect()
215 }
216
217 pub(super) fn plan_auto_strategy(
218 session: &SessionImpl,
219 plan: StreamPlanRef,
220 ) -> HashMap<ObjectId, Uint32Vector> {
221 if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
222 let order = fold_backfill_tree_to_partial_order(tree);
223 if has_cycle(&order) {
224 tracing::warn!(?order, "Backfill order strategy has a cycle");
225 session.notice_to_user("Backfill order strategy has a cycle");
226 return Default::default();
227 }
228 return order;
229 }
230 Default::default()
231 }
232}
233
234mod fixed {
235 use std::collections::HashMap;
236
237 use risingwave_common::bail;
238 use risingwave_common::catalog::ObjectId;
239 use risingwave_pb::common::Uint32Vector;
240 use risingwave_sqlparser::ast::ObjectName;
241
242 use crate::error::Result;
243 use crate::optimizer::backfill_order_strategy::common::{
244 bind_backfill_relation_id_by_name, has_cycle,
245 };
246 use crate::session::SessionImpl;
247
248 pub(super) fn plan_fixed_strategy(
249 session: &SessionImpl,
250 orders: Vec<(ObjectName, ObjectName)>,
251 ) -> Result<HashMap<ObjectId, Uint32Vector>> {
252 let mut order: HashMap<ObjectId, Uint32Vector> = HashMap::new();
253 for (start_name, end_name) in orders {
254 let start_relation_id = bind_backfill_relation_id_by_name(session, start_name)?;
255 let end_relation_id = bind_backfill_relation_id_by_name(session, end_name)?;
256 order
257 .entry(start_relation_id)
258 .or_default()
259 .data
260 .push(end_relation_id);
261 }
262 if has_cycle(&order) {
263 bail!("Backfill order strategy has a cycle");
264 }
265 Ok(order)
266 }
267}
268
269mod common {
270 use std::collections::{HashMap, HashSet};
271
272 use risingwave_common::catalog::ObjectId;
273 use risingwave_pb::common::Uint32Vector;
274 use risingwave_sqlparser::ast::ObjectName;
275
276 use crate::Binder;
277 use crate::catalog::CatalogError;
278 use crate::catalog::root_catalog::SchemaPath;
279 use crate::catalog::schema_catalog::SchemaCatalog;
280 use crate::error::Result;
281 use crate::session::SessionImpl;
282
283 pub(super) fn has_cycle(order: &HashMap<ObjectId, Uint32Vector>) -> bool {
285 fn dfs(
286 node: ObjectId,
287 order: &HashMap<ObjectId, Uint32Vector>,
288 visited: &mut HashSet<ObjectId>,
289 stack: &mut HashSet<ObjectId>,
290 ) -> bool {
291 if stack.contains(&node) {
292 return true; }
294
295 if visited.insert(node) {
296 stack.insert(node);
297 if let Some(downstreams) = order.get(&node) {
298 for neighbor in &downstreams.data {
299 if dfs(*neighbor, order, visited, stack) {
300 return true;
301 }
302 }
303 }
304 stack.remove(&node);
305 }
306 false
307 }
308
309 let mut visited = HashSet::new();
310 let mut stack = HashSet::new();
311 for &start in order.keys() {
312 if dfs(start, order, &mut visited, &mut stack) {
313 return true;
314 }
315 }
316
317 false
318 }
319
320 pub(super) fn bind_backfill_relation_id_by_name(
321 session: &SessionImpl,
322 name: ObjectName,
323 ) -> Result<ObjectId> {
324 let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(&name)?;
325 let db_name = db_name.unwrap_or(session.database());
326
327 let reader = session.env().catalog_reader().read_guard();
328
329 match schema_name {
330 Some(name) => {
331 let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
332 bind_table(schema_catalog, &rel_name)
333 }
334 None => {
335 let search_path = session.config().search_path();
336 let user_name = session.user_name();
337 let schema_path = SchemaPath::Path(&search_path, &user_name);
338 let result: crate::error::Result<Option<(ObjectId, &str)>> =
339 schema_path.try_find(|schema_name| {
340 if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
341 && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
342 {
343 Ok(Some(relation_id))
344 } else {
345 Ok(None)
346 }
347 });
348 if let Some((relation_id, _schema_name)) = result? {
349 return Ok(relation_id);
350 }
351 Err(CatalogError::NotFound("table", rel_name.to_owned()).into())
352 }
353 }
354 }
355
356 fn bind_table(schema_catalog: &SchemaCatalog, name: &String) -> crate::error::Result<ObjectId> {
357 if let Some(table) = schema_catalog.get_created_table_by_name(name) {
358 Ok(table.id().table_id)
359 } else if let Some(source) = schema_catalog.get_source_by_name(name) {
360 Ok(source.id)
361 } else {
362 Err(CatalogError::NotFound("table or source", name.to_owned()).into())
363 }
364 }
365}
366
367pub mod display {
368 use risingwave_common::catalog::ObjectId;
369 use risingwave_pb::stream_plan::BackfillOrder;
370
371 use crate::session::SessionImpl;
372
373 fn get_table_name(session: &SessionImpl, id: ObjectId) -> crate::error::Result<String> {
374 let catalog_reader = session.env().catalog_reader().read_guard();
375 let table_catalog = catalog_reader.get_any_table_by_id(&(id.into()))?;
376 let table_name = table_catalog.name();
377 let db_id = table_catalog.database_id;
378 let schema_id = table_catalog.schema_id;
379 let schema_catalog = catalog_reader.get_schema_by_id(&db_id, &schema_id)?;
380 let schema_name = schema_catalog.name();
381 let name = format!("{}.{}", schema_name, table_name);
382 Ok(name)
383 }
384
385 pub(super) fn print_backfill_order_in_dot_format(
386 session: &SessionImpl,
387 order: BackfillOrder,
388 ) -> crate::error::Result<String> {
389 let mut result = String::new();
390 result.push_str("digraph G {\n");
391 let mut edges = vec![];
394 for (start, end) in order.order {
395 let start_name = get_table_name(session, start)?;
396 for end in end.data {
397 let end_name = get_table_name(session, end)?;
398 edges.push(format!(" \"{}\" -> \"{}\";\n", start_name, end_name));
399 }
400 }
401 edges.sort();
402 for edge in edges {
403 result.push_str(&edge);
404 }
405 result.push_str("}\n");
406 Ok(result)
407 }
408}
409
410pub fn plan_backfill_order(
419 session: &SessionImpl,
420 backfill_order_strategy: BackfillOrderStrategy,
421 plan: StreamPlanRef,
422) -> Result<BackfillOrder> {
423 let order = match backfill_order_strategy {
424 BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
425 BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
426 BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders)?,
427 };
428 Ok(BackfillOrder { order })
429}
430
431pub fn explain_backfill_order_in_dot_format(
433 session: &SessionImpl,
434 backfill_order_strategy: BackfillOrderStrategy,
435 plan: StreamPlanRef,
436) -> Result<String> {
437 let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
438 let dot_formatted_backfill_order =
439 display::print_backfill_order_in_dot_format(session, order.clone())?;
440 Ok(dot_formatted_backfill_order)
441}