risingwave_frontend/
error.rs1use risingwave_batch::error::BatchError;
16use risingwave_common::array::ArrayError;
17use risingwave_common::error::{BoxedError, NoFunction, NotImplemented};
18use risingwave_common::license::FeatureNotAvailable;
19use risingwave_common::secret::SecretError;
20use risingwave_common::session_config::SessionConfigError;
21use risingwave_common::util::value_encoding::error::ValueEncodingError;
22use risingwave_connector::error::ConnectorError;
23use risingwave_connector::sink::SinkError;
24use risingwave_expr::ExprError;
25use risingwave_pb::PbFieldNotFound;
26use risingwave_rpc_client::error::{RpcError, ToTonicStatus, TonicStatusWrapper};
27use thiserror::Error;
28use thiserror_ext::AsReport;
29use tokio::task::JoinError;
30
31use crate::expr::CastError;
32
33#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Macro)]
41#[thiserror_ext(newtype(name = RwError, backtrace), macro(path = "crate::error"))]
42pub enum ErrorCode {
43 #[error("internal error: {0}")]
44 InternalError(String),
45 #[error(transparent)]
47 Uncategorized(
48 #[from]
49 #[backtrace]
50 anyhow::Error,
51 ),
52 #[error("connector error: {0}")]
53 ConnectorError(
54 #[source]
55 #[backtrace]
56 BoxedError,
57 ),
58 #[error(transparent)]
59 NotImplemented(#[from] NotImplemented),
60 #[error("Not supported: {0}\nHINT: {1}")]
62 NotSupported(String, String),
63 #[error(transparent)]
64 NoFunction(#[from] NoFunction),
65 #[error(transparent)]
66 IoError(#[from] std::io::Error),
67 #[error("Storage error: {0}")]
68 StorageError(
69 #[backtrace]
70 #[source]
71 BoxedError,
72 ),
73 #[error("Expr error: {0}")]
74 ExprError(
75 #[source]
76 #[backtrace]
77 BoxedError,
78 ),
79 #[error("{0}")]
82 BatchError(
83 #[source]
84 #[backtrace]
85 BoxedError,
87 ),
88 #[error("Array error: {0}")]
89 ArrayError(
90 #[from]
91 #[backtrace]
92 ArrayError,
93 ),
94 #[cfg(feature = "datafusion")]
95 #[error("DataFusion error: {0}")]
96 DataFusionError(
97 #[from]
98 #[backtrace]
99 datafusion_common::DataFusionError,
100 ),
101 #[error("Stream error: {0}")]
102 StreamError(
103 #[backtrace]
104 #[source]
105 BoxedError,
106 ),
107 #[error("{0}")]
110 RpcError(
111 #[source]
112 #[backtrace]
113 BoxedError,
115 ),
116 #[error("Bind error: {0}")]
119 BindError(#[message] String),
120 #[error("Failed to bind expression: {expr}: {error}")]
122 BindErrorRoot {
123 expr: String,
124 #[source]
125 #[backtrace]
126 error: BoxedError,
127 },
128 #[error(transparent)]
129 CastError(
130 #[from]
131 #[backtrace]
132 CastError,
133 ),
134 #[error("Catalog error: {0}")]
135 CatalogError(
136 #[source]
137 #[backtrace]
138 BoxedError,
139 ),
140 #[error("Protocol error: {0}")]
141 ProtocolError(String),
142 #[error("Scheduler error: {0}")]
143 SchedulerError(
144 #[source]
145 #[backtrace]
146 BoxedError,
147 ),
148 #[error("Task not found")]
149 TaskNotFound,
150 #[error("Session not found")]
151 SessionNotFound,
152 #[error("Item not found: {0}")]
153 ItemNotFound(String),
154 #[error("Invalid input syntax: {0}")]
155 InvalidInputSyntax(#[message] String),
156 #[error("Can not compare in memory: {0}")]
157 MemComparableError(#[from] memcomparable::Error),
158 #[error("Error while de/se values: {0}")]
159 ValueEncodingError(
160 #[from]
161 #[backtrace]
162 ValueEncodingError,
163 ),
164 #[error("Invalid value `{config_value}` for `{config_entry}`")]
165 InvalidConfigValue {
166 config_entry: String,
167 config_value: String,
168 },
169 #[error("Invalid Parameter Value: {0}")]
170 InvalidParameterValue(String),
171 #[error("Sink error: {0}")]
172 SinkError(
173 #[source]
174 #[backtrace]
175 BoxedError,
176 ),
177 #[error("Permission denied: {0}")]
178 PermissionDenied(String),
179 #[error("Failed to get/set session config: {0}")]
180 SessionConfig(
181 #[from]
182 #[backtrace]
183 SessionConfigError,
184 ),
185 #[error("Secret error: {0}")]
186 SecretError(
187 #[from]
188 #[backtrace]
189 SecretError,
190 ),
191 #[error("{0} has been deprecated, please use {1} instead.")]
192 Deprecated(String, String),
193 #[error(transparent)]
194 FeatureNotAvailable(
195 #[from]
196 #[backtrace]
197 FeatureNotAvailable,
198 ),
199}
200
201pub type Result<T> = std::result::Result<T, RwError>;
203
204impl From<TonicStatusWrapper> for RwError {
205 fn from(status: TonicStatusWrapper) -> Self {
206 use tonic::Code;
207
208 let message = status.inner().message();
209
210 match status.inner().code() {
212 Code::InvalidArgument => ErrorCode::InvalidParameterValue(message.to_owned()),
213 Code::NotFound | Code::AlreadyExists => ErrorCode::CatalogError(status.into()),
214 Code::PermissionDenied => ErrorCode::PermissionDenied(message.to_owned()),
215 Code::Cancelled => ErrorCode::SchedulerError(status.into()),
216 _ => ErrorCode::RpcError(status.into()),
217 }
218 .into()
219 }
220}
221
222impl From<RpcError> for RwError {
223 fn from(r: RpcError) -> Self {
224 match r {
225 RpcError::GrpcStatus(status) => TonicStatusWrapper::into(*status),
226 _ => ErrorCode::RpcError(r.into()).into(),
227 }
228 }
229}
230
231impl From<ExprError> for RwError {
232 fn from(s: ExprError) -> Self {
233 ErrorCode::ExprError(Box::new(s)).into()
234 }
235}
236
237impl From<SinkError> for RwError {
238 fn from(e: SinkError) -> Self {
239 ErrorCode::SinkError(Box::new(e)).into()
240 }
241}
242
243impl From<ConnectorError> for RwError {
244 fn from(e: ConnectorError) -> Self {
245 ErrorCode::ConnectorError(e.into()).into()
246 }
247}
248
249impl From<PbFieldNotFound> for RwError {
250 fn from(err: PbFieldNotFound) -> Self {
251 ErrorCode::InternalError(format!(
252 "Failed to decode prost: field not found `{}`",
253 err.0
254 ))
255 .into()
256 }
257}
258
259impl From<BatchError> for RwError {
260 fn from(s: BatchError) -> Self {
261 ErrorCode::BatchError(Box::new(s)).into()
262 }
263}
264
265impl From<JoinError> for RwError {
266 fn from(join_error: JoinError) -> Self {
267 ErrorCode::Uncategorized(join_error.into()).into()
268 }
269}
270
271impl From<BoxedError> for RwError {
273 fn from(e: BoxedError) -> Self {
274 let e = anyhow::__private::kind::BoxedKind::anyhow_kind(&e).new(e);
277 ErrorCode::Uncategorized(e).into()
278 }
279}
280
281impl From<risingwave_sqlparser::parser::ParserError> for ErrorCode {
282 fn from(e: risingwave_sqlparser::parser::ParserError) -> Self {
283 ErrorCode::InvalidInputSyntax(e.to_report_string())
284 }
285}
286
287impl From<RwError> for tonic::Status {
288 fn from(err: RwError) -> Self {
289 err.to_status(tonic::Code::Internal, "RwError")
290 }
291}