risingwave_frontend/binder/relation/
share.rs1use itertools::Itertools;
16use risingwave_common::catalog::Field;
17
18use crate::binder::statement::RewriteExprsRecursive;
19use crate::binder::{BoundQuery, Relation, ShareId};
20use crate::error::{ErrorCode, Result};
21use crate::optimizer::plan_node::generic::{_CHANGELOG_ROW_ID, CHANGELOG_OP};
22
23#[derive(Debug, Clone)]
27pub enum BoundShareInput {
28 Query(BoundQuery),
29 ChangeLog(Relation),
30}
31impl BoundShareInput {
32 pub fn fields(&self) -> Result<Vec<(bool, Field)>> {
33 match self {
34 BoundShareInput::Query(q) => Ok(q
35 .schema()
36 .fields()
37 .iter()
38 .cloned()
39 .map(|f| (false, f))
40 .collect_vec()),
41 BoundShareInput::ChangeLog(r) => {
42 let (fields, _name) = if let Relation::BaseTable(bound_base_table) = r {
43 (
44 bound_base_table.table_catalog.columns().to_vec(),
45 bound_base_table.table_catalog.name().to_owned(),
46 )
47 } else if let Relation::Source(bound_source) = r {
48 (
49 bound_source.catalog.columns.clone(),
50 bound_source.catalog.name.clone(),
51 )
52 } else {
53 return Err(ErrorCode::BindError(
54 "Change log CTE must be a table or source".to_owned(),
55 )
56 .into());
57 };
58 let fields = fields
59 .into_iter()
60 .map(|x| {
61 (
62 x.is_hidden,
63 Field::with_name(x.data_type().clone(), x.name()),
64 )
65 })
66 .chain(vec![
67 (
68 false,
69 Field::with_name(
70 risingwave_common::types::DataType::Int16,
71 CHANGELOG_OP.to_owned(),
72 ),
73 ),
74 (
75 true,
76 Field::with_name(
77 risingwave_common::types::DataType::Serial,
78 _CHANGELOG_ROW_ID.to_owned(),
79 ),
80 ),
81 ])
82 .collect();
83 Ok(fields)
84 }
85 }
86 }
87}
88#[derive(Debug, Clone)]
89pub struct BoundShare {
90 pub(crate) share_id: ShareId,
91 pub(crate) input: BoundShareInput,
92}
93
94impl RewriteExprsRecursive for BoundShare {
95 fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
96 match &mut self.input {
97 BoundShareInput::Query(q) => q.rewrite_exprs_recursive(rewriter),
98 BoundShareInput::ChangeLog(r) => r.rewrite_exprs_recursive(rewriter),
99 };
100 }
101}