risingwave_frontend/optimizer/plan_node/
stream_iceberg_with_pk_index_writer.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::Context;
16use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::catalog::Field;
18use risingwave_common::types::DataType;
19use risingwave_connector::sink::catalog::desc::SinkDesc;
20use risingwave_pb::stream_plan::IcebergWithPkIndexWriterNode;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22
23use super::stream::prelude::*;
24use crate::TableCatalog;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::utils::{
27    Distill, IndicesDisplay, TableCatalogBuilder, childless_record,
28};
29use crate::optimizer::plan_node::{
30    ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, StreamPlanRef as PlanRef,
31};
32use crate::optimizer::property::{
33    Distribution, FunctionalDependencySet, MonotonicityMap, WatermarkColumns,
34};
35use crate::stream_fragmenter::BuildFragmentGraphState;
36
37/// `StreamIcebergWithPkIndexWriter` is the stateful writer executor for the Iceberg
38/// with pk index sink. It maintains a PK index and writes data files to Iceberg.
39///
40/// Output schema: `[file_path: Varchar, position: Int64]` — delete-position info for
41/// the downstream `DvMerger`.
42#[derive(Debug, Clone, PartialEq, Eq, Hash)]
43pub struct StreamIcebergWithPkIndexWriter {
44    pub base: PlanBase<Stream>,
45    pub input: PlanRef,
46    pub sink_desc: SinkDesc,
47    pub pk_index_table: TableCatalog,
48}
49
50impl StreamIcebergWithPkIndexWriter {
51    pub fn from_stream_sink(sink: &super::StreamSink) -> Result<Self> {
52        let output_schema = output_schema();
53        let fd_set = FunctionalDependencySet::new(output_schema.len());
54        let dist = match sink.distribution() {
55            Distribution::Single => Distribution::Single,
56            _ => Distribution::SomeShard,
57        };
58        let base = PlanBase::new_stream(
59            sink.ctx(),
60            output_schema,
61            Some(vec![]),
62            fd_set,
63            dist,
64            StreamKind::AppendOnly,
65            sink.emit_on_window_close(),
66            WatermarkColumns::new(),
67            MonotonicityMap::new(),
68        );
69        let pk_index_table = build_iceberg_pk_state_table(sink.sink_desc())?;
70        Ok(Self {
71            base,
72            input: sink.input(),
73            sink_desc: sink.sink_desc().clone(),
74            pk_index_table,
75        })
76    }
77}
78
79fn output_schema() -> risingwave_common::catalog::Schema {
80    risingwave_common::catalog::Schema::new(vec![
81        Field::with_name(DataType::Varchar, "file_path"),
82        Field::with_name(DataType::Int64, "position"),
83    ])
84}
85
86fn build_iceberg_pk_state_table(sink_desc: &SinkDesc) -> Result<TableCatalog> {
87    let mut builder = TableCatalogBuilder::default();
88
89    let downstream_pk = sink_desc
90        .downstream_pk
91        .as_ref()
92        .context("Missing downstream PK in Iceberg sink desc")?;
93    for &idx in downstream_pk {
94        let order = &sink_desc.plan_pk[idx];
95        builder.add_column(&Field::from(
96            &sink_desc.columns[order.column_index].column_desc,
97        ));
98    }
99    builder.add_column(&Field::with_name(DataType::Varchar, "file_path"));
100    builder.add_column(&Field::with_name(DataType::Int64, "position"));
101
102    for (idx, order) in sink_desc.plan_pk.iter().enumerate() {
103        builder.add_order_column(idx, order.order_type);
104    }
105
106    let res = builder.build(
107        (0..sink_desc.plan_pk.len()).collect(),
108        sink_desc.plan_pk.len(),
109    );
110    Ok(res)
111}
112
113impl Distill for StreamIcebergWithPkIndexWriter {
114    fn distill<'a>(&self) -> XmlNode<'a> {
115        let column_names = self
116            .sink_desc
117            .columns
118            .iter()
119            .map(|col| col.name_with_hidden().to_string())
120            .map(Pretty::from)
121            .collect();
122        let column_names = Pretty::Array(column_names);
123        let mut vec = Vec::with_capacity(2);
124        vec.push(("columns", column_names));
125        if let Some(pk) = &self.sink_desc.downstream_pk {
126            let sink_pk = IndicesDisplay {
127                indices: pk,
128                schema: self.input.schema(),
129            };
130            vec.push(("downstream_pk", sink_pk.distill()));
131        }
132
133        childless_record("StreamIcebergWithPkIndexWriter", vec)
134    }
135}
136
137impl PlanTreeNodeUnary<Stream> for StreamIcebergWithPkIndexWriter {
138    fn input(&self) -> PlanRef {
139        self.input.clone()
140    }
141
142    fn clone_with_input(&self, input: PlanRef) -> Self {
143        Self {
144            base: self.base.clone(),
145            input,
146            sink_desc: self.sink_desc.clone(),
147            pk_index_table: self.pk_index_table.clone(),
148        }
149    }
150}
151
152impl_plan_tree_node_for_unary! { Stream, StreamIcebergWithPkIndexWriter }
153
154impl StreamNode for StreamIcebergWithPkIndexWriter {
155    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody {
156        let pk_index_table = self
157            .pk_index_table
158            .clone()
159            .with_id(state.gen_table_id_wrapped());
160
161        NodeBody::IcebergWithPkIndexWriter(Box::new(IcebergWithPkIndexWriterNode {
162            sink_desc: Some(self.sink_desc.to_proto()),
163            pk_index_table: Some(pk_index_table.to_internal_table_prost()),
164        }))
165    }
166}
167
168impl ExprRewritable<Stream> for StreamIcebergWithPkIndexWriter {}
169
170impl ExprVisitable for StreamIcebergWithPkIndexWriter {}