risingwave_frontend/optimizer/backfill_order_strategy.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::stream_plan::BackfillOrder;
use risingwave_sqlparser::ast::BackfillOrderStrategy;

use crate::error::Result;
use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
use crate::optimizer::plan_node::StreamPlanRef;
use crate::session::SessionImpl;

pub mod auto {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::catalog::ObjectId;
    use risingwave_pb::common::Uint32Vector;

    use crate::optimizer::backfill_order_strategy::common::has_cycle;
    use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
    use crate::session::SessionImpl;

    #[derive(Debug)]
    pub enum BackfillTreeNode {
        Join {
            lhs: Box<BackfillTreeNode>,
            rhs: Box<BackfillTreeNode>,
        },
        Scan {
            id: ObjectId,
        },
        Union {
            children: Vec<BackfillTreeNode>,
        },
        Ignored,
    }

    /// TODO: Handle stream share
    fn plan_graph_to_backfill_tree(
        session: &SessionImpl,
        plan: StreamPlanRef,
    ) -> Option<BackfillTreeNode> {
        match plan.node_type() {
            StreamPlanNodeType::StreamHashJoin => {
                assert_eq!(plan.inputs().len(), 2);
                let mut inputs = plan.inputs().into_iter();
                let l = inputs.next().unwrap();
                let r = inputs.next().unwrap();
                Some(BackfillTreeNode::Join {
                    lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
                    rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
                })
            }
            StreamPlanNodeType::StreamTableScan => {
                let table_scan = plan.as_stream_table_scan().expect("table scan");
                let relation_id = table_scan.core().table_catalog.id().into();
                Some(BackfillTreeNode::Scan { id: relation_id })
            }
            StreamPlanNodeType::StreamSourceScan => {
                let source_scan = plan.as_stream_source_scan().expect("source scan");
                let relation_id = source_scan.source_catalog().id;
                Some(BackfillTreeNode::Scan { id: relation_id })
            }
            StreamPlanNodeType::StreamUnion => {
                let inputs = plan.inputs();
                let mut children = Vec::with_capacity(inputs.len());
                for child in inputs {
                    let subtree = plan_graph_to_backfill_tree(session, child)?;
                    if matches!(subtree, BackfillTreeNode::Ignored) {
                        continue;
                    }
                    children.push(subtree);
                }
                Some(BackfillTreeNode::Union { children })
            }
            node_type => {
                let inputs = plan.inputs();
                match inputs.len() {
                    0 => Some(BackfillTreeNode::Ignored),
                    1 => {
                        let mut inputs = inputs.into_iter();
                        let child = inputs.next().unwrap();
                        plan_graph_to_backfill_tree(session, child)
                    }
                    _ => {
                        session.notice_to_user(format!(
                            "Backfill order strategy is not supported for {:?}",
                            node_type
                        ));
                        None
                    }
                }
            }
        }
    }

    /// For a given subtree, the leftmost leaf node
    /// must come _after_ all other leaf nodes in the subtree.
    /// For example, for the following tree:
    ///
    /// ```text
    ///       JOIN (A)
    ///      /        \
    ///     JOIN (B)   SCAN (C)
    ///    /        \
    ///   /         \
    /// /           \
    /// SCAN (D)    SCAN (E)
    /// ```
    ///
    /// D is the leftmost leaf node.
    /// {C, E} are the other leaf nodes.
    ///
    /// So the partial order is:
    /// {C, E} -> {D}
    /// Expanded:
    /// C -> D
    /// E -> D
    ///
    /// Next, we have to consider UNION as well.
    /// If a UNION node is the leftmost child,
    /// then the rule applies to each subtree of the UNION:
    /// its leftmost leaf nodes must come after
    /// all leaf nodes ordered before the UNION,
    /// and after the other leaf nodes within that subtree.
    ///
    /// ```text
    ///         JOIN (A)
    ///        /        \
    ///       JOIN (B)   SCAN (C)
    ///       /       \
    ///      /         \
    ///     UNION (D)   SCAN (E)
    ///    /        \
    ///   /         \
    /// SCAN (F)    JOIN (G)
    ///            /        \
    ///           /          \
    ///          SCAN (H)   SCAN (I)
    /// ```
    ///
    /// In this case, {F, H} are the leftmost leaf nodes.
    /// {C, E} -> {F, H}
    /// I -> H
    /// Expanded:
    /// C -> F
    /// E -> F
    /// C -> H
    /// E -> H
    /// I -> H
    fn fold_backfill_tree_to_partial_order(
        tree: BackfillTreeNode,
    ) -> HashMap<ObjectId, Uint32Vector> {
        let mut order: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();

        // Returns the terminal nodes of the subtree.
        // This is the recursive algorithm we use to traverse the tree and compute partial orders.
        fn traverse_backfill_tree(
            tree: BackfillTreeNode,
            order: &mut HashMap<ObjectId, HashSet<ObjectId>>,
            is_leftmost_child: bool,
            mut prior_terminal_nodes: HashSet<ObjectId>,
        ) -> HashSet<ObjectId> {
            match tree {
                BackfillTreeNode::Ignored => HashSet::new(),
                BackfillTreeNode::Scan { id } => {
                    if is_leftmost_child {
                        for prior_terminal_node in prior_terminal_nodes {
                            order.entry(prior_terminal_node).or_default().insert(id);
                        }
                    }
                    HashSet::from([id])
                }
                BackfillTreeNode::Union { children } => {
                    let mut terminal_nodes = HashSet::new();
                    for child in children {
                        let child_terminal_nodes = traverse_backfill_tree(
                            child,
                            order,
                            is_leftmost_child,
                            prior_terminal_nodes.clone(),
                        );
                        terminal_nodes.extend(child_terminal_nodes);
                    }
                    terminal_nodes
                }
                BackfillTreeNode::Join { lhs, rhs } => {
                    let rhs_terminal_nodes =
                        traverse_backfill_tree(*rhs, order, false, HashSet::new());
                    prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
                    traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
                }
            }
        }

        traverse_backfill_tree(tree, &mut order, false, HashSet::new());

        order
            .into_iter()
            .map(|(k, v)| {
                let data = v.into_iter().collect();
                (k, Uint32Vector { data })
            })
            .collect()
    }

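    /// Derive a backfill order automatically from the stream plan.
    /// Returns an empty order if the plan contains a node type we cannot handle,
    /// or if the derived order contains a cycle.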
    pub(super) fn plan_auto_strategy(
        session: &SessionImpl,
        plan: StreamPlanRef,
    ) -> HashMap<ObjectId, Uint32Vector> {
        if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
            let order = fold_backfill_tree_to_partial_order(tree);
            if has_cycle(&order) {
                tracing::warn!(?order, "Backfill order strategy has a cycle");
                session.notice_to_user("Backfill order strategy has a cycle");
                return Default::default();
            }
            return order;
        }
        Default::default()
    }
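
    // A minimal test sketch of the partial-order derivation documented above. Ids are
    // written as plain `u32` literals, matching how `ObjectId` values flow into
    // `Uint32Vector::data` in this module. It rebuilds the first example tree from the
    // doc comment, JOIN(A) { JOIN(B) { SCAN(D), SCAN(E) }, SCAN(C) },
    // and expects the edges C -> D and E -> D.
    #[cfg(test)]
    mod tests {
        use super::{BackfillTreeNode, fold_backfill_tree_to_partial_order};

        #[test]
        fn derives_partial_order_for_simple_join_tree() {
            let c = 3; // SCAN (C)
            let d = 4; // SCAN (D)
            let e = 5; // SCAN (E)
            let tree = BackfillTreeNode::Join {
                lhs: Box::new(BackfillTreeNode::Join {
                    lhs: Box::new(BackfillTreeNode::Scan { id: d }),
                    rhs: Box::new(BackfillTreeNode::Scan { id: e }),
                }),
                rhs: Box::new(BackfillTreeNode::Scan { id: c }),
            };
            let order = fold_backfill_tree_to_partial_order(tree);
            // Both C and E must be backfilled before the leftmost leaf D.
            assert_eq!(order.get(&c).map(|v| v.data.clone()), Some(vec![d]));
            assert_eq!(order.get(&e).map(|v| v.data.clone()), Some(vec![d]));
            // D has no outgoing edges of its own.
            assert!(!order.contains_key(&d));
        }
    }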
}

mod fixed {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::bail;
    use risingwave_common::catalog::ObjectId;
    use risingwave_pb::common::Uint32Vector;
    use risingwave_sqlparser::ast::ObjectName;

    use crate::error::Result;
    use crate::optimizer::backfill_order_strategy::common::{
        bind_backfill_relation_id_by_name, has_cycle,
    };
    use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
    use crate::session::SessionImpl;

    /// Collect all relation IDs (tables and sources) that are scanned in the plan.
    fn collect_scanned_relation_ids(plan: StreamPlanRef) -> HashSet<ObjectId> {
        let mut relation_ids = HashSet::new();

        fn visit(plan: StreamPlanRef, relation_ids: &mut HashSet<ObjectId>) {
            match plan.node_type() {
                StreamPlanNodeType::StreamTableScan => {
                    let table_scan = plan.as_stream_table_scan().expect("table scan");
                    let relation_id = table_scan.core().table_catalog.id().into();
                    relation_ids.insert(relation_id);
                }
                StreamPlanNodeType::StreamSourceScan => {
                    let source_scan = plan.as_stream_source_scan().expect("source scan");
                    let relation_id = source_scan.source_catalog().id;
                    relation_ids.insert(relation_id);
                }
                _ => {}
            }

            // Recursively visit all inputs
            for child in plan.inputs() {
                visit(child, relation_ids);
            }
        }

        visit(plan, &mut relation_ids);
        relation_ids
    }

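    /// Build the backfill order from user-specified `(start, end)` pairs,
    /// after validating that both relations are actually scanned by the plan.
    /// For example (relation names are illustrative), the pairs `[(a, b), (a, c)]`
    /// produce the edges `a -> b` and `a -> c`, i.e. `a` is backfilled ahead of `b` and `c`.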
    pub(super) fn plan_fixed_strategy(
        session: &SessionImpl,
        orders: Vec<(ObjectName, ObjectName)>,
        plan: StreamPlanRef,
    ) -> Result<HashMap<ObjectId, Uint32Vector>> {
        // Collect all scanned relation IDs from the plan
        let scanned_relation_ids = collect_scanned_relation_ids(plan);

        let mut order: HashMap<ObjectId, Uint32Vector> = HashMap::new();
        for (start_name, end_name) in orders {
            let start_relation_id = bind_backfill_relation_id_by_name(session, start_name.clone())?;
            let end_relation_id = bind_backfill_relation_id_by_name(session, end_name.clone())?;

            // Validate that both relations are present in the query plan
            if !scanned_relation_ids.contains(&start_relation_id) {
                bail!(
                    "Table or source '{}' specified in backfill_order is not used in the query",
                    start_name
                );
            }
            if !scanned_relation_ids.contains(&end_relation_id) {
                bail!(
                    "Table or source '{}' specified in backfill_order is not used in the query",
                    end_name
                );
            }

            order
                .entry(start_relation_id)
                .or_default()
                .data
                .push(end_relation_id);
        }
        if has_cycle(&order) {
            bail!("Backfill order strategy has a cycle");
        }
        Ok(order)
    }
}

mod common {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::catalog::ObjectId;
    use risingwave_pb::common::Uint32Vector;
    use risingwave_sqlparser::ast::ObjectName;

    use crate::Binder;
    use crate::catalog::CatalogError;
    use crate::catalog::root_catalog::SchemaPath;
    use crate::catalog::schema_catalog::SchemaCatalog;
    use crate::error::Result;
    use crate::session::SessionImpl;

    /// Check if the backfill order has a cycle.
    pub(super) fn has_cycle(order: &HashMap<ObjectId, Uint32Vector>) -> bool {
        fn dfs(
            node: ObjectId,
            order: &HashMap<ObjectId, Uint32Vector>,
            visited: &mut HashSet<ObjectId>,
            stack: &mut HashSet<ObjectId>,
        ) -> bool {
            if stack.contains(&node) {
                return true; // Cycle detected
            }

            if visited.insert(node) {
                stack.insert(node);
                if let Some(downstreams) = order.get(&node) {
                    for neighbor in &downstreams.data {
                        if dfs(*neighbor, order, visited, stack) {
                            return true;
                        }
                    }
                }
                stack.remove(&node);
            }
            false
        }

        let mut visited = HashSet::new();
        let mut stack = HashSet::new();
        for &start in order.keys() {
            if dfs(start, order, &mut visited, &mut stack) {
                return true;
            }
        }

        false
    }

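    /// Resolve a (possibly schema-qualified) relation name to its relation id.
    /// A schema-qualified name (e.g. `some_schema.t`, names illustrative) is looked up
    /// in that schema directly; an unqualified name is resolved via the session's search path.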
    pub(super) fn bind_backfill_relation_id_by_name(
        session: &SessionImpl,
        name: ObjectName,
    ) -> Result<ObjectId> {
        let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(&name)?;
        let db_name = db_name.unwrap_or(session.database());

        let reader = session.env().catalog_reader().read_guard();

        match schema_name {
            Some(name) => {
                let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
                bind_table(schema_catalog, &rel_name)
            }
            None => {
                let search_path = session.config().search_path();
                let user_name = session.user_name();
                let schema_path = SchemaPath::Path(&search_path, &user_name);
                let result: crate::error::Result<Option<(ObjectId, &str)>> =
                    schema_path.try_find(|schema_name| {
                        if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
                            && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
                        {
                            Ok(Some(relation_id))
                        } else {
                            Ok(None)
                        }
                    });
                if let Some((relation_id, _schema_name)) = result? {
                    return Ok(relation_id);
                }
                Err(CatalogError::NotFound("table", rel_name.clone()).into())
            }
        }
    }

    fn bind_table(schema_catalog: &SchemaCatalog, name: &String) -> crate::error::Result<ObjectId> {
        if let Some(table) = schema_catalog.get_created_table_by_name(name) {
            Ok(table.id().as_raw_id())
        } else if let Some(source) = schema_catalog.get_source_by_name(name) {
            Ok(source.id)
        } else {
            Err(CatalogError::NotFound("table or source", name.to_owned()).into())
        }
    }
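
    // A small sketch exercising `has_cycle` with hand-built order maps; ids are plain
    // `u32` literals, matching how relation ids are stored in `Uint32Vector::data`.
    #[cfg(test)]
    mod tests {
        use std::collections::HashMap;

        use risingwave_pb::common::Uint32Vector;

        use super::has_cycle;

        #[test]
        fn detects_cycles_in_backfill_order() {
            // 1 -> 2 -> 3 is acyclic.
            let dag = HashMap::from([
                (1, Uint32Vector { data: vec![2] }),
                (2, Uint32Vector { data: vec![3] }),
            ]);
            assert!(!has_cycle(&dag));

            // 1 -> 2 -> 1 forms a cycle.
            let cyclic = HashMap::from([
                (1, Uint32Vector { data: vec![2] }),
                (2, Uint32Vector { data: vec![1] }),
            ]);
            assert!(has_cycle(&cyclic));
        }
    }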
}

pub mod display {
    use risingwave_common::catalog::ObjectId;
    use risingwave_pb::stream_plan::BackfillOrder;

    use crate::session::SessionImpl;

    fn get_table_name(session: &SessionImpl, id: ObjectId) -> crate::error::Result<String> {
        let catalog_reader = session.env().catalog_reader().read_guard();
        let table_catalog = catalog_reader.get_any_table_by_id(&(id.into()))?;
        let table_name = table_catalog.name();
        let db_id = table_catalog.database_id;
        let schema_id = table_catalog.schema_id;
        let schema_catalog = catalog_reader.get_schema_by_id(db_id, schema_id)?;
        let schema_name = schema_catalog.name();
        let name = format!("{}.{}", schema_name, table_name);
        Ok(name)
    }

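    /// Render the backfill order as a DOT digraph, one edge per ordering constraint.
    /// The output shape is roughly as follows (relation names are illustrative):
    ///
    /// ```text
    /// digraph G {
    ///   "public.orders" -> "public.customers";
    /// }
    /// ```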
    pub(super) fn print_backfill_order_in_dot_format(
        session: &SessionImpl,
        order: BackfillOrder,
    ) -> crate::error::Result<String> {
        let mut result = String::new();
        result.push_str("digraph G {\n");
        // NOTE(kwannoel): This is a hack to make the edge ordering deterministic,
        // so that our planner tests are deterministic.
        let mut edges = vec![];
        for (start, end) in order.order {
            let start_name = get_table_name(session, start)?;
            for end in end.data {
                let end_name = get_table_name(session, end)?;
                edges.push(format!("  \"{}\" -> \"{}\";\n", start_name, end_name));
            }
        }
        edges.sort();
        for edge in edges {
            result.push_str(&edge);
        }
        result.push_str("}\n");
        Ok(result)
    }
}

/// We only bind tables and materialized views for now;
/// sources and indices will need to be bound in the future as well.
/// For the auto backfill strategy,
/// if a cycle forms because the same relation is scanned twice in the derived order,
/// we won't generate any backfill order.
/// For the fixed backfill strategy,
/// all scans on the same relation id share the same backfill order,
/// even if they live in different fragments.
pub fn plan_backfill_order(
    session: &SessionImpl,
    backfill_order_strategy: BackfillOrderStrategy,
    plan: StreamPlanRef,
) -> Result<BackfillOrder> {
    let order = match backfill_order_strategy {
        BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
        BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
        BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders, plan)?,
    };
    Ok(BackfillOrder { order })
}

/// Plan the backfill order and render it in DOT format.
pub fn explain_backfill_order_in_dot_format(
    session: &SessionImpl,
    backfill_order_strategy: BackfillOrderStrategy,
    plan: StreamPlanRef,
) -> Result<String> {
    let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
    let dot_formatted_backfill_order = display::print_backfill_order_in_dot_format(session, order)?;
    Ok(dot_formatted_backfill_order)
}