use std::borrow::BorrowMut;
use std::cmp;
use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
use std::time::Duration;

use await_tree::{InstrumentAwait, SpanExt};
use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::BoxError;
use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError;
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
use aws_sdk_s3::operation::delete_object::DeleteObjectError;
use aws_sdk_s3::operation::delete_objects::DeleteObjectsError;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
use aws_sdk_s3::operation::put_object::PutObjectError;
use aws_sdk_s3::operation::upload_part::{UploadPartError, UploadPartOutput};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{
    AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload,
    CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier,
};
use aws_smithy_http::futures_stream_adapter::FuturesStreamCompatByteStream;
use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
use aws_smithy_runtime_api::client::http::HttpClient;
use aws_smithy_runtime_api::client::result::SdkError;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::error::metadata::ProvideErrorMetadata;
use fail::fail_point;
use futures::future::{BoxFuture, FutureExt, try_join_all};
use futures::{Stream, StreamExt, TryStreamExt, stream};
use hyper::Body;
use itertools::Itertools;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::monitor::monitor_connector;
use risingwave_common::range::RangeBoundsExt;
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;

use super::object_metrics::ObjectStoreMetrics;
use super::{
    Bytes, ObjectError, ObjectErrorInner, ObjectMetadata, ObjectRangeBounds, ObjectResult,
    ObjectStore, StreamingUploader, prefix, retry_request,
};
use crate::object::{
    ObjectDataStream, ObjectMetadataIter, OperationType, try_update_failure_metric,
};
69
type PartId = i32;

/// S3 part numbers are 1-based.
const MIN_PART_ID: PartId = 1;
/// Number of days after initiation after which the bucket lifecycle rule installed by
/// `configure_bucket_lifecycle` aborts incomplete multipart uploads.
const S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS: i32 = 1;
77
/// S3 multipart upload handle. The multipart upload is not initiated until the first part is
/// available for upload.
pub struct S3StreamingUploader {
    client: Client,
    part_size: usize,
    bucket: String,
    /// The key of the object.
    key: String,
    /// The identifier of the ongoing multipart upload, lazily initialized on the first part.
    upload_id: Option<String>,
    /// The part number to assign to the next uploaded part.
    next_part_id: PartId,
    /// Join handles for the concurrently running part-upload tasks.
    join_handles: Vec<JoinHandle<ObjectResult<(PartId, UploadPartOutput)>>>,
    /// Buffered chunks that have not been uploaded yet.
    buf: Vec<Bytes>,
    /// Total length of the buffered, not-yet-uploaded data.
    not_uploaded_len: usize,
    metrics: Arc<ObjectStoreMetrics>,

    config: Arc<ObjectStoreConfig>,
}
104
impl S3StreamingUploader {
    const MEDIA_TYPE: &'static str = "s3";

    pub fn new(
        client: Client,
        bucket: String,
        key: String,
        metrics: Arc<ObjectStoreMetrics>,
        config: Arc<ObjectStoreConfig>,
    ) -> S3StreamingUploader {
        // S3 requires multipart upload parts (except the last) to be at least 5 MiB and at
        // most 5 GiB, so clamp the configured part size into that range.
        const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
        const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024;
        let part_size = config.upload_part_size.clamp(MIN_PART_SIZE, MAX_PART_SIZE);

        Self {
            client,
            bucket,
            part_size,
            key,
            upload_id: None,
            next_part_id: MIN_PART_ID,
            join_handles: Default::default(),
            buf: Default::default(),
            not_uploaded_len: 0,
            metrics,
            config,
        }
    }
137
    async fn upload_next_part(&mut self) -> ObjectResult<()> {
        let operation_type = OperationType::StreamingUpload;
        let operation_type_str = operation_type.as_str();

        // Lazily initiate the multipart upload on the first part.
        if self.upload_id.is_none() {
            let builder = || async {
                self.client
                    .create_multipart_upload()
                    .bucket(&self.bucket)
                    .key(&self.key)
                    .send()
                    .await
                    .map_err(|err| {
                        set_error_should_retry::<CreateMultipartUploadError>(
                            self.config.clone(),
                            err.into(),
                        )
                    })
            };

            let resp = retry_request(
                builder,
                &self.config,
                OperationType::StreamingUploadInit,
                self.metrics.clone(),
                Self::MEDIA_TYPE,
            )
            .await;

            try_update_failure_metric(
                &self.metrics,
                &resp,
                OperationType::StreamingUploadInit.as_str(),
            );

            self.upload_id = Some(resp?.upload_id.unwrap());
        }

        // Drain the buffered data for the next part.
        let data = self.buf.drain(..).collect_vec();
        let len = self.not_uploaded_len;
        debug_assert_eq!(
            data.iter().map(|b| b.len()).sum::<usize>(),
            self.not_uploaded_len
        );

        // Assign the part number.
        let part_id = self.next_part_id;
        self.next_part_id += 1;

        // Clone what the spawned upload task needs to own.
        let client_cloned = self.client.clone();
        let bucket = self.bucket.clone();
        let key = self.key.clone();
        let upload_id = self.upload_id.clone().unwrap();

        let metrics = self.metrics.clone();
        metrics
            .operation_size
            .with_label_values(&[operation_type_str])
            .observe(len as f64);
        let config = self.config.clone();
201
        self.join_handles.push(tokio::spawn(async move {
            let _timer = metrics
                .operation_latency
                .with_label_values(&["s3", operation_type_str])
                .start_timer();

            let builder = || async {
                client_cloned
                    .upload_part()
                    .bucket(bucket.clone())
                    .key(key.clone())
                    .upload_id(upload_id.clone())
                    .part_number(part_id)
                    .body(get_upload_body(data.clone()))
                    .content_length(len as i64)
                    .send()
                    .await
                    .map_err(|err| {
                        // Downcast against the error type of the operation actually issued
                        // (`UploadPart`); with a mismatched type the retry classification in
                        // `set_error_should_retry` never matches.
                        set_error_should_retry::<UploadPartError>(config.clone(), err.into())
                    })
            };

            let res = retry_request(
                builder,
                &config,
                operation_type,
                metrics.clone(),
                Self::MEDIA_TYPE,
            )
            .await;
            try_update_failure_metric(&metrics, &res, operation_type_str);
            Ok((part_id, res?))
        }));

        Ok(())
    }
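
    // Note on the method above: part uploads run concurrently in spawned tasks. The completion
    // path below joins them with `try_join_all`, which preserves spawn order, so the
    // `CompletedPart` list is assembled in ascending part-number order as S3 requires.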
241
    async fn flush_multipart_and_complete(&mut self) -> ObjectResult<()> {
        let operation_type = OperationType::StreamingUploadFinish;

        if !self.buf.is_empty() {
            self.upload_next_part().await?;
        }

        let join_handles = self.join_handles.drain(..).collect_vec();

        let mut uploaded_parts = Vec::with_capacity(join_handles.len());
        for result in try_join_all(join_handles)
            .await
            .map_err(ObjectError::internal)?
        {
            uploaded_parts.push(result?);
        }

        let completed_parts = Some(
            uploaded_parts
                .iter()
                .map(|(part_id, output)| {
                    CompletedPart::builder()
                        .set_e_tag(output.e_tag.clone())
                        .set_part_number(Some(*part_id))
                        .build()
                })
                .collect_vec(),
        );

        let builder = || async {
            self.client
                .complete_multipart_upload()
                .bucket(&self.bucket)
                .key(&self.key)
                .upload_id(self.upload_id.as_ref().unwrap())
                .multipart_upload(
                    CompletedMultipartUpload::builder()
                        .set_parts(completed_parts.clone())
                        .build(),
                )
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<CompleteMultipartUploadError>(
                        self.config.clone(),
                        err.into(),
                    )
                })
        };

        let res = retry_request(
            builder,
            &self.config,
            operation_type,
            self.metrics.clone(),
            Self::MEDIA_TYPE,
        )
        .await;
        try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
        let _res = res?;

        Ok(())
    }
306
    async fn abort_multipart_upload(&self) -> ObjectResult<()> {
        self.client
            .abort_multipart_upload()
            .bucket(&self.bucket)
            .key(&self.key)
            .upload_id(self.upload_id.as_ref().unwrap())
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<AbortMultipartUploadError>(self.config.clone(), err.into())
            })?;
        Ok(())
    }
320}
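
// Note on `abort_multipart_upload`: aborting releases the storage held by parts uploaded so
// far. S3 retains (and bills for) the parts of an incomplete multipart upload until it is
// either completed or aborted, which is also why `configure_bucket_lifecycle` installs an
// abort lifecycle rule as a safety net.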
321
impl StreamingUploader for S3StreamingUploader {
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        fail_point!("s3_write_bytes_err", |_| Err(ObjectError::internal(
            "s3 write bytes error"
        )));
        let data_len = data.len();
        self.not_uploaded_len += data_len;
        self.buf.push(data);

        if self.not_uploaded_len >= self.part_size {
            self.upload_next_part()
                .instrument_await("s3_upload_next_part".verbose())
                .await?;
            self.not_uploaded_len = 0;
        }
        Ok(())
    }
339
    /// If the multipart upload was never initiated (i.e. the total data is smaller than one
    /// part), upload the buffer with a single `PutObject` request instead, saving the
    /// `CreateMultipartUpload` and `CompleteMultipartUpload` round trips. Otherwise, flush the
    /// remaining buffer as the last part and complete the multipart upload, aborting it on
    /// failure.
    async fn finish(mut self) -> ObjectResult<()> {
        fail_point!("s3_finish_streaming_upload_err", |_| Err(
            ObjectError::internal("s3 finish streaming upload error")
        ));

        if self.upload_id.is_none() {
            debug_assert!(self.join_handles.is_empty());
            if self.buf.is_empty() {
                debug_assert_eq!(self.not_uploaded_len, 0);
                Err(ObjectError::internal("upload empty object"))
            } else {
                let operation_type = OperationType::Upload;
                let builder = || async {
                    self.client
                        .put_object()
                        .bucket(&self.bucket)
                        .body(get_upload_body(self.buf.clone()))
                        .content_length(self.not_uploaded_len as i64)
                        .key(&self.key)
                        .send()
                        .instrument_await("s3_put_object".verbose())
                        .await
                        .map_err(|err| {
                            set_error_should_retry::<PutObjectError>(
                                self.config.clone(),
                                err.into(),
                            )
                        })
                };

                let res = retry_request(
                    builder,
                    &self.config,
                    operation_type,
                    self.metrics.clone(),
                    Self::MEDIA_TYPE,
                )
                .await;
                try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
                res?;
                Ok(())
            }
        } else {
            match self
                .flush_multipart_and_complete()
                .instrument_await("s3_flush_multipart_and_complete".verbose())
                .await
            {
                Err(e) => {
                    tracing::warn!(key = self.key, error = %e.as_report(), "Failed to upload object");
                    self.abort_multipart_upload().await?;
                    Err(e)
                }
                _ => Ok(()),
            }
        }
    }
400
    fn get_memory_usage(&self) -> u64 {
        self.part_size as u64
    }
}
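
// A minimal usage sketch of the uploader, assuming `client`, `metrics` and `config` are
// constructed elsewhere (illustrative only, not a doctest):
//
//     let mut uploader = S3StreamingUploader::new(
//         client,
//         "my-bucket".to_owned(),
//         "my-key".to_owned(),
//         metrics,
//         config,
//     );
//     uploader.write_bytes(Bytes::from_static(b"hello")).await?;
//     uploader.finish().await?;
//
// Writes are buffered until `part_size` bytes accumulate; if `finish` is reached before that,
// the object is uploaded with a single `PutObject` instead of a multipart upload.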
405
fn get_upload_body(data: Vec<Bytes>) -> ByteStream {
    SdkBody::retryable(move || {
        Body::wrap_stream(stream::iter(data.clone().into_iter().map(ObjectResult::Ok))).into()
    })
    .into()
}
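
// `SdkBody::retryable` matters here: the closure re-creates the hyper body from the (cheaply
// cloneable) `Bytes` chunks each time it is called, so the SDK can replay the request body
// when a `PutObject` or `UploadPart` request is retried. A one-shot streaming body could only
// be polled once and would make retries fail.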
412
/// Object store with an S3-compatible backend.
#[derive(Clone)]
pub struct S3ObjectStore {
    client: Client,
    bucket: String,
    metrics: Arc<ObjectStoreMetrics>,

    config: Arc<ObjectStoreConfig>,
}
424
#[async_trait::async_trait]
impl ObjectStore for S3ObjectStore {
    type StreamingUploader = S3StreamingUploader;

    fn get_object_prefix(&self, obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
        prefix::s3::get_object_prefix(obj_id)
    }

    async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
        fail_point!("s3_upload_err", |_| Err(ObjectError::internal(
            "s3 upload error"
        )));
        if obj.is_empty() {
            Err(ObjectError::internal("upload empty object"))
        } else {
            self.client
                .put_object()
                .bucket(&self.bucket)
                .body(ByteStream::from(obj))
                .key(path)
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<PutObjectError>(self.config.clone(), err.into())
                })?;
            Ok(())
        }
    }
455
    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
        fail_point!("s3_streaming_upload_err", |_| Err(ObjectError::internal(
            "s3 streaming upload error"
        )));
        Ok(S3StreamingUploader::new(
            self.client.clone(),
            self.bucket.clone(),
            path.to_owned(),
            self.metrics.clone(),
            self.config.clone(),
        ))
    }
468
    async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
        fail_point!("s3_read_err", |_| Err(ObjectError::internal(
            "s3 read error"
        )));

        let val = match self.obj_store_request(path, range.clone()).send().await {
            Ok(resp) => resp
                .body
                .collect()
                .await
                .map_err(|err| {
                    set_error_should_retry::<GetObjectError>(self.config.clone(), err.into())
                })?
                .into_bytes(),
            Err(sdk_err) => {
                return Err(set_error_should_retry::<GetObjectError>(
                    self.config.clone(),
                    sdk_err.into(),
                ));
            }
        };

        if let Some(len) = range.len()
            && len != val.len()
        {
            return Err(ObjectError::internal(format!(
                "mismatched size: expected {}, found {} when reading {} at {:?}",
                len,
                val.len(),
                path,
                range,
            )));
        }

        Ok(val)
    }
506
    async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
        fail_point!("s3_metadata_err", |_| Err(ObjectError::internal(
            "s3 metadata error"
        )));
        let resp = self
            .client
            .head_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<HeadObjectError>(self.config.clone(), err.into())
            })?;
        Ok(ObjectMetadata {
            key: path.to_owned(),
            last_modified: resp
                .last_modified()
                .expect("last_modified required")
                .as_secs_f64(),
            total_size: resp.content_length.unwrap_or_default() as usize,
        })
    }
530
    /// Returns a stream reading the object specified in `path` within the given byte `range`.
    /// Data is only pulled into memory as the stream is consumed.
    async fn streaming_read(
        &self,
        path: &str,
        range: Range<usize>,
    ) -> ObjectResult<ObjectDataStream> {
        fail_point!("s3_streaming_read_init_err", |_| Err(
            ObjectError::internal("s3 streaming read init error")
        ));

        let resp = match self.obj_store_request(path, range.clone()).send().await {
            Ok(resp) => resp,
            Err(sdk_err) => {
                return Err(set_error_should_retry::<GetObjectError>(
                    self.config.clone(),
                    sdk_err.into(),
                ));
            }
        };

        let reader = FuturesStreamCompatByteStream::new(resp.body);

        Ok(Box::pin(
            reader
                .into_stream()
                .map(|item| item.map_err(ObjectError::from)),
        ))
    }
561
    /// Permanently deletes the whole object. Per S3 semantics, this also returns `Ok` if the
    /// object does not exist.
    async fn delete(&self, path: &str) -> ObjectResult<()> {
        fail_point!("s3_delete_err", |_| Err(ObjectError::internal(
            "s3 delete error"
        )));
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<DeleteObjectError>(self.config.clone(), err.into())
            })?;
        Ok(())
    }
579
    /// Permanently deletes the objects with the given paths using the `DeleteObjects` API.
    /// Objects that do not exist are considered successfully deleted.
    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
        // `DeleteObjects` accepts at most 1000 keys per request.
        const MAX_LEN: usize = 1000;
        let mut all_errors = Vec::new();

        // Delete in batches of at most `MAX_LEN` keys.
        for start_idx in (0..paths.len()).step_by(MAX_LEN) {
            let end_idx = cmp::min(paths.len(), start_idx + MAX_LEN);
            let slice = &paths[start_idx..end_idx];
            let mut obj_ids = Vec::with_capacity(slice.len());
            for path in slice {
                obj_ids.push(ObjectIdentifier::builder().key(path).build().unwrap());
            }

            let delete_builder = Delete::builder().set_objects(Some(obj_ids));
            let delete_output = self
                .client
                .delete_objects()
                .bucket(&self.bucket)
                .delete(delete_builder.build().unwrap())
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<DeleteObjectsError>(self.config.clone(), err.into())
                })?;

            if !delete_output.errors().is_empty() {
                all_errors.append(&mut delete_output.errors().to_owned());
            }
        }
        if !all_errors.is_empty() {
            return Err(ObjectError::internal(format!(
                "DeleteObjects request returned exception for some objects: {:?}",
                all_errors
            )));
        }

        Ok(())
    }
624
    async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter> {
        Ok(Box::pin(
            S3ObjectIter::new(
                self.client.clone(),
                self.bucket.clone(),
                prefix.to_owned(),
                self.config.clone(),
                start_after,
            )
            .take(limit.unwrap_or(usize::MAX)),
        ))
    }

    fn store_media_type(&self) -> &'static str {
        "s3"
    }
}
647
impl S3ObjectStore {
    pub fn new_http_client(config: &ObjectStoreConfig) -> impl HttpClient + use<> {
        let mut http = hyper::client::HttpConnector::new();

        if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
            http.set_keepalive(Some(Duration::from_millis(*keepalive_ms)));
        }

        if let Some(nodelay) = config.s3.nodelay.as_ref() {
            http.set_nodelay(*nodelay);
        }

        if let Some(recv_buffer_size) = config.s3.recv_buffer_size.as_ref() {
            http.set_recv_buffer_size(Some(*recv_buffer_size));
        }

        if let Some(send_buffer_size) = config.s3.send_buffer_size.as_ref() {
            http.set_send_buffer_size(Some(*send_buffer_size));
        }

        http.enforce_http(false);

        let conn = hyper_rustls::HttpsConnectorBuilder::new()
            .with_webpki_roots()
            .https_or_http()
            .enable_all_versions()
            .wrap_connector(http);

        let conn = monitor_connector(conn, "S3");

        HyperClientBuilder::new().build(conn)
    }
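
    // The client built above layers: hyper `HttpConnector` (TCP options from the config) ->
    // rustls HTTPS connector (webpki roots; plain HTTP is also accepted for local setups) ->
    // `monitor_connector` for connection metrics -> smithy-compatible hyper 0.14 client.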
681
    /// Creates an S3 object store client from environment variables. `RW_S3_ENDPOINT` switches
    /// to a custom S3-compatible endpoint (e.g. MinIO), and `RW_IS_FORCE_PATH_STYLE` controls
    /// path-style addressing for such endpoints.
    pub async fn new_with_config(
        bucket: String,
        metrics: Arc<ObjectStoreMetrics>,
        config: Arc<ObjectStoreConfig>,
    ) -> Self {
        let sdk_config_loader = aws_config::from_env().http_client(Self::new_http_client(&config));

        let client = match std::env::var("RW_S3_ENDPOINT") {
            // Use a custom, S3-compatible endpoint.
            Ok(endpoint) => {
                let is_force_path_style = match std::env::var("RW_IS_FORCE_PATH_STYLE") {
                    Ok(value) => value == "true",
                    Err(_) => false,
                };

                let sdk_config = sdk_config_loader.load().await;
                #[cfg(madsim)]
                let client = Client::new(&sdk_config);
                #[cfg(not(madsim))]
                let client = Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&sdk_config)
                        .endpoint_url(endpoint)
                        .force_path_style(is_force_path_style)
                        .identity_cache(
                            aws_sdk_s3::config::IdentityCache::lazy()
                                .load_timeout(Duration::from_secs(
                                    config.s3.identity_resolution_timeout_s,
                                ))
                                .build(),
                        )
                        .stalled_stream_protection(
                            aws_sdk_s3::config::StalledStreamProtectionConfig::disabled(),
                        )
                        .build(),
                );
                client
            }
            // Plain AWS S3: resolve region and credentials from the environment.
            Err(_) => {
                let sdk_config = sdk_config_loader.load().await;
                #[cfg(madsim)]
                let client = Client::new(&sdk_config);
                #[cfg(not(madsim))]
                let client = Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&sdk_config)
                        .identity_cache(
                            aws_sdk_s3::config::IdentityCache::lazy()
                                .load_timeout(Duration::from_secs(
                                    config.s3.identity_resolution_timeout_s,
                                ))
                                .build(),
                        )
                        .stalled_stream_protection(
                            aws_sdk_s3::config::StalledStreamProtectionConfig::disabled(),
                        )
                        .build(),
                );
                client
            }
        };

        Self {
            client,
            bucket,
            metrics,
            config,
        }
    }
754
    /// Creates a MinIO client. The `server` string is expected to look like
    /// `minio://access_key:secret@address/bucket`, optionally with an explicit `http://` or
    /// `https://` scheme after the `@` (plain HTTP is assumed otherwise).
    pub async fn new_minio_engine(
        server: &str,
        metrics: Arc<ObjectStoreMetrics>,
        object_store_config: Arc<ObjectStoreConfig>,
    ) -> Self {
        let server = server.strip_prefix("minio://").unwrap();
        let (access_key_id, rest) = server.split_once(':').unwrap();
        let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

        let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
            rest = rest_stripped;
            "https://"
        } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
            rest = rest_stripped;
            "http://"
        } else {
            "http://"
        };
        let (address, bucket) = rest.split_once('/').unwrap();

        #[cfg(madsim)]
        let builder = aws_sdk_s3::config::Builder::new().credentials_provider(
            Credentials::from_keys(access_key_id, secret_access_key, None),
        );
        #[cfg(not(madsim))]
        let builder = aws_sdk_s3::config::Builder::from(
            &aws_config::ConfigLoader::default()
                .credentials_provider(Credentials::from_keys(
                    access_key_id,
                    secret_access_key,
                    None,
                ))
                .load()
                .await,
        )
        .force_path_style(true)
        .identity_cache(
            aws_sdk_s3::config::IdentityCache::lazy()
                .load_timeout(Duration::from_secs(
                    object_store_config.s3.identity_resolution_timeout_s,
                ))
                .build(),
        )
        .http_client(Self::new_http_client(&object_store_config))
        .behavior_version_latest()
        .stalled_stream_protection(aws_sdk_s3::config::StalledStreamProtectionConfig::disabled());
        let config = builder
            .region(Region::new("custom"))
            .endpoint_url(format!("{}{}", endpoint_prefix, address))
            .build();
        let client = Client::from_conf(config);

        Self {
            client,
            bucket: bucket.to_owned(),
            metrics,
            config: object_store_config,
        }
    }
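
    // An endpoint string that parses with the scheme handling above (illustrative values):
    //
    //     minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001
    //
    // yields access key `hummockadmin`, endpoint `http://127.0.0.1:9301` and bucket
    // `hummock001`.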
816
    /// Generates an HTTP GET request for the object at `path`. If a bounded `range` is given,
    /// the request downloads only that range via the `Range` header.
    fn obj_store_request(
        &self,
        path: &str,
        range: impl ObjectRangeBounds,
    ) -> GetObjectFluentBuilder {
        let req = self.client.get_object().bucket(&self.bucket).key(path);
        if range.is_full() {
            return req;
        }

        let start = range.start().map(|v| v.to_string()).unwrap_or_default();
        // HTTP byte ranges are inclusive on both ends, hence the `- 1` on the exclusive end.
        let end = range.end().map(|v| (v - 1).to_string()).unwrap_or_default();
        req.range(format!("bytes={}-{}", start, end))
    }
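
    // For example, a request for `0..1024` yields the header `Range: bytes=0-1023`, an
    // unbounded `1024..` yields `Range: bytes=1024-`, and a full range sends no header at all.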
836
    /// Inspects the bucket's lifecycle configuration and, if no enabled rule already aborts
    /// incomplete multipart uploads, installs one that purges them after
    /// [`S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS`] days. Returns whether an enabled
    /// expiration rule covering `data_directory` is configured on the bucket.
    pub async fn configure_bucket_lifecycle(&self, data_directory: &str) -> bool {
        let bucket = self.bucket.as_str();
        let mut configured_rules = vec![];
        let get_config_result = self
            .client
            .get_bucket_lifecycle_configuration()
            .bucket(bucket)
            .send()
            .await;
        let mut is_expiration_configured = false;

        if let Ok(config) = &get_config_result {
            for rule in config.rules() {
                if rule.expiration().is_some() {
                    is_expiration_configured |= rule.status == ExpirationStatus::Enabled
                        && match rule.filter().as_ref() {
                            // A rule with no filter applies to the whole bucket.
                            None => true,
                            // A prefix filter must cover the data directory.
                            Some(LifecycleRuleFilter::Prefix(prefix))
                                if data_directory.starts_with(prefix) =>
                            {
                                true
                            }
                            _ => false,
                        };

                    if matches!(rule.status(), ExpirationStatus::Enabled)
                        && rule.abort_incomplete_multipart_upload().is_some()
                    {
                        configured_rules.push(rule);
                    }
                }
            }
        }

        if !configured_rules.is_empty() {
            tracing::info!(
                "S3 bucket {} has already configured AbortIncompleteMultipartUpload: {:?}",
                bucket,
                configured_rules,
            );
        } else {
            let bucket_lifecycle_rule = LifecycleRule::builder()
                .id("abort-incomplete-multipart-upload")
                .status(ExpirationStatus::Enabled)
                .filter(LifecycleRuleFilter::Prefix(String::new()))
                .abort_incomplete_multipart_upload(
                    AbortIncompleteMultipartUpload::builder()
                        .days_after_initiation(S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS)
                        .build(),
                )
                .build()
                .unwrap();
            let bucket_lifecycle_config = BucketLifecycleConfiguration::builder()
                .rules(bucket_lifecycle_rule)
                .build()
                .unwrap();
            if self
                .client
                .put_bucket_lifecycle_configuration()
                .bucket(bucket)
                .lifecycle_configuration(bucket_lifecycle_config)
                .send()
                .await
                .is_ok()
            {
                tracing::info!(
                    "S3 bucket {:?} is configured to automatically purge abandoned MultipartUploads after {} days",
                    bucket,
                    S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS,
                );
            } else {
                tracing::warn!(
                    "Failed to configure lifecycle rule for S3 bucket: {:?}. It is recommended to configure it manually to avoid unnecessary storage cost.",
                    bucket
                );
            }
        }
        if is_expiration_configured {
            tracing::info!(
                "S3 bucket {} has already configured the expiration for the lifecycle.",
                bucket,
            );
        }
        is_expiration_configured
    }
}
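
// The lifecycle rule installed above is equivalent to this bucket lifecycle configuration
// (abbreviated JSON form):
//
//     { "Rules": [ { "ID": "abort-incomplete-multipart-upload",
//                    "Status": "Enabled",
//                    "Filter": { "Prefix": "" },
//                    "AbortIncompleteMultipartUpload": { "DaysAfterInitiation": 1 } } ] }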
948
/// A paginated `ListObjectsV2` iterator yielding [`ObjectMetadata`].
struct S3ObjectIter {
    buffer: VecDeque<ObjectMetadata>,
    client: Client,
    bucket: String,
    prefix: String,
    next_continuation_token: Option<String>,
    is_truncated: Option<bool>,
    #[allow(clippy::type_complexity)]
    send_future: Option<
        BoxFuture<
            'static,
            Result<(Vec<ObjectMetadata>, Option<String>, Option<bool>), ObjectError>,
        >,
    >,

    config: Arc<ObjectStoreConfig>,
    start_after: Option<String>,
}
967
impl S3ObjectIter {
    fn new(
        client: Client,
        bucket: String,
        prefix: String,
        config: Arc<ObjectStoreConfig>,
        start_after: Option<String>,
    ) -> Self {
        Self {
            buffer: VecDeque::default(),
            client,
            bucket,
            prefix,
            next_continuation_token: None,
            is_truncated: Some(true),
            send_future: None,
            config,
            start_after,
        }
    }
}
989
impl Stream for S3ObjectIter {
    type Item = ObjectResult<ObjectMetadata>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(e) = self.buffer.pop_front() {
            return Poll::Ready(Some(Ok(e)));
        }
        if let Some(f) = self.send_future.as_mut() {
            return match ready!(f.poll_unpin(cx)) {
                Ok((more, next_continuation_token, is_truncated)) => {
                    self.next_continuation_token = next_continuation_token;
                    self.is_truncated = is_truncated;
                    self.buffer.extend(more);
                    self.send_future = None;
                    // `start_after` only applies to the first request; subsequent pages are
                    // driven by the continuation token.
                    self.start_after = None;
                    self.poll_next(cx)
                }
                Err(e) => {
                    self.send_future = None;
                    Poll::Ready(Some(Err(e)))
                }
            };
        }
        if !self.is_truncated.unwrap_or_default() {
            return Poll::Ready(None);
        }
        let mut request = self
            .client
            .list_objects_v2()
            .bucket(&self.bucket)
            .prefix(&self.prefix);
        #[cfg(not(madsim))]
        if let Some(start_after) = self.start_after.as_ref() {
            request = request.start_after(start_after);
        }
        if let Some(continuation_token) = self.next_continuation_token.as_ref() {
            request = request.continuation_token(continuation_token);
        }
        let config = self.config.clone();
        let f = async move {
            match request.send().await {
                Ok(r) => {
                    let more = r
                        .contents()
                        .iter()
                        .map(|obj| ObjectMetadata {
                            key: obj.key().expect("key required").to_owned(),
                            last_modified: obj
                                .last_modified()
                                .map(|l| l.as_secs_f64())
                                .unwrap_or(0f64),
                            total_size: obj.size().unwrap_or_default() as usize,
                        })
                        .collect_vec();
                    let is_truncated = r.is_truncated;
                    let next_continuation_token = r.next_continuation_token;
                    Ok((more, next_continuation_token, is_truncated))
                }
                Err(e) => Err(set_error_should_retry::<ListObjectsV2Error>(
                    config,
                    e.into(),
                )),
            }
        };
        self.send_future = Some(Box::pin(f));
        self.poll_next(cx)
    }
}
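
// Consuming the iterator (illustrative sketch; `client`, `bucket` and `config` are assumed to
// exist, and `TryStreamExt` is already imported above):
//
//     let mut iter = S3ObjectIter::new(client, bucket, "hummock_001/".to_owned(), config, None);
//     while let Some(meta) = iter.try_next().await? {
//         println!("{} ({} bytes)", meta.key, meta.total_size);
//     }
//
// Each poll first drains `buffer`, then issues the next `ListObjectsV2` page keyed by the
// continuation token until `is_truncated` is false.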
1059
fn set_error_should_retry<E>(config: Arc<ObjectStoreConfig>, object_err: ObjectError) -> ObjectError
where
    E: ProvideErrorMetadata + Into<BoxError> + Sync + Send + std::error::Error + 'static,
{
    let not_found = object_err.is_object_not_found_error();

    if not_found {
        return object_err;
    }

    let mut inner = object_err.into_inner();
    match inner.borrow_mut() {
        ObjectErrorInner::S3 {
            should_retry,
            inner,
        } => {
            let sdk_err = inner
                .as_ref()
                .downcast_ref::<SdkError<E, aws_smithy_runtime_api::http::Response<SdkBody>>>();

            let err_should_retry = match sdk_err {
                Some(SdkError::DispatchFailure(e)) => {
                    if e.is_timeout() {
                        tracing::warn!(target: "http_timeout_retry", "{e:?} occurs, retry S3 request.");
                        true
                    } else {
                        false
                    }
                }

                Some(SdkError::ServiceError(e)) => match e.err().code() {
                    None => {
                        if config.s3.developer.retry_unknown_service_error
                            || config.s3.retry_unknown_service_error
                        {
                            tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 request.");
                            true
                        } else {
                            false
                        }
                    }
                    Some(code) => {
                        if config
                            .s3
                            .developer
                            .retryable_service_error_codes
                            .iter()
                            .any(|s| s.as_str().eq_ignore_ascii_case(code))
                        {
                            tracing::warn!(target: "retryable_service_error", "{e:?} occurs, retry S3 request.");
                            true
                        } else {
                            false
                        }
                    }
                },

                Some(SdkError::TimeoutError(_err)) => true,

                _ => false,
            };

            *should_retry = err_should_retry;
        }

        _ => unreachable!(),
    }

    ObjectError::from(inner)
}
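
// Retry classification above, in short: dispatch-level connect/read timeouts and SDK
// `TimeoutError`s are retried; service errors are retried only when their error code matches
// `retryable_service_error_codes` (case-insensitively), or unconditionally for code-less
// service errors when `retry_unknown_service_error` is enabled; not-found and everything else
// are not retried.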
1130
#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
    use crate::object::prefix::s3::{NUM_BUCKET_PREFIXES, get_object_prefix};

    fn get_hash_of_object(obj_id: u64) -> u32 {
        let crc_hash = crc32fast::hash(&obj_id.to_be_bytes());
        crc_hash % NUM_BUCKET_PREFIXES
    }

    #[tokio::test]
    async fn test_get_object_prefix() {
        for obj_id in 0..99999 {
            let hash = get_hash_of_object(obj_id);
            let prefix = get_object_prefix(obj_id);
            assert_eq!(format!("{}/", hash), prefix);
        }

        let obj_prefix = String::default();
        let path = format!("{}/{}{}.data", "hummock_001", obj_prefix, 101);
        assert_eq!("hummock_001/101.data", path);
    }
}