risingwave_stream/executor/
error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::ArrayError;
16use risingwave_common::error::{BoxedError, NotImplemented};
17use risingwave_common::util::value_encoding::error::ValueEncodingError;
18use risingwave_connector::error::ConnectorError;
19use risingwave_connector::sink::SinkError;
20use risingwave_dml::error::DmlError;
21use risingwave_expr::ExprError;
22use risingwave_pb::PbFieldNotFound;
23use risingwave_rpc_client::error::RpcError;
24use risingwave_storage::error::StorageError;
25use strum_macros::AsRefStr;
26
27use super::Barrier;
28use super::exchange::error::ExchangeChannelClosed;
29
30/// A specialized Result type for streaming executors.
31pub type StreamExecutorResult<T> = std::result::Result<T, StreamExecutorError>;
32
33/// The error type for streaming executors.
34#[derive(
35    thiserror::Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Construct,
36)]
37#[thiserror_ext(newtype(name = StreamExecutorError, backtrace))]
38#[derive(AsRefStr)]
39pub enum ErrorKind {
40    #[error("Storage error: {0}")]
41    Storage(
42        #[backtrace]
43        #[from]
44        StorageError,
45    ),
46
47    #[error("Chunk operation error: {0}")]
48    ArrayError(
49        #[from]
50        #[backtrace]
51        ArrayError,
52    ),
53
54    #[error("Chunk operation error: {0}")]
55    ExprError(
56        #[from]
57        #[backtrace]
58        ExprError,
59    ),
60
61    // TODO: remove this after state table is fully used
62    #[error("Serialize/deserialize error: {0}")]
63    SerdeError(
64        #[source]
65        #[backtrace]
66        BoxedError,
67    ),
68
69    #[error("Sink error: sink_id={1}, error: {0}")]
70    SinkError(
71        #[source]
72        #[backtrace]
73        SinkError,
74        u32,
75    ),
76
77    #[error(transparent)]
78    RpcError(
79        #[from]
80        #[backtrace]
81        RpcError,
82    ),
83
84    #[error("Channel closed: {0}")]
85    ChannelClosed(String),
86
87    #[error(transparent)]
88    ExchangeChannelClosed(
89        #[from]
90        #[backtrace]
91        ExchangeChannelClosed,
92    ),
93
94    #[error("Failed to align barrier: expected `{0:?}` but got `{1:?}`")]
95    AlignBarrier(Box<Barrier>, Box<Barrier>),
96
97    #[error("Connector error: {0}")]
98    ConnectorError(
99        #[source]
100        #[backtrace]
101        BoxedError,
102    ),
103
104    #[error(transparent)]
105    DmlError(
106        #[from]
107        #[backtrace]
108        DmlError,
109    ),
110
111    #[error(transparent)]
112    NotImplemented(#[from] NotImplemented),
113
114    #[error(transparent)]
115    Uncategorized(
116        #[from]
117        #[backtrace]
118        anyhow::Error,
119    ),
120}
121
122/// Serialize/deserialize error.
123impl From<memcomparable::Error> for StreamExecutorError {
124    fn from(m: memcomparable::Error) -> Self {
125        Self::serde_error(m)
126    }
127}
128impl From<ValueEncodingError> for StreamExecutorError {
129    fn from(e: ValueEncodingError) -> Self {
130        Self::serde_error(e)
131    }
132}
133
134/// Connector error.
135impl From<ConnectorError> for StreamExecutorError {
136    fn from(s: ConnectorError) -> Self {
137        Self::connector_error(s)
138    }
139}
140
141impl From<PbFieldNotFound> for StreamExecutorError {
142    fn from(err: PbFieldNotFound) -> Self {
143        Self::from(anyhow::anyhow!(
144            "Failed to decode prost: field not found `{}`",
145            err.0
146        ))
147    }
148}
149
150impl From<String> for StreamExecutorError {
151    fn from(s: String) -> Self {
152        ErrorKind::Uncategorized(anyhow::anyhow!(s)).into()
153    }
154}
155
156impl From<(SinkError, u32)> for StreamExecutorError {
157    fn from((err, sink_id): (SinkError, u32)) -> Self {
158        ErrorKind::SinkError(err, sink_id).into()
159    }
160}
161
162impl StreamExecutorError {
163    pub fn variant_name(&self) -> &str {
164        self.0.inner().as_ref()
165    }
166}
167
168static_assertions::const_assert_eq!(std::mem::size_of::<StreamExecutorError>(), 8);
169
170#[cfg(test)]
171mod tests {
172    use risingwave_common::bail;
173
174    use super::*;
175
176    fn func_return_error() -> StreamExecutorResult<()> {
177        bail!("test_error")
178    }
179
180    #[test]
181    #[should_panic]
182    #[ignore]
183    fn executor_error_ui_test_1() {
184        // For this test, ensure that we have only one backtrace from error when panic.
185        func_return_error().unwrap();
186    }
187
188    #[test]
189    #[ignore]
190    fn executor_error_ui_test_2() {
191        // For this test, ensure that we have only one backtrace from error when panic.
192        func_return_error().map_err(|e| println!("{:?}", e)).ok();
193    }
194}