1use risingwave_common::error::{BoxedError, NotImplemented};
16use risingwave_common::secret::SecretError;
17use risingwave_common::session_config::SessionConfigError;
18use risingwave_connector::error::ConnectorError;
19use risingwave_connector::sink::SinkError;
20use risingwave_meta_model::{ObjectId, WorkerId};
21use risingwave_pb::PbFieldNotFound;
22use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
23
24use crate::hummock::error::Error as HummockError;
25use crate::model::MetadataModelError;
26
27pub type MetaResult<T> = std::result::Result<T, MetaError>;
28
29#[derive(
30 thiserror::Error,
31 thiserror_ext::ReportDebug,
32 thiserror_ext::Arc,
33 thiserror_ext::Construct,
34 thiserror_ext::Macro,
35)]
36#[thiserror_ext(newtype(name = MetaError, backtrace), macro(path = "crate::error"))]
37pub enum MetaErrorInner {
38 #[error("MetadataModel error: {0}")]
39 MetadataModelError(
40 #[from]
41 #[backtrace]
42 MetadataModelError,
43 ),
44
45 #[error("Hummock error: {0}")]
46 HummockError(
47 #[from]
48 #[backtrace]
49 HummockError,
50 ),
51
52 #[error(transparent)]
53 RpcError(
54 #[from]
55 #[backtrace]
56 RpcError,
57 ),
58
59 #[error("PermissionDenied: {0}")]
60 PermissionDenied(String),
61
62 #[error("Invalid worker: {0}, {1}")]
63 InvalidWorker(WorkerId, String),
64
65 #[error("Invalid parameter: {0}")]
66 InvalidParameter(#[message] String),
67
68 #[error("{0} id not found: {1}")]
70 #[construct(skip)]
71 CatalogIdNotFound(&'static str, String),
72
73 #[error("table_fragment not exist: id={0}")]
74 FragmentNotFound(u32),
75
76 #[error("{0} with name {1} exists{under_creation}", under_creation = (.2).map(|_| " but under creation").unwrap_or(""))]
77 Duplicated(
78 &'static str,
79 String,
80 Option<ObjectId>,
82 ),
83
84 #[error("Service unavailable: {0}")]
85 Unavailable(#[message] String),
86
87 #[error("Election failed: {0}")]
88 Election(#[source] BoxedError),
89
90 #[error("Cancelled: {0}")]
91 Cancelled(String),
92
93 #[error("SystemParams error: {0}")]
94 SystemParams(String),
95
96 #[error("SessionParams error: {0}")]
97 SessionConfig(
98 #[from]
99 #[backtrace]
100 SessionConfigError,
101 ),
102
103 #[error(transparent)]
104 Connector(
105 #[from]
106 #[backtrace]
107 ConnectorError,
108 ),
109
110 #[error("Sink error: {0}")]
111 Sink(
112 #[from]
113 #[backtrace]
114 SinkError,
115 ),
116
117 #[error(transparent)]
118 Internal(
119 #[from]
120 #[backtrace]
121 anyhow::Error,
122 ),
123
124 #[error("adhoc recovery triggered")]
126 AdhocRecovery,
127
128 #[error("Integrity check failed")]
129 IntegrityCheckFailed,
130
131 #[error("{0} has been deprecated, please use {1} instead.")]
132 Deprecated(String, String),
133
134 #[error(transparent)]
135 NotImplemented(#[from] NotImplemented),
136
137 #[error("Secret error: {0}")]
138 SecretError(
139 #[from]
140 #[backtrace]
141 SecretError,
142 ),
143}
144
145impl MetaError {
146 pub fn is_invalid_worker(&self) -> bool {
147 matches!(self.inner(), MetaErrorInner::InvalidWorker(..))
148 }
149
150 pub fn catalog_id_not_found<T: ToString>(relation: &'static str, id: T) -> Self {
151 MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into()
152 }
153
154 pub fn is_fragment_not_found(&self) -> bool {
155 matches!(self.inner(), MetaErrorInner::FragmentNotFound(..))
156 }
157
158 pub fn is_cancelled(&self) -> bool {
159 matches!(self.inner(), MetaErrorInner::Cancelled(..))
160 }
161
162 pub fn catalog_duplicated<T: Into<String>>(relation: &'static str, name: T) -> Self {
163 MetaErrorInner::Duplicated(relation, name.into(), None).into()
164 }
165
166 pub fn catalog_under_creation<T: Into<String>>(
167 relation: &'static str,
168 name: T,
169 job_id: ObjectId,
170 ) -> Self {
171 MetaErrorInner::Duplicated(relation, name.into(), Some(job_id)).into()
172 }
173}
174
175impl From<MetaError> for tonic::Status {
176 fn from(err: MetaError) -> Self {
177 use tonic::Code;
178
179 let code = match err.inner() {
180 MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied,
181 MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound,
182 MetaErrorInner::Duplicated(_, _, _) => Code::AlreadyExists,
183 MetaErrorInner::Unavailable(_) => Code::Unavailable,
184 MetaErrorInner::Cancelled(_) => Code::Cancelled,
185 MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument,
186 _ => Code::Internal,
187 };
188
189 err.to_status(code, "meta")
190 }
191}
192
193impl From<PbFieldNotFound> for MetaError {
194 fn from(e: PbFieldNotFound) -> Self {
195 MetadataModelError::from(e).into()
196 }
197}
198
199impl From<MetaErrorInner> for SinkError {
200 fn from(e: MetaErrorInner) -> Self {
201 SinkError::Coordinator(e.into())
202 }
203}
204
205impl From<MetaError> for SinkError {
206 fn from(e: MetaError) -> Self {
207 SinkError::Coordinator(e.into())
208 }
209}