risingwave_frontend/utils/
mod.rs1mod pretty_serde;
16pub use pretty_serde::PrettySerde;
17mod column_index_mapping;
18use std::any::Any;
19use std::hash::{Hash, Hasher};
20use std::sync::LazyLock;
21
22pub use column_index_mapping::*;
23mod condition;
24pub mod data_type;
25pub use condition::*;
26mod connected_components;
27pub(crate) use connected_components::*;
28mod stream_graph_formatter;
29pub use stream_graph_formatter::*;
30mod with_options;
31use tokio::runtime::Runtime;
32pub use with_options::*;
33mod rewrite_index;
34pub use rewrite_index::*;
35mod index_set;
36pub use index_set::*;
37pub(crate) mod group_by;
38pub mod overwrite_options;
39pub use group_by::*;
40pub use overwrite_options::*;
41mod iceberg_predicate;
42pub use iceberg_predicate::*;
43#[cfg(feature = "datafusion")]
44mod drop_guard;
45#[cfg(feature = "datafusion")]
46pub use drop_guard::DropGuard;
47
48use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef};
49
50pub static FRONTEND_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
51 tokio::runtime::Builder::new_multi_thread()
52 .thread_name("rw-frontend")
53 .enable_all()
54 .build()
55 .expect("failed to build frontend runtime")
56});
57
58pub struct Substitute {
60 pub mapping: Vec<ExprImpl>,
61}
62
63impl ExprRewriter for Substitute {
64 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
65 assert!(
66 input_ref
67 .return_type()
68 .equals_datatype(&self.mapping[input_ref.index()].return_type()),
69 "Type mismatch when substituting {:?} of {:?} with {:?} of {:?}",
70 input_ref,
71 input_ref.return_type(),
72 self.mapping[input_ref.index()],
73 self.mapping[input_ref.index()].return_type()
74 );
75 self.mapping[input_ref.index()].clone()
76 }
77}
78
79pub trait Layer: Sized {
90 type Sub;
91
92 fn map<F>(self, f: F) -> Self
95 where
96 F: FnMut(Self::Sub) -> Self::Sub;
97
98 fn descent<F>(&self, f: F)
102 where
103 F: FnMut(&Self::Sub);
104}
105
106pub trait Tree = Layer<Sub = Self>;
108
109pub trait Endo<T: Tree> {
118 fn pre(&mut self, t: T) -> T {
119 t
120 }
121
122 fn post(&mut self, t: T) -> T {
123 t
124 }
125
126 fn apply(&mut self, t: T) -> T;
132
133 fn tree_apply(&mut self, t: T) -> T {
135 let t = self.pre(t).map(|s| self.apply(s));
136 self.post(t)
137 }
138}
139
140#[allow(unused_variables)]
143pub trait Visit<T: Tree> {
144 fn pre(&mut self, t: &T) {}
145
146 fn post(&mut self, t: &T) {}
147
148 fn visit(&mut self, t: &T);
149
150 fn tree_visit(&mut self, t: &T) {
151 self.pre(t);
152 t.descent(|i| self.visit(i));
153 self.post(t);
154 }
155}
156
157pub trait DynEq: Any {
163 fn as_any(&self) -> &dyn Any;
164 fn dyn_eq(&self, other: &dyn DynEq) -> bool;
165}
166
167impl<T: Any + Eq> DynEq for T {
168 fn as_any(&self) -> &dyn Any {
169 self
170 }
171
172 fn dyn_eq(&self, other: &dyn DynEq) -> bool {
173 let other = other.as_any().downcast_ref::<T>();
174 other.is_some_and(|other| self == other)
175 }
176}
177
178impl PartialEq<dyn DynEq + 'static> for dyn DynEq {
179 fn eq(&self, other: &Self) -> bool {
180 self.dyn_eq(other)
181 }
182}
183
184impl Eq for dyn DynEq {
185 fn assert_receiver_is_total_eq(&self) {}
186}
187
188pub trait DynHash: DynEq {
191 fn as_dyn_eq(&self) -> &dyn DynEq;
192 fn dyn_hash(&self, state: &mut dyn Hasher);
193}
194
195impl<T: DynEq + Hash> DynHash for T {
196 fn as_dyn_eq(&self) -> &dyn DynEq {
197 self
198 }
199
200 fn dyn_hash(&self, mut state: &mut dyn Hasher) {
201 T::hash(self, &mut state);
202 self.type_id().hash(&mut state);
203 }
204}
205
206impl Hash for dyn DynHash {
207 fn hash<H: Hasher>(&self, state: &mut H) {
208 self.dyn_hash(state);
209 }
210}
211
212pub fn ordinal(i: usize) -> String {
213 let s = i.to_string();
214 let suffix = if s.ends_with('1') && !s.ends_with("11") {
215 "st"
216 } else if s.ends_with('2') && !s.ends_with("12") {
217 "nd"
218 } else if s.ends_with('3') && !s.ends_with("13") {
219 "rd"
220 } else {
221 "th"
222 };
223 s + suffix
224}