risingwave_meta/
error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::error::code::PostgresErrorCode;
16use risingwave_common::error::{BoxedError, NotImplemented};
17use risingwave_common::id::JobId;
18use risingwave_common::secret::SecretError;
19use risingwave_common::session_config::SessionConfigError;
20use risingwave_connector::error::ConnectorError;
21use risingwave_connector::sink::SinkError;
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::PbFieldNotFound;
24use risingwave_pb::id::FragmentId;
25use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
26
27use crate::hummock::error::Error as HummockError;
28use crate::model::MetadataModelError;
29
30pub type MetaResult<T> = std::result::Result<T, MetaError>;
31
32// TODO(error-handling): provide more concrete error code for different object types.
33#[derive(
34    thiserror::Error,
35    thiserror_ext::ReportDebug,
36    thiserror_ext::Arc,
37    thiserror_ext::Construct,
38    thiserror_ext::Macro,
39)]
40#[thiserror_ext(
41    newtype(name = MetaError, backtrace, extra_provide = Self::provide_postgres_error_code),
42    macro(path = "crate::error")
43)]
44pub enum MetaErrorInner {
45    #[error("MetadataModel error: {0}")]
46    MetadataModelError(
47        #[from]
48        #[backtrace]
49        MetadataModelError,
50    ),
51
52    #[error("Hummock error: {0}")]
53    HummockError(
54        #[from]
55        #[backtrace]
56        HummockError,
57    ),
58
59    #[error(transparent)]
60    RpcError(
61        #[from]
62        #[backtrace]
63        RpcError,
64    ),
65
66    #[error("PermissionDenied: {0}")]
67    PermissionDenied(String),
68
69    #[error("Invalid worker: {0}, {1}")]
70    InvalidWorker(WorkerId, String),
71
72    #[error("Invalid parameter: {0}")]
73    InvalidParameter(#[message] String),
74
75    // Used for catalog errors.
76    #[error("{0} id not found: {1}")]
77    #[construct(skip)]
78    CatalogIdNotFound(&'static str, String),
79
80    #[error("table_fragment not exist: id={0}")]
81    FragmentNotFound(FragmentId),
82
83    #[error("{0} with name {1} exists{under_creation}", under_creation = (.2).map(|_| " but under creation").unwrap_or(""))]
84    Duplicated(
85        &'static str,
86        String,
87        // if under creation, take streaming job id, otherwise None
88        Option<JobId>,
89    ),
90
91    #[error("Service unavailable: {0}")]
92    Unavailable(#[message] String),
93
94    #[error("Election failed: {0}")]
95    Election(#[source] BoxedError),
96
97    #[error("Cancelled: {0}")]
98    Cancelled(String),
99
100    #[error("SystemParams error: {0}")]
101    SystemParams(String),
102
103    #[error("SessionParams error: {0}")]
104    SessionConfig(
105        #[from]
106        #[backtrace]
107        SessionConfigError,
108    ),
109
110    #[error(transparent)]
111    Connector(
112        #[from]
113        #[backtrace]
114        ConnectorError,
115    ),
116
117    #[error("Sink error: {0}")]
118    Sink(
119        #[from]
120        #[backtrace]
121        SinkError,
122    ),
123
124    #[error(transparent)]
125    Internal(
126        #[from]
127        #[backtrace]
128        anyhow::Error,
129    ),
130
131    // Indicates that recovery was triggered manually.
132    #[error("adhoc recovery triggered")]
133    AdhocRecovery,
134
135    #[error("Integrity check failed")]
136    IntegrityCheckFailed,
137
138    #[error("{0} has been deprecated, please use {1} instead.")]
139    Deprecated(String, String),
140
141    #[error(transparent)]
142    NotImplemented(#[from] NotImplemented),
143
144    #[error("Secret error: {0}")]
145    SecretError(
146        #[from]
147        #[backtrace]
148        SecretError,
149    ),
150}
151
152impl MetaError {
153    /// Provide the Postgres error code for the error.
154    fn provide_postgres_error_code(&self, request: &mut std::error::Request<'_>) {
155        match self.inner() {
156            MetaErrorInner::CatalogIdNotFound { .. } => {
157                request.provide_value(PostgresErrorCode::UndefinedObject);
158            }
159            MetaErrorInner::Duplicated { .. } => {
160                request.provide_value(PostgresErrorCode::DuplicateObject);
161            }
162            _ => {}
163        };
164    }
165}
166
167impl MetaError {
168    pub fn is_invalid_worker(&self) -> bool {
169        matches!(self.inner(), MetaErrorInner::InvalidWorker(..))
170    }
171
172    pub fn catalog_id_not_found<T: ToString>(relation: &'static str, id: T) -> Self {
173        MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into()
174    }
175
176    pub fn is_fragment_not_found(&self) -> bool {
177        matches!(self.inner(), MetaErrorInner::FragmentNotFound(..))
178    }
179
180    pub fn is_cancelled(&self) -> bool {
181        matches!(self.inner(), MetaErrorInner::Cancelled(..))
182    }
183
184    pub fn catalog_duplicated<T: Into<String>>(relation: &'static str, name: T) -> Self {
185        MetaErrorInner::Duplicated(relation, name.into(), None).into()
186    }
187
188    pub fn catalog_under_creation<T: Into<String>>(
189        relation: &'static str,
190        name: T,
191        job_id: JobId,
192    ) -> Self {
193        MetaErrorInner::Duplicated(relation, name.into(), Some(job_id)).into()
194    }
195}
196
197impl From<MetaError> for tonic::Status {
198    fn from(err: MetaError) -> Self {
199        use tonic::Code;
200
201        let code = match err.inner() {
202            MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied,
203            MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound,
204            MetaErrorInner::Duplicated(_, _, _) => Code::AlreadyExists,
205            MetaErrorInner::Unavailable(_) => Code::Unavailable,
206            MetaErrorInner::Cancelled(_) => Code::Cancelled,
207            MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument,
208            _ => Code::Internal,
209        };
210
211        err.to_status(code, "meta")
212    }
213}
214
215impl From<PbFieldNotFound> for MetaError {
216    fn from(e: PbFieldNotFound) -> Self {
217        MetadataModelError::from(e).into()
218    }
219}
220
221impl From<MetaErrorInner> for SinkError {
222    fn from(e: MetaErrorInner) -> Self {
223        SinkError::Coordinator(e.into())
224    }
225}
226
227impl From<MetaError> for SinkError {
228    fn from(e: MetaError) -> Self {
229        SinkError::Coordinator(e.into())
230    }
231}