risingwave_frontend/optimizer/backfill_order_strategy.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::stream_plan::BackfillOrder;
use risingwave_sqlparser::ast::BackfillOrderStrategy;

use crate::error::Result;
use crate::optimizer::backfill_order_strategy::auto::plan_auto_strategy;
use crate::optimizer::backfill_order_strategy::fixed::plan_fixed_strategy;
use crate::optimizer::plan_node::StreamPlanRef;
use crate::session::SessionImpl;

type ObjectId = u32;

pub mod auto {
    use std::collections::{HashMap, HashSet};

    use risingwave_pb::common::Uint32Vector;

    use super::ObjectId;
    use crate::optimizer::backfill_order_strategy::common::has_cycle;
    use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
    use crate::session::SessionImpl;

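    /// Simplified shape of the stream plan used to derive the backfill order.
    /// Only hash joins, scans (over tables and sources) and unions are kept;
    /// other operators are either looked through (single input) or collapsed
    /// into [`BackfillTreeNode::Ignored`] (no input).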
    #[derive(Debug)]
    pub(super) enum BackfillTreeNode {
        Join {
            lhs: Box<BackfillTreeNode>,
            rhs: Box<BackfillTreeNode>,
        },
        Scan {
            id: ObjectId,
        },
        Union {
            children: Vec<BackfillTreeNode>,
        },
        Ignored,
    }

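    /// Convert the stream plan graph into a [`BackfillTreeNode`] tree.
    /// Returns `None` (after notifying the user) if the plan contains a
    /// multi-input operator other than hash join or union.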
    /// TODO: Handle stream share
    fn plan_graph_to_backfill_tree(
        session: &SessionImpl,
        plan: StreamPlanRef,
    ) -> Option<BackfillTreeNode> {
        match plan.node_type() {
            StreamPlanNodeType::StreamHashJoin => {
                assert_eq!(plan.inputs().len(), 2);
                let mut inputs = plan.inputs().into_iter();
                let l = inputs.next().unwrap();
                let r = inputs.next().unwrap();
                Some(BackfillTreeNode::Join {
                    lhs: Box::new(plan_graph_to_backfill_tree(session, l)?),
                    rhs: Box::new(plan_graph_to_backfill_tree(session, r)?),
                })
            }
            StreamPlanNodeType::StreamTableScan => {
                let table_scan = plan.as_stream_table_scan().expect("table scan");
                let relation_id = table_scan.core().table_catalog.id().as_raw_id();
                Some(BackfillTreeNode::Scan { id: relation_id })
            }
            StreamPlanNodeType::StreamSourceScan => {
                let source_scan = plan.as_stream_source_scan().expect("source scan");
                let relation_id = source_scan.source_catalog().id.as_raw_id();
                Some(BackfillTreeNode::Scan { id: relation_id })
            }
            StreamPlanNodeType::StreamUnion => {
                let inputs = plan.inputs();
                let mut children = Vec::with_capacity(inputs.len());
                for child in inputs {
                    let subtree = plan_graph_to_backfill_tree(session, child)?;
                    if matches!(subtree, BackfillTreeNode::Ignored) {
                        continue;
                    }
                    children.push(subtree);
                }
                Some(BackfillTreeNode::Union { children })
            }
            node_type => {
                let inputs = plan.inputs();
                match inputs.len() {
                    0 => Some(BackfillTreeNode::Ignored),
                    1 => {
                        let mut inputs = inputs.into_iter();
                        let child = inputs.next().unwrap();
                        plan_graph_to_backfill_tree(session, child)
                    }
                    _ => {
                        session.notice_to_user(format!(
                            "Backfill order strategy is not supported for {:?}",
                            node_type
                        ));
                        None
                    }
                }
            }
        }
    }

    /// For a given subtree, the leftmost leaf node must come _after_ all
    /// other leaf nodes in the subtree.
    /// For example, for the following tree:
    ///
    /// ```text
    ///       JOIN (A)
    ///      /        \
    ///     JOIN (B)   SCAN (C)
    ///    /        \
    ///   /         \
    /// /           \
    /// SCAN (D)    SCAN (E)
    /// ```
    ///
    /// D is the leftmost leaf node.
    /// {C, E} are the other leaf nodes.
    ///
    /// So the partial order is:
    /// {C, E} -> {D}
    /// Expanded:
    /// C -> D
    /// E -> D
    ///
    /// Next, we have to consider UNION as well.
    /// If a UNION node is the leftmost child,
    /// then for all subtrees in the UNION,
    /// their leftmost leaf nodes must come after
    /// all other leaf nodes in the subtree.
    ///
    /// ```text
    ///         JOIN (A)
    ///        /        \
    ///       JOIN (B)   SCAN (C)
    ///       /       \
    ///      /         \
    ///     UNION (D)   SCAN (E)
    ///    /        \
    ///   /         \
    /// SCAN (F)    JOIN (G)
    ///            /        \
    ///           /          \
    ///          SCAN (H)   SCAN (I)
    /// ```
    ///
    /// In this case, {F, H} are the leftmost leaf nodes.
    /// {C, E} -> {F, H}
    /// I -> H
    /// Expanded:
    /// C -> F
    /// E -> F
    /// C -> H
    /// E -> H
    /// I -> H
    fn fold_backfill_tree_to_partial_order(
        tree: BackfillTreeNode,
    ) -> HashMap<ObjectId, Uint32Vector> {
        let mut order: HashMap<ObjectId, HashSet<ObjectId>> = HashMap::new();

        // Returns the terminal nodes of the subtree.
        // This is the recursive algorithm we use to traverse the tree and compute the partial orders.
        fn traverse_backfill_tree(
            tree: BackfillTreeNode,
            order: &mut HashMap<ObjectId, HashSet<ObjectId>>,
            is_leftmost_child: bool,
            mut prior_terminal_nodes: HashSet<ObjectId>,
        ) -> HashSet<ObjectId> {
            match tree {
                BackfillTreeNode::Ignored => HashSet::new(),
                BackfillTreeNode::Scan { id } => {
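                    // A leftmost scan must be backfilled after every prior terminal node
                    // collected from the right-hand sides of the joins above it.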
                    if is_leftmost_child {
                        for prior_terminal_node in prior_terminal_nodes {
                            order.entry(prior_terminal_node).or_default().insert(id);
                        }
                    }
                    HashSet::from([id])
                }
                BackfillTreeNode::Union { children } => {
                    let mut terminal_nodes = HashSet::new();
                    for child in children {
                        let child_terminal_nodes = traverse_backfill_tree(
                            child,
                            order,
                            is_leftmost_child,
                            prior_terminal_nodes.clone(),
                        );
                        terminal_nodes.extend(child_terminal_nodes);
                    }
                    terminal_nodes
                }
                BackfillTreeNode::Join { lhs, rhs } => {
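                    // Visit the right side first; its terminal nodes become "prior" nodes
                    // that the leftmost scans of the left side must wait for.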
                    let rhs_terminal_nodes =
                        traverse_backfill_tree(*rhs, order, false, HashSet::new());
                    prior_terminal_nodes.extend(rhs_terminal_nodes.iter().cloned());
                    traverse_backfill_tree(*lhs, order, true, prior_terminal_nodes)
                }
            }
        }

        traverse_backfill_tree(tree, &mut order, false, HashSet::new());

        order
            .into_iter()
            .map(|(k, v)| {
                let data = v.into_iter().collect();
                (k, Uint32Vector { data })
            })
            .collect()
    }

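    /// Derive the backfill order from the shape of the plan.
    /// Returns an empty order if the plan contains unsupported operators or
    /// the derived order has a cycle.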
    pub(super) fn plan_auto_strategy(
        session: &SessionImpl,
        plan: StreamPlanRef,
    ) -> HashMap<ObjectId, Uint32Vector> {
        if let Some(tree) = plan_graph_to_backfill_tree(session, plan) {
            let order = fold_backfill_tree_to_partial_order(tree);
            if has_cycle(&order) {
                tracing::warn!(?order, "Backfill order strategy has a cycle");
                session.notice_to_user("Backfill order strategy has a cycle");
                return Default::default();
            }
            return order;
        }
        Default::default()
    }
}

mod fixed {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::bail;
    use risingwave_pb::common::Uint32Vector;
    use risingwave_sqlparser::ast::ObjectName;

    use super::ObjectId;
    use crate::error::Result;
    use crate::optimizer::backfill_order_strategy::common::{
        bind_backfill_relation_id_by_name, has_cycle,
    };
    use crate::optimizer::plan_node::{StreamPlanNodeType, StreamPlanRef};
    use crate::session::SessionImpl;

    /// Collect all relation IDs (tables and sources) that are scanned in the plan.
    fn collect_scanned_relation_ids(plan: StreamPlanRef) -> HashSet<ObjectId> {
        let mut relation_ids = HashSet::new();

        fn visit(plan: StreamPlanRef, relation_ids: &mut HashSet<ObjectId>) {
            match plan.node_type() {
                StreamPlanNodeType::StreamTableScan => {
                    let table_scan = plan.as_stream_table_scan().expect("table scan");
                    let relation_id = table_scan.core().table_catalog.id().as_raw_id();
                    relation_ids.insert(relation_id);
                }
                StreamPlanNodeType::StreamSourceScan => {
                    let source_scan = plan.as_stream_source_scan().expect("source scan");
                    let relation_id = source_scan.source_catalog().id.as_raw_id();
                    relation_ids.insert(relation_id);
                }
                _ => {}
            }

            // Recursively visit all inputs
            for child in plan.inputs() {
                visit(child, relation_ids);
            }
        }

        visit(plan, &mut relation_ids);
        relation_ids
    }

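    /// Build the backfill order from the user-specified `start -> end` pairs of
    /// relation names, validating that each named relation is actually scanned
    /// by the plan and that the resulting order is acyclic.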
    pub(super) fn plan_fixed_strategy(
        session: &SessionImpl,
        orders: Vec<(ObjectName, ObjectName)>,
        plan: StreamPlanRef,
    ) -> Result<HashMap<ObjectId, Uint32Vector>> {
        // Collect all scanned relation IDs from the plan
        let scanned_relation_ids = collect_scanned_relation_ids(plan);

        let mut order: HashMap<ObjectId, Uint32Vector> = HashMap::new();
        for (start_name, end_name) in orders {
            let start_relation_id = bind_backfill_relation_id_by_name(session, start_name.clone())?;
            let end_relation_id = bind_backfill_relation_id_by_name(session, end_name.clone())?;

            // Validate that both relations are present in the query plan
            if !scanned_relation_ids.contains(&start_relation_id) {
                bail!(
                    "Table or source '{}' specified in backfill_order is not used in the query",
                    start_name
                );
            }
            if !scanned_relation_ids.contains(&end_relation_id) {
                bail!(
                    "Table or source '{}' specified in backfill_order is not used in the query",
                    end_name
                );
            }

            order
                .entry(start_relation_id)
                .or_default()
                .data
                .push(end_relation_id);
        }
        if has_cycle(&order) {
            bail!("Backfill order strategy has a cycle");
        }
        Ok(order)
    }
}

mod common {
    use std::collections::{HashMap, HashSet};

    use risingwave_pb::common::Uint32Vector;
    use risingwave_sqlparser::ast::ObjectName;

    use super::ObjectId;
    use crate::Binder;
    use crate::catalog::CatalogError;
    use crate::catalog::root_catalog::SchemaPath;
    use crate::catalog::schema_catalog::SchemaCatalog;
    use crate::error::Result;
    use crate::session::SessionImpl;

    /// Check if the backfill order has a cycle.
    pub(super) fn has_cycle(order: &HashMap<ObjectId, Uint32Vector>) -> bool {
        fn dfs(
            node: ObjectId,
            order: &HashMap<ObjectId, Uint32Vector>,
            visited: &mut HashSet<ObjectId>,
            stack: &mut HashSet<ObjectId>,
        ) -> bool {
            if stack.contains(&node) {
                return true; // Cycle detected
            }

            if visited.insert(node) {
                stack.insert(node);
                if let Some(downstreams) = order.get(&node) {
                    for neighbor in &downstreams.data {
                        if dfs(*neighbor, order, visited, stack) {
                            return true;
                        }
                    }
                }
                stack.remove(&node);
            }
            false
        }

        let mut visited = HashSet::new();
        let mut stack = HashSet::new();
        for &start in order.keys() {
            if dfs(start, order, &mut visited, &mut stack) {
                return true;
            }
        }

        false
    }

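    /// Resolve a (possibly schema-qualified) relation name to its object id,
    /// falling back to the session's schema search path when no schema is given.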
    pub(super) fn bind_backfill_relation_id_by_name(
        session: &SessionImpl,
        name: ObjectName,
    ) -> Result<ObjectId> {
        let (db_name, schema_name, rel_name) = Binder::resolve_db_schema_qualified_name(&name)?;
        let db_name = db_name.unwrap_or(session.database());

        let reader = session.env().catalog_reader().read_guard();

        match schema_name {
            Some(name) => {
                let schema_catalog = reader.get_schema_by_name(&db_name, &name)?;
                bind_table(schema_catalog, &rel_name)
            }
            None => {
                let search_path = session.config().search_path();
                let user_name = session.user_name();
                let schema_path = SchemaPath::Path(&search_path, &user_name);
                let result: crate::error::Result<Option<(ObjectId, &str)>> =
                    schema_path.try_find(|schema_name| {
                        if let Ok(schema_catalog) = reader.get_schema_by_name(&db_name, schema_name)
                            && let Ok(relation_id) = bind_table(schema_catalog, &rel_name)
                        {
                            Ok(Some(relation_id))
                        } else {
                            Ok(None)
                        }
                    });
                if let Some((relation_id, _schema_name)) = result? {
                    return Ok(relation_id);
                }
                Err(CatalogError::NotFound("table", rel_name.clone()).into())
            }
        }
    }

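    /// Look up a relation by name within a single schema: created tables
    /// (which include materialized views) first, then sources.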
    fn bind_table(schema_catalog: &SchemaCatalog, name: &String) -> crate::error::Result<ObjectId> {
        if let Some(table) = schema_catalog.get_created_table_by_name(name) {
            Ok(table.id().as_raw_id())
        } else if let Some(source) = schema_catalog.get_source_by_name(name) {
            Ok(source.id.as_raw_id())
        } else {
            Err(CatalogError::NotFound("table or source", name.to_owned()).into())
        }
    }
}

pub mod display {
    use risingwave_pb::stream_plan::BackfillOrder;

    use super::ObjectId;
    use crate::session::SessionImpl;

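    /// Format the relation id as `schema_name.table_name` for display.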
    fn get_table_name(session: &SessionImpl, id: ObjectId) -> crate::error::Result<String> {
        let catalog_reader = session.env().catalog_reader().read_guard();
        let table_catalog = catalog_reader.get_any_table_by_id(id.into())?;
        let table_name = table_catalog.name();
        let db_id = table_catalog.database_id;
        let schema_id = table_catalog.schema_id;
        let schema_catalog = catalog_reader.get_schema_by_id(db_id, schema_id)?;
        let schema_name = schema_catalog.name();
        let name = format!("{}.{}", schema_name, table_name);
        Ok(name)
    }

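    /// Render the backfill order as a GraphViz `digraph`, with edges sorted
    /// so that the output is deterministic.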
    pub(super) fn print_backfill_order_in_dot_format(
        session: &SessionImpl,
        order: BackfillOrder,
    ) -> crate::error::Result<String> {
        let mut result = String::new();
        result.push_str("digraph G {\n");
        // NOTE(kwannoel): Collect and sort the edges first, so that the edge ordering
        // (and hence our planner tests) is deterministic.
        let mut edges = vec![];
        for (start, end) in order.order {
            let start_name = get_table_name(session, start)?;
            for end in end.data {
                let end_name = get_table_name(session, end)?;
                edges.push(format!("  \"{}\" -> \"{}\";\n", start_name, end_name));
            }
        }
        edges.sort();
        for edge in edges {
            result.push_str(&edge);
        }
        result.push_str("}\n");
        Ok(result)
    }
}

/// We currently bind tables (including materialized views) and sources here;
/// we may need to bind indices in the future as well.
/// For the auto backfill strategy,
/// if a cycle forms because the same relation is scanned twice in the derived order,
/// we won't generate any backfill order.
/// For the fixed backfill strategy,
/// scans on the same relation id all share the same backfill order,
/// even when they occur in different fragments.
pub fn plan_backfill_order(
    session: &SessionImpl,
    backfill_order_strategy: BackfillOrderStrategy,
    plan: StreamPlanRef,
) -> Result<BackfillOrder> {
    let order = match backfill_order_strategy {
        BackfillOrderStrategy::Default | BackfillOrderStrategy::None => Default::default(),
        BackfillOrderStrategy::Auto => plan_auto_strategy(session, plan),
        BackfillOrderStrategy::Fixed(orders) => plan_fixed_strategy(session, orders, plan)?,
    };
    Ok(BackfillOrder { order })
}

/// Plan the backfill order and render it in GraphViz DOT format.
pub fn explain_backfill_order_in_dot_format(
    session: &SessionImpl,
    backfill_order_strategy: BackfillOrderStrategy,
    plan: StreamPlanRef,
) -> Result<String> {
    let order = plan_backfill_order(session, backfill_order_strategy, plan)?;
    let dot_formatted_backfill_order = display::print_backfill_order_in_dot_format(session, order)?;
    Ok(dot_formatted_backfill_order)
}
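
// A minimal illustrative check of `has_cycle` (a sketch, not an exhaustive test):
// the order maps below are built the same way `fold_backfill_tree_to_partial_order`
// builds them above.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_pb::common::Uint32Vector;

    use super::common::has_cycle;

    #[test]
    fn detects_cycle_in_backfill_order() {
        // 1 -> 2 and 2 -> 1 form a cycle.
        let cyclic: HashMap<u32, Uint32Vector> = HashMap::from([
            (1, Uint32Vector { data: vec![2] }),
            (2, Uint32Vector { data: vec![1] }),
        ]);
        assert!(has_cycle(&cyclic));

        // 1 -> 2 and 1 -> 3 do not form a cycle.
        let acyclic: HashMap<u32, Uint32Vector> =
            HashMap::from([(1, Uint32Vector { data: vec![2, 3] })]);
        assert!(!has_cycle(&acyclic));
    }
}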