risingwave_frontend/optimizer/
backfill_order_strategy.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_pb::stream_plan::BackfillOrder;
16use risingwave_sqlparser::ast::BackfillOrderStrategy;
17
18use crate::PlanRef;
19use crate::error::Result;
20use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
21use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
22use crate::session::SessionImpl;
23
24pub mod auto {
25    use std::collections::{HashMap, HashSet};
26
27    use risingwave_common::catalog::ObjectId;
28    use risingwave_pb::common::Uint32Vector;
29
30    use crate::PlanRef;
31    use crate::optimizer::PlanNodeType;
32    use crate::optimizer::backfill_order_strategy::common::has_cycle;
33    use crate::session::SessionImpl;
34
35    #[derive(Debug)]
36    pub enum BackfillTreeNode {
37        Join {
38            lhs: Box<BackfillTreeNode>,
39            rhs: Box<BackfillTreeNode>,
40        },
41        Scan {
42            id: ObjectId,
43        },
44        Union {
45            children: Vec<BackfillTreeNode>,
46        },
47        Ignored,
48    }
49
50    /// TODO: Handle stream share
51    fn plan_graph_to_backfill_tree(
52        session: &SessionImpl,
53        plan: PlanRef,
54    ) -> Option<BackfillTreeNode> {
55        match plan.node_type() {
56            PlanNodeType::StreamHashJoin => {
57                assert_eq!(plan.inputs().len(), 2);
58                let mut inputs = plan.inputs().into_iter();
59                let l = inputs.next().unwrap();
60                let r = inputs.next().unwrap();
61                Some(BackfillTreeNode::Join {
62                    lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
63                    rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
64                })
65            }
66            PlanNodeType::StreamTableScan => {
67                let table_scan = plan.as_stream_table_scan().expect("table scan");
68                let relation_id = table_scan.core().table_catalog.id().into();
69                Some(BackfillTreeNode::Scan { id: relation_id })
70            }
71            PlanNodeType::StreamSourceScan => {
72                let source_scan = plan.as_stream_source_scan().expect("source scan");
73                let relation_id = source_scan.source_catalog().id;
74                Some(BackfillTreeNode::Scan { id: relation_id })
75            }
76            PlanNodeType::StreamUnion => {
77                let inputs = plan.inputs();
78                let mut children = Vec::with_capacity(inputs.len());
79                for child in inputs {
80                    let subtree = plan_graph_to_backfill_tree(session, child)?;
81                    if matches!(subtree, BackfillTreeNode::Ignored) {
82                        continue;
83                    }
84                    children.push(subtree);
85                }
86                Some(BackfillTreeNode::Union { children })
87            }
88            node_type => {
89                let inputs = plan.inputs();
90                match inputs.len() {
91                    0 => Some(BackfillTreeNode::Ignored),
92                    1 => {
93                        let mut inputs = inputs.into_iter();
94                        let child = inputs.next().unwrap();
95                        plan_graph_to_backfill_tree(session, child)
96                    }
97                    _ => {
98                        session.notice_to_user(format!(
99                            "Backfill order strategy is not supported for {:?}",
100                            node_type
101                        ));
102                        None
103                    }
104                }
105            }
106        }
107    }
108
109    /// For a given subtree, all the leaf nodes in the leftmost leaf-node node
110    /// must come _after_ all other leaf nodes in the subtree.
111    /// For example, for the following tree:
112    ///
113    /// ```text
114    ///       JOIN (A)
115    ///      /        \
116    ///     JOIN (B)   SCAN (C)
117    ///    /        \
118    ///   /         \
119    /// /           \
120    /// SCAN (D)    SCAN (E)
121    /// ```
122    ///
123    /// D is the leftmost leaf node.
124    /// {C, E} are the other leaf nodes.
125    ///
126    /// So the partial order is:
127    /// {C, E} -> {D}
128    /// Expanded:
129    /// C -> D
130    /// E -> D
131    ///
132    /// Next, we have to consider UNION as well.
133    /// If a UNION node is the leftmost child,
134    /// then for all subtrees in the UNION,
135    /// their leftmost leaf nodes must come after
136    /// all other leaf nodes in the subtree.
137    ///
138    /// ``` text
139    ///         JOIN (A)
140    ///        /        \
141    ///       JOIN (B)   SCAN (C)
142    ///       /       \
143    ///      /         \
144    ///     UNION (D)   SCAN (E)
145    ///    /        \
146    ///   /         \
147    /// SCAN (F)    JOIN (G)
148    ///            /        \
149    ///           /          \
150    ///          SCAN (H)   SCAN (I)
151    /// ```
152    ///
153    /// In this case, {F, H} are the leftmost leaf nodes.
154    /// {C, E} -> {F, H}
155    /// I -> H
156    /// Expanded:
157    /// C -> F
158    /// E -> F
159    /// C -> H
160    /// E -> H
161    /// I -> H
162    fn fold_backfill_tree_to_partial_order(
163        tree: BackfillTreeNode,
164    ) -> HashMap<ObjectId, Uint32Vector> {
165        let mut order: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();
166
167        // Returns terminal nodes of the subtree
168        // This is recursive algorithm we use to traverse the tree and compute partial orders.
169        fn traverse_backfill_tree(
170            tree: BackfillTreeNode,
171            order: &mut HashMap<ObjectId, HashSet<ObjectId>>,
172            is_leftmost_child: bool,
173            mut prior_terminal_nodes: HashSet<ObjectId>,
174        ) -> HashSet<ObjectId> {
175            match tree {
176                BackfillTreeNode::Ignored => HashSet::new(),
177                BackfillTreeNode::Scan { id } => {
178                    if is_leftmost_child {
179                        for prior_terminal_node in prior_terminal_nodes {
180                            order.entry(prior_terminal_node).or_default().insert(id);
181                        }
182                    }
183                    HashSet::from([id])
184                }
185                BackfillTreeNode::Union { children } => {
186                    let mut terminal_nodes = HashSet::new();
187                    for child in children {
188                        let child_terminal_nodes = traverse_backfill_tree(
189                            child,
190                            order,
191                            is_leftmost_child,
192                            prior_terminal_nodes.clone(),
193                        );
194                        terminal_nodes.extend(child_terminal_nodes);
195                    }
196                    terminal_nodes
197                }
198                BackfillTreeNode::Join { lhs, rhs } => {
199                    let rhs_terminal_nodes =
200                        traverse_backfill_tree(*rhs, order, false, HashSet::new());
201                    prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
202                    traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
203                }
204            }
205        }
206
207        traverse_backfill_tree(tree, &mut order, false, HashSet::new());
208
209        order
210            .into_iter()
211            .map(|(k, v)| {
212                let data = v.into_iter().collect();
213                (k, Uint32Vector { data })
214            })
215            .collect()
216    }
217
218    pub(super) fn plan_auto_strategy(
219        session: &SessionImpl,
220        plan: PlanRef,
221    ) -> HashMap<ObjectId, Uint32Vector> {
222        if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
223            let order = fold_backfill_tree_to_partial_order(tree);
224            if has_cycle(&order) {
225                tracing::warn!(?order, "Backfill order strategy has a cycle");
226                session.notice_to_user("Backfill order strategy has a cycle");
227                return Default::default();
228            }
229            return order;
230        }
231        Default::default()
232    }
233}
234
235mod fixed {
236    use std::collections::HashMap;
237
238    use risingwave_common::bail;
239    use risingwave_common::catalog::ObjectId;
240    use risingwave_pb::common::Uint32Vector;
241    use risingwave_sqlparser::ast::ObjectName;
242
243    use crate::error::Result;
244    use crate::optimizer::backfill_order_strategy::common::{
245        bind_backfill_relation_id_by_name, has_cycle,
246    };
247    use crate::session::SessionImpl;
248
249    pub(super) fn plan_fixed_strategy(
250        session: &SessionImpl,
251        orders: Vec<(ObjectName, ObjectName)>,
252    ) -> Result<HashMap<ObjectId, Uint32Vector>> {
253        let mut order: HashMap<ObjectId, Uint32Vector> = HashMap::new();
254        for (start_name, end_name) in orders {
255            let start_relation_id = bind_backfill_relation_id_by_name(session, start_name)?;
256            let end_relation_id = bind_backfill_relation_id_by_name(session, end_name)?;
257            order
258                .entry(start_relation_id)
259                .or_default()
260                .data
261                .push(end_relation_id);
262        }
263        if has_cycle(&order) {
264            bail!("Backfill order strategy has a cycle");
265        }
266        Ok(order)
267    }
268}
269
270mod common {
271    use std::collections::{HashMap, HashSet};
272
273    use risingwave_common::catalog::ObjectId;
274    use risingwave_pb::common::Uint32Vector;
275    use risingwave_sqlparser::ast::ObjectName;
276
277    use crate::Binder;
278    use crate::catalog::CatalogError;
279    use crate::catalog::root_catalog::SchemaPath;
280    use crate::catalog::schema_catalog::SchemaCatalog;
281    use crate::error::Result;
282    use crate::session::SessionImpl;
283
284    /// Check if the backfill order has a cycle.
285    pub(super) fn has_cycle(order: &HashMap<ObjectId, Uint32Vector>) -> bool {
286        fn dfs(
287            node: ObjectId,
288            order: &HashMap<ObjectId, Uint32Vector>,
289            visited: &mut HashSet<ObjectId>,
290            stack: &mut HashSet<ObjectId>,
291        ) -> bool {
292            if stack.contains(&node) {
293                return true; // Cycle detected
294            }
295
296            if visited.insert(node) {
297                stack.insert(node);
298                if let Some(downstreams) = order.get(&node) {
299                    for neighbor in &downstreams.data {
300                        if dfs(*neighbor, order, visited, stack) {
301                            return true;
302                        }
303                    }
304                }
305                stack.remove(&node);
306            }
307            false
308        }
309
310        let mut visited = HashSet::new();
311        let mut stack = HashSet::new();
312        for &start in order.keys() {
313            if dfs(start, order, &mut visited, &mut stack) {
314                return true;
315            }
316        }
317
318        false
319    }
320
321    pub(super) fn bind_backfill_relation_id_by_name(
322        session: &SessionImpl,
323        name: ObjectName,
324    ) -> Result<ObjectId> {
325        let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(name)?;
326        let db_name = db_name.unwrap_or(session.database());
327
328        let reader = session.env().catalog_reader().read_guard();
329
330        match schema_name {
331            Some(name) => {
332                let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
333                bind_table(schema_catalog, &rel_name)
334            }
335            None => {
336                let search_path = session.config().search_path();
337                let user_name = session.user_name();
338                let schema_path = SchemaPath::Path(&search_path, &user_name);
339                let result: crate::error::Result<Option<(ObjectId, &str)>> =
340                    schema_path.try_find(|schema_name| {
341                        if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
342                            && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
343                        {
344                            Ok(Some(relation_id))
345                        } else {
346                            Ok(None)
347                        }
348                    });
349                if let Some((relation_id, _schema_name)) = result? {
350                    return Ok(relation_id);
351                }
352                Err(CatalogError::NotFound("table", rel_name.to_owned()).into())
353            }
354        }
355    }
356
357    fn bind_table(schema_catalog: &SchemaCatalog, name: &String) -> crate::error::Result<ObjectId> {
358        if let Some(table) = schema_catalog.get_created_table_by_name(name) {
359            Ok(table.id().table_id)
360        } else if let Some(source) = schema_catalog.get_source_by_name(name) {
361            Ok(source.id)
362        } else {
363            Err(CatalogError::NotFound("table or source", name.to_owned()).into())
364        }
365    }
366}
367
368pub mod display {
369    use risingwave_common::catalog::ObjectId;
370    use risingwave_pb::stream_plan::BackfillOrder;
371
372    use crate::session::SessionImpl;
373
374    fn get_table_name(session: &SessionImpl, id: ObjectId) -> crate::error::Result<String> {
375        let catalog_reader = session.env().catalog_reader().read_guard();
376        let table_catalog = catalog_reader.get_any_table_by_id(&(id.into()))?;
377        let table_name = table_catalog.name();
378        let db_id = table_catalog.database_id;
379        let schema_id = table_catalog.schema_id;
380        let schema_catalog = catalog_reader.get_schema_by_id(&db_id, &schema_id)?;
381        let schema_name = schema_catalog.name();
382        let name = format!("{}.{}", schema_name, table_name);
383        Ok(name)
384    }
385
386    pub(super) fn print_backfill_order_in_dot_format(
387        session: &SessionImpl,
388        order: BackfillOrder,
389    ) -> crate::error::Result<String> {
390        let mut result = String::new();
391        result.push_str("digraph G {\n");
392        // NOTE(kwannoel): This is a hack to make the edge ordering deterministic.
393        // so our planner tests are deterministic.
394        let mut edges = vec![];
395        for (start, end) in order.order {
396            let start_name = get_table_name(session, start)?;
397            for end in end.data {
398                let end_name = get_table_name(session, end)?;
399                edges.push(format!("  \"{}\" -> \"{}\";\n", start_name, end_name));
400            }
401        }
402        edges.sort();
403        for edge in edges {
404            result.push_str(&edge);
405        }
406        result.push_str("}\n");
407        Ok(result)
408    }
409}
410
411/// We only bind tables and materialized views.
412/// We need to bind sources and indices in the future as well.
413/// For auto backfill strategy,
414/// if a cycle forms due to the same relation being scanned twice in the derived order,
415/// we won't generate any backfill order strategy.
416/// For fixed backfill strategy,
417/// for scans on the same relation id, even though they may be in different fragments,
418/// they will all share the same backfill order.
419pub fn plan_backfill_order(
420    session: &SessionImpl,
421    backfill_order_strategy: BackfillOrderStrategy,
422    plan: PlanRef,
423) -> Result<BackfillOrder> {
424    let order = match backfill_order_strategy {
425        BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
426        BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
427        BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders)?,
428    };
429    Ok(BackfillOrder { order })
430}
431
432/// Plan the backfill order, and also output the backfill tree.
433pub fn explain_backfill_order_in_dot_format(
434    session: &SessionImpl,
435    backfill_order_strategy: BackfillOrderStrategy,
436    plan: PlanRef,
437) -> Result<String> {
438    let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
439    let dot_formatted_backfill_order =
440        display::print_backfill_order_in_dot_format(session, order.clone())?;
441    Ok(dot_formatted_backfill_order)
442}