risingwave_frontend/
error.rs1use risingwave_batch::error::BatchError;
16use risingwave_common::array::ArrayError;
17use risingwave_common::error::{BoxedError, NoFunction, NotImplemented};
18use risingwave_common::secret::SecretError;
19use risingwave_common::session_config::SessionConfigError;
20use risingwave_common::util::value_encoding::error::ValueEncodingError;
21use risingwave_connector::error::ConnectorError;
22use risingwave_connector::sink::SinkError;
23use risingwave_expr::ExprError;
24use risingwave_pb::PbFieldNotFound;
25use risingwave_rpc_client::error::{RpcError, TonicStatusWrapper};
26use thiserror::Error;
27use thiserror_ext::AsReport;
28use tokio::task::JoinError;
29
30use crate::expr::CastError;
31
32#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Macro)]
40#[thiserror_ext(newtype(name = RwError, backtrace), macro(path = "crate::error"))]
41pub enum ErrorCode {
42 #[error("internal error: {0}")]
43 InternalError(String),
44 #[error(transparent)]
46 Uncategorized(
47 #[from]
48 #[backtrace]
49 anyhow::Error,
50 ),
51 #[error("connector error: {0}")]
52 ConnectorError(
53 #[source]
54 #[backtrace]
55 BoxedError,
56 ),
57 #[error(transparent)]
58 NotImplemented(#[from] NotImplemented),
59 #[error("Not supported: {0}\nHINT: {1}")]
61 NotSupported(String, String),
62 #[error(transparent)]
63 NoFunction(#[from] NoFunction),
64 #[error(transparent)]
65 IoError(#[from] std::io::Error),
66 #[error("Storage error: {0}")]
67 StorageError(
68 #[backtrace]
69 #[source]
70 BoxedError,
71 ),
72 #[error("Expr error: {0}")]
73 ExprError(
74 #[source]
75 #[backtrace]
76 BoxedError,
77 ),
78 #[error("{0}")]
81 BatchError(
82 #[source]
83 #[backtrace]
84 BoxedError,
86 ),
87 #[error("Array error: {0}")]
88 ArrayError(
89 #[from]
90 #[backtrace]
91 ArrayError,
92 ),
93 #[error("Stream error: {0}")]
94 StreamError(
95 #[backtrace]
96 #[source]
97 BoxedError,
98 ),
99 #[error("{0}")]
102 RpcError(
103 #[source]
104 #[backtrace]
105 BoxedError,
107 ),
108 #[error("Bind error: {0}")]
111 BindError(#[message] String),
112 #[error("Failed to bind expression: {expr}: {error}")]
114 BindErrorRoot {
115 expr: String,
116 #[source]
117 #[backtrace]
118 error: BoxedError,
119 },
120 #[error(transparent)]
121 CastError(
122 #[from]
123 #[backtrace]
124 CastError,
125 ),
126 #[error("Catalog error: {0}")]
127 CatalogError(
128 #[source]
129 #[backtrace]
130 BoxedError,
131 ),
132 #[error("Protocol error: {0}")]
133 ProtocolError(String),
134 #[error("Scheduler error: {0}")]
135 SchedulerError(
136 #[source]
137 #[backtrace]
138 BoxedError,
139 ),
140 #[error("Task not found")]
141 TaskNotFound,
142 #[error("Session not found")]
143 SessionNotFound,
144 #[error("Item not found: {0}")]
145 ItemNotFound(String),
146 #[error("Invalid input syntax: {0}")]
147 InvalidInputSyntax(String),
148 #[error("Can not compare in memory: {0}")]
149 MemComparableError(#[from] memcomparable::Error),
150 #[error("Error while de/se values: {0}")]
151 ValueEncodingError(
152 #[from]
153 #[backtrace]
154 ValueEncodingError,
155 ),
156 #[error("Invalid value `{config_value}` for `{config_entry}`")]
157 InvalidConfigValue {
158 config_entry: String,
159 config_value: String,
160 },
161 #[error("Invalid Parameter Value: {0}")]
162 InvalidParameterValue(String),
163 #[error("Sink error: {0}")]
164 SinkError(
165 #[source]
166 #[backtrace]
167 BoxedError,
168 ),
169 #[error("Permission denied: {0}")]
170 PermissionDenied(String),
171 #[error("Failed to get/set session config: {0}")]
172 SessionConfig(
173 #[from]
174 #[backtrace]
175 SessionConfigError,
176 ),
177 #[error("Secret error: {0}")]
178 SecretError(
179 #[from]
180 #[backtrace]
181 SecretError,
182 ),
183 #[error("{0} has been deprecated, please use {1} instead.")]
184 Deprecated(String, String),
185}
186
187pub type Result<T> = std::result::Result<T, RwError>;
189
190impl From<TonicStatusWrapper> for RwError {
191 fn from(status: TonicStatusWrapper) -> Self {
192 use tonic::Code;
193
194 let message = status.inner().message();
195
196 match status.inner().code() {
198 Code::InvalidArgument => ErrorCode::InvalidParameterValue(message.to_owned()),
199 Code::NotFound | Code::AlreadyExists => ErrorCode::CatalogError(status.into()),
200 Code::PermissionDenied => ErrorCode::PermissionDenied(message.to_owned()),
201 Code::Cancelled => ErrorCode::SchedulerError(status.into()),
202 _ => ErrorCode::RpcError(status.into()),
203 }
204 .into()
205 }
206}
207
208impl From<RpcError> for RwError {
209 fn from(r: RpcError) -> Self {
210 match r {
211 RpcError::GrpcStatus(status) => TonicStatusWrapper::into(*status),
212 _ => ErrorCode::RpcError(r.into()).into(),
213 }
214 }
215}
216
217impl From<ExprError> for RwError {
218 fn from(s: ExprError) -> Self {
219 ErrorCode::ExprError(Box::new(s)).into()
220 }
221}
222
223impl From<SinkError> for RwError {
224 fn from(e: SinkError) -> Self {
225 ErrorCode::SinkError(Box::new(e)).into()
226 }
227}
228
229impl From<ConnectorError> for RwError {
230 fn from(e: ConnectorError) -> Self {
231 ErrorCode::ConnectorError(e.into()).into()
232 }
233}
234
235impl From<PbFieldNotFound> for RwError {
236 fn from(err: PbFieldNotFound) -> Self {
237 ErrorCode::InternalError(format!(
238 "Failed to decode prost: field not found `{}`",
239 err.0
240 ))
241 .into()
242 }
243}
244
245impl From<BatchError> for RwError {
246 fn from(s: BatchError) -> Self {
247 ErrorCode::BatchError(Box::new(s)).into()
248 }
249}
250
251impl From<JoinError> for RwError {
252 fn from(join_error: JoinError) -> Self {
253 ErrorCode::Uncategorized(join_error.into()).into()
254 }
255}
256
257impl From<BoxedError> for RwError {
259 fn from(e: BoxedError) -> Self {
260 let e = anyhow::__private::kind::BoxedKind::anyhow_kind(&e).new(e);
263 ErrorCode::Uncategorized(e).into()
264 }
265}
266
267impl From<risingwave_sqlparser::parser::ParserError> for ErrorCode {
268 fn from(e: risingwave_sqlparser::parser::ParserError) -> Self {
269 ErrorCode::InvalidInputSyntax(e.to_report_string())
270 }
271}