risingwave_frontend/optimizer/plan_visitor/
relation_collector_visitor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::id::ObjectId;
18
19use super::{
20    BatchPlanVisitor, DefaultBehavior, DefaultValue, LogicalPlanVisitor, StreamPlanVisitor,
21};
22use crate::PlanRef;
23use crate::optimizer::plan_node::{
24    BatchSource, ConventionMarker, LogicalScan, StreamSource, StreamTableScan,
25};
26use crate::optimizer::plan_visitor::PlanVisitor;
27
28/// TODO(rc): maybe we should rename this to `DependencyCollectorVisitor`.
29#[derive(Debug, Clone, Default)]
30pub struct RelationCollectorVisitor {
31    relations: HashSet<ObjectId>,
32}
33
34impl RelationCollectorVisitor {
35    fn new_with(relations: HashSet<ObjectId>) -> Self {
36        Self { relations }
37    }
38
39    /// `collect_with` will collect all the relations in the plan with some default ones, which are
40    /// collected during the binding phase. Note that during visit the collected relations might be
41    /// duplicated with the default ones. The collection is necessary, because implicit dependencies
42    /// on indices can only be discovered after plan is built.
43    pub fn collect_with<C: ConventionMarker>(
44        relations: HashSet<ObjectId>,
45        plan: PlanRef<C>,
46    ) -> HashSet<ObjectId>
47    where
48        Self: PlanVisitor<C>,
49    {
50        let mut visitor = Self::new_with(relations);
51        visitor.visit(plan);
52        visitor.relations
53    }
54}
55
56impl LogicalPlanVisitor for RelationCollectorVisitor {
57    type Result = ();
58
59    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
60
61    fn default_behavior() -> Self::DefaultBehavior {
62        DefaultValue
63    }
64
65    fn visit_logical_scan(&mut self, plan: &LogicalScan) {
66        self.relations.insert(plan.table().id.as_object_id());
67    }
68}
69
70impl StreamPlanVisitor for RelationCollectorVisitor {
71    type Result = ();
72
73    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
74
75    fn default_behavior() -> Self::DefaultBehavior {
76        DefaultValue
77    }
78
79    fn visit_stream_table_scan(&mut self, plan: &StreamTableScan) {
80        let logical = plan.core();
81        self.relations
82            .insert(logical.table_catalog.id.as_object_id());
83    }
84
85    fn visit_stream_source(&mut self, plan: &StreamSource) {
86        if let Some(catalog) = plan.source_catalog() {
87            self.relations.insert(catalog.id.as_object_id());
88        }
89    }
90}
91
92impl BatchPlanVisitor for RelationCollectorVisitor {
93    type Result = ();
94
95    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
96
97    fn default_behavior() -> Self::DefaultBehavior {
98        DefaultValue
99    }
100
101    fn visit_batch_seq_scan(&mut self, plan: &crate::optimizer::plan_node::BatchSeqScan) {
102        self.relations
103            .insert(plan.core().table_catalog.id.as_object_id());
104    }
105
106    fn visit_batch_source(&mut self, plan: &BatchSource) {
107        if let Some(catalog) = plan.source_catalog() {
108            self.relations.insert(catalog.id.as_object_id());
109        }
110    }
111}