1use risingwave_common::error::code::PostgresErrorCode;
16use risingwave_common::error::{BoxedError, NotImplemented};
17use risingwave_common::id::JobId;
18use risingwave_common::secret::SecretError;
19use risingwave_common::session_config::SessionConfigError;
20use risingwave_connector::error::ConnectorError;
21use risingwave_connector::sink::SinkError;
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::PbFieldNotFound;
24use risingwave_pb::id::FragmentId;
25use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
26
27use crate::hummock::error::Error as HummockError;
28use crate::model::MetadataModelError;
29
30pub type MetaResult<T> = std::result::Result<T, MetaError>;
31
32#[derive(
34 thiserror::Error,
35 thiserror_ext::ReportDebug,
36 thiserror_ext::Arc,
37 thiserror_ext::Construct,
38 thiserror_ext::Macro,
39)]
40#[thiserror_ext(newtype(name = MetaError, backtrace), macro(path = "crate::error"))]
41pub enum MetaErrorInner {
42 #[error("MetadataModel error: {0}")]
43 MetadataModelError(
44 #[from]
45 #[backtrace]
46 MetadataModelError,
47 ),
48
49 #[error("Hummock error: {0}")]
50 HummockError(
51 #[from]
52 #[backtrace]
53 HummockError,
54 ),
55
56 #[error(transparent)]
57 RpcError(
58 #[from]
59 #[backtrace]
60 RpcError,
61 ),
62
63 #[error("PermissionDenied: {0}")]
64 PermissionDenied(String),
65
66 #[error("Invalid worker: {0}, {1}")]
67 InvalidWorker(WorkerId, String),
68
69 #[error("Invalid parameter: {0}")]
70 InvalidParameter(#[message] String),
71
72 #[provide(PostgresErrorCode => PostgresErrorCode::UndefinedObject)]
74 #[error("{0} id not found: {1}")]
75 #[construct(skip)]
76 CatalogIdNotFound(&'static str, String),
77
78 #[error("table_fragment not exist: id={0}")]
79 FragmentNotFound(FragmentId),
80
81 #[provide(PostgresErrorCode => PostgresErrorCode::DuplicateObject)]
82 #[error("{0} with name {1} exists{under_creation}", under_creation = (.2).map(|_| " but under creation").unwrap_or(""))]
83 Duplicated(
84 &'static str,
85 String,
86 Option<JobId>,
88 ),
89
90 #[error("Service unavailable: {0}")]
91 Unavailable(#[message] String),
92
93 #[error("Election failed: {0}")]
94 Election(#[source] BoxedError),
95
96 #[error("Cancelled: {0}")]
97 Cancelled(String),
98
99 #[error("SystemParams error: {0}")]
100 SystemParams(String),
101
102 #[error("SessionParams error: {0}")]
103 SessionConfig(
104 #[from]
105 #[backtrace]
106 SessionConfigError,
107 ),
108
109 #[error(transparent)]
110 Connector(
111 #[from]
112 #[backtrace]
113 ConnectorError,
114 ),
115
116 #[error("Sink error: {0}")]
117 Sink(
118 #[from]
119 #[backtrace]
120 SinkError,
121 ),
122
123 #[error(transparent)]
124 Internal(
125 #[from]
126 #[backtrace]
127 anyhow::Error,
128 ),
129
130 #[error("adhoc recovery triggered")]
132 AdhocRecovery,
133
134 #[error("Integrity check failed")]
135 IntegrityCheckFailed,
136
137 #[error("{0} has been deprecated, please use {1} instead.")]
138 Deprecated(String, String),
139
140 #[error(transparent)]
141 NotImplemented(#[from] NotImplemented),
142
143 #[error("Secret error: {0}")]
144 SecretError(
145 #[from]
146 #[backtrace]
147 SecretError,
148 ),
149}
150
151impl MetaError {
152 pub fn is_invalid_worker(&self) -> bool {
153 matches!(self.inner(), MetaErrorInner::InvalidWorker(..))
154 }
155
156 pub fn catalog_id_not_found<T: ToString>(relation: &'static str, id: T) -> Self {
157 MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into()
158 }
159
160 pub fn is_fragment_not_found(&self) -> bool {
161 matches!(self.inner(), MetaErrorInner::FragmentNotFound(..))
162 }
163
164 pub fn is_cancelled(&self) -> bool {
165 matches!(self.inner(), MetaErrorInner::Cancelled(..))
166 }
167
168 pub fn catalog_duplicated<T: Into<String>>(relation: &'static str, name: T) -> Self {
169 MetaErrorInner::Duplicated(relation, name.into(), None).into()
170 }
171
172 pub fn catalog_under_creation<T: Into<String>>(
173 relation: &'static str,
174 name: T,
175 job_id: JobId,
176 ) -> Self {
177 MetaErrorInner::Duplicated(relation, name.into(), Some(job_id)).into()
178 }
179}
180
181impl From<MetaError> for tonic::Status {
182 fn from(err: MetaError) -> Self {
183 use tonic::Code;
184
185 let code = match err.inner() {
186 MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied,
187 MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound,
188 MetaErrorInner::Duplicated(_, _, _) => Code::AlreadyExists,
189 MetaErrorInner::Unavailable(_) => Code::Unavailable,
190 MetaErrorInner::Cancelled(_) => Code::Cancelled,
191 MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument,
192 _ => Code::Internal,
193 };
194
195 err.to_status(code, "meta")
196 }
197}
198
199impl From<PbFieldNotFound> for MetaError {
200 fn from(e: PbFieldNotFound) -> Self {
201 MetadataModelError::from(e).into()
202 }
203}
204
205impl From<MetaErrorInner> for SinkError {
206 fn from(e: MetaErrorInner) -> Self {
207 SinkError::Coordinator(e.into())
208 }
209}
210
211impl From<MetaError> for SinkError {
212 fn from(e: MetaError) -> Self {
213 SinkError::Coordinator(e.into())
214 }
215}