risingwave_frontend/optimizer/plan_node/
stream_iceberg_with_pk_index_writer.rs1use anyhow::Context;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::catalog::Field;
18use risingwave_common::types::DataType;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::sink::catalog::desc::SinkDesc;
21use risingwave_pb::stream_plan::IcebergWithPkIndexWriterNode;
22use risingwave_pb::stream_plan::stream_node::NodeBody;
23
24use super::stream::prelude::*;
25use crate::TableCatalog;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::utils::{Distill, TableCatalogBuilder, childless_record};
28use crate::optimizer::plan_node::{
29 ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
30};
31use crate::optimizer::property::{
32 Distribution, FunctionalDependencySet, MonotonicityMap, WatermarkColumns,
33};
34use crate::stream_fragmenter::BuildFragmentGraphState;
35
/// Stream plan node for an Iceberg writer that keeps a primary-key index state table.
///
/// The node is derived from a `StreamSink` via
/// [`StreamIcebergWithPkIndexWriter::from_stream_sink`] and emits an append-only stream
/// with the two columns `file_path` (`Varchar`) and `position` (`Int64`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamIcebergWithPkIndexWriter {
    /// Common stream plan properties (schema, stream key, distribution, ...).
    pub base: PlanBase<Stream>,
    /// The single upstream plan node.
    pub input: PlanRef,
    /// Descriptor of the sink this writer was created from.
    pub sink_desc: SinkDesc,
    /// Catalog of the state table keyed by the sink's downstream primary key; it is
    /// assigned a table id and serialized as an internal table in `to_stream_prost_body`.
    pub pk_index_table: TableCatalog,
}
48
49impl StreamIcebergWithPkIndexWriter {
50 pub fn from_stream_sink(sink: &super::StreamSink) -> Result<Self> {
51 let output_schema = output_schema();
52 let fd_set = FunctionalDependencySet::new(output_schema.len());
53 let dist = match sink.distribution() {
54 Distribution::Single => Distribution::Single,
55 _ => Distribution::SomeShard,
56 };
57 let base = PlanBase::new_stream(
58 sink.ctx(),
59 output_schema,
60 sink.stream_key().map(|v| v.to_vec()),
61 fd_set,
62 dist,
63 StreamKind::AppendOnly,
64 sink.emit_on_window_close(),
65 WatermarkColumns::new(),
66 MonotonicityMap::new(),
67 );
68 let pk_index_table = build_iceberg_pk_state_table(sink.sink_desc())?;
69 Ok(Self {
70 base,
71 input: sink.input(),
72 sink_desc: sink.sink_desc().clone(),
73 pk_index_table,
74 })
75 }
76}
77
78fn output_schema() -> risingwave_common::catalog::Schema {
79 risingwave_common::catalog::Schema::new(vec![
80 Field::with_name(DataType::Varchar, "file_path"),
81 Field::with_name(DataType::Int64, "position"),
82 ])
83}
84
85fn build_iceberg_pk_state_table(sink_desc: &SinkDesc) -> Result<TableCatalog> {
86 let mut builder = TableCatalogBuilder::default();
87
88 let downstream_pk = sink_desc
89 .downstream_pk
90 .as_deref()
91 .context("Missing downstream PK in Iceberg sink desc")?;
92 for &idx in downstream_pk {
93 builder.add_column(&Field::from(&sink_desc.columns[idx].column_desc));
94 }
95 builder.add_column(&Field::with_name(DataType::Varchar, "file_path"));
96 builder.add_column(&Field::with_name(DataType::Int64, "position"));
97
98 for idx in 0..downstream_pk.len() {
99 builder.add_order_column(idx, OrderType::ascending());
100 }
101
102 let res = builder.build((0..downstream_pk.len()).collect(), downstream_pk.len());
103 Ok(res)
104}
105
106impl Distill for StreamIcebergWithPkIndexWriter {
107 fn distill<'a>(&self) -> XmlNode<'a> {
108 let column_names = self
109 .sink_desc
110 .columns
111 .iter()
112 .map(|col| col.name_with_hidden().to_string())
113 .map(Pretty::from)
114 .collect();
115 let column_names = Pretty::Array(column_names);
116 let mut vec = Vec::with_capacity(2);
117 vec.push(("columns", column_names));
118 if let Some(pk) = &self.sink_desc.downstream_pk {
119 let column_names = pk
120 .iter()
121 .map(|&idx| self.sink_desc.columns[idx].name_with_hidden().to_string())
122 .map(Pretty::from)
123 .collect();
124 let column_names = Pretty::Array(column_names);
125 vec.push(("downstream_pk", column_names));
126 }
127
128 childless_record("StreamIcebergWithPkIndexWriter", vec)
129 }
130}
131
132impl PlanTreeNodeUnary<Stream> for StreamIcebergWithPkIndexWriter {
133 fn input(&self) -> PlanRef {
134 self.input.clone()
135 }
136
137 fn clone_with_input(&self, input: PlanRef) -> Self {
138 Self {
139 base: self.base.clone(),
140 input,
141 sink_desc: self.sink_desc.clone(),
142 pk_index_table: self.pk_index_table.clone(),
143 }
144 }
145}
146
// Macro invocation for the `Stream` convention and this node type.
// NOTE(review): presumably derives the generic plan-tree-node impl from the
// `PlanTreeNodeUnary` impl above — confirm against the macro definition.
impl_plan_tree_node_for_unary! { Stream, StreamIcebergWithPkIndexWriter }
148
149impl StreamNode for StreamIcebergWithPkIndexWriter {
150 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
151 let pk_index_table = self
152 .pk_index_table
153 .clone()
154 .with_id(state.gen_table_id_wrapped());
155
156 NodeBody::IcebergWithPkIndexWriter(Box::new(IcebergWithPkIndexWriterNode {
157 sink_desc: Some(self.sink_desc.to_proto()),
158 pk_index_table: Some(pk_index_table.to_internal_table_prost()),
159 }))
160 }
161}
162
// No methods are overridden here, so the trait's default behavior applies.
impl ExprRewritable<Stream> for StreamIcebergWithPkIndexWriter {}
164
// No methods are overridden here, so the trait's default behavior applies.
impl ExprVisitable for StreamIcebergWithPkIndexWriter {}
166
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use risingwave_common::catalog::{
        ColumnCatalog, ColumnDesc, ColumnId, CreateType, DEFAULT_SUPER_USER_ID, StreamJobStatus,
    };
    use risingwave_common::util::sort_util::ColumnOrder;
    use risingwave_connector::sink::catalog::{SinkId, SinkType};

    use super::*;

    /// Builds a visible sink column with the given name, column id and type.
    fn visible_column(name: &str, id: i32, data_type: DataType) -> ColumnCatalog {
        ColumnCatalog::visible(ColumnDesc::named(name, ColumnId::new(id), data_type))
    }

    /// Collects the column names of `table` in catalog order.
    fn column_names(table: &TableCatalog) -> Vec<String> {
        table
            .columns()
            .iter()
            .map(|column| column.name().to_owned())
            .collect()
    }

    /// An upsert sink over `(id, v1, _row_id)` whose downstream primary key is `v1`.
    fn test_sink_desc() -> SinkDesc {
        let columns = vec![
            visible_column("id", 0, DataType::Int32),
            visible_column("v1", 1, DataType::Int32),
            ColumnCatalog::hidden(ColumnDesc::named(
                "_row_id",
                ColumnId::new(2),
                DataType::Serial,
            )),
        ];

        SinkDesc {
            id: SinkId::placeholder(),
            name: "s".to_owned(),
            definition: "".to_owned(),
            columns,
            plan_pk: vec![ColumnOrder::new(2, OrderType::ascending())],
            downstream_pk: Some(vec![1]),
            distribution_key: vec![1],
            properties: BTreeMap::new(),
            secret_refs: BTreeMap::new(),
            sink_type: SinkType::Upsert,
            ignore_delete: false,
            format_desc: None,
            db_name: "dev".to_owned(),
            sink_from_name: "t".to_owned(),
            target_table: None,
            extra_partition_col_idx: None,
            create_type: CreateType::Foreground,
            is_exactly_once: None,
            auto_refresh_schema_from_table: None,
        }
    }

    #[test]
    fn test_build_iceberg_pk_state_table_uses_downstream_pk_columns() {
        let desc = test_sink_desc();
        let table = build_iceberg_pk_state_table(&desc).unwrap();

        // The downstream PK column comes first, followed by the two bookkeeping columns.
        let columns = table.columns();
        assert_eq!(columns[0].name(), "v1");
        assert_eq!(columns[1].name(), "file_path");
        assert_eq!(columns[2].name(), "position");

        let pk = table.pk();
        assert_eq!(pk.len(), 1);
        assert_eq!(pk[0].column_index, 0);

        assert_eq!(table.distribution_key(), &[0]);
        assert_eq!(table.read_prefix_len_hint, 1);
        assert_eq!(table.owner, DEFAULT_SUPER_USER_ID);
        assert_eq!(table.stream_job_status, StreamJobStatus::Creating);
    }

    #[test]
    fn test_build_iceberg_pk_state_table_with_multi_column_pk() {
        let mut desc = test_sink_desc();
        desc.columns.extend([
            visible_column("order_id", 3, DataType::Int64),
            visible_column("shard_id", 4, DataType::Int64),
        ]);
        desc.downstream_pk = Some(vec![1, 3, 4]);

        let table = build_iceberg_pk_state_table(&desc).unwrap();

        assert_eq!(
            column_names(&table),
            vec!["v1", "order_id", "shard_id", "file_path", "position"]
        );
        assert_eq!(table.pk().len(), 3);
        assert_eq!(table.distribution_key(), &[0, 1, 2]);
        assert_eq!(table.read_prefix_len_hint, 3);
    }

    #[test]
    fn test_build_iceberg_pk_state_table_with_visible_extra_only() {
        let mut desc = test_sink_desc();
        desc.columns
            .push(visible_column("order_id", 3, DataType::Int64));
        desc.downstream_pk = Some(vec![1, 3]);

        let table = build_iceberg_pk_state_table(&desc).unwrap();

        assert_eq!(
            column_names(&table),
            vec!["v1", "order_id", "file_path", "position"]
        );
        assert_eq!(table.pk().len(), 2);
        assert_eq!(table.distribution_key(), &[0, 1]);
        assert_eq!(table.read_prefix_len_hint, 2);
    }
}