1use risingwave_common::error::{BoxedError, NotImplemented};
16use risingwave_common::secret::SecretError;
17use risingwave_common::session_config::SessionConfigError;
18use risingwave_connector::error::ConnectorError;
19use risingwave_connector::sink::SinkError;
20use risingwave_meta_model::WorkerId;
21use risingwave_pb::PbFieldNotFound;
22use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
23
24use crate::hummock::error::Error as HummockError;
25use crate::model::MetadataModelError;
26use crate::storage::MetaStoreError;
27
/// Convenience alias for a `Result` whose error type is [`MetaError`].
pub type MetaResult<T> = std::result::Result<T, MetaError>;
29
/// The set of error kinds reported by the meta crate.
///
/// The `thiserror_ext` `newtype` attribute below names the public wrapper type
/// `MetaError` (with backtrace support) that the rest of this file uses; values of
/// this enum are turned into `MetaError` via `.into()`.
#[derive(
    thiserror::Error,
    thiserror_ext::ReportDebug,
    thiserror_ext::Arc,
    thiserror_ext::Construct,
    thiserror_ext::Macro,
)]
#[thiserror_ext(newtype(name = MetaError, backtrace), macro(path = "crate::error"))]
pub enum MetaErrorInner {
    /// A meta store transaction failed.
    ///
    /// Note this variant uses `#[source]` rather than `#[from]`: the conversion from
    /// `MetaStoreError` is hand-written in this file and only routes
    /// `MetaStoreError::TransactionAbort()` here.
    #[error("MetaStore transaction error: {0}")]
    TransactionError(
        #[source]
        #[backtrace]
        MetaStoreError,
    ),

    /// An error coming from the metadata model layer.
    #[error("MetadataModel error: {0}")]
    MetadataModelError(
        #[from]
        #[backtrace]
        MetadataModelError,
    ),

    /// An error coming from the Hummock module of this crate.
    #[error("Hummock error: {0}")]
    HummockError(
        #[from]
        #[backtrace]
        HummockError,
    ),

    /// An RPC client error; the message is forwarded unchanged (`transparent`).
    #[error(transparent)]
    RpcError(
        #[from]
        #[backtrace]
        RpcError,
    ),

    /// The operation was not permitted; the payload is the message.
    #[error("PermissionDenied: {0}")]
    PermissionDenied(String),

    /// A worker was considered invalid: the worker id plus a description.
    #[error("Invalid worker: {0}, {1}")]
    InvalidWorker(WorkerId, String),

    /// A caller-supplied parameter was rejected; the payload is the message.
    #[error("Invalid parameter: {0}")]
    InvalidParameter(#[message] String),

    /// A catalog object could not be found by id.
    ///
    /// Fields are the kind of object (used as the message prefix) and the id rendered
    /// as a string. The derived constructor is skipped; use
    /// `MetaError::catalog_id_not_found` instead.
    #[error("{0} id not found: {1}")]
    #[construct(skip)]
    CatalogIdNotFound(&'static str, String),

    /// No table fragment exists for the given id.
    #[error("table_fragment not exist: id={0}")]
    FragmentNotFound(u32),

    /// An object with the same name already exists.
    #[error("{0} with name {1} exists{under_creation}", under_creation = (.2).then_some(" but under creation").unwrap_or(""))]
    Duplicated(
        // Kind of object, used as the message prefix.
        &'static str,
        // Name of the conflicting object.
        String,
        // When `true`, " but under creation" is appended to the message.
        bool,
    ),

    /// The service is unavailable; the payload is the message.
    #[error("Service unavailable: {0}")]
    Unavailable(#[message] String),

    /// Leader election failed; the boxed error is exposed as the source.
    #[error("Election failed: {0}")]
    Election(#[source] BoxedError),

    /// The operation was cancelled; the payload is the message.
    #[error("Cancelled: {0}")]
    Cancelled(String),

    /// A system-parameter related error; the payload is the message.
    #[error("SystemParams error: {0}")]
    SystemParams(String),

    /// A session-config related error.
    #[error("SessionParams error: {0}")]
    SessionConfig(
        #[from]
        #[backtrace]
        SessionConfigError,
    ),

    /// A connector error; the message is forwarded unchanged (`transparent`).
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),

    /// A sink error.
    #[error("Sink error: {0}")]
    Sink(
        #[from]
        #[backtrace]
        SinkError,
    ),

    /// Any other error, carried as `anyhow::Error` and displayed unchanged
    /// (`transparent`).
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),

    /// Signals that an ad-hoc recovery was triggered.
    #[error("adhoc recovery triggered")]
    AdhocRecovery,

    /// An integrity check did not pass.
    #[error("Integrity check failed")]
    IntegrityCheckFailed,

    /// A deprecated item was used: the deprecated name and its suggested replacement.
    #[error("{0} has been deprecated, please use {1} instead.")]
    Deprecated(String, String),

    /// The requested functionality is not implemented; displayed unchanged
    /// (`transparent`).
    #[error(transparent)]
    NotImplemented(#[from] NotImplemented),

    /// A secret-management error.
    #[error("Secret error: {0}")]
    SecretError(
        #[from]
        #[backtrace]
        SecretError,
    ),
}
152
153impl MetaError {
154 pub fn is_invalid_worker(&self) -> bool {
155 matches!(self.inner(), MetaErrorInner::InvalidWorker(..))
156 }
157
158 pub fn catalog_id_not_found<T: ToString>(relation: &'static str, id: T) -> Self {
159 MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into()
160 }
161
162 pub fn is_fragment_not_found(&self) -> bool {
163 matches!(self.inner(), MetaErrorInner::FragmentNotFound(..))
164 }
165
166 pub fn is_cancelled(&self) -> bool {
167 matches!(self.inner(), MetaErrorInner::Cancelled(..))
168 }
169
170 pub fn catalog_duplicated<T: Into<String>>(relation: &'static str, name: T) -> Self {
171 MetaErrorInner::Duplicated(relation, name.into(), false).into()
172 }
173
174 pub fn catalog_under_creation<T: Into<String>>(relation: &'static str, name: T) -> Self {
175 MetaErrorInner::Duplicated(relation, name.into(), true).into()
176 }
177}
178
179impl From<MetaError> for tonic::Status {
180 fn from(err: MetaError) -> Self {
181 use tonic::Code;
182
183 let code = match err.inner() {
184 MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied,
185 MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound,
186 MetaErrorInner::Duplicated(_, _, _) => Code::AlreadyExists,
187 MetaErrorInner::Unavailable(_) => Code::Unavailable,
188 MetaErrorInner::Cancelled(_) => Code::Cancelled,
189 MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument,
190 _ => Code::Internal,
191 };
192
193 err.to_status(code, "meta")
194 }
195}
196
197impl From<PbFieldNotFound> for MetaError {
198 fn from(e: PbFieldNotFound) -> Self {
199 MetadataModelError::from(e).into()
200 }
201}
202
203impl From<MetaStoreError> for MetaError {
204 fn from(e: MetaStoreError) -> Self {
205 match e {
206 MetaStoreError::TransactionAbort() => MetaErrorInner::TransactionError(e).into(),
208 _ => MetadataModelError::from(e).into(),
209 }
210 }
211}
212
213impl From<MetaErrorInner> for SinkError {
214 fn from(e: MetaErrorInner) -> Self {
215 SinkError::Coordinator(e.into())
216 }
217}
218
219impl From<MetaError> for SinkError {
220 fn from(e: MetaError) -> Self {
221 SinkError::Coordinator(e.into())
222 }
223}