risingwave_stream/executor/iceberg_with_pk_index/
dv_handler_impl.rs1use std::collections::{HashMap as StdHashMap, HashSet};
16
17use anyhow::Context;
18use hashbrown::HashMap;
19use iceberg::delete_vector::DeleteVector;
20use iceberg::io::FileIO;
21use iceberg::puffin::{CompressionCodec, PuffinReader, PuffinWriter};
22use iceberg::spec::{
23 DataContentType, DataFile, DataFileBuilder, DataFileFormat, ManifestContentType, PartitionKey,
24 SerializedDataFile,
25};
26use iceberg::table::Table;
27use iceberg::writer::file_writer::location_generator::{
28 DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
29};
30use risingwave_connector::sink::iceberg::{IcebergCommitResult, IcebergConfig, truncate_datafile};
31use risingwave_pb::connector_service::SinkMetadata;
32use risingwave_pb::id::{ActorId, SinkId};
33use uuid::Uuid;
34
35use super::dv_merger::DvHandler;
36use crate::executor::{StreamExecutorError, StreamExecutorResult};
37
/// Puffin blob property key under which the number of deleted positions in a
/// deletion vector is stored.
const DELETION_VECTOR_PROPERTY_CARDINALITY: &str = "cardinality";
/// Puffin blob property key under which the path of the data file a deletion
/// vector applies to is stored.
const DELETION_VECTOR_PROPERTY_REFERENCED_DATA_FILE: &str = "referenced-data-file";
42
43#[derive(Debug, Default)]
44struct LoadedDeleteVectorEntry {
45 delete_vector: Option<DeleteVector>,
46 partition_key: Option<PartitionKey>,
47}
48
49pub struct DvHandlerImpl {
51 config: IcebergConfig,
52 location_generator: DefaultLocationGenerator,
53 file_name_generator: DefaultFileNameGenerator,
54 delete_vectors: HashMap<String, DeleteVector>,
55 sink_id: SinkId,
56}
57
58impl DvHandlerImpl {
59 pub async fn new(
60 config: IcebergConfig,
61 actor_id: ActorId,
62 sink_id: SinkId,
63 ) -> StreamExecutorResult<Self> {
64 let table = config
65 .load_table()
66 .await
67 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
68 let location_generator = DefaultLocationGenerator::new(table.metadata().clone())
69 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
70 let unique_uuid_suffix = Uuid::now_v7();
71 let file_name_generator = DefaultFileNameGenerator::new(
72 actor_id.to_string(),
73 Some(unique_uuid_suffix.to_string()),
74 DataFileFormat::Puffin,
75 );
76 Ok(Self {
77 config,
78 location_generator,
79 file_name_generator,
80 delete_vectors: HashMap::new(),
81 sink_id,
82 })
83 }
84}
85
86#[async_trait::async_trait]
87impl DvHandler for DvHandlerImpl {
88 fn write(&mut self, path: &str, pos: i64) -> StreamExecutorResult<()> {
89 let pos = pos.try_into().context("position should be non-negative")?;
90 self.delete_vectors.entry_ref(path).or_default().insert(pos);
91 Ok(())
92 }
93
94 async fn flush(&mut self) -> StreamExecutorResult<Option<SinkMetadata>> {
95 if self.delete_vectors.is_empty() {
96 return Ok(None);
97 }
98
99 let table = self
100 .config
101 .load_table()
102 .await
103 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
104 let mut loaded_delete_vectors = load_delete_vectors(
105 &table,
106 self.sink_id,
107 self.delete_vectors.keys().cloned().collect(),
108 )
109 .await?;
110
111 let mut data_files = Vec::with_capacity(self.delete_vectors.len());
112 for (data_file_path, mut delete_vector) in self.delete_vectors.drain() {
113 let LoadedDeleteVectorEntry {
114 delete_vector: existing_delete_vector,
115 partition_key,
116 } = loaded_delete_vectors
117 .remove(&data_file_path)
118 .unwrap_or_default();
119
120 if let Some(existing) = existing_delete_vector {
121 delete_vector |= existing;
122 }
123
124 let file_name = self.file_name_generator.generate_file_name();
125 let location = self
126 .location_generator
127 .generate_location(partition_key.as_ref(), &file_name);
128 let output_file = table
129 .file_io()
130 .new_output(&location)
131 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
132 let mut writer = PuffinWriter::new(&output_file, StdHashMap::new(), false)
133 .await
134 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
135
136 let cardinality = delete_vector.len();
137 let properties = StdHashMap::from([
138 (
139 DELETION_VECTOR_PROPERTY_CARDINALITY.to_owned(),
140 cardinality.to_string(),
141 ),
142 (
143 DELETION_VECTOR_PROPERTY_REFERENCED_DATA_FILE.to_owned(),
144 data_file_path.clone(),
145 ),
146 ]);
147 let blob = delete_vector
148 .to_puffin_blob(properties)
149 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
150 writer
151 .add(blob, CompressionCodec::None)
152 .await
153 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
154
155 let result = writer
156 .close_with_metadata()
157 .await
158 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
159 let blob_metadata = result
160 .blobs_metadata
161 .first()
162 .context("blob metadata should be present")?;
163
164 let mut builder = DataFileBuilder::default();
165 builder
166 .content(DataContentType::PositionDeletes)
167 .file_path(location)
168 .file_format(DataFileFormat::Puffin)
169 .record_count(cardinality)
170 .file_size_in_bytes(result.file_size_in_bytes)
171 .referenced_data_file(Some(data_file_path))
172 .content_offset(Some(blob_metadata.offset() as i64))
173 .content_size_in_bytes(Some(blob_metadata.length() as i64));
174 if let Some(partition_key) = partition_key {
175 builder
176 .partition(partition_key.data().clone())
177 .partition_spec_id(partition_key.spec().spec_id());
178 }
179 let data_file = builder.build().map_err(|err| {
180 StreamExecutorError::sink_error(
181 anyhow::anyhow!(err).context("Failed to build deletion vector file metadata"),
182 self.sink_id,
183 )
184 })?;
185
186 data_files.push(data_file);
187 }
188
189 if data_files.is_empty() {
190 return Ok(None);
191 }
192
193 let format_version = table.metadata().format_version();
194 let partition_type = table.metadata().default_partition_type();
195 let data_files = data_files
196 .into_iter()
197 .map(|f| {
198 let truncated = truncate_datafile(f);
200 SerializedDataFile::try_from(truncated, partition_type, format_version)
201 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))
202 })
203 .collect::<StreamExecutorResult<Vec<_>>>()?;
204 let sink_metadata = SinkMetadata::try_from(&IcebergCommitResult {
205 data_files,
206 schema_id: table.metadata().current_schema_id(),
207 partition_spec_id: table.metadata().default_partition_spec_id(),
208 })
209 .map_err(|e| StreamExecutorError::sink_error(e, self.sink_id))?;
210 Ok(Some(sink_metadata))
211 }
212}
213
214async fn load_delete_vectors(
225 table: &Table,
226 sink_id: SinkId,
227 mut paths: HashSet<String>,
228) -> StreamExecutorResult<HashMap<String, LoadedDeleteVectorEntry>> {
229 let mut result: HashMap<String, LoadedDeleteVectorEntry> = HashMap::with_capacity(paths.len());
230
231 let Some(snapshot) = table.metadata().current_snapshot() else {
232 return Ok(HashMap::new());
233 };
234
235 let partitioned = !table.metadata().default_partition_spec().is_unpartitioned();
236 let manifest_list = table
237 .object_cache()
238 .get_manifest_list(snapshot, &table.metadata_ref())
239 .await
240 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
241
242 'deletes_outer: for manifest_file in manifest_list.entries() {
243 if manifest_file.content != ManifestContentType::Deletes {
244 continue;
245 }
246
247 let manifest = manifest_file
248 .load_manifest(table.file_io())
249 .await
250 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
251
252 for entry in manifest.entries() {
253 if !entry.is_alive()
254 || entry.content_type() != DataContentType::PositionDeletes
255 || entry.file_format() != DataFileFormat::Puffin
256 {
257 continue;
258 }
259
260 let data_file = entry.data_file();
261 let Some(referenced_data_file) = data_file.referenced_data_file() else {
262 continue;
263 };
264 if !paths.remove(&referenced_data_file) {
265 continue;
266 }
267
268 let delete_vector =
269 read_dv_positions_from_data_file(table.file_io(), data_file, sink_id).await?;
270 let partition_key = if partitioned {
271 Some(build_partition_key_from_data_file(
272 table, data_file, sink_id,
273 )?)
274 } else {
275 None
276 };
277 result.insert(
278 referenced_data_file,
279 LoadedDeleteVectorEntry {
280 delete_vector: Some(delete_vector),
281 partition_key,
282 },
283 );
284
285 if paths.is_empty() {
286 break 'deletes_outer;
287 }
288 }
289 }
290
291 if paths.is_empty() || !partitioned {
292 return Ok(result);
293 }
294
295 'data_outer: for manifest_file in manifest_list.entries() {
296 if manifest_file.content != ManifestContentType::Data {
297 continue;
298 }
299
300 let manifest = manifest_file
301 .load_manifest(table.file_io())
302 .await
303 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
304
305 for entry in manifest.entries() {
306 if !entry.is_alive() || entry.content_type() != DataContentType::Data {
307 continue;
308 }
309
310 let data_file = entry.data_file();
311 if !paths.remove(data_file.file_path()) {
312 continue;
313 }
314
315 let partition_key = build_partition_key_from_data_file(table, data_file, sink_id)?;
316 result.insert(
317 data_file.file_path().to_owned(),
318 LoadedDeleteVectorEntry {
319 delete_vector: None,
320 partition_key: Some(partition_key),
321 },
322 );
323
324 if paths.is_empty() {
325 break 'data_outer;
326 }
327 }
328 }
329
330 Ok(result)
331}
332
333async fn read_dv_positions_from_data_file(
334 file_io: &FileIO,
335 data_file: &DataFile,
336 sink_id: SinkId,
337) -> StreamExecutorResult<DeleteVector> {
338 let blob_offset = data_file.content_offset().with_context(|| {
339 format!(
340 "DV file {} missing content_offset for referenced data file {:?}",
341 data_file.file_path(),
342 data_file.referenced_data_file()
343 )
344 })?;
345 let blob_length = data_file.content_size_in_bytes().with_context(|| {
346 format!(
347 "DV file {} missing content_size_in_bytes for referenced data file {:?}",
348 data_file.file_path(),
349 data_file.referenced_data_file()
350 )
351 })?;
352
353 let input_file = file_io
354 .new_input(data_file.file_path())
355 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
356 let puffin_reader = PuffinReader::new(input_file);
357 let file_metadata = puffin_reader
358 .file_metadata()
359 .await
360 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
361 let blob_metadata = file_metadata
362 .blobs()
363 .iter()
364 .find(|blob| blob.offset() == blob_offset as u64 && blob.length() == blob_length as u64)
365 .with_context(|| {
366 format!(
367 "DV blob metadata not found in {} at offset={} length={}",
368 data_file.file_path(),
369 blob_offset,
370 blob_length
371 )
372 })?;
373 let blob = puffin_reader
374 .blob(blob_metadata)
375 .await
376 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
377
378 let delete_vector = DeleteVector::from_puffin_blob(blob)
379 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
380 Ok(delete_vector)
381}
382
383fn build_partition_key_from_data_file(
384 table: &Table,
385 data_file: &DataFile,
386 sink_id: SinkId,
387) -> StreamExecutorResult<PartitionKey> {
388 let partition_spec_id = data_file.partition_spec_id();
389 let partition_spec = table
390 .metadata()
391 .partition_spec_by_id(partition_spec_id)
392 .with_context(|| {
393 format!(
394 "failed to find partition spec {} for manifest file {}",
395 partition_spec_id,
396 data_file.file_path()
397 )
398 })
399 .map_err(|e| StreamExecutorError::sink_error(e, sink_id))?;
400
401 Ok(PartitionKey::new(
402 partition_spec.as_ref().clone(),
403 table.metadata().current_schema().clone(),
404 data_file.partition().clone(),
405 ))
406}