risingwave_frontend/
error.rs1use risingwave_batch::error::BatchError;
16use risingwave_common::array::ArrayError;
17use risingwave_common::error::{BoxedError, NoFunction, NotImplemented};
18use risingwave_common::license::FeatureNotAvailable;
19use risingwave_common::secret::SecretError;
20use risingwave_common::session_config::SessionConfigError;
21use risingwave_common::util::value_encoding::error::ValueEncodingError;
22use risingwave_connector::error::ConnectorError;
23use risingwave_connector::sink::SinkError;
24use risingwave_expr::ExprError;
25use risingwave_pb::PbFieldNotFound;
26use risingwave_rpc_client::error::{RpcError, ToTonicStatus, TonicStatusWrapper};
27use thiserror::Error;
28use thiserror_ext::AsReport;
29use tokio::task::JoinError;
30
31use crate::expr::CastError;
32
33#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Macro)]
41#[thiserror_ext(newtype(name = RwError, backtrace), macro(path = "crate::error"))]
42pub enum ErrorCode {
43 #[error("internal error: {0}")]
44 InternalError(String),
45 #[error(transparent)]
47 Uncategorized(
48 #[from]
49 #[backtrace]
50 anyhow::Error,
51 ),
52 #[error("connector error: {0}")]
53 ConnectorError(
54 #[source]
55 #[backtrace]
56 BoxedError,
57 ),
58 #[error(transparent)]
59 NotImplemented(#[from] NotImplemented),
60 #[error("Not supported: {0}\nHINT: {1}")]
62 NotSupported(String, String),
63 #[error(transparent)]
64 NoFunction(#[from] NoFunction),
65 #[error(transparent)]
66 IoError(#[from] std::io::Error),
67 #[error("Storage error: {0}")]
68 StorageError(
69 #[backtrace]
70 #[source]
71 BoxedError,
72 ),
73 #[error("Expr error: {0}")]
74 ExprError(
75 #[source]
76 #[backtrace]
77 BoxedError,
78 ),
79 #[error("{0}")]
82 BatchError(
83 #[source]
84 #[backtrace]
85 BoxedError,
87 ),
88 #[error("Array error: {0}")]
89 ArrayError(
90 #[from]
91 #[backtrace]
92 ArrayError,
93 ),
94 #[cfg(feature = "datafusion")]
95 #[error("DataFusion error: {0}")]
96 DataFusionError(
97 #[from]
98 #[backtrace]
99 datafusion_common::DataFusionError,
100 ),
101 #[error("Stream error: {0}")]
102 StreamError(
103 #[backtrace]
104 #[source]
105 BoxedError,
106 ),
107 #[error("{0}")]
110 RpcError(
111 #[source]
112 #[backtrace]
113 BoxedError,
115 ),
116 #[error("Bind error: {0}")]
119 BindError(#[message] String),
120 #[error("Failed to bind expression: {expr}: {error}")]
122 BindErrorRoot {
123 expr: String,
124 #[source]
125 #[backtrace]
126 error: BoxedError,
127 },
128 #[error(transparent)]
129 CastError(
130 #[from]
131 #[backtrace]
132 CastError,
133 ),
134 #[error("Catalog error: {0}")]
135 CatalogError(
136 #[source]
137 #[backtrace]
138 #[message]
139 BoxedError,
140 ),
141 #[error("Protocol error: {0}")]
142 ProtocolError(#[message] String),
143 #[error("Scheduler error: {0}")]
144 SchedulerError(
145 #[source]
146 #[backtrace]
147 BoxedError,
148 ),
149 #[error("Task not found")]
150 TaskNotFound,
151 #[error("Session not found")]
152 SessionNotFound,
153 #[error("Invalid reference: {0}")]
154 InvalidReference(String),
155 #[error("Item not found: {0}")]
156 ItemNotFound(String),
157 #[error("Duplicate Relation Name: {0}")]
158 DuplicateRelationName(String),
159 #[error("Invalid input syntax: {0}")]
160 InvalidInputSyntax(#[message] String),
161 #[error("Can not compare in memory: {0}")]
162 MemComparableError(#[from] memcomparable::Error),
163 #[error("Error while de/se values: {0}")]
164 ValueEncodingError(
165 #[from]
166 #[backtrace]
167 ValueEncodingError,
168 ),
169 #[error("Invalid value `{config_value}` for `{config_entry}`")]
170 InvalidConfigValue {
171 config_entry: String,
172 config_value: String,
173 },
174 #[error("Invalid Parameter Value: {0}")]
175 InvalidParameterValue(String),
176 #[error("Sink error: {0}")]
177 SinkError(
178 #[source]
179 #[backtrace]
180 BoxedError,
181 ),
182 #[error("Permission denied: {0}")]
183 PermissionDenied(#[message] String),
184 #[error("Failed to get/set session config: {0}")]
185 SessionConfig(
186 #[from]
187 #[backtrace]
188 SessionConfigError,
189 ),
190 #[error("Secret error: {0}")]
191 SecretError(
192 #[from]
193 #[backtrace]
194 SecretError,
195 ),
196 #[error("{0} has been deprecated, please use {1} instead.")]
197 Deprecated(String, String),
198 #[error(transparent)]
199 FeatureNotAvailable(
200 #[from]
201 #[backtrace]
202 FeatureNotAvailable,
203 ),
204}
205
206pub type Result<T> = std::result::Result<T, RwError>;
208
209impl From<TonicStatusWrapper> for RwError {
210 fn from(status: TonicStatusWrapper) -> Self {
211 use tonic::Code;
212
213 let message = status.inner().message();
214
215 match status.inner().code() {
217 Code::InvalidArgument => ErrorCode::InvalidParameterValue(message.to_owned()),
218 Code::NotFound | Code::AlreadyExists => ErrorCode::CatalogError(status.into()),
219 Code::PermissionDenied => ErrorCode::PermissionDenied(message.to_owned()),
220 Code::Cancelled => ErrorCode::SchedulerError(status.into()),
221 _ => ErrorCode::RpcError(status.into()),
222 }
223 .into()
224 }
225}
226
227impl From<RpcError> for RwError {
228 fn from(r: RpcError) -> Self {
229 match r {
230 RpcError::GrpcStatus(status) => TonicStatusWrapper::into(*status),
231 _ => ErrorCode::RpcError(r.into()).into(),
232 }
233 }
234}
235
236impl From<ExprError> for RwError {
237 fn from(s: ExprError) -> Self {
238 ErrorCode::ExprError(Box::new(s)).into()
239 }
240}
241
242impl From<SinkError> for RwError {
243 fn from(e: SinkError) -> Self {
244 ErrorCode::SinkError(Box::new(e)).into()
245 }
246}
247
248impl From<ConnectorError> for RwError {
249 fn from(e: ConnectorError) -> Self {
250 ErrorCode::ConnectorError(e.into()).into()
251 }
252}
253
254impl From<PbFieldNotFound> for RwError {
255 fn from(err: PbFieldNotFound) -> Self {
256 ErrorCode::InternalError(format!(
257 "Failed to decode prost: field not found `{}`",
258 err.0
259 ))
260 .into()
261 }
262}
263
264impl From<BatchError> for RwError {
265 fn from(s: BatchError) -> Self {
266 ErrorCode::BatchError(Box::new(s)).into()
267 }
268}
269
270impl From<JoinError> for RwError {
271 fn from(join_error: JoinError) -> Self {
272 ErrorCode::Uncategorized(join_error.into()).into()
273 }
274}
275
276impl From<BoxedError> for RwError {
278 fn from(e: BoxedError) -> Self {
279 let e = anyhow::__private::kind::BoxedKind::anyhow_kind(&e).new(e);
282 ErrorCode::Uncategorized(e).into()
283 }
284}
285
286impl From<risingwave_sqlparser::parser::ParserError> for ErrorCode {
287 fn from(e: risingwave_sqlparser::parser::ParserError) -> Self {
288 ErrorCode::InvalidInputSyntax(e.to_report_string())
289 }
290}
291
292impl From<RwError> for tonic::Status {
293 fn from(err: RwError) -> Self {
294 err.to_status(tonic::Code::Internal, "RwError")
295 }
296}