risingwave_frontend/binder/relation/
share.rs1use either::Either;
16use itertools::Itertools;
17use risingwave_common::catalog::Field;
18
19use crate::binder::bind_context::RecursiveUnion;
20use crate::binder::statement::RewriteExprsRecursive;
21use crate::binder::{BoundQuery, Relation, ShareId};
22use crate::error::{ErrorCode, Result};
23use crate::optimizer::plan_node::generic::{_CHANGELOG_ROW_ID, CHANGELOG_OP};
24
/// The input wrapped by a [`BoundShare`].
///
/// `fields()` reports the output columns of either variant as `(is_hidden, field)` pairs.
#[derive(Debug, Clone)]
pub enum BoundShareInput {
    /// A bound query (`Either::Left`) or a recursive union (`Either::Right`).
    Query(Either<BoundQuery, RecursiveUnion>),
    /// A change log over a relation. `fields()` only accepts `Relation::BaseTable` here and
    /// returns a bind error for any other relation kind.
    ChangeLog(Relation),
}
33impl BoundShareInput {
34 pub fn fields(&self) -> Result<Vec<(bool, Field)>> {
35 match self {
36 BoundShareInput::Query(q) => match q {
37 Either::Left(q) => Ok(q
38 .schema()
39 .fields()
40 .iter()
41 .cloned()
42 .map(|f| (false, f))
43 .collect_vec()),
44 Either::Right(r) => Ok(r
45 .schema
46 .fields()
47 .iter()
48 .cloned()
49 .map(|f| (false, f))
50 .collect_vec()),
51 },
52 BoundShareInput::ChangeLog(r) => {
53 let (fields, _name) = if let Relation::BaseTable(bound_base_table) = r {
54 (
55 bound_base_table.table_catalog.columns().to_vec(),
56 bound_base_table.table_catalog.name().to_owned(),
57 )
58 } else {
59 return Err(ErrorCode::BindError(
60 "Change log CTE must be a base table".to_owned(),
61 )
62 .into());
63 };
64 let fields = fields
65 .into_iter()
66 .map(|x| {
67 (
68 x.is_hidden,
69 Field::with_name(x.data_type().clone(), x.name()),
70 )
71 })
72 .chain(vec![
73 (
74 false,
75 Field::with_name(
76 risingwave_common::types::DataType::Int16,
77 CHANGELOG_OP.to_owned(),
78 ),
79 ),
80 (
81 true,
82 Field::with_name(
83 risingwave_common::types::DataType::Serial,
84 _CHANGELOG_ROW_ID.to_owned(),
85 ),
86 ),
87 ])
88 .collect();
89 Ok(fields)
90 }
91 }
92 }
93}
/// A shared relation: a [`ShareId`] paired with the [`BoundShareInput`] it refers to.
#[derive(Debug, Clone)]
pub struct BoundShare {
    /// Identifier of this share.
    /// NOTE(review): presumably lets several references resolve to the same shared input —
    /// confirm against the binder/planner code that consumes it.
    pub(crate) share_id: ShareId,
    /// The query, recursive union, or change-log relation being shared.
    pub(crate) input: BoundShareInput,
}
99
100impl RewriteExprsRecursive for BoundShare {
101 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
102 match &mut self.input {
103 BoundShareInput::Query(q) => match q {
104 Either::Left(q) => q.rewrite_exprs_recursive(rewriter),
105 Either::Right(r) => r.rewrite_exprs_recursive(rewriter),
106 },
107 BoundShareInput::ChangeLog(r) => r.rewrite_exprs_recursive(rewriter),
108 };
109 }
110}