risingwave_batch/
error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17pub use anyhow::anyhow;
18use mysql_async::Error as MySqlError;
19use parquet::errors::ParquetError;
20use risingwave_common::array::ArrayError;
21use risingwave_common::error::{BoxedError, IcebergError, def_anyhow_newtype, def_anyhow_variant};
22use risingwave_common::util::value_encoding::error::ValueEncodingError;
23use risingwave_connector::error::ConnectorError;
24use risingwave_dml::error::DmlError;
25use risingwave_expr::ExprError;
26use risingwave_pb::PbFieldNotFound;
27use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
28use risingwave_storage::error::StorageError;
29use thiserror::Error;
30use thiserror_ext::Construct;
31use tokio_postgres::Error as PostgresError;
32use tonic::Status;
33
34use crate::worker_manager::worker_node_manager::FragmentId;
35
/// Result type whose error is [`BatchError`].
pub type Result<T> = std::result::Result<T, BatchError>;
/// Batch result with shared error.
///
/// The error is wrapped in an [`Arc`], so the same [`BatchError`] value can be handed to
/// several owners by cloning the pointer.
pub type SharedResult<T> = std::result::Result<T, Arc<BatchError>>;

/// Trait alias for a standard error that is also `Send + Sync + 'static`.
///
/// Note: the `trait Name = ...;` syntax is the `trait_alias` language feature.
pub trait Error = std::error::Error + Send + Sync + 'static;
41
42#[derive(Error, Debug, Construct)]
43pub enum BatchError {
44    #[error("Storage error: {0}")]
45    Storage(
46        #[backtrace]
47        #[from]
48        StorageError,
49    ),
50
51    #[error("Array error: {0}")]
52    Array(
53        #[from]
54        #[backtrace]
55        ArrayError,
56    ),
57
58    #[error("Expr error: {0}")]
59    Expr(
60        #[from]
61        #[backtrace]
62        ExprError,
63    ),
64
65    #[error("Serialize/deserialize error: {0}")]
66    Serde(
67        #[source]
68        #[backtrace]
69        BoxedError,
70    ),
71
72    #[error("Failed to send result to channel")]
73    SenderError,
74
75    #[error(transparent)]
76    Internal(
77        #[from]
78        #[backtrace]
79        anyhow::Error,
80    ),
81
82    #[error("Task aborted: {0}")]
83    Aborted(String),
84
85    #[error(transparent)]
86    PbFieldNotFound(#[from] PbFieldNotFound),
87
88    #[error(transparent)]
89    RpcError(
90        #[from]
91        #[backtrace]
92        RpcError,
93    ),
94
95    #[error("Connector error: {0}")]
96    Connector(
97        #[source]
98        #[backtrace]
99        BoxedError,
100    ),
101
102    #[error("Failed to read from system table: {0}")]
103    SystemTable(
104        #[from]
105        #[backtrace]
106        BoxedError,
107    ),
108
109    #[error(transparent)]
110    Dml(
111        #[from]
112        #[backtrace]
113        DmlError,
114    ),
115
116    #[error("External system error: {0}")]
117    ExternalSystemError(
118        #[from]
119        #[backtrace]
120        BatchExternalSystemError,
121    ),
122
123    // Make the ref-counted type to be a variant for easier code structuring.
124    // TODO(error-handling): replace with `thiserror_ext::Arc`
125    #[error(transparent)]
126    Shared(
127        #[from]
128        #[backtrace]
129        Arc<Self>,
130    ),
131
132    #[error("Empty workers found")]
133    EmptyWorkerNodes,
134
135    #[error("Serving vnode mapping not found for fragment {0}")]
136    ServingVnodeMappingNotFound(FragmentId),
137
138    #[error("Streaming vnode mapping not found for fragment {0}")]
139    StreamingVnodeMappingNotFound(FragmentId),
140
141    #[error("Not enough memory to run this query, batch memory limit is {0} bytes")]
142    OutOfMemory(u64),
143
144    #[error("Failed to spill out to disk")]
145    Spill(
146        #[from]
147        #[backtrace]
148        opendal::Error,
149    ),
150
151    #[error("Failed to execute time travel query")]
152    TimeTravel(
153        #[source]
154        #[backtrace]
155        anyhow::Error,
156    ),
157}
158
159// Serialize/deserialize error.
160impl From<memcomparable::Error> for BatchError {
161    fn from(m: memcomparable::Error) -> Self {
162        Self::Serde(m.into())
163    }
164}
165impl From<ValueEncodingError> for BatchError {
166    fn from(e: ValueEncodingError) -> Self {
167        Self::Serde(e.into())
168    }
169}
170
171impl<'a> From<&'a BatchError> for Status {
172    fn from(err: &'a BatchError) -> Self {
173        err.to_status(tonic::Code::Internal, "batch")
174    }
175}
176
177impl From<BatchError> for Status {
178    fn from(err: BatchError) -> Self {
179        Self::from(&err)
180    }
181}
182
183impl From<ConnectorError> for BatchError {
184    fn from(value: ConnectorError) -> Self {
185        Self::Connector(value.into())
186    }
187}
188
// Define an external system error: the `BatchExternalSystemError` type, associated with the
// `BatchError::ExternalSystemError` variant. Each `Type => "context"` entry pairs an
// external error type with the context message used for it.
// NOTE(review): the exact items generated here (newtype, `From` impls) are determined by
// `def_anyhow_variant!` from `risingwave_common::error` — confirm against its definition.
def_anyhow_variant! {
    pub BatchExternalSystemError,
    BatchError ExternalSystemError,

    PostgresError => "Postgres error",
    IcebergError => "Iceberg error",
    ParquetError => "Parquet error",
    MySqlError => "MySQL error",
}
199
200#[expect(clippy::disallowed_types)]
201impl From<iceberg::Error> for BatchExternalSystemError {
202    fn from(value: iceberg::Error) -> Self {
203        risingwave_common::error::IcebergError::from(value).into()
204    }
205}