risingwave_frontend/optimizer/plan_node/
stream_iceberg_with_pk_index_writer.rs1use anyhow::Context;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::catalog::Field;
18use risingwave_common::types::DataType;
19use risingwave_connector::sink::catalog::desc::SinkDesc;
20use risingwave_pb::stream_plan::IcebergWithPkIndexWriterNode;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22
23use super::stream::prelude::*;
24use crate::TableCatalog;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::utils::{
27 Distill, IndicesDisplay, TableCatalogBuilder, childless_record,
28};
29use crate::optimizer::plan_node::{
30 ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
31};
32use crate::optimizer::property::{
33 Distribution, FunctionalDependencySet, MonotonicityMap, WatermarkColumns,
34};
35use crate::stream_fragmenter::BuildFragmentGraphState;
36
/// Stream plan node that takes over the input of an Iceberg sink and maintains an index from
/// the sink's downstream primary key to `(file_path, position)`.
///
/// The node's own output is append-only rows of `(file_path, position)` (see
/// `output_schema`). The index itself lives in an internal state table described by
/// `pk_index_table`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamIcebergWithPkIndexWriter {
    /// Common stream plan-node properties (schema, stream key, distribution, ...).
    pub base: PlanBase<Stream>,
    /// The child plan; initially the input of the `StreamSink` this node was built from.
    pub input: PlanRef,
    /// Descriptor of the Iceberg sink, copied from the originating `StreamSink`.
    pub sink_desc: SinkDesc,
    /// Catalog of the internal state table keyed by the downstream PK. Its table id is assigned
    /// when the node is serialized to protobuf.
    pub pk_index_table: TableCatalog,
}
49
50impl StreamIcebergWithPkIndexWriter {
51 pub fn from_stream_sink(sink: &super::StreamSink) -> Result<Self> {
52 let output_schema = output_schema();
53 let fd_set = FunctionalDependencySet::new(output_schema.len());
54 let dist = match sink.distribution() {
55 Distribution::Single => Distribution::Single,
56 _ => Distribution::SomeShard,
57 };
58 let base = PlanBase::new_stream(
59 sink.ctx(),
60 output_schema,
61 Some(vec![]),
62 fd_set,
63 dist,
64 StreamKind::AppendOnly,
65 sink.emit_on_window_close(),
66 WatermarkColumns::new(),
67 MonotonicityMap::new(),
68 );
69 let pk_index_table = build_iceberg_pk_state_table(sink.sink_desc())?;
70 Ok(Self {
71 base,
72 input: sink.input(),
73 sink_desc: sink.sink_desc().clone(),
74 pk_index_table,
75 })
76 }
77}
78
79fn output_schema() -> risingwave_common::catalog::Schema {
80 risingwave_common::catalog::Schema::new(vec![
81 Field::with_name(DataType::Varchar, "file_path"),
82 Field::with_name(DataType::Int64, "position"),
83 ])
84}
85
86fn build_iceberg_pk_state_table(sink_desc: &SinkDesc) -> Result<TableCatalog> {
87 let mut builder = TableCatalogBuilder::default();
88
89 let downstream_pk = sink_desc
90 .downstream_pk
91 .as_ref()
92 .context("Missing downstream PK in Iceberg sink desc")?;
93 for &idx in downstream_pk {
94 let order = &sink_desc.plan_pk[idx];
95 builder.add_column(&Field::from(
96 &sink_desc.columns[order.column_index].column_desc,
97 ));
98 }
99 builder.add_column(&Field::with_name(DataType::Varchar, "file_path"));
100 builder.add_column(&Field::with_name(DataType::Int64, "position"));
101
102 for (idx, order) in sink_desc.plan_pk.iter().enumerate() {
103 builder.add_order_column(idx, order.order_type);
104 }
105
106 let res = builder.build(
107 (0..sink_desc.plan_pk.len()).collect(),
108 sink_desc.plan_pk.len(),
109 );
110 Ok(res)
111}
112
113impl Distill for StreamIcebergWithPkIndexWriter {
114 fn distill<'a>(&self) -> XmlNode<'a> {
115 let column_names = self
116 .sink_desc
117 .columns
118 .iter()
119 .map(|col| col.name_with_hidden().to_string())
120 .map(Pretty::from)
121 .collect();
122 let column_names = Pretty::Array(column_names);
123 let mut vec = Vec::with_capacity(2);
124 vec.push(("columns", column_names));
125 if let Some(pk) = &self.sink_desc.downstream_pk {
126 let sink_pk = IndicesDisplay {
127 indices: pk,
128 schema: self.input.schema(),
129 };
130 vec.push(("downstream_pk", sink_pk.distill()));
131 }
132
133 childless_record("StreamIcebergWithPkIndexWriter", vec)
134 }
135}
136
137impl PlanTreeNodeUnary<Stream> for StreamIcebergWithPkIndexWriter {
138 fn input(&self) -> PlanRef {
139 self.input.clone()
140 }
141
142 fn clone_with_input(&self, input: PlanRef) -> Self {
143 Self {
144 base: self.base.clone(),
145 input,
146 sink_desc: self.sink_desc.clone(),
147 pk_index_table: self.pk_index_table.clone(),
148 }
149 }
150}
151
// Crate macro (defined elsewhere) that generates the plan-tree node boilerplate for this unary
// stream node. It presumably builds on the `PlanTreeNodeUnary` impl; confirm against the macro
// definition.
impl_plan_tree_node_for_unary! { Stream, StreamIcebergWithPkIndexWriter }
153
154impl StreamNode for StreamIcebergWithPkIndexWriter {
155 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
156 let pk_index_table = self
157 .pk_index_table
158 .clone()
159 .with_id(state.gen_table_id_wrapped());
160
161 NodeBody::IcebergWithPkIndexWriter(Box::new(IcebergWithPkIndexWriterNode {
162 sink_desc: Some(self.sink_desc.to_proto()),
163 pk_index_table: Some(pk_index_table.to_internal_table_prost()),
164 }))
165 }
166}
167
// Relies entirely on the trait's default methods. None of this node's fields (base, input,
// sink desc, PK index table) is an expression. NOTE(review): assumes the defaults are no-ops.
impl ExprRewritable<Stream> for StreamIcebergWithPkIndexWriter {}
169
// Relies on the trait's default methods; this node holds no expressions of its own to visit.
impl ExprVisitable for StreamIcebergWithPkIndexWriter {}