risingwave_frontend/binder/relation/
share.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use either::Either;
16use itertools::Itertools;
17use risingwave_common::catalog::Field;
18
19use crate::binder::bind_context::RecursiveUnion;
20use crate::binder::statement::RewriteExprsRecursive;
21use crate::binder::{BoundQuery, Relation, ShareId};
22use crate::error::{ErrorCode, Result};
23use crate::optimizer::plan_node::generic::{_CHANGELOG_ROW_ID, CHANGELOG_OP};
24
/// The input of a relation that is shared during binding and planning.
/// Sharing could be used for a (recursive) CTE, a source, a view and so on.
#[derive(Debug, Clone)]
pub enum BoundShareInput {
    /// A bound query (`Either::Left`) or a recursive union (`Either::Right`).
    Query(Either<BoundQuery, RecursiveUnion>),
    /// Relation backing a change log CTE; `fields` returns a bind error unless it is a
    /// `Relation::BaseTable`.
    ChangeLog(Relation),
}
33impl BoundShareInput {
34    pub fn fields(&self) -> Result<Vec<(bool, Field)>> {
35        match self {
36            BoundShareInput::Query(q) => match q {
37                Either::Left(q) => Ok(q
38                    .schema()
39                    .fields()
40                    .iter()
41                    .cloned()
42                    .map(|f| (false, f))
43                    .collect_vec()),
44                Either::Right(r) => Ok(r
45                    .schema
46                    .fields()
47                    .iter()
48                    .cloned()
49                    .map(|f| (false, f))
50                    .collect_vec()),
51            },
52            BoundShareInput::ChangeLog(r) => {
53                let (fields, _name) = if let Relation::BaseTable(bound_base_table) = r {
54                    (
55                        bound_base_table.table_catalog.columns().to_vec(),
56                        bound_base_table.table_catalog.name().to_owned(),
57                    )
58                } else {
59                    return Err(ErrorCode::BindError(
60                        "Change log CTE must be a base table".to_owned(),
61                    )
62                    .into());
63                };
64                let fields = fields
65                    .into_iter()
66                    .map(|x| {
67                        (
68                            x.is_hidden,
69                            Field::with_name(x.data_type().clone(), x.name()),
70                        )
71                    })
72                    .chain(vec![
73                        (
74                            false,
75                            Field::with_name(
76                                risingwave_common::types::DataType::Int16,
77                                CHANGELOG_OP.to_owned(),
78                            ),
79                        ),
80                        (
81                            true,
82                            Field::with_name(
83                                risingwave_common::types::DataType::Serial,
84                                _CHANGELOG_ROW_ID.to_owned(),
85                            ),
86                        ),
87                    ])
88                    .collect();
89                Ok(fields)
90            }
91        }
92    }
93}
/// A relation shared during binding and planning, pairing a share id with the shared input.
#[derive(Debug, Clone)]
pub struct BoundShare {
    /// Identifier of this share.
    pub(crate) share_id: ShareId,
    /// What is being shared: a query / recursive union, or a change log relation.
    pub(crate) input: BoundShareInput,
}
99
100impl RewriteExprsRecursive for BoundShare {
101    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
102        match &mut self.input {
103            BoundShareInput::Query(q) => match q {
104                Either::Left(q) => q.rewrite_exprs_recursive(rewriter),
105                Either::Right(r) => r.rewrite_exprs_recursive(rewriter),
106            },
107            BoundShareInput::ChangeLog(r) => r.rewrite_exprs_recursive(rewriter),
108        };
109    }
110}