risingwave_frontend/utils/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod pretty_serde;
16pub use pretty_serde::PrettySerde;
17mod column_index_mapping;
18use std::any::Any;
19use std::hash::{Hash, Hasher};
20use std::sync::LazyLock;
21
22pub use column_index_mapping::*;
23mod condition;
24pub mod data_type;
25pub use condition::*;
26mod connected_components;
27pub(crate) use connected_components::*;
28mod stream_graph_formatter;
29pub use stream_graph_formatter::*;
30mod with_options;
31use tokio::runtime::Runtime;
32pub use with_options::*;
33mod rewrite_index;
34pub use rewrite_index::*;
35mod index_set;
36pub use index_set::*;
37pub(crate) mod group_by;
38pub mod overwrite_options;
39pub use group_by::*;
40pub use overwrite_options::*;
41mod iceberg_predicate;
42pub use iceberg_predicate::*;
43#[cfg(feature = "datafusion")]
44mod drop_guard;
45#[cfg(feature = "datafusion")]
46pub use drop_guard::DropGuard;
47
48use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef};
49
50pub static FRONTEND_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
51    tokio::runtime::Builder::new_multi_thread()
52        .thread_name("rw-frontend")
53        .enable_all()
54        .build()
55        .expect("failed to build frontend runtime")
56});
57
58/// Substitute `InputRef` with corresponding `ExprImpl`.
59pub struct Substitute {
60    pub mapping: Vec<ExprImpl>,
61}
62
63impl ExprRewriter for Substitute {
64    fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
65        assert!(
66            input_ref
67                .return_type()
68                .equals_datatype(&self.mapping[input_ref.index()].return_type()),
69            "Type mismatch when substituting {:?} of {:?} with {:?} of {:?}",
70            input_ref,
71            input_ref.return_type(),
72            self.mapping[input_ref.index()],
73            self.mapping[input_ref.index()].return_type()
74        );
75        self.mapping[input_ref.index()].clone()
76    }
77}
78
79// Traits for easy manipulation of recursive structures
80
/// One level of a container whose subcomponents have type `Sub`.
///
/// A `Layer` usually represents a single layer of a tree-like structure,
/// with the subcomponents being the recursive subtrees.
/// In general, though, `Sub` may be a different type than the `Layer` itself.
/// This structural relation between `Sub` and `Layer` is what allows a
/// transformation on `Sub` to be lifted to a transformation on `Layer`.
/// A related and even more general notion is `Functor`,
/// which might also be helpful to define in the future.
pub trait Layer: Sized {
    /// The type of the subcomponents contained in this layer.
    type Sub;

    /// Lifts a transformation `f : Sub -> Sub` to the entire `Layer`
    /// by applying `f` to all subcomponents.
    fn map<F: FnMut(Self::Sub) -> Self::Sub>(self, f: F) -> Self;

    /// Lifts a traversal `f : Sub -> ()` to the entire `Layer`
    /// by sequentially visiting the subcomponents with `f`.
    fn descent<F: FnMut(&Self::Sub)>(&self, f: F);
}
105
/// A tree-like structure is a `Layer` where the subcomponents are recursively trees,
/// i.e. a `Layer` whose `Sub` type is the type itself.
///
/// Declared as a trait alias, so `T: Tree` is shorthand for `T: Layer<Sub = T>`.
pub trait Tree = Layer<Sub = Self>;
108
109/// Given a tree-like structure `T`,
110/// we usually can specify a transformation `T -> T`
111/// by providing a pre-order transformation `pre : T -> T`
112/// and a post-order transformation `post : T -> T`.
113/// Specifically, the derived transformation `apply : T -> T` first applies `pre`,
114/// then maps itself over the subtrees, and finally applies `post`.
115/// This allows us to obtain a global transformation acting recursively on all levels
116/// by specifying simpler transformations at acts locally.
117pub trait Endo<T: Tree> {
118    fn pre(&mut self, t: T) -> T {
119        t
120    }
121
122    fn post(&mut self, t: T) -> T {
123        t
124    }
125
126    /// The real application function is left undefined.
127    /// If we want the derived transformation
128    /// we can simply call `tree_apply` in the implementation.
129    /// But for more complicated requirements,
130    /// e.g. skipping over certain subtrees, custom logic can be added.
131    fn apply(&mut self, t: T) -> T;
132
133    /// The derived transformation based on `pre` and `post`.
134    fn tree_apply(&mut self, t: T) -> T {
135        let t = self.pre(t).map(|s| self.apply(s));
136        self.post(t)
137    }
138}
139
140/// A similar trait to generate traversal over tree-like structure.
141/// See `Endo` for more details.
142#[allow(unused_variables)]
143pub trait Visit<T: Tree> {
144    fn pre(&mut self, t: &T) {}
145
146    fn post(&mut self, t: &T) {}
147
148    fn visit(&mut self, t: &T);
149
150    fn tree_visit(&mut self, t: &T) {
151        self.pre(t);
152        t.descent(|i| self.visit(i));
153        self.post(t);
154    }
155}
156
157// Workaround object safety rules for Eq and Hash, adopted from
158// https://github.com/bevyengine/bevy/blob/f7fbfaf9c72035e98c6b6cec0c7d26ff9f5b1c82/crates/bevy_utils/src/label.rs
159
/// An object safe version of [`Eq`]. This trait is automatically implemented
/// for any `'static` type that implements `Eq`.
pub trait DynEq: Any {
    /// Returns `self` as a `&dyn Any`, which allows downcasting to the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Compares `self` with another `DynEq` trait object.
    ///
    /// With the blanket implementation, the result is `true` only when `other`
    /// has the same concrete type as `self` and the two values are equal.
    fn dyn_eq(&self, other: &dyn DynEq) -> bool;
}

impl<T> DynEq for T
where
    T: Any + Eq,
{
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn dyn_eq(&self, other: &dyn DynEq) -> bool {
        match other.as_any().downcast_ref::<T>() {
            Some(rhs) => self == rhs,
            // A failed downcast means `other` is of a different concrete type.
            None => false,
        }
    }
}

/// Equality between `DynEq` trait objects delegates to [`DynEq::dyn_eq`].
impl PartialEq<dyn DynEq + 'static> for dyn DynEq {
    fn eq(&self, other: &Self) -> bool {
        self.dyn_eq(other)
    }
}

impl Eq for dyn DynEq {
    fn assert_receiver_is_total_eq(&self) {}
}
187
188/// An object safe version of [`Hash`]. This trait is automatically implemented
189/// for any `'static` type that implements `Hash`.
190pub trait DynHash: DynEq {
191    fn as_dyn_eq(&self) -> &dyn DynEq;
192    fn dyn_hash(&self, state: &mut dyn Hasher);
193}
194
195impl<T: DynEq + Hash> DynHash for T {
196    fn as_dyn_eq(&self) -> &dyn DynEq {
197        self
198    }
199
200    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
201        T::hash(self, &mut state);
202        self.type_id().hash(&mut state);
203    }
204}
205
206impl Hash for dyn DynHash {
207    fn hash<H: Hasher>(&self, state: &mut H) {
208        self.dyn_hash(state);
209    }
210}
211
/// Formats `i` as an English ordinal, e.g. `1st`, `2nd`, `3rd`, `4th`, `11th`, `21st`.
///
/// Numbers ending in 1, 2 or 3 get `st`, `nd` or `rd` respectively, unless they end
/// in 11, 12 or 13; every other number gets `th`.
pub fn ordinal(i: usize) -> String {
    let suffix = match (i % 10, i % 100) {
        (1, last_two) if last_two != 11 => "st",
        (2, last_two) if last_two != 12 => "nd",
        (3, last_two) if last_two != 13 => "rd",
        _ => "th",
    };
    format!("{i}{suffix}")
}