risingwave_frontend/optimizer/plan_visitor/
jsonb_stream_key_checker.rs1use risingwave_common::catalog::{Field, FieldDisplay};
16use risingwave_common::types::DataType;
17
18use super::{DefaultBehavior, Merge};
19use crate::optimizer::plan_node::generic::GenericPlanRef;
20use crate::optimizer::plan_node::*;
21use crate::optimizer::plan_visitor::PlanVisitor;
22
23#[derive(Debug, Clone, Default)]
24pub struct StreamKeyChecker;
25
26impl StreamKeyChecker {
27 fn visit_inputs(&mut self, plan: &impl PlanNode) -> Option<String> {
28 let results = plan.inputs().into_iter().map(|input| self.visit(input));
29 Self::default_behavior().apply(results)
30 }
31
32 fn err_msg(target: &str, field: &Field) -> String {
33 format!(
34 "JSONB column \"{}\" should not be in the {}.",
35 target,
36 FieldDisplay(field)
37 )
38 }
39}
40
41impl PlanVisitor for StreamKeyChecker {
42 type Result = Option<String>;
43
44 type DefaultBehavior = impl DefaultBehavior<Self::Result>;
45
46 fn default_behavior() -> Self::DefaultBehavior {
47 Merge(|a: Option<String>, b| a.or(b))
48 }
49
50 fn visit_logical_dedup(&mut self, plan: &LogicalDedup) -> Self::Result {
51 let input = plan.input();
52 let schema = input.schema();
53 let data_types = schema.data_types();
54 for idx in plan.dedup_cols() {
55 if data_types[*idx] == DataType::Jsonb {
56 return Some(StreamKeyChecker::err_msg("distinct key", &schema[*idx]));
57 }
58 }
59 self.visit_inputs(plan)
60 }
61
62 fn visit_logical_top_n(&mut self, plan: &LogicalTopN) -> Self::Result {
63 let input = plan.input();
64 let schema = input.schema();
65 let data_types = schema.data_types();
66 for idx in plan.group_key() {
67 if data_types[*idx] == DataType::Jsonb {
68 return Some(StreamKeyChecker::err_msg("TopN group key", &schema[*idx]));
69 }
70 }
71 for idx in plan
72 .topn_order()
73 .column_orders
74 .iter()
75 .map(|c| c.column_index)
76 {
77 if data_types[idx] == DataType::Jsonb {
78 return Some(StreamKeyChecker::err_msg("TopN order key", &schema[idx]));
79 }
80 }
81 self.visit_inputs(plan)
82 }
83
84 fn visit_logical_union(&mut self, plan: &LogicalUnion) -> Self::Result {
85 if !plan.all() {
86 for field in &plan.inputs()[0].schema().fields {
87 if field.data_type() == DataType::Jsonb {
88 return Some(StreamKeyChecker::err_msg("field", field));
89 }
90 }
91 }
92 self.visit_inputs(plan)
93 }
94
95 fn visit_logical_agg(&mut self, plan: &LogicalAgg) -> Self::Result {
96 let input = plan.input();
97 let schema = input.schema();
98 let data_types = schema.data_types();
99 for idx in plan.group_key().indices() {
100 if data_types[idx] == DataType::Jsonb {
101 return Some(StreamKeyChecker::err_msg(
102 "aggregation group key",
103 &schema[idx],
104 ));
105 }
106 }
107 self.visit_inputs(plan)
108 }
109
110 fn visit_logical_over_window(&mut self, plan: &LogicalOverWindow) -> Self::Result {
111 let input = plan.input();
112 let schema = input.schema();
113 let data_types = schema.data_types();
114
115 for func in plan.window_functions() {
116 for idx in func.partition_by.iter().map(|e| e.index()) {
117 if data_types[idx] == DataType::Jsonb {
118 return Some(StreamKeyChecker::err_msg(
119 "over window partition key",
120 &schema[idx],
121 ));
122 }
123 }
124
125 for idx in func.order_by.iter().map(|c| c.column_index) {
126 if data_types[idx] == DataType::Jsonb {
127 return Some(StreamKeyChecker::err_msg(
128 "over window order by key",
129 &schema[idx],
130 ));
131 }
132 }
133 }
134 self.visit_inputs(plan)
135 }
136}