risingwave_batch/
error.rs1#![allow(clippy::disallowed_types, clippy::disallowed_methods)]
16
17use std::sync::Arc;
18
19pub use anyhow::anyhow;
20use iceberg::Error as IcebergError;
21use mysql_async::Error as MySqlError;
22use parquet::errors::ParquetError;
23use risingwave_common::array::ArrayError;
24use risingwave_common::error::{BoxedError, def_anyhow_newtype, def_anyhow_variant};
25use risingwave_common::util::value_encoding::error::ValueEncodingError;
26use risingwave_connector::error::ConnectorError;
27use risingwave_dml::error::DmlError;
28use risingwave_expr::ExprError;
29use risingwave_pb::PbFieldNotFound;
30use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
31use risingwave_storage::error::StorageError;
32use thiserror::Error;
33use thiserror_ext::Construct;
34use tokio_postgres::Error as PostgresError;
35use tonic::Status;
36
37use crate::worker_manager::worker_node_manager::FragmentId;
38
39pub type Result<T> = std::result::Result<T, BatchError>;
40pub type SharedResult<T> = std::result::Result<T, Arc<BatchError>>;
42
43pub trait Error = std::error::Error + Send + Sync + 'static;
44
45#[derive(Error, Debug, Construct)]
46pub enum BatchError {
47 #[error("Storage error: {0}")]
48 Storage(
49 #[backtrace]
50 #[from]
51 StorageError,
52 ),
53
54 #[error("Array error: {0}")]
55 Array(
56 #[from]
57 #[backtrace]
58 ArrayError,
59 ),
60
61 #[error("Expr error: {0}")]
62 Expr(
63 #[from]
64 #[backtrace]
65 ExprError,
66 ),
67
68 #[error("Serialize/deserialize error: {0}")]
69 Serde(
70 #[source]
71 #[backtrace]
72 BoxedError,
73 ),
74
75 #[error("Failed to send result to channel")]
76 SenderError,
77
78 #[error(transparent)]
79 Internal(
80 #[from]
81 #[backtrace]
82 anyhow::Error,
83 ),
84
85 #[error("Task aborted: {0}")]
86 Aborted(String),
87
88 #[error(transparent)]
89 PbFieldNotFound(#[from] PbFieldNotFound),
90
91 #[error(transparent)]
92 RpcError(
93 #[from]
94 #[backtrace]
95 RpcError,
96 ),
97
98 #[error("Connector error: {0}")]
99 Connector(
100 #[source]
101 #[backtrace]
102 BoxedError,
103 ),
104
105 #[error("Failed to read from system table: {0}")]
106 SystemTable(
107 #[from]
108 #[backtrace]
109 BoxedError,
110 ),
111
112 #[error(transparent)]
113 Dml(
114 #[from]
115 #[backtrace]
116 DmlError,
117 ),
118
119 #[error("External system error: {0}")]
120 ExternalSystemError(
121 #[from]
122 #[backtrace]
123 BatchExternalSystemError,
124 ),
125
126 #[error(transparent)]
129 Shared(
130 #[from]
131 #[backtrace]
132 Arc<Self>,
133 ),
134
135 #[error("Empty workers found")]
136 EmptyWorkerNodes,
137
138 #[error("Serving vnode mapping not found for fragment {0}")]
139 ServingVnodeMappingNotFound(FragmentId),
140
141 #[error("Streaming vnode mapping not found for fragment {0}")]
142 StreamingVnodeMappingNotFound(FragmentId),
143
144 #[error("Not enough memory to run this query, batch memory limit is {0} bytes")]
145 OutOfMemory(u64),
146
147 #[error("Failed to spill out to disk")]
148 Spill(
149 #[from]
150 #[backtrace]
151 opendal::Error,
152 ),
153
154 #[error("Failed to execute time travel query")]
155 TimeTravel(
156 #[source]
157 #[backtrace]
158 anyhow::Error,
159 ),
160}
161
162impl From<memcomparable::Error> for BatchError {
164 fn from(m: memcomparable::Error) -> Self {
165 Self::Serde(m.into())
166 }
167}
168impl From<ValueEncodingError> for BatchError {
169 fn from(e: ValueEncodingError) -> Self {
170 Self::Serde(e.into())
171 }
172}
173
174impl<'a> From<&'a BatchError> for Status {
175 fn from(err: &'a BatchError) -> Self {
176 err.to_status(tonic::Code::Internal, "batch")
177 }
178}
179
180impl From<BatchError> for Status {
181 fn from(err: BatchError) -> Self {
182 Self::from(&err)
183 }
184}
185
186impl From<ConnectorError> for BatchError {
187 fn from(value: ConnectorError) -> Self {
188 Self::Connector(value.into())
189 }
190}
191
192def_anyhow_variant! {
194 pub BatchExternalSystemError,
195 BatchError ExternalSystemError,
196
197 PostgresError => "Postgres error",
198 IcebergError => "Iceberg error",
199 ParquetError => "Parquet error",
200 MySqlError => "MySQL error",
201}