risingwave_frontend/binder/relation/
share.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_common::catalog::Field;
17
18use crate::binder::statement::RewriteExprsRecursive;
19use crate::binder::{BoundQuery, Relation, ShareId};
20use crate::error::{ErrorCode, Result};
21use crate::optimizer::plan_node::generic::{_CHANGELOG_ROW_ID, CHANGELOG_OP};
22
23/// Share a relation during binding and planning.
24/// It could be used to share a (recursive) CTE, a source, a view and so on.
25
26#[derive(Debug, Clone)]
27pub enum BoundShareInput {
28    Query(BoundQuery),
29    ChangeLog(Relation),
30}
31impl BoundShareInput {
32    pub fn fields(&self) -> Result<Vec<(bool, Field)>> {
33        match self {
34            BoundShareInput::Query(q) => Ok(q
35                .schema()
36                .fields()
37                .iter()
38                .cloned()
39                .map(|f| (false, f))
40                .collect_vec()),
41            BoundShareInput::ChangeLog(r) => {
42                let (fields, _name) = if let Relation::BaseTable(bound_base_table) = r {
43                    (
44                        bound_base_table.table_catalog.columns().to_vec(),
45                        bound_base_table.table_catalog.name().to_owned(),
46                    )
47                } else if let Relation::Source(bound_source) = r {
48                    (
49                        bound_source.catalog.columns.clone(),
50                        bound_source.catalog.name.clone(),
51                    )
52                } else {
53                    return Err(ErrorCode::BindError(
54                        "Change log CTE must be a table or source".to_owned(),
55                    )
56                    .into());
57                };
58                let fields = fields
59                    .into_iter()
60                    .map(|x| {
61                        (
62                            x.is_hidden,
63                            Field::with_name(x.data_type().clone(), x.name()),
64                        )
65                    })
66                    .chain(vec![
67                        (
68                            false,
69                            Field::with_name(
70                                risingwave_common::types::DataType::Int16,
71                                CHANGELOG_OP.to_owned(),
72                            ),
73                        ),
74                        (
75                            true,
76                            Field::with_name(
77                                risingwave_common::types::DataType::Serial,
78                                _CHANGELOG_ROW_ID.to_owned(),
79                            ),
80                        ),
81                    ])
82                    .collect();
83                Ok(fields)
84            }
85        }
86    }
87}
88#[derive(Debug, Clone)]
89pub struct BoundShare {
90    pub(crate) share_id: ShareId,
91    pub(crate) input: BoundShareInput,
92}
93
94impl RewriteExprsRecursive for BoundShare {
95    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
96        match &mut self.input {
97            BoundShareInput::Query(q) => q.rewrite_exprs_recursive(rewriter),
98            BoundShareInput::ChangeLog(r) => r.rewrite_exprs_recursive(rewriter),
99        };
100    }
101}