risingwave_batch/
error.rs1use std::sync::Arc;
16
17pub use anyhow::anyhow;
18use mysql_async::Error as MySqlError;
19use parquet::errors::ParquetError;
20use risingwave_common::array::ArrayError;
21use risingwave_common::error::{BoxedError, IcebergError, def_anyhow_newtype, def_anyhow_variant};
22use risingwave_common::id::FragmentId;
23use risingwave_common::util::value_encoding::error::ValueEncodingError;
24use risingwave_connector::error::ConnectorError;
25use risingwave_dml::error::DmlError;
26use risingwave_expr::ExprError;
27use risingwave_pb::PbFieldNotFound;
28use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
29use risingwave_storage::error::StorageError;
30use thiserror::Error;
31use thiserror_ext::Construct;
32use tokio_postgres::Error as PostgresError;
33use tonic::Status;
34
/// Result type whose error half is [`BatchError`].
pub type Result<T> = std::result::Result<T, BatchError>;
/// Result type whose error half is an [`Arc`]-wrapped [`BatchError`].
pub type SharedResult<T> = std::result::Result<T, Arc<BatchError>>;
38
/// Trait alias for `std::error::Error + Send + Sync + 'static`.
///
/// Shares its name with the `thiserror::Error` derive macro imported by this file;
/// the two do not clash because macros and traits live in different namespaces.
pub trait Error = std::error::Error + Send + Sync + 'static;
40
41#[derive(Error, Debug, Construct)]
42pub enum BatchError {
43 #[error("Storage error: {0}")]
44 Storage(
45 #[backtrace]
46 #[from]
47 StorageError,
48 ),
49
50 #[error("Array error: {0}")]
51 Array(
52 #[from]
53 #[backtrace]
54 ArrayError,
55 ),
56
57 #[error("Expr error: {0}")]
58 Expr(
59 #[from]
60 #[backtrace]
61 ExprError,
62 ),
63
64 #[error("Serialize/deserialize error: {0}")]
65 Serde(
66 #[source]
67 #[backtrace]
68 BoxedError,
69 ),
70
71 #[error("Failed to send result to channel")]
72 SenderError,
73
74 #[error(transparent)]
75 Internal(
76 #[from]
77 #[backtrace]
78 anyhow::Error,
79 ),
80
81 #[error("Task aborted: {0}")]
82 Aborted(String),
83
84 #[error(transparent)]
85 PbFieldNotFound(#[from] PbFieldNotFound),
86
87 #[error(transparent)]
88 RpcError(
89 #[from]
90 #[backtrace]
91 RpcError,
92 ),
93
94 #[error("Connector error: {0}")]
95 Connector(
96 #[source]
97 #[backtrace]
98 BoxedError,
99 ),
100
101 #[error("Failed to read from system table: {0}")]
102 SystemTable(
103 #[from]
104 #[backtrace]
105 BoxedError,
106 ),
107
108 #[error(transparent)]
109 Dml(
110 #[from]
111 #[backtrace]
112 DmlError,
113 ),
114
115 #[error("External system error: {0}")]
116 ExternalSystemError(
117 #[from]
118 #[backtrace]
119 BatchExternalSystemError,
120 ),
121
122 #[error(transparent)]
125 Shared(
126 #[from]
127 #[backtrace]
128 Arc<Self>,
129 ),
130
131 #[error("Empty workers found")]
132 EmptyWorkerNodes,
133
134 #[error("Serving vnode mapping not found for fragment {0}")]
135 ServingVnodeMappingNotFound(FragmentId),
136
137 #[error("Streaming vnode mapping has not been initialized")]
138 StreamingVnodeMappingNotInitialized,
139
140 #[error("Streaming vnode mapping not found for fragment {0}")]
141 StreamingVnodeMappingNotFound(FragmentId),
142
143 #[error("Not enough memory to run this query, batch memory limit is {0} bytes")]
144 OutOfMemory(u64),
145
146 #[error("Failed to spill out to disk")]
147 Spill(
148 #[from]
149 #[backtrace]
150 opendal::Error,
151 ),
152
153 #[error("Failed to execute time travel query")]
154 TimeTravel(
155 #[source]
156 #[backtrace]
157 anyhow::Error,
158 ),
159}
160
161impl From<memcomparable::Error> for BatchError {
163 fn from(m: memcomparable::Error) -> Self {
164 Self::Serde(m.into())
165 }
166}
167impl From<ValueEncodingError> for BatchError {
168 fn from(e: ValueEncodingError) -> Self {
169 Self::Serde(e.into())
170 }
171}
172
173impl<'a> From<&'a BatchError> for Status {
174 fn from(err: &'a BatchError) -> Self {
175 err.to_status(tonic::Code::Internal, "batch")
176 }
177}
178
179impl From<BatchError> for Status {
180 fn from(err: BatchError) -> Self {
181 Self::from(&err)
182 }
183}
184
185impl From<ConnectorError> for BatchError {
186 fn from(value: ConnectorError) -> Self {
187 Self::Connector(value.into())
188 }
189}
190
// Invokes `def_anyhow_variant!` (imported from `risingwave_common::error`) with:
//   - the public type name `BatchExternalSystemError`,
//   - the enum / variant pair `BatchError ExternalSystemError`,
//   - a list of external error types, each paired with a message string.
// NOTE(review): presumably this defines `BatchExternalSystemError` plus conversions
// from each listed error type into it (and on into `BatchError::ExternalSystemError`),
// using the string as context — confirm against the macro definition.
def_anyhow_variant! {
    pub BatchExternalSystemError,
    BatchError ExternalSystemError,

    PostgresError => "Postgres error",
    IcebergError => "Iceberg error",
    ParquetError => "Parquet error",
    MySqlError => "MySQL error",
}
201
202#[expect(clippy::disallowed_types)]
203impl From<iceberg::Error> for BatchExternalSystemError {
204 fn from(value: iceberg::Error) -> Self {
205 risingwave_common::error::IcebergError::from(value).into()
206 }
207}