use std::borrow::BorrowMut;
use std::cmp;
use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
use std::time::Duration;

use await_tree::{InstrumentAwait, SpanExt};
use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::BoxError;
use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError;
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
use aws_sdk_s3::operation::delete_object::DeleteObjectError;
use aws_sdk_s3::operation::delete_objects::DeleteObjectsError;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
use aws_sdk_s3::operation::put_object::PutObjectError;
use aws_sdk_s3::operation::upload_part::{UploadPartError, UploadPartOutput};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{
    AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload,
    CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier,
};
use aws_smithy_http::futures_stream_adapter::FuturesStreamCompatByteStream;
use aws_smithy_http_client::{
    Builder as SmithyHttpClientBuilder, Connector as SmithyConnector, tls,
};
use aws_smithy_runtime_api::client::http::HttpClient;
use aws_smithy_runtime_api::client::result::SdkError;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::error::metadata::ProvideErrorMetadata;
use bytes::BytesMut;
use fail::fail_point;
use futures::future::{BoxFuture, FutureExt, try_join_all};
use futures::{Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::range::RangeBoundsExt;
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;

use super::object_metrics::ObjectStoreMetrics;
use super::{
    Bytes, ObjectError, ObjectErrorInner, ObjectMetadata, ObjectRangeBounds, ObjectResult,
    ObjectStore, StreamingUploader, prefix, retry_request,
};
use crate::object::{
    ObjectDataStream, ObjectMetadataIter, OperationType, try_update_failure_metric,
};

type PartId = i32;

const MIN_PART_ID: PartId = 1;
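/// Retention period for incomplete multipart uploads, enforced via the bucket lifecycle
/// rule installed by `configure_bucket_lifecycle`.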
const S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS: i32 = 1;

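/// A streaming uploader built on S3 multipart upload: written bytes are buffered
/// locally and flushed as upload parts once `part_size` is reached.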
pub struct S3StreamingUploader {
    client: Client,
    part_size: usize,
    bucket: String,
    key: String,
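    /// The id of the ongoing multipart upload. `None` until the upload is initiated
    /// lazily by the first part.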
    upload_id: Option<String>,
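    /// The number to assign to the next uploaded part, starting from `MIN_PART_ID`.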
    next_part_id: PartId,
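    /// Join handles of the spawned tasks that upload individual parts.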
    join_handles: Vec<JoinHandle<ObjectResult<(PartId, UploadPartOutput)>>>,
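    /// Bytes buffered locally that have not been uploaded as a part yet.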
    buf: Vec<Bytes>,
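    /// Total length of the bytes held in `buf`.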
    not_uploaded_len: usize,
    metrics: Arc<ObjectStoreMetrics>,

    config: Arc<ObjectStoreConfig>,
}

impl S3StreamingUploader {
    const MEDIA_TYPE: &'static str = "s3";

    pub fn new(
        client: Client,
        bucket: String,
        key: String,
        metrics: Arc<ObjectStoreMetrics>,
        config: Arc<ObjectStoreConfig>,
    ) -> S3StreamingUploader {
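        // Per S3, multipart upload parts must be between 5 MiB and 5 GiB; clamp the
        // configured part size into that range.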
        const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
        const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024;
        let part_size = config.upload_part_size.clamp(MIN_PART_SIZE, MAX_PART_SIZE);

        Self {
            client,
            bucket,
            part_size,
            key,
            upload_id: None,
            next_part_id: MIN_PART_ID,
            join_handles: Default::default(),
            buf: Default::default(),
            not_uploaded_len: 0,
            metrics,
            config,
        }
    }

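    /// Initiates the multipart upload lazily on the first call, then drains `buf` and
    /// spawns a task that uploads its contents as the next part.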
    async fn upload_next_part(&mut self) -> ObjectResult<()> {
        let operation_type = OperationType::StreamingUpload;
        let operation_type_str = operation_type.as_str();

        if self.upload_id.is_none() {
            let builder = || async {
                self.client
                    .create_multipart_upload()
                    .bucket(&self.bucket)
                    .key(&self.key)
                    .send()
                    .await
                    .map_err(|err| {
                        set_error_should_retry::<CreateMultipartUploadError>(
                            self.config.clone(),
                            err.into(),
                        )
                    })
            };

            let resp = retry_request(
                builder,
                &self.config,
                OperationType::StreamingUploadInit,
                self.metrics.clone(),
                Self::MEDIA_TYPE,
            )
            .await;

            try_update_failure_metric(
                &self.metrics,
                &resp,
                OperationType::StreamingUploadInit.as_str(),
            );

            self.upload_id = Some(resp?.upload_id.unwrap());
        }

        let data = self.buf.drain(..).collect_vec();
        let len = self.not_uploaded_len;
        debug_assert_eq!(
            data.iter().map(|b| b.len()).sum::<usize>(),
            self.not_uploaded_len
        );

        let part_id = self.next_part_id;
        self.next_part_id += 1;

        let client_cloned = self.client.clone();
        let bucket = self.bucket.clone();
        let key = self.key.clone();
        let upload_id = self.upload_id.clone().unwrap();

        let metrics = self.metrics.clone();
        metrics
            .operation_size
            .with_label_values(&[operation_type_str])
            .observe(len as f64);
        let config = self.config.clone();

        self.join_handles.push(tokio::spawn(async move {
            let _timer = metrics
                .operation_latency
                .with_label_values(&["s3", operation_type_str])
                .start_timer();

            let builder = || async {
                client_cloned
                    .upload_part()
                    .bucket(bucket.clone())
                    .key(key.clone())
                    .upload_id(upload_id.clone())
                    .part_number(part_id)
                    .body(get_upload_body(data.clone()))
                    .content_length(len as i64)
                    .send()
                    .await
                    .map_err(|err| {
                        set_error_should_retry::<UploadPartError>(
                            config.clone(),
                            err.into(),
                        )
                    })
            };

            let res = retry_request(
                builder,
                &config,
                operation_type,
                metrics.clone(),
                Self::MEDIA_TYPE,
            )
            .await;
            try_update_failure_metric(&metrics, &res, operation_type_str);
            Ok((part_id, res?))
        }));

        Ok(())
    }

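    /// Uploads the remaining buffered bytes as the last part, waits for all in-flight
    /// part uploads, and completes the multipart upload with the collected part
    /// numbers and ETags.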
    async fn flush_multipart_and_complete(&mut self) -> ObjectResult<()> {
        let operation_type = OperationType::StreamingUploadFinish;

        if !self.buf.is_empty() {
            self.upload_next_part().await?;
        }

        let join_handles = self.join_handles.drain(..).collect_vec();

        let mut uploaded_parts = Vec::with_capacity(join_handles.len());
        for result in try_join_all(join_handles)
            .await
            .map_err(ObjectError::internal)?
        {
            uploaded_parts.push(result?);
        }

        let completed_parts = Some(
            uploaded_parts
                .iter()
                .map(|(part_id, output)| {
                    CompletedPart::builder()
                        .set_e_tag(output.e_tag.clone())
                        .set_part_number(Some(*part_id))
                        .build()
                })
                .collect_vec(),
        );

        let builder = || async {
            self.client
                .complete_multipart_upload()
                .bucket(&self.bucket)
                .key(&self.key)
                .upload_id(self.upload_id.as_ref().unwrap())
                .multipart_upload(
                    CompletedMultipartUpload::builder()
                        .set_parts(completed_parts.clone())
                        .build(),
                )
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<CompleteMultipartUploadError>(
                        self.config.clone(),
                        err.into(),
                    )
                })
        };

        let res = retry_request(
            builder,
            &self.config,
            operation_type,
            self.metrics.clone(),
            Self::MEDIA_TYPE,
        )
        .await;
        try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
        let _res = res?;

        Ok(())
    }

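    /// Aborts the multipart upload so that S3 can clean up the already-uploaded parts.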
    async fn abort_multipart_upload(&self) -> ObjectResult<()> {
        self.client
            .abort_multipart_upload()
            .bucket(&self.bucket)
            .key(&self.key)
            .upload_id(self.upload_id.as_ref().unwrap())
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<AbortMultipartUploadError>(self.config.clone(), err.into())
            })?;
        Ok(())
    }
}

impl StreamingUploader for S3StreamingUploader {
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        fail_point!("s3_write_bytes_err", |_| Err(ObjectError::internal(
            "s3 write bytes error"
        )));
        let data_len = data.len();
        self.not_uploaded_len += data_len;
        self.buf.push(data);

        if self.not_uploaded_len >= self.part_size {
            self.upload_next_part()
                .instrument_await("s3_upload_next_part".verbose())
                .await?;
            self.not_uploaded_len = 0;
        }
        Ok(())
    }

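    /// If no part has been uploaded yet, the buffered data is small enough to be sent
    /// as a single `PutObject` request; otherwise the remaining parts are flushed and
    /// the multipart upload is completed, or aborted on failure.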
    async fn finish(mut self) -> ObjectResult<()> {
        fail_point!("s3_finish_streaming_upload_err", |_| Err(
            ObjectError::internal("s3 finish streaming upload error")
        ));

        if self.upload_id.is_none() {
            debug_assert!(self.join_handles.is_empty());
            if self.buf.is_empty() {
                debug_assert_eq!(self.not_uploaded_len, 0);
                Err(ObjectError::internal("upload empty object"))
            } else {
                let operation_type = OperationType::Upload;
                let builder = || async {
                    self.client
                        .put_object()
                        .bucket(&self.bucket)
                        .body(get_upload_body(self.buf.clone()))
                        .content_length(self.not_uploaded_len as i64)
                        .key(&self.key)
                        .send()
                        .instrument_await("s3_put_object".verbose())
                        .await
                        .map_err(|err| {
                            set_error_should_retry::<PutObjectError>(
                                self.config.clone(),
                                err.into(),
                            )
                        })
                };

                let res = retry_request(
                    builder,
                    &self.config,
                    operation_type,
                    self.metrics.clone(),
                    Self::MEDIA_TYPE,
                )
                .await;
                try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
                res?;
                Ok(())
            }
        } else {
            match self
                .flush_multipart_and_complete()
                .instrument_await("s3_flush_multipart_and_complete".verbose())
                .await
            {
                Err(e) => {
                    tracing::warn!(key = self.key, error = %e.as_report(), "Failed to upload object");
                    self.abort_multipart_upload().await?;
                    Err(e)
                }
                _ => Ok(()),
            }
        }
    }

    fn get_memory_usage(&self) -> u64 {
        self.part_size as u64
    }
}

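/// Copies the buffered chunks into one contiguous buffer and wraps it as a request body.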
fn get_upload_body(data: Vec<Bytes>) -> ByteStream {
    let total_len: usize = data.iter().map(|b| b.len()).sum();
    let mut buf = BytesMut::with_capacity(total_len);
    for chunk in data {
        buf.extend_from_slice(&chunk);
    }
    ByteStream::from(buf.freeze())
}

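/// Object store backed by S3, or S3-compatible services such as MinIO.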
#[derive(Clone)]
pub struct S3ObjectStore {
    client: Client,
    bucket: String,
    metrics: Arc<ObjectStoreMetrics>,

    config: Arc<ObjectStoreConfig>,
}

#[async_trait::async_trait]
impl ObjectStore for S3ObjectStore {
    type StreamingUploader = S3StreamingUploader;

    fn get_object_prefix(&self, obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
        prefix::s3::get_object_prefix(obj_id)
    }

    async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
        fail_point!("s3_upload_err", |_| Err(ObjectError::internal(
            "s3 upload error"
        )));
        if obj.is_empty() {
            Err(ObjectError::internal("upload empty object"))
        } else {
            self.client
                .put_object()
                .bucket(&self.bucket)
                .body(ByteStream::from(obj))
                .key(path)
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<PutObjectError>(self.config.clone(), err.into())
                })?;
            Ok(())
        }
    }

    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
        fail_point!("s3_streaming_upload_err", |_| Err(ObjectError::internal(
            "s3 streaming upload error"
        )));
        Ok(S3StreamingUploader::new(
            self.client.clone(),
            self.bucket.clone(),
            path.to_owned(),
            self.metrics.clone(),
            self.config.clone(),
        ))
    }

    async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
        fail_point!("s3_read_err", |_| Err(ObjectError::internal(
            "s3 read error"
        )));

        let val = match self.obj_store_request(path, range.clone()).send().await {
            Ok(resp) => resp
                .body
                .collect()
                .await
                .map_err(|err| {
                    set_error_should_retry::<GetObjectError>(self.config.clone(), err.into())
                })?
                .into_bytes(),
            Err(sdk_err) => {
                return Err(set_error_should_retry::<GetObjectError>(
                    self.config.clone(),
                    sdk_err.into(),
                ));
            }
        };

        if let Some(len) = range.len()
            && len != val.len()
        {
            return Err(ObjectError::internal(format!(
                "mismatched size: expected {}, found {} when reading {} at {:?}",
                len,
                val.len(),
                path,
                range,
            )));
        }

        Ok(val)
    }

    async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
        fail_point!("s3_metadata_err", |_| Err(ObjectError::internal(
            "s3 metadata error"
        )));
        let resp = self
            .client
            .head_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<HeadObjectError>(self.config.clone(), err.into())
            })?;
        Ok(ObjectMetadata {
            key: path.to_owned(),
            last_modified: resp
                .last_modified()
                .expect("last_modified required")
                .as_secs_f64(),
            total_size: resp.content_length.unwrap_or_default() as usize,
        })
    }

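    /// Returns a stream reading the object's content within `range`, adapting the
    /// SDK's byte stream into an `ObjectDataStream`.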
    async fn streaming_read(
        &self,
        path: &str,
        range: Range<usize>,
    ) -> ObjectResult<ObjectDataStream> {
        fail_point!("s3_streaming_read_init_err", |_| Err(
            ObjectError::internal("s3 streaming read init error")
        ));

        let resp = match self.obj_store_request(path, range.clone()).send().await {
            Ok(resp) => resp,
            Err(sdk_err) => {
                return Err(set_error_should_retry::<GetObjectError>(
                    self.config.clone(),
                    sdk_err.into(),
                ));
            }
        };

        let reader = FuturesStreamCompatByteStream::new(resp.body);

        Ok(Box::pin(
            reader
                .into_stream()
                .map(|item| item.map_err(ObjectError::from)),
        ))
    }

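    /// Permanently deletes the object at `path`. Note that per S3 semantics, deleting
    /// a non-existent object also reports success.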
    async fn delete(&self, path: &str) -> ObjectResult<()> {
        fail_point!("s3_delete_err", |_| Err(ObjectError::internal(
            "s3 delete error"
        )));
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<DeleteObjectError>(self.config.clone(), err.into())
            })?;
        Ok(())
    }

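    /// Permanently deletes all objects in `paths` via the `DeleteObjects` API,
    /// batching at most 1000 keys per request.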
    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
        // The `DeleteObjects` API accepts at most 1000 keys per request.
        const MAX_LEN: usize = 1000;
        let mut all_errors = Vec::new();

        for start_idx in (0..paths.len()).step_by(MAX_LEN) {
            let end_idx = cmp::min(paths.len(), start_idx + MAX_LEN);
            let slice = &paths[start_idx..end_idx];
            let mut obj_ids = Vec::with_capacity(slice.len());
            for path in slice {
                obj_ids.push(ObjectIdentifier::builder().key(path).build().unwrap());
            }

            let delete_builder = Delete::builder().set_objects(Some(obj_ids));
            let delete_output = self
                .client
                .delete_objects()
                .bucket(&self.bucket)
                .delete(delete_builder.build().unwrap())
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<DeleteObjectsError>(self.config.clone(), err.into())
                })?;

            if !delete_output.errors().is_empty() {
                all_errors.append(&mut delete_output.errors().to_owned());
            }
        }
        if !all_errors.is_empty() {
            return Err(ObjectError::internal(format!(
                "DeleteObjects request returned exception for some objects: {:?}",
                all_errors
            )));
        }

        Ok(())
    }

    async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter> {
        Ok(Box::pin(
            S3ObjectIter::new(
                self.client.clone(),
                self.bucket.clone(),
                prefix.to_owned(),
                self.config.clone(),
                start_after,
            )
            .take(limit.unwrap_or(usize::MAX)),
        ))
    }

    fn store_media_type(&self) -> &'static str {
        "s3"
    }
}

impl S3ObjectStore {
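    /// Builds the HTTP client shared by all S3 requests, applying the TCP nodelay and
    /// connection keepalive settings from the object store config.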
    pub fn new_http_client(config: &ObjectStoreConfig) -> impl HttpClient + use<> {
        let nodelay = config.s3.nodelay;
        let pool_idle_timeout = config.s3.keepalive_ms.map(Duration::from_millis);

        let tls_provider = tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc);
        SmithyHttpClientBuilder::new().build_with_connector_fn(move |settings, _components| {
            let mut builder = SmithyConnector::builder();

            if let Some(settings) = settings {
                builder = builder.connector_settings(settings.clone());
            }

            if let Some(nodelay) = nodelay {
                builder = builder.enable_tcp_nodelay(nodelay);
            }

            if let Some(pool_idle_timeout) = pool_idle_timeout {
                builder = builder.pool_idle_timeout(pool_idle_timeout);
            }

            builder.tls_provider(tls_provider.clone()).build()
        })
    }

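    /// Creates an S3 client from the environment. If `RW_S3_ENDPOINT` is set, it is
    /// used as a custom endpoint (e.g. for S3-compatible services), and
    /// `RW_IS_FORCE_PATH_STYLE` controls path-style addressing.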
    pub async fn new_with_config(
        bucket: String,
        metrics: Arc<ObjectStoreMetrics>,
        config: Arc<ObjectStoreConfig>,
    ) -> Self {
        let sdk_config_loader = aws_config::from_env().http_client(Self::new_http_client(&config));

        let client = match std::env::var("RW_S3_ENDPOINT") {
            Ok(endpoint) => {
                let is_force_path_style = match std::env::var("RW_IS_FORCE_PATH_STYLE") {
                    Ok(value) => value == "true",
                    Err(_) => false,
                };

                let sdk_config = sdk_config_loader.load().await;
                #[cfg(madsim)]
                let client = Client::new(&sdk_config);
                #[cfg(not(madsim))]
                let client = Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&sdk_config)
                        .endpoint_url(endpoint)
                        .force_path_style(is_force_path_style)
                        .identity_cache(
                            aws_sdk_s3::config::IdentityCache::lazy()
                                .load_timeout(Duration::from_secs(
                                    config.s3.identity_resolution_timeout_s,
                                ))
                                .build(),
                        )
                        .stalled_stream_protection(
                            aws_sdk_s3::config::StalledStreamProtectionConfig::disabled(),
                        )
                        .build(),
                );
                client
            }
            Err(_) => {
                let sdk_config = sdk_config_loader.load().await;
                #[cfg(madsim)]
                let client = Client::new(&sdk_config);
                #[cfg(not(madsim))]
                let client = Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&sdk_config)
                        .identity_cache(
                            aws_sdk_s3::config::IdentityCache::lazy()
                                .load_timeout(Duration::from_secs(
                                    config.s3.identity_resolution_timeout_s,
                                ))
                                .build(),
                        )
                        .stalled_stream_protection(
                            aws_sdk_s3::config::StalledStreamProtectionConfig::disabled(),
                        )
                        .build(),
                );
                client
            }
        };

        Self {
            client,
            bucket,
            metrics,
            config,
        }
    }

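    /// Creates a MinIO-backed store. The `server` string is expected in the form
    /// `minio://access_key:secret_key@address/bucket`, where `address` may optionally
    /// carry an `http://` or `https://` scheme and defaults to `http://`, e.g.
    /// `minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001`.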
    pub async fn new_minio_engine(
        server: &str,
        metrics: Arc<ObjectStoreMetrics>,
        object_store_config: Arc<ObjectStoreConfig>,
    ) -> Self {
        let server = server.strip_prefix("minio://").unwrap();
        let (access_key_id, rest) = server.split_once(':').unwrap();
        let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

        let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
            rest = rest_stripped;
            "https://"
        } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
            rest = rest_stripped;
            "http://"
        } else {
            "http://"
        };
        let (address, bucket) = rest.split_once('/').unwrap();

        #[cfg(madsim)]
        let builder = aws_sdk_s3::config::Builder::new().credentials_provider(
            Credentials::from_keys(access_key_id, secret_access_key, None),
        );
        #[cfg(not(madsim))]
        let builder = aws_sdk_s3::config::Builder::from(
            &aws_config::ConfigLoader::default()
                .credentials_provider(Credentials::from_keys(
                    access_key_id,
                    secret_access_key,
                    None,
                ))
                .load()
                .await,
        )
        .force_path_style(true)
        .identity_cache(
            aws_sdk_s3::config::IdentityCache::lazy()
                .load_timeout(Duration::from_secs(
                    object_store_config.s3.identity_resolution_timeout_s,
                ))
                .build(),
        )
        .http_client(Self::new_http_client(&object_store_config))
        .behavior_version_latest()
        .stalled_stream_protection(aws_sdk_s3::config::StalledStreamProtectionConfig::disabled());
        let config = builder
            .region(Region::new("custom"))
            .endpoint_url(format!("{}{}", endpoint_prefix, address))
            .build();
        let client = Client::from_conf(config);

        Self {
            client,
            bucket: bucket.to_owned(),
            metrics,
            config: object_store_config,
        }
    }

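    /// Builds a `GetObject` request for `path`, attaching an HTTP `Range` header when
    /// `range` does not cover the whole object. HTTP range ends are inclusive, hence
    /// the `end - 1` below.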
    fn obj_store_request(
        &self,
        path: &str,
        range: impl ObjectRangeBounds,
    ) -> GetObjectFluentBuilder {
        let req = self.client.get_object().bucket(&self.bucket).key(path);
        if range.is_full() {
            return req;
        }

        let start = range.start().map(|v| v.to_string()).unwrap_or_default();
        let end = range.end().map(|v| (v - 1).to_string()).unwrap_or_default();

        req.range(format!("bytes={}-{}", start, end))
    }

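    /// Inspects the bucket's lifecycle configuration and, unless an enabled rule with
    /// `AbortIncompleteMultipartUpload` already exists, installs a bucket-wide rule
    /// that purges incomplete multipart uploads after
    /// `S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS` days. Returns whether an
    /// enabled expiration rule covering `data_directory` is configured.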
    pub async fn configure_bucket_lifecycle(&self, data_directory: &str) -> bool {
        let bucket = self.bucket.as_str();
        let mut configured_rules = vec![];
        let get_config_result = self
            .client
            .get_bucket_lifecycle_configuration()
            .bucket(bucket)
            .send()
            .await;
        let mut is_expiration_configured = false;

        if let Ok(config) = &get_config_result {
            for rule in config.rules() {
                if rule.expiration().is_some() {
                    // The expiration rule applies to RisingWave data only if it is
                    // enabled and its filter either matches the whole bucket or covers
                    // the data directory.
                    is_expiration_configured |= rule.status == ExpirationStatus::Enabled
                        && match rule.filter().as_ref() {
                            None => true,
                            Some(filter) => {
                                let is_empty = filter.prefix().is_none()
                                    && filter.tag().is_none()
                                    && filter.and().is_none()
                                    && filter.object_size_greater_than().is_none()
                                    && filter.object_size_less_than().is_none();
                                if is_empty {
                                    true
                                } else {
                                    filter
                                        .prefix()
                                        .is_some_and(|prefix| data_directory.starts_with(prefix))
                                }
                            }
                        };

                    if matches!(rule.status(), ExpirationStatus::Enabled)
                        && rule.abort_incomplete_multipart_upload().is_some()
                    {
                        configured_rules.push(rule);
                    }
                }
            }
        }

        if !configured_rules.is_empty() {
            tracing::info!(
                "S3 bucket {} has already configured AbortIncompleteMultipartUpload: {:?}",
                bucket,
                configured_rules,
            );
        } else {
            let bucket_lifecycle_rule = LifecycleRule::builder()
                .id("abort-incomplete-multipart-upload")
                .status(ExpirationStatus::Enabled)
                .filter(LifecycleRuleFilter::builder().prefix("").build())
                .abort_incomplete_multipart_upload(
                    AbortIncompleteMultipartUpload::builder()
                        .days_after_initiation(S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS)
                        .build(),
                )
                .build()
                .unwrap();
            let bucket_lifecycle_config = BucketLifecycleConfiguration::builder()
                .rules(bucket_lifecycle_rule)
                .build()
                .unwrap();
            if self
                .client
                .put_bucket_lifecycle_configuration()
                .bucket(bucket)
                .lifecycle_configuration(bucket_lifecycle_config)
                .send()
                .await
                .is_ok()
            {
                tracing::info!(
                    "S3 bucket {:?} is configured to automatically purge abandoned MultipartUploads after {} days",
                    bucket,
                    S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS,
                );
            } else {
                tracing::warn!(
                    "Failed to configure lifecycle rule for S3 bucket: {:?}. It is recommended to configure it manually to avoid unnecessary storage cost.",
                    bucket
                );
            }
        }
        if is_expiration_configured {
            tracing::info!(
                "S3 bucket {} has already configured the expiration for the lifecycle.",
                bucket,
            );
        }
        is_expiration_configured
    }
}

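/// A stream of object metadata backed by paginated `ListObjectsV2` requests: each page
/// is buffered and the next page is fetched lazily once the buffer is drained.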
struct S3ObjectIter {
    buffer: VecDeque<ObjectMetadata>,
    client: Client,
    bucket: String,
    prefix: String,
    next_continuation_token: Option<String>,
    is_truncated: Option<bool>,
    #[allow(clippy::type_complexity)]
    send_future: Option<
        BoxFuture<
            'static,
            Result<(Vec<ObjectMetadata>, Option<String>, Option<bool>), ObjectError>,
        >,
    >,

    config: Arc<ObjectStoreConfig>,
    start_after: Option<String>,
}

impl S3ObjectIter {
    fn new(
        client: Client,
        bucket: String,
        prefix: String,
        config: Arc<ObjectStoreConfig>,
        start_after: Option<String>,
    ) -> Self {
        Self {
            buffer: VecDeque::default(),
            client,
            bucket,
            prefix,
            next_continuation_token: None,
            is_truncated: Some(true),
            send_future: None,
            config,
            start_after,
        }
    }
}

impl Stream for S3ObjectIter {
    type Item = ObjectResult<ObjectMetadata>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(e) = self.buffer.pop_front() {
            return Poll::Ready(Some(Ok(e)));
        }
        if let Some(f) = self.send_future.as_mut() {
            return match ready!(f.poll_unpin(cx)) {
                Ok((more, next_continuation_token, is_truncated)) => {
                    self.next_continuation_token = next_continuation_token;
                    self.is_truncated = is_truncated;
                    self.buffer.extend(more);
                    self.send_future = None;
                    self.start_after = None;
                    self.poll_next(cx)
                }
                Err(e) => {
                    self.send_future = None;
                    Poll::Ready(Some(Err(e)))
                }
            };
        }
        if !self.is_truncated.unwrap_or_default() {
            return Poll::Ready(None);
        }
        let mut request = self
            .client
            .list_objects_v2()
            .bucket(&self.bucket)
            .prefix(&self.prefix);
        #[cfg(not(madsim))]
        if let Some(start_after) = self.start_after.as_ref() {
            request = request.start_after(start_after);
        }
        if let Some(continuation_token) = self.next_continuation_token.as_ref() {
            request = request.continuation_token(continuation_token);
        }
        let config = self.config.clone();
        let f = async move {
            match request.send().await {
                Ok(r) => {
                    let more = r
                        .contents()
                        .iter()
                        .map(|obj| ObjectMetadata {
                            key: obj.key().expect("key required").to_owned(),
                            last_modified: obj
                                .last_modified()
                                .map(|l| l.as_secs_f64())
                                .unwrap_or(0f64),
                            total_size: obj.size().unwrap_or_default() as usize,
                        })
                        .collect_vec();
                    let is_truncated = r.is_truncated;
                    let next_continuation_token = r.next_continuation_token;
                    Ok((more, next_continuation_token, is_truncated))
                }
                Err(e) => Err(set_error_should_retry::<ListObjectsV2Error>(
                    config,
                    e.into(),
                )),
            }
        };
        self.send_future = Some(Box::pin(f));
        self.poll_next(cx)
    }
}

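/// Marks the error as retryable if the underlying `SdkError` is a dispatch timeout, a
/// client-side timeout, or a service error whose code is configured as retryable;
/// not-found errors are returned unchanged.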
fn set_error_should_retry<E>(config: Arc<ObjectStoreConfig>, object_err: ObjectError) -> ObjectError
where
    E: ProvideErrorMetadata + Into<BoxError> + Sync + Send + std::error::Error + 'static,
{
    let not_found = object_err.is_object_not_found_error();

    if not_found {
        return object_err;
    }

    let mut inner = object_err.into_inner();
    match inner.borrow_mut() {
        ObjectErrorInner::S3 {
            should_retry,
            inner,
        } => {
            let sdk_err = inner
                .as_ref()
                .downcast_ref::<SdkError<E, aws_smithy_runtime_api::http::Response<SdkBody>>>();

            let err_should_retry = match sdk_err {
                Some(SdkError::DispatchFailure(e)) => {
                    if e.is_timeout() {
                        tracing::warn!(target: "http_timeout_retry", "{e:?} occurs, retry S3 get_object request.");
                        true
                    } else {
                        false
                    }
                }

                Some(SdkError::ServiceError(e)) => match e.err().code() {
                    None => {
                        if config.s3.developer.retry_unknown_service_error
                            || config.s3.retry_unknown_service_error
                        {
                            tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
                            true
                        } else {
                            false
                        }
                    }
                    Some(code) => {
                        if config
                            .s3
                            .developer
                            .retryable_service_error_codes
                            .iter()
                            .any(|s| s.as_str().eq_ignore_ascii_case(code))
                        {
                            tracing::warn!(target: "retryable_service_error", "{e:?} occurs, retry S3 get_object request.");
                            true
                        } else {
                            false
                        }
                    }
                },

                Some(SdkError::TimeoutError(_err)) => true,

                _ => false,
            };

            *should_retry = err_should_retry;
        }

        _ => unreachable!(),
    }

    ObjectError::from(inner)
}

#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
    use crate::object::prefix::s3::{NUM_BUCKET_PREFIXES, get_object_prefix};

    fn get_hash_of_object(obj_id: u64) -> u32 {
        let crc_hash = crc32fast::hash(&obj_id.to_be_bytes());
        crc_hash % NUM_BUCKET_PREFIXES
    }

    #[tokio::test]
    async fn test_get_object_prefix() {
        for obj_id in 0..99999 {
            let hash = get_hash_of_object(obj_id);
            let prefix = get_object_prefix(obj_id);
            assert_eq!(format!("{}/", hash), prefix);
        }

        let obj_prefix = String::default();
        let path = format!("{}/{}{}.data", "hummock_001", obj_prefix, 101);
        assert_eq!("hummock_001/101.data", path);
    }
}