risingwave_stream/executor/
error.rs1use risingwave_common::array::ArrayError;
16use risingwave_common::error::{BoxedError, NotImplemented};
17use risingwave_common::util::value_encoding::error::ValueEncodingError;
18use risingwave_connector::error::ConnectorError;
19use risingwave_connector::sink::SinkError;
20use risingwave_dml::error::DmlError;
21use risingwave_expr::ExprError;
22use risingwave_pb::PbFieldNotFound;
23use risingwave_rpc_client::error::RpcError;
24use risingwave_storage::error::StorageError;
25use strum_macros::AsRefStr;
26
27use super::Barrier;
28use super::exchange::error::ExchangeChannelClosed;
29
30pub type StreamExecutorResult<T> = std::result::Result<T, StreamExecutorError>;
32
33#[derive(
35 thiserror::Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Construct,
36)]
37#[thiserror_ext(newtype(name = StreamExecutorError, backtrace))]
38#[derive(AsRefStr)]
39pub enum ErrorKind {
40 #[error("Storage error: {0}")]
41 Storage(
42 #[backtrace]
43 #[from]
44 StorageError,
45 ),
46
47 #[error("Chunk operation error: {0}")]
48 ArrayError(
49 #[from]
50 #[backtrace]
51 ArrayError,
52 ),
53
54 #[error("Chunk operation error: {0}")]
55 ExprError(
56 #[from]
57 #[backtrace]
58 ExprError,
59 ),
60
61 #[error("Serialize/deserialize error: {0}")]
63 SerdeError(
64 #[source]
65 #[backtrace]
66 BoxedError,
67 ),
68
69 #[error("Sink error: sink_id={1}, error: {0}")]
70 SinkError(
71 #[source]
72 #[backtrace]
73 SinkError,
74 u32,
75 ),
76
77 #[error(transparent)]
78 RpcError(
79 #[from]
80 #[backtrace]
81 RpcError,
82 ),
83
84 #[error("Channel closed: {0}")]
85 ChannelClosed(String),
86
87 #[error(transparent)]
88 ExchangeChannelClosed(
89 #[from]
90 #[backtrace]
91 ExchangeChannelClosed,
92 ),
93
94 #[error("Failed to align barrier: expected `{0:?}` but got `{1:?}`")]
95 AlignBarrier(Box<Barrier>, Box<Barrier>),
96
97 #[error("Connector error: {0}")]
98 ConnectorError(
99 #[source]
100 #[backtrace]
101 BoxedError,
102 ),
103
104 #[error(transparent)]
105 DmlError(
106 #[from]
107 #[backtrace]
108 DmlError,
109 ),
110
111 #[error(transparent)]
112 NotImplemented(#[from] NotImplemented),
113
114 #[error(transparent)]
115 Uncategorized(
116 #[from]
117 #[backtrace]
118 anyhow::Error,
119 ),
120}
121
122impl From<memcomparable::Error> for StreamExecutorError {
124 fn from(m: memcomparable::Error) -> Self {
125 Self::serde_error(m)
126 }
127}
128impl From<ValueEncodingError> for StreamExecutorError {
129 fn from(e: ValueEncodingError) -> Self {
130 Self::serde_error(e)
131 }
132}
133
134impl From<ConnectorError> for StreamExecutorError {
136 fn from(s: ConnectorError) -> Self {
137 Self::connector_error(s)
138 }
139}
140
141impl From<PbFieldNotFound> for StreamExecutorError {
142 fn from(err: PbFieldNotFound) -> Self {
143 Self::from(anyhow::anyhow!(
144 "Failed to decode prost: field not found `{}`",
145 err.0
146 ))
147 }
148}
149
150impl From<String> for StreamExecutorError {
151 fn from(s: String) -> Self {
152 ErrorKind::Uncategorized(anyhow::anyhow!(s)).into()
153 }
154}
155
156impl From<(SinkError, u32)> for StreamExecutorError {
157 fn from((err, sink_id): (SinkError, u32)) -> Self {
158 ErrorKind::SinkError(err, sink_id).into()
159 }
160}
161
162impl StreamExecutorError {
163 pub fn variant_name(&self) -> &str {
164 self.0.inner().as_ref()
165 }
166}
167
168static_assertions::const_assert_eq!(std::mem::size_of::<StreamExecutorError>(), 8);
169
#[cfg(test)]
mod tests {
    use risingwave_common::bail;

    use super::*;

    /// Always fails with an error built by `bail!`.
    fn func_return_error() -> StreamExecutorResult<()> {
        bail!("test_error")
    }

    /// Unwraps the error so the panic output can be inspected.
    /// Marked `#[ignore]`, so it only runs when explicitly requested.
    #[test]
    #[should_panic]
    #[ignore]
    fn executor_error_ui_test_1() {
        let outcome = func_return_error();
        outcome.unwrap();
    }

    /// Prints the error's `Debug` representation so it can be inspected.
    /// Marked `#[ignore]`, so it only runs when explicitly requested.
    #[test]
    #[ignore]
    fn executor_error_ui_test_2() {
        if let Err(e) = func_return_error() {
            println!("{:?}", e);
        }
    }
}