risingwave_frontend/optimizer/
backfill_order_strategy.rs1use risingwave_pb::stream_plan::BackfillOrder;
16use risingwave_sqlparser::ast::BackfillOrderStrategy;
17
18use crate::PlanRef;
19use crate::error::Result;
20use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
21use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
22use crate::session::SessionImpl;
23
24pub mod auto {
25 use std::collections::{HashMap, HashSet};
26
27 use risingwave_common::catalog::ObjectId;
28 use risingwave_pb::common::Uint32Vector;
29
30 use crate::PlanRef;
31 use crate::optimizer::PlanNodeType;
32 use crate::optimizer::backfill_order_strategy::common::has_cycle;
33 use crate::session::SessionImpl;
34
35 #[derive(Debug)]
36 pub enum BackfillTreeNode {
37 Join {
38 lhs: Box<BackfillTreeNode>,
39 rhs: Box<BackfillTreeNode>,
40 },
41 Scan {
42 id: ObjectId,
43 },
44 Union {
45 children: Vec<BackfillTreeNode>,
46 },
47 Ignored,
48 }
49
50 fn plan_graph_to_backfill_tree(
52 session: &SessionImpl,
53 plan: PlanRef,
54 ) -> Option<BackfillTreeNode> {
55 match plan.node_type() {
56 PlanNodeType::StreamHashJoin => {
57 assert_eq!(plan.inputs().len(), 2);
58 let mut inputs = plan.inputs().into_iter();
59 let l = inputs.next().unwrap();
60 let r = inputs.next().unwrap();
61 Some(BackfillTreeNode::Join {
62 lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
63 rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
64 })
65 }
66 PlanNodeType::StreamTableScan => {
67 let table_scan = plan.as_stream_table_scan().expect("table scan");
68 let relation_id = table_scan.core().table_catalog.id().into();
69 Some(BackfillTreeNode::Scan { id: relation_id })
70 }
71 PlanNodeType::StreamUnion => {
72 let inputs = plan.inputs();
73 let mut children = Vec::with_capacity(inputs.len());
74 for child in inputs {
75 let subtree = plan_graph_to_backfill_tree(session, child)?;
76 if matches!(subtree, BackfillTreeNode::Ignored) {
77 continue;
78 }
79 children.push(subtree);
80 }
81 Some(BackfillTreeNode::Union { children })
82 }
83 node_type => {
84 let inputs = plan.inputs();
85 match inputs.len() {
86 0 => Some(BackfillTreeNode::Ignored),
87 1 => {
88 let mut inputs = inputs.into_iter();
89 let child = inputs.next().unwrap();
90 plan_graph_to_backfill_tree(session, child)
91 }
92 _ => {
93 session.notice_to_user(format!(
94 "Backfill order strategy is not supported for {:?}",
95 node_type
96 ));
97 None
98 }
99 }
100 }
101 }
102 }
103
104 fn fold_backfill_tree_to_partial_order(
158 tree: BackfillTreeNode,
159 ) -> HashMap<ObjectId, Uint32Vector> {
160 let mut order: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
161
162 fn traverse_backfill_tree(
165 tree: BackfillTreeNode,
166 order: &mut HashMap<ObjectId, HashSet<ObjectId>>,
167 is_leftmost_child: bool,
168 mut prior_terminal_nodes: HashSet<ObjectId>,
169 ) -> HashSet<ObjectId> {
170 match tree {
171 BackfillTreeNode::Ignored => HashSet::new(),
172 BackfillTreeNode::Scan { id } => {
173 if is_leftmost_child {
174 for prior_terminal_node in prior_terminal_nodes {
175 order.entry(prior_terminal_node).or_default().insert(id);
176 }
177 }
178 HashSet::from([id])
179 }
180 BackfillTreeNode::Union { children } => {
181 let mut terminal_nodes = HashSet::new();
182 for child in children {
183 let child_terminal_nodes = traverse_backfill_tree(
184 child,
185 order,
186 is_leftmost_child,
187 prior_terminal_nodes.clone(),
188 );
189 terminal_nodes.extend(child_terminal_nodes);
190 }
191 terminal_nodes
192 }
193 BackfillTreeNode::Join { lhs, rhs } => {
194 let rhs_terminal_nodes =
195 traverse_backfill_tree(*rhs, order, false, HashSet::new());
196 prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
197 traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
198 }
199 }
200 }
201
202 traverse_backfill_tree(tree, &mut order, false, HashSet::new());
203
204 order
205 .into_iter()
206 .map(|(k, v)| {
207 let data = v.into_iter().collect();
208 (k, Uint32Vector { data })
209 })
210 .collect()
211 }
212
213 pub(super) fn plan_auto_strategy(
214 session: &SessionImpl,
215 plan: PlanRef,
216 ) -> HashMap<ObjectId, Uint32Vector> {
217 if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
218 let order = fold_backfill_tree_to_partial_order(tree);
219 if has_cycle(&order) {
220 tracing::warn!(?order, "Backfill order strategy has a cycle");
221 session.notice_to_user("Backfill order strategy has a cycle");
222 return Default::default();
223 }
224 return order;
225 }
226 Default::default()
227 }
228}
229
230mod fixed {
231 use std::collections::HashMap;
232
233 use risingwave_common::bail;
234 use risingwave_common::catalog::ObjectId;
235 use risingwave_pb::common::Uint32Vector;
236 use risingwave_sqlparser::ast::ObjectName;
237
238 use crate::error::Result;
239 use crate::optimizer::backfill_order_strategy::common::{
240 bind_backfill_relation_id_by_name, has_cycle,
241 };
242 use crate::session::SessionImpl;
243
244 pub(super) fn plan_fixed_strategy(
245 session: &SessionImpl,
246 orders: Vec<(ObjectName, ObjectName)>,
247 ) -> Result<HashMap<ObjectId, Uint32Vector>> {
248 let mut order: HashMap<ObjectId, Uint32Vector> = HashMap::new();
249 for (start_name, end_name) in orders {
250 let start_relation_id = bind_backfill_relation_id_by_name(session, start_name)?;
251 let end_relation_id = bind_backfill_relation_id_by_name(session, end_name)?;
252 order
253 .entry(start_relation_id)
254 .or_default()
255 .data
256 .push(end_relation_id);
257 }
258 if has_cycle(&order) {
259 bail!("Backfill order strategy has a cycle");
260 }
261 Ok(order)
262 }
263}
264
265mod common {
266 use std::collections::{HashMap, HashSet};
267
268 use risingwave_common::catalog::ObjectId;
269 use risingwave_pb::common::Uint32Vector;
270 use risingwave_sqlparser::ast::ObjectName;
271
272 use crate::Binder;
273 use crate::catalog::CatalogError;
274 use crate::catalog::root_catalog::SchemaPath;
275 use crate::catalog::schema_catalog::SchemaCatalog;
276 use crate::error::Result;
277 use crate::session::SessionImpl;
278
279 pub(super) fn has_cycle(order: &HashMap<ObjectId, Uint32Vector>) -> bool {
281 fn dfs(
282 node: ObjectId,
283 order: &HashMap<ObjectId, Uint32Vector>,
284 visited: &mut HashSet<ObjectId>,
285 stack: &mut HashSet<ObjectId>,
286 ) -> bool {
287 if stack.contains(&node) {
288 return true; }
290
291 if visited.insert(node) {
292 stack.insert(node);
293 if let Some(downstreams) = order.get(&node) {
294 for neighbor in &downstreams.data {
295 if dfs(*neighbor, order, visited, stack) {
296 return true;
297 }
298 }
299 }
300 stack.remove(&node);
301 }
302 false
303 }
304
305 let mut visited = HashSet::new();
306 let mut stack = HashSet::new();
307 for &start in order.keys() {
308 if dfs(start, order, &mut visited, &mut stack) {
309 return true;
310 }
311 }
312
313 false
314 }
315
316 pub(super) fn bind_backfill_relation_id_by_name(
317 session: &SessionImpl,
318 name: ObjectName,
319 ) -> Result<ObjectId> {
320 let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(name)?;
321 let db_name = db_name.unwrap_or(session.database());
322
323 let reader = session.env().catalog_reader().read_guard();
324
325 match schema_name {
326 Some(name) => {
327 let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
328 bind_table(schema_catalog, &rel_name)
329 }
330 None => {
331 let search_path = session.config().search_path();
332 let user_name = session.user_name();
333 let schema_path = SchemaPath::Path(&search_path, &user_name);
334 let result: crate::error::Result<Option<(ObjectId, &str)>> =
335 schema_path.try_find(|schema_name| {
336 if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
337 && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
338 {
339 Ok(Some(relation_id))
340 } else {
341 Ok(None)
342 }
343 });
344 if let Some((relation_id, _schema_name)) = result? {
345 return Ok(relation_id);
346 }
347 Err(CatalogError::NotFound("table", rel_name.to_owned()).into())
348 }
349 }
350 }
351
352 fn bind_table(schema_catalog: &SchemaCatalog, name: &String) -> crate::error::Result<ObjectId> {
353 if let Some(table) = schema_catalog.get_created_table_or_any_internal_table_by_name(name) {
354 Ok(table.id().table_id)
355 } else {
356 Err(CatalogError::NotFound("table", name.to_owned()).into())
357 }
358 }
364}
365
366pub mod display {
367 use risingwave_common::catalog::ObjectId;
368 use risingwave_pb::stream_plan::BackfillOrder;
369
370 use crate::session::SessionImpl;
371
372 fn get_table_name(session: &SessionImpl, id: ObjectId) -> crate::error::Result<String> {
373 let catalog_reader = session.env().catalog_reader().read_guard();
374 let table_catalog = catalog_reader.get_any_table_by_id(&(id.into()))?;
375 let table_name = table_catalog.name();
376 let db_id = table_catalog.database_id;
377 let schema_id = table_catalog.schema_id;
378 let schema_catalog = catalog_reader.get_schema_by_id(&db_id, &schema_id)?;
379 let schema_name = schema_catalog.name();
380 let name = format!("{}.{}", schema_name, table_name);
381 Ok(name)
382 }
383
384 pub(super) fn print_backfill_order_in_dot_format(
385 session: &SessionImpl,
386 order: BackfillOrder,
387 ) -> crate::error::Result<String> {
388 let mut result = String::new();
389 result.push_str("digraph G {\n");
390 let mut edges = vec![];
393 for (start, end) in order.order {
394 let start_name = get_table_name(session, start)?;
395 for end in end.data {
396 let end_name = get_table_name(session, end)?;
397 edges.push(format!(" \"{}\" -> \"{}\";\n", start_name, end_name));
398 }
399 }
400 edges.sort();
401 for edge in edges {
402 result.push_str(&edge);
403 }
404 result.push_str("}\n");
405 Ok(result)
406 }
407}
408
409pub fn plan_backfill_order(
418 session: &SessionImpl,
419 backfill_order_strategy: BackfillOrderStrategy,
420 plan: PlanRef,
421) -> Result<BackfillOrder> {
422 let order = match backfill_order_strategy {
423 BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
424 BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
425 BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders)?,
426 };
427 Ok(BackfillOrder { order })
428}
429
430pub fn explain_backfill_order_in_dot_format(
432 session: &SessionImpl,
433 backfill_order_strategy: BackfillOrderStrategy,
434 plan: PlanRef,
435) -> Result<String> {
436 let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
437 let dot_formatted_backfill_order =
438 display::print_backfill_order_in_dot_format(session, order.clone())?;
439 Ok(dot_formatted_backfill_order)
440}