risingwave_frontend/
error.rs1use risingwave_batch::error::BatchError;
16use risingwave_common::array::ArrayError;
17use risingwave_common::error::{BoxedError, NoFunction, NotImplemented};
18use risingwave_common::secret::SecretError;
19use risingwave_common::session_config::SessionConfigError;
20use risingwave_common::util::value_encoding::error::ValueEncodingError;
21use risingwave_connector::error::ConnectorError;
22use risingwave_connector::sink::SinkError;
23use risingwave_expr::ExprError;
24use risingwave_pb::PbFieldNotFound;
25use risingwave_rpc_client::error::{RpcError, TonicStatusWrapper};
26use thiserror::Error;
27use tokio::task::JoinError;
28
29use crate::expr::CastError;
30
31#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Macro)]
39#[thiserror_ext(newtype(name = RwError, backtrace), macro(path = "crate::error"))]
40pub enum ErrorCode {
41 #[error("internal error: {0}")]
42 InternalError(String),
43 #[error(transparent)]
45 Uncategorized(
46 #[from]
47 #[backtrace]
48 anyhow::Error,
49 ),
50 #[error("connector error: {0}")]
51 ConnectorError(
52 #[source]
53 #[backtrace]
54 BoxedError,
55 ),
56 #[error(transparent)]
57 NotImplemented(#[from] NotImplemented),
58 #[error("Not supported: {0}\nHINT: {1}")]
60 NotSupported(String, String),
61 #[error(transparent)]
62 NoFunction(#[from] NoFunction),
63 #[error(transparent)]
64 IoError(#[from] std::io::Error),
65 #[error("Storage error: {0}")]
66 StorageError(
67 #[backtrace]
68 #[source]
69 BoxedError,
70 ),
71 #[error("Expr error: {0}")]
72 ExprError(
73 #[source]
74 #[backtrace]
75 BoxedError,
76 ),
77 #[error("{0}")]
80 BatchError(
81 #[source]
82 #[backtrace]
83 BoxedError,
85 ),
86 #[error("Array error: {0}")]
87 ArrayError(
88 #[from]
89 #[backtrace]
90 ArrayError,
91 ),
92 #[error("Stream error: {0}")]
93 StreamError(
94 #[backtrace]
95 #[source]
96 BoxedError,
97 ),
98 #[error("{0}")]
101 RpcError(
102 #[source]
103 #[backtrace]
104 BoxedError,
106 ),
107 #[error("Bind error: {0}")]
110 BindError(#[message] String),
111 #[error("Failed to bind expression: {expr}: {error}")]
113 BindErrorRoot {
114 expr: String,
115 #[source]
116 #[backtrace]
117 error: BoxedError,
118 },
119 #[error(transparent)]
120 CastError(
121 #[from]
122 #[backtrace]
123 CastError,
124 ),
125 #[error("Catalog error: {0}")]
126 CatalogError(
127 #[source]
128 #[backtrace]
129 BoxedError,
130 ),
131 #[error("Protocol error: {0}")]
132 ProtocolError(String),
133 #[error("Scheduler error: {0}")]
134 SchedulerError(
135 #[source]
136 #[backtrace]
137 BoxedError,
138 ),
139 #[error("Task not found")]
140 TaskNotFound,
141 #[error("Session not found")]
142 SessionNotFound,
143 #[error("Item not found: {0}")]
144 ItemNotFound(String),
145 #[error("Invalid input syntax: {0}")]
146 InvalidInputSyntax(String),
147 #[error("Can not compare in memory: {0}")]
148 MemComparableError(#[from] memcomparable::Error),
149 #[error("Error while de/se values: {0}")]
150 ValueEncodingError(
151 #[from]
152 #[backtrace]
153 ValueEncodingError,
154 ),
155 #[error("Invalid value `{config_value}` for `{config_entry}`")]
156 InvalidConfigValue {
157 config_entry: String,
158 config_value: String,
159 },
160 #[error("Invalid Parameter Value: {0}")]
161 InvalidParameterValue(String),
162 #[error("Sink error: {0}")]
163 SinkError(
164 #[source]
165 #[backtrace]
166 BoxedError,
167 ),
168 #[error("Permission denied: {0}")]
169 PermissionDenied(String),
170 #[error("Failed to get/set session config: {0}")]
171 SessionConfig(
172 #[from]
173 #[backtrace]
174 SessionConfigError,
175 ),
176 #[error("Secret error: {0}")]
177 SecretError(
178 #[from]
179 #[backtrace]
180 SecretError,
181 ),
182 #[error("{0} has been deprecated, please use {1} instead.")]
183 Deprecated(String, String),
184}
185
/// Shorthand for a `std::result::Result` whose error type is [`RwError`].
pub type Result<T> = std::result::Result<T, RwError>;
188
189impl From<TonicStatusWrapper> for RwError {
190 fn from(status: TonicStatusWrapper) -> Self {
191 use tonic::Code;
192
193 let message = status.inner().message();
194
195 match status.inner().code() {
197 Code::InvalidArgument => ErrorCode::InvalidParameterValue(message.to_owned()),
198 Code::NotFound | Code::AlreadyExists => ErrorCode::CatalogError(status.into()),
199 Code::PermissionDenied => ErrorCode::PermissionDenied(message.to_owned()),
200 Code::Cancelled => ErrorCode::SchedulerError(status.into()),
201 _ => ErrorCode::RpcError(status.into()),
202 }
203 .into()
204 }
205}
206
207impl From<RpcError> for RwError {
208 fn from(r: RpcError) -> Self {
209 match r {
210 RpcError::GrpcStatus(status) => TonicStatusWrapper::into(*status),
211 _ => ErrorCode::RpcError(r.into()).into(),
212 }
213 }
214}
215
216impl From<ExprError> for RwError {
217 fn from(s: ExprError) -> Self {
218 ErrorCode::ExprError(Box::new(s)).into()
219 }
220}
221
222impl From<SinkError> for RwError {
223 fn from(e: SinkError) -> Self {
224 ErrorCode::SinkError(Box::new(e)).into()
225 }
226}
227
228impl From<ConnectorError> for RwError {
229 fn from(e: ConnectorError) -> Self {
230 ErrorCode::ConnectorError(e.into()).into()
231 }
232}
233
234impl From<PbFieldNotFound> for RwError {
235 fn from(err: PbFieldNotFound) -> Self {
236 ErrorCode::InternalError(format!(
237 "Failed to decode prost: field not found `{}`",
238 err.0
239 ))
240 .into()
241 }
242}
243
244impl From<BatchError> for RwError {
245 fn from(s: BatchError) -> Self {
246 ErrorCode::BatchError(Box::new(s)).into()
247 }
248}
249
250impl From<JoinError> for RwError {
251 fn from(join_error: JoinError) -> Self {
252 ErrorCode::Uncategorized(join_error.into()).into()
253 }
254}
255
256impl From<BoxedError> for RwError {
258 fn from(e: BoxedError) -> Self {
259 let e = anyhow::__private::kind::BoxedKind::anyhow_kind(&e).new(e);
262 ErrorCode::Uncategorized(e).into()
263 }
264}