risingwave_object_store/object/
error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io;
16
17use aws_sdk_s3::operation::get_object::GetObjectError;
18use aws_sdk_s3::operation::head_object::HeadObjectError;
19use aws_sdk_s3::primitives::ByteStreamError;
20use aws_smithy_types::body::SdkBody;
21use risingwave_common::error::BoxedError;
22use thiserror::Error;
23use thiserror_ext::AsReport;
24use tokio::sync::oneshot::error::RecvError;
25
26#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Construct)]
27#[thiserror_ext(newtype(name = ObjectError, backtrace))]
28pub enum ObjectErrorInner {
29    #[error("s3 error: {inner}")]
30    S3 {
31        // TODO: remove this after switch s3 backend to opendal
32        should_retry: bool,
33        #[source]
34        inner: BoxedError,
35    },
36    #[error("disk error: {msg}")]
37    Disk {
38        msg: String,
39        #[source]
40        inner: io::Error,
41    },
42    #[error(transparent)]
43    Opendal(#[from] opendal::Error),
44    #[error(transparent)]
45    Mem(#[from] crate::object::mem::Error),
46    #[error("Internal error: {0}")]
47    #[construct(skip)]
48    Internal(String),
49    #[cfg(madsim)]
50    #[error(transparent)]
51    Sim(#[from] crate::object::sim::SimError),
52
53    #[error("Timeout error: {0}")]
54    Timeout(String),
55}
56
57impl ObjectError {
58    pub fn internal(msg: impl ToString) -> Self {
59        ObjectErrorInner::Internal(msg.to_string()).into()
60    }
61
62    /// Tells whether the error indicates the target object is not found.
63    pub fn is_object_not_found_error(&self) -> bool {
64        match self.inner() {
65            ObjectErrorInner::S3 {
66                inner,
67                should_retry: _,
68            } => {
69                if let Some(aws_smithy_runtime_api::client::result::SdkError::ServiceError(err)) =
70                    inner.downcast_ref::<aws_smithy_runtime_api::client::result::SdkError<
71                        GetObjectError,
72                        aws_smithy_runtime_api::http::Response<SdkBody>,
73                    >>()
74                {
75                    return matches!(err.err(), GetObjectError::NoSuchKey(_));
76                }
77                if let Some(aws_smithy_runtime_api::client::result::SdkError::ServiceError(err)) =
78                    inner.downcast_ref::<aws_smithy_runtime_api::client::result::SdkError<
79                        HeadObjectError,
80                        aws_smithy_runtime_api::http::Response<SdkBody>,
81                    >>()
82                {
83                    return matches!(err.err(), HeadObjectError::NotFound(_));
84                }
85            }
86            ObjectErrorInner::Opendal(e) => {
87                return matches!(e.kind(), opendal::ErrorKind::NotFound);
88            }
89            ObjectErrorInner::Disk { msg: _msg, inner } => {
90                return matches!(inner.kind(), io::ErrorKind::NotFound);
91            }
92            ObjectErrorInner::Mem(e) => {
93                return e.is_object_not_found_error();
94            }
95            #[cfg(madsim)]
96            ObjectErrorInner::Sim(e) => {
97                return e.is_object_not_found_error();
98            }
99            _ => {}
100        };
101        false
102    }
103
104    pub fn should_retry(&self, retry_opendal_s3_unknown_error: bool) -> bool {
105        match self.inner() {
106            ObjectErrorInner::S3 {
107                inner: _,
108                should_retry,
109            } => *should_retry,
110
111            ObjectErrorInner::Opendal(e) => {
112                e.is_temporary()
113                    || (retry_opendal_s3_unknown_error
114                        && e.kind() == opendal::ErrorKind::Unexpected)
115            }
116
117            ObjectErrorInner::Timeout(_) => true,
118
119            _ => false,
120        }
121    }
122}
123
124impl<E, R> From<aws_smithy_runtime_api::client::result::SdkError<E, R>> for ObjectError
125where
126    E: std::error::Error + Sync + Send + 'static,
127    R: Send + Sync + 'static + std::fmt::Debug,
128{
129    fn from(e: aws_smithy_runtime_api::client::result::SdkError<E, R>) -> Self {
130        ObjectErrorInner::S3 {
131            inner: e.into(),
132            should_retry: false,
133        }
134        .into()
135    }
136}
137
138impl From<RecvError> for ObjectError {
139    fn from(e: RecvError) -> Self {
140        ObjectErrorInner::Internal(e.to_report_string()).into()
141    }
142}
143
144impl From<ByteStreamError> for ObjectError {
145    fn from(e: ByteStreamError) -> Self {
146        ObjectErrorInner::S3 {
147            inner: e.into(),
148            should_retry: true,
149        }
150        .into()
151    }
152}
153
154#[cfg(madsim)]
155impl From<std::io::Error> for ObjectError {
156    fn from(e: std::io::Error) -> Self {
157        ObjectErrorInner::Internal(e.to_string()).into()
158    }
159}
160
161pub type ObjectResult<T> = std::result::Result<T, ObjectError>;