risingwave_frontend/optimizer/backfill_order_strategy.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::common::Uint32Vector;
use risingwave_pb::id::RelationId;
use risingwave_pb::stream_plan::BackfillOrder;
use risingwave_sqlparser::ast::BackfillOrderStrategy;

use crate::error::Result;
use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
use crate::optimizer::plan_node::StreamPlanRef;
use crate::session::SessionImpl;

pub mod auto {
    use std::collections::{HashMap, HashSet};

    use risingwave_pb::id::RelationId;

    use crate::optimizer::backfill_order_strategy::common::has_cycle;
    use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
    use crate::session::SessionImpl;

    #[derive(Debug)]
    pub(super) enum BackfillTreeNode {
        Join {
            lhs: Box<BackfillTreeNode>,
            rhs: Box<BackfillTreeNode>,
        },
        Scan {
            id: RelationId,
        },
        Union {
            children: Vec<BackfillTreeNode>,
        },
        Ignored,
    }

    /// TODO: Handle stream share
    fn plan_graph_to_backfill_tree(
        session: &SessionImpl,
        plan: StreamPlanRef,
    ) -> Option<BackfillTreeNode> {
        match plan.node_type() {
            StreamPlanNodeType::StreamHashJoin => {
                assert_eq!(plan.inputs().len(), 2);
                let mut inputs = plan.inputs().into_iter();
                let l = inputs.next().unwrap();
                let r = inputs.next().unwrap();
                Some(BackfillTreeNode::Join {
                    lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
                    rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
                })
            }
            StreamPlanNodeType::StreamTableScan => {
                let table_scan = plan.as_stream_table_scan().expect("table scan");
                let relation_id = table_scan.core().table_catalog.id().as_relation_id();
                Some(BackfillTreeNode::Scan { id: relation_id })
            }
            StreamPlanNodeType::StreamSourceScan => {
                let source_scan = plan.as_stream_source_scan().expect("source scan");
                let relation_id = source_scan.source_catalog().id.as_relation_id();
                Some(BackfillTreeNode::Scan { id: relation_id })
            }
            StreamPlanNodeType::StreamUnion => {
                let inputs = plan.inputs();
                let mut children = Vec::with_capacity(inputs.len());
                for child in inputs {
                    let subtree = plan_graph_to_backfill_tree(session, child)?;
                    if matches!(subtree, BackfillTreeNode::Ignored) {
                        continue;
                    }
                    children.push(subtree);
                }
                Some(BackfillTreeNode::Union { children })
            }
            node_type => {
                let inputs = plan.inputs();
                match inputs.len() {
                    0 => Some(BackfillTreeNode::Ignored),
                    1 => {
                        let mut inputs = inputs.into_iter();
                        let child = inputs.next().unwrap();
                        plan_graph_to_backfill_tree(session, child)
                    }
                    _ => {
                        session.notice_to_user(format!(
                            "Backfill order strategy is not supported for {:?}",
                            node_type
                        ));
                        None
                    }
                }
            }
        }
    }

    /// For a given subtree, the leftmost leaf node(s) must come
    /// _after_ all other leaf nodes in the subtree.
    /// For example, for the following tree:
    ///
    /// ```text
    ///       JOIN (A)
    ///      /        \
    ///     JOIN (B)   SCAN (C)
    ///    /        \
    ///   /         \
    /// /           \
    /// SCAN (D)    SCAN (E)
    /// ```
    ///
    /// D is the leftmost leaf node.
    /// {C, E} are the other leaf nodes.
    ///
    /// So the partial order is:
    /// {C, E} -> {D}
    /// Expanded:
    /// C -> D
    /// E -> D
    ///
    /// Next, we have to consider UNION as well.
    /// If a UNION node is the leftmost child,
    /// then for every subtree under the UNION,
    /// its leftmost leaf node must come after
    /// all other leaf nodes in the enclosing subtree.
    ///
    /// ```text
    ///         JOIN (A)
    ///        /        \
    ///       JOIN (B)   SCAN (C)
    ///       /       \
    ///      /         \
    ///     UNION (D)   SCAN (E)
    ///    /        \
    ///   /         \
    /// SCAN (F)    JOIN (G)
    ///            /        \
    ///           /          \
    ///          SCAN (H)   SCAN (I)
    /// ```
    ///
    /// In this case, {F, H} are the leftmost leaf nodes.
    /// {C, E} -> {F, H}
    /// I -> H
    /// Expanded:
    /// C -> F
    /// E -> F
    /// C -> H
    /// E -> H
    /// I -> H
    fn fold_backfill_tree_to_partial_order(
        tree: BackfillTreeNode,
    ) -> HashMap<RelationId, HashSet<RelationId>> {
        let mut order: HashMap<RelationId, HashSet<RelationId>> = HashMap::new();

        // Returns the terminal (leaf) nodes of the subtree.
        // This is the recursive algorithm we use to traverse the tree and compute the partial orders.
        fn traverse_backfill_tree(
            tree: BackfillTreeNode,
            order: &mut HashMap<RelationId, HashSet<RelationId>>,
            is_leftmost_child: bool,
            mut prior_terminal_nodes: HashSet<RelationId>,
        ) -> HashSet<RelationId> {
            match tree {
                BackfillTreeNode::Ignored => HashSet::new(),
                BackfillTreeNode::Scan { id } => {
                    if is_leftmost_child {
                        for prior_terminal_node in prior_terminal_nodes {
                            order.entry(prior_terminal_node).or_default().insert(id);
                        }
                    }
                    HashSet::from([id])
                }
                BackfillTreeNode::Union { children } => {
                    let mut terminal_nodes = HashSet::new();
                    for child in children {
                        let child_terminal_nodes = traverse_backfill_tree(
                            child,
                            order,
                            is_leftmost_child,
                            prior_terminal_nodes.clone(),
                        );
                        terminal_nodes.extend(child_terminal_nodes);
                    }
                    terminal_nodes
                }
                BackfillTreeNode::Join { lhs, rhs } => {
                    let rhs_terminal_nodes =
                        traverse_backfill_tree(*rhs, order, false, HashSet::new());
                    prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
                    traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
                }
            }
        }

        traverse_backfill_tree(tree, &mut order, false, HashSet::new());

        order
    }
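
    // The test below is an illustrative sketch, not part of the original file: it runs
    // `fold_backfill_tree_to_partial_order` on the first tree from the doc comment above
    // and expects the partial order {C -> D, E -> D}. It assumes `RelationId` can be
    // built from a raw u32 via `From<u32>`; substitute the actual constructor if it differs.
    #[cfg(test)]
    mod fold_tests {
        use super::*;

        #[test]
        fn folds_nested_join_tree() {
            // Hypothetical raw ids: D = 1, E = 2, C = 3.
            let (d, e, c) = (
                RelationId::from(1u32),
                RelationId::from(2u32),
                RelationId::from(3u32),
            );
            // JOIN(A) { lhs: JOIN(B) { lhs: SCAN(D), rhs: SCAN(E) }, rhs: SCAN(C) }
            let tree = BackfillTreeNode::Join {
                lhs: Box::new(BackfillTreeNode::Join {
                    lhs: Box::new(BackfillTreeNode::Scan { id: d }),
                    rhs: Box::new(BackfillTreeNode::Scan { id: e }),
                }),
                rhs: Box::new(BackfillTreeNode::Scan { id: c }),
            };
            let order = fold_backfill_tree_to_partial_order(tree);
            // Both C and E must finish backfilling before D starts.
            assert_eq!(order.get(&c), Some(&HashSet::from([d])));
            assert_eq!(order.get(&e), Some(&HashSet::from([d])));
            // D itself imposes no constraints on anything else.
            assert!(!order.contains_key(&d));
        }
    }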

    pub(super) fn plan_auto_strategy(
        session: &SessionImpl,
        plan: StreamPlanRef,
    ) -> HashMap<RelationId, HashSet<RelationId>> {
        if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
            let order = fold_backfill_tree_to_partial_order(tree);
            if has_cycle(&order) {
                tracing::warn!(?order, "Backfill order strategy has a cycle");
                session.notice_to_user("Backfill order strategy has a cycle");
                return Default::default();
            }
            return order;
        }
        Default::default()
    }
}

mod fixed {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::bail;
    use risingwave_pb::id::RelationId;
    use risingwave_sqlparser::ast::ObjectName;

    use crate::error::Result;
    use crate::optimizer::backfill_order_strategy::common::{
        bind_backfill_relation_id_by_name, has_cycle,
    };
    use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
    use crate::session::SessionImpl;

    /// Collect all relation IDs (tables and sources) that are scanned in the plan.
    fn collect_scanned_relation_ids(plan: StreamPlanRef) -> HashSet<RelationId> {
        let mut relation_ids = HashSet::new();

        fn visit(plan: StreamPlanRef, relation_ids: &mut HashSet<RelationId>) {
            match plan.node_type() {
                StreamPlanNodeType::StreamTableScan => {
                    let table_scan = plan.as_stream_table_scan().expect("table scan");
                    let relation_id = table_scan.core().table_catalog.id().as_relation_id();
                    relation_ids.insert(relation_id);
                }
                StreamPlanNodeType::StreamSourceScan => {
                    let source_scan = plan.as_stream_source_scan().expect("source scan");
                    let relation_id = source_scan.source_catalog().id.as_relation_id();
                    relation_ids.insert(relation_id);
                }
                _ => {}
            }

            // Recursively visit all inputs
            for child in plan.inputs() {
                visit(child, relation_ids);
            }
        }

        visit(plan, &mut relation_ids);
        relation_ids
    }

    pub(super) fn plan_fixed_strategy(
        session: &SessionImpl,
        orders: Vec<(ObjectName, ObjectName)>,
        plan: StreamPlanRef,
    ) -> Result<HashMap<RelationId, HashSet<RelationId>>> {
        // Collect all scanned relation IDs from the plan
        let scanned_relation_ids = collect_scanned_relation_ids(plan);

        let mut order: HashMap<RelationId, HashSet<RelationId>> = HashMap::new();
        for (start_name, end_name) in orders {
            let start_relation_id = bind_backfill_relation_id_by_name(session, start_name.clone())?;
            let end_relation_id = bind_backfill_relation_id_by_name(session, end_name.clone())?;

            // Validate that both relations are present in the query plan
            if !scanned_relation_ids.contains(&start_relation_id) {
                bail!(
                    "Table or source '{}' specified in backfill_order is not used in the query",
                    start_name
                );
            }
            if !scanned_relation_ids.contains(&end_relation_id) {
                bail!(
                    "Table or source '{}' specified in backfill_order is not used in the query",
                    end_name
                );
            }

            order
                .entry(start_relation_id)
                .or_default()
                .insert(end_relation_id);
        }
        if has_cycle(&order) {
            bail!("Backfill order strategy has a cycle");
        }
        Ok(order)
    }
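
    // Illustrative note (not in the original file): given `orders = [(a, b), (b, c)]`
    // and a plan that scans all of `a`, `b`, and `c`, `plan_fixed_strategy` returns
    // `{ a: {b}, b: {c} }`, i.e. `a` backfills before `b`, and `b` before `c`.
    // Appending `(c, a)` would be rejected by the `has_cycle` check, and naming a
    // relation the plan never scans fails with the "not used in the query" error above.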
}

mod common {
    use std::collections::{HashMap, HashSet};

    use risingwave_pb::id::RelationId;
    use risingwave_sqlparser::ast::ObjectName;

    use crate::Binder;
    use crate::catalog::CatalogError;
    use crate::catalog::root_catalog::SchemaPath;
    use crate::catalog::schema_catalog::SchemaCatalog;
    use crate::error::Result;
    use crate::session::SessionImpl;

    /// Check if the backfill order has a cycle.
    pub(super) fn has_cycle(order: &HashMap<RelationId, HashSet<RelationId>>) -> bool {
        fn dfs(
            node: RelationId,
            order: &HashMap<RelationId, HashSet<RelationId>>,
            visited: &mut HashSet<RelationId>,
            stack: &mut HashSet<RelationId>,
        ) -> bool {
            if stack.contains(&node) {
                return true; // Cycle detected
            }

            if visited.insert(node) {
                stack.insert(node);
                if let Some(downstreams) = order.get(&node) {
                    for neighbor in downstreams {
                        if dfs(*neighbor, order, visited, stack) {
                            return true;
                        }
                    }
                }
                stack.remove(&node);
            }
            false
        }

        let mut visited = HashSet::new();
        let mut stack = HashSet::new();
        for &start in order.keys() {
            if dfs(start, order, &mut visited, &mut stack) {
                return true;
            }
        }

        false
    }
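
    // The test below is an illustrative sketch, not part of the original file: it checks
    // that `has_cycle` accepts a plain chain but flags a closed loop.
    // `RelationId::from(u32)` is an assumed constructor here; adjust if the real one differs.
    #[cfg(test)]
    mod has_cycle_tests {
        use super::*;

        #[test]
        fn detects_cycle_only_when_loop_is_closed() {
            let (a, b, c) = (
                RelationId::from(1u32),
                RelationId::from(2u32),
                RelationId::from(3u32),
            );
            let mut order: HashMap<RelationId, HashSet<RelationId>> = HashMap::new();
            order.entry(a).or_default().insert(b);
            order.entry(b).or_default().insert(c);
            // A simple chain a -> b -> c has no cycle.
            assert!(!has_cycle(&order));
            // Closing the loop with c -> a introduces one.
            order.entry(c).or_default().insert(a);
            assert!(has_cycle(&order));
        }
    }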

    pub(super) fn bind_backfill_relation_id_by_name(
        session: &SessionImpl,
        name: ObjectName,
    ) -> Result<RelationId> {
        let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(&name)?;
        let db_name = db_name.unwrap_or(session.database());

        let reader = session.env().catalog_reader().read_guard();

        match schema_name {
            Some(name) => {
                let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
                bind_table(schema_catalog, &rel_name)
            }
            None => {
                let search_path = session.config().search_path();
                let user_name = session.user_name();
                let schema_path = SchemaPath::Path(&search_path, &user_name);
                let result: crate::error::Result<Option<(RelationId, &str)>> = schema_path
                    .try_find(|schema_name| {
                        if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
                            && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
                        {
                            Ok(Some(relation_id))
                        } else {
                            Ok(None)
                        }
                    });
                if let Some((relation_id, _schema_name)) = result? {
                    return Ok(relation_id);
                }
                Err(CatalogError::not_found("table", &rel_name).into())
            }
        }
    }

    fn bind_table(
        schema_catalog: &SchemaCatalog,
        name: &String,
    ) -> crate::error::Result<RelationId> {
        if let Some(table) = schema_catalog.get_created_table_by_name(name) {
            Ok(table.id().as_relation_id())
        } else if let Some(source) = schema_catalog.get_source_by_name(name) {
            Ok(source.id.as_relation_id())
        } else {
            Err(CatalogError::not_found("table or source", name).into())
        }
    }
}

pub mod display {
    use risingwave_pb::stream_plan::BackfillOrder;

    use crate::session::SessionImpl;

    fn get_table_name(session: &SessionImpl, id: u32) -> crate::error::Result<String> {
        let catalog_reader = session.env().catalog_reader().read_guard();
        let table_catalog = catalog_reader.get_any_table_by_id(id.into())?;
        let table_name = table_catalog.name();
        let db_id = table_catalog.database_id;
        let schema_id = table_catalog.schema_id;
        let schema_catalog = catalog_reader.get_schema_by_id(db_id, schema_id)?;
        let schema_name = schema_catalog.name();
        let name = format!("{}.{}", schema_name, table_name);
        Ok(name)
    }

    pub(super) fn print_backfill_order_in_dot_format(
        session: &SessionImpl,
        order: BackfillOrder,
    ) -> crate::error::Result<String> {
        let mut result = String::new();
        result.push_str("digraph G {\n");
        // NOTE(kwannoel): This is a hack to make the edge ordering deterministic,
        // so that our planner tests are deterministic.
        let mut edges = vec![];
        for (start, end) in order.order {
            let start_name = get_table_name(session, start.as_raw_id())?;
            for end in end.data {
                let end_name = get_table_name(session, end)?;
                edges.push(format!("  \"{}\" -> \"{}\";\n", start_name, end_name));
            }
        }
        edges.sort();
        for edge in edges {
            result.push_str(&edge);
        }
        result.push_str("}\n");
        Ok(result)
    }
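
    // For reference, with an order { a -> {b, c} } and names resolved from the catalog,
    // the function above produces output along the lines of:
    //
    //   digraph G {
    //     "public.a" -> "public.b";
    //     "public.a" -> "public.c";
    //   }
    //
    // The schema prefix depends on the catalog; edges are sorted so that planner test
    // snapshots stay stable.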
}

/// We currently bind tables, materialized views, and sources.
/// We may need to bind indices in the future as well.
/// For the auto backfill strategy,
/// if a cycle forms in the derived order due to the same relation being scanned twice,
/// we won't generate any backfill order strategy at all.
/// For the fixed backfill strategy,
/// scans on the same relation id all share the same backfill order,
/// even though they may be in different fragments.
pub fn plan_backfill_order(
    session: &SessionImpl,
    backfill_order_strategy: BackfillOrderStrategy,
    plan: StreamPlanRef,
) -> Result<BackfillOrder> {
    let order = match backfill_order_strategy {
        BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
        BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
        BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders, plan)?,
    };
    Ok(BackfillOrder {
        order: order
            .into_iter()
            .map(|(relation_id, dependencies)| {
                (
                    relation_id,
                    Uint32Vector {
                        data: dependencies
                            .into_iter()
                            .map(RelationId::as_raw_id)
                            .collect(),
                    },
                )
            })
            .collect(),
    })
}
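
// Illustrative note (not in the original file): for the first tree documented in
// `auto::fold_backfill_tree_to_partial_order`, the auto strategy would produce, roughly,
// `BackfillOrder { order: { C: Uint32Vector { data: [D] }, E: Uint32Vector { data: [D] } } }`,
// i.e. each key relation finishes backfilling before the relations listed in its vector start.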

/// Plan the backfill order and render it in DOT format.
pub fn explain_backfill_order_in_dot_format(
    session: &SessionImpl,
    backfill_order_strategy: BackfillOrderStrategy,
    plan: StreamPlanRef,
) -> Result<String> {
    let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
    let dot_formatted_backfill_order = display::print_backfill_order_in_dot_format(session, order)?;
    Ok(dot_formatted_backfill_order)
}