risingwave_object_store/object/
s3.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::BorrowMut;
16use std::cmp;
17use std::collections::VecDeque;
18use std::ops::Range;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22use std::time::Duration;
23
24use await_tree::{InstrumentAwait, SpanExt};
25use aws_sdk_s3::Client;
26use aws_sdk_s3::config::{Credentials, Region};
27use aws_sdk_s3::error::BoxError;
28use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError;
29use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
30use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
31use aws_sdk_s3::operation::delete_object::DeleteObjectError;
32use aws_sdk_s3::operation::delete_objects::DeleteObjectsError;
33use aws_sdk_s3::operation::get_object::GetObjectError;
34use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
35use aws_sdk_s3::operation::head_object::HeadObjectError;
36use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
37use aws_sdk_s3::operation::put_object::PutObjectError;
use aws_sdk_s3::operation::upload_part::{UploadPartError, UploadPartOutput};
39use aws_sdk_s3::primitives::ByteStream;
40use aws_sdk_s3::types::{
41    AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload,
42    CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier,
43};
44use aws_smithy_http::futures_stream_adapter::FuturesStreamCompatByteStream;
45use aws_smithy_http_client::{
46    Builder as SmithyHttpClientBuilder, Connector as SmithyConnector, tls,
47};
48use aws_smithy_runtime_api::client::http::HttpClient;
49use aws_smithy_runtime_api::client::result::SdkError;
50use aws_smithy_types::body::SdkBody;
51use aws_smithy_types::error::metadata::ProvideErrorMetadata;
52use bytes::BytesMut;
53use fail::fail_point;
54use futures::future::{BoxFuture, FutureExt, try_join_all};
55use futures::{Stream, StreamExt, TryStreamExt};
56use itertools::Itertools;
57use risingwave_common::config::ObjectStoreConfig;
58use risingwave_common::range::RangeBoundsExt;
59use thiserror_ext::AsReport;
60use tokio::task::JoinHandle;
61
62use super::object_metrics::ObjectStoreMetrics;
63use super::{
64    Bytes, ObjectError, ObjectErrorInner, ObjectMetadata, ObjectRangeBounds, ObjectResult,
65    ObjectStore, StreamingUploader, prefix, retry_request,
66};
67use crate::object::{
68    ObjectDataStream, ObjectMetadataIter, OperationType, try_update_failure_metric,
69};
70
/// 1-based identifier of a part within a multipart upload (S3 part numbers start at 1).
type PartId = i32;

/// MinIO and S3 share the same minimum part ID and part size.
const MIN_PART_ID: PartId = 1;
/// Stop multipart uploads that don't complete within a specified number of days after being
/// initiated. (Day is the smallest granularity)
const S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS: i32 = 1;
78
/// S3 multipart upload handle. The multipart upload is not initiated until the first part is
/// available for upload.
///
/// Reference: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html>
pub struct S3StreamingUploader {
    /// S3 client used for all requests of this upload.
    client: Client,
    /// Byte threshold at which buffered data is flushed to S3 as one part.
    part_size: usize,
    /// The bucket the object is uploaded to.
    bucket: String,
    /// The key of the object.
    key: String,
    /// The identifier of multipart upload task for S3.
    upload_id: Option<String>,
    /// Next part ID.
    next_part_id: PartId,
    /// Join handles for part uploads.
    join_handles: Vec<JoinHandle<ObjectResult<(PartId, UploadPartOutput)>>>,
    /// Buffer for data. It will store at least `part_size` bytes of data before wrapping itself
    /// into a stream and upload to object store as a part.
    buf: Vec<Bytes>,
    /// Length of the data that have not been uploaded to S3.
    not_uploaded_len: usize,
    /// To record metrics for uploading part.
    metrics: Arc<ObjectStoreMetrics>,

    /// Object store configuration; consulted for part size and retry decisions.
    config: Arc<ObjectStoreConfig>,
}
105
106impl S3StreamingUploader {
107    const MEDIA_TYPE: &'static str = "s3";
108
109    pub fn new(
110        client: Client,
111        bucket: String,
112        key: String,
113        metrics: Arc<ObjectStoreMetrics>,
114        config: Arc<ObjectStoreConfig>,
115    ) -> S3StreamingUploader {
116        /// The minimum number of bytes that is buffered before they are uploaded as a part.
117        /// Its value must be greater than the minimum part size of 5MiB.
118        ///
119        /// Reference: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
120        const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
121        const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024;
122        let part_size = config.upload_part_size.clamp(MIN_PART_SIZE, MAX_PART_SIZE);
123
124        Self {
125            client,
126            bucket,
127            part_size,
128            key,
129            upload_id: None,
130            next_part_id: MIN_PART_ID,
131            join_handles: Default::default(),
132            buf: Default::default(),
133            not_uploaded_len: 0,
134            metrics,
135            config,
136        }
137    }
138
139    async fn upload_next_part(&mut self) -> ObjectResult<()> {
140        let operation_type = OperationType::StreamingUpload;
141        let operation_type_str = operation_type.as_str();
142
143        // Lazily create multipart upload.
144        if self.upload_id.is_none() {
145            let builder = || async {
146                self.client
147                    .create_multipart_upload()
148                    .bucket(&self.bucket)
149                    .key(&self.key)
150                    .send()
151                    .await
152                    .map_err(|err| {
153                        set_error_should_retry::<CreateMultipartUploadError>(
154                            self.config.clone(),
155                            err.into(),
156                        )
157                    })
158            };
159
160            let resp = retry_request(
161                builder,
162                &self.config,
163                OperationType::StreamingUploadInit,
164                self.metrics.clone(),
165                Self::MEDIA_TYPE,
166            )
167            .await;
168
169            try_update_failure_metric(
170                &self.metrics,
171                &resp,
172                OperationType::StreamingUploadInit.as_str(),
173            );
174
175            self.upload_id = Some(resp?.upload_id.unwrap());
176        }
177
178        // Get the data to upload for the next part.
179        let data = self.buf.drain(..).collect_vec();
180        let len = self.not_uploaded_len;
181        debug_assert_eq!(
182            data.iter().map(|b| b.len()).sum::<usize>(),
183            self.not_uploaded_len
184        );
185
186        // Update part id.
187        let part_id = self.next_part_id;
188        self.next_part_id += 1;
189
190        // Clone the variables to be passed into the upload join handle.
191        let client_cloned = self.client.clone();
192        let bucket = self.bucket.clone();
193        let key = self.key.clone();
194        let upload_id = self.upload_id.clone().unwrap();
195
196        let metrics = self.metrics.clone();
197        metrics
198            .operation_size
199            .with_label_values(&[operation_type_str])
200            .observe(len as f64);
201        let config = self.config.clone();
202
203        self.join_handles.push(tokio::spawn(async move {
204            let _timer = metrics
205                .operation_latency
206                .with_label_values(&["s3", operation_type_str])
207                .start_timer();
208
209            let builder = || async {
210                client_cloned
211                    .upload_part()
212                    .bucket(bucket.clone())
213                    .key(key.clone())
214                    .upload_id(upload_id.clone())
215                    .part_number(part_id)
216                    .body(get_upload_body(data.clone()))
217                    .content_length(len as i64)
218                    .send()
219                    .await
220                    .map_err(|err| {
221                        set_error_should_retry::<CreateMultipartUploadError>(
222                            config.clone(),
223                            err.into(),
224                        )
225                    })
226            };
227
228            let res = retry_request(
229                builder,
230                &config,
231                operation_type,
232                metrics.clone(),
233                Self::MEDIA_TYPE,
234            )
235            .await;
236            try_update_failure_metric(&metrics, &res, operation_type_str);
237            Ok((part_id, res?))
238        }));
239
240        Ok(())
241    }
242
243    async fn flush_multipart_and_complete(&mut self) -> ObjectResult<()> {
244        let operation_type = OperationType::StreamingUploadFinish;
245
246        if !self.buf.is_empty() {
247            self.upload_next_part().await?;
248        }
249
250        // If any part fails to upload, abort the upload.
251        let join_handles = self.join_handles.drain(..).collect_vec();
252
253        let mut uploaded_parts = Vec::with_capacity(join_handles.len());
254        for result in try_join_all(join_handles)
255            .await
256            .map_err(ObjectError::internal)?
257        {
258            uploaded_parts.push(result?);
259        }
260
261        let completed_parts = Some(
262            uploaded_parts
263                .iter()
264                .map(|(part_id, output)| {
265                    CompletedPart::builder()
266                        .set_e_tag(output.e_tag.clone())
267                        .set_part_number(Some(*part_id))
268                        .build()
269                })
270                .collect_vec(),
271        );
272
273        let builder = || async {
274            self.client
275                .complete_multipart_upload()
276                .bucket(&self.bucket)
277                .key(&self.key)
278                .upload_id(self.upload_id.as_ref().unwrap())
279                .multipart_upload(
280                    CompletedMultipartUpload::builder()
281                        .set_parts(completed_parts.clone())
282                        .build(),
283                )
284                .send()
285                .await
286                .map_err(|err| {
287                    set_error_should_retry::<CompleteMultipartUploadError>(
288                        self.config.clone(),
289                        err.into(),
290                    )
291                })
292        };
293
294        let res = retry_request(
295            builder,
296            &self.config,
297            operation_type,
298            self.metrics.clone(),
299            Self::MEDIA_TYPE,
300        )
301        .await;
302        try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
303        let _res = res?;
304
305        Ok(())
306    }
307
308    async fn abort_multipart_upload(&self) -> ObjectResult<()> {
309        self.client
310            .abort_multipart_upload()
311            .bucket(&self.bucket)
312            .key(&self.key)
313            .upload_id(self.upload_id.as_ref().unwrap())
314            .send()
315            .await
316            .map_err(|err| {
317                set_error_should_retry::<AbortMultipartUploadError>(self.config.clone(), err.into())
318            })?;
319        Ok(())
320    }
321}
322
impl StreamingUploader for S3StreamingUploader {
    /// Buffers `data`, and uploads a new part once at least `part_size` bytes are pending.
    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
        fail_point!("s3_write_bytes_err", |_| Err(ObjectError::internal(
            "s3 write bytes error"
        )));
        let data_len = data.len();
        self.not_uploaded_len += data_len;
        self.buf.push(data);

        // Flush as a part as soon as the buffered bytes reach the configured part size.
        if self.not_uploaded_len >= self.part_size {
            self.upload_next_part()
                .instrument_await("s3_upload_next_part".verbose())
                .await?;
            self.not_uploaded_len = 0;
        }
        Ok(())
    }

    /// If the multipart upload has not been initiated, we can use `PutObject` instead to save the
    /// `CreateMultipartUpload` and `CompleteMultipartUpload` requests. Otherwise flush the
    /// remaining data of the buffer to S3 as a new part.
    async fn finish(mut self) -> ObjectResult<()> {
        fail_point!("s3_finish_streaming_upload_err", |_| Err(
            ObjectError::internal("s3 finish streaming upload error")
        ));

        if self.upload_id.is_none() {
            // No part was large enough to trigger a multipart upload; no part tasks exist.
            debug_assert!(self.join_handles.is_empty());
            if self.buf.is_empty() {
                debug_assert_eq!(self.not_uploaded_len, 0);
                Err(ObjectError::internal("upload empty object"))
            } else {
                // Upload the whole buffer with a single `PutObject` request.
                let operation_type = OperationType::Upload;
                let builder = || async {
                    self.client
                        .put_object()
                        .bucket(&self.bucket)
                        // Cloning `buf` per retry attempt is cheap: `Bytes` clones are
                        // reference-counted, not deep copies.
                        .body(get_upload_body(self.buf.clone()))
                        .content_length(self.not_uploaded_len as i64)
                        .key(&self.key)
                        .send()
                        .instrument_await("s3_put_object".verbose())
                        .await
                        .map_err(|err| {
                            set_error_should_retry::<PutObjectError>(
                                self.config.clone(),
                                err.into(),
                            )
                        })
                };

                let res = retry_request(
                    builder,
                    &self.config,
                    operation_type,
                    self.metrics.clone(),
                    Self::MEDIA_TYPE,
                )
                .await;
                try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
                res?;
                Ok(())
            }
        } else {
            // Multipart upload in progress: flush the tail and complete it; abort on failure
            // so S3 can reclaim the uploaded parts.
            match self
                .flush_multipart_and_complete()
                .instrument_await("s3_flush_multipart_and_complete".verbose())
                .await
            {
                Err(e) => {
                    tracing::warn!(key = self.key, error = %e.as_report(), "Failed to upload object");
                    // NOTE(review): if the abort itself fails, its error replaces the
                    // original upload error returned to the caller.
                    self.abort_multipart_upload().await?;
                    Err(e)
                }
                _ => Ok(()),
            }
        }
    }

    /// Reports the uploader's memory budget, i.e. the configured part size.
    fn get_memory_usage(&self) -> u64 {
        self.part_size as u64
    }
}
406
407fn get_upload_body(data: Vec<Bytes>) -> ByteStream {
408    // `ByteStream` is retryable when created from in-memory data.
409    // This code path is used for non-multipart uploads, so a copy is acceptable.
410    let total_len: usize = data.iter().map(|b| b.len()).sum();
411    let mut buf = BytesMut::with_capacity(total_len);
412    for chunk in data {
413        buf.extend_from_slice(&chunk);
414    }
415    ByteStream::from(buf.freeze())
416}
417
/// Object store with S3 backend
/// The full path to a file on S3 would be `s3://bucket/<data_directory>/prefix/file`
#[derive(Clone)]
pub struct S3ObjectStore {
    /// S3 client used for all requests issued by this store.
    client: Client,
    /// Name of the bucket all operations are scoped to.
    bucket: String,
    /// For S3 specific metrics.
    metrics: Arc<ObjectStoreMetrics>,

    /// Object store configuration (retry classification, HTTP client tuning, part size).
    config: Arc<ObjectStoreConfig>,
}
429
#[async_trait::async_trait]
impl ObjectStore for S3ObjectStore {
    type StreamingUploader = S3StreamingUploader;

    fn get_object_prefix(&self, obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
        // Delegate to static method to avoid creating an `S3ObjectStore` in unit test.
        // Using aws s3 sdk as object storage, the object prefix will be divided by default.
        prefix::s3::get_object_prefix(obj_id)
    }

    /// Uploads `obj` to `path` with a single `PutObject` request.
    /// Rejects empty objects with an internal error.
    async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
        fail_point!("s3_upload_err", |_| Err(ObjectError::internal(
            "s3 upload error"
        )));
        if obj.is_empty() {
            Err(ObjectError::internal("upload empty object"))
        } else {
            self.client
                .put_object()
                .bucket(&self.bucket)
                .body(ByteStream::from(obj))
                .key(path)
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<PutObjectError>(self.config.clone(), err.into())
                })?;
            Ok(())
        }
    }

    /// Creates a multipart-capable uploader for `path`. No request is sent to S3
    /// until data is actually written to the uploader.
    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
        fail_point!("s3_streaming_upload_err", |_| Err(ObjectError::internal(
            "s3 streaming upload error"
        )));
        Ok(S3StreamingUploader::new(
            self.client.clone(),
            self.bucket.clone(),
            path.to_owned(),
            self.metrics.clone(),
            self.config.clone(),
        ))
    }

    /// Amazon S3 doesn't support retrieving multiple ranges of data per GET request.
    async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
        fail_point!("s3_read_err", |_| Err(ObjectError::internal(
            "s3 read error"
        )));

        let val = match self.obj_store_request(path, range.clone()).send().await {
            Ok(resp) => resp
                .body
                .collect()
                .await
                .map_err(|err| {
                    set_error_should_retry::<GetObjectError>(self.config.clone(), err.into())
                })?
                .into_bytes(),
            Err(sdk_err) => {
                return Err(set_error_should_retry::<GetObjectError>(
                    self.config.clone(),
                    sdk_err.into(),
                ));
            }
        };

        // Sanity check: a bounded range must return exactly the requested number of bytes.
        if let Some(len) = range.len()
            && len != val.len()
        {
            return Err(ObjectError::internal(format!(
                "mismatched size: expected {}, found {} when reading {} at {:?}",
                len,
                val.len(),
                path,
                range,
            )));
        }

        Ok(val)
    }

    /// Fetches object metadata via `HeadObject` without downloading the body.
    async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
        fail_point!("s3_metadata_err", |_| Err(ObjectError::internal(
            "s3 metadata error"
        )));
        let resp = self
            .client
            .head_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<HeadObjectError>(self.config.clone(), err.into())
            })?;
        Ok(ObjectMetadata {
            key: path.to_owned(),
            // NOTE(review): panics if the response omits `Last-Modified` — assumed to
            // always be present for an existing object; confirm against the S3 API.
            last_modified: resp
                .last_modified()
                .expect("last_modified required")
                .as_secs_f64(),
            total_size: resp.content_length.unwrap_or_default() as usize,
        })
    }

    /// Returns a stream reading the object specified in `path`. If given, the stream starts at the
    /// byte with index `start_pos` (0-based). As far as possible, the stream only loads the amount
    /// of data into memory that is read from the stream.
    async fn streaming_read(
        &self,
        path: &str,
        range: Range<usize>,
    ) -> ObjectResult<ObjectDataStream> {
        fail_point!("s3_streaming_read_init_err", |_| Err(
            ObjectError::internal("s3 streaming read init error")
        ));

        let resp = match self.obj_store_request(path, range.clone()).send().await {
            Ok(resp) => resp,
            Err(sdk_err) => {
                return Err(set_error_should_retry::<GetObjectError>(
                    self.config.clone(),
                    sdk_err.into(),
                ));
            }
        };

        // Adapt the SDK's `ByteStream` into a `futures::Stream` of byte chunks.
        let reader = FuturesStreamCompatByteStream::new(resp.body);

        Ok(Box::pin(
            reader
                .into_stream()
                .map(|item| item.map_err(ObjectError::from)),
        ))
    }

    /// Permanently deletes the whole object.
    /// According to Amazon S3, this will simply return Ok if the object does not exist.
    async fn delete(&self, path: &str) -> ObjectResult<()> {
        fail_point!("s3_delete_err", |_| Err(ObjectError::internal(
            "s3 delete error"
        )));
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(path)
            .send()
            .await
            .map_err(|err| {
                set_error_should_retry::<DeleteObjectError>(self.config.clone(), err.into())
            })?;
        Ok(())
    }

    /// Deletes the objects with the given paths permanently from the storage. If an object
    /// specified in the request is not found, it will be considered as successfully deleted.
    ///
    /// Uses AWS' `DeleteObjects` API. See [AWS Docs](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) for more details.
    async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
        // AWS restricts the number of objects per request to 1000.
        const MAX_LEN: usize = 1000;
        // Per-object failures reported by S3 are accumulated and surfaced at the end,
        // so one failed batch does not hide failures in earlier batches.
        let mut all_errors = Vec::new();

        // If needed, split given set into subsets of size with no more than `MAX_LEN` objects.
        for start_idx /* inclusive */ in (0..paths.len()).step_by(MAX_LEN) {
            let end_idx /* exclusive */ = cmp::min(paths.len(), start_idx + MAX_LEN);
            let slice = &paths[start_idx..end_idx];
            // Create identifiers from paths.
            let mut obj_ids = Vec::with_capacity(slice.len());
            for path in slice {
                obj_ids.push(ObjectIdentifier::builder().key(path).build().unwrap());
            }

            // Build and submit request to delete objects.
            let delete_builder = Delete::builder().set_objects(Some(obj_ids));
            let delete_output = self
                .client
                .delete_objects()
                .bucket(&self.bucket)
                .delete(delete_builder.build().unwrap()).send()
                .await.map_err(|err| {
                    set_error_should_retry::<DeleteObjectsError>(self.config.clone(),err.into())
                })?;

            // Check if there were errors.
            if !delete_output.errors().is_empty() {
                all_errors.append(&mut delete_output.errors().to_owned());
            }
        }
        if !all_errors.is_empty() {
            return Err(ObjectError::internal(format!(
                "DeleteObjects request returned exception for some objects: {:?}",
                all_errors
            )));
        }

        Ok(())
    }

    /// Lists objects under `prefix`, optionally starting after `start_after`.
    /// `limit` is enforced client-side via `take` on the paginated iterator.
    async fn list(
        &self,
        prefix: &str,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> ObjectResult<ObjectMetadataIter> {
        Ok(Box::pin(
            S3ObjectIter::new(
                self.client.clone(),
                self.bucket.clone(),
                prefix.to_owned(),
                self.config.clone(),
                start_after,
            )
            .take(limit.unwrap_or(usize::MAX)),
        ))
    }

    fn store_media_type(&self) -> &'static str {
        "s3"
    }
}
652
653impl S3ObjectStore {
654    pub fn new_http_client(config: &ObjectStoreConfig) -> impl HttpClient + use<> {
655        let nodelay = config.s3.nodelay;
656        let pool_idle_timeout = config.s3.keepalive_ms.map(Duration::from_millis);
657
658        // Use the Smithy default (hyper 1.x) client stack. This avoids the deprecated hyper 0.14
659        // connector path (`aws-smithy-runtime/tls-rustls`).
660        let tls_provider = tls::Provider::Rustls(tls::rustls_provider::CryptoMode::AwsLc);
661        SmithyHttpClientBuilder::new().build_with_connector_fn(move |settings, _components| {
662            let mut builder = SmithyConnector::builder();
663
664            if let Some(settings) = settings {
665                builder = builder.connector_settings(settings.clone());
666            }
667
668            if let Some(nodelay) = nodelay {
669                builder = builder.enable_tcp_nodelay(nodelay);
670            }
671
672            if let Some(pool_idle_timeout) = pool_idle_timeout {
673                builder = builder.pool_idle_timeout(pool_idle_timeout);
674            }
675
676            builder.tls_provider(tls_provider.clone()).build()
677        })
678    }
679
    /// Creates an S3 object store from environment variable.
    ///
    /// See [AWS Docs](https://docs.aws.amazon.com/sdk-for rust/latest/dg/credentials.html) on how to provide credentials and region from env variable. If you are running compute-node on EC2, no configuration is required.
    pub async fn new_with_config(
        bucket: String,
        metrics: Arc<ObjectStoreMetrics>,
        config: Arc<ObjectStoreConfig>,
    ) -> Self {
        let sdk_config_loader = aws_config::from_env().http_client(Self::new_http_client(&config));

        // Retry 3 times if we get server-side errors or throttling errors
        let client = match std::env::var("RW_S3_ENDPOINT") {
            Ok(endpoint) => {
                // s3 compatible storage
                // Path-style addressing (`endpoint/bucket/key`) is opt-in via env var.
                let is_force_path_style = match std::env::var("RW_IS_FORCE_PATH_STYLE") {
                    Ok(value) => value == "true",
                    Err(_) => false,
                };

                let sdk_config = sdk_config_loader.load().await;
                // Under madsim (deterministic simulation) the custom endpoint and
                // identity-cache tuning are not applied.
                #[cfg(madsim)]
                let client = Client::new(&sdk_config);
                #[cfg(not(madsim))]
                let client = Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&sdk_config)
                        .endpoint_url(endpoint)
                        .force_path_style(is_force_path_style)
                        .identity_cache(
                            aws_sdk_s3::config::IdentityCache::lazy()
                                .load_timeout(Duration::from_secs(
                                    config.s3.identity_resolution_timeout_s,
                                ))
                                .build(),
                        )
                        .stalled_stream_protection(
                            aws_sdk_s3::config::StalledStreamProtectionConfig::disabled(),
                        )
                        .build(),
                );
                client
            }
            Err(_) => {
                // s3
                let sdk_config = sdk_config_loader.load().await;
                #[cfg(madsim)]
                let client = Client::new(&sdk_config);
                #[cfg(not(madsim))]
                let client = Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&sdk_config)
                        .identity_cache(
                            aws_sdk_s3::config::IdentityCache::lazy()
                                .load_timeout(Duration::from_secs(
                                    config.s3.identity_resolution_timeout_s,
                                ))
                                .build(),
                        )
                        .stalled_stream_protection(
                            aws_sdk_s3::config::StalledStreamProtectionConfig::disabled(),
                        )
                        .build(),
                );
                client
            }
        };

        Self {
            client,
            bucket,
            metrics,
            config,
        }
    }
752
    /// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`.
    pub async fn new_minio_engine(
        server: &str,
        metrics: Arc<ObjectStoreMetrics>,
        object_store_config: Arc<ObjectStoreConfig>,
    ) -> Self {
        // Parse `minio://key:secret@[http(s)://]address:port/bucket`.
        // NOTE(review): the `unwrap`s assume a well-formed URL; malformed input panics.
        let server = server.strip_prefix("minio://").unwrap();
        let (access_key_id, rest) = server.split_once(':').unwrap();
        let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

        // An explicit scheme may prefix the address; default to plain HTTP.
        let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
            rest = rest_stripped;
            "https://"
        } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
            rest = rest_stripped;
            "http://"
        } else {
            "http://"
        };
        let (address, bucket) = rest.split_once('/').unwrap();

        #[cfg(madsim)]
        let builder = aws_sdk_s3::config::Builder::new().credentials_provider(
            Credentials::from_keys(access_key_id, secret_access_key, None),
        );
        #[cfg(not(madsim))]
        let builder = aws_sdk_s3::config::Builder::from(
            &aws_config::ConfigLoader::default()
                // FIXME: https://github.com/awslabs/aws-sdk-rust/issues/973
                .credentials_provider(Credentials::from_keys(
                    access_key_id,
                    secret_access_key,
                    None,
                ))
                .load()
                .await,
        )
        // Always use path-style addressing for MinIO endpoints.
        .force_path_style(true)
        .identity_cache(
            aws_sdk_s3::config::IdentityCache::lazy()
                .load_timeout(Duration::from_secs(
                    object_store_config.s3.identity_resolution_timeout_s,
                ))
                .build(),
        )
        .http_client(Self::new_http_client(&object_store_config))
        .behavior_version_latest()
        .stalled_stream_protection(aws_sdk_s3::config::StalledStreamProtectionConfig::disabled());
        // A region is required by the SDK builder even for a custom endpoint.
        let config = builder
            .region(Region::new("custom"))
            .endpoint_url(format!("{}{}", endpoint_prefix, address))
            .build();
        let client = Client::from_conf(config);

        Self {
            client,
            bucket: bucket.to_owned(),
            metrics,
            config: object_store_config,
        }
    }
814
815    /// Generates an HTTP GET request to download the object specified in `path`. If given,
816    /// `start_pos` and `end_pos` specify the first and last byte to download, respectively. Both
817    /// are inclusive and 0-based. For example, set `start_pos = 0` and `end_pos = 7` to download
818    /// the first 8 bytes. If neither is given, the request will download the whole object.
819    fn obj_store_request(
820        &self,
821        path: &str,
822        range: impl ObjectRangeBounds,
823    ) -> GetObjectFluentBuilder {
824        let req = self.client.get_object().bucket(&self.bucket).key(path);
825        if range.is_full() {
826            return req;
827        }
828
829        let start = range.start().map(|v| v.to_string()).unwrap_or_default();
830        let end = range.end().map(|v| (v - 1).to_string()).unwrap_or_default(); // included
831
832        req.range(format!("bytes={}-{}", start, end))
833    }
834
    /// Ensures the bucket purges stale multipart-upload parts, and reports whether an
    /// object-expiration lifecycle rule that could delete RisingWave data is already configured.
    ///
    /// When multipart upload is aborted, if any part uploads are in progress, those part uploads
    /// might or might not succeed. As a result, these parts will remain in the bucket and be
    /// charged for part storage. Therefore, we need to configure the bucket to purge stale
    /// parts.
    ///
    /// Note: This configuration only works for S3. MinIO automatically enables this feature, and it
    /// is not configurable with S3 sdk. To verify that this feature is enabled, use `mc admin
    /// config get <alias> api`.
    ///
    /// Returns `true` iff an *enabled* expiration rule applies to `data_directory`
    /// (see the risk conditions inline below).
    ///
    /// Reference:
    /// - S3
    ///   - <https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html>
    ///   - <https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html>
    /// - MinIO
    ///   - <https://github.com/minio/minio/issues/15681#issuecomment-1245126561>
    pub async fn configure_bucket_lifecycle(&self, data_directory: &str) -> bool {
        // Check if lifecycle is already configured to avoid overriding existing configuration.
        let bucket = self.bucket.as_str();
        let mut configured_rules = vec![];
        let get_config_result = self
            .client
            .get_bucket_lifecycle_configuration()
            .bucket(bucket)
            .send()
            .await;
        let mut is_expiration_configured = false;

        // If fetching the current configuration fails, fall through and try to
        // install our own abort-incomplete-multipart-upload rule below.
        if let Ok(config) = &get_config_result {
            for rule in config.rules() {
                if rule.expiration().is_some() {
                    // When both of the conditions are met, it is considered that there is a risk of data deletion.
                    //
                    // 1. expiration status rule is enabled
                    // 2. (a) prefix filter is not set
                    // or (b) prefix filter is set to the data directory of RisingWave.
                    //
                    // P.S. 1 && (2a || 2b)
                    is_expiration_configured |= rule.status == ExpirationStatus::Enabled // 1
                    && match rule.filter().as_ref() {
                        // 2a
                        None => true,
                        // 2b
                        Some(filter) => {
                            // `LifecycleRuleFilter` is a struct in newer aws-sdk-s3 versions.
                            //
                            // Treat an "empty filter" as applying to the entire bucket, which is
                            // considered risky for RisingWave data deletion.
                            let is_empty = filter.prefix().is_none()
                                && filter.tag().is_none()
                                && filter.and().is_none()
                                && filter.object_size_greater_than().is_none()
                                && filter.object_size_less_than().is_none();
                            if is_empty {
                                true
                            } else {
                                filter
                                    .prefix()
                                    .is_some_and(|prefix| data_directory.starts_with(prefix))
                            }
                        }
                    };

                    // Remember enabled rules that already purge abandoned multipart
                    // uploads, so we do not override the user's existing setup.
                    if matches!(rule.status(), ExpirationStatus::Enabled)
                        && rule.abort_incomplete_multipart_upload().is_some()
                    {
                        configured_rules.push(rule);
                    }
                }
            }
        }

        if !configured_rules.is_empty() {
            tracing::info!(
                "S3 bucket {} has already configured AbortIncompleteMultipartUpload: {:?}",
                bucket,
                configured_rules,
            );
        } else {
            // No suitable rule found: install one that aborts incomplete multipart
            // uploads for all objects after the retention period.
            let bucket_lifecycle_rule = LifecycleRule::builder()
                .id("abort-incomplete-multipart-upload")
                .status(ExpirationStatus::Enabled)
                // Empty prefix means "all objects".
                .filter(LifecycleRuleFilter::builder().prefix("").build())
                .abort_incomplete_multipart_upload(
                    AbortIncompleteMultipartUpload::builder()
                        .days_after_initiation(S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS)
                        .build(),
                )
                .build()
                .unwrap();
            let bucket_lifecycle_config = BucketLifecycleConfiguration::builder()
                .rules(bucket_lifecycle_rule)
                .build()
                .unwrap();
            // Best-effort: a failure here (e.g. missing permission) only costs
            // storage for abandoned parts, so warn instead of erroring out.
            if self
                .client
                .put_bucket_lifecycle_configuration()
                .bucket(bucket)
                .lifecycle_configuration(bucket_lifecycle_config)
                .send()
                .await
                .is_ok()
            {
                tracing::info!(
                    "S3 bucket {:?} is configured to automatically purge abandoned MultipartUploads after {} days",
                    bucket,
                    S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS,
                );
            } else {
                tracing::warn!(
                    "Failed to configure life cycle rule for S3 bucket: {:?}. It is recommended to configure it manually to avoid unnecessary storage cost.",
                    bucket
                );
            }
        }
        if is_expiration_configured {
            tracing::info!(
                "S3 bucket {} has already configured the expiration for the lifecycle.",
                bucket,
            );
        }
        is_expiration_configured
    }
958}
959
/// Streaming lister of the objects under `prefix` in `bucket`, backed by paginated
/// `ListObjectsV2` requests issued lazily from [`Stream::poll_next`].
struct S3ObjectIter {
    /// Metadata fetched from the last page but not yet yielded to the consumer.
    buffer: VecDeque<ObjectMetadata>,
    client: Client,
    bucket: String,
    prefix: String,
    /// Continuation token for the next page, taken from the previous response.
    next_continuation_token: Option<String>,
    /// Whether the last response was truncated; `None` is treated as "not truncated"
    /// (i.e. the listing ends). Initialized to `Some(true)` to trigger the first request.
    is_truncated: Option<bool>,
    /// In-flight `ListObjectsV2` request, resolving to
    /// (page entries, next continuation token, is-truncated flag).
    #[allow(clippy::type_complexity)]
    send_future: Option<
        BoxFuture<
            'static,
            Result<(Vec<ObjectMetadata>, Option<String>, Option<bool>), ObjectError>,
        >,
    >,

    config: Arc<ObjectStoreConfig>,
    /// `start-after` key; only applied to the first request and cleared afterwards.
    start_after: Option<String>,
}
978
979impl S3ObjectIter {
980    fn new(
981        client: Client,
982        bucket: String,
983        prefix: String,
984        config: Arc<ObjectStoreConfig>,
985        start_after: Option<String>,
986    ) -> Self {
987        Self {
988            buffer: VecDeque::default(),
989            client,
990            bucket,
991            prefix,
992            next_continuation_token: None,
993            is_truncated: Some(true),
994            send_future: None,
995            config,
996            start_after,
997        }
998    }
999}
1000
impl Stream for S3ObjectIter {
    type Item = ObjectResult<ObjectMetadata>;

    /// Drives a three-state pagination machine:
    /// 1. yield an entry from `buffer` if one is ready;
    /// 2. otherwise poll the in-flight request and refill the buffer from its page;
    /// 3. otherwise, unless the listing is exhausted, issue the next `ListObjectsV2` request.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Fast path: hand out an already-fetched entry.
        if let Some(e) = self.buffer.pop_front() {
            return Poll::Ready(Some(Ok(e)));
        }
        // A page request is in flight; wait for it to complete.
        if let Some(f) = self.send_future.as_mut() {
            return match ready!(f.poll_unpin(cx)) {
                Ok((more, next_continuation_token, is_truncated)) => {
                    self.next_continuation_token = next_continuation_token;
                    self.is_truncated = is_truncated;
                    self.buffer.extend(more);
                    self.send_future = None;
                    // only the first request may set start_after
                    self.start_after = None;
                    // Re-enter to either yield a buffered entry or terminate.
                    self.poll_next(cx)
                }
                Err(e) => {
                    // Drop the failed future; a subsequent poll retries the page.
                    self.send_future = None;
                    Poll::Ready(Some(Err(e)))
                }
            };
        }
        // Nothing buffered and no request pending: stop once the last page
        // reported `is_truncated == false` (or the flag was absent).
        if !self.is_truncated.unwrap_or_default() {
            return Poll::Ready(None);
        }
        // Build the next page request.
        let mut request = self
            .client
            .list_objects_v2()
            .bucket(&self.bucket)
            .prefix(&self.prefix);
        // NOTE(review): `start_after` is skipped under madsim — presumably the
        // simulator's S3 stub does not support it; confirm.
        #[cfg(not(madsim))]
        if let Some(start_after) = self.start_after.as_ref() {
            request = request.start_after(start_after);
        }
        if let Some(continuation_token) = self.next_continuation_token.as_ref() {
            request = request.continuation_token(continuation_token);
        }
        let config = self.config.clone();
        let f = async move {
            match request.send().await {
                Ok(r) => {
                    let more = r
                        .contents()
                        .iter()
                        .map(|obj| ObjectMetadata {
                            key: obj.key().expect("key required").to_owned(),
                            // Missing timestamps default to the epoch (0.0).
                            last_modified: obj
                                .last_modified()
                                .map(|l| l.as_secs_f64())
                                .unwrap_or(0f64),
                            total_size: obj.size().unwrap_or_default() as usize,
                        })
                        .collect_vec();
                    let is_truncated = r.is_truncated;
                    let next_continuation_token = r.next_continuation_token;
                    Ok((more, next_continuation_token, is_truncated))
                }
                // Classify the failure so upper layers know whether to retry.
                Err(e) => Err(set_error_should_retry::<ListObjectsV2Error>(
                    config,
                    e.into(),
                )),
            }
        };
        self.send_future = Some(Box::pin(f));
        // Re-enter to poll the freshly created future.
        self.poll_next(cx)
    }
}
1070
1071fn set_error_should_retry<E>(config: Arc<ObjectStoreConfig>, object_err: ObjectError) -> ObjectError
1072where
1073    E: ProvideErrorMetadata + Into<BoxError> + Sync + Send + std::error::Error + 'static,
1074{
1075    let not_found = object_err.is_object_not_found_error();
1076
1077    if not_found {
1078        return object_err;
1079    }
1080
1081    let mut inner = object_err.into_inner();
1082    match inner.borrow_mut() {
1083        ObjectErrorInner::S3 {
1084            should_retry,
1085            inner,
1086        } => {
1087            let sdk_err = inner
1088                .as_ref()
1089                .downcast_ref::<SdkError<E, aws_smithy_runtime_api::http::Response<SdkBody>>>();
1090
1091            let err_should_retry = match sdk_err {
1092                Some(SdkError::DispatchFailure(e)) => {
1093                    if e.is_timeout() {
1094                        tracing::warn!(target: "http_timeout_retry", "{e:?} occurs, retry S3 get_object request.");
1095                        true
1096                    } else {
1097                        false
1098                    }
1099                }
1100
1101                Some(SdkError::ServiceError(e)) => match e.err().code() {
1102                    None => {
1103                        if config.s3.developer.retry_unknown_service_error
1104                            || config.s3.retry_unknown_service_error
1105                        {
1106                            tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
1107                            true
1108                        } else {
1109                            false
1110                        }
1111                    }
1112                    Some(code) => {
1113                        if config
1114                            .s3
1115                            .developer
1116                            .retryable_service_error_codes
1117                            .iter()
1118                            .any(|s| s.as_str().eq_ignore_ascii_case(code))
1119                        {
1120                            tracing::warn!(target: "retryable_service_error", "{e:?} occurs, retry S3 get_object request.");
1121                            true
1122                        } else {
1123                            false
1124                        }
1125                    }
1126                },
1127
1128                Some(SdkError::TimeoutError(_err)) => true,
1129
1130                _ => false,
1131            };
1132
1133            *should_retry = err_should_retry;
1134        }
1135
1136        _ => unreachable!(),
1137    }
1138
1139    ObjectError::from(inner)
1140}
1141
#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
    use crate::object::prefix::s3::{NUM_BUCKET_PREFIXES, get_object_prefix};

    /// Reference implementation of the bucket-prefix hash: CRC32 of the
    /// big-endian object id, folded into the number of bucket prefixes.
    fn get_hash_of_object(obj_id: u64) -> u32 {
        let crc_hash = crc32fast::hash(&obj_id.to_be_bytes());
        crc_hash % NUM_BUCKET_PREFIXES
    }

    /// `get_object_prefix` must agree with the reference hash for a wide range
    /// of ids, and the prefix must compose into object paths as expected.
    //
    // This test is fully synchronous (no `.await` anywhere), so a plain
    // `#[test]` suffices; `#[tokio::test]` would spin up an async runtime
    // per test for no benefit.
    #[test]
    fn test_get_object_prefix() {
        for obj_id in 0..99999 {
            let hash = get_hash_of_object(obj_id);
            let prefix = get_object_prefix(obj_id);
            assert_eq!(format!("{}/", hash), prefix);
        }

        // An empty prefix must leave the path unchanged.
        let obj_prefix = String::default();
        let path = format!("{}/{}{}.data", "hummock_001", obj_prefix, 101);
        assert_eq!("hummock_001/101.data", path);
    }
}