risingwave_frontend/binder/relation/
share.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use either::Either;
16use itertools::Itertools;
17use risingwave_common::catalog::Field;
18
19use crate::binder::bind_context::RecursiveUnion;
20use crate::binder::statement::RewriteExprsRecursive;
21use crate::binder::{BoundQuery, Relation, ShareId};
22use crate::error::{ErrorCode, Result};
23use crate::optimizer::plan_node::generic::{_CHANGELOG_ROW_ID, CHANGELOG_OP};
24
25/// Share a relation during binding and planning.
26/// It could be used to share a (recursive) CTE, a source, a view and so on.
27
28#[derive(Debug, Clone)]
29pub enum BoundShareInput {
30    Query(Either<BoundQuery, RecursiveUnion>),
31    ChangeLog(Relation),
32}
33impl BoundShareInput {
34    pub fn fields(&self) -> Result<Vec<(bool, Field)>> {
35        match self {
36            BoundShareInput::Query(q) => match q {
37                Either::Left(q) => Ok(q
38                    .schema()
39                    .fields()
40                    .iter()
41                    .cloned()
42                    .map(|f| (false, f))
43                    .collect_vec()),
44                Either::Right(r) => Ok(r
45                    .schema
46                    .fields()
47                    .iter()
48                    .cloned()
49                    .map(|f| (false, f))
50                    .collect_vec()),
51            },
52            BoundShareInput::ChangeLog(r) => {
53                let (fields, _name) = if let Relation::BaseTable(bound_base_table) = r {
54                    (
55                        bound_base_table.table_catalog.columns().to_vec(),
56                        bound_base_table.table_catalog.name().to_owned(),
57                    )
58                } else if let Relation::Source(bound_source) = r {
59                    (
60                        bound_source.catalog.columns.clone(),
61                        bound_source.catalog.name.clone(),
62                    )
63                } else {
64                    return Err(ErrorCode::BindError(
65                        "Change log CTE must be a table or source".to_owned(),
66                    )
67                    .into());
68                };
69                let fields = fields
70                    .into_iter()
71                    .map(|x| {
72                        (
73                            x.is_hidden,
74                            Field::with_name(x.data_type().clone(), x.name()),
75                        )
76                    })
77                    .chain(vec![
78                        (
79                            false,
80                            Field::with_name(
81                                risingwave_common::types::DataType::Int16,
82                                CHANGELOG_OP.to_owned(),
83                            ),
84                        ),
85                        (
86                            true,
87                            Field::with_name(
88                                risingwave_common::types::DataType::Serial,
89                                _CHANGELOG_ROW_ID.to_owned(),
90                            ),
91                        ),
92                    ])
93                    .collect();
94                Ok(fields)
95            }
96        }
97    }
98}
99#[derive(Debug, Clone)]
100pub struct BoundShare {
101    pub(crate) share_id: ShareId,
102    pub(crate) input: BoundShareInput,
103}
104
105impl RewriteExprsRecursive for BoundShare {
106    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
107        match &mut self.input {
108            BoundShareInput::Query(q) => match q {
109                Either::Left(q) => q.rewrite_exprs_recursive(rewriter),
110                Either::Right(r) => r.rewrite_exprs_recursive(rewriter),
111            },
112            BoundShareInput::ChangeLog(r) => r.rewrite_exprs_recursive(rewriter),
113        };
114    }
115}