risingwave_stream/executor/iceberg_with_pk_index/dv_handler_impl.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap as StdHashMap, HashSet};
16
17use anyhow::Context;
18use hashbrown::HashMap;
19use iceberg::delete_vector::DeleteVector;
20use iceberg::io::FileIO;
21use iceberg::puffin::{CompressionCodec, PuffinReader, PuffinWriter};
22use iceberg::spec::{
23    DataContentType, DataFile, DataFileBuilder, DataFileFormat, ManifestContentType, PartitionKey,
24    SerializedDataFile,
25};
26use iceberg::table::Table;
27use iceberg::writer::file_writer::location_generator::{
28    DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
29};
30use risingwave_connector::sink::iceberg::{IcebergCommitResult, IcebergConfig, truncate_datafile};
31use risingwave_pb::connector_service::SinkMetadata;
32use risingwave_pb::id::{ActorId, SinkId};
33use uuid::Uuid;
34
35use super::dv_merger::DvHandler;
36use crate::executor::{StreamExecutorError, StreamExecutorResult};
37
/// Key of the Puffin blob property that records how many row positions the
/// deletion vector marks as deleted.
const DELETION_VECTOR_PROPERTY_CARDINALITY: &str = "cardinality";
/// Key of the Puffin blob property that records the path of the data file the
/// deletion vector applies to.
const DELETION_VECTOR_PROPERTY_REFERENCED_DATA_FILE: &str = "referenced-data-file";
42
43#[derive(Debug, Default)]
44struct LoadedDeleteVectorEntry {
45    delete_vector: Option<DeleteVector>,
46    partition_key: Option<PartitionKey>,
47}
48
49/// Real implementation of [`DvHandler`] using the iceberg-rust crate.
50pub struct DvHandlerImpl {
51    config: IcebergConfig,
52    location_generator: DefaultLocationGenerator,
53    file_name_generator: DefaultFileNameGenerator,
54    delete_vectors: HashMap<String, DeleteVector>,
55    sink_id: SinkId,
56}
57
58impl DvHandlerImpl {
59    pub async fn new(
60        config: IcebergConfig,
61        actor_id: ActorId,
62        sink_id: SinkId,
63    ) -> StreamExecutorResult<Self> {
64        let table = config
65            .load_table()
66            .await
67            .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
68        let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
69            .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
70        let unique_uuid_suffix = Uuid::now_v7();
71        let file_name_generator = DefaultFileNameGenerator::new(
72            actor_id.to_string(),
73            Some(unique_uuid_suffix.to_string()),
74            DataFileFormat::Puffin,
75        );
76        Ok(Self {
77            config,
78            location_generator,
79            file_name_generator,
80            delete_vectors: HashMap::new(),
81            sink_id,
82        })
83    }
84}
85
86#[async_trait::async_trait]
87impl DvHandler for DvHandlerImpl {
88    fn write(&mut self, path: &str, pos: i64) -> StreamExecutorResult<()> {
89        let pos = pos.try_into().context("position should be non-negative")?;
90        self.delete_vectors.entry_ref(path).or_default().insert(pos);
91        Ok(())
92    }
93
94    async fn flush(&mut self) -> StreamExecutorResult<Option<SinkMetadata>> {
95        if self.delete_vectors.is_empty() {
96            return Ok(None);
97        }
98
99        let table = self
100            .config
101            .load_table()
102            .await
103            .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
104        let mut loaded_delete_vectors = load_delete_vectors(
105            &table,
106            self.sink_id,
107            self.delete_vectors.keys().cloned().collect(),
108        )
109        .await?;
110
111        let mut data_files = Vec::with_capacity(self.delete_vectors.len());
112        for (data_file_path, mut delete_vector) in self.delete_vectors.drain() {
113            let LoadedDeleteVectorEntry {
114                delete_vector: existing_delete_vector,
115                partition_key,
116            } = loaded_delete_vectors
117                .remove(&data_file_path)
118                .unwrap_or_default();
119
120            if let Some(existing) = existing_delete_vector {
121                delete_vector |= existing;
122            }
123
124            let file_name = self.file_name_generator.generate_file_name();
125            let location = self
126                .location_generator
127                .generate_location(partition_key.as_ref(), &file_name);
128            let output_file = table
129                .file_io()
130                .new_output(&location)
131                .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
132            let mut writer = PuffinWriter::new(&output_file, StdHashMap::new(), false)
133                .await
134                .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
135
136            let cardinality = delete_vector.len();
137            let properties = StdHashMap::from([
138                (
139                    DELETION_VECTOR_PROPERTY_CARDINALITY.to_owned(),
140                    cardinality.to_string(),
141                ),
142                (
143                    DELETION_VECTOR_PROPERTY_REFERENCED_DATA_FILE.to_owned(),
144                    data_file_path.clone(),
145                ),
146            ]);
147            let blob = delete_vector
148                .to_puffin_blob(properties)
149                .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
150            writer
151                .add(blob, CompressionCodec::None)
152                .await
153                .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
154
155            let result = writer
156                .close_with_metadata()
157                .await
158                .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
159            let blob_metadata = result
160                .blobs_metadata
161                .first()
162                .context("blob metadata should be present")?;
163
164            let mut builder = DataFileBuilder::default();
165            builder
166                .content(DataContentType::PositionDeletes)
167                .file_path(location)
168                .file_format(DataFileFormat::Puffin)
169                .record_count(cardinality)
170                .file_size_in_bytes(result.file_size_in_bytes)
171                .referenced_data_file(Some(data_file_path))
172                .content_offset(Some(blob_metadata.offset() as i64))
173                .content_size_in_bytes(Some(blob_metadata.length() as i64));
174            if let Some(partition_key) = partition_key {
175                builder
176                    .partition(partition_key.data().clone())
177                    .partition_spec_id(partition_key.spec().spec_id());
178            }
179            let data_file = builder.build().map_err(|err| {
180                StreamExecutorError::sink_error(
181                    anyhow::anyhow!(err).context("Failed to build deletion vector file metadata"),
182                    self.sink_id,
183                )
184            })?;
185
186            data_files.push(data_file);
187        }
188
189        if data_files.is_empty() {
190            return Ok(None);
191        }
192
193        let format_version = table.metadata().format_version();
194        let partition_type = table.metadata().default_partition_type();
195        let data_files = data_files
196            .into_iter()
197            .map(|f| {
198                // Truncate large column statistics BEFORE serialization
199                let truncated = truncate_datafile(f);
200                SerializedDataFile::try_from(truncated, partition_type, format_version)
201                    .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))
202            })
203            .collect::<StreamExecutorResult<Vec<_>>>()?;
204        let sink_metadata = SinkMetadata::try_from(&IcebergCommitResult {
205            data_files,
206            schema_id: table.metadata().current_schema_id(),
207            partition_spec_id: table.metadata().default_partition_spec_id(),
208        })
209        .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
210        Ok(Some(sink_metadata))
211    }
212}
213
214/// Loads existing delete vectors for the given data file paths by scanning the
215/// table's current snapshot. Returns a map from data file path to the loaded
216/// delete vector and partition key (if partitioned).
217///
218/// The lookup proceeds in two passes:
219/// 1. Scan delete manifests for entries matching the given data file paths. If found,
220///    load the DV (and partition key, when partitioned) from the delete entry. This
221///    reflects the latest DV state for the data file as of the current snapshot.
222/// 2. For partitioned tables, scan data manifests for any data files still missing a
223///    partition key after pass 1 (data files that have not yet accumulated any DV).
224async fn load_delete_vectors(
225    table: &Table,
226    sink_id: SinkId,
227    mut paths: HashSet<String>,
228) -> StreamExecutorResult<HashMap<String, LoadedDeleteVectorEntry>> {
229    let mut result: HashMap<String, LoadedDeleteVectorEntry> = HashMap::with_capacity(paths.len());
230
231    let Some(snapshot) = table.metadata().current_snapshot() else {
232        return Ok(HashMap::new());
233    };
234
235    let partitioned = !table.metadata().default_partition_spec().is_unpartitioned();
236    let manifest_list = table
237        .object_cache()
238        .get_manifest_list(snapshot, &table.metadata_ref())
239        .await
240        .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
241
242    'deletes_outer: for manifest_file in manifest_list.entries() {
243        if manifest_file.content != ManifestContentType::Deletes {
244            continue;
245        }
246
247        let manifest = manifest_file
248            .load_manifest(table.file_io())
249            .await
250            .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
251
252        for entry in manifest.entries() {
253            if !entry.is_alive()
254                || entry.content_type() != DataContentType::PositionDeletes
255                || entry.file_format() != DataFileFormat::Puffin
256            {
257                continue;
258            }
259
260            let data_file = entry.data_file();
261            let Some(referenced_data_file) = data_file.referenced_data_file() else {
262                continue;
263            };
264            if !paths.remove(&referenced_data_file) {
265                continue;
266            }
267
268            let delete_vector =
269                read_dv_positions_from_data_file(table.file_io(), data_file, sink_id).await?;
270            let partition_key = if partitioned {
271                Some(build_partition_key_from_data_file(
272                    table, data_file, sink_id,
273                )?)
274            } else {
275                None
276            };
277            result.insert(
278                referenced_data_file,
279                LoadedDeleteVectorEntry {
280                    delete_vector: Some(delete_vector),
281                    partition_key,
282                },
283            );
284
285            if paths.is_empty() {
286                break 'deletes_outer;
287            }
288        }
289    }
290
291    if paths.is_empty() || !partitioned {
292        return Ok(result);
293    }
294
295    'data_outer: for manifest_file in manifest_list.entries() {
296        if manifest_file.content != ManifestContentType::Data {
297            continue;
298        }
299
300        let manifest = manifest_file
301            .load_manifest(table.file_io())
302            .await
303            .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
304
305        for entry in manifest.entries() {
306            if !entry.is_alive() || entry.content_type() != DataContentType::Data {
307                continue;
308            }
309
310            let data_file = entry.data_file();
311            if !paths.remove(data_file.file_path()) {
312                continue;
313            }
314
315            let partition_key = build_partition_key_from_data_file(table, data_file, sink_id)?;
316            result.insert(
317                data_file.file_path().to_owned(),
318                LoadedDeleteVectorEntry {
319                    delete_vector: None,
320                    partition_key: Some(partition_key),
321                },
322            );
323
324            if paths.is_empty() {
325                break 'data_outer;
326            }
327        }
328    }
329
330    Ok(result)
331}
332
333async fn read_dv_positions_from_data_file(
334    file_io: &FileIO,
335    data_file: &DataFile,
336    sink_id: SinkId,
337) -> StreamExecutorResult<DeleteVector> {
338    let blob_offset = data_file.content_offset().with_context(|| {
339        format!(
340            "DV file {} missing content_offset for referenced data file {:?}",
341            data_file.file_path(),
342            data_file.referenced_data_file()
343        )
344    })?;
345    let blob_length = data_file.content_size_in_bytes().with_context(|| {
346        format!(
347            "DV file {} missing content_size_in_bytes for referenced data file {:?}",
348            data_file.file_path(),
349            data_file.referenced_data_file()
350        )
351    })?;
352
353    let input_file = file_io
354        .new_input(data_file.file_path())
355        .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
356    let puffin_reader = PuffinReader::new(input_file);
357    let file_metadata = puffin_reader
358        .file_metadata()
359        .await
360        .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
361    let blob_metadata = file_metadata
362        .blobs()
363        .iter()
364        .find(|blob| blob.offset() == blob_offset as u64 && blob.length() == blob_length as u64)
365        .with_context(|| {
366            format!(
367                "DV blob metadata not found in {} at offset={} length={}",
368                data_file.file_path(),
369                blob_offset,
370                blob_length
371            )
372        })?;
373    let blob = puffin_reader
374        .blob(blob_metadata)
375        .await
376        .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
377
378    let delete_vector = DeleteVector::from_puffin_blob(blob)
379        .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
380    Ok(delete_vector)
381}
382
383fn build_partition_key_from_data_file(
384    table: &Table,
385    data_file: &DataFile,
386    sink_id: SinkId,
387) -> StreamExecutorResult<PartitionKey> {
388    let partition_spec_id = data_file.partition_spec_id();
389    let partition_spec = table
390        .metadata()
391        .partition_spec_by_id(partition_spec_id)
392        .with_context(|| {
393            format!(
394                "failed to find partition spec {} for manifest file {}",
395                partition_spec_id,
396                data_file.file_path()
397            )
398        })
399        .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
400
401    Ok(PartitionKey::new(
402        partition_spec.as_ref().clone(),
403        table.metadata().current_schema().clone(),
404        data_file.partition().clone(),
405    ))
406}