risingwave_frontend/optimizer/plan_visitor/
relation_collector_visitor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::catalog::TableId;
18
19use super::{
20    BatchPlanVisitor, DefaultBehavior, DefaultValue, LogicalPlanVisitor, StreamPlanVisitor,
21};
22use crate::PlanRef;
23use crate::optimizer::plan_node::{
24    BatchSource, ConventionMarker, LogicalScan, StreamSource, StreamTableScan,
25};
26use crate::optimizer::plan_visitor::PlanVisitor;
27
28/// TODO(rc): maybe we should rename this to `DependencyCollectorVisitor`.
29#[derive(Debug, Clone, Default)]
30pub struct RelationCollectorVisitor {
31    relations: HashSet<TableId>,
32}
33
34impl RelationCollectorVisitor {
35    fn new_with(relations: HashSet<TableId>) -> Self {
36        Self { relations }
37    }
38
39    /// `collect_with` will collect all the relations in the plan with some default ones, which are
40    /// collected during the binding phase. Note that during visit the collected relations might be
41    /// duplicated with the default ones. The collection is necessary, because implicit dependencies
42    /// on indices can only be discovered after plan is built.
43    pub fn collect_with<C: ConventionMarker>(
44        relations: HashSet<TableId>,
45        plan: PlanRef<C>,
46    ) -> HashSet<TableId>
47    where
48        Self: PlanVisitor<C>,
49    {
50        let mut visitor = Self::new_with(relations);
51        visitor.visit(plan);
52        visitor.relations
53    }
54}
55
56impl LogicalPlanVisitor for RelationCollectorVisitor {
57    type Result = ();
58
59    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
60
61    fn default_behavior() -> Self::DefaultBehavior {
62        DefaultValue
63    }
64
65    fn visit_logical_scan(&mut self, plan: &LogicalScan) {
66        self.relations.insert(plan.table().id);
67    }
68}
69
70impl StreamPlanVisitor for RelationCollectorVisitor {
71    type Result = ();
72
73    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
74
75    fn default_behavior() -> Self::DefaultBehavior {
76        DefaultValue
77    }
78
79    fn visit_stream_table_scan(&mut self, plan: &StreamTableScan) {
80        let logical = plan.core();
81        self.relations.insert(logical.table_catalog.id);
82    }
83
84    fn visit_stream_source(&mut self, plan: &StreamSource) {
85        if let Some(catalog) = plan.source_catalog() {
86            self.relations.insert(catalog.id.into());
87        }
88    }
89}
90
91impl BatchPlanVisitor for RelationCollectorVisitor {
92    type Result = ();
93
94    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
95
96    fn default_behavior() -> Self::DefaultBehavior {
97        DefaultValue
98    }
99
100    fn visit_batch_seq_scan(&mut self, plan: &crate::optimizer::plan_node::BatchSeqScan) {
101        self.relations.insert(plan.core().table_catalog.id);
102    }
103
104    fn visit_batch_source(&mut self, plan: &BatchSource) {
105        if let Some(catalog) = plan.source_catalog() {
106            self.relations.insert(catalog.id.into());
107        }
108    }
109}