1use risingwave_common::error::code::PostgresErrorCode;
16use risingwave_common::error::{BoxedError, NotImplemented};
17use risingwave_common::secret::SecretError;
18use risingwave_common::session_config::SessionConfigError;
19use risingwave_connector::error::ConnectorError;
20use risingwave_connector::sink::SinkError;
21use risingwave_meta_model::{ObjectId, WorkerId};
22use risingwave_pb::PbFieldNotFound;
23use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
24
25use crate::hummock::error::Error as HummockError;
26use crate::model::MetadataModelError;
27
/// Shorthand for a `Result` whose error type is `MetaError`.
pub type MetaResult<T> = std::result::Result<T, MetaError>;
29
/// Error variants wrapped by `MetaError`.
///
/// The `#[thiserror_ext(newtype(name = MetaError, ..))]` attribute below names
/// `MetaError` as the newtype around this enum; code in this file inspects the
/// variant through `MetaError::inner()`.
#[derive(
    thiserror::Error,
    thiserror_ext::ReportDebug,
    thiserror_ext::Arc,
    thiserror_ext::Construct,
    thiserror_ext::Macro,
)]
#[thiserror_ext(newtype(name = MetaError, backtrace), macro(path = "crate::error"))]
pub enum MetaErrorInner {
    /// Wraps a `MetadataModelError`; `#[from]` enables conversion via `?` / `.into()`.
    #[error("MetadataModel error: {0}")]
    MetadataModelError(
        #[from]
        #[backtrace]
        MetadataModelError,
    ),

    /// Wraps a Hummock error (`crate::hummock::error::Error`).
    #[error("Hummock error: {0}")]
    HummockError(
        #[from]
        #[backtrace]
        HummockError,
    ),

    /// Wraps an `RpcError`; `transparent` forwards display and source to the inner error.
    #[error(transparent)]
    RpcError(
        #[from]
        #[backtrace]
        RpcError,
    ),

    /// Permission was denied; the string is rendered after `PermissionDenied: `.
    #[error("PermissionDenied: {0}")]
    PermissionDenied(String),

    /// A worker, identified by its `WorkerId`, is invalid; the string is rendered
    /// after the id in the message.
    #[error("Invalid worker: {0}, {1}")]
    InvalidWorker(WorkerId, String),

    /// A parameter is invalid.
    // NOTE(review): `#[message]` is a `thiserror_ext` field attribute used by the
    // derived constructor/macro — confirm exact semantics in the `thiserror_ext` docs.
    #[error("Invalid parameter: {0}")]
    InvalidParameter(#[message] String),

    /// A catalog object was looked up by id and not found.
    ///
    /// Fields are the relation kind and the id in string form. The derived
    /// constructor is skipped (`#[construct(skip)]`); use
    /// `MetaError::catalog_id_not_found` instead. Provides
    /// `PostgresErrorCode::UndefinedObject`.
    #[provide(PostgresErrorCode => PostgresErrorCode::UndefinedObject)]
    #[error("{0} id not found: {1}")]
    #[construct(skip)]
    CatalogIdNotFound(&'static str, String),

    /// No fragment exists for the given id.
    #[error("table_fragment not exist: id={0}")]
    FragmentNotFound(u32),

    /// An object with the same name already exists.
    ///
    /// Fields are the relation kind, the name, and an optional `ObjectId`. When
    /// the third field is `Some`, the message gains the suffix
    /// `" but under creation"`. Provides `PostgresErrorCode::DuplicateObject`.
    #[provide(PostgresErrorCode => PostgresErrorCode::DuplicateObject)]
    #[error("{0} with name {1} exists{under_creation}", under_creation = (.2).map(|_| " but under creation").unwrap_or(""))]
    Duplicated(
        &'static str,
        String,
        Option<ObjectId>,
    ),

    /// The service is unavailable; the string gives the detail.
    #[error("Service unavailable: {0}")]
    Unavailable(#[message] String),

    /// An election failed; the boxed error is exposed as the source.
    #[error("Election failed: {0}")]
    Election(#[source] BoxedError),

    /// The operation was cancelled; the string gives the detail.
    #[error("Cancelled: {0}")]
    Cancelled(String),

    /// A system-parameter error; the string gives the detail.
    #[error("SystemParams error: {0}")]
    SystemParams(String),

    /// Wraps a `SessionConfigError` (rendered with the `SessionParams error: ` prefix).
    #[error("SessionParams error: {0}")]
    SessionConfig(
        #[from]
        #[backtrace]
        SessionConfigError,
    ),

    /// Wraps a `ConnectorError`; `transparent` forwards display and source to it.
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),

    /// Wraps a `SinkError`.
    #[error("Sink error: {0}")]
    Sink(
        #[from]
        #[backtrace]
        SinkError,
    ),

    /// Catch-all for any `anyhow::Error`; `transparent` forwards display and source to it.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),

    /// Signals that an ad hoc recovery was triggered.
    #[error("adhoc recovery triggered")]
    AdhocRecovery,

    /// An integrity check failed.
    #[error("Integrity check failed")]
    IntegrityCheckFailed,

    /// Something is deprecated; fields are the deprecated item and its replacement,
    /// in that order, as rendered in the message.
    #[error("{0} has been deprecated, please use {1} instead.")]
    Deprecated(String, String),

    /// Wraps a `NotImplemented` error; `transparent` forwards display and source to it.
    #[error(transparent)]
    NotImplemented(#[from] NotImplemented),

    /// Wraps a `SecretError`.
    #[error("Secret error: {0}")]
    SecretError(
        #[from]
        #[backtrace]
        SecretError,
    ),
}
148
149impl MetaError {
150 pub fn is_invalid_worker(&self) -> bool {
151 matches!(self.inner(), MetaErrorInner::InvalidWorker(..))
152 }
153
154 pub fn catalog_id_not_found<T: ToString>(relation: &'static str, id: T) -> Self {
155 MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into()
156 }
157
158 pub fn is_fragment_not_found(&self) -> bool {
159 matches!(self.inner(), MetaErrorInner::FragmentNotFound(..))
160 }
161
162 pub fn is_cancelled(&self) -> bool {
163 matches!(self.inner(), MetaErrorInner::Cancelled(..))
164 }
165
166 pub fn catalog_duplicated<T: Into<String>>(relation: &'static str, name: T) -> Self {
167 MetaErrorInner::Duplicated(relation, name.into(), None).into()
168 }
169
170 pub fn catalog_under_creation<T: Into<String>>(
171 relation: &'static str,
172 name: T,
173 job_id: ObjectId,
174 ) -> Self {
175 MetaErrorInner::Duplicated(relation, name.into(), Some(job_id)).into()
176 }
177}
178
179impl From<MetaError> for tonic::Status {
180 fn from(err: MetaError) -> Self {
181 use tonic::Code;
182
183 let code = match err.inner() {
184 MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied,
185 MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound,
186 MetaErrorInner::Duplicated(_, _, _) => Code::AlreadyExists,
187 MetaErrorInner::Unavailable(_) => Code::Unavailable,
188 MetaErrorInner::Cancelled(_) => Code::Cancelled,
189 MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument,
190 _ => Code::Internal,
191 };
192
193 err.to_status(code, "meta")
194 }
195}
196
197impl From<PbFieldNotFound> for MetaError {
198 fn from(e: PbFieldNotFound) -> Self {
199 MetadataModelError::from(e).into()
200 }
201}
202
203impl From<MetaErrorInner> for SinkError {
204 fn from(e: MetaErrorInner) -> Self {
205 SinkError::Coordinator(e.into())
206 }
207}
208
209impl From<MetaError> for SinkError {
210 fn from(e: MetaError) -> Self {
211 SinkError::Coordinator(e.into())
212 }
213}