risingwave_frontend/binder/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::RwLock;
20use risingwave_common::catalog::FunctionId;
21use risingwave_common::session_config::{SearchPath, SessionConfig};
22use risingwave_common::types::DataType;
23use risingwave_common::util::iter_util::ZipEqDebug;
24use risingwave_sqlparser::ast::{Expr as AstExpr, SelectItem, SetExpr, Statement};
25
26use crate::error::Result;
27
28mod bind_context;
29mod bind_param;
30mod create;
31mod create_view;
32mod declare_cursor;
33mod delete;
34mod expr;
35pub mod fetch_cursor;
36mod for_system;
37mod insert;
38mod query;
39mod relation;
40mod select;
41mod set_expr;
42mod statement;
43mod struct_field;
44mod update;
45mod values;
46
47pub use bind_context::{BindContext, Clause, LateralBindContext};
48pub use create_view::BoundCreateView;
49pub use delete::BoundDelete;
50pub use expr::bind_data_type;
51pub use insert::BoundInsert;
52use pgwire::pg_server::{Session, SessionId};
53pub use query::BoundQuery;
54pub use relation::{
55    BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource,
56    BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation,
57    ResolveQualifiedNameError, WindowTableFunctionKind,
58};
59pub use select::{BoundDistinct, BoundSelect};
60pub use set_expr::*;
61pub use statement::BoundStatement;
62pub use update::{BoundUpdate, UpdateProject};
63pub use values::BoundValues;
64
65use crate::catalog::catalog_service::CatalogReadGuard;
66use crate::catalog::root_catalog::SchemaPath;
67use crate::catalog::schema_catalog::SchemaCatalog;
68use crate::catalog::{CatalogResult, DatabaseId, TableId, ViewId};
69use crate::error::ErrorCode;
70use crate::expr::ExprImpl;
71use crate::session::{AuthContext, SessionImpl, TemporarySourceManager};
72use crate::user::user_service::UserInfoReadGuard;
73
/// Identifier of a share relation, which could be a CTE, a source, a view and so on.
///
/// Ids are handed out by the binder from a counter that starts at zero and grows by one per
/// allocation.
pub type ShareId = usize;
75
/// The type of binding statement.
///
/// The value is chosen by the `Binder` constructor that is used (`new`, `new_for_stream`,
/// `new_for_ddl`, `new_for_system`, ...) and is stored in the binder for its whole lifetime.
enum BindFor {
    /// Binding MV/SINK
    Stream,
    /// Binding a batch query
    Batch,
    /// Binding a DDL (e.g. CREATE TABLE/SOURCE)
    Ddl,
    /// Binding a system query (e.g. SHOW)
    System,
}
87
/// `Binder` binds the identifiers in AST to columns in relations
///
/// A binder is created from a session; it captures read guards on the catalog and the user
/// info together with the session's database, search path and configuration.
pub struct Binder {
    // TODO: maybe we can only lock the database, but not the whole catalog.
    /// Read guard on the catalog, obtained from the session's catalog reader.
    catalog: CatalogReadGuard,
    /// Read guard on the user info, obtained from the session's user info reader.
    user: UserInfoReadGuard,
    /// Name of the session's database.
    db_name: String,
    /// Id of the session's database.
    database_id: DatabaseId,
    /// Id of the session this binder was created from.
    session_id: SessionId,
    /// The bind context of the query level that is currently being bound.
    context: BindContext,
    /// Authentication context of the session; its `user_name` is used for schema resolution.
    auth_context: Arc<AuthContext>,
    /// A stack holding contexts of outer queries when binding a subquery.
    /// It also holds all of the lateral contexts for each respective
    /// subquery.
    ///
    /// See [`Binder::bind_subquery_expr`] for details.
    upper_subquery_contexts: Vec<(BindContext, Vec<LateralBindContext>)>,

    /// A stack holding contexts of left-lateral `TableFactor`s.
    ///
    /// We need a separate stack as `CorrelatedInputRef` depth is
    /// determined by the upper subquery context depth, not the lateral context stack depth.
    lateral_contexts: Vec<LateralBindContext>,

    /// The id that the next call to `next_subquery_id` will return.
    next_subquery_id: usize,
    /// The id that the next call to `next_values_id` will return.
    next_values_id: usize,
    /// The `ShareId` is used to identify the share relation which could be a CTE, a source, a view
    /// and so on.
    next_share_id: ShareId,

    /// The session's shared configuration.
    session_config: Arc<RwLock<SessionConfig>>,

    /// The search path read from the session config when the binder was created.
    search_path: SearchPath,
    /// The type of binding statement.
    bind_for: BindFor,

    /// `ShareId`s identifying shared views.
    shared_views: HashMap<ViewId, ShareId>,

    /// The included relations while binding a query.
    included_relations: HashSet<TableId>,

    /// The included user-defined functions while binding a query.
    included_udfs: HashSet<FunctionId>,

    /// Types of the prepared-statement parameters; see [`ParameterTypes`].
    param_types: ParameterTypes,

    /// The sql udf context that will be used during binding phase
    udf_context: UdfContext,

    /// The temporary sources that will be used during binding phase
    temporary_source_manager: TemporarySourceManager,

    /// Information for `secure_compare` function. It's ONLY available when binding the
    /// `VALIDATE` clause of Webhook source i.e. `VALIDATE SECRET ... AS SECURE_COMPARE(...)`.
    secure_compare_context: Option<SecureCompareContext>,
}
144
/// Names that are made available to the `secure_compare` function while binding the
/// `VALIDATE` clause of a Webhook source.
// There's one more hidden name, `HEADERS`, which is a reserved identifier for HTTP headers. Its type is `JSONB`.
#[derive(Default, Clone, Debug)]
pub struct SecureCompareContext {
    /// The name of the column that stores the whole payload as `JSONB`; during validation it
    /// is used as `bytea`.
    pub column_name: String,
    /// The name of the secret (usually a token provided by the webhook source user) used to
    /// validate the calls. `None` when no secret is given.
    pub secret_name: Option<String>,
}
153
/// State kept by the binder while it binds (possibly nested) sql udf calls.
#[derive(Clone, Debug, Default)]
pub struct UdfContext {
    /// The mapping from `sql udf parameters` to a bound `ExprImpl` generated from `ast expressions`
    /// Note: The expressions are constructed at runtime and correspond to the user's actual input
    udf_param_context: HashMap<String, ExprImpl>,

    /// The global counter that records the calling stack depth
    /// of the sql udf chain that is currently being bound
    udf_global_counter: u32,
}
164
165impl UdfContext {
166    pub fn new() -> Self {
167        Self {
168            udf_param_context: HashMap::new(),
169            udf_global_counter: 0,
170        }
171    }
172
173    pub fn global_count(&self) -> u32 {
174        self.udf_global_counter
175    }
176
177    pub fn incr_global_count(&mut self) {
178        self.udf_global_counter += 1;
179    }
180
181    pub fn decr_global_count(&mut self) {
182        self.udf_global_counter -= 1;
183    }
184
185    pub fn _is_empty(&self) -> bool {
186        self.udf_param_context.is_empty()
187    }
188
189    pub fn update_context(&mut self, context: HashMap<String, ExprImpl>) {
190        self.udf_param_context = context;
191    }
192
193    pub fn _clear(&mut self) {
194        self.udf_global_counter = 0;
195        self.udf_param_context.clear();
196    }
197
198    pub fn get_expr(&self, name: &str) -> Option<&ExprImpl> {
199        self.udf_param_context.get(name)
200    }
201
202    pub fn get_context(&self) -> HashMap<String, ExprImpl> {
203        self.udf_param_context.clone()
204    }
205
206    /// A common utility function to extract sql udf
207    /// expression out from the input `ast`
208    pub fn extract_udf_expression(ast: Vec<Statement>) -> Result<AstExpr> {
209        if ast.len() != 1 {
210            return Err(ErrorCode::InvalidInputSyntax(
211                "the query for sql udf should contain only one statement".to_owned(),
212            )
213            .into());
214        }
215
216        // Extract the expression out
217        let Statement::Query(query) = ast[0].clone() else {
218            return Err(ErrorCode::InvalidInputSyntax(
219                "invalid function definition, please recheck the syntax".to_owned(),
220            )
221            .into());
222        };
223
224        let SetExpr::Select(select) = query.body else {
225            return Err(ErrorCode::InvalidInputSyntax(
226                "missing `select` body for sql udf expression, please recheck the syntax"
227                    .to_owned(),
228            )
229            .into());
230        };
231
232        if select.projection.len() != 1 {
233            return Err(ErrorCode::InvalidInputSyntax(
234                "`projection` should contain only one `SelectItem`".to_owned(),
235            )
236            .into());
237        }
238
239        let SelectItem::UnnamedExpr(expr) = select.projection[0].clone() else {
240            return Err(ErrorCode::InvalidInputSyntax(
241                "expect `UnnamedExpr` for `projection`".to_owned(),
242            )
243            .into());
244        };
245
246        Ok(expr)
247    }
248}
249
/// `ParameterTypes` is used to record the types of the parameters while binding prepared statements.
/// It works by following these rules:
/// 1. At the beginning, it contains the parameter types specified by the user.
/// 2. When the binder encounters a parameter, it will record it as unknown (call `record_new_param`)
///    if it doesn't exist in `ParameterTypes` yet.
/// 3. When the binder encounters a cast on a parameter, if it's of an unknown type, the cast function
///    will record the target type as the inferred type of that parameter (call `record_infer_type`). If the
///    parameter has been inferred already, the cast function will act as a normal cast.
/// 4. After bind finished:
///    (a) parameter not in `ParameterTypes` means that the user didn't specify it and it didn't
///    occur in the query. `export` will return an error if there is such a
///    parameter. This rule is compatible with PostgreSQL.
///    (b) parameter is None means that it's of an unknown type. The user didn't specify it
///    and we can't infer it in the query. We will treat it as VARCHAR type finally. This rule is
///    compatible with PostgreSQL.
///    (c) parameter is Some means that it's of a known type.
///
/// Parameter indices are 1-based. Clones share the same underlying map.
#[derive(Clone, Debug)]
pub struct ParameterTypes(Arc<RwLock<HashMap<u64, Option<DataType>>>>);
268
269impl ParameterTypes {
270    pub fn new(specified_param_types: Vec<Option<DataType>>) -> Self {
271        let map = specified_param_types
272            .into_iter()
273            .enumerate()
274            .map(|(index, data_type)| ((index + 1) as u64, data_type))
275            .collect::<HashMap<u64, Option<DataType>>>();
276        Self(Arc::new(RwLock::new(map)))
277    }
278
279    pub fn has_infer(&self, index: u64) -> bool {
280        self.0.read().get(&index).unwrap().is_some()
281    }
282
283    pub fn read_type(&self, index: u64) -> Option<DataType> {
284        self.0.read().get(&index).unwrap().clone()
285    }
286
287    pub fn record_new_param(&mut self, index: u64) {
288        self.0.write().entry(index).or_insert(None);
289    }
290
291    pub fn record_infer_type(&mut self, index: u64, data_type: DataType) {
292        assert!(
293            !self.has_infer(index),
294            "The parameter has been inferred, should not be inferred again."
295        );
296        self.0.write().get_mut(&index).unwrap().replace(data_type);
297    }
298
299    pub fn export(&self) -> Result<Vec<DataType>> {
300        let types = self
301            .0
302            .read()
303            .clone()
304            .into_iter()
305            .sorted_by_key(|(index, _)| *index)
306            .collect::<Vec<_>>();
307
308        // Check if all the parameters have been inferred.
309        for ((index, _), expect_index) in types.iter().zip_eq_debug(1_u64..=types.len() as u64) {
310            if *index != expect_index {
311                return Err(ErrorCode::InvalidInputSyntax(format!(
312                    "Cannot infer the type of the parameter {}.",
313                    expect_index
314                ))
315                .into());
316            }
317        }
318
319        Ok(types
320            .into_iter()
321            .map(|(_, data_type)| data_type.unwrap_or(DataType::Varchar))
322            .collect::<Vec<_>>())
323    }
324}
325
326impl Binder {
327    fn new_inner(
328        session: &SessionImpl,
329        bind_for: BindFor,
330        param_types: Vec<Option<DataType>>,
331    ) -> Binder {
332        Binder {
333            catalog: session.env().catalog_reader().read_guard(),
334            user: session.env().user_info_reader().read_guard(),
335            db_name: session.database(),
336            database_id: session.database_id(),
337            session_id: session.id(),
338            context: BindContext::new(),
339            auth_context: session.auth_context(),
340            upper_subquery_contexts: vec![],
341            lateral_contexts: vec![],
342            next_subquery_id: 0,
343            next_values_id: 0,
344            next_share_id: 0,
345            session_config: session.shared_config(),
346            search_path: session.config().search_path(),
347            bind_for,
348            shared_views: HashMap::new(),
349            included_relations: HashSet::new(),
350            included_udfs: HashSet::new(),
351            param_types: ParameterTypes::new(param_types),
352            udf_context: UdfContext::new(),
353            temporary_source_manager: session.temporary_source_manager(),
354            secure_compare_context: None,
355        }
356    }
357
358    pub fn new(session: &SessionImpl) -> Binder {
359        Self::new_inner(session, BindFor::Batch, vec![])
360    }
361
362    pub fn new_with_param_types(
363        session: &SessionImpl,
364        param_types: Vec<Option<DataType>>,
365    ) -> Binder {
366        Self::new_inner(session, BindFor::Batch, param_types)
367    }
368
369    pub fn new_for_stream(session: &SessionImpl) -> Binder {
370        Self::new_inner(session, BindFor::Stream, vec![])
371    }
372
373    pub fn new_for_ddl(session: &SessionImpl) -> Binder {
374        Self::new_inner(session, BindFor::Ddl, vec![])
375    }
376
377    pub fn new_for_ddl_with_secure_compare(
378        session: &SessionImpl,
379        ctx: SecureCompareContext,
380    ) -> Binder {
381        let mut binder = Self::new_inner(session, BindFor::Ddl, vec![]);
382        binder.secure_compare_context = Some(ctx);
383        binder
384    }
385
386    pub fn new_for_system(session: &SessionImpl) -> Binder {
387        Self::new_inner(session, BindFor::System, vec![])
388    }
389
390    fn is_for_stream(&self) -> bool {
391        matches!(self.bind_for, BindFor::Stream)
392    }
393
394    #[allow(dead_code)]
395    fn is_for_batch(&self) -> bool {
396        matches!(self.bind_for, BindFor::Batch)
397    }
398
399    fn is_for_ddl(&self) -> bool {
400        matches!(self.bind_for, BindFor::Ddl)
401    }
402
403    /// Bind a [`Statement`].
404    pub fn bind(&mut self, stmt: Statement) -> Result<BoundStatement> {
405        self.bind_statement(stmt)
406    }
407
408    pub fn export_param_types(&self) -> Result<Vec<DataType>> {
409        self.param_types.export()
410    }
411
412    /// Get included relations in the query after binding. This is used for resolving relation
413    /// dependencies. Note that it only contains referenced relations discovered during binding.
414    /// After the plan is built, the referenced relations may be changed. We cannot rely on the
415    /// collection result of plan, because we still need to record the dependencies that have been
416    /// optimised away.
417    pub fn included_relations(&self) -> &HashSet<TableId> {
418        &self.included_relations
419    }
420
421    /// Get included user-defined functions in the query after binding.
422    pub fn included_udfs(&self) -> &HashSet<FunctionId> {
423        &self.included_udfs
424    }
425
426    fn push_context(&mut self) {
427        let new_context = std::mem::take(&mut self.context);
428        self.context
429            .cte_to_relation
430            .clone_from(&new_context.cte_to_relation);
431        self.context.disable_security_invoker = new_context.disable_security_invoker;
432        let new_lateral_contexts = std::mem::take(&mut self.lateral_contexts);
433        self.upper_subquery_contexts
434            .push((new_context, new_lateral_contexts));
435    }
436
437    fn pop_context(&mut self) -> Result<()> {
438        let (old_context, old_lateral_contexts) = self
439            .upper_subquery_contexts
440            .pop()
441            .ok_or_else(|| ErrorCode::InternalError("Popping non-existent context".to_owned()))?;
442        self.context = old_context;
443        self.lateral_contexts = old_lateral_contexts;
444        Ok(())
445    }
446
447    fn push_lateral_context(&mut self) {
448        let new_context = std::mem::take(&mut self.context);
449        self.context
450            .cte_to_relation
451            .clone_from(&new_context.cte_to_relation);
452        self.context.disable_security_invoker = new_context.disable_security_invoker;
453        self.lateral_contexts.push(LateralBindContext {
454            is_visible: false,
455            context: new_context,
456        });
457    }
458
459    fn pop_and_merge_lateral_context(&mut self) -> Result<()> {
460        let mut old_context = self
461            .lateral_contexts
462            .pop()
463            .ok_or_else(|| ErrorCode::InternalError("Popping non-existent context".to_owned()))?
464            .context;
465        old_context.merge_context(self.context.clone())?;
466        self.context = old_context;
467        Ok(())
468    }
469
470    fn try_mark_lateral_as_visible(&mut self) {
471        if let Some(mut ctx) = self.lateral_contexts.pop() {
472            ctx.is_visible = true;
473            self.lateral_contexts.push(ctx);
474        }
475    }
476
477    fn try_mark_lateral_as_invisible(&mut self) {
478        if let Some(mut ctx) = self.lateral_contexts.pop() {
479            ctx.is_visible = false;
480            self.lateral_contexts.push(ctx);
481        }
482    }
483
484    fn next_subquery_id(&mut self) -> usize {
485        let id = self.next_subquery_id;
486        self.next_subquery_id += 1;
487        id
488    }
489
490    fn next_values_id(&mut self) -> usize {
491        let id = self.next_values_id;
492        self.next_values_id += 1;
493        id
494    }
495
496    fn next_share_id(&mut self) -> ShareId {
497        let id = self.next_share_id;
498        self.next_share_id += 1;
499        id
500    }
501
502    fn first_valid_schema(&self) -> CatalogResult<&SchemaCatalog> {
503        self.catalog.first_valid_schema(
504            &self.db_name,
505            &self.search_path,
506            &self.auth_context.user_name,
507        )
508    }
509
510    fn bind_schema_path<'a>(&'a self, schema_name: Option<&'a str>) -> SchemaPath<'a> {
511        SchemaPath::new(schema_name, &self.search_path, &self.auth_context.user_name)
512    }
513
514    pub fn set_clause(&mut self, clause: Option<Clause>) {
515        self.context.clause = clause;
516    }
517
518    pub fn udf_context_mut(&mut self) -> &mut UdfContext {
519        &mut self.udf_context
520    }
521}
522
/// The column name stored in [`BindContext`] for a column without an alias.
// NOTE(review): presumably chosen to match the name PostgreSQL reports for unnamed
// columns; confirm.
pub const UNNAMED_COLUMN: &str = "?column?";
/// The table name stored in [`BindContext`] for a subquery without an alias.
const UNNAMED_SUBQUERY: &str = "?subquery?";
/// The table name stored in [`BindContext`] for a column group.
// NOTE(review): the constant is named as a prefix, so presumably the group id is appended
// to it to form the actual name; confirm against the users of this constant.
const COLUMN_GROUP_PREFIX: &str = "?column_group_id?";
529
/// Helpers for building a [`Binder`] over a mock session in tests.
#[cfg(test)]
pub mod test_utils {
    use risingwave_common::types::DataType;

    use super::Binder;
    use crate::session::SessionImpl;

    /// Creates a batch binder over a mock session.
    pub fn mock_binder() -> Binder {
        let session = SessionImpl::mock();
        Binder::new(&session)
    }

    /// Creates a batch binder over a mock session with the given parameter types.
    pub fn mock_binder_with_param_types(param_types: Vec<Option<DataType>>) -> Binder {
        let session = SessionImpl::mock();
        Binder::new_with_param_types(&session, param_types)
    }
}
547
548#[cfg(test)]
549mod tests {
550    use expect_test::expect;
551
552    use super::test_utils::*;
553
554    #[tokio::test]
555    async fn test_rcte() {
556        let stmt = risingwave_sqlparser::parser::Parser::parse_sql(
557            "WITH RECURSIVE t1 AS (SELECT 1 AS a UNION ALL SELECT a + 1 FROM t1 WHERE a < 10) SELECT * FROM t1",
558        ).unwrap().into_iter().next().unwrap();
559        let mut binder = mock_binder();
560        let bound = binder.bind(stmt).unwrap();
561
562        let expected = expect![[r#"
563            Query(
564                BoundQuery {
565                    body: Select(
566                        BoundSelect {
567                            distinct: All,
568                            select_items: [
569                                InputRef(
570                                    InputRef {
571                                        index: 0,
572                                        data_type: Int32,
573                                    },
574                                ),
575                            ],
576                            aliases: [
577                                Some(
578                                    "a",
579                                ),
580                            ],
581                            from: Some(
582                                Share(
583                                    BoundShare {
584                                        share_id: 0,
585                                        input: Query(
586                                            Right(
587                                                RecursiveUnion {
588                                                    all: true,
589                                                    base: Select(
590                                                        BoundSelect {
591                                                            distinct: All,
592                                                            select_items: [
593                                                                Literal(
594                                                                    Literal {
595                                                                        data: Some(
596                                                                            Int32(
597                                                                                1,
598                                                                            ),
599                                                                        ),
600                                                                        data_type: Some(
601                                                                            Int32,
602                                                                        ),
603                                                                    },
604                                                                ),
605                                                            ],
606                                                            aliases: [
607                                                                Some(
608                                                                    "a",
609                                                                ),
610                                                            ],
611                                                            from: None,
612                                                            where_clause: None,
613                                                            group_by: GroupKey(
614                                                                [],
615                                                            ),
616                                                            having: None,
617                                                            window: {},
618                                                            schema: Schema {
619                                                                fields: [
620                                                                    a:Int32,
621                                                                ],
622                                                            },
623                                                        },
624                                                    ),
625                                                    recursive: Select(
626                                                        BoundSelect {
627                                                            distinct: All,
628                                                            select_items: [
629                                                                FunctionCall(
630                                                                    FunctionCall {
631                                                                        func_type: Add,
632                                                                        return_type: Int32,
633                                                                        inputs: [
634                                                                            InputRef(
635                                                                                InputRef {
636                                                                                    index: 0,
637                                                                                    data_type: Int32,
638                                                                                },
639                                                                            ),
640                                                                            Literal(
641                                                                                Literal {
642                                                                                    data: Some(
643                                                                                        Int32(
644                                                                                            1,
645                                                                                        ),
646                                                                                    ),
647                                                                                    data_type: Some(
648                                                                                        Int32,
649                                                                                    ),
650                                                                                },
651                                                                            ),
652                                                                        ],
653                                                                    },
654                                                                ),
655                                                            ],
656                                                            aliases: [
657                                                                None,
658                                                            ],
659                                                            from: Some(
660                                                                BackCteRef(
661                                                                    BoundBackCteRef {
662                                                                        share_id: 0,
663                                                                        base: Select(
664                                                                            BoundSelect {
665                                                                                distinct: All,
666                                                                                select_items: [
667                                                                                    Literal(
668                                                                                        Literal {
669                                                                                            data: Some(
670                                                                                                Int32(
671                                                                                                    1,
672                                                                                                ),
673                                                                                            ),
674                                                                                            data_type: Some(
675                                                                                                Int32,
676                                                                                            ),
677                                                                                        },
678                                                                                    ),
679                                                                                ],
680                                                                                aliases: [
681                                                                                    Some(
682                                                                                        "a",
683                                                                                    ),
684                                                                                ],
685                                                                                from: None,
686                                                                                where_clause: None,
687                                                                                group_by: GroupKey(
688                                                                                    [],
689                                                                                ),
690                                                                                having: None,
691                                                                                window: {},
692                                                                                schema: Schema {
693                                                                                    fields: [
694                                                                                        a:Int32,
695                                                                                    ],
696                                                                                },
697                                                                            },
698                                                                        ),
699                                                                    },
700                                                                ),
701                                                            ),
702                                                            where_clause: Some(
703                                                                FunctionCall(
704                                                                    FunctionCall {
705                                                                        func_type: LessThan,
706                                                                        return_type: Boolean,
707                                                                        inputs: [
708                                                                            InputRef(
709                                                                                InputRef {
710                                                                                    index: 0,
711                                                                                    data_type: Int32,
712                                                                                },
713                                                                            ),
714                                                                            Literal(
715                                                                                Literal {
716                                                                                    data: Some(
717                                                                                        Int32(
718                                                                                            10,
719                                                                                        ),
720                                                                                    ),
721                                                                                    data_type: Some(
722                                                                                        Int32,
723                                                                                    ),
724                                                                                },
725                                                                            ),
726                                                                        ],
727                                                                    },
728                                                                ),
729                                                            ),
730                                                            group_by: GroupKey(
731                                                                [],
732                                                            ),
733                                                            having: None,
734                                                            window: {},
735                                                            schema: Schema {
736                                                                fields: [
737                                                                    ?column?:Int32,
738                                                                ],
739                                                            },
740                                                        },
741                                                    ),
742                                                    schema: Schema {
743                                                        fields: [
744                                                            a:Int32,
745                                                        ],
746                                                    },
747                                                },
748                                            ),
749                                        ),
750                                    },
751                                ),
752                            ),
753                            where_clause: None,
754                            group_by: GroupKey(
755                                [],
756                            ),
757                            having: None,
758                            window: {},
759                            schema: Schema {
760                                fields: [
761                                    a:Int32,
762                                ],
763                            },
764                        },
765                    ),
766                    order: [],
767                    limit: None,
768                    offset: None,
769                    with_ties: false,
770                    extra_order_exprs: [],
771                },
772            )"#]];
773
774        expected.assert_eq(&format!("{:#?}", bound));
775    }
776
777    #[tokio::test]
778    async fn test_bind_approx_percentile() {
779        let stmt = risingwave_sqlparser::parser::Parser::parse_sql(
780            "SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (ORDER BY generate_series) FROM generate_series(1, 100)",
781        ).unwrap().into_iter().next().unwrap();
782        let parse_expected = expect![[r#"
783            Query(
784                Query {
785                    with: None,
786                    body: Select(
787                        Select {
788                            distinct: All,
789                            projection: [
790                                UnnamedExpr(
791                                    Function(
792                                        Function {
793                                            scalar_as_agg: false,
794                                            name: ObjectName(
795                                                [
796                                                    Ident {
797                                                        value: "approx_percentile",
798                                                        quote_style: None,
799                                                    },
800                                                ],
801                                            ),
802                                            arg_list: FunctionArgList {
803                                                distinct: false,
804                                                args: [
805                                                    Unnamed(
806                                                        Expr(
807                                                            Value(
808                                                                Number(
809                                                                    "0.5",
810                                                                ),
811                                                            ),
812                                                        ),
813                                                    ),
814                                                    Unnamed(
815                                                        Expr(
816                                                            Value(
817                                                                Number(
818                                                                    "0.01",
819                                                                ),
820                                                            ),
821                                                        ),
822                                                    ),
823                                                ],
824                                                variadic: false,
825                                                order_by: [],
826                                                ignore_nulls: false,
827                                            },
828                                            within_group: Some(
829                                                OrderByExpr {
830                                                    expr: Identifier(
831                                                        Ident {
832                                                            value: "generate_series",
833                                                            quote_style: None,
834                                                        },
835                                                    ),
836                                                    asc: None,
837                                                    nulls_first: None,
838                                                },
839                                            ),
840                                            filter: None,
841                                            over: None,
842                                        },
843                                    ),
844                                ),
845                            ],
846                            from: [
847                                TableWithJoins {
848                                    relation: TableFunction {
849                                        name: ObjectName(
850                                            [
851                                                Ident {
852                                                    value: "generate_series",
853                                                    quote_style: None,
854                                                },
855                                            ],
856                                        ),
857                                        alias: None,
858                                        args: [
859                                            Unnamed(
860                                                Expr(
861                                                    Value(
862                                                        Number(
863                                                            "1",
864                                                        ),
865                                                    ),
866                                                ),
867                                            ),
868                                            Unnamed(
869                                                Expr(
870                                                    Value(
871                                                        Number(
872                                                            "100",
873                                                        ),
874                                                    ),
875                                                ),
876                                            ),
877                                        ],
878                                        with_ordinality: false,
879                                    },
880                                    joins: [],
881                                },
882                            ],
883                            lateral_views: [],
884                            selection: None,
885                            group_by: [],
886                            having: None,
887                            window: [],
888                        },
889                    ),
890                    order_by: [],
891                    limit: None,
892                    offset: None,
893                    fetch: None,
894                },
895            )"#]];
896        parse_expected.assert_eq(&format!("{:#?}", stmt));
897
898        let mut binder = mock_binder();
899        let bound = binder.bind(stmt).unwrap();
900
901        let expected = expect![[r#"
902            Query(
903                BoundQuery {
904                    body: Select(
905                        BoundSelect {
906                            distinct: All,
907                            select_items: [
908                                AggCall(
909                                    AggCall {
910                                        agg_type: Builtin(
911                                            ApproxPercentile,
912                                        ),
913                                        return_type: Float64,
914                                        args: [
915                                            FunctionCall(
916                                                FunctionCall {
917                                                    func_type: Cast,
918                                                    return_type: Float64,
919                                                    inputs: [
920                                                        InputRef(
921                                                            InputRef {
922                                                                index: 0,
923                                                                data_type: Int32,
924                                                            },
925                                                        ),
926                                                    ],
927                                                },
928                                            ),
929                                        ],
930                                        filter: Condition {
931                                            conjunctions: [],
932                                        },
933                                        distinct: false,
934                                        order_by: OrderBy {
935                                            sort_exprs: [
936                                                OrderByExpr {
937                                                    expr: InputRef(
938                                                        InputRef {
939                                                            index: 0,
940                                                            data_type: Int32,
941                                                        },
942                                                    ),
943                                                    order_type: OrderType {
944                                                        direction: Ascending,
945                                                        nulls_are: Largest,
946                                                    },
947                                                },
948                                            ],
949                                        },
950                                        direct_args: [
951                                            Literal {
952                                                data: Some(
953                                                    Float64(
954                                                        OrderedFloat(
955                                                            0.5,
956                                                        ),
957                                                    ),
958                                                ),
959                                                data_type: Some(
960                                                    Float64,
961                                                ),
962                                            },
963                                            Literal {
964                                                data: Some(
965                                                    Float64(
966                                                        OrderedFloat(
967                                                            0.01,
968                                                        ),
969                                                    ),
970                                                ),
971                                                data_type: Some(
972                                                    Float64,
973                                                ),
974                                            },
975                                        ],
976                                    },
977                                ),
978                            ],
979                            aliases: [
980                                Some(
981                                    "approx_percentile",
982                                ),
983                            ],
984                            from: Some(
985                                TableFunction {
986                                    expr: TableFunction(
987                                        FunctionCall {
988                                            function_type: GenerateSeries,
989                                            return_type: Int32,
990                                            args: [
991                                                Literal(
992                                                    Literal {
993                                                        data: Some(
994                                                            Int32(
995                                                                1,
996                                                            ),
997                                                        ),
998                                                        data_type: Some(
999                                                            Int32,
1000                                                        ),
1001                                                    },
1002                                                ),
1003                                                Literal(
1004                                                    Literal {
1005                                                        data: Some(
1006                                                            Int32(
1007                                                                100,
1008                                                            ),
1009                                                        ),
1010                                                        data_type: Some(
1011                                                            Int32,
1012                                                        ),
1013                                                    },
1014                                                ),
1015                                            ],
1016                                        },
1017                                    ),
1018                                    with_ordinality: false,
1019                                },
1020                            ),
1021                            where_clause: None,
1022                            group_by: GroupKey(
1023                                [],
1024                            ),
1025                            having: None,
1026                            window: {},
1027                            schema: Schema {
1028                                fields: [
1029                                    approx_percentile:Float64,
1030                                ],
1031                            },
1032                        },
1033                    ),
1034                    order: [],
1035                    limit: None,
1036                    offset: None,
1037                    with_ties: false,
1038                    extra_order_exprs: [],
1039                },
1040            )"#]];
1041
1042        expected.assert_eq(&format!("{:#?}", bound));
1043    }
1044}