risingwave_object_store/object/
error.rs1use std::io;
16
17use aws_sdk_s3::operation::get_object::GetObjectError;
18use aws_sdk_s3::operation::head_object::HeadObjectError;
19use aws_sdk_s3::primitives::ByteStreamError;
20use aws_smithy_types::body::SdkBody;
21use risingwave_common::error::BoxedError;
22use thiserror::Error;
23use thiserror_ext::AsReport;
24use tokio::sync::oneshot::error::RecvError;
25
26#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Construct)]
27#[thiserror_ext(newtype(name = ObjectError, backtrace))]
28pub enum ObjectErrorInner {
29 #[error("s3 error: {inner}")]
30 S3 {
31 should_retry: bool,
33 #[source]
34 inner: BoxedError,
35 },
36 #[error("disk error: {msg}")]
37 Disk {
38 msg: String,
39 #[source]
40 inner: io::Error,
41 },
42 #[error(transparent)]
43 Opendal(#[from] opendal::Error),
44 #[error(transparent)]
45 Mem(#[from] crate::object::mem::Error),
46 #[error("Internal error: {0}")]
47 #[construct(skip)]
48 Internal(String),
49 #[cfg(madsim)]
50 #[error(transparent)]
51 Sim(#[from] crate::object::sim::SimError),
52
53 #[error("Timeout error: {0}")]
54 Timeout(String),
55}
56
57impl ObjectError {
58 pub fn internal(msg: impl ToString) -> Self {
59 ObjectErrorInner::Internal(msg.to_string()).into()
60 }
61
62 pub fn is_object_not_found_error(&self) -> bool {
64 match self.inner() {
65 ObjectErrorInner::S3 {
66 inner,
67 should_retry: _,
68 } => {
69 if let Some(aws_smithy_runtime_api::client::result::SdkError::ServiceError(err)) =
70 inner.downcast_ref::<aws_smithy_runtime_api::client::result::SdkError<
71 GetObjectError,
72 aws_smithy_runtime_api::http::Response<SdkBody>,
73 >>()
74 {
75 return matches!(err.err(), GetObjectError::NoSuchKey(_));
76 }
77 if let Some(aws_smithy_runtime_api::client::result::SdkError::ServiceError(err)) =
78 inner.downcast_ref::<aws_smithy_runtime_api::client::result::SdkError<
79 HeadObjectError,
80 aws_smithy_runtime_api::http::Response<SdkBody>,
81 >>()
82 {
83 return matches!(err.err(), HeadObjectError::NotFound(_));
84 }
85 }
86 ObjectErrorInner::Opendal(e) => {
87 return matches!(e.kind(), opendal::ErrorKind::NotFound);
88 }
89 ObjectErrorInner::Disk { msg: _msg, inner } => {
90 return matches!(inner.kind(), io::ErrorKind::NotFound);
91 }
92 ObjectErrorInner::Mem(e) => {
93 return e.is_object_not_found_error();
94 }
95 #[cfg(madsim)]
96 ObjectErrorInner::Sim(e) => {
97 return e.is_object_not_found_error();
98 }
99 _ => {}
100 };
101 false
102 }
103
104 pub fn should_retry(&self, retry_opendal_s3_unknown_error: bool) -> bool {
105 match self.inner() {
106 ObjectErrorInner::S3 {
107 inner: _,
108 should_retry,
109 } => *should_retry,
110
111 ObjectErrorInner::Opendal(e) => {
112 e.is_temporary()
113 || (retry_opendal_s3_unknown_error
114 && e.kind() == opendal::ErrorKind::Unexpected)
115 }
116
117 ObjectErrorInner::Timeout(_) => true,
118
119 _ => false,
120 }
121 }
122}
123
124impl<E, R> From<aws_smithy_runtime_api::client::result::SdkError<E, R>> for ObjectError
125where
126 E: std::error::Error + Sync + Send + 'static,
127 R: Send + Sync + 'static + std::fmt::Debug,
128{
129 fn from(e: aws_smithy_runtime_api::client::result::SdkError<E, R>) -> Self {
130 ObjectErrorInner::S3 {
131 inner: e.into(),
132 should_retry: false,
133 }
134 .into()
135 }
136}
137
138impl From<RecvError> for ObjectError {
139 fn from(e: RecvError) -> Self {
140 ObjectErrorInner::Internal(e.to_report_string()).into()
141 }
142}
143
144impl From<ByteStreamError> for ObjectError {
145 fn from(e: ByteStreamError) -> Self {
146 ObjectErrorInner::S3 {
147 inner: e.into(),
148 should_retry: true,
149 }
150 .into()
151 }
152}
153
/// Turns an I/O error into an [`ObjectErrorInner::Internal`] error.
///
/// Only compiled in `madsim` builds.
#[cfg(madsim)]
impl From<std::io::Error> for ObjectError {
    fn from(e: std::io::Error) -> Self {
        // Use the report string (message plus source chain) rather than plain
        // `to_string()`, so the cause is not lost once the error is flattened
        // into a message. This matches the `From<RecvError>` conversion.
        ObjectErrorInner::Internal(e.to_report_string()).into()
    }
}
160
161pub type ObjectResult<T> = std::result::Result<T, ObjectError>;