risingwave_frontend/
error.rs1use risingwave_batch::error::BatchError;
16use risingwave_common::array::ArrayError;
17use risingwave_common::error::{BoxedError, NoFunction, NotImplemented};
18use risingwave_common::license::FeatureNotAvailable;
19use risingwave_common::secret::SecretError;
20use risingwave_common::session_config::SessionConfigError;
21use risingwave_common::util::value_encoding::error::ValueEncodingError;
22use risingwave_connector::error::ConnectorError;
23use risingwave_connector::sink::SinkError;
24use risingwave_expr::ExprError;
25use risingwave_pb::PbFieldNotFound;
26use risingwave_rpc_client::error::{RpcError, ToTonicStatus, TonicStatusWrapper};
27use thiserror::Error;
28use thiserror_ext::AsReport;
29use tokio::task::JoinError;
30
31use crate::expr::CastError;
32
33#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Macro)]
41#[thiserror_ext(newtype(name = RwError, backtrace), macro(path = "crate::error"))]
42pub enum ErrorCode {
43 #[error("internal error: {0}")]
44 InternalError(String),
45 #[error(transparent)]
47 Uncategorized(
48 #[from]
49 #[backtrace]
50 anyhow::Error,
51 ),
52 #[error("connector error: {0}")]
53 ConnectorError(
54 #[source]
55 #[backtrace]
56 BoxedError,
57 ),
58 #[error(transparent)]
59 NotImplemented(#[from] NotImplemented),
60 #[error("Not supported: {0}\nHINT: {1}")]
62 NotSupported(String, String),
63 #[error(transparent)]
64 NoFunction(#[from] NoFunction),
65 #[error(transparent)]
66 IoError(#[from] std::io::Error),
67 #[error("Storage error: {0}")]
68 StorageError(
69 #[backtrace]
70 #[source]
71 BoxedError,
72 ),
73 #[error("Expr error: {0}")]
74 ExprError(
75 #[source]
76 #[backtrace]
77 BoxedError,
78 ),
79 #[error("{0}")]
82 BatchError(
83 #[source]
84 #[backtrace]
85 BoxedError,
87 ),
88 #[error("Array error: {0}")]
89 ArrayError(
90 #[from]
91 #[backtrace]
92 ArrayError,
93 ),
94 #[cfg(feature = "datafusion")]
95 #[error("DataFusion error: {0}")]
96 DataFusionError(
97 #[from]
98 #[backtrace]
99 datafusion_common::DataFusionError,
100 ),
101 #[error("Stream error: {0}")]
102 StreamError(
103 #[backtrace]
104 #[source]
105 BoxedError,
106 ),
107 #[error("{0}")]
110 RpcError(
111 #[source]
112 #[backtrace]
113 BoxedError,
115 ),
116 #[error("Bind error: {0}")]
119 BindError(#[message] String),
120 #[error("Failed to bind expression: {expr}: {error}")]
122 BindErrorRoot {
123 expr: String,
124 #[source]
125 #[backtrace]
126 error: BoxedError,
127 },
128 #[error(transparent)]
129 CastError(
130 #[from]
131 #[backtrace]
132 CastError,
133 ),
134 #[error("Catalog error: {0}")]
135 CatalogError(
136 #[source]
137 #[backtrace]
138 #[message]
139 BoxedError,
140 ),
141 #[error("Protocol error: {0}")]
142 ProtocolError(#[message] String),
143 #[error("Scheduler error: {0}")]
144 SchedulerError(
145 #[source]
146 #[backtrace]
147 BoxedError,
148 ),
149 #[error("Task not found")]
150 TaskNotFound,
151 #[error("Session not found")]
152 SessionNotFound,
153 #[error("Item not found: {0}")]
154 ItemNotFound(String),
155 #[error("Invalid input syntax: {0}")]
156 InvalidInputSyntax(#[message] String),
157 #[error("Can not compare in memory: {0}")]
158 MemComparableError(#[from] memcomparable::Error),
159 #[error("Error while de/se values: {0}")]
160 ValueEncodingError(
161 #[from]
162 #[backtrace]
163 ValueEncodingError,
164 ),
165 #[error("Invalid value `{config_value}` for `{config_entry}`")]
166 InvalidConfigValue {
167 config_entry: String,
168 config_value: String,
169 },
170 #[error("Invalid Parameter Value: {0}")]
171 InvalidParameterValue(String),
172 #[error("Sink error: {0}")]
173 SinkError(
174 #[source]
175 #[backtrace]
176 BoxedError,
177 ),
178 #[error("Permission denied: {0}")]
179 PermissionDenied(#[message] String),
180 #[error("Failed to get/set session config: {0}")]
181 SessionConfig(
182 #[from]
183 #[backtrace]
184 SessionConfigError,
185 ),
186 #[error("Secret error: {0}")]
187 SecretError(
188 #[from]
189 #[backtrace]
190 SecretError,
191 ),
192 #[error("{0} has been deprecated, please use {1} instead.")]
193 Deprecated(String, String),
194 #[error(transparent)]
195 FeatureNotAvailable(
196 #[from]
197 #[backtrace]
198 FeatureNotAvailable,
199 ),
200}
201
202pub type Result<T> = std::result::Result<T, RwError>;
204
205impl From<TonicStatusWrapper> for RwError {
206 fn from(status: TonicStatusWrapper) -> Self {
207 use tonic::Code;
208
209 let message = status.inner().message();
210
211 match status.inner().code() {
213 Code::InvalidArgument => ErrorCode::InvalidParameterValue(message.to_owned()),
214 Code::NotFound | Code::AlreadyExists => ErrorCode::CatalogError(status.into()),
215 Code::PermissionDenied => ErrorCode::PermissionDenied(message.to_owned()),
216 Code::Cancelled => ErrorCode::SchedulerError(status.into()),
217 _ => ErrorCode::RpcError(status.into()),
218 }
219 .into()
220 }
221}
222
223impl From<RpcError> for RwError {
224 fn from(r: RpcError) -> Self {
225 match r {
226 RpcError::GrpcStatus(status) => TonicStatusWrapper::into(*status),
227 _ => ErrorCode::RpcError(r.into()).into(),
228 }
229 }
230}
231
232impl From<ExprError> for RwError {
233 fn from(s: ExprError) -> Self {
234 ErrorCode::ExprError(Box::new(s)).into()
235 }
236}
237
238impl From<SinkError> for RwError {
239 fn from(e: SinkError) -> Self {
240 ErrorCode::SinkError(Box::new(e)).into()
241 }
242}
243
244impl From<ConnectorError> for RwError {
245 fn from(e: ConnectorError) -> Self {
246 ErrorCode::ConnectorError(e.into()).into()
247 }
248}
249
250impl From<PbFieldNotFound> for RwError {
251 fn from(err: PbFieldNotFound) -> Self {
252 ErrorCode::InternalError(format!(
253 "Failed to decode prost: field not found `{}`",
254 err.0
255 ))
256 .into()
257 }
258}
259
260impl From<BatchError> for RwError {
261 fn from(s: BatchError) -> Self {
262 ErrorCode::BatchError(Box::new(s)).into()
263 }
264}
265
266impl From<JoinError> for RwError {
267 fn from(join_error: JoinError) -> Self {
268 ErrorCode::Uncategorized(join_error.into()).into()
269 }
270}
271
272impl From<BoxedError> for RwError {
274 fn from(e: BoxedError) -> Self {
275 let e = anyhow::__private::kind::BoxedKind::anyhow_kind(&e).new(e);
278 ErrorCode::Uncategorized(e).into()
279 }
280}
281
282impl From<risingwave_sqlparser::parser::ParserError> for ErrorCode {
283 fn from(e: risingwave_sqlparser::parser::ParserError) -> Self {
284 ErrorCode::InvalidInputSyntax(e.to_report_string())
285 }
286}
287
288impl From<RwError> for tonic::Status {
289 fn from(err: RwError) -> Self {
290 err.to_status(tonic::Code::Internal, "RwError")
291 }
292}