1use risingwave_common::error::code::PostgresErrorCode;
16use risingwave_common::error::{BoxedError, NotImplemented};
17use risingwave_common::id::JobId;
18use risingwave_common::secret::SecretError;
19use risingwave_common::session_config::SessionConfigError;
20use risingwave_connector::error::ConnectorError;
21use risingwave_connector::sink::SinkError;
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::PbFieldNotFound;
24use risingwave_pb::id::FragmentId;
25use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
26
27use crate::hummock::error::Error as HummockError;
28use crate::model::MetadataModelError;
29
30pub type MetaResult<T> = std::result::Result<T, MetaError>;
31
32#[derive(
34 thiserror::Error,
35 thiserror_ext::ReportDebug,
36 thiserror_ext::Arc,
37 thiserror_ext::Construct,
38 thiserror_ext::Macro,
39)]
40#[thiserror_ext(
41 newtype(name = MetaError, backtrace, extra_provide = Self::provide_postgres_error_code),
42 macro(path = "crate::error")
43)]
44pub enum MetaErrorInner {
45 #[error("MetadataModel error: {0}")]
46 MetadataModelError(
47 #[from]
48 #[backtrace]
49 MetadataModelError,
50 ),
51
52 #[error("Hummock error: {0}")]
53 HummockError(
54 #[from]
55 #[backtrace]
56 HummockError,
57 ),
58
59 #[error(transparent)]
60 RpcError(
61 #[from]
62 #[backtrace]
63 RpcError,
64 ),
65
66 #[error("PermissionDenied: {0}")]
67 PermissionDenied(String),
68
69 #[error("Invalid worker: {0}, {1}")]
70 InvalidWorker(WorkerId, String),
71
72 #[error("Invalid parameter: {0}")]
73 InvalidParameter(#[message] String),
74
75 #[error("{0} id not found: {1}")]
77 #[construct(skip)]
78 CatalogIdNotFound(&'static str, String),
79
80 #[error("table_fragment not exist: id={0}")]
81 FragmentNotFound(FragmentId),
82
83 #[error("{0} with name {1} exists{under_creation}", under_creation = (.2).map(|_| " but under creation").unwrap_or(""))]
84 Duplicated(
85 &'static str,
86 String,
87 Option<JobId>,
89 ),
90
91 #[error("Service unavailable: {0}")]
92 Unavailable(#[message] String),
93
94 #[error("Election failed: {0}")]
95 Election(#[source] BoxedError),
96
97 #[error("Cancelled: {0}")]
98 Cancelled(String),
99
100 #[error("SystemParams error: {0}")]
101 SystemParams(String),
102
103 #[error("SessionParams error: {0}")]
104 SessionConfig(
105 #[from]
106 #[backtrace]
107 SessionConfigError,
108 ),
109
110 #[error(transparent)]
111 Connector(
112 #[from]
113 #[backtrace]
114 ConnectorError,
115 ),
116
117 #[error("Sink error: {0}")]
118 Sink(
119 #[from]
120 #[backtrace]
121 SinkError,
122 ),
123
124 #[error(transparent)]
125 Internal(
126 #[from]
127 #[backtrace]
128 anyhow::Error,
129 ),
130
131 #[error("adhoc recovery triggered")]
133 AdhocRecovery,
134
135 #[error("Integrity check failed")]
136 IntegrityCheckFailed,
137
138 #[error("{0} has been deprecated, please use {1} instead.")]
139 Deprecated(String, String),
140
141 #[error(transparent)]
142 NotImplemented(#[from] NotImplemented),
143
144 #[error("Secret error: {0}")]
145 SecretError(
146 #[from]
147 #[backtrace]
148 SecretError,
149 ),
150}
151
152impl MetaError {
153 fn provide_postgres_error_code(&self, request: &mut std::error::Request<'_>) {
155 match self.inner() {
156 MetaErrorInner::CatalogIdNotFound { .. } => {
157 request.provide_value(PostgresErrorCode::UndefinedObject);
158 }
159 MetaErrorInner::Duplicated { .. } => {
160 request.provide_value(PostgresErrorCode::DuplicateObject);
161 }
162 _ => {}
163 };
164 }
165}
166
167impl MetaError {
168 pub fn is_invalid_worker(&self) -> bool {
169 matches!(self.inner(), MetaErrorInner::InvalidWorker(..))
170 }
171
172 pub fn is_catalog_id_not_found(&self, relation: &'static str) -> bool {
173 matches!(self.inner(), MetaErrorInner::CatalogIdNotFound(found_relation, _) if *found_relation == relation)
174 }
175
176 pub fn catalog_id_not_found<T: ToString>(relation: &'static str, id: T) -> Self {
177 MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into()
178 }
179
180 pub fn is_fragment_not_found(&self) -> bool {
181 matches!(self.inner(), MetaErrorInner::FragmentNotFound(..))
182 }
183
184 pub fn is_cancelled(&self) -> bool {
185 matches!(self.inner(), MetaErrorInner::Cancelled(..))
186 }
187
188 pub fn catalog_duplicated<T: Into<String>>(relation: &'static str, name: T) -> Self {
189 MetaErrorInner::Duplicated(relation, name.into(), None).into()
190 }
191
192 pub fn catalog_under_creation<T: Into<String>>(
193 relation: &'static str,
194 name: T,
195 job_id: JobId,
196 ) -> Self {
197 MetaErrorInner::Duplicated(relation, name.into(), Some(job_id)).into()
198 }
199}
200
201impl From<MetaError> for tonic::Status {
202 fn from(err: MetaError) -> Self {
203 use tonic::Code;
204
205 let code = match err.inner() {
206 MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied,
207 MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound,
208 MetaErrorInner::Duplicated(_, _, _) => Code::AlreadyExists,
209 MetaErrorInner::Unavailable(_) => Code::Unavailable,
210 MetaErrorInner::Cancelled(_) => Code::Cancelled,
211 MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument,
212 _ => Code::Internal,
213 };
214
215 err.to_status(code, "meta")
216 }
217}
218
219impl From<PbFieldNotFound> for MetaError {
220 fn from(e: PbFieldNotFound) -> Self {
221 MetadataModelError::from(e).into()
222 }
223}
224
225impl From<MetaErrorInner> for SinkError {
226 fn from(e: MetaErrorInner) -> Self {
227 SinkError::Coordinator(e.into())
228 }
229}
230
231impl From<MetaError> for SinkError {
232 fn from(e: MetaError) -> Self {
233 SinkError::Coordinator(e.into())
234 }
235}