risingwave_batch/
error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::disallowed_types, clippy::disallowed_methods)]
16
17use std::sync::Arc;
18
19pub use anyhow::anyhow;
20use iceberg::Error as IcebergError;
21use mysql_async::Error as MySqlError;
22use parquet::errors::ParquetError;
23use risingwave_common::array::ArrayError;
24use risingwave_common::error::{BoxedError, def_anyhow_newtype, def_anyhow_variant};
25use risingwave_common::util::value_encoding::error::ValueEncodingError;
26use risingwave_connector::error::ConnectorError;
27use risingwave_dml::error::DmlError;
28use risingwave_expr::ExprError;
29use risingwave_pb::PbFieldNotFound;
30use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
31use risingwave_storage::error::StorageError;
32use thiserror::Error;
33use thiserror_ext::Construct;
34use tokio_postgres::Error as PostgresError;
35use tonic::Status;
36
37use crate::worker_manager::worker_node_manager::FragmentId;
38
39pub type Result<T> = std::result::Result<T, BatchError>;
40/// Batch result with shared error.
41pub type SharedResult<T> = std::result::Result<T, Arc<BatchError>>;
42
43pub trait Error = std::error::Error + Send + Sync + 'static;
44
45#[derive(Error, Debug, Construct)]
46pub enum BatchError {
47    #[error("Storage error: {0}")]
48    Storage(
49        #[backtrace]
50        #[from]
51        StorageError,
52    ),
53
54    #[error("Array error: {0}")]
55    Array(
56        #[from]
57        #[backtrace]
58        ArrayError,
59    ),
60
61    #[error("Expr error: {0}")]
62    Expr(
63        #[from]
64        #[backtrace]
65        ExprError,
66    ),
67
68    #[error("Serialize/deserialize error: {0}")]
69    Serde(
70        #[source]
71        #[backtrace]
72        BoxedError,
73    ),
74
75    #[error("Failed to send result to channel")]
76    SenderError,
77
78    #[error(transparent)]
79    Internal(
80        #[from]
81        #[backtrace]
82        anyhow::Error,
83    ),
84
85    #[error("Task aborted: {0}")]
86    Aborted(String),
87
88    #[error(transparent)]
89    PbFieldNotFound(#[from] PbFieldNotFound),
90
91    #[error(transparent)]
92    RpcError(
93        #[from]
94        #[backtrace]
95        RpcError,
96    ),
97
98    #[error("Connector error: {0}")]
99    Connector(
100        #[source]
101        #[backtrace]
102        BoxedError,
103    ),
104
105    #[error("Failed to read from system table: {0}")]
106    SystemTable(
107        #[from]
108        #[backtrace]
109        BoxedError,
110    ),
111
112    #[error(transparent)]
113    Dml(
114        #[from]
115        #[backtrace]
116        DmlError,
117    ),
118
119    #[error("External system error: {0}")]
120    ExternalSystemError(
121        #[from]
122        #[backtrace]
123        BatchExternalSystemError,
124    ),
125
126    // Make the ref-counted type to be a variant for easier code structuring.
127    // TODO(error-handling): replace with `thiserror_ext::Arc`
128    #[error(transparent)]
129    Shared(
130        #[from]
131        #[backtrace]
132        Arc<Self>,
133    ),
134
135    #[error("Empty workers found")]
136    EmptyWorkerNodes,
137
138    #[error("Serving vnode mapping not found for fragment {0}")]
139    ServingVnodeMappingNotFound(FragmentId),
140
141    #[error("Streaming vnode mapping not found for fragment {0}")]
142    StreamingVnodeMappingNotFound(FragmentId),
143
144    #[error("Not enough memory to run this query, batch memory limit is {0} bytes")]
145    OutOfMemory(u64),
146
147    #[error("Failed to spill out to disk")]
148    Spill(
149        #[from]
150        #[backtrace]
151        opendal::Error,
152    ),
153
154    #[error("Failed to execute time travel query")]
155    TimeTravel(
156        #[source]
157        #[backtrace]
158        anyhow::Error,
159    ),
160}
161
162// Serialize/deserialize error.
163impl From<memcomparable::Error> for BatchError {
164    fn from(m: memcomparable::Error) -> Self {
165        Self::Serde(m.into())
166    }
167}
168impl From<ValueEncodingError> for BatchError {
169    fn from(e: ValueEncodingError) -> Self {
170        Self::Serde(e.into())
171    }
172}
173
174impl<'a> From<&'a BatchError> for Status {
175    fn from(err: &'a BatchError) -> Self {
176        err.to_status(tonic::Code::Internal, "batch")
177    }
178}
179
180impl From<BatchError> for Status {
181    fn from(err: BatchError) -> Self {
182        Self::from(&err)
183    }
184}
185
186impl From<ConnectorError> for BatchError {
187    fn from(value: ConnectorError) -> Self {
188        Self::Connector(value.into())
189    }
190}
191
192// Define a external system error
193def_anyhow_variant! {
194    pub BatchExternalSystemError,
195    BatchError ExternalSystemError,
196
197    PostgresError => "Postgres error",
198    IcebergError => "Iceberg error",
199    ParquetError => "Parquet error",
200    MySqlError => "MySQL error",
201}