1#![expect(
16 unexpected_cfgs,
17 reason = "feature(hdfs-backend) is banned https://github.com/risingwavelabs/risingwave/pull/7875"
18)]
19
20pub mod sim;
21use std::ops::{Range, RangeBounds};
22use std::sync::Arc;
23use std::time::Duration;
24
25use bytes::Bytes;
26
27pub mod mem;
28pub use mem::*;
29
30pub mod opendal_engine;
31pub use opendal_engine::*;
32
33pub mod s3;
34use await_tree::{InstrumentAwait, SpanExt};
35use futures::stream::BoxStream;
36use futures::{Future, StreamExt};
37pub use risingwave_common::config::ObjectStoreConfig;
38pub use s3::*;
39
40pub mod error;
41pub mod object_metrics;
42
43pub mod prefix;
44
45pub use error::*;
46use object_metrics::ObjectStoreMetrics;
47use thiserror_ext::AsReport;
48use tokio_retry::strategy::{ExponentialBackoff, jitter};
49
50#[cfg(madsim)]
51use self::sim::SimObjectStore;
52
/// Shared handle to the backend-dispatching object store.
pub type ObjectStoreRef = Arc<ObjectStoreImpl>;
/// Streaming uploader dispatching over all supported backends.
pub type ObjectStreamingUploader = StreamingUploaderImpl;

/// Trait alias for the byte-range argument of `read`. `Clone` is required so the
/// same range can be re-issued when a request is retried.
pub trait ObjectRangeBounds = RangeBounds<usize> + Clone + Send + Sync + std::fmt::Debug + 'static;
57
/// Metadata describing a single object in the store.
#[derive(Debug, Clone, PartialEq)]
pub struct ObjectMetadata {
    /// Full path (key) of the object within the store.
    pub key: String,
    /// Last-modified timestamp; presumably seconds since the Unix epoch — confirm
    /// against the individual backend implementations.
    pub last_modified: f64,
    /// Total object size in bytes.
    pub total_size: usize,
}
66
/// An in-progress multipart/streaming upload to a single object.
///
/// Data is appended chunk by chunk via [`StreamingUploader::write_bytes`] and the
/// object only becomes complete once [`StreamingUploader::finish`] succeeds.
pub trait StreamingUploader: Send {
    /// Appends `data` to the upload.
    #[expect(async_fn_in_trait)]
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()>;

    /// Completes the upload, consuming the uploader.
    #[expect(async_fn_in_trait)]
    async fn finish(self) -> ObjectResult<()>;

    /// Bytes currently buffered in memory by this uploader.
    fn get_memory_usage(&self) -> u64;
}
76
/// The common interface implemented by every object-store backend
/// (in-memory, OpenDAL-based engines, S3, and the madsim simulator).
#[async_trait::async_trait]
pub trait ObjectStore: Send + Sync {
    /// Backend-specific streaming-uploader type returned by `streaming_upload`.
    type StreamingUploader: StreamingUploader;
    /// Computes the path prefix for an object id; the strategy flag selects between
    /// the old and new prefix layouts.
    fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String;

    /// Uploads the whole object in one shot.
    async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>;

    /// Begins a streaming (multipart) upload to `path`.
    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader>;

    /// Reads the bytes in `range` of the object at `path`.
    async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes>;

    /// Opens a byte stream over `read_range` of the object at `path`.
    async fn streaming_read(
        &self,
        path: &str,
        read_range: Range<usize>,
    ) -> ObjectResult<ObjectDataStream>;

    /// Fetches the metadata of the object at `path`.
    async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata>;

    /// Deletes a single object.
    async fn delete(&self, path: &str) -> ObjectResult<()>;

    /// Deletes a batch of objects.
    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()>;

    /// Wraps `self` in a [`MonitoredObjectStore`] that adds metrics, retries and
    /// timeouts to every operation.
    fn monitored(
        self,
        metrics: Arc<ObjectStoreMetrics>,
        config: Arc<ObjectStoreConfig>,
    ) -> MonitoredObjectStore<Self>
    where
        Self: Sized,
    {
        MonitoredObjectStore::new(self, metrics, config)
    }

    /// Lists objects under `prefix`, optionally starting after a key and capped at
    /// `limit` entries.
    async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter>;

    /// Static label identifying the backend, used for metric labels.
    fn store_media_type(&self) -> &'static str;

    /// Whether the backend supports streaming upload; defaults to `true`.
    fn support_streaming_upload(&self) -> bool {
        true
    }
}
137
/// Invokes `$macro` with the list of `{ Variant, BackendType }` pairs for every
/// supported backend. Non-simulation build: no `Sim` variant.
#[cfg(not(madsim))]
macro_rules! for_all_object_store {
    ($macro:ident $($args:tt)*) => {
        $macro! {
            {
                { InMem, InMemObjectStore },
                { Opendal, OpendalObjectStore },
                { S3, S3ObjectStore }
            }
            $($args)*
        }
    }
}
151
/// Same as the non-madsim version, but additionally includes the `Sim` backend
/// used by deterministic simulation tests.
#[cfg(madsim)]
macro_rules! for_all_object_store {
    ($macro:ident $($args:tt)*) => {
        $macro! {
            {
                { InMem, InMemObjectStore },
                { Opendal, OpendalObjectStore },
                { S3, S3ObjectStore },
                { Sim, SimObjectStore }
            }
            $($args)*
        }
    }
}
166
/// Maps an `ObjectStoreEnum` value to a *new* `ObjectStoreEnum` of the same
/// variant, binding the inner value to `$var_name` and wrapping the result of
/// `$func` back into that variant. Entry point is the two-argument arm, which
/// expands through `for_all_object_store!` to cover every backend.
macro_rules! enum_map {
    (
        {
            $(
                {$variant:ident, $_type_name:ty}
            ),*
        },
        $object_store:expr,
        $var_name:ident,
        $func:expr
    ) => {
        match $object_store {
            $(
                // Re-wrap the transformed value in the same variant.
                ObjectStoreEnum::$variant($var_name) => ObjectStoreEnum::$variant({
                    $func
                }),
            )*
        }
    };
    ($object_store:expr, |$var_name:ident| $func:expr) => {
        for_all_object_store! {
            enum_map, $object_store, $var_name, $func
        }
    };
}
192
/// Like `enum_map!`, but simply evaluates `$func` on the inner value and returns
/// its result directly (no re-wrapping into the enum).
macro_rules! dispatch_object_store_enum {
    (
        {
            $(
                {$variant:ident, $_type_name:ty}
            ),*
        },
        $object_store:expr,
        $var_name:ident,
        $func:expr
    ) => {
        match $object_store {
            $(
                ObjectStoreEnum::$variant($var_name) => {
                    $func
                },
            )*
        }
    };
    ($object_store:expr, |$var_name:ident| $func:expr) => {
        for_all_object_store! {
            dispatch_object_store_enum, $object_store, $var_name, $func
        }
    };
}
218
/// Generates, from the backend list:
/// - the generic `ObjectStoreEnum` with one variant per backend,
/// - `ObjectStoreImpl` (each backend wrapped in `MonitoredObjectStore`),
/// - `StreamingUploaderImpl` (each backend's uploader wrapped in
///   `MonitoredStreamingUploader`).
macro_rules! define_object_store_impl {
    () => {
        for_all_object_store! {
            define_object_store_impl
        }
    };
    (
        {$(
            {$variant:ident, $type_name:ty}
        ),*}
    ) => {
        pub enum ObjectStoreEnum<
            $($variant),*
        > {
            $(
                $variant($variant),
            )*
        }

        pub type ObjectStoreImpl = ObjectStoreEnum<
            $(
                MonitoredObjectStore<$type_name>,
            )*
        >;

        pub type StreamingUploaderImpl = ObjectStoreEnum<
            $(
                MonitoredStreamingUploader<<$type_name as ObjectStore>::StreamingUploader>
            ),*
        >;
    };
}

define_object_store_impl!();
253
/// Forwards a method call through the enum to the active backend.
/// First arm: async methods with arguments. Second arm: sync methods; its
/// `($(, $args:expr)*)` pattern matches an empty argument list (or leading-comma
/// style), and is only used for no-arg methods here.
macro_rules! object_store_impl_method_body {
    ($object_store:expr, $method_name:ident ($($args:expr),*).await) => {
        {
            dispatch_object_store_enum! {$object_store, |os| {
                os.$method_name($($args),*).await
            }}
        }
    };
    ($object_store:expr, $method_name:ident ($(, $args:expr)*)) => {
        {
            dispatch_object_store_enum! {$object_store, |os| {
                os.$method_name($($args),*)
            }}
        }
    };
}
276
/// Enum-dispatch wrappers that forward to the active backend's monitored uploader.
impl StreamingUploaderImpl {
    /// Appends a chunk to the in-progress upload.
    pub async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        object_store_impl_method_body!(self, write_bytes(data).await)
    }

    /// Completes the upload.
    pub async fn finish(self) -> ObjectResult<()> {
        object_store_impl_method_body!(self, finish().await)
    }

    /// Bytes currently buffered in memory by the uploader.
    pub fn get_memory_usage(&self) -> u64 {
        object_store_impl_method_body!(self, get_memory_usage())
    }
}
290
/// Enum-dispatch wrappers that forward each operation to the active monitored
/// backend store.
impl ObjectStoreImpl {
    pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
        object_store_impl_method_body!(self, upload(path, obj).await)
    }

    /// Starts a streaming upload; the resulting uploader is wrapped in the same
    /// enum variant as `self` (hence `enum_map!` rather than plain dispatch).
    pub async fn streaming_upload(&self, path: &str) -> ObjectResult<ObjectStreamingUploader> {
        Ok(enum_map!(self, |store| {
            store.streaming_upload(path).await?
        }))
    }

    pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
        object_store_impl_method_body!(self, read(path, range).await)
    }

    pub async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
        object_store_impl_method_body!(self, metadata(path).await)
    }

    pub async fn streaming_read(
        &self,
        path: &str,
        start_loc: Range<usize>,
    ) -> ObjectResult<MonitoredStreamingReader> {
        object_store_impl_method_body!(self, streaming_read(path, start_loc).await)
    }

    pub async fn delete(&self, path: &str) -> ObjectResult<()> {
        object_store_impl_method_body!(self, delete(path).await)
    }

    pub async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
        object_store_impl_method_body!(self, delete_objects(paths).await)
    }

    pub async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter> {
        object_store_impl_method_body!(self, list(prefix, start_after, limit).await)
    }

    // The next two bypass the monitored wrapper and call the inner store directly,
    // since they are pure accessors that need no metrics/retry.
    pub fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String {
        dispatch_object_store_enum!(self, |store| store
            .inner
            .get_object_prefix(obj_id, use_new_object_prefix_strategy))
    }

    pub fn support_streaming_upload(&self) -> bool {
        dispatch_object_store_enum!(self, |store| store.inner.support_streaming_upload())
    }

    pub fn media_type(&self) -> &'static str {
        object_store_impl_method_body!(self, media_type())
    }
}
357
358fn try_update_failure_metric<T>(
359 metrics: &Arc<ObjectStoreMetrics>,
360 result: &ObjectResult<T>,
361 operation_type: &'static str,
362) {
363 if let Err(e) = &result {
364 tracing::error!(error = %e.as_report(), "{} failed", operation_type);
365 metrics
366 .failure_count
367 .with_label_values(&[operation_type])
368 .inc();
369 }
370}
371
/// A [`StreamingUploader`] wrapper that records metrics for every chunk written.
pub struct MonitoredStreamingUploader<U: StreamingUploader> {
    inner: U,
    object_store_metrics: Arc<ObjectStoreMetrics>,
    /// Total bytes written through this uploader so far; observed into the
    /// `operation_size` histogram on `finish`.
    operation_size: usize,
}
389
390impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
391 pub fn new(handle: U, object_store_metrics: Arc<ObjectStoreMetrics>) -> Self {
392 Self {
393 inner: handle,
394 object_store_metrics,
395 operation_size: 0,
396 }
397 }
398}
399
impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
    /// Writes one chunk, instrumented for await-tree and failure metrics.
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        let operation_type = OperationType::StreamingUpload;
        let operation_type_str = operation_type.as_str();
        let data_len = data.len();

        let res = self
            .inner
            .write_bytes(data)
            .instrument_await(operation_type_str.verbose())
            .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);

        // NOTE: byte/size counters below are updated unconditionally, i.e. even
        // when the write failed.
        self.object_store_metrics
            .write_bytes
            .inc_by(data_len as u64);
        self.object_store_metrics
            .operation_size
            .with_label_values(&[operation_type_str])
            .observe(data_len as f64);
        self.operation_size += data_len;

        res
    }

    /// Completes the upload and observes the accumulated upload size.
    async fn finish(self) -> ObjectResult<()> {
        let operation_type = OperationType::StreamingUploadFinish;
        let operation_type_str = operation_type.as_str();

        let res =
            self.inner
                .finish()
                .instrument_await(operation_type_str.verbose())
                .await;

        try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);

        // Recorded regardless of whether `finish` succeeded.
        self.object_store_metrics
            .operation_size
            .with_label_values(&[operation_type_str])
            .observe(self.operation_size as f64);
        res
    }

    /// Delegates to the inner uploader's buffered-memory accounting.
    fn get_memory_usage(&self) -> u64 {
        self.inner.get_memory_usage()
    }
}
454
/// An [`ObjectDataStream`] wrapper that records per-chunk metrics and applies an
/// optional per-read timeout.
pub struct MonitoredStreamingReader {
    inner: ObjectDataStream,
    object_store_metrics: Arc<ObjectStoreMetrics>,
    /// Total bytes read so far; observed into `operation_size` on drop.
    operation_size: usize,
    /// Backend label used for metric labels.
    media_type: &'static str,
    /// If set, each `read_bytes` call is bounded by this duration.
    streaming_read_timeout: Option<Duration>,
    operation_type_str: &'static str,
}
463
impl MonitoredStreamingReader {
    /// Wraps a data stream with metrics and an optional per-chunk timeout.
    pub fn new(
        media_type: &'static str,
        handle: ObjectDataStream,
        object_store_metrics: Arc<ObjectStoreMetrics>,
        streaming_read_timeout: Option<Duration>,
    ) -> Self {
        Self {
            inner: handle,
            object_store_metrics,
            operation_size: 0,
            media_type,
            streaming_read_timeout,
            operation_type_str: OperationType::StreamingRead.as_str(),
        }
    }

    /// Pulls the next chunk from the stream, timing it and recording failure and
    /// size metrics. Returns `None` when the stream is exhausted.
    pub async fn read_bytes(&mut self) -> Option<ObjectResult<Bytes>> {
        let _timer = self
            .object_store_metrics
            .operation_latency
            .with_label_values(&[self.media_type, self.operation_type_str])
            .start_timer();
        let future = async {
            self.inner
                .next()
                .instrument_await(self.operation_type_str.verbose())
                .await
        };
        // Apply the per-chunk timeout if configured; a timeout is surfaced as an
        // `ObjectError::timeout` item rather than ending the stream.
        let res = match self.streaming_read_timeout.as_ref() {
            None => future.await,
            Some(timeout_duration) => tokio::time::timeout(*timeout_duration, future)
                .await
                .unwrap_or_else(|_| {
                    Some(Err(ObjectError::timeout(format!(
                        "Retry attempts exhausted for {}. Please modify {}_attempt_timeout_ms (current={:?}) under [storage.object_store.retry] in the config accordingly if needed.",
                        self.operation_type_str, self.operation_type_str, timeout_duration.as_millis()
                    ))))
                }),
        };

        if let Some(ret) = &res {
            try_update_failure_metric(&self.object_store_metrics, ret, self.operation_type_str);
        }
        if let Some(Ok(data)) = &res {
            let data_len = data.len();
            self.object_store_metrics.read_bytes.inc_by(data_len as u64);
            self.object_store_metrics
                .operation_size
                .with_label_values(&[self.operation_type_str])
                .observe(data_len as f64);
            self.operation_size += data_len;
        }
        res
    }
}
520
521impl Drop for MonitoredStreamingReader {
522 fn drop(&mut self) {
523 self.object_store_metrics
524 .operation_size
525 .with_label_values(&[self.operation_type_str])
526 .observe(self.operation_size as f64);
527 }
528}
529
/// An [`ObjectStore`] wrapper that adds latency/size/failure metrics, await-tree
/// instrumentation, and per-operation retry/timeout handling.
pub struct MonitoredObjectStore<OS: ObjectStore> {
    inner: OS,
    object_store_metrics: Arc<ObjectStoreMetrics>,
    /// Source of retry attempt counts and per-attempt timeouts.
    config: Arc<ObjectStoreConfig>,
}
535
536impl<OS: ObjectStore> MonitoredObjectStore<OS> {
553 pub fn new(
554 store: OS,
555 object_store_metrics: Arc<ObjectStoreMetrics>,
556 config: Arc<ObjectStoreConfig>,
557 ) -> Self {
558 Self {
559 object_store_metrics,
560 inner: store,
561 config,
562 }
563 }
564
565 fn media_type(&self) -> &'static str {
566 self.inner.store_media_type()
567 }
568
569 pub fn inner(&self) -> &OS {
570 &self.inner
571 }
572
573 pub fn mut_inner(&mut self) -> &mut OS {
574 &mut self.inner
575 }
576
577 pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
578 let operation_type = OperationType::Upload;
579 let operation_type_str = operation_type.as_str();
580 let media_type = self.media_type();
581
582 self.object_store_metrics
583 .write_bytes
584 .inc_by(obj.len() as u64);
585 self.object_store_metrics
586 .operation_size
587 .with_label_values(&[operation_type_str])
588 .observe(obj.len() as f64);
589 let _timer = self
590 .object_store_metrics
591 .operation_latency
592 .with_label_values(&[media_type, operation_type_str])
593 .start_timer();
594
595 let builder = || async {
596 self.inner
597 .upload(path, obj.clone())
598 .instrument_await(operation_type_str.verbose())
599 .await
600 };
601
602 let res = retry_request(
603 builder,
604 &self.config,
605 operation_type,
606 self.object_store_metrics.clone(),
607 media_type,
608 )
609 .await;
610
611 try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
612 res
613 }
614
615 pub async fn streaming_upload(
616 &self,
617 path: &str,
618 ) -> ObjectResult<MonitoredStreamingUploader<OS::StreamingUploader>> {
619 let operation_type = OperationType::StreamingUploadInit;
620 let operation_type_str = operation_type.as_str();
621 let media_type = self.media_type();
622 let _timer = self
623 .object_store_metrics
624 .operation_latency
625 .with_label_values(&[media_type, operation_type_str])
626 .start_timer();
627
628 let res = self
629 .inner
630 .streaming_upload(path)
631 .instrument_await(operation_type_str.verbose())
632 .await;
633
634 try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
635
636 Ok(MonitoredStreamingUploader::new(
637 res?,
638 self.object_store_metrics.clone(),
639 ))
640 }
641
642 pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
643 let operation_type = OperationType::Read;
644 let operation_type_str = operation_type.as_str();
645 let media_type = self.media_type();
646
647 let _timer = self
648 .object_store_metrics
649 .operation_latency
650 .with_label_values(&[media_type, operation_type_str])
651 .start_timer();
652
653 let builder = || async {
654 self.inner
655 .read(path, range.clone())
656 .instrument_await(operation_type_str.verbose())
657 .await
658 };
659
660 let res = retry_request(
661 builder,
662 &self.config,
663 operation_type,
664 self.object_store_metrics.clone(),
665 media_type,
666 )
667 .await;
668
669 if let Err(e) = &res
670 && e.is_object_not_found_error()
671 && !path.ends_with(".data")
672 {
673 } else {
676 try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
677 }
678
679 let data = res?;
680 self.object_store_metrics
681 .read_bytes
682 .inc_by(data.len() as u64);
683 self.object_store_metrics
684 .operation_size
685 .with_label_values(&[operation_type_str])
686 .observe(data.len() as f64);
687 Ok(data)
688 }
689
690 async fn streaming_read(
694 &self,
695 path: &str,
696 range: Range<usize>,
697 ) -> ObjectResult<MonitoredStreamingReader> {
698 let operation_type = OperationType::StreamingReadInit;
699 let operation_type_str = operation_type.as_str();
700 let media_type = self.media_type();
701 let _timer = self
702 .object_store_metrics
703 .operation_latency
704 .with_label_values(&[media_type, operation_type_str])
705 .start_timer();
706
707 let builder = || async {
708 self.inner
709 .streaming_read(path, range.clone())
710 .instrument_await(operation_type_str.verbose())
711 .await
712 };
713
714 let res = retry_request(
715 builder,
716 &self.config,
717 operation_type,
718 self.object_store_metrics.clone(),
719 media_type,
720 )
721 .await;
722
723 try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
724
725 Ok(MonitoredStreamingReader::new(
726 media_type,
727 res?,
728 self.object_store_metrics.clone(),
729 Some(Duration::from_millis(
730 self.config.retry.streaming_read_attempt_timeout_ms,
731 )),
732 ))
733 }
734
735 pub async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
736 let operation_type = OperationType::Metadata;
737 let operation_type_str = operation_type.as_str();
738 let media_type = self.media_type();
739 let _timer = self
740 .object_store_metrics
741 .operation_latency
742 .with_label_values(&[media_type, operation_type_str])
743 .start_timer();
744
745 let builder = || async {
746 self.inner
747 .metadata(path)
748 .instrument_await(operation_type_str.verbose())
749 .await
750 };
751
752 let res = retry_request(
753 builder,
754 &self.config,
755 operation_type,
756 self.object_store_metrics.clone(),
757 media_type,
758 )
759 .await;
760
761 try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
762 res
763 }
764
765 pub async fn delete(&self, path: &str) -> ObjectResult<()> {
766 let operation_type = OperationType::Delete;
767 let operation_type_str = operation_type.as_str();
768 let media_type = self.media_type();
769
770 let _timer = self
771 .object_store_metrics
772 .operation_latency
773 .with_label_values(&[media_type, operation_type_str])
774 .start_timer();
775
776 let builder = || async {
777 self.inner
778 .delete(path)
779 .instrument_await(operation_type_str.verbose())
780 .await
781 };
782
783 let res = retry_request(
784 builder,
785 &self.config,
786 operation_type,
787 self.object_store_metrics.clone(),
788 media_type,
789 )
790 .await;
791
792 try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
793 res
794 }
795
796 async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
797 let operation_type = OperationType::DeleteObjects;
798 let operation_type_str = operation_type.as_str();
799 let media_type = self.media_type();
800
801 let _timer = self
802 .object_store_metrics
803 .operation_latency
804 .with_label_values(&[self.media_type(), operation_type_str])
805 .start_timer();
806
807 let builder = || async {
808 self.inner
809 .delete_objects(paths)
810 .instrument_await(operation_type_str.verbose())
811 .await
812 };
813
814 let res = retry_request(
815 builder,
816 &self.config,
817 operation_type,
818 self.object_store_metrics.clone(),
819 media_type,
820 )
821 .await;
822
823 try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
824 res
825 }
826
827 pub async fn list(
828 &self,
829 prefix: &str,
830 start_after: Option<String>,
831 limit: Option<usize>,
832 ) -> ObjectResult<ObjectMetadataIter> {
833 let operation_type = OperationType::List;
834 let operation_type_str = operation_type.as_str();
835 let media_type = self.media_type();
836
837 let _timer = self
838 .object_store_metrics
839 .operation_latency
840 .with_label_values(&[media_type, operation_type_str])
841 .start_timer();
842
843 let builder = || async {
844 self.inner
845 .list(prefix, start_after.clone(), limit)
846 .instrument_await(operation_type_str.verbose())
847 .await
848 };
849
850 let res = retry_request(
851 builder,
852 &self.config,
853 operation_type,
854 self.object_store_metrics.clone(),
855 media_type,
856 )
857 .await;
858
859 try_update_failure_metric(&self.object_store_metrics, &res, operation_type_str);
860 res
861 }
862}
863
/// Builds an [`ObjectStoreImpl`] from a connection `url`, selecting the backend
/// by URL scheme (e.g. `s3://`, `gcs://`, `minio://`, `memory`). `ident` is a
/// human-readable tag used in log messages only.
pub async fn build_remote_object_store(
    url: &str,
    metrics: Arc<ObjectStoreMetrics>,
    ident: &str,
    config: Arc<ObjectStoreConfig>,
) -> ObjectStoreImpl {
    tracing::debug!(config=?config, "object store {ident}");
    match url {
        // S3 can be served either by OpenDAL or by the native SDK-based store,
        // chosen via config.
        s3 if s3.starts_with("s3://") => {
            if config.s3.developer.use_opendal {
                let bucket = s3.strip_prefix("s3://").unwrap();
                tracing::info!("Using OpenDAL to access s3, bucket is {}", bucket);
                ObjectStoreImpl::Opendal(
                    OpendalObjectStore::new_s3_engine(
                        bucket.to_owned(),
                        config.clone(),
                        metrics.clone(),
                    )
                    .unwrap()
                    .monitored(metrics, config),
                )
            } else {
                ObjectStoreImpl::S3(
                    S3ObjectStore::new_with_config(
                        s3.strip_prefix("s3://").unwrap().to_owned(),
                        metrics.clone(),
                        config.clone(),
                    )
                    .await
                    .monitored(metrics, config),
                )
            }
        }
        #[cfg(feature = "hdfs-backend")]
        hdfs if hdfs.starts_with("hdfs://") => {
            let hdfs = hdfs.strip_prefix("hdfs://").unwrap();
            // URL shape: `hdfs://<namenode>@<root>`; the root defaults to "".
            let (namenode, root) = hdfs.split_once('@').unwrap_or((hdfs, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_hdfs_engine(
                    namenode.to_string(),
                    root.to_string(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }
        gcs if gcs.starts_with("gcs://") => {
            let gcs = gcs.strip_prefix("gcs://").unwrap();
            let (bucket, root) = gcs.split_once('@').unwrap_or((gcs, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_gcs_engine(
                    bucket.to_owned(),
                    root.to_owned(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }
        obs if obs.starts_with("obs://") => {
            let obs = obs.strip_prefix("obs://").unwrap();
            let (bucket, root) = obs.split_once('@').unwrap_or((obs, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_obs_engine(
                    bucket.to_owned(),
                    root.to_owned(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }

        oss if oss.starts_with("oss://") => {
            let oss = oss.strip_prefix("oss://").unwrap();
            let (bucket, root) = oss.split_once('@').unwrap_or((oss, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_oss_engine(
                    bucket.to_owned(),
                    root.to_owned(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }
        webhdfs if webhdfs.starts_with("webhdfs://") => {
            let webhdfs = webhdfs.strip_prefix("webhdfs://").unwrap();
            let (namenode, root) = webhdfs.split_once('@').unwrap_or((webhdfs, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_webhdfs_engine(
                    namenode.to_owned(),
                    root.to_owned(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }
        azblob if azblob.starts_with("azblob://") => {
            let azblob = azblob.strip_prefix("azblob://").unwrap();
            let (container_name, root) = azblob.split_once('@').unwrap_or((azblob, ""));
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_azblob_engine(
                    container_name.to_owned(),
                    root.to_owned(),
                    config.clone(),
                    metrics.clone(),
                )
                .unwrap()
                .monitored(metrics, config),
            )
        }
        fs if fs.starts_with("fs://") => {
            let fs = fs.strip_prefix("fs://").unwrap();
            ObjectStoreImpl::Opendal(
                OpendalObjectStore::new_fs_engine(fs.to_owned(), config.clone(), metrics.clone())
                    .unwrap()
                    .monitored(metrics, config),
            )
        }

        // Legacy scheme kept only to give a helpful migration error.
        s3_compatible if s3_compatible.starts_with("s3-compatible://") => {
            tracing::error!("The s3 compatible mode has been unified with s3.");
            tracing::error!("If you want to use s3 compatible storage, please set your access_key, secret_key and region to the environment variable AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION,
            set your endpoint to the environment variable RW_S3_ENDPOINT.");
            panic!(
                "Passing s3-compatible is not supported, please modify the environment variable and pass in s3."
            );
        }
        minio if minio.starts_with("minio://") => {
            if config.s3.developer.use_opendal {
                tracing::info!("Using OpenDAL to access minio.");
                ObjectStoreImpl::Opendal(
                    OpendalObjectStore::new_minio_engine(minio, config.clone(), metrics.clone())
                        .unwrap()
                        .monitored(metrics, config),
                )
            } else {
                ObjectStoreImpl::S3(
                    S3ObjectStore::new_minio_engine(minio, metrics.clone(), config.clone())
                        .await
                        .monitored(metrics, config),
                )
            }
        }
        "memory" => {
            if ident == "Meta Backup" {
                tracing::warn!(
                    "You're using in-memory remote object store for {}. This is not recommended for production environment.",
                    ident
                );
            } else {
                tracing::warn!(
                    "You're using in-memory remote object store for {}. This should never be used in benchmarks and production environment.",
                    ident
                );
            }
            ObjectStoreImpl::InMem(InMemObjectStore::new().monitored(metrics, config))
        }
        "memory-shared" => {
            if ident == "Meta Backup" {
                tracing::warn!(
                    "You're using shared in-memory remote object store for {}. This should never be used in production environment.",
                    ident
                );
            } else {
                tracing::warn!(
                    "You're using shared in-memory remote object store for {}. This should never be used in benchmarks and production environment.",
                    ident
                );
            }
            ObjectStoreImpl::InMem(InMemObjectStore::shared().monitored(metrics, config))
        }
        #[cfg(madsim)]
        sim if sim.starts_with("sim://") => {
            ObjectStoreImpl::Sim(SimObjectStore::new(url).monitored(metrics, config))
        }
        other => {
            unimplemented!(
                "{} remote object store only supports s3, minio, gcs, oss, cos, azure blob, hdfs, disk, memory, and memory-shared.",
                other
            )
        }
    }
}
1061
/// Builds the jittered exponential-backoff schedule for `operation_type`,
/// capped at that operation's configured attempt count.
#[inline(always)]
fn get_retry_strategy(
    config: &ObjectStoreConfig,
    operation_type: OperationType,
) -> impl Iterator<Item = Duration> + use<> {
    let attempts = get_retry_attempts_by_type(config, operation_type);
    ExponentialBackoff::from_millis(config.retry.req_backoff_interval_ms)
        .max_delay(Duration::from_millis(config.retry.req_backoff_max_delay_ms))
        .factor(config.retry.req_backoff_factor)
        .take(attempts)
        .map(jitter)
}

/// Stream of object metadata entries produced by `list`.
pub type ObjectMetadataIter = BoxStream<'static, ObjectResult<ObjectMetadata>>;
/// Stream of data chunks produced by `streaming_read`.
pub type ObjectDataStream = BoxStream<'static, ObjectResult<Bytes>>;
1077
/// The kind of object-store operation, used to select retry/timeout settings and
/// as a metric label.
#[derive(Debug, Clone, Copy)]
enum OperationType {
    Upload,
    StreamingUploadInit,
    StreamingUpload,
    StreamingUploadFinish,
    Read,
    StreamingReadInit,
    StreamingRead,
    Metadata,
    Delete,
    DeleteObjects,
    List,
}
1092
1093impl OperationType {
1094 fn as_str(&self) -> &'static str {
1095 match self {
1096 Self::Upload => "upload",
1097 Self::StreamingUploadInit => "streaming_upload_init",
1098 Self::StreamingUpload => "streaming_upload",
1099 Self::StreamingUploadFinish => "streaming_upload_finish",
1100 Self::Read => "read",
1101 Self::StreamingReadInit => "streaming_read_init",
1102 Self::StreamingRead => "streaming_read",
1103 Self::Metadata => "metadata",
1104 Self::Delete => "delete",
1105 Self::DeleteObjects => "delete_objects",
1106 Self::List => "list",
1107 }
1108 }
1109}
1110
/// Looks up the configured retry-attempt count for `operation_type`.
/// All streaming-upload phases share one setting; likewise both streaming-read
/// phases share one.
fn get_retry_attempts_by_type(config: &ObjectStoreConfig, operation_type: OperationType) -> usize {
    match operation_type {
        OperationType::Upload => config.retry.upload_retry_attempts,
        OperationType::StreamingUploadInit
        | OperationType::StreamingUpload
        | OperationType::StreamingUploadFinish => config.retry.streaming_upload_retry_attempts,
        OperationType::Read => config.retry.read_retry_attempts,
        OperationType::StreamingReadInit | OperationType::StreamingRead => {
            config.retry.streaming_read_retry_attempts
        }
        OperationType::Metadata => config.retry.metadata_retry_attempts,
        OperationType::Delete => config.retry.delete_retry_attempts,
        OperationType::DeleteObjects => config.retry.delete_objects_retry_attempts,
        OperationType::List => config.retry.list_retry_attempts,
    }
}
1127
/// Looks up the per-attempt timeout (milliseconds) for `operation_type`,
/// grouped the same way as the retry-attempt settings.
fn get_attempt_timeout_by_type(config: &ObjectStoreConfig, operation_type: OperationType) -> u64 {
    match operation_type {
        OperationType::Upload => config.retry.upload_attempt_timeout_ms,
        OperationType::StreamingUploadInit
        | OperationType::StreamingUpload
        | OperationType::StreamingUploadFinish => config.retry.streaming_upload_attempt_timeout_ms,
        OperationType::Read => config.retry.read_attempt_timeout_ms,
        OperationType::StreamingReadInit | OperationType::StreamingRead => {
            config.retry.streaming_read_attempt_timeout_ms
        }
        OperationType::Metadata => config.retry.metadata_attempt_timeout_ms,
        OperationType::Delete => config.retry.delete_attempt_timeout_ms,
        OperationType::DeleteObjects => config.retry.delete_objects_attempt_timeout_ms,
        OperationType::List => config.retry.list_attempt_timeout_ms,
    }
}
1144
/// Decides whether a failed request should be retried, and counts how many
/// retries it triggered (reported to metrics on drop).
struct RetryCondition {
    operation_type: OperationType,
    /// Number of retries this condition has approved so far.
    retry_count: usize,
    metrics: Arc<ObjectStoreMetrics>,
    /// Whether unknown S3/MinIO service errors should also be retried.
    retry_opendal_s3_unknown_error: bool,
}
1151
1152impl RetryCondition {
1153 fn new(
1154 operation_type: OperationType,
1155 metrics: Arc<ObjectStoreMetrics>,
1156 retry_opendal_s3_unknown_error: bool,
1157 ) -> Self {
1158 Self {
1159 operation_type,
1160 retry_count: 0,
1161 metrics,
1162 retry_opendal_s3_unknown_error,
1163 }
1164 }
1165
1166 #[inline(always)]
1167 fn should_retry_inner(&mut self, err: &ObjectError) -> bool {
1168 let should_retry = err.should_retry(self.retry_opendal_s3_unknown_error);
1169 if should_retry {
1170 self.retry_count += 1;
1171 }
1172
1173 should_retry
1174 }
1175}
1176
/// Adapter so `RetryCondition` plugs into `tokio_retry::RetryIf`.
impl tokio_retry::Condition<ObjectError> for RetryCondition {
    fn should_retry(&mut self, err: &ObjectError) -> bool {
        self.should_retry_inner(err)
    }
}
1182
impl Drop for RetryCondition {
    /// Flushes the accumulated retry count into the per-operation retry metric
    /// once the request (and all its retries) has finished.
    fn drop(&mut self) {
        if self.retry_count > 0 {
            self.metrics
                .request_retry_count
                .with_label_values(&[self.operation_type.as_str()])
                .inc_by(self.retry_count as _);
        }
    }
}
1193
/// Runs the request produced by `builder` with per-attempt timeout and
/// exponential-backoff retries, as configured for `operation_type`.
///
/// `builder` is invoked once per attempt to create a fresh future. A timed-out
/// attempt is converted into an `ObjectError::timeout` carrying config-tuning
/// hints; whether an error is retried is decided by [`RetryCondition`].
async fn retry_request<F, T, B>(
    builder: B,
    config: &ObjectStoreConfig,
    operation_type: OperationType,
    object_store_metrics: Arc<ObjectStoreMetrics>,
    media_type: &'static str,
) -> ObjectResult<T>
where
    B: Fn() -> F,
    F: Future<Output = ObjectResult<T>>,
{
    let backoff = get_retry_strategy(config, operation_type);
    let timeout_duration =
        Duration::from_millis(get_attempt_timeout_by_type(config, operation_type));
    let operation_type_str = operation_type.as_str();

    // Unknown-service-error retries only apply to the S3/MinIO media types, and
    // only when enabled by either the developer or the stable config flag.
    let retry_condition = RetryCondition::new(
        operation_type,
        object_store_metrics,
        (config.s3.developer.retry_unknown_service_error || config.s3.retry_unknown_service_error)
            && (media_type == opendal_engine::MediaType::S3.as_str()
                || media_type == opendal_engine::MediaType::Minio.as_str()),
    );

    let f = || async {
        let future = builder();
        // A zero timeout means "no per-attempt timeout".
        if timeout_duration.is_zero() {
            future.await
        } else {
            tokio::time::timeout(timeout_duration, future)
                .await
                .unwrap_or_else(|_| {
                    Err(ObjectError::timeout(format!(
                        "Retry attempts exhausted for {}. Please modify {}_attempt_timeout_ms (current={:?}) and {}_retry_attempts (current={}) under [storage.object_store.retry] in the config accordingly if needed.",
                        operation_type_str, operation_type_str, timeout_duration.as_millis(), operation_type_str, get_retry_attempts_by_type(config, operation_type)
                    )))
                })
        }
    };

    tokio_retry::RetryIf::spawn(backoff, f, retry_condition).await
}