risingwave_frontend/optimizer/backfill_order_strategy.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::stream_plan::BackfillOrder;
16use risingwave_sqlparser::ast::BackfillOrderStrategy;
17
18use crate::PlanRef;
19use crate::error::Result;
20use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
21use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
22use crate::session::SessionImpl;
23
24pub mod auto {
25    use std::collections::{HashMap, HashSet};
26
27    use risingwave_common::catalog::ObjectId;
28    use risingwave_pb::common::Uint32Vector;
29
30    use crate::PlanRef;
31    use crate::optimizer::PlanNodeType;
32    use crate::optimizer::backfill_order_strategy::common::has_cycle;
33    use crate::session::SessionImpl;
34
35    #[derive(Debug)]
36    pub enum BackfillTreeNode {
37        Join {
38            lhs: Box<BackfillTreeNode>,
39            rhs: Box<BackfillTreeNode>,
40        },
41        Scan {
42            id: ObjectId,
43        },
44        Union {
45            children: Vec<BackfillTreeNode>,
46        },
47        Ignored,
48    }
49
50    /// TODO: Handle stream share
51    fn plan_graph_to_backfill_tree(
52        session: &SessionImpl,
53        plan: PlanRef,
54    ) -> Option<BackfillTreeNode> {
55        match plan.node_type() {
56            PlanNodeType::StreamHashJoin => {
57                assert_eq!(plan.inputs().len(), 2);
58                let mut inputs = plan.inputs().into_iter();
59                let l = inputs.next().unwrap();
60                let r = inputs.next().unwrap();
61                Some(BackfillTreeNode::Join {
62                    lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
63                    rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
64                })
65            }
66            PlanNodeType::StreamTableScan => {
67                let table_scan = plan.as_stream_table_scan().expect("table scan");
68                let relation_id = table_scan.core().table_catalog.id().into();
69                Some(BackfillTreeNode::Scan { id: relation_id })
70            }
71            PlanNodeType::StreamUnion => {
72                let inputs = plan.inputs();
73                let mut children = Vec::with_capacity(inputs.len());
74                for child in inputs {
75                    let subtree = plan_graph_to_backfill_tree(session, child)?;
76                    if matches!(subtree, BackfillTreeNode::Ignored) {
77                        continue;
78                    }
79                    children.push(subtree);
80                }
81                Some(BackfillTreeNode::Union { children })
82            }
83            node_type => {
84                let inputs = plan.inputs();
85                match inputs.len() {
86                    0 => Some(BackfillTreeNode::Ignored),
87                    1 => {
88                        let mut inputs = inputs.into_iter();
89                        let child = inputs.next().unwrap();
90                        plan_graph_to_backfill_tree(session, child)
91                    }
92                    _ => {
93                        session.notice_to_user(format!(
94                            "Backfill order strategy is not supported for {:?}",
95                            node_type
96                        ));
97                        None
98                    }
99                }
100            }
101        }
102    }
103
104    /// For a given subtree, all the leaf nodes in the leftmost leaf-node node
105    /// must come _after_ all other leaf nodes in the subtree.
106    /// For example, for the following tree:
107    ///
108    /// ```text
109    ///       JOIN (A)
110    ///      /        \
111    ///     JOIN (B)   SCAN (C)
112    ///    /        \
113    ///   /         \
114    /// /           \
115    /// SCAN (D)    SCAN (E)
116    /// ```
117    ///
118    /// D is the leftmost leaf node.
119    /// {C, E} are the other leaf nodes.
120    ///
121    /// So the partial order is:
122    /// {C, E} -> {D}
123    /// Expanded:
124    /// C -> D
125    /// E -> D
126    ///
127    /// Next, we have to consider UNION as well.
128    /// If a UNION node is the leftmost child,
129    /// then for all subtrees in the UNION,
130    /// their leftmost leaf nodes must come after
131    /// all other leaf nodes in the subtree.
132    ///
133    /// ``` text
134    ///         JOIN (A)
135    ///        /        \
136    ///       JOIN (B)   SCAN (C)
137    ///       /       \
138    ///      /         \
139    ///     UNION (D)   SCAN (E)
140    ///    /        \
141    ///   /         \
142    /// SCAN (F)    JOIN (G)
143    ///            /        \
144    ///           /          \
145    ///          SCAN (H)   SCAN (I)
146    /// ```
147    ///
148    /// In this case, {F, H} are the leftmost leaf nodes.
149    /// {C, E} -> {F, H}
150    /// I -> H
151    /// Expanded:
152    /// C -> F
153    /// E -> F
154    /// C -> H
155    /// E -> H
156    /// I -> H
157    fn fold_backfill_tree_to_partial_order(
158        tree: BackfillTreeNode,
159    ) -> HashMap<ObjectId, Uint32Vector> {
160        let mut order: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
161
162        // Returns terminal nodes of the subtree
163        // This is recursive algorithm we use to traverse the tree and compute partial orders.
164        fn traverse_backfill_tree(
165            tree: BackfillTreeNode,
166            order: &mut HashMap<ObjectId, HashSet<ObjectId>>,
167            is_leftmost_child: bool,
168            mut prior_terminal_nodes: HashSet<ObjectId>,
169        ) -> HashSet<ObjectId> {
170            match tree {
171                BackfillTreeNode::Ignored => HashSet::new(),
172                BackfillTreeNode::Scan { id } => {
173                    if is_leftmost_child {
174                        for prior_terminal_node in prior_terminal_nodes {
175                            order.entry(prior_terminal_node).or_default().insert(id);
176                        }
177                    }
178                    HashSet::from([id])
179                }
180                BackfillTreeNode::Union { children } => {
181                    let mut terminal_nodes = HashSet::new();
182                    for child in children {
183                        let child_terminal_nodes = traverse_backfill_tree(
184                            child,
185                            order,
186                            is_leftmost_child,
187                            prior_terminal_nodes.clone(),
188                        );
189                        terminal_nodes.extend(child_terminal_nodes);
190                    }
191                    terminal_nodes
192                }
193                BackfillTreeNode::Join { lhs, rhs } => {
194                    let rhs_terminal_nodes =
195                        traverse_backfill_tree(*rhs, order, false, HashSet::new());
196                    prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
197                    traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
198                }
199            }
200        }
201
202        traverse_backfill_tree(tree, &mut order, false, HashSet::new());
203
204        order
205            .into_iter()
206            .map(|(k, v)| {
207                let data = v.into_iter().collect();
208                (k, Uint32Vector { data })
209            })
210            .collect()
211    }
212
213    pub(super) fn plan_auto_strategy(
214        session: &SessionImpl,
215        plan: PlanRef,
216    ) -> HashMap<ObjectId, Uint32Vector> {
217        if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
218            let order = fold_backfill_tree_to_partial_order(tree);
219            if has_cycle(&order) {
220                tracing::warn!(?order, "Backfill order strategy has a cycle");
221                session.notice_to_user("Backfill order strategy has a cycle");
222                return Default::default();
223            }
224            return order;
225        }
226        Default::default()
227    }
228}
229
230mod fixed {
231    use std::collections::HashMap;
232
233    use risingwave_common::bail;
234    use risingwave_common::catalog::ObjectId;
235    use risingwave_pb::common::Uint32Vector;
236    use risingwave_sqlparser::ast::ObjectName;
237
238    use crate::error::Result;
239    use crate::optimizer::backfill_order_strategy::common::{
240        bind_backfill_relation_id_by_name, has_cycle,
241    };
242    use crate::session::SessionImpl;
243
244    pub(super) fn plan_fixed_strategy(
245        session: &SessionImpl,
246        orders: Vec<(ObjectName, ObjectName)>,
247    ) -> Result<HashMap<ObjectId, Uint32Vector>> {
248        let mut order: HashMap<ObjectId, Uint32Vector> = HashMap::new();
249        for (start_name, end_name) in orders {
250            let start_relation_id = bind_backfill_relation_id_by_name(session, start_name)?;
251            let end_relation_id = bind_backfill_relation_id_by_name(session, end_name)?;
252            order
253                .entry(start_relation_id)
254                .or_default()
255                .data
256                .push(end_relation_id);
257        }
258        if has_cycle(&order) {
259            bail!("Backfill order strategy has a cycle");
260        }
261        Ok(order)
262    }
263}
264
265mod common {
266    use std::collections::{HashMap, HashSet};
267
268    use risingwave_common::catalog::ObjectId;
269    use risingwave_pb::common::Uint32Vector;
270    use risingwave_sqlparser::ast::ObjectName;
271
272    use crate::Binder;
273    use crate::catalog::CatalogError;
274    use crate::catalog::root_catalog::SchemaPath;
275    use crate::catalog::schema_catalog::SchemaCatalog;
276    use crate::error::Result;
277    use crate::session::SessionImpl;
278
279    /// Check if the backfill order has a cycle.
280    pub(super) fn has_cycle(order: &HashMap<ObjectId, Uint32Vector>) -> bool {
281        fn dfs(
282            node: ObjectId,
283            order: &HashMap<ObjectId, Uint32Vector>,
284            visited: &mut HashSet<ObjectId>,
285            stack: &mut HashSet<ObjectId>,
286        ) -> bool {
287            if stack.contains(&node) {
288                return true; // Cycle detected
289            }
290
291            if visited.insert(node) {
292                stack.insert(node);
293                if let Some(downstreams) = order.get(&node) {
294                    for neighbor in &downstreams.data {
295                        if dfs(*neighbor, order, visited, stack) {
296                            return true;
297                        }
298                    }
299                }
300                stack.remove(&node);
301            }
302            false
303        }
304
305        let mut visited = HashSet::new();
306        let mut stack = HashSet::new();
307        for &start in order.keys() {
308            if dfs(start, order, &mut visited, &mut stack) {
309                return true;
310            }
311        }
312
313        false
314    }
315
316    pub(super) fn bind_backfill_relation_id_by_name(
317        session: &SessionImpl,
318        name: ObjectName,
319    ) -> Result<ObjectId> {
320        let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(name)?;
321        let db_name = db_name.unwrap_or(session.database());
322
323        let reader = session.env().catalog_reader().read_guard();
324
325        match schema_name {
326            Some(name) => {
327                let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
328                bind_table(schema_catalog, &rel_name)
329            }
330            None => {
331                let search_path = session.config().search_path();
332                let user_name = session.user_name();
333                let schema_path = SchemaPath::Path(&search_path, &user_name);
334                let result: crate::error::Result<Option<(ObjectId, &str)>> =
335                    schema_path.try_find(|schema_name| {
336                        if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
337                            && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
338                        {
339                            Ok(Some(relation_id))
340                        } else {
341                            Ok(None)
342                        }
343                    });
344                if let Some((relation_id, _schema_name)) = result? {
345                    return Ok(relation_id);
346                }
347                Err(CatalogError::NotFound("table", rel_name.to_owned()).into())
348            }
349        }
350    }
351
352    fn bind_table(schema_catalog: &SchemaCatalog, name: &String) -> crate::error::Result<ObjectId> {
353        if let Some(table) = schema_catalog.get_created_table_or_any_internal_table_by_name(name) {
354            Ok(table.id().table_id)
355        } else {
356            Err(CatalogError::NotFound("table", name.to_owned()).into())
357        }
358        // TODO: support source catalog
359        // else if let Some(source) = schema_catalog.get_source_by_name(name) {
360        //     Ok(source.id)
361        // }
362        // Err(CatalogError::NotFound("table or source", name.to_owned()).into())
363    }
364}
365
366pub mod display {
367    use risingwave_common::catalog::ObjectId;
368    use risingwave_pb::stream_plan::BackfillOrder;
369
370    use crate::session::SessionImpl;
371
372    fn get_table_name(session: &SessionImpl, id: ObjectId) -> crate::error::Result<String> {
373        let catalog_reader = session.env().catalog_reader().read_guard();
374        let table_catalog = catalog_reader.get_any_table_by_id(&(id.into()))?;
375        let table_name = table_catalog.name();
376        let db_id = table_catalog.database_id;
377        let schema_id = table_catalog.schema_id;
378        let schema_catalog = catalog_reader.get_schema_by_id(&db_id, &schema_id)?;
379        let schema_name = schema_catalog.name();
380        let name = format!("{}.{}", schema_name, table_name);
381        Ok(name)
382    }
383
384    pub(super) fn print_backfill_order_in_dot_format(
385        session: &SessionImpl,
386        order: BackfillOrder,
387    ) -> crate::error::Result<String> {
388        let mut result = String::new();
389        result.push_str("digraph G {\n");
390        // NOTE(kwannoel): This is a hack to make the edge ordering deterministic.
391        // so our planner tests are deterministic.
392        let mut edges = vec![];
393        for (start, end) in order.order {
394            let start_name = get_table_name(session, start)?;
395            for end in end.data {
396                let end_name = get_table_name(session, end)?;
397                edges.push(format!("  \"{}\" -> \"{}\";\n", start_name, end_name));
398            }
399        }
400        edges.sort();
401        for edge in edges {
402            result.push_str(&edge);
403        }
404        result.push_str("}\n");
405        Ok(result)
406    }
407}
408
409/// We only bind tables and materialized views.
410/// We need to bind sources and indices in the future as well.
411/// For auto backfill strategy,
412/// if a cycle forms due to the same relation being scanned twice in the derived order,
413/// we won't generate any backfill order strategy.
414/// For fixed backfill strategy,
415/// for scans on the same relation id, even though they may be in different fragments,
416/// they will all share the same backfill order.
417pub fn plan_backfill_order(
418    session: &SessionImpl,
419    backfill_order_strategy: BackfillOrderStrategy,
420    plan: PlanRef,
421) -> Result<BackfillOrder> {
422    let order = match backfill_order_strategy {
423        BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
424        BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
425        BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders)?,
426    };
427    Ok(BackfillOrder { order })
428}
429
430/// Plan the backfill order, and also output the backfill tree.
431pub fn explain_backfill_order_in_dot_format(
432    session: &SessionImpl,
433    backfill_order_strategy: BackfillOrderStrategy,
434    plan: PlanRef,
435) -> Result<String> {
436    let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
437    let dot_formatted_backfill_order =
438        display::print_backfill_order_in_dot_format(session, order.clone())?;
439    Ok(dot_formatted_backfill_order)
440}