risingwave_object_store/object/opendal_engine/opendal_object_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use fail::fail_point;
use futures::{StreamExt, stream};
use opendal::layers::{RetryLayer, TimeoutLayer};
use opendal::raw::BoxedStaticFuture;
use opendal::services::Memory;
use opendal::{Execute, Executor, Operator, Writer};
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::range::RangeBoundsExt;
use thiserror_ext::AsReport;

use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::{
    ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
    ObjectResult, ObjectStore, OperationType, StreamingUploader, prefix,
};

/// Opendal object storage.
#[derive(Clone)]
pub struct OpendalObjectStore {
    pub(crate) op: Operator,
    pub(crate) media_type: MediaType,
    pub(crate) config: Arc<ObjectStoreConfig>,
    pub(crate) metrics: Arc<ObjectStoreMetrics>,
}

#[derive(Clone)]
pub enum MediaType {
    Memory,
    Hdfs,
    Gcs,
    Minio,
    S3,
    Obs,
    Oss,
    Webhdfs,
    Azblob,
    Fs,
}

impl MediaType {
    pub fn as_str(&self) -> &'static str {
        match self {
            MediaType::Memory => "Memory",
            MediaType::Hdfs => "Hdfs",
            MediaType::Gcs => "Gcs",
            MediaType::Minio => "Minio",
            MediaType::S3 => "S3",
            MediaType::Obs => "Obs",
            MediaType::Oss => "Oss",
            MediaType::Webhdfs => "Webhdfs",
            MediaType::Azblob => "Azblob",
            MediaType::Fs => "Fs",
        }
    }
}

impl OpendalObjectStore {
    /// Creates an in-memory OpenDAL engine, used for unit tests.
    pub fn test_new_memory_engine() -> ObjectResult<Self> {
        // Create memory backend builder.
        let builder = Memory::default();
        let op: Operator = Operator::new(builder)?.finish();

        Ok(Self {
            op,
            media_type: MediaType::Memory,
            config: Arc::new(ObjectStoreConfig::default()),
            metrics: Arc::new(ObjectStoreMetrics::unused()),
        })
    }
}
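
// A minimal usage sketch of the in-memory engine above (illustrative only; the
// runnable equivalents live in the tests at the bottom of this file):
//
//     let store = OpendalObjectStore::test_new_memory_engine()?;
//     store.upload("/obj", Bytes::from("payload")).await?;
//     assert_eq!(store.read("/obj", ..).await?, Bytes::from("payload"));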

#[async_trait::async_trait]
impl ObjectStore for OpendalObjectStore {
    type StreamingUploader = OpendalStreamingUploader;

    fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
        match self.media_type {
            MediaType::S3 => prefix::s3::get_object_prefix(obj_id),
            MediaType::Minio => prefix::s3::get_object_prefix(obj_id),
            MediaType::Memory => String::default(),
            MediaType::Hdfs
            | MediaType::Gcs
            | MediaType::Obs
            | MediaType::Oss
            | MediaType::Webhdfs
            | MediaType::Azblob
            | MediaType::Fs => {
                prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy)
            }
        }
    }

    async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
        if obj.is_empty() {
            Err(ObjectError::internal("upload empty object"))
        } else {
            self.op.write(path, obj).await?;
            Ok(())
        }
    }

    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
        Ok(OpendalStreamingUploader::new(
            self.op.clone(),
            path.to_owned(),
            self.config.clone(),
            self.metrics.clone(),
            self.store_media_type(),
        )
        .await?)
    }

    async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
        let data = if range.is_full() {
            self.op.read(path).await?
        } else {
            self.op
                .read_with(path)
                .range(range.map(|v| *v as u64))
                .await?
        };

        if let Some(len) = range.len()
            && len != data.len()
        {
            return Err(ObjectError::internal(format!(
                "mismatched size: expected {}, found {} when reading {} at {:?}",
                len,
                data.len(),
                path,
                range,
            )));
        }

        Ok(data.to_bytes())
    }

    /// Returns a stream reading the object specified in `path`. The stream yields only the bytes
    /// within the given `range` (0-based, end-exclusive). As far as possible, the stream loads
    /// only the amount of data into memory that is read from the stream.
    async fn streaming_read(
        &self,
        path: &str,
        range: Range<usize>,
    ) -> ObjectResult<ObjectDataStream> {
        fail_point!("opendal_streaming_read_err", |_| Err(
            ObjectError::internal("opendal streaming read error")
        ));
        let range: Range<u64> = (range.start as u64)..(range.end as u64);

        // The layer specified first will be executed first.
        // `TimeoutLayer` must be specified before `RetryLayer`.
        // Otherwise, it will lead to bad state inside OpenDAL and panic.
        // See https://docs.rs/opendal/latest/opendal/layers/struct.RetryLayer.html#panics
        let reader = self
            .op
            .clone()
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
                self.config.retry.streaming_read_attempt_timeout_ms,
            )))
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(
                        self.config.retry.req_backoff_interval_ms,
                    ))
                    .with_max_delay(Duration::from_millis(
                        self.config.retry.req_backoff_max_delay_ms,
                    ))
                    .with_max_times(self.config.retry.streaming_read_retry_attempts)
                    .with_factor(self.config.retry.req_backoff_factor as f32)
                    .with_jitter(),
            )
            .reader_with(path)
            .await?;
        let stream = reader.into_bytes_stream(range).await?.map(|item| {
            item.map(|b| Bytes::copy_from_slice(b.as_ref()))
                .map_err(|e| {
                    ObjectError::internal(format!("reader into_stream fail {}", e.as_report()))
                })
        });

        Ok(Box::pin(stream))
    }

    async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
        let opendal_metadata = self.op.stat(path).await?;
        let key = path.to_owned();
        let last_modified = match opendal_metadata.last_modified() {
            Some(t) => t.timestamp() as f64,
            None => 0_f64,
        };

        let total_size = opendal_metadata.content_length() as usize;
        let metadata = ObjectMetadata {
            key,
            last_modified,
            total_size,
        };
        Ok(metadata)
    }

    async fn delete(&self, path: &str) -> ObjectResult<()> {
        self.op.delete(path).await?;
        Ok(())
    }

    /// Deletes the objects with the given paths permanently from the storage. If an object
    /// specified in the request is not found, it is considered successfully deleted.
    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
        self.op.delete_iter(paths.to_vec()).await?;
        Ok(())
    }

    async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter> {
        let mut object_lister = self.op.lister_with(prefix).recursive(true);
        if let Some(start_after) = start_after {
            object_lister = object_lister.start_after(&start_after);
        }
        let object_lister = object_lister.await?;

        let op = self.op.clone();
        let full_capability = op.info().full_capability();
        let stream = stream::unfold(object_lister, move |mut object_lister| {
            let op = op.clone();

            async move {
                match object_lister.next().await {
                    Some(Ok(object)) => {
                        let key = object.path().to_owned();

                        // Check if we need to call stat() to get complete metadata.
                        let (last_modified, total_size) = if !full_capability
                            .list_has_content_length
                            || !full_capability.list_has_last_modified
                        {
                            // Need complete metadata, call stat()
                            let stat_meta = op.stat(&key).await.ok()?;
                            let last_modified = match stat_meta.last_modified() {
                                Some(t) => t.timestamp() as f64,
                                None => 0_f64,
                            };
                            let total_size = stat_meta.content_length() as usize;
                            (last_modified, total_size)
                        } else {
                            // Use metadata from list operation
                            let meta = object.metadata();
                            let last_modified = match meta.last_modified() {
                                Some(t) => t.timestamp() as f64,
                                None => 0_f64,
                            };
                            let total_size = meta.content_length() as usize;
                            (last_modified, total_size)
                        };

                        let metadata = ObjectMetadata {
                            key,
                            last_modified,
                            total_size,
                        };
                        Some((Ok(metadata), object_lister))
                    }
                    Some(Err(err)) => Some((Err(err.into()), object_lister)),
                    None => None,
                }
            }
        });

        Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed())
    }

    fn store_media_type(&self) -> &'static str {
        self.media_type.as_str()
    }

    fn support_streaming_upload(&self) -> bool {
        self.op.info().native_capability().write_can_multi
    }
}

impl OpendalObjectStore {
    pub async fn copy(&self, from_path: &str, to_path: &str) -> ObjectResult<()> {
        self.op.copy(from_path, to_path).await?;
        Ok(())
    }
}

struct OpendalStreamingUploaderExecute {
    /// To record metrics for uploading parts.
    metrics: Arc<ObjectStoreMetrics>,
    media_type: &'static str,
}

impl OpendalStreamingUploaderExecute {
    const STREAMING_UPLOAD_TYPE: OperationType = OperationType::StreamingUpload;

    fn new(metrics: Arc<ObjectStoreMetrics>, media_type: &'static str) -> Self {
        Self {
            metrics,
            media_type,
        }
    }
}

impl Execute for OpendalStreamingUploaderExecute {
    fn execute(&self, f: BoxedStaticFuture<()>) {
        let operation_type_str = Self::STREAMING_UPLOAD_TYPE.as_str();
        let media_type = self.media_type;

        let metrics = self.metrics.clone();
        let _handle = tokio::spawn(async move {
            let _timer = metrics
                .operation_latency
                .with_label_values(&[media_type, operation_type_str])
                .start_timer();

            f.await
        });
    }
}

/// Buffers incoming data and uploads it to the object store part by part, concatenating the
/// parts into a single object on finish.
pub struct OpendalStreamingUploader {
    writer: Writer,
    /// Buffer for data. It accumulates data until at least `upload_part_size` bytes are
    /// buffered, then flushes them to the object store as a part.
    buf: Vec<Bytes>,
    /// Length of the data that has not been uploaded to the object store.
    not_uploaded_len: usize,
    /// Whether the writer is valid. The writer becomes invalid after abort/close.
    is_valid: bool,

    abort_on_err: bool,

    upload_part_size: usize,
}

impl OpendalStreamingUploader {
    pub async fn new(
        op: Operator,
        path: String,
        config: Arc<ObjectStoreConfig>,
        metrics: Arc<ObjectStoreMetrics>,
        media_type: &'static str,
    ) -> ObjectResult<Self> {
        let monitored_execute = OpendalStreamingUploaderExecute::new(metrics, media_type);
        let executor = Executor::with(monitored_execute);
        op.update_executor(|_| executor);

        let writer = op
            .clone()
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
                config.retry.streaming_upload_attempt_timeout_ms,
            )))
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(config.retry.req_backoff_interval_ms))
                    .with_max_delay(Duration::from_millis(config.retry.req_backoff_max_delay_ms))
                    .with_max_times(config.retry.streaming_upload_retry_attempts)
                    .with_factor(config.retry.req_backoff_factor as f32)
                    .with_jitter(),
            )
            .writer_with(&path)
            .concurrent(config.opendal_upload_concurrency)
            .await?;
        Ok(Self {
            writer,
            buf: vec![],
            not_uploaded_len: 0,
            is_valid: true,
            abort_on_err: config.opendal_writer_abort_on_err,
            upload_part_size: config.upload_part_size,
        })
    }

    async fn flush(&mut self) -> ObjectResult<()> {
        let data: Vec<Bytes> = self.buf.drain(..).collect();
        debug_assert_eq!(
            data.iter().map(|b| b.len()).sum::<usize>(),
            self.not_uploaded_len
        );
        if let Err(err) = self.writer.write(data).await {
            self.is_valid = false;
            if self.abort_on_err {
                self.writer.abort().await?;
            }
            return Err(err.into());
        }
        self.not_uploaded_len = 0;
        Ok(())
    }
}

impl StreamingUploader for OpendalStreamingUploader {
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        assert!(self.is_valid);
        self.not_uploaded_len += data.len();
        self.buf.push(data);
        if self.not_uploaded_len >= self.upload_part_size {
            self.flush().await?;
        }
        Ok(())
    }

    async fn finish(mut self) -> ObjectResult<()> {
        assert!(self.is_valid);
        if self.not_uploaded_len > 0 {
            self.flush().await?;
        }

        assert!(self.buf.is_empty());
        assert_eq!(self.not_uploaded_len, 0);

        self.is_valid = false;
        match self.writer.close().await {
            Ok(_) => (),
            Err(err) => {
                if self.abort_on_err {
                    self.writer.abort().await?;
                }
                return Err(err.into());
            }
        };

        Ok(())
    }

    // Not absolutely accurate. Some bytes may be in in-flight requests.
    fn get_memory_usage(&self) -> u64 {
        self.not_uploaded_len as u64
    }
}

#[cfg(test)]
mod tests {
    use stream::TryStreamExt;

    use super::*;

    async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
        store
            .list(prefix, None, None)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_memory_upload() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // No such object.
        store.read("/ab", 0..3).await.unwrap_err();

        let bytes = store.read("/abc", 4..6).await.unwrap();
        assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_owned());

        store.delete("/abc").await.unwrap();

        // No such object.
        store.read("/abc", 0..3).await.unwrap_err();
    }
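
    // A hedged sketch exercising `OpendalObjectStore::copy`: the copy only runs
    // when the backend reports the `copy` capability, since not every OpenDAL
    // service supports it. Paths `/src` and `/dst` are illustrative.
    #[tokio::test]
    async fn test_memory_copy() {
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        if !store.op.info().full_capability().copy {
            // The in-memory service may not support copy; skip in that case.
            return;
        }

        store.upload("/src", Bytes::from("123456")).await.unwrap();
        store.copy("/src", "/dst").await.unwrap();

        let bytes = store.read("/dst", ..).await.unwrap();
        assert_eq!(bytes, Bytes::from("123456"));
    }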

    #[tokio::test]
    #[should_panic]
    async fn test_memory_read_overflow() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // Overflow.
        store.read("/abc", 4..44).await.unwrap_err();
    }
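
    // A minimal sketch of `streaming_read`: read a sub-range as a byte stream and
    // reassemble the chunks. Chunk boundaries are backend-dependent, so only the
    // concatenated bytes are compared.
    #[tokio::test]
    async fn test_memory_streaming_read() {
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", Bytes::from("123456")).await.unwrap();

        let stream = store.streaming_read("/abc", 2..6).await.unwrap();
        let chunks: Vec<Bytes> = stream.try_collect().await.unwrap();
        let data: Vec<u8> = chunks.concat();
        assert_eq!(data, b"3456".to_vec());
    }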

    #[tokio::test]
    async fn test_memory_metadata() {
        let block = Bytes::from("123456");
        let path = "/abc".to_owned();
        let obj_store = OpendalObjectStore::test_new_memory_engine().unwrap();
        obj_store.upload("/abc", block).await.unwrap();

        let err = obj_store.metadata("/not_exist").await.unwrap_err();
        assert!(err.is_object_not_found_error());
        let metadata = obj_store.metadata("/abc").await.unwrap();
        assert_eq!(metadata.total_size, 6);
        obj_store.delete(&path).await.unwrap();
    }

    #[tokio::test]
    async fn test_memory_delete_objects_and_list_object() {
        let block1 = Bytes::from("123456");
        let block2 = Bytes::from("987654");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("abc", Bytes::from("123456")).await.unwrap();
        store.upload("prefix/abc", block1).await.unwrap();

        store.upload("prefix/xyz", block2).await.unwrap();

        assert_eq!(list_all("", &store).await.len(), 3);
        assert_eq!(list_all("prefix/", &store).await.len(), 2);
        let str_list = [String::from("prefix/abc"), String::from("prefix/xyz")];

        store.delete_objects(&str_list).await.unwrap();

        assert!(store.read("prefix/abc/", ..).await.is_err());
        assert!(store.read("prefix/xyz/", ..).await.is_err());
        assert_eq!(list_all("", &store).await.len(), 1);
        assert_eq!(list_all("prefix/", &store).await.len(), 0);
    }
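
    // A hedged sketch of the streaming-upload path: `write_bytes` buffers data and
    // `finish` flushes and closes the writer. The test gates on
    // `support_streaming_upload`, since multi-part writes are a backend capability
    // the in-memory service may or may not advertise.
    #[tokio::test]
    async fn test_memory_streaming_upload() {
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        if !store.support_streaming_upload() {
            return;
        }

        let mut uploader = store.streaming_upload("/upload").await.unwrap();
        uploader.write_bytes(Bytes::from("123")).await.unwrap();
        uploader.write_bytes(Bytes::from("456")).await.unwrap();
        uploader.finish().await.unwrap();

        let bytes = store.read("/upload", ..).await.unwrap();
        assert_eq!(bytes, Bytes::from("123456"));
    }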
}