risingwave_object_store/object/opendal_engine/
opendal_object_store.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17use std::time::Duration;
18
19use bytes::Bytes;
20use fail::fail_point;
21use futures::{StreamExt, stream};
22use opendal::layers::{RetryLayer, TimeoutLayer};
23use opendal::raw::BoxedStaticFuture;
24use opendal::services::Memory;
25use opendal::{Execute, Executor, Operator, Writer};
26use risingwave_common::config::ObjectStoreConfig;
27use risingwave_common::range::RangeBoundsExt;
28use thiserror_ext::AsReport;
29
30use crate::object::object_metrics::ObjectStoreMetrics;
31use crate::object::{
32    ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
33    ObjectResult, ObjectStore, OperationType, StreamingUploader, prefix,
34};
35
/// Opendal object storage.
///
/// A thin wrapper over an opendal [`Operator`] that implements the
/// [`ObjectStore`] trait for every opendal-backed service RisingWave supports.
#[derive(Clone)]
pub struct OpendalObjectStore {
    /// The opendal operator all storage requests go through.
    pub(crate) op: Operator,
    /// Which backend service `op` talks to; selects the object-prefix scheme.
    pub(crate) media_type: MediaType,
    /// Retry/timeout/upload tuning knobs applied when building readers/writers.
    pub(crate) config: Arc<ObjectStoreConfig>,
    /// Metrics shared with the streaming uploader's monitoring executor.
    pub(crate) metrics: Arc<ObjectStoreMetrics>,
}
44
/// The backend service behind an [`OpendalObjectStore`].
///
/// The variant determines the object-prefix scheme (see
/// `ObjectStore::get_object_prefix`) and the label reported via
/// [`MediaType::as_str`].
#[derive(Clone)]
pub enum MediaType {
    Memory,
    Hdfs,
    Gcs,
    Minio,
    S3,
    Obs,
    Oss,
    Webhdfs,
    Azblob,
    Fs,
}
58
impl MediaType {
    /// Returns the backend's display name.
    ///
    /// Used as the engine type returned by `ObjectStore::store_media_type`
    /// and as the media-type label on upload latency metrics.
    pub fn as_str(&self) -> &'static str {
        match self {
            MediaType::Memory => "Memory",
            MediaType::Hdfs => "Hdfs",
            MediaType::Gcs => "Gcs",
            MediaType::Minio => "Minio",
            MediaType::S3 => "S3",
            MediaType::Obs => "Obs",
            MediaType::Oss => "Oss",
            MediaType::Webhdfs => "Webhdfs",
            MediaType::Azblob => "Azblob",
            MediaType::Fs => "Fs",
        }
    }
}
75
76impl OpendalObjectStore {
77    /// create opendal memory engine, used for unit tests.
78    pub fn test_new_memory_engine() -> ObjectResult<Self> {
79        // Create memory backend builder.
80        let builder = Memory::default();
81        let op: Operator = Operator::new(builder)?.finish();
82
83        Ok(Self {
84            op,
85            media_type: MediaType::Memory,
86            config: Arc::new(ObjectStoreConfig::default()),
87            metrics: Arc::new(ObjectStoreMetrics::unused()),
88        })
89    }
90}
91
/// Converts an opendal timestamp into seconds since the Unix epoch as `f64`.
///
/// NOTE(review): `as_second()` truncates sub-second precision, so
/// `last_modified` values are whole seconds — confirm callers do not need
/// finer granularity.
#[inline]
fn timestamp_to_secs(t: opendal::raw::Timestamp) -> f64 {
    t.into_inner().as_second() as f64
}
96
97#[async_trait::async_trait]
98impl ObjectStore for OpendalObjectStore {
99    type StreamingUploader = OpendalStreamingUploader;
100
101    fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
102        match self.media_type {
103            MediaType::S3 => prefix::s3::get_object_prefix(obj_id),
104            MediaType::Minio => prefix::s3::get_object_prefix(obj_id),
105            MediaType::Memory => String::default(),
106            MediaType::Hdfs
107            | MediaType::Gcs
108            | MediaType::Obs
109            | MediaType::Oss
110            | MediaType::Webhdfs
111            | MediaType::Azblob
112            | MediaType::Fs => {
113                prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy)
114            }
115        }
116    }
117
118    async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
119        if obj.is_empty() {
120            Err(ObjectError::internal("upload empty object"))
121        } else {
122            self.op.write(path, obj).await?;
123            Ok(())
124        }
125    }
126
127    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
128        Ok(OpendalStreamingUploader::new(
129            self.op.clone(),
130            path.to_owned(),
131            self.config.clone(),
132            self.metrics.clone(),
133            self.store_media_type(),
134        )
135        .await?)
136    }
137
138    async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
139        let data = if range.is_full() {
140            self.op.read(path).await?
141        } else {
142            self.op
143                .read_with(path)
144                .range(range.map(|v| *v as u64))
145                .await?
146        };
147
148        if let Some(len) = range.len()
149            && len != data.len()
150        {
151            return Err(ObjectError::internal(format!(
152                "mismatched size: expected {}, found {} when reading {} at {:?}",
153                len,
154                data.len(),
155                path,
156                range,
157            )));
158        }
159
160        Ok(data.to_bytes())
161    }
162
163    /// Returns a stream reading the object specified in `path`. If given, the stream starts at the
164    /// byte with index `start_pos` (0-based). As far as possible, the stream only loads the amount
165    /// of data into memory that is read from the stream.
166    async fn streaming_read(
167        &self,
168        path: &str,
169        range: Range<usize>,
170    ) -> ObjectResult<ObjectDataStream> {
171        fail_point!("opendal_streaming_read_err", |_| Err(
172            ObjectError::internal("opendal streaming read error")
173        ));
174        let range: Range<u64> = (range.start as u64)..(range.end as u64);
175
176        // The layer specified first will be executed first.
177        // `TimeoutLayer` must be specified before `RetryLayer`.
178        // Otherwise, it will lead to bad state inside OpenDAL and panic.
179        // See https://docs.rs/opendal/latest/opendal/layers/struct.RetryLayer.html#panics
180        let reader = self
181            .op
182            .clone()
183            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
184                self.config.retry.streaming_read_attempt_timeout_ms,
185            )))
186            .layer(
187                RetryLayer::new()
188                    .with_min_delay(Duration::from_millis(
189                        self.config.retry.req_backoff_interval_ms,
190                    ))
191                    .with_max_delay(Duration::from_millis(
192                        self.config.retry.req_backoff_max_delay_ms,
193                    ))
194                    .with_max_times(self.config.retry.streaming_read_retry_attempts)
195                    .with_factor(self.config.retry.req_backoff_factor as f32)
196                    .with_jitter(),
197            )
198            .reader_with(path)
199            .await?;
200        let stream = reader.into_bytes_stream(range).await?.map(|item| {
201            item.map(|b| Bytes::copy_from_slice(b.as_ref()))
202                .map_err(|e| {
203                    ObjectError::internal(format!("reader into_stream fail {}", e.as_report()))
204                })
205        });
206
207        Ok(Box::pin(stream))
208    }
209
210    async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
211        let opendal_metadata = self.op.stat(path).await?;
212        let key = path.to_owned();
213        let last_modified = match opendal_metadata.last_modified() {
214            Some(t) => timestamp_to_secs(t),
215            None => 0_f64,
216        };
217
218        let total_size = opendal_metadata.content_length() as usize;
219        let metadata = ObjectMetadata {
220            key,
221            last_modified,
222            total_size,
223        };
224        Ok(metadata)
225    }
226
227    async fn delete(&self, path: &str) -> ObjectResult<()> {
228        self.op.delete(path).await?;
229        Ok(())
230    }
231
232    /// Deletes the objects with the given paths permanently from the storage. If an object
233    /// specified in the request is not found, it will be considered as successfully deleted.
234    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
235        self.op.delete_iter(paths.to_vec()).await?;
236        Ok(())
237    }
238
239    async fn list(
240        &self,
241        prefix: &str,
242        start_after: Option<String>,
243        limit: Option<usize>,
244    ) -> ObjectResult<ObjectMetadataIter> {
245        let mut object_lister = self.op.lister_with(prefix).recursive(true);
246        if let Some(start_after) = start_after {
247            object_lister = object_lister.start_after(&start_after);
248        }
249        let object_lister = object_lister.await?;
250
251        let op = self.op.clone();
252        let stream = stream::unfold(object_lister, move |mut object_lister| {
253            let op = op.clone();
254
255            async move {
256                match object_lister.next().await {
257                    Some(Ok(object)) => {
258                        let key = object.path().to_owned();
259
260                        // OpenDAL 0.55 removed list metadata capability flags and reports
261                        // unknown content length as 0. Use listed metadata first, and call
262                        // stat() if timestamp is missing or size is 0 to avoid treating
263                        // unknown sizes as real zero-byte objects.
264                        let meta = object.metadata();
265                        let mut last_modified = meta.last_modified().map(timestamp_to_secs);
266                        let mut total_size = meta.content_length() as usize;
267                        if last_modified.is_none() || total_size == 0 {
268                            let stat_meta = op.stat(&key).await.ok()?;
269                            last_modified = stat_meta.last_modified().map(timestamp_to_secs);
270                            total_size = stat_meta.content_length() as usize;
271                        }
272
273                        let metadata = ObjectMetadata {
274                            key,
275                            last_modified: last_modified.unwrap_or(0_f64),
276                            total_size,
277                        };
278                        Some((Ok(metadata), object_lister))
279                    }
280                    Some(Err(err)) => Some((Err(err.into()), object_lister)),
281                    None => None,
282                }
283            }
284        });
285
286        Ok(stream.take(limit.unwrap_or(usize::MAX)).boxed())
287    }
288
289    fn store_media_type(&self) -> &'static str {
290        self.media_type.as_str()
291    }
292
293    fn support_streaming_upload(&self) -> bool {
294        self.op.info().native_capability().write_can_multi
295    }
296}
297
298impl OpendalObjectStore {
299    pub async fn copy(&self, from_path: &str, to_path: &str) -> ObjectResult<()> {
300        self.op.copy(from_path, to_path).await?;
301        Ok(())
302    }
303}
304
/// An opendal [`Execute`] implementation that spawns part-upload futures on
/// the tokio runtime and records their latency.
struct OpendalStreamingUploaderExecute {
    /// To record metrics for uploading part.
    metrics: Arc<ObjectStoreMetrics>,
    /// Backend label for the metrics (see [`MediaType::as_str`]).
    media_type: &'static str,
}
310
impl OpendalStreamingUploaderExecute {
    /// Operation label under which spawned upload futures are timed.
    const STREAMING_UPLOAD_TYPE: OperationType = OperationType::StreamingUpload;

    /// Creates an executor labelling its metrics with `media_type`.
    fn new(metrics: Arc<ObjectStoreMetrics>, media_type: &'static str) -> Self {
        Self {
            metrics,
            media_type,
        }
    }
}
321
322impl Execute for OpendalStreamingUploaderExecute {
323    fn execute(&self, f: BoxedStaticFuture<()>) {
324        let operation_type_str = Self::STREAMING_UPLOAD_TYPE.as_str();
325        let media_type = self.media_type;
326
327        let metrics = self.metrics.clone();
328        let _handle = tokio::spawn(async move {
329            let _timer = metrics
330                .operation_latency
331                .with_label_values(&[media_type, operation_type_str])
332                .start_timer();
333
334            f.await
335        });
336    }
337}
338
/// Buffers incoming bytes and uploads them as multi-part writes, closing the
/// writer on finish to concatenate the parts.
pub struct OpendalStreamingUploader {
    // The opendal writer (with timeout/retry layers) performing part uploads.
    writer: Writer,
    /// Buffer for data. It will store at least `UPLOAD_BUFFER_SIZE` bytes of data before wrapping itself
    /// into a stream and upload to object store as a part.
    buf: Vec<Bytes>,
    /// Length of the data that have not been uploaded to object store.
    not_uploaded_len: usize,
    /// Whether the writer is valid. The writer is invalid after abort/close.
    is_valid: bool,

    // Whether to abort the multipart upload when a write/close fails.
    abort_on_err: bool,

    // Minimum buffered size before a part is flushed to the backend.
    upload_part_size: usize,
}
354
impl OpendalStreamingUploader {
    /// Creates a streaming uploader writing to `path`.
    ///
    /// Installs a metrics-recording executor on `op` so that part-upload
    /// futures spawned by opendal are timed, then builds a writer with
    /// timeout/retry layers and the configured upload concurrency.
    pub async fn new(
        op: Operator,
        path: String,
        config: Arc<ObjectStoreConfig>,
        metrics: Arc<ObjectStoreMetrics>,
        media_type: &'static str,
    ) -> ObjectResult<Self> {
        let monitored_execute = OpendalStreamingUploaderExecute::new(metrics, media_type);
        let executor = Executor::with(monitored_execute);
        op.update_executor(|_| executor);

        // The layer specified first will be executed first: `TimeoutLayer`
        // must come before `RetryLayer`, otherwise OpenDAL reaches a bad
        // internal state and panics (see the RetryLayer docs).
        let writer = op
            .clone()
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_millis(
                config.retry.streaming_upload_attempt_timeout_ms,
            )))
            .layer(
                RetryLayer::new()
                    .with_min_delay(Duration::from_millis(config.retry.req_backoff_interval_ms))
                    .with_max_delay(Duration::from_millis(config.retry.req_backoff_max_delay_ms))
                    .with_max_times(config.retry.streaming_upload_retry_attempts)
                    .with_factor(config.retry.req_backoff_factor as f32)
                    .with_jitter(),
            )
            .writer_with(&path)
            .concurrent(config.opendal_upload_concurrency)
            .await?;
        Ok(Self {
            writer,
            buf: vec![],
            not_uploaded_len: 0,
            is_valid: true,
            abort_on_err: config.opendal_writer_abort_on_err,
            upload_part_size: config.upload_part_size,
        })
    }

    /// Drains the buffer and writes its contents to the backend as one part.
    ///
    /// On write failure the uploader is invalidated; when `abort_on_err` is
    /// set, the multipart upload is aborted first.
    /// NOTE(review): if `abort()` itself fails, its error replaces the
    /// original write error — confirm this is intentional.
    async fn flush(&mut self) -> ObjectResult<()> {
        let data: Vec<Bytes> = self.buf.drain(..).collect();
        // Invariant: `not_uploaded_len` always equals the total buffered bytes.
        debug_assert_eq!(
            data.iter().map(|b| b.len()).sum::<usize>(),
            self.not_uploaded_len
        );
        if let Err(err) = self.writer.write(data).await {
            self.is_valid = false;
            if self.abort_on_err {
                self.writer.abort().await?;
            }
            return Err(err.into());
        }
        self.not_uploaded_len = 0;
        Ok(())
    }
}
410
impl StreamingUploader for OpendalStreamingUploader {
    /// Buffers `data`, flushing a part once at least `upload_part_size`
    /// bytes have accumulated.
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        // Writing after abort/close is a caller bug.
        assert!(self.is_valid);
        self.not_uploaded_len += data.len();
        self.buf.push(data);
        if self.not_uploaded_len >= self.upload_part_size {
            self.flush().await?;
        }
        Ok(())
    }

    /// Flushes any remaining buffered data and closes the writer to complete
    /// the upload. The uploader is invalid afterwards regardless of outcome.
    ///
    /// NOTE(review): if `close()` fails and the subsequent `abort()` also
    /// fails, the abort error replaces the close error — confirm intended.
    async fn finish(mut self) -> ObjectResult<()> {
        assert!(self.is_valid);
        if self.not_uploaded_len > 0 {
            self.flush().await?;
        }

        // After the final flush nothing may remain buffered.
        assert!(self.buf.is_empty());
        assert_eq!(self.not_uploaded_len, 0);

        self.is_valid = false;
        match self.writer.close().await {
            Ok(_) => (),
            Err(err) => {
                if self.abort_on_err {
                    self.writer.abort().await?;
                }
                return Err(err.into());
            }
        };

        Ok(())
    }

    // Not absolutely accurate. Some bytes may be in the in-flight request.
    fn get_memory_usage(&self) -> u64 {
        self.not_uploaded_len as u64
    }
}
450
#[cfg(test)]
mod tests {
    use stream::TryStreamExt;

    use super::*;

    /// Collects the full listing under `prefix` from the given store.
    async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
        store
            .list(prefix, None, None)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
    }

    // Round-trip: upload, ranged read, delete, read-after-delete fails.
    #[tokio::test]
    async fn test_memory_upload() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // No such object.
        store.read("/ab", 0..3).await.unwrap_err();

        // Ranged read returns exactly the requested byte span.
        let bytes = store.read("/abc", 4..6).await.unwrap();
        assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_owned());

        store.delete("/abc").await.unwrap();

        // No such object.
        store.read("/abc", 0..3).await.unwrap_err();
    }

    // NOTE(review): `#[should_panic]` here depends on how the opendal memory
    // backend handles an out-of-range read — confirm this still holds when
    // upgrading opendal.
    #[tokio::test]
    #[should_panic]
    async fn test_memory_read_overflow() {
        let block = Bytes::from("123456");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("/abc", block).await.unwrap();

        // Overflow.
        store.read("/abc", 4..44).await.unwrap_err();
    }

    // Metadata of a missing object is a not-found error; size is reported.
    #[tokio::test]
    async fn test_memory_metadata() {
        let block = Bytes::from("123456");
        let path = "/abc".to_owned();
        let obj_store = OpendalObjectStore::test_new_memory_engine().unwrap();
        obj_store.upload("/abc", block).await.unwrap();

        let err = obj_store.metadata("/not_exist").await.unwrap_err();
        assert!(err.is_object_not_found_error());
        let metadata = obj_store.metadata("/abc").await.unwrap();
        assert_eq!(metadata.total_size, 6);
        obj_store.delete(&path).await.unwrap();
    }

    // Batch delete removes only the listed keys; listing reflects it.
    #[tokio::test]
    async fn test_memory_delete_objects_and_list_object() {
        let block1 = Bytes::from("123456");
        let block2 = Bytes::from("987654");
        let store = OpendalObjectStore::test_new_memory_engine().unwrap();
        store.upload("abc", Bytes::from("123456")).await.unwrap();
        store.upload("prefix/abc", block1).await.unwrap();

        store.upload("prefix/xyz", block2).await.unwrap();

        assert_eq!(list_all("", &store).await.len(), 3);
        assert_eq!(list_all("prefix/", &store).await.len(), 2);
        let str_list = [String::from("prefix/abc"), String::from("prefix/xyz")];

        store.delete_objects(&str_list).await.unwrap();

        assert!(store.read("prefix/abc/", ..).await.is_err());
        assert!(store.read("prefix/xyz/", ..).await.is_err());
        assert_eq!(list_all("", &store).await.len(), 1);
        assert_eq!(list_all("prefix/", &store).await.len(), 0);
    }
}
531}