risingwave_frontend/binder/relation/
share.rs1use either::Either;
16use itertools::Itertools;
17use risingwave_common::catalog::Field;
18
19use crate::binder::bind_context::RecursiveUnion;
20use crate::binder::statement::RewriteExprsRecursive;
21use crate::binder::{BoundQuery, Relation, ShareId};
22use crate::error::{ErrorCode, Result};
23use crate::optimizer::plan_node::generic::{_CHANGELOG_ROW_ID, CHANGELOG_OP};
24
25#[derive(Debug, Clone)]
29pub enum BoundShareInput {
30 Query(Either<BoundQuery, RecursiveUnion>),
31 ChangeLog(Relation),
32}
33impl BoundShareInput {
34 pub fn fields(&self) -> Result<Vec<(bool, Field)>> {
35 match self {
36 BoundShareInput::Query(q) => match q {
37 Either::Left(q) => Ok(q
38 .schema()
39 .fields()
40 .iter()
41 .cloned()
42 .map(|f| (false, f))
43 .collect_vec()),
44 Either::Right(r) => Ok(r
45 .schema
46 .fields()
47 .iter()
48 .cloned()
49 .map(|f| (false, f))
50 .collect_vec()),
51 },
52 BoundShareInput::ChangeLog(r) => {
53 let (fields, _name) = if let Relation::BaseTable(bound_base_table) = r {
54 (
55 bound_base_table.table_catalog.columns().to_vec(),
56 bound_base_table.table_catalog.name().to_owned(),
57 )
58 } else if let Relation::Source(bound_source) = r {
59 (
60 bound_source.catalog.columns.clone(),
61 bound_source.catalog.name.clone(),
62 )
63 } else {
64 return Err(ErrorCode::BindError(
65 "Change log CTE must be a table or source".to_owned(),
66 )
67 .into());
68 };
69 let fields = fields
70 .into_iter()
71 .map(|x| {
72 (
73 x.is_hidden,
74 Field::with_name(x.data_type().clone(), x.name()),
75 )
76 })
77 .chain(vec![
78 (
79 false,
80 Field::with_name(
81 risingwave_common::types::DataType::Int16,
82 CHANGELOG_OP.to_owned(),
83 ),
84 ),
85 (
86 true,
87 Field::with_name(
88 risingwave_common::types::DataType::Serial,
89 _CHANGELOG_ROW_ID.to_owned(),
90 ),
91 ),
92 ])
93 .collect();
94 Ok(fields)
95 }
96 }
97 }
98}
99#[derive(Debug, Clone)]
100pub struct BoundShare {
101 pub(crate) share_id: ShareId,
102 pub(crate) input: BoundShareInput,
103}
104
105impl RewriteExprsRecursive for BoundShare {
106 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
107 match &mut self.input {
108 BoundShareInput::Query(q) => match q {
109 Either::Left(q) => q.rewrite_exprs_recursive(rewriter),
110 Either::Right(r) => r.rewrite_exprs_recursive(rewriter),
111 },
112 BoundShareInput::ChangeLog(r) => r.rewrite_exprs_recursive(rewriter),
113 };
114 }
115}