risingwave_object_store/object/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![expect(
    unexpected_cfgs,
    reason = "feature(hdfs-backend) is banned https://github.com/risingwavelabs/risingwave/pull/7875"
)]

pub mod sim;
use std::ops::{Range, RangeBounds};
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;

pub mod mem;
pub use mem::*;

pub mod opendal_engine;
pub use opendal_engine::*;

pub mod s3;
use await_tree::{InstrumentAwait, SpanExt};
use futures::stream::BoxStream;
use futures::{Future, StreamExt};
pub use risingwave_common::config::ObjectStoreConfig;
pub use s3::*;

pub mod error;
pub mod object_metrics;

pub mod prefix;

pub use error::*;
use object_metrics::ObjectStoreMetrics;
use thiserror_ext::AsReport;
use tokio_retry::strategy::{ExponentialBackoff, jitter};

#[cfg(madsim)]
use self::sim::SimObjectStore;

pub type ObjectStoreRef = Arc<ObjectStoreImpl>;
pub type ObjectStreamingUploader = StreamingUploaderImpl;

pub trait ObjectRangeBounds = RangeBounds<usize> + Clone + Send + Sync + std::fmt::Debug + 'static;

#[derive(Debug, Clone, PartialEq)]
pub struct ObjectMetadata {
    /// Full path of the object.
    pub key: String,
    /// Last modification time, in seconds since the UNIX epoch.
    pub last_modified: f64,
    pub total_size: usize,
}

pub trait StreamingUploader: Send {
    #[expect(async_fn_in_trait)]
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()>;

    #[expect(async_fn_in_trait)]
    async fn finish(self) -> ObjectResult<()>;

    fn get_memory_usage(&self) -> u64;
}
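
// A minimal sketch of driving a `StreamingUploader`; `store` (any `ObjectStore`)
// and `chunks` (a `Vec<Bytes>`) are hypothetical:
//
// ```ignore
// let mut uploader = store.streaming_upload("path/to/object").await?;
// for chunk in chunks {
//     uploader.write_bytes(chunk).await?;
// }
// // Consumes the uploader; data is not guaranteed durable until this returns Ok.
// uploader.finish().await?;
// ```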

/// The implementation must be thread-safe.
#[async_trait::async_trait]
pub trait ObjectStore: Send + Sync {
    type StreamingUploader: StreamingUploader;
    /// Gets the key prefix for an object. The prefix is determined by the type of object store
    /// and `devise_object_prefix`.
    fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String;

    /// Uploads the object to `ObjectStore`.
    async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>;

    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader>;

    /// If objects are PUT using a multipart upload, it's a good practice to GET them in the same
    /// part sizes (or at least aligned to part boundaries) for best performance.
    /// <https://d1.awsstatic.com/whitepapers/AmazonS3BestPractices.pdf?stod_obj2>
    async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes>;

    /// Returns a stream reading the object specified in `path`. The stream covers the byte range
    /// given by `read_range` (0-based). As far as possible, the stream only loads the amount of
    /// data into memory that is read from the stream.
    async fn streaming_read(
        &self,
        path: &str,
        read_range: Range<usize>,
    ) -> ObjectResult<ObjectDataStream>;

    /// Obtains the object metadata.
    async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata>;

    /// Deletes blob permanently.
    async fn delete(&self, path: &str) -> ObjectResult<()>;

    /// Deletes the objects with the given paths permanently from the storage. If an object
    /// specified in the request is not found, it will be considered as successfully deleted.
    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()>;

    fn monitored(
        self,
        metrics: Arc<ObjectStoreMetrics>,
        config: Arc<ObjectStoreConfig>,
    ) -> MonitoredObjectStore<Self>
    where
        Self: Sized,
    {
        MonitoredObjectStore::new(self, metrics, config)
    }

    async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter>;

    fn store_media_type(&self) -> &'static str;

    fn support_streaming_upload(&self) -> bool {
        true
    }
}
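
// A minimal sketch of wrapping a concrete store with the monitoring layer via the
// `monitored` default method; the `ObjectStoreMetrics::unused()` test constructor
// is an assumption here:
//
// ```ignore
// let metrics = Arc::new(ObjectStoreMetrics::unused()); // assumed test-only constructor
// let config = Arc::new(ObjectStoreConfig::default());
// let store: MonitoredObjectStore<InMemObjectStore> =
//     InMemObjectStore::new().monitored(metrics, config);
// ```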

#[cfg(not(madsim))]
macro_rules! for_all_object_store {
    ($macro:ident $($args:tt)*) => {
        $macro! {
            {
                { InMem, InMemObjectStore },
                { Opendal, OpendalObjectStore },
                { S3, S3ObjectStore }
            }
            $($args)*
        }
    }
}

#[cfg(madsim)]
macro_rules! for_all_object_store {
    ($macro:ident $($args:tt)*) => {
        $macro! {
            {
                { InMem, InMemObjectStore },
                { Opendal, OpendalObjectStore },
                { S3, S3ObjectStore },
                { Sim, SimObjectStore }
            }
            $($args)*
        }
    }
}

macro_rules! enum_map {
    (
        {
            $(
                {$variant:ident, $_type_name:ty}
            ),*
        },
        $object_store:expr,
        $var_name:ident,
        $func:expr
    ) => {
        match $object_store {
            $(
                ObjectStoreEnum::$variant($var_name) => ObjectStoreEnum::$variant({
                    $func
                }),
            )*
        }
    };
    ($object_store:expr, |$var_name:ident| $func:expr) => {
        for_all_object_store! {
            enum_map, $object_store, $var_name, $func
        }
    };
}
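
// For reference, `enum_map!(self, |store| store.streaming_upload(path).await?)`
// conceptually expands (in a non-madsim build) to a match that rebuilds the enum
// with the same variant:
//
// ```ignore
// match self {
//     ObjectStoreEnum::InMem(store) => ObjectStoreEnum::InMem(store.streaming_upload(path).await?),
//     ObjectStoreEnum::Opendal(store) => ObjectStoreEnum::Opendal(store.streaming_upload(path).await?),
//     ObjectStoreEnum::S3(store) => ObjectStoreEnum::S3(store.streaming_upload(path).await?),
// }
// ```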

macro_rules! dispatch_object_store_enum {
    (
        {
            $(
                {$variant:ident, $_type_name:ty}
            ),*
        },
        $object_store:expr,
        $var_name:ident,
        $func:expr
    ) => {
        match $object_store {
            $(
                ObjectStoreEnum::$variant($var_name) => {
                    $func
                },
            )*
        }
    };
    ($object_store:expr, |$var_name:ident| $func:expr) => {
        for_all_object_store! {
            dispatch_object_store_enum, $object_store, $var_name, $func
        }
    };
}

macro_rules! define_object_store_impl {
    () => {
        for_all_object_store! {
            define_object_store_impl
        }
    };
    (
        {$(
            {$variant:ident, $type_name:ty}
        ),*}
    ) => {
        pub enum ObjectStoreEnum<
            $($variant),*
        > {
            $(
                $variant($variant),
            )*
        }

        pub type ObjectStoreImpl = ObjectStoreEnum<
            $(
                MonitoredObjectStore<$type_name>,
            )*
        >;

        pub type StreamingUploaderImpl = ObjectStoreEnum<
            $(
                MonitoredStreamingUploader<<$type_name as ObjectStore>::StreamingUploader>
            ),*
        >;
    };
}

define_object_store_impl!();
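
// The invocation above generates, for a non-madsim build, roughly:
//
// ```ignore
// pub enum ObjectStoreEnum<InMem, Opendal, S3> {
//     InMem(InMem),
//     Opendal(Opendal),
//     S3(S3),
// }
//
// pub type ObjectStoreImpl = ObjectStoreEnum<
//     MonitoredObjectStore<InMemObjectStore>,
//     MonitoredObjectStore<OpendalObjectStore>,
//     MonitoredObjectStore<S3ObjectStore>,
// >;
// ```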

/// This macro routes the object store operation to the real implementation by the `ObjectStoreImpl`
/// enum type and the `path`.
///
/// Except for `InMem`, the operation should be performed on remote object store.
macro_rules! object_store_impl_method_body {
    // with await
    ($object_store:expr, $method_name:ident ($($args:expr),*).await) => {
        {
            dispatch_object_store_enum! {$object_store, |os| {
                os.$method_name($($args),*).await
            }}
        }
    };
    // no await
    ($object_store:expr, $method_name:ident ($($args:expr),*)) => {
        {
            dispatch_object_store_enum! {$object_store, |os| {
                os.$method_name($($args),*)
            }}
        }
    };
}

impl StreamingUploaderImpl {
    pub async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        object_store_impl_method_body!(self, write_bytes(data).await)
    }

    pub async fn finish(self) -> ObjectResult<()> {
        object_store_impl_method_body!(self, finish().await)
    }

    pub fn get_memory_usage(&self) -> u64 {
        object_store_impl_method_body!(self, get_memory_usage())
    }
}

impl ObjectStoreImpl {
    pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
        object_store_impl_method_body!(self, upload(path, obj).await)
    }

    pub async fn streaming_upload(&self, path: &str) -> ObjectResult<ObjectStreamingUploader> {
        Ok(enum_map!(self, |store| {
            store.streaming_upload(path).await?
        }))
    }

    pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
        object_store_impl_method_body!(self, read(path, range).await)
    }

    pub async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
        object_store_impl_method_body!(self, metadata(path).await)
    }

    /// Returns a stream reading the object specified in `path`. The stream covers the byte range
    /// given by `start_loc` (0-based). As far as possible, the stream only loads the amount of
    /// data into memory that is read from the stream.
    pub async fn streaming_read(
        &self,
        path: &str,
        start_loc: Range<usize>,
    ) -> ObjectResult<MonitoredStreamingReader> {
        object_store_impl_method_body!(self, streaming_read(path, start_loc).await)
    }

    pub async fn delete(&self, path: &str) -> ObjectResult<()> {
        object_store_impl_method_body!(self, delete(path).await)
    }

    /// Deletes the objects with the given paths permanently from the storage. If an object
    /// specified in the request is not found, it will be considered as successfully deleted.
    ///
    /// If a hybrid storage is used, the method will first attempt to delete objects in local
    /// storage. Only if that is successful, it will remove objects from remote storage.
    pub async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
        object_store_impl_method_body!(self, delete_objects(paths).await)
    }

    pub async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter> {
        object_store_impl_method_body!(self, list(prefix, start_after, limit).await)
    }

    pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
        dispatch_object_store_enum!(self, |store| store
            .inner
            .get_object_prefix(obj_id, use_new_object_prefix_strategy))
    }

    pub fn support_streaming_upload(&self) -> bool {
        dispatch_object_store_enum!(self, |store| store.inner.support_streaming_upload())
    }

    pub fn media_type(&self) -> &'static str {
        object_store_impl_method_body!(self, media_type())
    }
}
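
// A minimal sketch (hypothetical `store: ObjectStoreRef` and object path) of the
// typical round trip through `ObjectStoreImpl`:
//
// ```ignore
// store.upload("ident/0001.data", Bytes::from_static(b"payload")).await?;
// let bytes = store.read("ident/0001.data", 0..7).await?;
// assert_eq!(&bytes[..], b"payload");
// store.delete("ident/0001.data").await?;
// ```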

fn try_update_failure_metric<T>(
    metrics: &Arc<ObjectStoreMetrics>,
    result: &ObjectResult<T>,
    operation_type: &'static str,
) {
    if let Err(e) = &result {
        tracing::error!(error = %e.as_report(), "{} failed", operation_type);
        metrics
            .failure_count
            .with_label_values(&[operation_type])
            .inc();
    }
}

/// `MonitoredStreamingUploader` reports the following metrics.
/// - `write_bytes`: The total number of bytes uploaded from the uploader's creation to `finish`.
/// - `operation_size`:
///   - `streaming_upload`: The number of bytes written in each call to `write_bytes`.
///   - `streaming_upload_finish`: The total number of bytes uploaded, observed when `finish` is
///     called.
/// - `operation_latency`:
///   - `streaming_upload_init`: The time spent creating the uploader.
///   - The latency of `write_bytes` and `finish` is reported inside the concrete uploader
///     implementation (see the NOTICE below).
/// - `failure_count`: `streaming_upload_init`, `streaming_upload` (per `write_bytes` call), and
///   `streaming_upload_finish`.
pub struct MonitoredStreamingUploader<U: StreamingUploader> {
    inner: U,
    object_store_metrics: Arc<ObjectStoreMetrics>,
    /// Length of data uploaded with this uploader.
    operation_size: usize,
}

impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
    pub fn new(handle: U, object_store_metrics: Arc<ObjectStoreMetrics>) -> Self {
        Self {
            inner: handle,
            object_store_metrics,
            operation_size: 0,
        }
    }
}

/// NOTICE: after #16231, the streaming uploader implemented via aws-sdk-s3 maintains its metrics
/// internally in s3.rs, so `MonitoredStreamingUploader` is only used when the inner object store
/// is opendal.
impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        let operation_type = OperationType::StreamingUpload;
        let operation_type_str = operation_type.as_str();
        let data_len = data.len();

        let res = self
            .inner
            .write_bytes(data)
            .instrument_await(operation_type_str.verbose())
            .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);

        // Duration metrics are collected and reported inside the specific implementation of the
        // streaming uploader.
        self.object_store_metrics
            .write_bytes
            .inc_by(data_len as u64);
        self.object_store_metrics
            .operation_size
            .with_label_values(&[operation_type_str])
            .observe(data_len as f64);
        self.operation_size += data_len;

        res
    }

    async fn finish(self) -> ObjectResult<()> {
        let operation_type = OperationType::StreamingUploadFinish;
        let operation_type_str = operation_type.as_str();

        let res =
            // TODO: we should avoid this special case after fully migrating to opendal for s3.
            self.inner
                .finish()
                .instrument_await(operation_type_str.verbose())
                .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);

        // Duration metrics are collected and reported inside the specific implementation of the
        // streaming uploader.
        self.object_store_metrics
            .operation_size
            .with_label_values(&[operation_type_str])
            .observe(self.operation_size as f64);
        res
    }

    fn get_memory_usage(&self) -> u64 {
        self.inner.get_memory_usage()
    }
}

pub struct MonitoredStreamingReader {
    inner: ObjectDataStream,
    object_store_metrics: Arc<ObjectStoreMetrics>,
    operation_size: usize,
    media_type: &'static str,
    streaming_read_timeout: Option<Duration>,
    operation_type_str: &'static str,
}

impl MonitoredStreamingReader {
    pub fn new(
        media_type: &'static str,
        handle: ObjectDataStream,
        object_store_metrics: Arc<ObjectStoreMetrics>,
        streaming_read_timeout: Option<Duration>,
    ) -> Self {
        Self {
            inner: handle,
            object_store_metrics,
            operation_size: 0,
            media_type,
            streaming_read_timeout,
            operation_type_str: OperationType::StreamingRead.as_str(),
        }
    }

    pub async fn read_bytes(&mut self) -> Option<ObjectResult<Bytes>> {
        let _timer = self
            .object_store_metrics
            .operation_latency
            .with_label_values(&[self.media_type, self.operation_type_str])
            .start_timer();
        let future = async {
            self.inner
                .next()
                .instrument_await(self.operation_type_str.verbose())
                .await
        };
        let res = match self.streaming_read_timeout.as_ref() {
            None => future.await,
            Some(timeout_duration) => tokio::time::timeout(*timeout_duration, future)
                .await
                .unwrap_or_else(|_| {
                    Some(Err(ObjectError::timeout(format!(
                        "Retry attempts exhausted for {}. Please modify {}_attempt_timeout_ms (current={:?}) under [storage.object_store.retry] in the config accordingly if needed.",
                        self.operation_type_str, self.operation_type_str, timeout_duration.as_millis()
                    ))))
                }),
        };

        if let Some(ret) = &res {
            try_update_failure_metric(&self.object_store_metrics, ret, self.operation_type_str);
        }
        if let Some(Ok(data)) = &res {
            let data_len = data.len();
            self.object_store_metrics.read_bytes.inc_by(data_len as u64);
            self.object_store_metrics
                .operation_size
                .with_label_values(&[self.operation_type_str])
                .observe(data_len as f64);
            self.operation_size += data_len;
        }
        res
    }
}

impl Drop for MonitoredStreamingReader {
    fn drop(&mut self) {
        self.object_store_metrics
            .operation_size
            .with_label_values(&[self.operation_type_str])
            .observe(self.operation_size as f64);
    }
}
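
// A minimal sketch (hypothetical `store` and `len`) of consuming a
// `MonitoredStreamingReader`; `read_bytes` yields `None` once the requested
// range is exhausted:
//
// ```ignore
// let mut reader = store.streaming_read("path/to/object", 0..len).await?;
// while let Some(chunk) = reader.read_bytes().await {
//     let chunk: Bytes = chunk?;
//     // process `chunk`...
// }
// ```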

pub struct MonitoredObjectStore<OS: ObjectStore> {
    inner: OS,
    object_store_metrics: Arc<ObjectStoreMetrics>,
    config: Arc<ObjectStoreConfig>,
}

/// Manually dispatch trait methods.
///
/// The metrics are updated in the following order:
/// - Write operations
///   - `write_bytes`
///   - `operation_size`
///   - start `operation_latency` timer
///   - `failure_count`
/// - Read operations
///   - start `operation_latency` timer
///   - `failure_count`
///   - `read_bytes`
///   - `operation_size`
/// - Other
///   - start `operation_latency` timer
///   - `failure_count`
impl<OS: ObjectStore> MonitoredObjectStore<OS> {
    pub fn new(
        store: OS,
        object_store_metrics: Arc<ObjectStoreMetrics>,
        config: Arc<ObjectStoreConfig>,
    ) -> Self {
        Self {
            object_store_metrics,
            inner: store,
            config,
        }
    }

    fn media_type(&self) -> &'static str {
        self.inner.store_media_type()
    }

    pub fn inner(&self) -> &OS {
        &self.inner
    }

    pub fn mut_inner(&mut self) -> &mut OS {
        &mut self.inner
    }

    pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
        let operation_type = OperationType::Upload;
        let operation_type_str = operation_type.as_str();
        let media_type = self.media_type();

        self.object_store_metrics
            .write_bytes
            .inc_by(obj.len() as u64);
        self.object_store_metrics
            .operation_size
            .with_label_values(&[operation_type_str])
            .observe(obj.len() as f64);
        let _timer = self
            .object_store_metrics
            .operation_latency
            .with_label_values(&[media_type, operation_type_str])
            .start_timer();

        let builder = || async {
            self.inner
                .upload(path, obj.clone())
                .instrument_await(operation_type_str.verbose())
                .await
        };

        let res = retry_request(
            builder,
            &self.config,
            operation_type,
            self.object_store_metrics.clone(),
            media_type,
        )
        .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
        res
    }

    pub async fn streaming_upload(
        &self,
        path: &str,
    ) -> ObjectResult<MonitoredStreamingUploader<OS::StreamingUploader>> {
        let operation_type = OperationType::StreamingUploadInit;
        let operation_type_str = operation_type.as_str();
        let media_type = self.media_type();
        let _timer = self
            .object_store_metrics
            .operation_latency
            .with_label_values(&[media_type, operation_type_str])
            .start_timer();

        let res = self
            .inner
            .streaming_upload(path)
            .instrument_await(operation_type_str.verbose())
            .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);

        Ok(MonitoredStreamingUploader::new(
            res?,
            self.object_store_metrics.clone(),
        ))
    }

    pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
        let operation_type = OperationType::Read;
        let operation_type_str = operation_type.as_str();
        let media_type = self.media_type();

        let _timer = self
            .object_store_metrics
            .operation_latency
            .with_label_values(&[media_type, operation_type_str])
            .start_timer();

        let builder = || async {
            self.inner
                .read(path, range.clone())
                .instrument_await(operation_type_str.verbose())
                .await
        };

        let res = retry_request(
            builder,
            &self.config,
            operation_type,
            self.object_store_metrics.clone(),
            media_type,
        )
        .await;

        if let Err(e) = &res
            && e.is_object_not_found_error()
            && !path.ends_with(".data")
        {
            // Some not-found errors are expected, e.g. the metadata backup's manifest.json.
            // As a quick fix, skip `try_update_failure_metric` here so the error is only logged
            // and counted for state store usage.
        } else {
            try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
        }

        let data = res?;
        self.object_store_metrics
            .read_bytes
            .inc_by(data.len() as u64);
        self.object_store_metrics
            .operation_size
            .with_label_values(&[operation_type_str])
            .observe(data.len() as f64);
        Ok(data)
    }

    /// Returns a stream reading the object specified in `path`. The stream covers the byte range
    /// given by `range` (0-based). As far as possible, the stream only loads the amount of data
    /// into memory that is read from the stream.
    async fn streaming_read(
        &self,
        path: &str,
        range: Range<usize>,
    ) -> ObjectResult<MonitoredStreamingReader> {
        let operation_type = OperationType::StreamingReadInit;
        let operation_type_str = operation_type.as_str();
        let media_type = self.media_type();
        let _timer = self
            .object_store_metrics
            .operation_latency
            .with_label_values(&[media_type, operation_type_str])
            .start_timer();

        let builder = || async {
            self.inner
                .streaming_read(path, range.clone())
                .instrument_await(operation_type_str.verbose())
                .await
        };

        let res = retry_request(
            builder,
            &self.config,
            operation_type,
            self.object_store_metrics.clone(),
            media_type,
        )
        .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);

        Ok(MonitoredStreamingReader::new(
            media_type,
            res?,
            self.object_store_metrics.clone(),
            Some(Duration::from_millis(
                self.config.retry.streaming_read_attempt_timeout_ms,
            )),
        ))
    }

    pub async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
        let operation_type = OperationType::Metadata;
        let operation_type_str = operation_type.as_str();
        let media_type = self.media_type();
        let _timer = self
            .object_store_metrics
            .operation_latency
            .with_label_values(&[media_type, operation_type_str])
            .start_timer();

        let builder = || async {
            self.inner
                .metadata(path)
                .instrument_await(operation_type_str.verbose())
                .await
        };

        let res = retry_request(
            builder,
            &self.config,
            operation_type,
            self.object_store_metrics.clone(),
            media_type,
        )
        .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
        res
    }

    pub async fn delete(&self, path: &str) -> ObjectResult<()> {
        let operation_type = OperationType::Delete;
        let operation_type_str = operation_type.as_str();
        let media_type = self.media_type();

        let _timer = self
            .object_store_metrics
            .operation_latency
            .with_label_values(&[media_type, operation_type_str])
            .start_timer();

        let builder = || async {
            self.inner
                .delete(path)
                .instrument_await(operation_type_str.verbose())
                .await
        };

        let res = retry_request(
            builder,
            &self.config,
            operation_type,
            self.object_store_metrics.clone(),
            media_type,
        )
        .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
        res
    }

    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
        let operation_type = OperationType::DeleteObjects;
        let operation_type_str = operation_type.as_str();
        let media_type = self.media_type();

        let _timer = self
            .object_store_metrics
            .operation_latency
            .with_label_values(&[media_type, operation_type_str])
            .start_timer();

        let builder = || async {
            self.inner
                .delete_objects(paths)
                .instrument_await(operation_type_str.verbose())
                .await
        };

        let res = retry_request(
            builder,
            &self.config,
            operation_type,
            self.object_store_metrics.clone(),
            media_type,
        )
        .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
        res
    }

    pub async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter> {
        let operation_type = OperationType::List;
        let operation_type_str = operation_type.as_str();
        let media_type = self.media_type();

        let _timer = self
            .object_store_metrics
            .operation_latency
            .with_label_values(&[media_type, operation_type_str])
            .start_timer();

        let builder = || async {
            self.inner
                .list(prefix, start_after.clone(), limit)
                .instrument_await(operation_type_str.verbose())
                .await
        };

        let res = retry_request(
            builder,
            &self.config,
            operation_type,
            self.object_store_metrics.clone(),
            media_type,
        )
        .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
        res
    }
}

/// Creates a new [`ObjectStore`] from the given `url`. Credentials are configured via environment
/// variables.
///
/// # Panics
/// If the `url` is invalid. Therefore, it is only suitable to be used during startup.
pub async fn build_remote_object_store(
    url: &str,
    metrics: Arc<ObjectStoreMetrics>,
    ident: &str,
    config: Arc<ObjectStoreConfig>,
) -> ObjectStoreImpl {
    tracing::debug!(config=?config, "object store {ident}");
    match url {
        s3 if s3.starts_with("s3://") => {
            if config.s3.developer.use_opendal {
                let bucket = s3.strip_prefix("s3://").unwrap();
                tracing::info!("Using OpenDAL to access s3, bucket is {}", bucket);
                ObjectStoreImpl::Opendal(
                    OpendalObjectStore::new_s3_engine(
                        bucket.to_owned(),
                        config.clone(),
                        metrics.clone(),
                    )
                    .unwrap()
                    .monitored(metrics, config),
                )
            } else {
                ObjectStoreImpl::S3(
                    S3ObjectStore::new_with_config(
                        s3.strip_prefix("s3://").unwrap().to_owned(),
                        metrics.clone(),
                        config.clone(),
                    )
                    .await
                    .monitored(metrics, config),
                )
            }
        }
        #[cfg(feature = "hdfs-backend")]
        hdfs if hdfs.starts_with("hdfs://") => {
            let hdfs = hdfs.strip_prefix("hdfs://").unwrap();
            let (namenode, root) = hdfs.split_once('@').unwrap_or((hdfs, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_hdfs_engine(
                    namenode.to_string(),
                    root.to_string(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }
        gcs if gcs.starts_with("gcs://") => {
            let gcs = gcs.strip_prefix("gcs://").unwrap();
            let (bucket, root) = gcs.split_once('@').unwrap_or((gcs, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_gcs_engine(
                    bucket.to_owned(),
                    root.to_owned(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }
        obs if obs.starts_with("obs://") => {
            let obs = obs.strip_prefix("obs://").unwrap();
            let (bucket, root) = obs.split_once('@').unwrap_or((obs, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_obs_engine(
                    bucket.to_owned(),
                    root.to_owned(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }

        oss if oss.starts_with("oss://") => {
            let oss = oss.strip_prefix("oss://").unwrap();
            let (bucket, root) = oss.split_once('@').unwrap_or((oss, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_oss_engine(
                    bucket.to_owned(),
                    root.to_owned(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }
        webhdfs if webhdfs.starts_with("webhdfs://") => {
            let webhdfs = webhdfs.strip_prefix("webhdfs://").unwrap();
            let (namenode, root) = webhdfs.split_once('@').unwrap_or((webhdfs, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_webhdfs_engine(
                    namenode.to_owned(),
                    root.to_owned(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }
        azblob if azblob.starts_with("azblob://") => {
            let azblob = azblob.strip_prefix("azblob://").unwrap();
            let (container_name, root) = azblob.split_once('@').unwrap_or((azblob, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_azblob_engine(
                    container_name.to_owned(),
                    root.to_owned(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }
        fs if fs.starts_with("fs://") => {
            let fs = fs.strip_prefix("fs://").unwrap();
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_fs_engine(fs.to_owned(), config.clone(), metrics.clone())
                    .unwrap()
                    .monitored(metrics, config),
            )
        }

        s3_compatible if s3_compatible.starts_with("s3-compatible://") => {
            tracing::error!("The s3-compatible mode has been unified with s3.");
            tracing::error!(
                "If you want to use s3-compatible storage, please set your access key, secret key \
                 and region to the environment variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY \
                 and AWS_REGION, and set your endpoint to the environment variable RW_S3_ENDPOINT."
            );
            panic!(
                "Passing s3-compatible is not supported, please modify the environment variables and pass in s3."
            );
        }
        minio if minio.starts_with("minio://") => {
            if config.s3.developer.use_opendal {
                tracing::info!("Using OpenDAL to access minio.");
                ObjectStoreImpl::Opendal(
                    OpendalObjectStore::new_minio_engine(minio, config.clone(), metrics.clone())
                        .unwrap()
                        .monitored(metrics, config),
                )
            } else {
                ObjectStoreImpl::S3(
                    S3ObjectStore::new_minio_engine(minio, metrics.clone(), config.clone())
                        .await
                        .monitored(metrics, config),
                )
            }
        }
        "memory" => {
            if ident == "Meta Backup" {
                tracing::warn!(
                    "You're using in-memory remote object store for {}. This is not recommended for production environments.",
                    ident
                );
            } else {
                tracing::warn!(
                    "You're using in-memory remote object store for {}. This should never be used in benchmarks or production environments.",
                    ident
                );
            }
            ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics, config))
        }
        "memory-shared" => {
            if ident == "Meta Backup" {
                tracing::warn!(
                    "You're using shared in-memory remote object store for {}. This should never be used in production environments.",
                    ident
                );
            } else {
                tracing::warn!(
                    "You're using shared in-memory remote object store for {}. This should never be used in benchmarks or production environments.",
                    ident
                );
            }
            ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics, config))
        }
        #[cfg(madsim)]
        sim if sim.starts_with("sim://") => {
            ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics, config))
        }
        other => {
            unimplemented!(
                "{} remote object store only supports s3, minio, gcs, obs, oss, webhdfs, azblob, fs, hdfs, memory, and memory-shared.",
                other
            )
        }
    }
}
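
// A minimal usage sketch (hypothetical bucket name and ident; the
// `ObjectStoreMetrics::unused()` test constructor is an assumption) of building
// a store from a URL at startup:
//
// ```ignore
// let store: ObjectStoreImpl = build_remote_object_store(
//     "s3://my-bucket",
//     Arc::new(ObjectStoreMetrics::unused()), // assumed test-only constructor
//     "Hummock",
//     Arc::new(ObjectStoreConfig::default()),
// )
// .await;
// ```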

#[inline(always)]
fn get_retry_strategy(
    config: &ObjectStoreConfig,
    operation_type: OperationType,
) -> impl Iterator<Item = Duration> + use<> {
    let attempts = get_retry_attempts_by_type(config, operation_type);
    ExponentialBackoff::from_millis(config.retry.req_backoff_interval_ms)
        .max_delay(Duration::from_millis(config.retry.req_backoff_max_delay_ms))
        .factor(config.retry.req_backoff_factor)
        .take(attempts)
        .map(jitter)
}
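
// For illustration (assumed sample values, not the crate's defaults): tokio-retry's
// `ExponentialBackoff::from_millis(base)` yields delays of roughly base^n * factor
// milliseconds, so with `req_backoff_interval_ms = 2`, `req_backoff_factor = 1000`
// and `req_backoff_max_delay_ms = 10000`, the base delays are 2s, 4s, 8s, then
// capped at 10s. Each delay is then randomized by `jitter`, and `take(attempts)`
// bounds the number of retries to the per-operation attempt count.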

pub type ObjectMetadataIter = BoxStream<'static, ObjectResult<ObjectMetadata>>;
pub type ObjectDataStream = BoxStream<'static, ObjectResult<Bytes>>;

#[derive(Debug, Clone, Copy)]
enum OperationType {
    Upload,
    StreamingUploadInit,
    StreamingUpload,
    StreamingUploadFinish,
    Read,
    StreamingReadInit,
    StreamingRead,
    Metadata,
    Delete,
    DeleteObjects,
    List,
}

impl OperationType {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Upload => "upload",
            Self::StreamingUploadInit => "streaming_upload_init",
            Self::StreamingUpload => "streaming_upload",
            Self::StreamingUploadFinish => "streaming_upload_finish",
            Self::Read => "read",
            Self::StreamingReadInit => "streaming_read_init",
            Self::StreamingRead => "streaming_read",
            Self::Metadata => "metadata",
            Self::Delete => "delete",
            Self::DeleteObjects => "delete_objects",
            Self::List => "list",
        }
    }
}

fn get_retry_attempts_by_type(config: &ObjectStoreConfig, operation_type: OperationType) -> usize {
    match operation_type {
        OperationType::Upload => config.retry.upload_retry_attempts,
        OperationType::StreamingUploadInit
        | OperationType::StreamingUpload
        | OperationType::StreamingUploadFinish => config.retry.streaming_upload_retry_attempts,
        OperationType::Read => config.retry.read_retry_attempts,
        OperationType::StreamingReadInit | OperationType::StreamingRead => {
            config.retry.streaming_read_retry_attempts
        }
        OperationType::Metadata => config.retry.metadata_retry_attempts,
        OperationType::Delete => config.retry.delete_retry_attempts,
        OperationType::DeleteObjects => config.retry.delete_objects_retry_attempts,
        OperationType::List => config.retry.list_retry_attempts,
    }
}

fn get_attempt_timeout_by_type(config: &ObjectStoreConfig, operation_type: OperationType) -> u64 {
    match operation_type {
        OperationType::Upload => config.retry.upload_attempt_timeout_ms,
        OperationType::StreamingUploadInit
        | OperationType::StreamingUpload
        | OperationType::StreamingUploadFinish => config.retry.streaming_upload_attempt_timeout_ms,
        OperationType::Read => config.retry.read_attempt_timeout_ms,
        OperationType::StreamingReadInit | OperationType::StreamingRead => {
            config.retry.streaming_read_attempt_timeout_ms
        }
        OperationType::Metadata => config.retry.metadata_attempt_timeout_ms,
        OperationType::Delete => config.retry.delete_attempt_timeout_ms,
        OperationType::DeleteObjects => config.retry.delete_objects_attempt_timeout_ms,
        OperationType::List => config.retry.list_attempt_timeout_ms,
    }
}

struct RetryCondition {
    operation_type: OperationType,
    retry_count: usize,
    metrics: Arc<ObjectStoreMetrics>,
    retry_opendal_s3_unknown_error: bool,
}

impl RetryCondition {
    fn new(
        operation_type: OperationType,
        metrics: Arc<ObjectStoreMetrics>,
        retry_opendal_s3_unknown_error: bool,
    ) -> Self {
        Self {
            operation_type,
            retry_count: 0,
            metrics,
            retry_opendal_s3_unknown_error,
        }
    }

    #[inline(always)]
    fn should_retry_inner(&mut self, err: &ObjectError) -> bool {
        let should_retry = err.should_retry(self.retry_opendal_s3_unknown_error);
        if should_retry {
            self.retry_count += 1;
        }

        should_retry
    }
}

impl tokio_retry::Condition<ObjectError> for RetryCondition {
    fn should_retry(&mut self, err: &ObjectError) -> bool {
        self.should_retry_inner(err)
    }
}

impl Drop for RetryCondition {
    fn drop(&mut self) {
        if self.retry_count > 0 {
            self.metrics
                .request_retry_count
                .with_label_values(&[self.operation_type.as_str()])
                .inc_by(self.retry_count as _);
        }
    }
}

async fn retry_request<F, T, B>(
    builder: B,
    config: &ObjectStoreConfig,
    operation_type: OperationType,
    object_store_metrics: Arc<ObjectStoreMetrics>,
    media_type: &'static str,
) -> ObjectResult<T>
where
    B: Fn() -> F,
    F: Future<Output = ObjectResult<T>>,
{
    let backoff = get_retry_strategy(config, operation_type);
    let timeout_duration =
        Duration::from_millis(get_attempt_timeout_by_type(config, operation_type));
    let operation_type_str = operation_type.as_str();

    let retry_condition = RetryCondition::new(
        operation_type,
        object_store_metrics,
        (config.s3.developer.retry_unknown_service_error || config.s3.retry_unknown_service_error)
            && (media_type == opendal_engine::MediaType::S3.as_str()
                || media_type == opendal_engine::MediaType::Minio.as_str()),
    );

    let f = || async {
        let future = builder();
        if timeout_duration.is_zero() {
            future.await
        } else {
            tokio::time::timeout(timeout_duration, future)
                .await
                .unwrap_or_else(|_| {
                    Err(ObjectError::timeout(format!(
                        "Retry attempts exhausted for {}. Please modify {}_attempt_timeout_ms (current={:?}) and {}_retry_attempts (current={}) under [storage.object_store.retry] in the config accordingly if needed.",
                        operation_type_str, operation_type_str, timeout_duration.as_millis(), operation_type_str, get_retry_attempts_by_type(config, operation_type)
                    )))
                })
        }
    };

    tokio_retry::RetryIf::spawn(backoff, f, retry_condition).await
}
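
// A minimal sketch (hypothetical `store`, `config` and `metrics`) of how the
// monitored wrappers above invoke `retry_request`: the builder closure is
// re-invoked to produce a fresh future for every attempt.
//
// ```ignore
// let res: ObjectResult<Bytes> = retry_request(
//     || async { store.read("path/to/object", 0..1024).await },
//     &config,
//     OperationType::Read,
//     metrics.clone(),
//     store.store_media_type(),
// )
// .await;
// ```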