risingwave_frontend/optimizer/plan_node/
stream_iceberg_with_pk_index_writer.rs1use anyhow::Context;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::catalog::Field;
18use risingwave_common::types::DataType;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::sink::catalog::desc::SinkDesc;
21use risingwave_pb::stream_plan::IcebergWithPkIndexWriterNode;
22use risingwave_pb::stream_plan::stream_node::NodeBody;
23
24use super::stream::prelude::*;
25use crate::TableCatalog;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::utils::{
28 Distill, IndicesDisplay, TableCatalogBuilder, childless_record,
29};
30use crate::optimizer::plan_node::{
31 ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
32};
33use crate::optimizer::property::{
34 Distribution, FunctionalDependencySet, MonotonicityMap, WatermarkColumns,
35};
36use crate::stream_fragmenter::BuildFragmentGraphState;
37
/// Stream plan node built from a `StreamSink` for an Iceberg sink that maintains a
/// primary-key index state table. It emits an append-only stream of
/// `(file_path, position)` rows.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamIcebergWithPkIndexWriter {
    /// Shared plan-node properties (schema, stream key, distribution, ...).
    pub base: PlanBase<Stream>,
    /// Child plan producing the rows to sink; taken over from the original sink node.
    pub input: PlanRef,
    /// Description of the Iceberg sink this node writes for.
    pub sink_desc: SinkDesc,
    /// Catalog of the internal state table keyed by the downstream PK columns, followed by
    /// `file_path` and `position` columns.
    pub pk_index_table: TableCatalog,
}
50
51impl StreamIcebergWithPkIndexWriter {
52 pub fn from_stream_sink(sink: &super::StreamSink) -> Result<Self> {
53 let output_schema = output_schema();
54 let fd_set = FunctionalDependencySet::new(output_schema.len());
55 let dist = match sink.distribution() {
56 Distribution::Single => Distribution::Single,
57 _ => Distribution::SomeShard,
58 };
59 let base = PlanBase::new_stream(
60 sink.ctx(),
61 output_schema,
62 sink.stream_key().map(|v| v.to_vec()),
63 fd_set,
64 dist,
65 StreamKind::AppendOnly,
66 sink.emit_on_window_close(),
67 WatermarkColumns::new(),
68 MonotonicityMap::new(),
69 );
70 let pk_index_table = build_iceberg_pk_state_table(sink.sink_desc())?;
71 Ok(Self {
72 base,
73 input: sink.input(),
74 sink_desc: sink.sink_desc().clone(),
75 pk_index_table,
76 })
77 }
78}
79
80fn output_schema() -> risingwave_common::catalog::Schema {
81 risingwave_common::catalog::Schema::new(vec![
82 Field::with_name(DataType::Varchar, "file_path"),
83 Field::with_name(DataType::Int64, "position"),
84 ])
85}
86
87fn build_iceberg_pk_state_table(sink_desc: &SinkDesc) -> Result<TableCatalog> {
88 let mut builder = TableCatalogBuilder::default();
89
90 let downstream_pk = sink_desc
91 .downstream_pk
92 .as_ref()
93 .context("Missing downstream PK in Iceberg sink desc")?;
94 for &idx in downstream_pk {
95 builder.add_column(&Field::from(&sink_desc.columns[idx].column_desc));
96 }
97 builder.add_column(&Field::with_name(DataType::Varchar, "file_path"));
98 builder.add_column(&Field::with_name(DataType::Int64, "position"));
99
100 for idx in 0..downstream_pk.len() {
101 builder.add_order_column(idx, OrderType::ascending());
102 }
103
104 let res = builder.build((0..downstream_pk.len()).collect(), downstream_pk.len());
105 Ok(res)
106}
107
108impl Distill for StreamIcebergWithPkIndexWriter {
109 fn distill<'a>(&self) -> XmlNode<'a> {
110 let column_names = self
111 .sink_desc
112 .columns
113 .iter()
114 .map(|col| col.name_with_hidden().to_string())
115 .map(Pretty::from)
116 .collect();
117 let column_names = Pretty::Array(column_names);
118 let mut vec = Vec::with_capacity(2);
119 vec.push(("columns", column_names));
120 if let Some(pk) = &self.sink_desc.downstream_pk {
121 let sink_pk = IndicesDisplay {
122 indices: pk,
123 schema: self.input.schema(),
124 };
125 vec.push(("downstream_pk", sink_pk.distill()));
126 }
127
128 childless_record("StreamIcebergWithPkIndexWriter", vec)
129 }
130}
131
132impl PlanTreeNodeUnary<Stream> for StreamIcebergWithPkIndexWriter {
133 fn input(&self) -> PlanRef {
134 self.input.clone()
135 }
136
137 fn clone_with_input(&self, input: PlanRef) -> Self {
138 Self {
139 base: self.base.clone(),
140 input,
141 sink_desc: self.sink_desc.clone(),
142 pk_index_table: self.pk_index_table.clone(),
143 }
144 }
145}
146
// Derive the generic plan-tree-node plumbing from the `PlanTreeNodeUnary` impl above.
impl_plan_tree_node_for_unary! { Stream, StreamIcebergWithPkIndexWriter }
148
149impl StreamNode for StreamIcebergWithPkIndexWriter {
150 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
151 let pk_index_table = self
152 .pk_index_table
153 .clone()
154 .with_id(state.gen_table_id_wrapped());
155
156 NodeBody::IcebergWithPkIndexWriter(Box::new(IcebergWithPkIndexWriterNode {
157 sink_desc: Some(self.sink_desc.to_proto()),
158 pk_index_table: Some(pk_index_table.to_internal_table_prost()),
159 }))
160 }
161}
162
// This node holds no expressions of its own, so the default (no-op) trait impls suffice.
impl ExprRewritable<Stream> for StreamIcebergWithPkIndexWriter {}

impl ExprVisitable for StreamIcebergWithPkIndexWriter {}
166
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use risingwave_common::catalog::{
        ColumnCatalog, ColumnDesc, ColumnId, CreateType, DEFAULT_SUPER_USER_ID, StreamJobStatus,
    };
    use risingwave_common::util::sort_util::ColumnOrder;
    use risingwave_connector::sink::catalog::{SinkId, SinkType};

    use super::*;

    /// Upsert sink over columns `id`, `v1` and hidden `_row_id`, with downstream PK `v1`.
    fn test_sink_desc() -> SinkDesc {
        SinkDesc {
            id: SinkId::placeholder(),
            name: "s".to_owned(),
            definition: "".to_owned(),
            columns: vec![
                ColumnCatalog::visible(ColumnDesc::named("id", ColumnId::new(0), DataType::Int32)),
                ColumnCatalog::visible(ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32)),
                ColumnCatalog::hidden(ColumnDesc::named(
                    "_row_id",
                    ColumnId::new(2),
                    DataType::Serial,
                )),
            ],
            plan_pk: vec![ColumnOrder::new(2, OrderType::ascending())],
            downstream_pk: Some(vec![1]),
            distribution_key: vec![1],
            properties: BTreeMap::new(),
            secret_refs: BTreeMap::new(),
            sink_type: SinkType::Upsert,
            ignore_delete: false,
            format_desc: None,
            db_name: "dev".to_owned(),
            sink_from_name: "t".to_owned(),
            target_table: None,
            extra_partition_col_idx: None,
            create_type: CreateType::Foreground,
            is_exactly_once: None,
            auto_refresh_schema_from_table: None,
        }
    }

    #[test]
    fn test_build_iceberg_pk_state_table_uses_downstream_pk_columns() {
        let table = build_iceberg_pk_state_table(&test_sink_desc()).unwrap();

        // Exactly the downstream PK column followed by `file_path` and `position`.
        assert_eq!(table.columns().len(), 3);
        assert_eq!(table.columns()[0].name(), "v1");
        assert_eq!(table.columns()[1].name(), "file_path");
        assert_eq!(table.columns()[2].name(), "position");
        assert_eq!(table.pk().len(), 1);
        assert_eq!(table.pk()[0].column_index, 0);
        assert_eq!(table.distribution_key(), &[0]);
        assert_eq!(table.read_prefix_len_hint, 1);
        assert_eq!(table.owner, DEFAULT_SUPER_USER_ID);
        assert_eq!(table.stream_job_status, StreamJobStatus::Creating);
    }

    #[test]
    fn test_build_iceberg_pk_state_table_requires_downstream_pk() {
        let sink_desc = SinkDesc {
            downstream_pk: None,
            ..test_sink_desc()
        };
        assert!(build_iceberg_pk_state_table(&sink_desc).is_err());
    }
}