risingwave_batch/
error.rs1use std::sync::Arc;
16
17pub use anyhow::anyhow;
18use mysql_async::Error as MySqlError;
19use parquet::errors::ParquetError;
20use risingwave_common::array::ArrayError;
21use risingwave_common::error::{BoxedError, IcebergError, def_anyhow_newtype, def_anyhow_variant};
22use risingwave_common::util::value_encoding::error::ValueEncodingError;
23use risingwave_connector::error::ConnectorError;
24use risingwave_dml::error::DmlError;
25use risingwave_expr::ExprError;
26use risingwave_pb::PbFieldNotFound;
27use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
28use risingwave_storage::error::StorageError;
29use thiserror::Error;
30use thiserror_ext::Construct;
31use tokio_postgres::Error as PostgresError;
32use tonic::Status;
33
34use crate::worker_manager::worker_node_manager::FragmentId;
35
36pub type Result<T> = std::result::Result<T, BatchError>;
37pub type SharedResult<T> = std::result::Result<T, Arc<BatchError>>;
39
40pub trait Error = std::error::Error + Send + Sync + 'static;
41
42#[derive(Error, Debug, Construct)]
43pub enum BatchError {
44 #[error("Storage error: {0}")]
45 Storage(
46 #[backtrace]
47 #[from]
48 StorageError,
49 ),
50
51 #[error("Array error: {0}")]
52 Array(
53 #[from]
54 #[backtrace]
55 ArrayError,
56 ),
57
58 #[error("Expr error: {0}")]
59 Expr(
60 #[from]
61 #[backtrace]
62 ExprError,
63 ),
64
65 #[error("Serialize/deserialize error: {0}")]
66 Serde(
67 #[source]
68 #[backtrace]
69 BoxedError,
70 ),
71
72 #[error("Failed to send result to channel")]
73 SenderError,
74
75 #[error(transparent)]
76 Internal(
77 #[from]
78 #[backtrace]
79 anyhow::Error,
80 ),
81
82 #[error("Task aborted: {0}")]
83 Aborted(String),
84
85 #[error(transparent)]
86 PbFieldNotFound(#[from] PbFieldNotFound),
87
88 #[error(transparent)]
89 RpcError(
90 #[from]
91 #[backtrace]
92 RpcError,
93 ),
94
95 #[error("Connector error: {0}")]
96 Connector(
97 #[source]
98 #[backtrace]
99 BoxedError,
100 ),
101
102 #[error("Failed to read from system table: {0}")]
103 SystemTable(
104 #[from]
105 #[backtrace]
106 BoxedError,
107 ),
108
109 #[error(transparent)]
110 Dml(
111 #[from]
112 #[backtrace]
113 DmlError,
114 ),
115
116 #[error("External system error: {0}")]
117 ExternalSystemError(
118 #[from]
119 #[backtrace]
120 BatchExternalSystemError,
121 ),
122
123 #[error(transparent)]
126 Shared(
127 #[from]
128 #[backtrace]
129 Arc<Self>,
130 ),
131
132 #[error("Empty workers found")]
133 EmptyWorkerNodes,
134
135 #[error("Serving vnode mapping not found for fragment {0}")]
136 ServingVnodeMappingNotFound(FragmentId),
137
138 #[error("Streaming vnode mapping not found for fragment {0}")]
139 StreamingVnodeMappingNotFound(FragmentId),
140
141 #[error("Not enough memory to run this query, batch memory limit is {0} bytes")]
142 OutOfMemory(u64),
143
144 #[error("Failed to spill out to disk")]
145 Spill(
146 #[from]
147 #[backtrace]
148 opendal::Error,
149 ),
150
151 #[error("Failed to execute time travel query")]
152 TimeTravel(
153 #[source]
154 #[backtrace]
155 anyhow::Error,
156 ),
157}
158
159impl From<memcomparable::Error> for BatchError {
161 fn from(m: memcomparable::Error) -> Self {
162 Self::Serde(m.into())
163 }
164}
165impl From<ValueEncodingError> for BatchError {
166 fn from(e: ValueEncodingError) -> Self {
167 Self::Serde(e.into())
168 }
169}
170
171impl<'a> From<&'a BatchError> for Status {
172 fn from(err: &'a BatchError) -> Self {
173 err.to_status(tonic::Code::Internal, "batch")
174 }
175}
176
177impl From<BatchError> for Status {
178 fn from(err: BatchError) -> Self {
179 Self::from(&err)
180 }
181}
182
183impl From<ConnectorError> for BatchError {
184 fn from(value: ConnectorError) -> Self {
185 Self::Connector(value.into())
186 }
187}
188
189def_anyhow_variant! {
191 pub BatchExternalSystemError,
192 BatchError ExternalSystemError,
193
194 PostgresError => "Postgres error",
195 IcebergError => "Iceberg error",
196 ParquetError => "Parquet error",
197 MySqlError => "MySQL error",
198}
199
200#[expect(clippy::disallowed_types)]
201impl From<iceberg::Error> for BatchExternalSystemError {
202 fn from(value: iceberg::Error) -> Self {
203 risingwave_common::error::IcebergError::from(value).into()
204 }
205}