risingwave_batch/
error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17pub use anyhow::anyhow;
18use mysql_async::Error as MySqlError;
19use parquet::errors::ParquetError;
20use risingwave_common::array::ArrayError;
21use risingwave_common::error::{BoxedError, IcebergError, def_anyhow_newtype, def_anyhow_variant};
22use risingwave_common::id::FragmentId;
23use risingwave_common::util::value_encoding::error::ValueEncodingError;
24use risingwave_connector::error::ConnectorError;
25use risingwave_dml::error::DmlError;
26use risingwave_expr::ExprError;
27use risingwave_pb::PbFieldNotFound;
28use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
29use risingwave_storage::error::StorageError;
30use thiserror::Error;
31use thiserror_ext::Construct;
32use tokio_postgres::Error as PostgresError;
33use tonic::Status;
34
35pub type Result<T> = std::result::Result<T, BatchError>;
36/// Batch result with shared error.
37pub type SharedResult<T> = std::result::Result<T, Arc<BatchError>>;
38
39pub trait Error = std::error::Error + Send + Sync + 'static;
40
41#[derive(Error, Debug, Construct)]
42pub enum BatchError {
43    #[error("Storage error: {0}")]
44    Storage(
45        #[backtrace]
46        #[from]
47        StorageError,
48    ),
49
50    #[error("Array error: {0}")]
51    Array(
52        #[from]
53        #[backtrace]
54        ArrayError,
55    ),
56
57    #[error("Expr error: {0}")]
58    Expr(
59        #[from]
60        #[backtrace]
61        ExprError,
62    ),
63
64    #[error("Serialize/deserialize error: {0}")]
65    Serde(
66        #[source]
67        #[backtrace]
68        BoxedError,
69    ),
70
71    #[error("Failed to send result to channel")]
72    SenderError,
73
74    #[error(transparent)]
75    Internal(
76        #[from]
77        #[backtrace]
78        anyhow::Error,
79    ),
80
81    #[error("Task aborted: {0}")]
82    Aborted(String),
83
84    #[error(transparent)]
85    PbFieldNotFound(#[from] PbFieldNotFound),
86
87    #[error(transparent)]
88    RpcError(
89        #[from]
90        #[backtrace]
91        RpcError,
92    ),
93
94    #[error("Connector error: {0}")]
95    Connector(
96        #[source]
97        #[backtrace]
98        BoxedError,
99    ),
100
101    #[error("Failed to read from system table: {0}")]
102    SystemTable(
103        #[from]
104        #[backtrace]
105        BoxedError,
106    ),
107
108    #[error(transparent)]
109    Dml(
110        #[from]
111        #[backtrace]
112        DmlError,
113    ),
114
115    #[error("External system error: {0}")]
116    ExternalSystemError(
117        #[from]
118        #[backtrace]
119        BatchExternalSystemError,
120    ),
121
122    // Make the ref-counted type to be a variant for easier code structuring.
123    // TODO(error-handling): replace with `thiserror_ext::Arc`
124    #[error(transparent)]
125    Shared(
126        #[from]
127        #[backtrace]
128        Arc<Self>,
129    ),
130
131    #[error("Empty workers found")]
132    EmptyWorkerNodes,
133
134    #[error("Serving vnode mapping not found for fragment {0}")]
135    ServingVnodeMappingNotFound(FragmentId),
136
137    #[error("Streaming vnode mapping has not been initialized")]
138    StreamingVnodeMappingNotInitialized,
139
140    #[error("Streaming vnode mapping not found for fragment {0}")]
141    StreamingVnodeMappingNotFound(FragmentId),
142
143    #[error("Not enough memory to run this query, batch memory limit is {0} bytes")]
144    OutOfMemory(u64),
145
146    #[error("Failed to spill out to disk")]
147    Spill(
148        #[from]
149        #[backtrace]
150        opendal::Error,
151    ),
152
153    #[error("Failed to execute time travel query")]
154    TimeTravel(
155        #[source]
156        #[backtrace]
157        anyhow::Error,
158    ),
159}
160
161// Serialize/deserialize error.
162impl From<memcomparable::Error> for BatchError {
163    fn from(m: memcomparable::Error) -> Self {
164        Self::Serde(m.into())
165    }
166}
167impl From<ValueEncodingError> for BatchError {
168    fn from(e: ValueEncodingError) -> Self {
169        Self::Serde(e.into())
170    }
171}
172
173impl<'a> From<&'a BatchError> for Status {
174    fn from(err: &'a BatchError) -> Self {
175        err.to_status(tonic::Code::Internal, "batch")
176    }
177}
178
179impl From<BatchError> for Status {
180    fn from(err: BatchError) -> Self {
181        Self::from(&err)
182    }
183}
184
185impl From<ConnectorError> for BatchError {
186    fn from(value: ConnectorError) -> Self {
187        Self::Connector(value.into())
188    }
189}
190
191// Define a external system error
192def_anyhow_variant! {
193    pub BatchExternalSystemError,
194    BatchError ExternalSystemError,
195
196    PostgresError => "Postgres error",
197    IcebergError => "Iceberg error",
198    ParquetError => "Parquet error",
199    MySqlError => "MySQL error",
200}
201
202#[expect(clippy::disallowed_types)]
203impl From<iceberg::Error> for BatchExternalSystemError {
204    fn from(value: iceberg::Error) -> Self {
205        risingwave_common::error::IcebergError::from(value).into()
206    }
207}