risingwave_frontend/optimizer/
backfill_order_strategy.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::stream_plan::BackfillOrder;
16use risingwave_sqlparser::ast::BackfillOrderStrategy;
17
18use crate::error::Result;
19use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
20use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
21use crate::optimizer::plan_node::StreamPlanRef;
22use crate::session::SessionImpl;
23
24pub mod auto {
25    use std::collections::{HashMap, HashSet};
26
27    use risingwave_common::catalog::ObjectId;
28    use risingwave_pb::common::Uint32Vector;
29
30    use crate::optimizer::backfill_order_strategy::common::has_cycle;
31    use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
32    use crate::session::SessionImpl;
33
34    #[derive(Debug)]
35    pub enum BackfillTreeNode {
36        Join {
37            lhs: Box<BackfillTreeNode>,
38            rhs: Box<BackfillTreeNode>,
39        },
40        Scan {
41            id: ObjectId,
42        },
43        Union {
44            children: Vec<BackfillTreeNode>,
45        },
46        Ignored,
47    }
48
49    /// TODO: Handle stream share
50    fn plan_graph_to_backfill_tree(
51        session: &SessionImpl,
52        plan: StreamPlanRef,
53    ) -> Option<BackfillTreeNode> {
54        match plan.node_type() {
55            StreamPlanNodeType::StreamHashJoin => {
56                assert_eq!(plan.inputs().len(), 2);
57                let mut inputs = plan.inputs().into_iter();
58                let l = inputs.next().unwrap();
59                let r = inputs.next().unwrap();
60                Some(BackfillTreeNode::Join {
61                    lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
62                    rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
63                })
64            }
65            StreamPlanNodeType::StreamTableScan => {
66                let table_scan = plan.as_stream_table_scan().expect("table scan");
67                let relation_id = table_scan.core().table_catalog.id().into();
68                Some(BackfillTreeNode::Scan { id: relation_id })
69            }
70            StreamPlanNodeType::StreamSourceScan => {
71                let source_scan = plan.as_stream_source_scan().expect("source scan");
72                let relation_id = source_scan.source_catalog().id;
73                Some(BackfillTreeNode::Scan { id: relation_id })
74            }
75            StreamPlanNodeType::StreamUnion => {
76                let inputs = plan.inputs();
77                let mut children = Vec::with_capacity(inputs.len());
78                for child in inputs {
79                    let subtree = plan_graph_to_backfill_tree(session, child)?;
80                    if matches!(subtree, BackfillTreeNode::Ignored) {
81                        continue;
82                    }
83                    children.push(subtree);
84                }
85                Some(BackfillTreeNode::Union { children })
86            }
87            node_type => {
88                let inputs = plan.inputs();
89                match inputs.len() {
90                    0 => Some(BackfillTreeNode::Ignored),
91                    1 => {
92                        let mut inputs = inputs.into_iter();
93                        let child = inputs.next().unwrap();
94                        plan_graph_to_backfill_tree(session, child)
95                    }
96                    _ => {
97                        session.notice_to_user(format!(
98                            "Backfill order strategy is not supported for {:?}",
99                            node_type
100                        ));
101                        None
102                    }
103                }
104            }
105        }
106    }
107
108    /// For a given subtree, all the leaf nodes in the leftmost leaf-node node
109    /// must come _after_ all other leaf nodes in the subtree.
110    /// For example, for the following tree:
111    ///
112    /// ```text
113    ///       JOIN (A)
114    ///      /        \
115    ///     JOIN (B)   SCAN (C)
116    ///    /        \
117    ///   /         \
118    /// /           \
119    /// SCAN (D)    SCAN (E)
120    /// ```
121    ///
122    /// D is the leftmost leaf node.
123    /// {C, E} are the other leaf nodes.
124    ///
125    /// So the partial order is:
126    /// {C, E} -> {D}
127    /// Expanded:
128    /// C -> D
129    /// E -> D
130    ///
131    /// Next, we have to consider UNION as well.
132    /// If a UNION node is the leftmost child,
133    /// then for all subtrees in the UNION,
134    /// their leftmost leaf nodes must come after
135    /// all other leaf nodes in the subtree.
136    ///
137    /// ``` text
138    ///         JOIN (A)
139    ///        /        \
140    ///       JOIN (B)   SCAN (C)
141    ///       /       \
142    ///      /         \
143    ///     UNION (D)   SCAN (E)
144    ///    /        \
145    ///   /         \
146    /// SCAN (F)    JOIN (G)
147    ///            /        \
148    ///           /          \
149    ///          SCAN (H)   SCAN (I)
150    /// ```
151    ///
152    /// In this case, {F, H} are the leftmost leaf nodes.
153    /// {C, E} -> {F, H}
154    /// I -> H
155    /// Expanded:
156    /// C -> F
157    /// E -> F
158    /// C -> H
159    /// E -> H
160    /// I -> H
161    fn fold_backfill_tree_to_partial_order(
162        tree: BackfillTreeNode,
163    ) -> HashMap<ObjectId, Uint32Vector> {
164        let mut order: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
165
166        // Returns terminal nodes of the subtree
167        // This is recursive algorithm we use to traverse the tree and compute partial orders.
168        fn traverse_backfill_tree(
169            tree: BackfillTreeNode,
170            order: &mut HashMap<ObjectId, HashSet<ObjectId>>,
171            is_leftmost_child: bool,
172            mut prior_terminal_nodes: HashSet<ObjectId>,
173        ) -> HashSet<ObjectId> {
174            match tree {
175                BackfillTreeNode::Ignored => HashSet::new(),
176                BackfillTreeNode::Scan { id } => {
177                    if is_leftmost_child {
178                        for prior_terminal_node in prior_terminal_nodes {
179                            order.entry(prior_terminal_node).or_default().insert(id);
180                        }
181                    }
182                    HashSet::from([id])
183                }
184                BackfillTreeNode::Union { children } => {
185                    let mut terminal_nodes = HashSet::new();
186                    for child in children {
187                        let child_terminal_nodes = traverse_backfill_tree(
188                            child,
189                            order,
190                            is_leftmost_child,
191                            prior_terminal_nodes.clone(),
192                        );
193                        terminal_nodes.extend(child_terminal_nodes);
194                    }
195                    terminal_nodes
196                }
197                BackfillTreeNode::Join { lhs, rhs } => {
198                    let rhs_terminal_nodes =
199                        traverse_backfill_tree(*rhs, order, false, HashSet::new());
200                    prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
201                    traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
202                }
203            }
204        }
205
206        traverse_backfill_tree(tree, &mut order, false, HashSet::new());
207
208        order
209            .into_iter()
210            .map(|(k, v)| {
211                let data = v.into_iter().collect();
212                (k, Uint32Vector { data })
213            })
214            .collect()
215    }
216
217    pub(super) fn plan_auto_strategy(
218        session: &SessionImpl,
219        plan: StreamPlanRef,
220    ) -> HashMap<ObjectId, Uint32Vector> {
221        if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
222            let order = fold_backfill_tree_to_partial_order(tree);
223            if has_cycle(&order) {
224                tracing::warn!(?order, "Backfill order strategy has a cycle");
225                session.notice_to_user("Backfill order strategy has a cycle");
226                return Default::default();
227            }
228            return order;
229        }
230        Default::default()
231    }
232}
233
234mod fixed {
235    use std::collections::HashMap;
236
237    use risingwave_common::bail;
238    use risingwave_common::catalog::ObjectId;
239    use risingwave_pb::common::Uint32Vector;
240    use risingwave_sqlparser::ast::ObjectName;
241
242    use crate::error::Result;
243    use crate::optimizer::backfill_order_strategy::common::{
244        bind_backfill_relation_id_by_name, has_cycle,
245    };
246    use crate::session::SessionImpl;
247
248    pub(super) fn plan_fixed_strategy(
249        session: &SessionImpl,
250        orders: Vec<(ObjectName, ObjectName)>,
251    ) -> Result<HashMap<ObjectId, Uint32Vector>> {
252        let mut order: HashMap<ObjectId, Uint32Vector> = HashMap::new();
253        for (start_name, end_name) in orders {
254            let start_relation_id = bind_backfill_relation_id_by_name(session, start_name)?;
255            let end_relation_id = bind_backfill_relation_id_by_name(session, end_name)?;
256            order
257                .entry(start_relation_id)
258                .or_default()
259                .data
260                .push(end_relation_id);
261        }
262        if has_cycle(&order) {
263            bail!("Backfill order strategy has a cycle");
264        }
265        Ok(order)
266    }
267}
268
269mod common {
270    use std::collections::{HashMap, HashSet};
271
272    use risingwave_common::catalog::ObjectId;
273    use risingwave_pb::common::Uint32Vector;
274    use risingwave_sqlparser::ast::ObjectName;
275
276    use crate::Binder;
277    use crate::catalog::CatalogError;
278    use crate::catalog::root_catalog::SchemaPath;
279    use crate::catalog::schema_catalog::SchemaCatalog;
280    use crate::error::Result;
281    use crate::session::SessionImpl;
282
283    /// Check if the backfill order has a cycle.
284    pub(super) fn has_cycle(order: &HashMap<ObjectId, Uint32Vector>) -> bool {
285        fn dfs(
286            node: ObjectId,
287            order: &HashMap<ObjectId, Uint32Vector>,
288            visited: &mut HashSet<ObjectId>,
289            stack: &mut HashSet<ObjectId>,
290        ) -> bool {
291            if stack.contains(&node) {
292                return true; // Cycle detected
293            }
294
295            if visited.insert(node) {
296                stack.insert(node);
297                if let Some(downstreams) = order.get(&node) {
298                    for neighbor in &downstreams.data {
299                        if dfs(*neighbor, order, visited, stack) {
300                            return true;
301                        }
302                    }
303                }
304                stack.remove(&node);
305            }
306            false
307        }
308
309        let mut visited = HashSet::new();
310        let mut stack = HashSet::new();
311        for &start in order.keys() {
312            if dfs(start, order, &mut visited, &mut stack) {
313                return true;
314            }
315        }
316
317        false
318    }
319
320    pub(super) fn bind_backfill_relation_id_by_name(
321        session: &SessionImpl,
322        name: ObjectName,
323    ) -> Result<ObjectId> {
324        let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(&name)?;
325        let db_name = db_name.unwrap_or(session.database());
326
327        let reader = session.env().catalog_reader().read_guard();
328
329        match schema_name {
330            Some(name) => {
331                let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
332                bind_table(schema_catalog, &rel_name)
333            }
334            None => {
335                let search_path = session.config().search_path();
336                let user_name = session.user_name();
337                let schema_path = SchemaPath::Path(&search_path, &user_name);
338                let result: crate::error::Result<Option<(ObjectId, &str)>> =
339                    schema_path.try_find(|schema_name| {
340                        if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
341                            && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
342                        {
343                            Ok(Some(relation_id))
344                        } else {
345                            Ok(None)
346                        }
347                    });
348                if let Some((relation_id, _schema_name)) = result? {
349                    return Ok(relation_id);
350                }
351                Err(CatalogError::NotFound("table", rel_name.to_owned()).into())
352            }
353        }
354    }
355
356    fn bind_table(schema_catalog: &SchemaCatalog, name: &String) -> crate::error::Result<ObjectId> {
357        if let Some(table) = schema_catalog.get_created_table_by_name(name) {
358            Ok(table.id().table_id)
359        } else if let Some(source) = schema_catalog.get_source_by_name(name) {
360            Ok(source.id)
361        } else {
362            Err(CatalogError::NotFound("table or source", name.to_owned()).into())
363        }
364    }
365}
366
367pub mod display {
368    use risingwave_common::catalog::ObjectId;
369    use risingwave_pb::stream_plan::BackfillOrder;
370
371    use crate::session::SessionImpl;
372
373    fn get_table_name(session: &SessionImpl, id: ObjectId) -> crate::error::Result<String> {
374        let catalog_reader = session.env().catalog_reader().read_guard();
375        let table_catalog = catalog_reader.get_any_table_by_id(&(id.into()))?;
376        let table_name = table_catalog.name();
377        let db_id = table_catalog.database_id;
378        let schema_id = table_catalog.schema_id;
379        let schema_catalog = catalog_reader.get_schema_by_id(&db_id, &schema_id)?;
380        let schema_name = schema_catalog.name();
381        let name = format!("{}.{}", schema_name, table_name);
382        Ok(name)
383    }
384
385    pub(super) fn print_backfill_order_in_dot_format(
386        session: &SessionImpl,
387        order: BackfillOrder,
388    ) -> crate::error::Result<String> {
389        let mut result = String::new();
390        result.push_str("digraph G {\n");
391        // NOTE(kwannoel): This is a hack to make the edge ordering deterministic.
392        // so our planner tests are deterministic.
393        let mut edges = vec![];
394        for (start, end) in order.order {
395            let start_name = get_table_name(session, start)?;
396            for end in end.data {
397                let end_name = get_table_name(session, end)?;
398                edges.push(format!("  \"{}\" -> \"{}\";\n", start_name, end_name));
399            }
400        }
401        edges.sort();
402        for edge in edges {
403            result.push_str(&edge);
404        }
405        result.push_str("}\n");
406        Ok(result)
407    }
408}
409
410/// We only bind tables and materialized views.
411/// We need to bind sources and indices in the future as well.
412/// For auto backfill strategy,
413/// if a cycle forms due to the same relation being scanned twice in the derived order,
414/// we won't generate any backfill order strategy.
415/// For fixed backfill strategy,
416/// for scans on the same relation id, even though they may be in different fragments,
417/// they will all share the same backfill order.
418pub fn plan_backfill_order(
419    session: &SessionImpl,
420    backfill_order_strategy: BackfillOrderStrategy,
421    plan: StreamPlanRef,
422) -> Result<BackfillOrder> {
423    let order = match backfill_order_strategy {
424        BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
425        BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
426        BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders)?,
427    };
428    Ok(BackfillOrder { order })
429}
430
431/// Plan the backfill order, and also output the backfill tree.
432pub fn explain_backfill_order_in_dot_format(
433    session: &SessionImpl,
434    backfill_order_strategy: BackfillOrderStrategy,
435    plan: StreamPlanRef,
436) -> Result<String> {
437    let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
438    let dot_formatted_backfill_order =
439        display::print_backfill_order_in_dot_format(session, order.clone())?;
440    Ok(dot_formatted_backfill_order)
441}