risingwave_meta/
error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::error::{BoxedError, NotImplemented};
16use risingwave_common::secret::SecretError;
17use risingwave_common::session_config::SessionConfigError;
18use risingwave_connector::error::ConnectorError;
19use risingwave_connector::sink::SinkError;
20use risingwave_meta_model::WorkerId;
21use risingwave_pb::PbFieldNotFound;
22use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
23
24use crate::hummock::error::Error as HummockError;
25use crate::model::MetadataModelError;
26use crate::storage::MetaStoreError;
27
28pub type MetaResult<T> = std::result::Result<T, MetaError>;
29
30#[derive(
31    thiserror::Error,
32    thiserror_ext::ReportDebug,
33    thiserror_ext::Arc,
34    thiserror_ext::Construct,
35    thiserror_ext::Macro,
36)]
37#[thiserror_ext(newtype(name = MetaError, backtrace), macro(path = "crate::error"))]
38pub enum MetaErrorInner {
39    #[error("MetaStore transaction error: {0}")]
40    TransactionError(
41        #[source]
42        #[backtrace]
43        MetaStoreError,
44    ),
45
46    #[error("MetadataModel error: {0}")]
47    MetadataModelError(
48        #[from]
49        #[backtrace]
50        MetadataModelError,
51    ),
52
53    #[error("Hummock error: {0}")]
54    HummockError(
55        #[from]
56        #[backtrace]
57        HummockError,
58    ),
59
60    #[error(transparent)]
61    RpcError(
62        #[from]
63        #[backtrace]
64        RpcError,
65    ),
66
67    #[error("PermissionDenied: {0}")]
68    PermissionDenied(String),
69
70    #[error("Invalid worker: {0}, {1}")]
71    InvalidWorker(WorkerId, String),
72
73    #[error("Invalid parameter: {0}")]
74    InvalidParameter(#[message] String),
75
76    // Used for catalog errors.
77    #[error("{0} id not found: {1}")]
78    #[construct(skip)]
79    CatalogIdNotFound(&'static str, String),
80
81    #[error("table_fragment not exist: id={0}")]
82    FragmentNotFound(u32),
83
84    #[error("{0} with name {1} exists{under_creation}", under_creation = (.2).then_some(" but under creation").unwrap_or(""))]
85    Duplicated(
86        &'static str,
87        String,
88        // whether the object is under creation
89        bool,
90    ),
91
92    #[error("Service unavailable: {0}")]
93    Unavailable(#[message] String),
94
95    #[error("Election failed: {0}")]
96    Election(#[source] BoxedError),
97
98    #[error("Cancelled: {0}")]
99    Cancelled(String),
100
101    #[error("SystemParams error: {0}")]
102    SystemParams(String),
103
104    #[error("SessionParams error: {0}")]
105    SessionConfig(
106        #[from]
107        #[backtrace]
108        SessionConfigError,
109    ),
110
111    #[error(transparent)]
112    Connector(
113        #[from]
114        #[backtrace]
115        ConnectorError,
116    ),
117
118    #[error("Sink error: {0}")]
119    Sink(
120        #[from]
121        #[backtrace]
122        SinkError,
123    ),
124
125    #[error(transparent)]
126    Internal(
127        #[from]
128        #[backtrace]
129        anyhow::Error,
130    ),
131
132    // Indicates that recovery was triggered manually.
133    #[error("adhoc recovery triggered")]
134    AdhocRecovery,
135
136    #[error("Integrity check failed")]
137    IntegrityCheckFailed,
138
139    #[error("{0} has been deprecated, please use {1} instead.")]
140    Deprecated(String, String),
141
142    #[error(transparent)]
143    NotImplemented(#[from] NotImplemented),
144
145    #[error("Secret error: {0}")]
146    SecretError(
147        #[from]
148        #[backtrace]
149        SecretError,
150    ),
151}
152
153impl MetaError {
154    pub fn is_invalid_worker(&self) -> bool {
155        matches!(self.inner(), MetaErrorInner::InvalidWorker(..))
156    }
157
158    pub fn catalog_id_not_found<T: ToString>(relation: &'static str, id: T) -> Self {
159        MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into()
160    }
161
162    pub fn is_fragment_not_found(&self) -> bool {
163        matches!(self.inner(), MetaErrorInner::FragmentNotFound(..))
164    }
165
166    pub fn is_cancelled(&self) -> bool {
167        matches!(self.inner(), MetaErrorInner::Cancelled(..))
168    }
169
170    pub fn catalog_duplicated<T: Into<String>>(relation: &'static str, name: T) -> Self {
171        MetaErrorInner::Duplicated(relation, name.into(), false).into()
172    }
173
174    pub fn catalog_under_creation<T: Into<String>>(relation: &'static str, name: T) -> Self {
175        MetaErrorInner::Duplicated(relation, name.into(), true).into()
176    }
177}
178
179impl From<MetaError> for tonic::Status {
180    fn from(err: MetaError) -> Self {
181        use tonic::Code;
182
183        let code = match err.inner() {
184            MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied,
185            MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound,
186            MetaErrorInner::Duplicated(_, _, _) => Code::AlreadyExists,
187            MetaErrorInner::Unavailable(_) => Code::Unavailable,
188            MetaErrorInner::Cancelled(_) => Code::Cancelled,
189            MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument,
190            _ => Code::Internal,
191        };
192
193        err.to_status(code, "meta")
194    }
195}
196
197impl From<PbFieldNotFound> for MetaError {
198    fn from(e: PbFieldNotFound) -> Self {
199        MetadataModelError::from(e).into()
200    }
201}
202
203impl From<MetaStoreError> for MetaError {
204    fn from(e: MetaStoreError) -> Self {
205        match e {
206            // `MetaStore::txn` method error.
207            MetaStoreError::TransactionAbort() => MetaErrorInner::TransactionError(e).into(),
208            _ => MetadataModelError::from(e).into(),
209        }
210    }
211}
212
213impl From<MetaErrorInner> for SinkError {
214    fn from(e: MetaErrorInner) -> Self {
215        SinkError::Coordinator(e.into())
216    }
217}
218
219impl From<MetaError> for SinkError {
220    fn from(e: MetaError) -> Self {
221        SinkError::Coordinator(e.into())
222    }
223}