Skip to main content

risingwave_frontend/optimizer/plan_node/
stream_iceberg_with_pk_index_writer.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::catalog::Field;
18use risingwave_common::types::DataType;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::sink::catalog::desc::SinkDesc;
21use risingwave_pb::stream_plan::IcebergWithPkIndexWriterNode;
22use risingwave_pb::stream_plan::stream_node::NodeBody;
23
24use super::stream::prelude::*;
25use crate::TableCatalog;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::utils::{Distill, TableCatalogBuilder, childless_record};
28use crate::optimizer::plan_node::{
29    ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
30};
31use crate::optimizer::property::{
32    Distribution, FunctionalDependencySet, MonotonicityMap, WatermarkColumns,
33};
34use crate::stream_fragmenter::BuildFragmentGraphState;
35
36/// `StreamIcebergWithPkIndexWriter` is the stateful writer executor for the Iceberg
37/// with pk index sink. It maintains a PK index and writes data files to Iceberg.
38///
39/// Output schema: `[file_path: Varchar, position: Int64]` — delete-position info for
40/// the downstream `DvMerger`.
41#[derive(Debug, Clone, PartialEq, Eq, Hash)]
42pub struct StreamIcebergWithPkIndexWriter {
43    pub base: PlanBase<Stream>,
44    pub input: PlanRef,
45    pub sink_desc: SinkDesc,
46    pub pk_index_table: TableCatalog,
47}
48
49impl StreamIcebergWithPkIndexWriter {
50    pub fn from_stream_sink(sink: &super::StreamSink) -> Result<Self> {
51        let output_schema = output_schema();
52        let fd_set = FunctionalDependencySet::new(output_schema.len());
53        let dist = match sink.distribution() {
54            Distribution::Single => Distribution::Single,
55            _ => Distribution::SomeShard,
56        };
57        let base = PlanBase::new_stream(
58            sink.ctx(),
59            output_schema,
60            sink.stream_key().map(|v| v.to_vec()),
61            fd_set,
62            dist,
63            StreamKind::AppendOnly,
64            sink.emit_on_window_close(),
65            WatermarkColumns::new(),
66            MonotonicityMap::new(),
67        );
68        let pk_index_table = build_iceberg_pk_state_table(sink.sink_desc())?;
69        Ok(Self {
70            base,
71            input: sink.input(),
72            sink_desc: sink.sink_desc().clone(),
73            pk_index_table,
74        })
75    }
76}
77
78fn output_schema() -> risingwave_common::catalog::Schema {
79    risingwave_common::catalog::Schema::new(vec![
80        Field::with_name(DataType::Varchar, "file_path"),
81        Field::with_name(DataType::Int64, "position"),
82    ])
83}
84
85fn build_iceberg_pk_state_table(sink_desc: &SinkDesc) -> Result<TableCatalog> {
86    let mut builder = TableCatalogBuilder::default();
87
88    let downstream_pk = sink_desc
89        .downstream_pk
90        .as_deref()
91        .context("Missing downstream PK in Iceberg sink desc")?;
92    for &idx in downstream_pk {
93        builder.add_column(&Field::from(&sink_desc.columns[idx].column_desc));
94    }
95    builder.add_column(&Field::with_name(DataType::Varchar, "file_path"));
96    builder.add_column(&Field::with_name(DataType::Int64, "position"));
97
98    for idx in 0..downstream_pk.len() {
99        builder.add_order_column(idx, OrderType::ascending());
100    }
101
102    let res = builder.build((0..downstream_pk.len()).collect(), downstream_pk.len());
103    Ok(res)
104}
105
106impl Distill for StreamIcebergWithPkIndexWriter {
107    fn distill<'a>(&self) -> XmlNode<'a> {
108        let column_names = self
109            .sink_desc
110            .columns
111            .iter()
112            .map(|col| col.name_with_hidden().to_string())
113            .map(Pretty::from)
114            .collect();
115        let column_names = Pretty::Array(column_names);
116        let mut vec = Vec::with_capacity(2);
117        vec.push(("columns", column_names));
118        if let Some(pk) = &self.sink_desc.downstream_pk {
119            let column_names = pk
120                .iter()
121                .map(|&idx| self.sink_desc.columns[idx].name_with_hidden().to_string())
122                .map(Pretty::from)
123                .collect();
124            let column_names = Pretty::Array(column_names);
125            vec.push(("downstream_pk", column_names));
126        }
127
128        childless_record("StreamIcebergWithPkIndexWriter", vec)
129    }
130}
131
132impl PlanTreeNodeUnary<Stream> for StreamIcebergWithPkIndexWriter {
133    fn input(&self) -> PlanRef {
134        self.input.clone()
135    }
136
137    fn clone_with_input(&self, input: PlanRef) -> Self {
138        Self {
139            base: self.base.clone(),
140            input,
141            sink_desc: self.sink_desc.clone(),
142            pk_index_table: self.pk_index_table.clone(),
143        }
144    }
145}
146
147impl_plan_tree_node_for_unary! { Stream, StreamIcebergWithPkIndexWriter }
148
149impl StreamNode for StreamIcebergWithPkIndexWriter {
150    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
151        let pk_index_table = self
152            .pk_index_table
153            .clone()
154            .with_id(state.gen_table_id_wrapped());
155
156        NodeBody::IcebergWithPkIndexWriter(Box::new(IcebergWithPkIndexWriterNode {
157            sink_desc: Some(self.sink_desc.to_proto()),
158            pk_index_table: Some(pk_index_table.to_internal_table_prost()),
159        }))
160    }
161}
162
163impl ExprRewritable<Stream> for StreamIcebergWithPkIndexWriter {}
164
165impl ExprVisitable for StreamIcebergWithPkIndexWriter {}
166
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use risingwave_common::catalog::{
        ColumnCatalog, ColumnDesc, ColumnId, CreateType, DEFAULT_SUPER_USER_ID, StreamJobStatus,
    };
    use risingwave_common::util::sort_util::ColumnOrder;
    use risingwave_connector::sink::catalog::{SinkId, SinkType};

    use super::*;

    /// Shorthand for a visible column catalog entry.
    fn visible_column(name: &str, id: i32, data_type: DataType) -> ColumnCatalog {
        ColumnCatalog::visible(ColumnDesc::named(name, ColumnId::new(id), data_type))
    }

    /// Column names of `table`, in catalog order.
    fn column_names(table: &TableCatalog) -> Vec<String> {
        table.columns().iter().map(|c| c.name().to_owned()).collect()
    }

    /// Upsert sink over `(id, v1, _row_id)` whose downstream PK is `v1`.
    fn test_sink_desc() -> SinkDesc {
        let row_id = ColumnCatalog::hidden(ColumnDesc::named(
            "_row_id",
            ColumnId::new(2),
            DataType::Serial,
        ));
        SinkDesc {
            id: SinkId::placeholder(),
            name: "s".to_owned(),
            definition: "".to_owned(),
            columns: vec![
                visible_column("id", 0, DataType::Int32),
                visible_column("v1", 1, DataType::Int32),
                row_id,
            ],
            plan_pk: vec![ColumnOrder::new(2, OrderType::ascending())],
            downstream_pk: Some(vec![1]),
            distribution_key: vec![1],
            properties: BTreeMap::new(),
            secret_refs: BTreeMap::new(),
            sink_type: SinkType::Upsert,
            ignore_delete: false,
            format_desc: None,
            db_name: "dev".to_owned(),
            sink_from_name: "t".to_owned(),
            target_table: None,
            extra_partition_col_idx: None,
            create_type: CreateType::Foreground,
            is_exactly_once: None,
            auto_refresh_schema_from_table: None,
        }
    }

    #[test]
    fn test_build_iceberg_pk_state_table_uses_downstream_pk_columns() {
        let table = build_iceberg_pk_state_table(&test_sink_desc()).unwrap();
        let names = column_names(&table);

        assert_eq!(names[0], "v1");
        assert_eq!(names[1], "file_path");
        assert_eq!(names[2], "position");
        assert_eq!(table.pk().len(), 1);
        assert_eq!(table.pk()[0].column_index, 0);
        assert_eq!(table.distribution_key(), &[0]);
        assert_eq!(table.read_prefix_len_hint, 1);
        assert_eq!(table.owner, DEFAULT_SUPER_USER_ID);
        assert_eq!(table.stream_job_status, StreamJobStatus::Creating);
    }

    #[test]
    fn test_build_iceberg_pk_state_table_with_multi_column_pk() {
        // Mimic a planner-derived pk spanning several stream-key columns, e.g. a hidden
        // upstream column (`order_id`) that was promoted to visible and carried in verbatim.
        let mut desc = test_sink_desc();
        desc.columns.push(visible_column("order_id", 3, DataType::Int64));
        desc.columns.push(visible_column("shard_id", 4, DataType::Int64));
        desc.downstream_pk = Some(vec![1, 3, 4]); // v1, order_id, shard_id

        let table = build_iceberg_pk_state_table(&desc).unwrap();

        assert_eq!(
            column_names(&table),
            vec!["v1", "order_id", "shard_id", "file_path", "position"]
        );
        assert_eq!(table.pk().len(), 3);
        assert_eq!(table.distribution_key(), &[0, 1, 2]);
        assert_eq!(table.read_prefix_len_hint, 3);
    }

    #[test]
    fn test_build_iceberg_pk_state_table_with_visible_extra_only() {
        // Mimic planner output where downstream_pk = [user_pk, visible_extra].
        let mut desc = test_sink_desc();
        desc.columns.push(visible_column("order_id", 3, DataType::Int64));
        desc.downstream_pk = Some(vec![1, 3]); // v1, order_id

        let table = build_iceberg_pk_state_table(&desc).unwrap();

        assert_eq!(
            column_names(&table),
            vec!["v1", "order_id", "file_path", "position"]
        );
        assert_eq!(table.pk().len(), 2);
        assert_eq!(table.distribution_key(), &[0, 1]);
        assert_eq!(table.read_prefix_len_hint, 2);
    }
}