risingwave_frontend/optimizer/plan_visitor/
jsonb_stream_key_checker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{Field, FieldDisplay};
16use risingwave_common::types::DataType;
17
18use super::{DefaultBehavior, Merge};
19use crate::optimizer::plan_node::generic::GenericPlanRef;
20use crate::optimizer::plan_node::*;
21use crate::optimizer::plan_visitor::PlanVisitor;
22
23#[derive(Debug, Clone, Default)]
24pub struct StreamKeyChecker;
25
26impl StreamKeyChecker {
27    fn visit_inputs(&mut self, plan: &impl PlanNode) -> Option<String> {
28        let results = plan.inputs().into_iter().map(|input| self.visit(input));
29        Self::default_behavior().apply(results)
30    }
31
32    fn err_msg(target: &str, field: &Field) -> String {
33        format!(
34            "JSONB column \"{}\" should not be in the {}.",
35            target,
36            FieldDisplay(field)
37        )
38    }
39}
40
41impl PlanVisitor for StreamKeyChecker {
42    type Result = Option<String>;
43
44    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
45
46    fn default_behavior() -> Self::DefaultBehavior {
47        Merge(|a: Option<String>, b| a.or(b))
48    }
49
50    fn visit_logical_dedup(&mut self, plan: &LogicalDedup) -> Self::Result {
51        let input = plan.input();
52        let schema = input.schema();
53        let data_types = schema.data_types();
54        for idx in plan.dedup_cols() {
55            if data_types[*idx] == DataType::Jsonb {
56                return Some(StreamKeyChecker::err_msg("distinct key", &schema[*idx]));
57            }
58        }
59        self.visit_inputs(plan)
60    }
61
62    fn visit_logical_top_n(&mut self, plan: &LogicalTopN) -> Self::Result {
63        let input = plan.input();
64        let schema = input.schema();
65        let data_types = schema.data_types();
66        for idx in plan.group_key() {
67            if data_types[*idx] == DataType::Jsonb {
68                return Some(StreamKeyChecker::err_msg("TopN group key", &schema[*idx]));
69            }
70        }
71        for idx in plan
72            .topn_order()
73            .column_orders
74            .iter()
75            .map(|c| c.column_index)
76        {
77            if data_types[idx] == DataType::Jsonb {
78                return Some(StreamKeyChecker::err_msg("TopN order key", &schema[idx]));
79            }
80        }
81        self.visit_inputs(plan)
82    }
83
84    fn visit_logical_union(&mut self, plan: &LogicalUnion) -> Self::Result {
85        if !plan.all() {
86            for field in &plan.inputs()[0].schema().fields {
87                if field.data_type() == DataType::Jsonb {
88                    return Some(StreamKeyChecker::err_msg("field", field));
89                }
90            }
91        }
92        self.visit_inputs(plan)
93    }
94
95    fn visit_logical_agg(&mut self, plan: &LogicalAgg) -> Self::Result {
96        let input = plan.input();
97        let schema = input.schema();
98        let data_types = schema.data_types();
99        for idx in plan.group_key().indices() {
100            if data_types[idx] == DataType::Jsonb {
101                return Some(StreamKeyChecker::err_msg(
102                    "aggregation group key",
103                    &schema[idx],
104                ));
105            }
106        }
107        self.visit_inputs(plan)
108    }
109
110    fn visit_logical_over_window(&mut self, plan: &LogicalOverWindow) -> Self::Result {
111        let input = plan.input();
112        let schema = input.schema();
113        let data_types = schema.data_types();
114
115        for func in plan.window_functions() {
116            for idx in func.partition_by.iter().map(|e| e.index()) {
117                if data_types[idx] == DataType::Jsonb {
118                    return Some(StreamKeyChecker::err_msg(
119                        "over window partition key",
120                        &schema[idx],
121                    ));
122                }
123            }
124
125            for idx in func.order_by.iter().map(|c| c.column_index) {
126                if data_types[idx] == DataType::Jsonb {
127                    return Some(StreamKeyChecker::err_msg(
128                        "over window order by key",
129                        &schema[idx],
130                    ));
131                }
132            }
133        }
134        self.visit_inputs(plan)
135    }
136}