risingwave_frontend/optimizer/plan_node/
stream_iceberg_with_pk_index_writer.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::catalog::Field;
18use risingwave_common::types::DataType;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_connector::sink::catalog::desc::SinkDesc;
21use risingwave_pb::stream_plan::IcebergWithPkIndexWriterNode;
22use risingwave_pb::stream_plan::stream_node::NodeBody;
23
24use super::stream::prelude::*;
25use crate::TableCatalog;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::plan_node::utils::{
28    Distill, IndicesDisplay, TableCatalogBuilder, childless_record,
29};
30use crate::optimizer::plan_node::{
31    ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
32};
33use crate::optimizer::property::{
34    Distribution, FunctionalDependencySet, MonotonicityMap, WatermarkColumns,
35};
36use crate::stream_fragmenter::BuildFragmentGraphState;
37
38/// `StreamIcebergWithPkIndexWriter` is the stateful writer executor for the Iceberg
39/// with pk index sink. It maintains a PK index and writes data files to Iceberg.
40///
41/// Output schema: `[file_path: Varchar, position: Int64]` — delete-position info for
42/// the downstream `DvMerger`.
43#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub struct StreamIcebergWithPkIndexWriter {
45    pub base: PlanBase<Stream>,
46    pub input: PlanRef,
47    pub sink_desc: SinkDesc,
48    pub pk_index_table: TableCatalog,
49}
50
51impl StreamIcebergWithPkIndexWriter {
52    pub fn from_stream_sink(sink: &super::StreamSink) -> Result<Self> {
53        let output_schema = output_schema();
54        let fd_set = FunctionalDependencySet::new(output_schema.len());
55        let dist = match sink.distribution() {
56            Distribution::Single => Distribution::Single,
57            _ => Distribution::SomeShard,
58        };
59        let base = PlanBase::new_stream(
60            sink.ctx(),
61            output_schema,
62            sink.stream_key().map(|v| v.to_vec()),
63            fd_set,
64            dist,
65            StreamKind::AppendOnly,
66            sink.emit_on_window_close(),
67            WatermarkColumns::new(),
68            MonotonicityMap::new(),
69        );
70        let pk_index_table = build_iceberg_pk_state_table(sink.sink_desc())?;
71        Ok(Self {
72            base,
73            input: sink.input(),
74            sink_desc: sink.sink_desc().clone(),
75            pk_index_table,
76        })
77    }
78}
79
80fn output_schema() -> risingwave_common::catalog::Schema {
81    risingwave_common::catalog::Schema::new(vec![
82        Field::with_name(DataType::Varchar, "file_path"),
83        Field::with_name(DataType::Int64, "position"),
84    ])
85}
86
87fn build_iceberg_pk_state_table(sink_desc: &SinkDesc) -> Result<TableCatalog> {
88    let mut builder = TableCatalogBuilder::default();
89
90    let downstream_pk = sink_desc
91        .downstream_pk
92        .as_ref()
93        .context("Missing downstream PK in Iceberg sink desc")?;
94    for &idx in downstream_pk {
95        builder.add_column(&Field::from(&sink_desc.columns[idx].column_desc));
96    }
97    builder.add_column(&Field::with_name(DataType::Varchar, "file_path"));
98    builder.add_column(&Field::with_name(DataType::Int64, "position"));
99
100    for idx in 0..downstream_pk.len() {
101        builder.add_order_column(idx, OrderType::ascending());
102    }
103
104    let res = builder.build((0..downstream_pk.len()).collect(), downstream_pk.len());
105    Ok(res)
106}
107
108impl Distill for StreamIcebergWithPkIndexWriter {
109    fn distill<'a>(&self) -> XmlNode<'a> {
110        let column_names = self
111            .sink_desc
112            .columns
113            .iter()
114            .map(|col| col.name_with_hidden().to_string())
115            .map(Pretty::from)
116            .collect();
117        let column_names = Pretty::Array(column_names);
118        let mut vec = Vec::with_capacity(2);
119        vec.push(("columns", column_names));
120        if let Some(pk) = &self.sink_desc.downstream_pk {
121            let sink_pk = IndicesDisplay {
122                indices: pk,
123                schema: self.input.schema(),
124            };
125            vec.push(("downstream_pk", sink_pk.distill()));
126        }
127
128        childless_record("StreamIcebergWithPkIndexWriter", vec)
129    }
130}
131
132impl PlanTreeNodeUnary<Stream> for StreamIcebergWithPkIndexWriter {
133    fn input(&self) -> PlanRef {
134        self.input.clone()
135    }
136
137    fn clone_with_input(&self, input: PlanRef) -> Self {
138        Self {
139            base: self.base.clone(),
140            input,
141            sink_desc: self.sink_desc.clone(),
142            pk_index_table: self.pk_index_table.clone(),
143        }
144    }
145}
146
147impl_plan_tree_node_for_unary! { Stream, StreamIcebergWithPkIndexWriter }
148
149impl StreamNode for StreamIcebergWithPkIndexWriter {
150    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
151        let pk_index_table = self
152            .pk_index_table
153            .clone()
154            .with_id(state.gen_table_id_wrapped());
155
156        NodeBody::IcebergWithPkIndexWriter(Box::new(IcebergWithPkIndexWriterNode {
157            sink_desc: Some(self.sink_desc.to_proto()),
158            pk_index_table: Some(pk_index_table.to_internal_table_prost()),
159        }))
160    }
161}
162
163impl ExprRewritable<Stream> for StreamIcebergWithPkIndexWriter {}
164
165impl ExprVisitable for StreamIcebergWithPkIndexWriter {}
166
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use risingwave_common::catalog::{
        ColumnCatalog, ColumnDesc, ColumnId, CreateType, DEFAULT_SUPER_USER_ID, StreamJobStatus,
    };
    use risingwave_common::util::sort_util::ColumnOrder;
    use risingwave_connector::sink::catalog::{SinkId, SinkType};

    use super::*;

    /// Upsert sink over `(id, v1, hidden _row_id)` whose downstream PK is `v1` (index 1).
    fn test_sink_desc() -> SinkDesc {
        SinkDesc {
            id: SinkId::placeholder(),
            name: "s".to_owned(),
            definition: "".to_owned(),
            columns: vec![
                ColumnCatalog::visible(ColumnDesc::named("id", ColumnId::new(0), DataType::Int32)),
                ColumnCatalog::visible(ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32)),
                ColumnCatalog::hidden(ColumnDesc::named(
                    "_row_id",
                    ColumnId::new(2),
                    DataType::Serial,
                )),
            ],
            plan_pk: vec![ColumnOrder::new(2, OrderType::ascending())],
            downstream_pk: Some(vec![1]),
            distribution_key: vec![1],
            properties: BTreeMap::new(),
            secret_refs: BTreeMap::new(),
            sink_type: SinkType::Upsert,
            ignore_delete: false,
            format_desc: None,
            db_name: "dev".to_owned(),
            sink_from_name: "t".to_owned(),
            target_table: None,
            extra_partition_col_idx: None,
            create_type: CreateType::Foreground,
            is_exactly_once: None,
            auto_refresh_schema_from_table: None,
        }
    }

    #[test]
    fn test_build_iceberg_pk_state_table_uses_downstream_pk_columns() {
        let table = build_iceberg_pk_state_table(&test_sink_desc()).unwrap();

        assert_eq!(table.columns()[0].name(), "v1");
        assert_eq!(table.columns()[1].name(), "file_path");
        assert_eq!(table.columns()[2].name(), "position");
        assert_eq!(table.pk().len(), 1);
        assert_eq!(table.pk()[0].column_index, 0);
        assert_eq!(table.distribution_key(), &[0]);
        assert_eq!(table.read_prefix_len_hint, 1);
        assert_eq!(table.owner, DEFAULT_SUPER_USER_ID);
        assert_eq!(table.stream_job_status, StreamJobStatus::Creating);
    }

    /// A composite downstream PK keeps its declared column order, and every PK column
    /// becomes part of the state table's PK and distribution key.
    #[test]
    fn test_build_iceberg_pk_state_table_with_composite_downstream_pk() {
        let sink_desc = SinkDesc {
            downstream_pk: Some(vec![1, 0]),
            ..test_sink_desc()
        };
        let table = build_iceberg_pk_state_table(&sink_desc).unwrap();

        assert_eq!(table.columns()[0].name(), "v1");
        assert_eq!(table.columns()[1].name(), "id");
        assert_eq!(table.columns()[2].name(), "file_path");
        assert_eq!(table.columns()[3].name(), "position");
        assert_eq!(table.pk().len(), 2);
        assert_eq!(table.pk()[0].column_index, 0);
        assert_eq!(table.pk()[1].column_index, 1);
        assert_eq!(table.distribution_key(), &[0, 1]);
        assert_eq!(table.read_prefix_len_hint, 2);
    }

    /// Without a downstream PK there is nothing to index, so building must fail.
    #[test]
    fn test_build_iceberg_pk_state_table_requires_downstream_pk() {
        let sink_desc = SinkDesc {
            downstream_pk: None,
            ..test_sink_desc()
        };
        assert!(build_iceberg_pk_state_table(&sink_desc).is_err());
    }
}
225}