use std::sync::Arc;

use bytes::Bytes;
use fail::fail_point;
use foyer::HybridCacheProperties;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::ObjectStreamingUploader;
use tokio::task::JoinHandle;
use zstd::zstd_safe::WriteBuf;

use super::multi_builder::UploadJoinHandle;
use super::{Block, BlockMeta};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::{
    CachePolicy, HummockResult, SstableBlockIndex, SstableBuilderOptions, SstableMeta,
    SstableStore, SstableStoreRef,
};

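/// A consumer of SST data.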
#[async_trait::async_trait]
pub trait SstableWriter: Send {
    type Output;

    /// Writes an SST block given as a byte slice.
    async fn write_block(&mut self, block: &[u8], meta: &BlockMeta) -> HummockResult<()>;

    /// Writes an SST block given as `Bytes`, which may avoid an extra copy.
    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()>;

    /// Finishes writing the SST, consuming the writer.
    async fn finish(self, meta: SstableMeta) -> HummockResult<Self::Output>;

    /// Returns the number of bytes of SST data written so far.
    fn data_len(&self) -> usize;
}

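/// Appends SST data to an in-memory buffer. Used in tests and benchmarks.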
pub struct InMemWriter {
    buf: Vec<u8>,
}

impl InMemWriter {
    pub fn new(capacity: usize) -> Self {
        Self {
            buf: Vec::with_capacity(capacity),
        }
    }
}

impl From<&SstableBuilderOptions> for InMemWriter {
    fn from(options: &SstableBuilderOptions) -> Self {
        Self::new(options.capacity + options.block_capacity)
    }
}

#[async_trait::async_trait]
impl SstableWriter for InMemWriter {
    type Output = (Bytes, SstableMeta);

    async fn write_block(&mut self, block: &[u8], _meta: &BlockMeta) -> HummockResult<()> {
        self.buf.extend_from_slice(block);
        Ok(())
    }

    async fn write_block_bytes(&mut self, block: Bytes, _meta: &BlockMeta) -> HummockResult<()> {
        self.buf.extend_from_slice(&block);
        Ok(())
    }

    async fn finish(mut self, meta: SstableMeta) -> HummockResult<Self::Output> {
        meta.encode_to(&mut self.buf);
        Ok((Bytes::from(self.buf), meta))
    }

    fn data_len(&self) -> usize {
        self.buf.len()
    }
}

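/// Options for creating an [`SstableWriter`].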
pub struct SstableWriterOptions {
    /// Total length of the SST data, if known in advance.
    pub capacity_hint: Option<usize>,
    pub tracker: Option<MemoryTracker>,
    pub policy: CachePolicy,
}

impl Default for SstableWriterOptions {
    fn default() -> Self {
        Self {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::NotFill,
        }
    }
}
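
/// A factory that creates an [`SstableWriter`] for a given SST object id.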
#[async_trait::async_trait]
pub trait SstableWriterFactory: Send {
    type Writer: SstableWriter<Output = UploadJoinHandle>;

    async fn create_sst_writer(
        &mut self,
        object_id: impl Into<HummockSstableObjectId> + Send,
        options: SstableWriterOptions,
    ) -> HummockResult<Self::Writer>;
}

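/// Creates [`BatchUploadWriter`]s, which buffer a whole SST in memory before uploading it.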
pub struct BatchSstableWriterFactory {
    sstable_store: SstableStoreRef,
}

impl BatchSstableWriterFactory {
    pub fn new(sstable_store: SstableStoreRef) -> Self {
        BatchSstableWriterFactory { sstable_store }
    }
}

#[async_trait::async_trait]
impl SstableWriterFactory for BatchSstableWriterFactory {
    type Writer = BatchUploadWriter;

    async fn create_sst_writer(
        &mut self,
        object_id: impl Into<HummockSstableObjectId> + Send,
        options: SstableWriterOptions,
    ) -> HummockResult<Self::Writer> {
        Ok(BatchUploadWriter::new(
            object_id,
            self.sstable_store.clone(),
            options,
        ))
    }
}

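/// Buffers SST data in memory and uploads it to the object store in a single
/// request when `finish` is called.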
pub struct BatchUploadWriter {
    object_id: HummockSstableObjectId,
    sstable_store: SstableStoreRef,
    policy: CachePolicy,
    buf: Vec<u8>,
    block_info: Vec<Block>,
    tracker: Option<MemoryTracker>,
}

impl BatchUploadWriter {
    pub fn new(
        object_id: impl Into<HummockSstableObjectId>,
        sstable_store: Arc<SstableStore>,
        options: SstableWriterOptions,
    ) -> Self {
        Self {
            object_id: object_id.into(),
            sstable_store,
            policy: options.policy,
            buf: Vec::with_capacity(options.capacity_hint.unwrap_or(0)),
            block_info: Vec::new(),
            tracker: options.tracker,
        }
    }
}

#[async_trait::async_trait]
impl SstableWriter for BatchUploadWriter {
    type Output = JoinHandle<HummockResult<()>>;

    async fn write_block(&mut self, block: &[u8], meta: &BlockMeta) -> HummockResult<()> {
        self.buf.extend_from_slice(block);
        if let CachePolicy::Fill(_) = self.policy {
            self.block_info.push(Block::decode(
                Bytes::from(block.to_vec()),
                meta.uncompressed_size as usize,
            )?);
        }
        Ok(())
    }

    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
        self.buf.extend_from_slice(&block);
        if let CachePolicy::Fill(_) = self.policy {
            self.block_info
                .push(Block::decode(block, meta.uncompressed_size as usize)?);
        }
        Ok(())
    }

    async fn finish(mut self, meta: SstableMeta) -> HummockResult<Self::Output> {
        fail_point!("data_upload_err");
        let join_handle = tokio::spawn(async move {
            meta.encode_to(&mut self.buf);
            let data = Bytes::from(self.buf);
            let _tracker = self.tracker.map(|mut t| {
                if !t.try_increase_memory(data.capacity() as u64) {
                    tracing::debug!(
                        "failed to increase memory for data file, sst object id: {}, file size: {}",
                        self.object_id,
                        data.capacity()
                    );
                }
                t
            });

            // Upload the SST data to the object store, then cache the metadata.
            self.sstable_store
                .clone()
                .put_sst_data(self.object_id, data)
                .await?;
            self.sstable_store.insert_meta_cache(self.object_id, meta);

            // Record the newly written object in the recent filter.
            if let Some(filter) = self.sstable_store.recent_filter() {
                filter.insert((self.object_id, usize::MAX));
            }

            // Optionally fill the block cache with the blocks just written.
            if let CachePolicy::Fill(hint) = self.policy {
                for (block_idx, block) in self.block_info.into_iter().enumerate() {
                    self.sstable_store.block_cache().insert_with_properties(
                        SstableBlockIndex {
                            sst_id: self.object_id,
                            block_idx: block_idx as _,
                        },
                        Box::new(block),
                        HybridCacheProperties::default().with_hint(hint),
                    );
                }
            }
            Ok(())
        });
        Ok(join_handle)
    }

    fn data_len(&self) -> usize {
        self.buf.len()
    }
}

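/// Streams SST blocks to the object store as they are written, instead of
/// buffering the whole SST in memory.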
pub struct StreamingUploadWriter {
    object_id: HummockSstableObjectId,
    sstable_store: SstableStoreRef,
    policy: CachePolicy,
    object_uploader: ObjectStreamingUploader,
    /// Decoded blocks kept around so they can be inserted into the block cache after upload.
    blocks: Vec<Block>,
    /// Number of bytes of SST data written so far.
    data_len: usize,
    tracker: Option<MemoryTracker>,
}

impl StreamingUploadWriter {
    pub fn new(
        object_id: HummockSstableObjectId,
        sstable_store: SstableStoreRef,
        object_uploader: ObjectStreamingUploader,
        options: SstableWriterOptions,
    ) -> Self {
        Self {
            object_id,
            sstable_store,
            policy: options.policy,
            object_uploader,
            blocks: Vec::new(),
            data_len: 0,
            tracker: options.tracker,
        }
    }
}

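/// Either a streaming or a batch writer, chosen at runtime based on the
/// capability of the object store.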
pub enum UnifiedSstableWriter {
    StreamingSstableWriter(StreamingUploadWriter),
    BatchSstableWriter(BatchUploadWriter),
}

#[async_trait::async_trait]
impl SstableWriter for StreamingUploadWriter {
    type Output = JoinHandle<HummockResult<()>>;

    async fn write_block(&mut self, block_data: &[u8], meta: &BlockMeta) -> HummockResult<()> {
        self.data_len += block_data.len();
        let block_data = Bytes::from(block_data.to_vec());
        if let CachePolicy::Fill(_) = self.policy {
            let block = Block::decode(block_data.clone(), meta.uncompressed_size as usize)?;
            self.blocks.push(block);
        }
        self.object_uploader
            .write_bytes(block_data)
            .await
            .map_err(Into::into)
    }

    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
        self.data_len += block.len();
        if let CachePolicy::Fill(_) = self.policy {
            let block = Block::decode(block.clone(), meta.uncompressed_size as usize)?;
            self.blocks.push(block);
        }
        self.object_uploader
            .write_bytes(block)
            .await
            .map_err(Into::into)
    }

    async fn finish(mut self, meta: SstableMeta) -> HummockResult<UploadJoinHandle> {
        let metadata = Bytes::from(meta.encode_to_bytes());

        self.object_uploader.write_bytes(metadata).await?;
        let join_handle = tokio::spawn(async move {
            let uploader_memory_usage = self.object_uploader.get_memory_usage();
            let _tracker = self.tracker.map(|mut t| {
                if !t.try_increase_memory(uploader_memory_usage) {
                    tracing::debug!(
                        "failed to increase memory for data file, sst object id: {}, file size: {}",
                        self.object_id,
                        uploader_memory_usage
                    );
                }
                t
            });

            assert!(!meta.block_metas.is_empty());

            // Finish the streaming upload, then cache the metadata.
            self.object_uploader.finish().await?;
            self.sstable_store.insert_meta_cache(self.object_id, meta);

            // Optionally fill the block cache with the blocks just uploaded.
            if let CachePolicy::Fill(hint) = self.policy
                && !self.blocks.is_empty()
            {
                for (block_idx, block) in self.blocks.into_iter().enumerate() {
                    self.sstable_store.block_cache().insert_with_properties(
                        SstableBlockIndex {
                            sst_id: self.object_id,
                            block_idx: block_idx as _,
                        },
                        Box::new(block),
                        HybridCacheProperties::default().with_hint(hint),
                    );
                }
            }
            Ok(())
        });
        Ok(join_handle)
    }

    fn data_len(&self) -> usize {
        self.data_len
    }
}

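/// Creates [`StreamingUploadWriter`]s backed by a streaming uploader of the object store.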
pub struct StreamingSstableWriterFactory {
    sstable_store: SstableStoreRef,
}

impl StreamingSstableWriterFactory {
    pub fn new(sstable_store: SstableStoreRef) -> Self {
        StreamingSstableWriterFactory { sstable_store }
    }
}
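
/// Creates [`UnifiedSstableWriter`]s: streaming upload when the object store
/// supports it, batch upload otherwise.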
pub struct UnifiedSstableWriterFactory {
    sstable_store: SstableStoreRef,
}

impl UnifiedSstableWriterFactory {
    pub fn new(sstable_store: SstableStoreRef) -> Self {
        UnifiedSstableWriterFactory { sstable_store }
    }
}

#[async_trait::async_trait]
impl SstableWriterFactory for UnifiedSstableWriterFactory {
    type Writer = UnifiedSstableWriter;

    async fn create_sst_writer(
        &mut self,
        object_id: impl Into<HummockSstableObjectId> + Send,
        options: SstableWriterOptions,
    ) -> HummockResult<Self::Writer> {
        let object_id = object_id.into();
        // Prefer streaming upload when the backing object store supports it.
        if self.sstable_store.store().support_streaming_upload() {
            let path = self.sstable_store.get_sst_data_path(object_id);
            let uploader = self.sstable_store.create_streaming_uploader(&path).await?;
            let streaming_uploader_writer = StreamingUploadWriter::new(
                object_id,
                self.sstable_store.clone(),
                uploader,
                options,
            );

            Ok(UnifiedSstableWriter::StreamingSstableWriter(
                streaming_uploader_writer,
            ))
        } else {
            let batch_uploader_writer =
                BatchUploadWriter::new(object_id, self.sstable_store.clone(), options);
            Ok(UnifiedSstableWriter::BatchSstableWriter(
                batch_uploader_writer,
            ))
        }
    }
}

#[async_trait::async_trait]
impl SstableWriterFactory for StreamingSstableWriterFactory {
    type Writer = StreamingUploadWriter;

    async fn create_sst_writer(
        &mut self,
        object_id: impl Into<HummockSstableObjectId> + Send,
        options: SstableWriterOptions,
    ) -> HummockResult<Self::Writer> {
        let object_id = object_id.into();
        let path = self.sstable_store.get_sst_data_path(object_id);
        let uploader = self.sstable_store.create_streaming_uploader(&path).await?;
        Ok(StreamingUploadWriter::new(
            object_id,
            self.sstable_store.clone(),
            uploader,
            options,
        ))
    }
}

#[async_trait::async_trait]
impl SstableWriter for UnifiedSstableWriter {
    type Output = JoinHandle<HummockResult<()>>;

    async fn write_block(&mut self, block_data: &[u8], meta: &BlockMeta) -> HummockResult<()> {
        match self {
            UnifiedSstableWriter::StreamingSstableWriter(stream) => {
                stream.write_block(block_data, meta).await
            }
            UnifiedSstableWriter::BatchSstableWriter(batch) => {
                batch.write_block(block_data, meta).await
            }
        }
    }

    async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> {
        match self {
            UnifiedSstableWriter::StreamingSstableWriter(stream) => {
                stream.write_block_bytes(block, meta).await
            }
            UnifiedSstableWriter::BatchSstableWriter(batch) => {
                batch.write_block_bytes(block, meta).await
            }
        }
    }

    async fn finish(self, meta: SstableMeta) -> HummockResult<UploadJoinHandle> {
        match self {
            UnifiedSstableWriter::StreamingSstableWriter(stream) => stream.finish(meta).await,
            UnifiedSstableWriter::BatchSstableWriter(batch) => batch.finish(meta).await,
        }
    }

    fn data_len(&self) -> usize {
        match self {
            UnifiedSstableWriter::StreamingSstableWriter(stream) => stream.data_len(),
            UnifiedSstableWriter::BatchSstableWriter(batch) => batch.data_len(),
        }
    }
}

#[cfg(test)]
mod tests {

    use bytes::Bytes;
    use rand::{Rng, SeedableRng};
    use risingwave_common::util::iter_util::ZipEqFast;

    use crate::hummock::sstable::VERSION;
    use crate::hummock::{BlockMeta, InMemWriter, SstableMeta, SstableWriter};

    fn get_sst() -> (Bytes, Vec<Bytes>, SstableMeta) {
        let mut rng = rand::rngs::StdRng::seed_from_u64(0);
        let mut buffer: Vec<u8> = vec![0; 5000];
        rng.fill(&mut buffer[..]);
        buffer.extend((5_u32).to_le_bytes());
        let data = Bytes::from(buffer);

        let mut blocks = Vec::with_capacity(5);
        let mut block_metas = Vec::with_capacity(5);
        for i in 0..5 {
            block_metas.push(BlockMeta {
                smallest_key: Vec::new(),
                len: 1000,
                offset: i * 1000,
                ..Default::default()
            });
            blocks.push(data.slice((i * 1000) as usize..((i + 1) * 1000) as usize));
        }
        #[expect(deprecated)]
        let meta = SstableMeta {
            block_metas,
            bloom_filter: vec![],
            estimated_size: 0,
            key_count: 0,
            smallest_key: Vec::new(),
            largest_key: Vec::new(),
            meta_offset: data.len() as u64,
            monotonic_tombstone_events: vec![],
            version: VERSION,
        };

        (data, blocks, meta)
    }

    #[tokio::test]
    async fn test_in_mem_writer() {
        let (data, blocks, meta) = get_sst();
        let mut writer = Box::new(InMemWriter::new(0));
        for (block, meta) in blocks.iter().zip_eq_fast(meta.block_metas.iter()) {
            writer.write_block(&block[..], meta).await.unwrap();
        }

        let meta_offset = meta.meta_offset as usize;
        let (output_data, _) = writer.finish(meta).await.unwrap();
        assert_eq!(output_data.slice(0..meta_offset), data);
    }
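
    // A minimal companion sketch of the test above (not from the original file):
    // it exercises the `write_block_bytes` path of `InMemWriter`, reusing the
    // `get_sst` helper, and checks that it yields the same data layout as
    // `write_block`.
    #[tokio::test]
    async fn test_in_mem_writer_block_bytes() {
        let (data, blocks, meta) = get_sst();
        let mut writer = Box::new(InMemWriter::new(0));
        for (block, meta) in blocks.iter().zip_eq_fast(meta.block_metas.iter()) {
            // `Bytes::clone` is cheap: it only bumps a reference count.
            writer.write_block_bytes(block.clone(), meta).await.unwrap();
        }

        let meta_offset = meta.meta_offset as usize;
        let (output_data, _) = writer.finish(meta).await.unwrap();
        assert_eq!(output_data.slice(0..meta_offset), data);
    }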
}