risingwave_meta/
error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::error::code::PostgresErrorCode;
16use risingwave_common::error::{BoxedError, NotImplemented};
17use risingwave_common::id::JobId;
18use risingwave_common::secret::SecretError;
19use risingwave_common::session_config::SessionConfigError;
20use risingwave_connector::error::ConnectorError;
21use risingwave_connector::sink::SinkError;
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::PbFieldNotFound;
24use risingwave_pb::id::FragmentId;
25use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
26
27use crate::hummock::error::Error as HummockError;
28use crate::model::MetadataModelError;
29
/// Shorthand for a [`std::result::Result`] whose error type is [`MetaError`].
pub type MetaResult<T> = std::result::Result<T, MetaError>;
31
// TODO(error-handling): provide more concrete error code for different object types.
/// The set of error kinds carried by [`MetaError`].
///
/// The `thiserror_ext(newtype(name = MetaError, ..))` attribute below names
/// `MetaError` as the wrapper type around this enum; the rest of this file
/// reaches the enum through `MetaError::inner()` and builds a `MetaError`
/// from a variant with `.into()`.
///
/// Variants annotated with `#[from]` can be created from their payload type
/// through `From`. Variants marked `#[error(transparent)]` defer their
/// display text to the wrapped error instead of adding a prefix.
#[derive(
    thiserror::Error,
    thiserror_ext::ReportDebug,
    thiserror_ext::Arc,
    thiserror_ext::Construct,
    thiserror_ext::Macro,
)]
#[thiserror_ext(newtype(name = MetaError, backtrace), macro(path = "crate::error"))]
pub enum MetaErrorInner {
    // Error coming from `crate::model` (`MetadataModelError`).
    #[error("MetadataModel error: {0}")]
    MetadataModelError(
        #[from]
        #[backtrace]
        MetadataModelError,
    ),

    // Error coming from `crate::hummock::error` (imported here as `HummockError`).
    #[error("Hummock error: {0}")]
    HummockError(
        #[from]
        #[backtrace]
        HummockError,
    ),

    // RPC client error, displayed as-is (transparent).
    #[error(transparent)]
    RpcError(
        #[from]
        #[backtrace]
        RpcError,
    ),

    // Converted to gRPC `PermissionDenied` by `From<MetaError> for tonic::Status`.
    #[error("PermissionDenied: {0}")]
    PermissionDenied(String),

    // Carries the worker id and a free-form string that is appended to the message.
    #[error("Invalid worker: {0}, {1}")]
    InvalidWorker(WorkerId, String),

    // Converted to gRPC `InvalidArgument` by `From<MetaError> for tonic::Status`.
    // NOTE(review): `#[message]` is a `thiserror_ext` field attribute; presumably it
    // marks this field as the format-string message for the generated macro — confirm.
    #[error("Invalid parameter: {0}")]
    InvalidParameter(#[message] String),

    // Used for catalog errors.
    // Fields: the object kind (a static label) and the id rendered as a string.
    // NOTE(review): `#[construct(skip)]` presumably disables the derived constructor;
    // the hand-written `MetaError::catalog_id_not_found` is used instead — confirm.
    #[provide(PostgresErrorCode => PostgresErrorCode::UndefinedObject)]
    #[error("{0} id not found: {1}")]
    #[construct(skip)]
    CatalogIdNotFound(&'static str, String),

    // The fragment with the given id could not be found.
    #[error("table_fragment not exist: id={0}")]
    FragmentNotFound(FragmentId),

    // Fields: the object kind, the object name, and an optional job id.
    // The message gets the suffix " but under creation" when the third field is `Some`.
    #[provide(PostgresErrorCode => PostgresErrorCode::DuplicateObject)]
    #[error("{0} with name {1} exists{under_creation}", under_creation = (.2).map(|_| " but under creation").unwrap_or(""))]
    Duplicated(
        &'static str,
        String,
        // if under creation, take streaming job id, otherwise None
        Option<JobId>,
    ),

    // Converted to gRPC `Unavailable` by `From<MetaError> for tonic::Status`.
    #[error("Service unavailable: {0}")]
    Unavailable(#[message] String),

    // Election failure; the boxed error is exposed as the error source.
    #[error("Election failed: {0}")]
    Election(#[source] BoxedError),

    // Converted to gRPC `Cancelled` by `From<MetaError> for tonic::Status`.
    #[error("Cancelled: {0}")]
    Cancelled(String),

    // Error related to system parameters, described by the string payload.
    #[error("SystemParams error: {0}")]
    SystemParams(String),

    // Error from session configuration handling (`SessionConfigError`).
    #[error("SessionParams error: {0}")]
    SessionConfig(
        #[from]
        #[backtrace]
        SessionConfigError,
    ),

    // Connector error, displayed as-is (transparent).
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),

    // Error raised by a sink (`SinkError`).
    #[error("Sink error: {0}")]
    Sink(
        #[from]
        #[backtrace]
        SinkError,
    ),

    // Catch-all for arbitrary `anyhow::Error` values, displayed as-is (transparent).
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),

    // Indicates that recovery was triggered manually.
    #[error("adhoc recovery triggered")]
    AdhocRecovery,

    // An integrity check failed; carries no further detail.
    #[error("Integrity check failed")]
    IntegrityCheckFailed,

    // Fields: the deprecated item and the replacement suggested to the user.
    #[error("{0} has been deprecated, please use {1} instead.")]
    Deprecated(String, String),

    // Feature-not-implemented error, displayed as-is (transparent).
    #[error(transparent)]
    NotImplemented(#[from] NotImplemented),

    // Error from secret handling (`risingwave_common::secret::SecretError`).
    #[error("Secret error: {0}")]
    SecretError(
        #[from]
        #[backtrace]
        SecretError,
    ),
}
150
151impl MetaError {
152    pub fn is_invalid_worker(&self) -> bool {
153        matches!(self.inner(), MetaErrorInner::InvalidWorker(..))
154    }
155
156    pub fn catalog_id_not_found<T: ToString>(relation: &'static str, id: T) -> Self {
157        MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into()
158    }
159
160    pub fn is_fragment_not_found(&self) -> bool {
161        matches!(self.inner(), MetaErrorInner::FragmentNotFound(..))
162    }
163
164    pub fn is_cancelled(&self) -> bool {
165        matches!(self.inner(), MetaErrorInner::Cancelled(..))
166    }
167
168    pub fn catalog_duplicated<T: Into<String>>(relation: &'static str, name: T) -> Self {
169        MetaErrorInner::Duplicated(relation, name.into(), None).into()
170    }
171
172    pub fn catalog_under_creation<T: Into<String>>(
173        relation: &'static str,
174        name: T,
175        job_id: JobId,
176    ) -> Self {
177        MetaErrorInner::Duplicated(relation, name.into(), Some(job_id)).into()
178    }
179}
180
181impl From<MetaError> for tonic::Status {
182    fn from(err: MetaError) -> Self {
183        use tonic::Code;
184
185        let code = match err.inner() {
186            MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied,
187            MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound,
188            MetaErrorInner::Duplicated(_, _, _) => Code::AlreadyExists,
189            MetaErrorInner::Unavailable(_) => Code::Unavailable,
190            MetaErrorInner::Cancelled(_) => Code::Cancelled,
191            MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument,
192            _ => Code::Internal,
193        };
194
195        err.to_status(code, "meta")
196    }
197}
198
199impl From<PbFieldNotFound> for MetaError {
200    fn from(e: PbFieldNotFound) -> Self {
201        MetadataModelError::from(e).into()
202    }
203}
204
205impl From<MetaErrorInner> for SinkError {
206    fn from(e: MetaErrorInner) -> Self {
207        SinkError::Coordinator(e.into())
208    }
209}
210
211impl From<MetaError> for SinkError {
212    fn from(e: MetaError) -> Self {
213        SinkError::Coordinator(e.into())
214    }
215}