// risingwave_stream/executor/error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::ArrayError;
16use risingwave_common::error::{BoxedError, NotImplemented};
17use risingwave_common::id::SinkId;
18use risingwave_common::util::value_encoding::error::ValueEncodingError;
19use risingwave_connector::error::ConnectorError;
20use risingwave_connector::sink::SinkError;
21use risingwave_dml::error::DmlError;
22use risingwave_expr::ExprError;
23use risingwave_pb::PbFieldNotFound;
24use risingwave_rpc_client::error::RpcError;
25use risingwave_storage::error::StorageError;
26use strum_macros::AsRefStr;
27
28use super::Barrier;
29use super::exchange::error::ExchangeChannelClosed;
30
31/// A specialized Result type for streaming executors.
32pub type StreamExecutorResult<T> = std::result::Result<T, StreamExecutorError>;
33
34/// The error type for streaming executors.
35#[derive(
36    thiserror::Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Construct,
37)]
38#[thiserror_ext(newtype(name = StreamExecutorError, backtrace))]
39#[derive(AsRefStr)]
40pub enum ErrorKind {
41    #[error("Storage error: {0}")]
42    Storage(
43        #[backtrace]
44        #[from]
45        StorageError,
46    ),
47
48    #[error("Chunk operation error: {0}")]
49    ArrayError(
50        #[from]
51        #[backtrace]
52        ArrayError,
53    ),
54
55    #[error("Chunk operation error: {0}")]
56    ExprError(
57        #[from]
58        #[backtrace]
59        ExprError,
60    ),
61
62    // TODO: remove this after state table is fully used
63    #[error("Serialize/deserialize error: {0}")]
64    SerdeError(
65        #[source]
66        #[backtrace]
67        BoxedError,
68    ),
69
70    #[error("Sink error: sink_id={1}, error: {0}")]
71    SinkError(
72        #[source]
73        #[backtrace]
74        SinkError,
75        SinkId,
76    ),
77
78    #[error(transparent)]
79    RpcError(
80        #[from]
81        #[backtrace]
82        RpcError,
83    ),
84
85    #[error("Channel closed: {0}")]
86    ChannelClosed(String),
87
88    #[error(transparent)]
89    ExchangeChannelClosed(
90        #[from]
91        #[backtrace]
92        ExchangeChannelClosed,
93    ),
94
95    #[error("Failed to align barrier: expected `{0:?}` but got `{1:?}`")]
96    AlignBarrier(Box<Barrier>, Box<Barrier>),
97
98    #[error("Connector error: {0}")]
99    ConnectorError(
100        #[source]
101        #[backtrace]
102        BoxedError,
103    ),
104
105    #[error(transparent)]
106    DmlError(
107        #[from]
108        #[backtrace]
109        DmlError,
110    ),
111
112    #[error(transparent)]
113    NotImplemented(#[from] NotImplemented),
114
115    #[error(transparent)]
116    Uncategorized(
117        #[from]
118        #[backtrace]
119        anyhow::Error,
120    ),
121}
122
123/// Serialize/deserialize error.
124impl From<memcomparable::Error> for StreamExecutorError {
125    fn from(m: memcomparable::Error) -> Self {
126        Self::serde_error(m)
127    }
128}
129impl From<ValueEncodingError> for StreamExecutorError {
130    fn from(e: ValueEncodingError) -> Self {
131        Self::serde_error(e)
132    }
133}
134
135/// Connector error.
136impl From<ConnectorError> for StreamExecutorError {
137    fn from(s: ConnectorError) -> Self {
138        Self::connector_error(s)
139    }
140}
141
142impl From<PbFieldNotFound> for StreamExecutorError {
143    fn from(err: PbFieldNotFound) -> Self {
144        Self::from(anyhow::anyhow!(
145            "Failed to decode prost: field not found `{}`",
146            err.0
147        ))
148    }
149}
150
151impl From<String> for StreamExecutorError {
152    fn from(s: String) -> Self {
153        ErrorKind::Uncategorized(anyhow::anyhow!(s)).into()
154    }
155}
156
157impl From<(SinkError, SinkId)> for StreamExecutorError {
158    fn from((err, sink_id): (SinkError, SinkId)) -> Self {
159        ErrorKind::SinkError(err, sink_id).into()
160    }
161}
162
163impl StreamExecutorError {
164    pub fn variant_name(&self) -> &str {
165        self.0.inner().as_ref()
166    }
167}
168
169static_assertions::const_assert_eq!(std::mem::size_of::<StreamExecutorError>(), 8);
170
#[cfg(test)]
mod tests {
    use risingwave_common::bail;

    use super::*;

    /// Always fails with an ad-hoc error produced by `bail!`.
    fn func_return_error() -> StreamExecutorResult<()> {
        bail!("test_error")
    }

    /// Manual check (ignored by default): unwrapping the error panics, and the panic
    /// output should contain only one backtrace.
    #[test]
    #[should_panic]
    #[ignore]
    fn executor_error_ui_test_1() {
        func_return_error().unwrap();
    }

    /// Manual check (ignored by default): debug-printing the error, without panicking,
    /// should likewise show only one backtrace.
    #[test]
    #[ignore]
    fn executor_error_ui_test_2() {
        if let Err(e) = func_return_error() {
            println!("{:?}", e);
        }
    }
}