risingwave_object_store/object/s3.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::BorrowMut;
use std::cmp;
use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
use std::time::Duration;

use await_tree::{InstrumentAwait, SpanExt};
use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::BoxError;
use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError;
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
use aws_sdk_s3::operation::delete_object::DeleteObjectError;
use aws_sdk_s3::operation::delete_objects::DeleteObjectsError;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
use aws_sdk_s3::operation::put_object::PutObjectError;
use aws_sdk_s3::operation::upload_part::{UploadPartError, UploadPartOutput};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{
    AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload,
    CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier,
};
use aws_smithy_http::futures_stream_adapter::FuturesStreamCompatByteStream;
use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
use aws_smithy_runtime_api::client::http::HttpClient;
use aws_smithy_runtime_api::client::result::SdkError;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::error::metadata::ProvideErrorMetadata;
use fail::fail_point;
use futures::future::{BoxFuture, FutureExt, try_join_all};
use futures::{Stream, StreamExt, TryStreamExt, stream};
use hyper::Body;
use itertools::Itertools;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::monitor::monitor_connector;
use risingwave_common::range::RangeBoundsExt;
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;

use super::object_metrics::ObjectStoreMetrics;
use super::{
    Bytes, ObjectError, ObjectErrorInner, ObjectMetadata, ObjectRangeBounds, ObjectResult,
    ObjectStore, StreamingUploader, prefix, retry_request,
};
use crate::object::{
    ObjectDataStream, ObjectMetadataIter, OperationType, try_update_failure_metric,
};
type PartId = i32;

/// MinIO and S3 share the same minimum part ID and part size.
const MIN_PART_ID: PartId = 1;
/// Stop multipart uploads that don't complete within a specified number of days after being
/// initiated. (Day is the smallest granularity)
const S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS: i32 = 1;

/// S3 multipart upload handle. The multipart upload is not initiated until the first part is
/// available for upload.
///
/// Reference: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html>
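///
/// # Example
///
/// A minimal sketch of the intended call sequence (illustrative only, not compiled as a doctest;
/// it assumes an already-constructed `S3ObjectStore` named `store` and some `Bytes` chunks):
///
/// ```ignore
/// let mut uploader = store.streaming_upload("path/to/object").await?;
/// uploader.write_bytes(first_chunk).await?;  // buffered until `part_size` bytes are pending
/// uploader.write_bytes(second_chunk).await?; // full buffers are uploaded as multipart parts
/// uploader.finish().await?;                  // completes the upload (or falls back to `PutObject`)
/// ```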
pub struct S3StreamingUploader {
    client: Client,
    part_size: usize,
    bucket: String,
    /// The key of the object.
    key: String,
    /// The identifier of the multipart upload task for S3.
    upload_id: Option<String>,
    /// Next part ID.
    next_part_id: PartId,
    /// Join handles for part uploads.
    join_handles: Vec<JoinHandle<ObjectResult<(PartId, UploadPartOutput)>>>,
    /// Buffer for data. It stores at least `part_size` bytes of data before being wrapped into a
    /// stream and uploaded to the object store as a part.
    buf: Vec<Bytes>,
    /// Length of the data that has not been uploaded to S3.
    not_uploaded_len: usize,
    /// To record metrics for uploading parts.
    metrics: Arc<ObjectStoreMetrics>,

    config: Arc<ObjectStoreConfig>,
}

impl S3StreamingUploader {
    const MEDIA_TYPE: &'static str = "s3";

    pub fn new(
        client: Client,
        bucket: String,
        key: String,
        metrics: Arc<ObjectStoreMetrics>,
        config: Arc<ObjectStoreConfig>,
    ) -> S3StreamingUploader {
        /// The minimum number of bytes that are buffered before they are uploaded as a part.
        /// Its value must be at least the S3 minimum part size of 5 MiB.
        ///
        /// Reference: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
        const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
        const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024;
        let part_size = config.upload_part_size.clamp(MIN_PART_SIZE, MAX_PART_SIZE);
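        // For example, a configured `upload_part_size` of 1 MiB is raised to 5 MiB here, while
        // 16 MiB is kept as-is and anything above 5 GiB is capped (illustrative values).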

        Self {
            client,
            bucket,
            part_size,
            key,
            upload_id: None,
            next_part_id: MIN_PART_ID,
            join_handles: Default::default(),
            buf: Default::default(),
            not_uploaded_len: 0,
            metrics,
            config,
        }
    }

    async fn upload_next_part(&mut self) -> ObjectResult<()> {
        let operation_type = OperationType::StreamingUpload;
        let operation_type_str = operation_type.as_str();

        // Lazily create multipart upload.
        if self.upload_id.is_none() {
            let builder = || async {
                self.client
                    .create_multipart_upload()
                    .bucket(&self.bucket)
                    .key(&self.key)
                    .send()
                    .await
                    .map_err(|err| {
                        set_error_should_retry::<CreateMultipartUploadError>(
                            self.config.clone(),
                            err.into(),
                        )
                    })
            };

            let resp = retry_request(
                builder,
                &self.config,
                OperationType::StreamingUploadInit,
                self.metrics.clone(),
                Self::MEDIA_TYPE,
            )
            .await;

            try_update_failure_metric(
                &self.metrics,
                &resp,
                OperationType::StreamingUploadInit.as_str(),
            );

            self.upload_id = Some(resp?.upload_id.unwrap());
        }

        // Get the data to upload for the next part.
        let data = self.buf.drain(..).collect_vec();
        let len = self.not_uploaded_len;
        debug_assert_eq!(
            data.iter().map(|b| b.len()).sum::<usize>(),
            self.not_uploaded_len
        );

        // Update part id.
        let part_id = self.next_part_id;
        self.next_part_id += 1;

        // Clone the variables to be passed into the upload join handle.
        let client_cloned = self.client.clone();
        let bucket = self.bucket.clone();
        let key = self.key.clone();
        let upload_id = self.upload_id.clone().unwrap();

        let metrics = self.metrics.clone();
        metrics
            .operation_size
            .with_label_values(&[operation_type_str])
            .observe(len as f64);
        let config = self.config.clone();

        self.join_handles.push(tokio::spawn(async move {
            let _timer = metrics
                .operation_latency
                .with_label_values(&["s3", operation_type_str])
                .start_timer();

            let builder = || async {
                client_cloned
                    .upload_part()
                    .bucket(bucket.clone())
                    .key(key.clone())
                    .upload_id(upload_id.clone())
                    .part_number(part_id)
                    .body(get_upload_body(data.clone()))
                    .content_length(len as i64)
                    .send()
                    .await
                    .map_err(|err| {
                        set_error_should_retry::<UploadPartError>(
                            config.clone(),
                            err.into(),
                        )
                    })
            };

            let res = retry_request(
                builder,
                &config,
                operation_type,
                metrics.clone(),
                Self::MEDIA_TYPE,
            )
            .await;
            try_update_failure_metric(&metrics, &res, operation_type_str);
            Ok((part_id, res?))
        }));

        Ok(())
    }

    async fn flush_multipart_and_complete(&mut self) -> ObjectResult<()> {
        let operation_type = OperationType::StreamingUploadFinish;

        if !self.buf.is_empty() {
            self.upload_next_part().await?;
        }

        // If any part fails to upload, abort the upload.
        let join_handles = self.join_handles.drain(..).collect_vec();

        let mut uploaded_parts = Vec::with_capacity(join_handles.len());
        for result in try_join_all(join_handles)
            .await
            .map_err(ObjectError::internal)?
        {
            uploaded_parts.push(result?);
        }

        let completed_parts = Some(
            uploaded_parts
                .iter()
                .map(|(part_id, output)| {
                    CompletedPart::builder()
                        .set_e_tag(output.e_tag.clone())
                        .set_part_number(Some(*part_id))
                        .build()
                })
                .collect_vec(),
        );

        let builder = || async {
            self.client
                .complete_multipart_upload()
                .bucket(&self.bucket)
                .key(&self.key)
                .upload_id(self.upload_id.as_ref().unwrap())
                .multipart_upload(
                    CompletedMultipartUpload::builder()
                        .set_parts(completed_parts.clone())
                        .build(),
                )
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<CompleteMultipartUploadError>(
                        self.config.clone(),
                        err.into(),
                    )
                })
        };

        let res = retry_request(
            builder,
            &self.config,
            operation_type,
            self.metrics.clone(),
            Self::MEDIA_TYPE,
        )
        .await;
        try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
        let _res = res?;

        Ok(())
    }

    async fn abort_multipart_upload(&self) -> ObjectResult<()> {
        self.client
            .abort_multipart_upload()
            .bucket(&self.bucket)
            .key(&self.key)
            .upload_id(self.upload_id.as_ref().unwrap())
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<AbortMultipartUploadError>(self.config.clone(), err.into())
            })?;
        Ok(())
    }
}

impl StreamingUploader for S3StreamingUploader {
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        fail_point!("s3_write_bytes_err", |_| Err(ObjectError::internal(
            "s3 write bytes error"
        )));
        let data_len = data.len();
        self.not_uploaded_len += data_len;
        self.buf.push(data);

        if self.not_uploaded_len >= self.part_size {
            self.upload_next_part()
                .instrument_await("s3_upload_next_part".verbose())
                .await?;
            self.not_uploaded_len = 0;
        }
        Ok(())
    }

    /// If the multipart upload has not been initiated, we can use `PutObject` instead to save the
    /// `CreateMultipartUpload` and `CompleteMultipartUpload` requests. Otherwise flush the
    /// remaining data of the buffer to S3 as a new part.
    async fn finish(mut self) -> ObjectResult<()> {
        fail_point!("s3_finish_streaming_upload_err", |_| Err(
            ObjectError::internal("s3 finish streaming upload error")
        ));

        if self.upload_id.is_none() {
            debug_assert!(self.join_handles.is_empty());
            if self.buf.is_empty() {
                debug_assert_eq!(self.not_uploaded_len, 0);
                Err(ObjectError::internal("upload empty object"))
            } else {
                let operation_type = OperationType::Upload;
                let builder = || async {
                    self.client
                        .put_object()
                        .bucket(&self.bucket)
                        .body(get_upload_body(self.buf.clone()))
                        .content_length(self.not_uploaded_len as i64)
                        .key(&self.key)
                        .send()
                        .instrument_await("s3_put_object".verbose())
                        .await
                        .map_err(|err| {
                            set_error_should_retry::<PutObjectError>(
                                self.config.clone(),
                                err.into(),
                            )
                        })
                };

                let res = retry_request(
                    builder,
                    &self.config,
                    operation_type,
                    self.metrics.clone(),
                    Self::MEDIA_TYPE,
                )
                .await;
                try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
                res?;
                Ok(())
            }
        } else {
            match self
                .flush_multipart_and_complete()
                .instrument_await("s3_flush_multipart_and_complete".verbose())
                .await
            {
                Err(e) => {
                    tracing::warn!(key = self.key, error = %e.as_report(), "Failed to upload object");
                    self.abort_multipart_upload().await?;
                    Err(e)
                }
                _ => Ok(()),
            }
        }
    }

    fn get_memory_usage(&self) -> u64 {
        self.part_size as u64
    }
}

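/// Builds the request body from the buffered chunks. `SdkBody::retryable` is used so that the SDK
/// can rebuild the body stream whenever the request needs to be retried.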
fn get_upload_body(data: Vec<Bytes>) -> ByteStream {
    SdkBody::retryable(move || {
        Body::wrap_stream(stream::iter(data.clone().into_iter().map(ObjectResult::Ok))).into()
    })
    .into()
}

/// Object store with S3 backend.
///
/// The full path to a file on S3 would be `s3://bucket/<data_directory>/prefix/file`.
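///
/// For example (illustrative values only), with bucket `my-bucket`, data directory `hummock_001`,
/// object prefix `56/`, and file `101.data`, the full path would be
/// `s3://my-bucket/hummock_001/56/101.data`.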
#[derive(Clone)]
pub struct S3ObjectStore {
    client: Client,
    bucket: String,
    /// For S3 specific metrics.
    metrics: Arc<ObjectStoreMetrics>,

    config: Arc<ObjectStoreConfig>,
}

#[async_trait::async_trait]
impl ObjectStore for S3ObjectStore {
    type StreamingUploader = S3StreamingUploader;

    fn get_object_prefix(&self, obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
        // Delegate to a static method to avoid creating an `S3ObjectStore` in unit tests.
        // With the AWS S3 SDK as the object store, objects are distributed across hashed prefixes by default.
        prefix::s3::get_object_prefix(obj_id)
    }

    async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
        fail_point!("s3_upload_err", |_| Err(ObjectError::internal(
            "s3 upload error"
        )));
        if obj.is_empty() {
            Err(ObjectError::internal("upload empty object"))
        } else {
            self.client
                .put_object()
                .bucket(&self.bucket)
                .body(ByteStream::from(obj))
                .key(path)
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<PutObjectError>(self.config.clone(), err.into())
                })?;
            Ok(())
        }
    }

    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
        fail_point!("s3_streaming_upload_err", |_| Err(ObjectError::internal(
            "s3 streaming upload error"
        )));
        Ok(S3StreamingUploader::new(
            self.client.clone(),
            self.bucket.clone(),
            path.to_owned(),
            self.metrics.clone(),
            self.config.clone(),
        ))
    }

    /// Amazon S3 doesn't support retrieving multiple ranges of data per GET request.
    async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
        fail_point!("s3_read_err", |_| Err(ObjectError::internal(
            "s3 read error"
        )));

        let val = match self.obj_store_request(path, range.clone()).send().await {
            Ok(resp) => resp
                .body
                .collect()
                .await
                .map_err(|err| {
                    set_error_should_retry::<GetObjectError>(self.config.clone(), err.into())
                })?
                .into_bytes(),
            Err(sdk_err) => {
                return Err(set_error_should_retry::<GetObjectError>(
                    self.config.clone(),
                    sdk_err.into(),
                ));
            }
        };

        if let Some(len) = range.len()
            && len != val.len()
        {
            return Err(ObjectError::internal(format!(
                "mismatched size: expected {}, found {} when reading {} at {:?}",
                len,
                val.len(),
                path,
                range,
            )));
        }

        Ok(val)
    }

    async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
        fail_point!("s3_metadata_err", |_| Err(ObjectError::internal(
            "s3 metadata error"
        )));
        let resp = self
            .client
            .head_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<HeadObjectError>(self.config.clone(), err.into())
            })?;
        Ok(ObjectMetadata {
            key: path.to_owned(),
            last_modified: resp
                .last_modified()
                .expect("last_modified required")
                .as_secs_f64(),
            total_size: resp.content_length.unwrap_or_default() as usize,
        })
    }

    /// Returns a stream reading the object specified in `path`, restricted to the given byte
    /// `range` (0-based). As far as possible, the stream only loads the amount of data into
    /// memory that is actually read from it.
    async fn streaming_read(
        &self,
        path: &str,
        range: Range<usize>,
    ) -> ObjectResult<ObjectDataStream> {
        fail_point!("s3_streaming_read_init_err", |_| Err(
            ObjectError::internal("s3 streaming read init error")
        ));

        let resp = match self.obj_store_request(path, range.clone()).send().await {
            Ok(resp) => resp,
            Err(sdk_err) => {
                return Err(set_error_should_retry::<GetObjectError>(
                    self.config.clone(),
                    sdk_err.into(),
                ));
            }
        };

        let reader = FuturesStreamCompatByteStream::new(resp.body);

        Ok(Box::pin(
            reader
                .into_stream()
                .map(|item| item.map_err(ObjectError::from)),
        ))
    }

    /// Permanently deletes the whole object.
    /// According to Amazon S3, this will simply return Ok if the object does not exist.
    async fn delete(&self, path: &str) -> ObjectResult<()> {
        fail_point!("s3_delete_err", |_| Err(ObjectError::internal(
            "s3 delete error"
        )));
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<DeleteObjectError>(self.config.clone(), err.into())
            })?;
        Ok(())
    }

    /// Deletes the objects with the given paths permanently from the storage. If an object
    /// specified in the request is not found, it will be considered as successfully deleted.
    ///
    /// Uses AWS' DeleteObjects API. See [AWS Docs](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) for more details.
    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
        // AWS restricts the number of objects per request to 1000.
        const MAX_LEN: usize = 1000;
        let mut all_errors = Vec::new();

        // If needed, split the given set into batches of no more than `MAX_LEN` objects.
        for start_idx /* inclusive */ in (0..paths.len()).step_by(MAX_LEN) {
            let end_idx /* exclusive */ = cmp::min(paths.len(), start_idx + MAX_LEN);
            let slice = &paths[start_idx..end_idx];
            // Create identifiers from paths.
            let mut obj_ids = Vec::with_capacity(slice.len());
            for path in slice {
                obj_ids.push(ObjectIdentifier::builder().key(path).build().unwrap());
            }

            // Build and submit request to delete objects.
            let delete_builder = Delete::builder().set_objects(Some(obj_ids));
            let delete_output = self
                .client
                .delete_objects()
                .bucket(&self.bucket)
                .delete(delete_builder.build().unwrap())
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<DeleteObjectsError>(self.config.clone(), err.into())
                })?;

            // Check if there were errors.
            if !delete_output.errors().is_empty() {
                all_errors.append(&mut delete_output.errors().to_owned());
            }
        }
        if !all_errors.is_empty() {
            return Err(ObjectError::internal(format!(
                "DeleteObjects request returned exception for some objects: {:?}",
                all_errors
            )));
        }

        Ok(())
    }

    async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter> {
        Ok(Box::pin(
            S3ObjectIter::new(
                self.client.clone(),
                self.bucket.clone(),
                prefix.to_owned(),
                self.config.clone(),
                start_after,
            )
            .take(limit.unwrap_or(usize::MAX)),
        ))
    }

    fn store_media_type(&self) -> &'static str {
        "s3"
    }
}

impl S3ObjectStore {
    pub fn new_http_client(config: &ObjectStoreConfig) -> impl HttpClient + use<> {
        let mut http = hyper::client::HttpConnector::new();

        // connection config
        if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
            http.set_keepalive(Some(Duration::from_millis(*keepalive_ms)));
        }

        if let Some(nodelay) = config.s3.nodelay.as_ref() {
            http.set_nodelay(*nodelay);
        }

        if let Some(recv_buffer_size) = config.s3.recv_buffer_size.as_ref() {
            http.set_recv_buffer_size(Some(*recv_buffer_size));
        }

        if let Some(send_buffer_size) = config.s3.send_buffer_size.as_ref() {
            http.set_send_buffer_size(Some(*send_buffer_size));
        }

        http.enforce_http(false);

        let conn = hyper_rustls::HttpsConnectorBuilder::new()
            .with_webpki_roots()
            .https_or_http()
            .enable_all_versions()
            .wrap_connector(http);

        let conn = monitor_connector(conn, "S3");

        HyperClientBuilder::new().build(conn)
    }

    /// Creates an S3 object store from environment variables.
    ///
    /// See [AWS Docs](https://docs.aws.amazon.com/sdk-for-rust/latest/dg/credentials.html) on how to provide credentials and region via environment variables. If you are running the compute node on EC2, no configuration is required.
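    ///
    /// For example (illustrative values; the `AWS_*` variables are the standard AWS SDK ones, and
    /// `RW_S3_ENDPOINT` / `RW_IS_FORCE_PATH_STYLE` are the RisingWave-specific ones read below for
    /// S3-compatible storage):
    ///
    /// ```text
    /// AWS_ACCESS_KEY_ID=AKIA...
    /// AWS_SECRET_ACCESS_KEY=...
    /// AWS_REGION=us-east-1
    /// RW_S3_ENDPOINT=http://127.0.0.1:9000   # only for S3-compatible storage
    /// RW_IS_FORCE_PATH_STYLE=true            # only for S3-compatible storage
    /// ```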
    pub async fn new_with_config(
        bucket: String,
        metrics: Arc<ObjectStoreMetrics>,
        config: Arc<ObjectStoreConfig>,
    ) -> Self {
        let sdk_config_loader = aws_config::from_env().http_client(Self::new_http_client(&config));

        // Retry 3 times if we get server-side errors or throttling errors
        let client = match std::env::var("RW_S3_ENDPOINT") {
            Ok(endpoint) => {
                // s3 compatible storage
                let is_force_path_style = match std::env::var("RW_IS_FORCE_PATH_STYLE") {
                    Ok(value) => value == "true",
                    Err(_) => false,
                };

                let sdk_config = sdk_config_loader.load().await;
                #[cfg(madsim)]
                let client = Client::new(&sdk_config);
                #[cfg(not(madsim))]
                let client = Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&sdk_config)
                        .endpoint_url(endpoint)
                        .force_path_style(is_force_path_style)
                        .identity_cache(
                            aws_sdk_s3::config::IdentityCache::lazy()
                                .load_timeout(Duration::from_secs(
                                    config.s3.identity_resolution_timeout_s,
                                ))
                                .build(),
                        )
                        .stalled_stream_protection(
                            aws_sdk_s3::config::StalledStreamProtectionConfig::disabled(),
                        )
                        .build(),
                );
                client
            }
            Err(_) => {
                // s3
                let sdk_config = sdk_config_loader.load().await;
                #[cfg(madsim)]
                let client = Client::new(&sdk_config);
                #[cfg(not(madsim))]
                let client = Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&sdk_config)
                        .identity_cache(
                            aws_sdk_s3::config::IdentityCache::lazy()
                                .load_timeout(Duration::from_secs(
                                    config.s3.identity_resolution_timeout_s,
                                ))
                                .build(),
                        )
                        .stalled_stream_protection(
                            aws_sdk_s3::config::StalledStreamProtectionConfig::disabled(),
                        )
                        .build(),
                );
                client
            }
        };

        Self {
            client,
            bucket,
            metrics,
            config,
        }
    }

    /// Creates a MinIO client. The `server` string should look like `minio://key:secret@address:port/bucket`.
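    ///
    /// For example (illustrative values only):
    /// `minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001`.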
    pub async fn new_minio_engine(
        server: &str,
        metrics: Arc<ObjectStoreMetrics>,
        object_store_config: Arc<ObjectStoreConfig>,
    ) -> Self {
        let server = server.strip_prefix("minio://").unwrap();
        let (access_key_id, rest) = server.split_once(':').unwrap();
        let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

        let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
            rest = rest_stripped;
            "https://"
        } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
            rest = rest_stripped;
            "http://"
        } else {
            "http://"
        };
        let (address, bucket) = rest.split_once('/').unwrap();

        #[cfg(madsim)]
        let builder = aws_sdk_s3::config::Builder::new().credentials_provider(
            Credentials::from_keys(access_key_id, secret_access_key, None),
        );
        #[cfg(not(madsim))]
        let builder = aws_sdk_s3::config::Builder::from(
            &aws_config::ConfigLoader::default()
                // FIXME: https://github.com/awslabs/aws-sdk-rust/issues/973
                .credentials_provider(Credentials::from_keys(
                    access_key_id,
                    secret_access_key,
                    None,
                ))
                .load()
                .await,
        )
        .force_path_style(true)
        .identity_cache(
            aws_sdk_s3::config::IdentityCache::lazy()
                .load_timeout(Duration::from_secs(
                    object_store_config.s3.identity_resolution_timeout_s,
                ))
                .build(),
        )
        .http_client(Self::new_http_client(&object_store_config))
        .behavior_version_latest()
        .stalled_stream_protection(aws_sdk_s3::config::StalledStreamProtectionConfig::disabled());
        let config = builder
            .region(Region::new("custom"))
            .endpoint_url(format!("{}{}", endpoint_prefix, address))
            .build();
        let client = Client::from_conf(config);

        Self {
            client,
            bucket: bucket.to_owned(),
            metrics,
            config: object_store_config,
        }
    }

    /// Generates an HTTP GET request to download the object specified in `path`. If `range` is
    /// bounded, it is translated into an inclusive, 0-based HTTP `Range` header covering exactly
    /// the requested bytes. If `range` is unbounded, the request downloads the whole object.
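    ///
    /// For example (illustrative), the half-open range `0..8` becomes the header
    /// `Range: bytes=0-7`, i.e. the first 8 bytes of the object.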
    fn obj_store_request(
        &self,
        path: &str,
        range: impl ObjectRangeBounds,
    ) -> GetObjectFluentBuilder {
        let req = self.client.get_object().bucket(&self.bucket).key(path);
        if range.is_full() {
            return req;
        }

        let start = range.start().map(|v| v.to_string()).unwrap_or_default();
        let end = range.end().map(|v| (v - 1).to_string()).unwrap_or_default(); // included

        req.range(format!("bytes={}-{}", start, end))
    }

    /// When a multipart upload is aborted, any part uploads that are still in progress may or may
    /// not succeed. As a result, those parts will remain in the bucket and be charged for part
    /// storage. Therefore, we need to configure the bucket to purge stale parts.
    ///
    /// Note: This configuration only works for S3. MinIO automatically enables this feature, and it
    /// is not configurable with the S3 SDK. To verify that this feature is enabled, use
    /// `mc admin config get <alias> api`.
    ///
    /// Reference:
    /// - S3
    ///   - <https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html>
    ///   - <https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html>
    /// - MinIO
    ///   - <https://github.com/minio/minio/issues/15681#issuecomment-1245126561>
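    ///
    /// The rule installed below roughly corresponds to (illustrative rendering only):
    ///
    /// ```text
    /// ID:     abort-incomplete-multipart-upload
    /// Status: Enabled
    /// Filter: Prefix ""   (applies to the whole bucket)
    /// AbortIncompleteMultipartUpload: DaysAfterInitiation = 1
    /// ```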
    pub async fn configure_bucket_lifecycle(&self, data_directory: &str) -> bool {
        // Check if lifecycle is already configured to avoid overriding existing configuration.
        let bucket = self.bucket.as_str();
        let mut configured_rules = vec![];
        let get_config_result = self
            .client
            .get_bucket_lifecycle_configuration()
            .bucket(bucket)
            .send()
            .await;
        let mut is_expiration_configured = false;

        if let Ok(config) = &get_config_result {
            for rule in config.rules() {
                if rule.expiration().is_some() {
                    // When both of the conditions are met, it is considered that there is a risk of data deletion.
                    //
                    // 1. expiration status rule is enabled
                    // 2. (a) prefix filter is not set
                    // or (b) prefix filter is set to the data directory of RisingWave.
                    //
                    // P.S. 1 && (2a || 2b)
                    is_expiration_configured |= rule.status == ExpirationStatus::Enabled // 1
                    && match rule.filter().as_ref() {
                        // 2a
                        None => true,
                        // 2b
                        Some(LifecycleRuleFilter::Prefix(prefix))
                            if data_directory.starts_with(prefix) =>
                        {
                            true
                        }
                        _ => false,
                    };

                    if matches!(rule.status(), ExpirationStatus::Enabled)
                        && rule.abort_incomplete_multipart_upload().is_some()
                    {
                        configured_rules.push(rule);
                    }
                }
            }
        }

        if !configured_rules.is_empty() {
            tracing::info!(
                "S3 bucket {} has already configured AbortIncompleteMultipartUpload: {:?}",
                bucket,
                configured_rules,
            );
        } else {
            let bucket_lifecycle_rule = LifecycleRule::builder()
                .id("abort-incomplete-multipart-upload")
                .status(ExpirationStatus::Enabled)
                .filter(LifecycleRuleFilter::Prefix(String::new()))
                .abort_incomplete_multipart_upload(
                    AbortIncompleteMultipartUpload::builder()
                        .days_after_initiation(S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS)
                        .build(),
                )
                .build()
                .unwrap();
            let bucket_lifecycle_config = BucketLifecycleConfiguration::builder()
                .rules(bucket_lifecycle_rule)
                .build()
                .unwrap();
            if self
                .client
                .put_bucket_lifecycle_configuration()
                .bucket(bucket)
                .lifecycle_configuration(bucket_lifecycle_config)
                .send()
                .await
                .is_ok()
            {
                tracing::info!(
                    "S3 bucket {:?} is configured to automatically purge abandoned MultipartUploads after {} days",
                    bucket,
                    S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS,
                );
            } else {
                tracing::warn!(
                    "Failed to configure lifecycle rule for S3 bucket: {:?}. It is recommended to configure it manually to avoid unnecessary storage cost.",
                    bucket
                );
            }
        }
        if is_expiration_configured {
            tracing::info!(
                "S3 bucket {} has already configured the expiration for the lifecycle.",
                bucket,
            );
        }
        is_expiration_configured
    }
}

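/// A stream over the metadata of objects under a given prefix.
///
/// It pages through `ListObjectsV2`: each response is buffered in `buffer`, while
/// `next_continuation_token` and `is_truncated` drive follow-up requests until the listing is
/// exhausted.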
struct S3ObjectIter {
    buffer: VecDeque<ObjectMetadata>,
    client: Client,
    bucket: String,
    prefix: String,
    next_continuation_token: Option<String>,
    is_truncated: Option<bool>,
    #[allow(clippy::type_complexity)]
    send_future: Option<
        BoxFuture<
            'static,
            Result<(Vec<ObjectMetadata>, Option<String>, Option<bool>), ObjectError>,
        >,
    >,

    config: Arc<ObjectStoreConfig>,
    start_after: Option<String>,
}

impl S3ObjectIter {
    fn new(
        client: Client,
        bucket: String,
        prefix: String,
        config: Arc<ObjectStoreConfig>,
        start_after: Option<String>,
    ) -> Self {
        Self {
            buffer: VecDeque::default(),
            client,
            bucket,
            prefix,
            next_continuation_token: None,
            is_truncated: Some(true),
            send_future: None,
            config,
            start_after,
        }
    }
}

impl Stream for S3ObjectIter {
    type Item = ObjectResult<ObjectMetadata>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(e) = self.buffer.pop_front() {
            return Poll::Ready(Some(Ok(e)));
        }
        if let Some(f) = self.send_future.as_mut() {
            return match ready!(f.poll_unpin(cx)) {
                Ok((more, next_continuation_token, is_truncated)) => {
                    self.next_continuation_token = next_continuation_token;
                    self.is_truncated = is_truncated;
                    self.buffer.extend(more);
                    self.send_future = None;
                    // only the first request may set start_after
                    self.start_after = None;
                    self.poll_next(cx)
                }
                Err(e) => {
                    self.send_future = None;
                    Poll::Ready(Some(Err(e)))
                }
            };
        }
        if !self.is_truncated.unwrap_or_default() {
            return Poll::Ready(None);
        }
        let mut request = self
            .client
            .list_objects_v2()
            .bucket(&self.bucket)
            .prefix(&self.prefix);
        #[cfg(not(madsim))]
        if let Some(start_after) = self.start_after.as_ref() {
            request = request.start_after(start_after);
        }
        if let Some(continuation_token) = self.next_continuation_token.as_ref() {
            request = request.continuation_token(continuation_token);
        }
        let config = self.config.clone();
        let f = async move {
            match request.send().await {
                Ok(r) => {
                    let more = r
                        .contents()
                        .iter()
                        .map(|obj| ObjectMetadata {
                            key: obj.key().expect("key required").to_owned(),
                            last_modified: obj
                                .last_modified()
                                .map(|l| l.as_secs_f64())
                                .unwrap_or(0f64),
                            total_size: obj.size().unwrap_or_default() as usize,
                        })
                        .collect_vec();
                    let is_truncated = r.is_truncated;
                    let next_continuation_token = r.next_continuation_token;
                    Ok((more, next_continuation_token, is_truncated))
                }
                Err(e) => Err(set_error_should_retry::<ListObjectsV2Error>(
                    config,
                    e.into(),
                )),
            }
        };
        self.send_future = Some(Box::pin(f));
        self.poll_next(cx)
    }
}

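/// Inspects the SDK error wrapped in `object_err` and marks it as retryable when it is a dispatch
/// timeout, a timeout error, or a service error whose code (or absence of a code) is configured as
/// retryable in `config`. "Object not found" errors are returned unchanged.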
fn set_error_should_retry<E>(config: Arc<ObjectStoreConfig>, object_err: ObjectError) -> ObjectError
where
    E: ProvideErrorMetadata + Into<BoxError> + Sync + Send + std::error::Error + 'static,
{
    let not_found = object_err.is_object_not_found_error();

    if not_found {
        return object_err;
    }

    let mut inner = object_err.into_inner();
    match inner.borrow_mut() {
        ObjectErrorInner::S3 {
            should_retry,
            inner,
        } => {
            let sdk_err = inner
                .as_ref()
                .downcast_ref::<SdkError<E, aws_smithy_runtime_api::http::Response<SdkBody>>>();

            let err_should_retry = match sdk_err {
                Some(SdkError::DispatchFailure(e)) => {
                    if e.is_timeout() {
                        tracing::warn!(target: "http_timeout_retry", "{e:?} occurs, retry S3 get_object request.");
                        true
                    } else {
                        false
                    }
                }

                Some(SdkError::ServiceError(e)) => match e.err().code() {
                    None => {
                        if config.s3.developer.retry_unknown_service_error
                            || config.s3.retry_unknown_service_error
                        {
                            tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
                            true
                        } else {
                            false
                        }
                    }
                    Some(code) => {
                        if config
                            .s3
                            .developer
                            .retryable_service_error_codes
                            .iter()
                            .any(|s| s.as_str().eq_ignore_ascii_case(code))
                        {
                            tracing::warn!(target: "retryable_service_error", "{e:?} occurs, retry S3 get_object request.");
                            true
                        } else {
                            false
                        }
                    }
                },

                Some(SdkError::TimeoutError(_err)) => true,

                _ => false,
            };

            *should_retry = err_should_retry;
        }

        _ => unreachable!(),
    }

    ObjectError::from(inner)
}

#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
    use crate::object::prefix::s3::{NUM_BUCKET_PREFIXES, get_object_prefix};

    fn get_hash_of_object(obj_id: u64) -> u32 {
        let crc_hash = crc32fast::hash(&obj_id.to_be_bytes());
        crc_hash % NUM_BUCKET_PREFIXES
    }

    #[tokio::test]
    async fn test_get_object_prefix() {
        for obj_id in 0..99999 {
            let hash = get_hash_of_object(obj_id);
            let prefix = get_object_prefix(obj_id);
            assert_eq!(format!("{}/", hash), prefix);
        }

        let obj_prefix = String::default();
        let path = format!("{}/{}{}.data", "hummock_001", obj_prefix, 101);
        assert_eq!("hummock_001/101.data", path);
    }
}