use std::borrow::BorrowMut;
use std::cmp;
use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll};
use std::time::Duration;
use await_tree::InstrumentAwait;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::BoxError;
use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError;
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
use aws_sdk_s3::operation::delete_object::DeleteObjectError;
use aws_sdk_s3::operation::delete_objects::DeleteObjectsError;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
use aws_sdk_s3::operation::put_object::PutObjectError;
use aws_sdk_s3::operation::upload_part::{UploadPartError, UploadPartOutput};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{
AbortIncompleteMultipartUpload, BucketLifecycleConfiguration, CompletedMultipartUpload,
CompletedPart, Delete, ExpirationStatus, LifecycleRule, LifecycleRuleFilter, ObjectIdentifier,
};
use aws_sdk_s3::Client;
use aws_smithy_http::futures_stream_adapter::FuturesStreamCompatByteStream;
use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
use aws_smithy_runtime_api::client::http::HttpClient;
use aws_smithy_runtime_api::client::result::SdkError;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::error::metadata::ProvideErrorMetadata;
use fail::fail_point;
use futures::future::{try_join_all, BoxFuture, FutureExt};
use futures::{stream, Stream, StreamExt, TryStreamExt};
use hyper::Body;
use itertools::Itertools;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::monitor::monitor_connector;
use risingwave_common::range::RangeBoundsExt;
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;
use super::object_metrics::ObjectStoreMetrics;
use super::{
prefix, retry_request, Bytes, ObjectError, ObjectErrorInner, ObjectMetadata, ObjectRangeBounds,
ObjectResult, ObjectStore, StreamingUploader,
};
use crate::object::{
try_update_failure_metric, ObjectDataStream, ObjectMetadataIter, OperationType,
};
type PartId = i32;
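/// Part numbers of an S3 multipart upload start from 1.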
const MIN_PART_ID: PartId = 1;
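/// Retention period, in days, after which the bucket lifecycle rule aborts incomplete multipart
/// uploads.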
const S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS: i32 = 1;
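/// S3 multipart upload handle.
///
/// The multipart upload is only initiated once at least `part_size` bytes have been buffered;
/// smaller objects fall back to a single `PutObject` request in `finish`.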
pub struct S3StreamingUploader {
client: Client,
part_size: usize,
bucket: String,
key: String,
upload_id: Option<String>,
next_part_id: PartId,
join_handles: Vec<JoinHandle<ObjectResult<(PartId, UploadPartOutput)>>>,
buf: Vec<Bytes>,
not_uploaded_len: usize,
metrics: Arc<ObjectStoreMetrics>,
config: Arc<ObjectStoreConfig>,
}
impl S3StreamingUploader {
const MEDIA_TYPE: &'static str = "s3";
pub fn new(
client: Client,
bucket: String,
key: String,
metrics: Arc<ObjectStoreMetrics>,
config: Arc<ObjectStoreConfig>,
) -> S3StreamingUploader {
const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
const MAX_PART_SIZE: usize = 5 * 1024 * 1024 * 1024;
let part_size = config.upload_part_size.clamp(MIN_PART_SIZE, MAX_PART_SIZE);
Self {
client,
bucket,
part_size,
key,
upload_id: None,
next_part_id: MIN_PART_ID,
join_handles: Default::default(),
buf: Default::default(),
not_uploaded_len: 0,
metrics,
config,
}
}
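    /// Lazily initiates the multipart upload on the first call, then spawns a background task
    /// that uploads the currently buffered data as the next part.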
async fn upload_next_part(&mut self) -> ObjectResult<()> {
let operation_type = OperationType::StreamingUpload;
let operation_type_str = operation_type.as_str();
if self.upload_id.is_none() {
let builder = || async {
self.client
.create_multipart_upload()
.bucket(&self.bucket)
.key(&self.key)
.send()
.await
.map_err(|err| {
set_error_should_retry::<CreateMultipartUploadError>(
self.config.clone(),
err.into(),
)
})
};
let resp = retry_request(
builder,
&self.config,
OperationType::StreamingUploadInit,
self.metrics.clone(),
Self::MEDIA_TYPE,
)
.await;
try_update_failure_metric(
&self.metrics,
&resp,
OperationType::StreamingUploadInit.as_str(),
);
self.upload_id = Some(resp?.upload_id.unwrap());
}
let data = self.buf.drain(..).collect_vec();
let len = self.not_uploaded_len;
debug_assert_eq!(
data.iter().map(|b| b.len()).sum::<usize>(),
self.not_uploaded_len
);
let part_id = self.next_part_id;
self.next_part_id += 1;
let client_cloned = self.client.clone();
let bucket = self.bucket.clone();
let key = self.key.clone();
let upload_id = self.upload_id.clone().unwrap();
let metrics = self.metrics.clone();
metrics
.operation_size
.with_label_values(&[operation_type_str])
.observe(len as f64);
let config = self.config.clone();
self.join_handles.push(tokio::spawn(async move {
let _timer = metrics
.operation_latency
.with_label_values(&["s3", operation_type_str])
.start_timer();
let builder = || async {
client_cloned
.upload_part()
.bucket(bucket.clone())
.key(key.clone())
.upload_id(upload_id.clone())
.part_number(part_id)
.body(get_upload_body(data.clone()))
.content_length(len as i64)
.send()
.await
.map_err(|err| {
                        set_error_should_retry::<UploadPartError>(
config.clone(),
err.into(),
)
})
};
let res = retry_request(
builder,
&config,
operation_type,
metrics.clone(),
Self::MEDIA_TYPE,
)
.await;
try_update_failure_metric(&metrics, &res, operation_type_str);
Ok((part_id, res?))
}));
Ok(())
}
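    /// Uploads any remaining buffered data, waits for all in-flight part uploads to finish, and
    /// completes the multipart upload with the collected part numbers and ETags.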
async fn flush_multipart_and_complete(&mut self) -> ObjectResult<()> {
let operation_type = OperationType::StreamingUploadFinish;
if !self.buf.is_empty() {
self.upload_next_part().await?;
}
let join_handles = self.join_handles.drain(..).collect_vec();
let mut uploaded_parts = Vec::with_capacity(join_handles.len());
for result in try_join_all(join_handles)
.await
.map_err(ObjectError::internal)?
{
uploaded_parts.push(result?);
}
let completed_parts = Some(
uploaded_parts
.iter()
.map(|(part_id, output)| {
CompletedPart::builder()
.set_e_tag(output.e_tag.clone())
.set_part_number(Some(*part_id))
.build()
})
.collect_vec(),
);
let builder = || async {
self.client
.complete_multipart_upload()
.bucket(&self.bucket)
.key(&self.key)
.upload_id(self.upload_id.as_ref().unwrap())
.multipart_upload(
CompletedMultipartUpload::builder()
.set_parts(completed_parts.clone())
.build(),
)
.send()
.await
.map_err(|err| {
set_error_should_retry::<CompleteMultipartUploadError>(
self.config.clone(),
err.into(),
)
})
};
let res = retry_request(
builder,
&self.config,
operation_type,
self.metrics.clone(),
Self::MEDIA_TYPE,
)
.await;
try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
let _res = res?;
Ok(())
}
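    /// Aborts the multipart upload so that S3 can discard the parts uploaded so far.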
async fn abort_multipart_upload(&self) -> ObjectResult<()> {
self.client
.abort_multipart_upload()
.bucket(&self.bucket)
.key(&self.key)
.upload_id(self.upload_id.as_ref().unwrap())
.send()
.await
.map_err(|err| {
set_error_should_retry::<AbortMultipartUploadError>(self.config.clone(), err.into())
})?;
Ok(())
}
}
impl StreamingUploader for S3StreamingUploader {
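    /// Buffers `data` and uploads the buffered chunks as a new part once their total size
    /// reaches `part_size`.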
async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
fail_point!("s3_write_bytes_err", |_| Err(ObjectError::internal(
"s3 write bytes error"
)));
let data_len = data.len();
self.not_uploaded_len += data_len;
self.buf.push(data);
if self.not_uploaded_len >= self.part_size {
self.upload_next_part()
.verbose_instrument_await("s3_upload_next_part")
.await?;
self.not_uploaded_len = 0;
}
Ok(())
}
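    /// Finishes the upload. If no part has been uploaded yet, the buffered data is sent with a
    /// single `PutObject` request (uploading an empty object is an error); otherwise the
    /// multipart upload is completed, and aborted if completion fails.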
async fn finish(mut self) -> ObjectResult<()> {
fail_point!("s3_finish_streaming_upload_err", |_| Err(
ObjectError::internal("s3 finish streaming upload error")
));
if self.upload_id.is_none() {
debug_assert!(self.join_handles.is_empty());
if self.buf.is_empty() {
debug_assert_eq!(self.not_uploaded_len, 0);
Err(ObjectError::internal("upload empty object"))
} else {
let operation_type = OperationType::Upload;
let builder = || async {
self.client
.put_object()
.bucket(&self.bucket)
.body(get_upload_body(self.buf.clone()))
.content_length(self.not_uploaded_len as i64)
.key(&self.key)
.send()
.verbose_instrument_await("s3_put_object")
.await
.map_err(|err| {
set_error_should_retry::<PutObjectError>(
self.config.clone(),
err.into(),
)
})
};
let res = retry_request(
builder,
&self.config,
operation_type,
self.metrics.clone(),
Self::MEDIA_TYPE,
)
.await;
try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
res?;
Ok(())
}
} else if let Err(e) = self
.flush_multipart_and_complete()
.verbose_instrument_await("s3_flush_multipart_and_complete")
.await
{
tracing::warn!(key = self.key, error = %e.as_report(), "Failed to upload object");
self.abort_multipart_upload().await?;
Err(e)
} else {
Ok(())
}
}
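    /// Estimates memory usage as a single part-sized buffer, since buffered data is flushed once
    /// it reaches `part_size`.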
fn get_memory_usage(&self) -> u64 {
self.part_size as u64
}
}
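/// Wraps the buffered chunks in a retryable `SdkBody` so that the request body can be replayed
/// if the SDK retries the upload.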
fn get_upload_body(data: Vec<Bytes>) -> ByteStream {
SdkBody::retryable(move || {
Body::wrap_stream(stream::iter(data.clone().into_iter().map(ObjectResult::Ok))).into()
})
.into()
}
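/// Object store backed by Amazon S3 or an S3-compatible service such as MinIO.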
#[derive(Clone)]
pub struct S3ObjectStore {
client: Client,
bucket: String,
metrics: Arc<ObjectStoreMetrics>,
config: Arc<ObjectStoreConfig>,
}
#[async_trait::async_trait]
impl ObjectStore for S3ObjectStore {
type StreamingUploader = S3StreamingUploader;
fn get_object_prefix(&self, obj_id: u64, _use_new_object_prefix_strategy: bool) -> String {
prefix::s3::get_object_prefix(obj_id)
}
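    /// Uploads the whole object with a single `PutObject` request. Uploading an empty object is
    /// an error.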
async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
fail_point!("s3_upload_err", |_| Err(ObjectError::internal(
"s3 upload error"
)));
if obj.is_empty() {
Err(ObjectError::internal("upload empty object"))
} else {
self.client
.put_object()
.bucket(&self.bucket)
.body(ByteStream::from(obj))
.key(path)
.send()
.await
.map_err(|err| {
set_error_should_retry::<PutObjectError>(self.config.clone(), err.into())
})?;
Ok(())
}
}
async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
fail_point!("s3_streaming_upload_err", |_| Err(ObjectError::internal(
"s3 streaming upload error"
)));
Ok(S3StreamingUploader::new(
self.client.clone(),
self.bucket.clone(),
path.to_string(),
self.metrics.clone(),
self.config.clone(),
))
}
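    /// Reads the requested byte range (or the whole object) into memory and verifies that the
    /// returned length matches the requested range.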
async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
fail_point!("s3_read_err", |_| Err(ObjectError::internal(
"s3 read error"
)));
let val = match self.obj_store_request(path, range.clone()).send().await {
Ok(resp) => resp
.body
.collect()
.await
.map_err(|err| {
set_error_should_retry::<GetObjectError>(self.config.clone(), err.into())
})?
.into_bytes(),
Err(sdk_err) => {
return Err(set_error_should_retry::<GetObjectError>(
self.config.clone(),
sdk_err.into(),
));
}
};
if let Some(len) = range.len()
&& len != val.len()
{
return Err(ObjectError::internal(format!(
"mismatched size: expected {}, found {} when reading {} at {:?}",
len,
val.len(),
path,
range,
)));
}
Ok(val)
}
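    /// Fetches the object's metadata with a `HeadObject` request.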
async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
fail_point!("s3_metadata_err", |_| Err(ObjectError::internal(
"s3 metadata error"
)));
let resp = self
.client
.head_object()
.bucket(&self.bucket)
.key(path)
.send()
.await
.map_err(|err| {
set_error_should_retry::<HeadObjectError>(self.config.clone(), err.into())
})?;
Ok(ObjectMetadata {
key: path.to_owned(),
last_modified: resp
.last_modified()
.expect("last_modified required")
.as_secs_f64(),
total_size: resp.content_length.unwrap_or_default() as usize,
})
}
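    /// Returns a byte stream over the requested range instead of buffering the whole object in
    /// memory.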
async fn streaming_read(
&self,
path: &str,
range: Range<usize>,
) -> ObjectResult<ObjectDataStream> {
fail_point!("s3_streaming_read_init_err", |_| Err(
ObjectError::internal("s3 streaming read init error")
));
let resp = match self.obj_store_request(path, range.clone()).send().await {
Ok(resp) => resp,
Err(sdk_err) => {
return Err(set_error_should_retry::<GetObjectError>(
self.config.clone(),
sdk_err.into(),
));
}
};
let reader = FuturesStreamCompatByteStream::new(resp.body);
Ok(Box::pin(
reader
.into_stream()
.map(|item| item.map_err(ObjectError::from)),
))
}
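    /// Permanently deletes the object at `path`.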
async fn delete(&self, path: &str) -> ObjectResult<()> {
fail_point!("s3_delete_err", |_| Err(ObjectError::internal(
"s3 delete error"
)));
self.client
.delete_object()
.bucket(&self.bucket)
.key(path)
.send()
.await
.map_err(|err| {
set_error_should_retry::<DeleteObjectError>(self.config.clone(), err.into())
})?;
Ok(())
}
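    /// Deletes multiple objects with `DeleteObjects` requests, chunking the keys into batches of
    /// at most 1000 (the per-request limit of S3). Returns an error if any deletion fails.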
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
const MAX_LEN: usize = 1000;
let mut all_errors = Vec::new();
for start_idx in (0..paths.len()).step_by(MAX_LEN) {
let end_idx = cmp::min(paths.len(), start_idx + MAX_LEN);
let slice = &paths[start_idx..end_idx];
let mut obj_ids = Vec::with_capacity(slice.len());
for path in slice {
obj_ids.push(ObjectIdentifier::builder().key(path).build().unwrap());
}
let delete_builder = Delete::builder().set_objects(Some(obj_ids));
let delete_output = self
.client
.delete_objects()
.bucket(&self.bucket)
                .delete(delete_builder.build().unwrap())
                .send()
                .await
                .map_err(|err| {
                    set_error_should_retry::<DeleteObjectsError>(self.config.clone(), err.into())
                })?;
if !delete_output.errors().is_empty() {
all_errors.append(&mut delete_output.errors().to_owned());
}
}
if !all_errors.is_empty() {
return Err(ObjectError::internal(format!(
"DeleteObjects request returned exception for some objects: {:?}",
all_errors
)));
}
Ok(())
}
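    /// Lists objects under `prefix` as a metadata stream, optionally starting after
    /// `start_after` and truncated to at most `limit` entries.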
async fn list(
&self,
prefix: &str,
start_after: Option<String>,
limit: Option<usize>,
) -> ObjectResult<ObjectMetadataIter> {
Ok(Box::pin(
S3ObjectIter::new(
self.client.clone(),
self.bucket.clone(),
prefix.to_string(),
self.config.clone(),
start_after,
)
.take(limit.unwrap_or(usize::MAX)),
))
}
fn store_media_type(&self) -> &'static str {
"s3"
}
}
impl S3ObjectStore {
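    /// Builds a hyper-based HTTP client with the configured TCP keepalive, nodelay, and buffer
    /// sizes, TLS via rustls, and connection monitoring.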
pub fn new_http_client(config: &ObjectStoreConfig) -> impl HttpClient {
let mut http = hyper::client::HttpConnector::new();
if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
http.set_keepalive(Some(Duration::from_millis(*keepalive_ms)));
}
if let Some(nodelay) = config.s3.nodelay.as_ref() {
http.set_nodelay(*nodelay);
}
if let Some(recv_buffer_size) = config.s3.recv_buffer_size.as_ref() {
http.set_recv_buffer_size(Some(*recv_buffer_size));
}
if let Some(send_buffer_size) = config.s3.send_buffer_size.as_ref() {
http.set_send_buffer_size(Some(*send_buffer_size));
}
http.enforce_http(false);
let conn = hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_or_http()
.enable_all_versions()
.wrap_connector(http);
let conn = monitor_connector(conn, "S3");
HyperClientBuilder::new().build(conn)
}
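    /// Creates an S3 object store from the environment-based AWS configuration. The endpoint can
    /// be overridden with `RW_S3_ENDPOINT` (e.g. for S3-compatible services), and setting
    /// `RW_IS_FORCE_PATH_STYLE` to `true` enables path-style addressing.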
pub async fn new_with_config(
bucket: String,
metrics: Arc<ObjectStoreMetrics>,
config: Arc<ObjectStoreConfig>,
) -> Self {
let sdk_config_loader = aws_config::from_env().http_client(Self::new_http_client(&config));
let client = match std::env::var("RW_S3_ENDPOINT") {
Ok(endpoint) => {
let is_force_path_style = match std::env::var("RW_IS_FORCE_PATH_STYLE") {
Ok(value) => value == "true",
Err(_) => false,
};
let sdk_config = sdk_config_loader.load().await;
#[cfg(madsim)]
let client = Client::new(&sdk_config);
#[cfg(not(madsim))]
let client = Client::from_conf(
aws_sdk_s3::config::Builder::from(&sdk_config)
.endpoint_url(endpoint)
.force_path_style(is_force_path_style)
.identity_cache(
aws_sdk_s3::config::IdentityCache::lazy()
.load_timeout(Duration::from_secs(
config.s3.identity_resolution_timeout_s,
))
.build(),
)
.stalled_stream_protection(
aws_sdk_s3::config::StalledStreamProtectionConfig::disabled(),
)
.build(),
);
client
}
Err(_) => {
let sdk_config = sdk_config_loader.load().await;
#[cfg(madsim)]
let client = Client::new(&sdk_config);
#[cfg(not(madsim))]
let client = Client::from_conf(
aws_sdk_s3::config::Builder::from(&sdk_config)
.identity_cache(
aws_sdk_s3::config::IdentityCache::lazy()
.load_timeout(Duration::from_secs(
config.s3.identity_resolution_timeout_s,
))
.build(),
)
.stalled_stream_protection(
aws_sdk_s3::config::StalledStreamProtectionConfig::disabled(),
)
.build(),
);
client
}
};
Self {
client,
bucket,
metrics,
config,
}
}
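    /// Creates a MinIO-backed object store from a connection string of the form
    /// `minio://<access_key>:<secret_key>@<address>/<bucket>`, where the address may optionally
    /// be prefixed with `http://` or `https://` (defaulting to `http://`).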
pub async fn new_minio_engine(
server: &str,
metrics: Arc<ObjectStoreMetrics>,
object_store_config: Arc<ObjectStoreConfig>,
) -> Self {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
} else if let Some(rest_stripped) = rest.strip_prefix("http://") {
rest = rest_stripped;
"http://"
} else {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();
#[cfg(madsim)]
let builder = aws_sdk_s3::config::Builder::new().credentials_provider(
Credentials::from_keys(access_key_id, secret_access_key, None),
);
#[cfg(not(madsim))]
let builder = aws_sdk_s3::config::Builder::from(
&aws_config::ConfigLoader::default()
.credentials_provider(Credentials::from_keys(
access_key_id,
secret_access_key,
None,
))
.load()
.await,
)
.force_path_style(true)
.identity_cache(
aws_sdk_s3::config::IdentityCache::lazy()
.load_timeout(Duration::from_secs(
object_store_config.s3.identity_resolution_timeout_s,
))
.build(),
)
.http_client(Self::new_http_client(&object_store_config))
.behavior_version_latest()
.stalled_stream_protection(aws_sdk_s3::config::StalledStreamProtectionConfig::disabled());
let config = builder
.region(Region::new("custom"))
.endpoint_url(format!("{}{}", endpoint_prefix, address))
.build();
let client = Client::from_conf(config);
Self {
client,
bucket: bucket.to_string(),
metrics,
config: object_store_config,
}
}
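    /// Builds a `GetObject` request for `path`, translating the exclusive Rust range into an
    /// inclusive HTTP `Range` header unless the full object is requested.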
fn obj_store_request(
&self,
path: &str,
range: impl ObjectRangeBounds,
) -> GetObjectFluentBuilder {
let req = self.client.get_object().bucket(&self.bucket).key(path);
if range.is_full() {
return req;
}
let start = range.start().map(|v| v.to_string()).unwrap_or_default();
        let end = range.end().map(|v| (v - 1).to_string()).unwrap_or_default();
        req.range(format!("bytes={}-{}", start, end))
}
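    /// Ensures the bucket has an enabled lifecycle rule that aborts incomplete multipart uploads
    /// after `S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS` days, creating one if none exists.
    /// Returns whether an enabled expiration rule already covers `data_directory`.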
pub async fn configure_bucket_lifecycle(&self, data_directory: &str) -> bool {
let bucket = self.bucket.as_str();
let mut configured_rules = vec![];
let get_config_result = self
.client
.get_bucket_lifecycle_configuration()
.bucket(bucket)
.send()
.await;
let mut is_expiration_configured = false;
if let Ok(config) = &get_config_result {
for rule in config.rules() {
if rule.expiration().is_some() {
                    is_expiration_configured |= rule.status == ExpirationStatus::Enabled
                        && match rule.filter().as_ref() {
                            None => true,
                            Some(LifecycleRuleFilter::Prefix(prefix))
                                if data_directory.starts_with(prefix) =>
                            {
                                true
                            }
                            _ => false,
                        };
if matches!(rule.status(), ExpirationStatus::Enabled)
&& rule.abort_incomplete_multipart_upload().is_some()
{
configured_rules.push(rule);
}
}
}
}
if !configured_rules.is_empty() {
tracing::info!(
"S3 bucket {} has already configured AbortIncompleteMultipartUpload: {:?}",
bucket,
configured_rules,
);
} else {
let bucket_lifecycle_rule = LifecycleRule::builder()
.id("abort-incomplete-multipart-upload")
.status(ExpirationStatus::Enabled)
.filter(LifecycleRuleFilter::Prefix(String::new()))
.abort_incomplete_multipart_upload(
AbortIncompleteMultipartUpload::builder()
.days_after_initiation(S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS)
.build(),
)
.build()
.unwrap();
let bucket_lifecycle_config = BucketLifecycleConfiguration::builder()
.rules(bucket_lifecycle_rule)
.build()
.unwrap();
if self
.client
.put_bucket_lifecycle_configuration()
.bucket(bucket)
.lifecycle_configuration(bucket_lifecycle_config)
.send()
.await
.is_ok()
{
tracing::info!(
"S3 bucket {:?} is configured to automatically purge abandoned MultipartUploads after {} days",
bucket,
S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS,
);
} else {
tracing::warn!("Failed to configure life cycle rule for S3 bucket: {:?}. It is recommended to configure it manually to avoid unnecessary storage cost.", bucket);
}
}
if is_expiration_configured {
tracing::info!(
"S3 bucket {} has already configured the expiration for the lifecycle.",
bucket,
);
}
is_expiration_configured
}
}
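/// Streams object metadata by paging through `ListObjectsV2` responses, following continuation
/// tokens until the listing is no longer truncated.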
struct S3ObjectIter {
buffer: VecDeque<ObjectMetadata>,
client: Client,
bucket: String,
prefix: String,
next_continuation_token: Option<String>,
is_truncated: Option<bool>,
#[allow(clippy::type_complexity)]
send_future: Option<
BoxFuture<
'static,
Result<(Vec<ObjectMetadata>, Option<String>, Option<bool>), ObjectError>,
>,
>,
config: Arc<ObjectStoreConfig>,
start_after: Option<String>,
}
impl S3ObjectIter {
fn new(
client: Client,
bucket: String,
prefix: String,
config: Arc<ObjectStoreConfig>,
start_after: Option<String>,
) -> Self {
Self {
buffer: VecDeque::default(),
client,
bucket,
prefix,
next_continuation_token: None,
is_truncated: Some(true),
send_future: None,
config,
start_after,
}
}
}
impl Stream for S3ObjectIter {
type Item = ObjectResult<ObjectMetadata>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(e) = self.buffer.pop_front() {
return Poll::Ready(Some(Ok(e)));
}
if let Some(f) = self.send_future.as_mut() {
return match ready!(f.poll_unpin(cx)) {
Ok((more, next_continuation_token, is_truncated)) => {
self.next_continuation_token = next_continuation_token;
self.is_truncated = is_truncated;
self.buffer.extend(more);
self.send_future = None;
self.start_after = None;
self.poll_next(cx)
}
Err(e) => {
self.send_future = None;
Poll::Ready(Some(Err(e)))
}
};
}
if !self.is_truncated.unwrap_or_default() {
return Poll::Ready(None);
}
let mut request = self
.client
.list_objects_v2()
.bucket(&self.bucket)
.prefix(&self.prefix);
#[cfg(not(madsim))]
if let Some(start_after) = self.start_after.as_ref() {
request = request.start_after(start_after);
}
if let Some(continuation_token) = self.next_continuation_token.as_ref() {
request = request.continuation_token(continuation_token);
}
let config = self.config.clone();
let f = async move {
match request.send().await {
Ok(r) => {
let more = r
.contents()
.iter()
.map(|obj| ObjectMetadata {
key: obj.key().expect("key required").to_owned(),
last_modified: obj
.last_modified()
.map(|l| l.as_secs_f64())
.unwrap_or(0f64),
total_size: obj.size().unwrap_or_default() as usize,
})
.collect_vec();
let is_truncated = r.is_truncated;
let next_continuation_token = r.next_continuation_token;
Ok((more, next_continuation_token, is_truncated))
}
Err(e) => Err(set_error_should_retry::<ListObjectsV2Error>(
config,
e.into(),
)),
}
};
self.send_future = Some(Box::pin(f));
self.poll_next(cx)
}
}
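/// Inspects the underlying `SdkError` and marks the error as retryable for dispatch timeouts,
/// SDK timeouts, and service errors whose codes are configured as retryable (or that carry no
/// code, if retrying unknown service errors is enabled). "Not found" errors are returned as-is.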
fn set_error_should_retry<E>(config: Arc<ObjectStoreConfig>, object_err: ObjectError) -> ObjectError
where
E: ProvideErrorMetadata + Into<BoxError> + Sync + Send + std::error::Error + 'static,
{
let not_found = object_err.is_object_not_found_error();
if not_found {
return object_err;
}
let mut inner = object_err.into_inner();
match inner.borrow_mut() {
ObjectErrorInner::S3 {
should_retry,
inner,
} => {
let sdk_err = inner
.as_ref()
.downcast_ref::<SdkError<E, aws_smithy_runtime_api::http::Response<SdkBody>>>();
let err_should_retry = match sdk_err {
Some(SdkError::DispatchFailure(e)) => {
if e.is_timeout() {
tracing::warn!(target: "http_timeout_retry", "{e:?} occurs, retry S3 get_object request.");
true
} else {
false
}
}
Some(SdkError::ServiceError(e)) => {
let retry = match e.err().code() {
None => {
if config.s3.developer.retry_unknown_service_error
|| config.s3.retry_unknown_service_error
{
tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
true
} else {
false
}
}
Some(code) => {
if config
.s3
.developer
.retryable_service_error_codes
.iter()
.any(|s| s.as_str().eq_ignore_ascii_case(code))
{
tracing::warn!(target: "retryable_service_error", "{e:?} occurs, retry S3 get_object request.");
true
} else {
false
}
}
};
retry
}
Some(SdkError::TimeoutError(_err)) => true,
_ => false,
};
*should_retry = err_should_retry;
}
_ => unreachable!(),
}
ObjectError::from(inner)
}
#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
use crate::object::prefix::s3::{get_object_prefix, NUM_BUCKET_PREFIXES};
fn get_hash_of_object(obj_id: u64) -> u32 {
let crc_hash = crc32fast::hash(&obj_id.to_be_bytes());
crc_hash % NUM_BUCKET_PREFIXES
}
#[tokio::test]
async fn test_get_object_prefix() {
for obj_id in 0..99999 {
let hash = get_hash_of_object(obj_id);
let prefix = get_object_prefix(obj_id);
assert_eq!(format!("{}/", hash), prefix);
}
let obj_prefix = String::default();
let path = format!("{}/{}{}.data", "hummock_001", obj_prefix, 101);
assert_eq!("hummock_001/101.data", path);
}
}