risingwave_meta/
error.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::error::code::PostgresErrorCode;
16use risingwave_common::error::{BoxedError, NotImplemented};
17use risingwave_common::secret::SecretError;
18use risingwave_common::session_config::SessionConfigError;
19use risingwave_connector::error::ConnectorError;
20use risingwave_connector::sink::SinkError;
21use risingwave_meta_model::{ObjectId, WorkerId};
22use risingwave_pb::PbFieldNotFound;
23use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
24
25use crate::hummock::error::Error as HummockError;
26use crate::model::MetadataModelError;
27
/// Convenience `Result` alias whose error type is fixed to `MetaError`.
pub type MetaResult<T> = std::result::Result<T, MetaError>;
29
/// The error kinds of this crate.
///
/// The `thiserror_ext(newtype(..))` attribute below names the generated wrapper type
/// `MetaError`; code in this file inspects the wrapped variant through `MetaError::inner()`.
/// Variants whose field carries `#[from]` can be created from the wrapped error type through
/// `From`/`Into` (standard `thiserror` behavior).
// TODO(error-handling): provide more concrete error code for different object types.
#[derive(
    thiserror::Error,
    thiserror_ext::ReportDebug,
    thiserror_ext::Arc,
    thiserror_ext::Construct,
    thiserror_ext::Macro,
)]
#[thiserror_ext(newtype(name = MetaError, backtrace), macro(path = "crate::error"))]
pub enum MetaErrorInner {
    // Failure reported by the metadata model layer (`crate::model::MetadataModelError`).
    #[error("MetadataModel error: {0}")]
    MetadataModelError(
        #[from]
        #[backtrace]
        MetadataModelError,
    ),

    // Failure reported by the Hummock module (`crate::hummock::error::Error`).
    #[error("Hummock error: {0}")]
    HummockError(
        #[from]
        #[backtrace]
        HummockError,
    ),

    // RPC client failure. `transparent` forwards the message and source of the wrapped error
    // without adding a prefix.
    #[error(transparent)]
    RpcError(
        #[from]
        #[backtrace]
        RpcError,
    ),

    // The operation was not permitted; the string is rendered after the `PermissionDenied:`
    // prefix.
    #[error("PermissionDenied: {0}")]
    PermissionDenied(String),

    // A worker was rejected as invalid. Fields: the worker id and a description.
    #[error("Invalid worker: {0}, {1}")]
    InvalidWorker(WorkerId, String),

    // A parameter was rejected. The field is tagged `#[message]` for the `thiserror_ext`
    // derives. NOTE(review): presumably this lets the generated macro accept a format string;
    // confirm against the thiserror_ext documentation.
    #[error("Invalid parameter: {0}")]
    InvalidParameter(#[message] String),

    // A catalog object could not be found by id. Fields: a label for the kind of object and
    // the id rendered as a string. The derived constructor is skipped (`#[construct(skip)]`);
    // `MetaError::catalog_id_not_found` is the hand-written replacement.
    #[provide(PostgresErrorCode => PostgresErrorCode::UndefinedObject)]
    #[error("{0} id not found: {1}")]
    #[construct(skip)]
    CatalogIdNotFound(&'static str, String),

    // No table fragment exists for the given id.
    #[error("table_fragment not exist: id={0}")]
    FragmentNotFound(u32),

    // An object with the same name already exists. Fields: a label for the kind of object,
    // the name, and an optional job id. When the third field is `Some`, the message gains the
    // " but under creation" suffix.
    #[provide(PostgresErrorCode => PostgresErrorCode::DuplicateObject)]
    #[error("{0} with name {1} exists{under_creation}", under_creation = (.2).map(|_| " but under creation").unwrap_or(""))]
    Duplicated(
        &'static str,
        String,
        // `Some(streaming job id)` when the existing object is still under creation,
        // otherwise `None`.
        Option<ObjectId>,
    ),

    // The service cannot handle the request right now; the string explains why.
    #[error("Service unavailable: {0}")]
    Unavailable(#[message] String),

    // An election failed; the boxed error is attached as the source.
    #[error("Election failed: {0}")]
    Election(#[source] BoxedError),

    // The operation was cancelled; the string is rendered after the `Cancelled:` prefix.
    #[error("Cancelled: {0}")]
    Cancelled(String),

    // Error related to system parameters.
    #[error("SystemParams error: {0}")]
    SystemParams(String),

    // Error from session configuration handling. Note that the rendered prefix is
    // `SessionParams error`, not the variant name.
    #[error("SessionParams error: {0}")]
    SessionConfig(
        #[from]
        #[backtrace]
        SessionConfigError,
    ),

    // Connector failure, forwarded transparently (no extra prefix).
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),

    // Sink failure.
    #[error("Sink error: {0}")]
    Sink(
        #[from]
        #[backtrace]
        SinkError,
    ),

    // Any other failure carried as an `anyhow::Error`, forwarded transparently.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),

    // Indicates that recovery was triggered manually.
    #[error("adhoc recovery triggered")]
    AdhocRecovery,

    // An integrity check did not pass. Carries no further detail.
    #[error("Integrity check failed")]
    IntegrityCheckFailed,

    // Something has been deprecated. Fields: the deprecated item and its suggested
    // replacement, in that order.
    #[error("{0} has been deprecated, please use {1} instead.")]
    Deprecated(String, String),

    // The requested functionality is not implemented; forwarded transparently.
    #[error(transparent)]
    NotImplemented(#[from] NotImplemented),

    // Failure from secret handling (`risingwave_common::secret::SecretError`).
    #[error("Secret error: {0}")]
    SecretError(
        #[from]
        #[backtrace]
        SecretError,
    ),
}
148
149impl MetaError {
150    pub fn is_invalid_worker(&self) -> bool {
151        matches!(self.inner(), MetaErrorInner::InvalidWorker(..))
152    }
153
154    pub fn catalog_id_not_found<T: ToString>(relation: &'static str, id: T) -> Self {
155        MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into()
156    }
157
158    pub fn is_fragment_not_found(&self) -> bool {
159        matches!(self.inner(), MetaErrorInner::FragmentNotFound(..))
160    }
161
162    pub fn is_cancelled(&self) -> bool {
163        matches!(self.inner(), MetaErrorInner::Cancelled(..))
164    }
165
166    pub fn catalog_duplicated<T: Into<String>>(relation: &'static str, name: T) -> Self {
167        MetaErrorInner::Duplicated(relation, name.into(), None).into()
168    }
169
170    pub fn catalog_under_creation<T: Into<String>>(
171        relation: &'static str,
172        name: T,
173        job_id: ObjectId,
174    ) -> Self {
175        MetaErrorInner::Duplicated(relation, name.into(), Some(job_id)).into()
176    }
177}
178
179impl From<MetaError> for tonic::Status {
180    fn from(err: MetaError) -> Self {
181        use tonic::Code;
182
183        let code = match err.inner() {
184            MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied,
185            MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound,
186            MetaErrorInner::Duplicated(_, _, _) => Code::AlreadyExists,
187            MetaErrorInner::Unavailable(_) => Code::Unavailable,
188            MetaErrorInner::Cancelled(_) => Code::Cancelled,
189            MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument,
190            _ => Code::Internal,
191        };
192
193        err.to_status(code, "meta")
194    }
195}
196
197impl From<PbFieldNotFound> for MetaError {
198    fn from(e: PbFieldNotFound) -> Self {
199        MetadataModelError::from(e).into()
200    }
201}
202
203impl From<MetaErrorInner> for SinkError {
204    fn from(e: MetaErrorInner) -> Self {
205        SinkError::Coordinator(e.into())
206    }
207}
208
209impl From<MetaError> for SinkError {
210    fn from(e: MetaError) -> Self {
211        SinkError::Coordinator(e.into())
212    }
213}