// risingwave_frontend/binder/relation/share.rs
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::Field;
use crate::binder::bind_context::RecursiveUnion;
use crate::binder::statement::RewriteExprsRecursive;
use crate::binder::{BoundQuery, Relation, ShareId};
use crate::error::{ErrorCode, Result};
use crate::optimizer::plan_node::generic::{CHANGELOG_OP, _CHANGELOG_ROW_ID};
/// Share a relation during binding and planning.
/// It could be used to share a (recursive) CTE, a source, a view and so on.
#[derive(Debug, Clone)]
pub enum BoundShareInput {
/// A bound query: `Either::Left` holds a plain [`BoundQuery`], `Either::Right`
/// holds a [`RecursiveUnion`].
Query(Either<BoundQuery, RecursiveUnion>),
/// A changelog over a relation. [`BoundShareInput::fields`] only accepts a
/// `Relation::BaseTable` here and returns a bind error for anything else.
ChangeLog(Relation),
}
impl BoundShareInput {
    /// Returns the output fields of this shared input, each paired with a flag that is
    /// `true` when the field is hidden.
    ///
    /// * `Query`: every field of the query's (or recursive union's) schema, none of them
    ///   hidden.
    /// * `ChangeLog`: one field per column of the base table, carrying over each column's
    ///   `is_hidden` flag, followed by a visible `Int16` field named `CHANGELOG_OP` and a
    ///   hidden `Serial` field named `_CHANGELOG_ROW_ID`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::BindError` when a `ChangeLog` input is not a
    /// `Relation::BaseTable`.
    pub fn fields(&self) -> Result<Vec<(bool, Field)>> {
        match self {
            BoundShareInput::Query(Either::Left(query)) => Ok(query
                .schema()
                .fields()
                .iter()
                .cloned()
                .map(|field| (false, field))
                .collect_vec()),
            BoundShareInput::Query(Either::Right(recursive_union)) => Ok(recursive_union
                .schema
                .fields()
                .iter()
                .cloned()
                .map(|field| (false, field))
                .collect_vec()),
            BoundShareInput::ChangeLog(relation) => {
                // Borrow the catalog columns instead of cloning the whole list: only the
                // hidden flag, the data type and the name of each column are needed.
                let columns = if let Relation::BaseTable(base_table) = relation {
                    base_table.table_catalog.columns()
                } else {
                    return Err(ErrorCode::BindError(
                        "Change log CTE must be a base table".to_string(),
                    )
                    .into());
                };
                Ok(columns
                    .iter()
                    .map(|column| {
                        (
                            column.is_hidden,
                            Field::with_name(column.data_type().clone(), column.name()),
                        )
                    })
                    // The changelog appends two extra fields after the table's own
                    // columns: the visible operation field and the hidden row id.
                    .chain([
                        (
                            false,
                            Field::with_name(
                                risingwave_common::types::DataType::Int16,
                                CHANGELOG_OP.to_string(),
                            ),
                        ),
                        (
                            true,
                            Field::with_name(
                                risingwave_common::types::DataType::Serial,
                                _CHANGELOG_ROW_ID.to_string(),
                            ),
                        ),
                    ])
                    .collect())
            }
        }
    }
}
/// A shared relation: a [`ShareId`] paired with the [`BoundShareInput`] it refers to.
#[derive(Debug, Clone)]
pub struct BoundShare {
/// Identifier of this share.
/// NOTE(review): presumably every reference to the same shared input carries the
/// same id — confirm against the binder.
pub(crate) share_id: ShareId,
/// The query or changelog relation being shared.
pub(crate) input: BoundShareInput,
}
impl RewriteExprsRecursive for BoundShare {
    /// Forwards the rewriter to whichever input this share wraps: the bound query, the
    /// recursive union, or the changelog relation.
    fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
        match &mut self.input {
            BoundShareInput::Query(Either::Left(query)) => {
                query.rewrite_exprs_recursive(rewriter)
            }
            BoundShareInput::Query(Either::Right(recursive_union)) => {
                recursive_union.rewrite_exprs_recursive(rewriter)
            }
            BoundShareInput::ChangeLog(relation) => relation.rewrite_exprs_recursive(rewriter),
        }
    }
}