risingwave_frontend/optimizer/plan_visitor/
temporal_join_validator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_sqlparser::ast::AsOf;
16
17use super::{BatchPlanVisitor, DefaultBehavior, LogicalPlanVisitor, Merge, StreamPlanVisitor};
18use crate::PlanRef;
19use crate::optimizer::plan_node::generic::GenericPlanRef;
20use crate::optimizer::plan_node::{
21    BatchSeqScan, ConventionMarker, LogicalScan, PlanTreeNodeBinary, StreamTableScan,
22    StreamTemporalJoin,
23};
24use crate::optimizer::plan_visitor::PlanVisitor;
25
26#[derive(Debug, Clone, Default)]
27pub struct TemporalJoinValidator {
28    found_non_append_only_temporal_join: bool,
29}
30
31impl TemporalJoinValidator {
32    pub fn exist_dangling_temporal_scan<C: ConventionMarker>(plan: PlanRef<C>) -> bool
33    where
34        Self: PlanVisitor<C, Result = bool>,
35    {
36        let mut decider = TemporalJoinValidator {
37            found_non_append_only_temporal_join: false,
38        };
39        let ctx = plan.ctx();
40        let has_dangling_temporal_scan = decider.visit(plan);
41        if decider.found_non_append_only_temporal_join {
42            ctx.session_ctx().notice_to_user("A non-append-only temporal join is used in the query. It would introduce a additional memo-table comparing to append-only temporal join.");
43        }
44        has_dangling_temporal_scan
45    }
46}
47
48impl LogicalPlanVisitor for TemporalJoinValidator {
49    type Result = bool;
50
51    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
52
53    fn default_behavior() -> Self::DefaultBehavior {
54        Merge(|a, b| a | b)
55    }
56
57    fn visit_logical_scan(&mut self, logical_scan: &LogicalScan) -> bool {
58        matches!(logical_scan.as_of(), Some(AsOf::ProcessTime))
59    }
60}
61
62impl BatchPlanVisitor for TemporalJoinValidator {
63    type Result = bool;
64
65    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
66
67    fn default_behavior() -> Self::DefaultBehavior {
68        Merge(|a, b| a | b)
69    }
70
71    fn visit_batch_seq_scan(&mut self, batch_seq_scan: &BatchSeqScan) -> bool {
72        matches!(batch_seq_scan.core().as_of, Some(AsOf::ProcessTime))
73    }
74}
75
76impl StreamPlanVisitor for TemporalJoinValidator {
77    type Result = bool;
78
79    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
80
81    fn default_behavior() -> Self::DefaultBehavior {
82        Merge(|a, b| a | b)
83    }
84
85    fn visit_stream_table_scan(&mut self, stream_table_scan: &StreamTableScan) -> bool {
86        matches!(stream_table_scan.core().as_of, Some(AsOf::ProcessTime))
87    }
88
89    fn visit_stream_temporal_join(&mut self, stream_temporal_join: &StreamTemporalJoin) -> bool {
90        if !stream_temporal_join.append_only() {
91            self.found_non_append_only_temporal_join = true;
92        }
93        self.visit_stream(stream_temporal_join.left())
94    }
95}