risingwave_meta/
error.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::error::{BoxedError, NotImplemented};
16use risingwave_common::secret::SecretError;
17use risingwave_common::session_config::SessionConfigError;
18use risingwave_connector::error::ConnectorError;
19use risingwave_connector::sink::SinkError;
20use risingwave_meta_model::WorkerId;
21use risingwave_pb::PbFieldNotFound;
22use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
23
24use crate::hummock::error::Error as HummockError;
25use crate::model::MetadataModelError;
26
27pub type MetaResult<T> = std::result::Result<T, MetaError>;
28
29#[derive(
30    thiserror::Error,
31    thiserror_ext::ReportDebug,
32    thiserror_ext::Arc,
33    thiserror_ext::Construct,
34    thiserror_ext::Macro,
35)]
36#[thiserror_ext(newtype(name = MetaError, backtrace), macro(path = "crate::error"))]
37pub enum MetaErrorInner {
38    #[error("MetadataModel error: {0}")]
39    MetadataModelError(
40        #[from]
41        #[backtrace]
42        MetadataModelError,
43    ),
44
45    #[error("Hummock error: {0}")]
46    HummockError(
47        #[from]
48        #[backtrace]
49        HummockError,
50    ),
51
52    #[error(transparent)]
53    RpcError(
54        #[from]
55        #[backtrace]
56        RpcError,
57    ),
58
59    #[error("PermissionDenied: {0}")]
60    PermissionDenied(String),
61
62    #[error("Invalid worker: {0}, {1}")]
63    InvalidWorker(WorkerId, String),
64
65    #[error("Invalid parameter: {0}")]
66    InvalidParameter(#[message] String),
67
68    // Used for catalog errors.
69    #[error("{0} id not found: {1}")]
70    #[construct(skip)]
71    CatalogIdNotFound(&'static str, String),
72
73    #[error("table_fragment not exist: id={0}")]
74    FragmentNotFound(u32),
75
76    #[error("{0} with name {1} exists{under_creation}", under_creation = (.2).then_some(" but under creation").unwrap_or(""))]
77    Duplicated(
78        &'static str,
79        String,
80        // whether the object is under creation
81        bool,
82    ),
83
84    #[error("Service unavailable: {0}")]
85    Unavailable(#[message] String),
86
87    #[error("Election failed: {0}")]
88    Election(#[source] BoxedError),
89
90    #[error("Cancelled: {0}")]
91    Cancelled(String),
92
93    #[error("SystemParams error: {0}")]
94    SystemParams(String),
95
96    #[error("SessionParams error: {0}")]
97    SessionConfig(
98        #[from]
99        #[backtrace]
100        SessionConfigError,
101    ),
102
103    #[error(transparent)]
104    Connector(
105        #[from]
106        #[backtrace]
107        ConnectorError,
108    ),
109
110    #[error("Sink error: {0}")]
111    Sink(
112        #[from]
113        #[backtrace]
114        SinkError,
115    ),
116
117    #[error(transparent)]
118    Internal(
119        #[from]
120        #[backtrace]
121        anyhow::Error,
122    ),
123
124    // Indicates that recovery was triggered manually.
125    #[error("adhoc recovery triggered")]
126    AdhocRecovery,
127
128    #[error("Integrity check failed")]
129    IntegrityCheckFailed,
130
131    #[error("{0} has been deprecated, please use {1} instead.")]
132    Deprecated(String, String),
133
134    #[error(transparent)]
135    NotImplemented(#[from] NotImplemented),
136
137    #[error("Secret error: {0}")]
138    SecretError(
139        #[from]
140        #[backtrace]
141        SecretError,
142    ),
143}
144
145impl MetaError {
146    pub fn is_invalid_worker(&self) -> bool {
147        matches!(self.inner(), MetaErrorInner::InvalidWorker(..))
148    }
149
150    pub fn catalog_id_not_found<T: ToString>(relation: &'static str, id: T) -> Self {
151        MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into()
152    }
153
154    pub fn is_fragment_not_found(&self) -> bool {
155        matches!(self.inner(), MetaErrorInner::FragmentNotFound(..))
156    }
157
158    pub fn is_cancelled(&self) -> bool {
159        matches!(self.inner(), MetaErrorInner::Cancelled(..))
160    }
161
162    pub fn catalog_duplicated<T: Into<String>>(relation: &'static str, name: T) -> Self {
163        MetaErrorInner::Duplicated(relation, name.into(), false).into()
164    }
165
166    pub fn catalog_under_creation<T: Into<String>>(relation: &'static str, name: T) -> Self {
167        MetaErrorInner::Duplicated(relation, name.into(), true).into()
168    }
169}
170
171impl From<MetaError> for tonic::Status {
172    fn from(err: MetaError) -> Self {
173        use tonic::Code;
174
175        let code = match err.inner() {
176            MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied,
177            MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound,
178            MetaErrorInner::Duplicated(_, _, _) => Code::AlreadyExists,
179            MetaErrorInner::Unavailable(_) => Code::Unavailable,
180            MetaErrorInner::Cancelled(_) => Code::Cancelled,
181            MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument,
182            _ => Code::Internal,
183        };
184
185        err.to_status(code, "meta")
186    }
187}
188
189impl From<PbFieldNotFound> for MetaError {
190    fn from(e: PbFieldNotFound) -> Self {
191        MetadataModelError::from(e).into()
192    }
193}
194
195impl From<MetaErrorInner> for SinkError {
196    fn from(e: MetaErrorInner) -> Self {
197        SinkError::Coordinator(e.into())
198    }
199}
200
201impl From<MetaError> for SinkError {
202    fn from(e: MetaError) -> Self {
203        SinkError::Coordinator(e.into())
204    }
205}