risingwave_object_store/object/opendal_engine/opendal_object_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use fail::fail_point;
use futures::{StreamExt, stream};
use opendal::layers::{RetryLayer, TimeoutLayer};
use opendal::raw::BoxedStaticFuture;
use opendal::services::Memory;
use opendal::{Execute, Executor, Metakey, Operator, Writer};
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::range::RangeBoundsExt;
use thiserror_ext::AsReport;

use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::{
    ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
    ObjectResult, ObjectStore, OperationType, StreamingUploader, prefix,
};

/// Opendal object storage.
///
/// Adapts any OpenDAL-backed service (see [`MediaType`]) to the `ObjectStore`
/// trait by wrapping an [`Operator`].
#[derive(Clone)]
pub struct OpendalObjectStore {
    /// The underlying OpenDAL operator that performs all I/O.
    pub(crate) op: Operator,
    /// The backend service behind `op`; selects the object-prefix strategy and
    /// is used as a metrics/label string.
    pub(crate) media_type: MediaType,

    /// Retry/backoff/timeout and upload-concurrency settings.
    pub(crate) config: Arc<ObjectStoreConfig>,
    /// Metrics registry; used to time streaming-upload part futures.
    pub(crate) metrics: Arc<ObjectStoreMetrics>,
}

/// The concrete backend service behind an [`OpendalObjectStore`].
#[derive(Clone)]
pub enum MediaType {
    Memory,
    Hdfs,
    Gcs,
    Minio,
    S3,
    Obs,
    Oss,
    Webhdfs,
    Azblob,
    Fs,
}

60impl MediaType {
61    pub fn as_str(&self) -> &'static str {
62        match self {
63            MediaType::Memory => "Memory",
64            MediaType::Hdfs => "Hdfs",
65            MediaType::Gcs => "Gcs",
66            MediaType::Minio => "Minio",
67            MediaType::S3 => "S3",
68            MediaType::Obs => "Obs",
69            MediaType::Oss => "Oss",
70            MediaType::Webhdfs => "Webhdfs",
71            MediaType::Azblob => "Azblob",
72            MediaType::Fs => "Fs",
73        }
74    }
75}
76
77impl OpendalObjectStore {
78    /// create opendal memory engine, used for unit tests.
79    pub fn test_new_memory_engine() -> ObjectResult<Self> {
80        // Create memory backend builder.
81        let builder = Memory::default();
82        let op: Operator = Operator::new(builder)?.finish();
83        Ok(Self {
84            op,
85            media_type: MediaType::Memory,
86            config: Arc::new(ObjectStoreConfig::default()),
87            metrics: Arc::new(ObjectStoreMetrics::unused()),
88        })
89    }
90}
91
#[async_trait::async_trait]
impl ObjectStore for OpendalObjectStore {
    type StreamingUploader = OpendalStreamingUploader;

    /// Returns the path prefix to place before `obj_id`'s object name.
    ///
    /// S3 and MinIO share the S3-specific prefix scheme; the in-memory backend
    /// uses no prefix; every other backend uses the generic OpenDAL scheme,
    /// which is the only one that honors `use_new_object_prefix_strategy`.
    fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
        match self.media_type {
            MediaType::S3 => prefix::s3::get_object_prefix(obj_id),
            MediaType::Minio => prefix::s3::get_object_prefix(obj_id),
            MediaType::Memory => String::default(),
            MediaType::Hdfs
            | MediaType::Gcs
            | MediaType::Obs
            | MediaType::Oss
            | MediaType::Webhdfs
            | MediaType::Azblob
            | MediaType::Fs => {
                prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy)
            }
        }
    }

    /// Uploads `obj` to `path` with a single write.
    ///
    /// Empty payloads are rejected: uploading a zero-byte object is treated as
    /// a caller bug rather than silently creating an empty object.
    async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
        if obj.is_empty() {
            Err(ObjectError::internal("upload empty object"))
        } else {
            self.op.write(path, obj).await?;
            Ok(())
        }
    }

    /// Creates a streaming (multi-part) uploader for `path`, wired with this
    /// store's config, metrics, and media-type label.
    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
        Ok(OpendalStreamingUploader::new(
            self.op.clone(),
            path.to_owned(),
            self.config.clone(),
            self.metrics.clone(),
            self.store_media_type(),
        )
        .await?)
    }

    /// Reads the object at `path` — fully for an unbounded range, otherwise
    /// only the requested byte range.
    ///
    /// # Errors
    /// Besides backend errors, returns an internal error when a bounded range
    /// yields a different number of bytes than requested (truncated read).
    async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
        let data = if range.is_full() {
            self.op.read(path).await?
        } else {
            self.op
                .read_with(path)
                .range(range.map(|v| *v as u64))
                .await?
        };

        // Sanity check: a bounded range must return exactly the requested length.
        if let Some(len) = range.len()
            && len != data.len()
        {
            return Err(ObjectError::internal(format!(
                "mismatched size: expected {}, found {} when reading {} at {:?}",
                len,
                data.len(),
                path,
                range,
            )));
        }

        Ok(data.to_bytes())
    }

    /// Returns a stream reading the object specified in `path`. If given, the stream starts at the
    /// byte with index `start_pos` (0-based). As far as possible, the stream only loads the amount
    /// of data into memory that is read from the stream.
    async fn streaming_read(
        &self,
        path: &str,
        range: Range<usize>,
    ) -> ObjectResult<ObjectDataStream> {
        // Failpoint for fault-injection tests.
        fail_point!("opendal_streaming_read_err", |_| Err(
            ObjectError::internal("opendal streaming read error")
        ));
        let range: Range<u64> = (range.start as u64)..(range.end as u64);

        // The layer specified first will be executed first.
        // `TimeoutLayer` must be specified before `RetryLayer`.
        // Otherwise, it will lead to bad state inside OpenDAL and panic.
        // See https://docs.rs/opendal/latest/opendal/layers/struct.RetryLayer.html#panics
        let reader = self
            .op
            .clone()
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
                self.config.retry.streaming_read_attempt_timeout_ms,
            )))
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(
                        self.config.retry.req_backoff_interval_ms,
                    ))
                    .with_max_delay(Duration::from_millis(
                        self.config.retry.req_backoff_max_delay_ms,
                    ))
                    .with_max_times(self.config.retry.streaming_read_retry_attempts)
                    .with_factor(self.config.retry.req_backoff_factor as f32)
                    .with_jitter(),
            )
            .reader_with(path)
            .await?;
        // Convert each chunk into an owned `Bytes` and surface reader errors
        // as `ObjectError`s inside the stream.
        let stream = reader.into_bytes_stream(range).await?.map(|item| {
            item.map(|b| Bytes::copy_from_slice(b.as_ref()))
                .map_err(|e| {
                    ObjectError::internal(format!("reader into_stream fail {}", e.as_report()))
                })
        });

        Ok(Box::pin(stream))
    }

    /// Stats the object at `path` and converts the result to [`ObjectMetadata`].
    /// A missing last-modified timestamp is reported as `0.0`.
    async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
        let opendal_metadata = self.op.stat(path).await?;
        let key = path.to_owned();
        let last_modified = match opendal_metadata.last_modified() {
            Some(t) => t.timestamp() as f64,
            None => 0_f64,
        };

        let total_size = opendal_metadata.content_length() as usize;
        let metadata = ObjectMetadata {
            key,
            last_modified,
            total_size,
        };
        Ok(metadata)
    }

    /// Deletes the single object at `path`.
    async fn delete(&self, path: &str) -> ObjectResult<()> {
        self.op.delete(path).await?;
        Ok(())
    }

    /// Deletes the objects with the given paths permanently from the storage. If an object
    /// specified in the request is not found, it will be considered as successfully deleted.
    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
        self.op.remove(paths.to_vec()).await?;
        Ok(())
    }

    /// Lists objects under `prefix` recursively, optionally starting after
    /// `start_after` and truncated to at most `limit` entries.
    ///
    /// Only `ContentLength` is requested from the backend; listing errors are
    /// yielded in-stream rather than failing this call.
    async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter> {
        let mut object_lister = self
            .op
            .lister_with(prefix)
            .recursive(true)
            .metakey(Metakey::ContentLength);
        if let Some(start_after) = start_after {
            object_lister = object_lister.start_after(&start_after);
        }
        let object_lister = object_lister.await?;

        // Drive the lister with `unfold`, mapping each entry to `ObjectMetadata`.
        let stream = stream::unfold(object_lister, |mut object_lister| async move {
            match object_lister.next().await {
                Some(Ok(object)) => {
                    let key = object.path().to_owned();
                    let om = object.metadata();
                    let last_modified = match om.last_modified() {
                        Some(t) => t.timestamp() as f64,
                        None => 0_f64,
                    };
                    let total_size = om.content_length() as usize;
                    let metadata = ObjectMetadata {
                        key,
                        last_modified,
                        total_size,
                    };
                    Some((Ok(metadata), object_lister))
                }
                Some(Err(err)) => Some((Err(err.into()), object_lister)),
                None => None,
            }
        });

        Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed())
    }

    /// Returns the backend's display name (see [`MediaType::as_str`]).
    fn store_media_type(&self) -> &'static str {
        self.media_type.as_str()
    }

    /// Streaming upload is supported iff the backend natively supports
    /// multi-part writes.
    fn support_streaming_upload(&self) -> bool {
        self.op.info().native_capability().write_can_multi
    }
}

284impl OpendalObjectStore {
285    pub async fn copy(&self, from_path: &str, to_path: &str) -> ObjectResult<()> {
286        self.op.copy(from_path, to_path).await?;
287        Ok(())
288    }
289}
290
/// An OpenDAL [`Execute`] implementation that runs upload-part futures on the
/// tokio runtime while timing them.
struct OpendalStreamingUploaderExecute {
    /// To record metrics for uploading part.
    metrics: Arc<ObjectStoreMetrics>,
    /// Metrics label for the backend (e.g. "S3", "Minio"); see [`MediaType::as_str`].
    media_type: &'static str,
}

297impl OpendalStreamingUploaderExecute {
298    const STREAMING_UPLOAD_TYPE: OperationType = OperationType::StreamingUpload;
299
300    fn new(metrics: Arc<ObjectStoreMetrics>, media_type: &'static str) -> Self {
301        Self {
302            metrics,
303            media_type,
304        }
305    }
306}
307
308impl Execute for OpendalStreamingUploaderExecute {
309    fn execute(&self, f: BoxedStaticFuture<()>) {
310        let operation_type_str = Self::STREAMING_UPLOAD_TYPE.as_str();
311        let media_type = self.media_type;
312
313        let metrics = self.metrics.clone();
314        let _handle = tokio::spawn(async move {
315            let _timer = metrics
316                .operation_latency
317                .with_label_values(&[media_type, operation_type_str])
318                .start_timer();
319
320            f.await
321        });
322    }
323}
324
/// Streaming uploader that buffers written bytes and flushes them to the
/// object store part by part, committing all parts on finish.
pub struct OpendalStreamingUploader {
    /// The underlying OpenDAL multi-part writer (wrapped with timeout and retry
    /// layers; see `OpendalStreamingUploader::new`).
    writer: Writer,
    /// Buffer for data. It will store at least `UPLOAD_BUFFER_SIZE` bytes of data before wrapping itself
    /// into a stream and upload to object store as a part.
    buf: Vec<Bytes>,
    /// Length of the data that have not been uploaded to object store.
    not_uploaded_len: usize,
    /// Whether the writer is valid. The writer is invalid after abort/close.
    is_valid: bool,

    /// When true, a failed write/close also aborts the multi-part upload
    /// (from `ObjectStoreConfig::opendal_writer_abort_on_err`).
    abort_on_err: bool,
}

impl OpendalStreamingUploader {
    /// Minimum number of buffered bytes before a part is flushed (16 MiB).
    const UPLOAD_BUFFER_SIZE: usize = 16 * 1024 * 1024;

    /// Creates a streaming uploader writing to `path` via `op`.
    ///
    /// The writer gets a per-attempt I/O timeout plus a retry layer with
    /// backoff parameters from `config.retry`, uploads parts with
    /// `config.opendal_upload_concurrency` concurrency, and executes part
    /// futures on a metrics-recording executor labeled by `media_type`.
    ///
    /// # Errors
    /// Fails if the backend cannot create the writer.
    pub async fn new(
        op: Operator,
        path: String,
        config: Arc<ObjectStoreConfig>,
        metrics: Arc<ObjectStoreMetrics>,
        media_type: &'static str,
    ) -> ObjectResult<Self> {
        let monitored_execute = OpendalStreamingUploaderExecute::new(metrics, media_type);

        // The layer specified first will be executed first.
        // `TimeoutLayer` must be specified before `RetryLayer`.
        // Otherwise, it will lead to bad state inside OpenDAL and panic.
        // See https://docs.rs/opendal/latest/opendal/layers/struct.RetryLayer.html#panics
        let writer = op
            .clone()
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
                config.retry.streaming_upload_attempt_timeout_ms,
            )))
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(config.retry.req_backoff_interval_ms))
                    .with_max_delay(Duration::from_millis(config.retry.req_backoff_max_delay_ms))
                    .with_max_times(config.retry.streaming_upload_retry_attempts)
                    .with_factor(config.retry.req_backoff_factor as f32)
                    .with_jitter(),
            )
            .writer_with(&path)
            .concurrent(config.opendal_upload_concurrency)
            .executor(Executor::with(monitored_execute))
            .await?;
        Ok(Self {
            writer,
            buf: vec![],
            not_uploaded_len: 0,
            is_valid: true,
            abort_on_err: config.opendal_writer_abort_on_err,
        })
    }

    /// Drains the buffer and writes it to the store as one part.
    ///
    /// On write failure the uploader is invalidated and, when `abort_on_err`
    /// is set, the multi-part upload is aborted.
    /// NOTE(review): if the abort itself fails, its error replaces the
    /// original write error via `?` — confirm this precedence is intended.
    async fn flush(&mut self) -> ObjectResult<()> {
        let data: Vec<Bytes> = self.buf.drain(..).collect();
        // Invariant: the counter tracks exactly the bytes held in `buf`.
        debug_assert_eq!(
            data.iter().map(|b| b.len()).sum::<usize>(),
            self.not_uploaded_len
        );
        if let Err(err) = self.writer.write(data).await {
            self.is_valid = false;
            if self.abort_on_err {
                self.writer.abort().await?;
            }
            return Err(err.into());
        }
        self.not_uploaded_len = 0;
        Ok(())
    }
}

impl StreamingUploader for OpendalStreamingUploader {
    /// Appends `data` to the buffer, flushing a part once at least
    /// `UPLOAD_BUFFER_SIZE` bytes have accumulated.
    ///
    /// Panics (via assert) if called after the uploader was invalidated.
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        assert!(self.is_valid);
        self.not_uploaded_len += data.len();
        self.buf.push(data);
        if self.not_uploaded_len >= Self::UPLOAD_BUFFER_SIZE {
            self.flush().await?;
        }
        Ok(())
    }

    /// Flushes any remaining buffered data and closes (commits) the upload.
    ///
    /// The uploader is invalidated regardless of the outcome. On close failure
    /// the upload is aborted when `abort_on_err` is set.
    /// NOTE(review): as in `flush`, a failing abort replaces the close error.
    async fn finish(mut self) -> ObjectResult<()> {
        assert!(self.is_valid);
        if self.not_uploaded_len > 0 {
            self.flush().await?;
        }

        // After the final flush nothing may remain buffered.
        assert!(self.buf.is_empty());
        assert_eq!(self.not_uploaded_len, 0);

        self.is_valid = false;
        match self.writer.close().await {
            Ok(_) => (),
            Err(err) => {
                if self.abort_on_err {
                    self.writer.abort().await?;
                }
                return Err(err.into());
            }
        };

        Ok(())
    }

    /// Reports a fixed estimate: the configured upload buffer capacity,
    /// independent of how much is currently buffered.
    fn get_memory_usage(&self) -> u64 {
        Self::UPLOAD_BUFFER_SIZE as u64
    }
}

#[cfg(test)]
mod tests {
    use stream::TryStreamExt;

    use super::*;

    /// Collects the full listing under `prefix` from the given store.
    async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
        store
            .list(prefix, None, None)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
    }

    /// Upload / ranged read / delete round-trip on the memory engine.
    #[tokio::test]
    async fn test_memory_upload() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // No such object.
        store.read("/ab", 0..3).await.unwrap_err();

        let bytes = store.read("/abc", 4..6).await.unwrap();
        assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_owned());

        store.delete("/abc").await.unwrap();

        // No such object.
        store.read("/abc", 0..3).await.unwrap_err();
    }

    /// Reading past the end of the object must not succeed.
    /// NOTE(review): `#[should_panic]` implies the memory backend panics on an
    /// out-of-range read; `.unwrap_err()` alone would not panic if a plain
    /// error were returned — confirm against the opendal version in use.
    #[tokio::test]
    #[should_panic]
    async fn test_memory_read_overflow() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // Overflow.
        store.read("/abc", 4..44).await.unwrap_err();
    }

    /// `metadata` reports not-found for missing keys and the correct size
    /// for existing ones.
    #[tokio::test]
    async fn test_memory_metadata() {
        let block = Bytes::from("123456");
        let path = "/abc".to_owned();
        let obj_store = OpendalObjectStore::test_new_memory_engine().unwrap();
        obj_store.upload("/abc", block).await.unwrap();

        let err = obj_store.metadata("/not_exist").await.unwrap_err();
        assert!(err.is_object_not_found_error());
        let metadata = obj_store.metadata("/abc").await.unwrap();
        assert_eq!(metadata.total_size, 6);
        obj_store.delete(&path).await.unwrap();
    }

    /// Batch delete removes exactly the named objects; listing reflects it.
    #[tokio::test]
    async fn test_memory_delete_objects_and_list_object() {
        let block1 = Bytes::from("123456");
        let block2 = Bytes::from("987654");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("abc", Bytes::from("123456")).await.unwrap();
        store.upload("prefix/abc", block1).await.unwrap();

        store.upload("prefix/xyz", block2).await.unwrap();

        // Three objects total, two of them under "prefix/".
        assert_eq!(list_all("", &store).await.len(), 3);
        assert_eq!(list_all("prefix/", &store).await.len(), 2);
        let str_list = [String::from("prefix/abc"), String::from("prefix/xyz")];

        store.delete_objects(&str_list).await.unwrap();

        assert!(store.read("prefix/abc/", ..).await.is_err());
        assert!(store.read("prefix/xyz/", ..).await.is_err());
        // Only the top-level "abc" remains.
        assert_eq!(list_all("", &store).await.len(), 1);
        assert_eq!(list_all("prefix/", &store).await.len(), 0);
    }
}