risingwave_stream/executor/
error.rs1use risingwave_common::array::ArrayError;
16use risingwave_common::error::{BoxedError, NotImplemented};
17use risingwave_common::id::SinkId;
18use risingwave_common::util::value_encoding::error::ValueEncodingError;
19use risingwave_connector::error::ConnectorError;
20use risingwave_connector::sink::SinkError;
21use risingwave_dml::error::DmlError;
22use risingwave_expr::ExprError;
23use risingwave_pb::PbFieldNotFound;
24use risingwave_rpc_client::error::RpcError;
25use risingwave_storage::error::StorageError;
26use strum_macros::AsRefStr;
27
28use super::Barrier;
29use super::exchange::error::ExchangeChannelClosed;
30
31pub type StreamExecutorResult<T> = std::result::Result<T, StreamExecutorError>;
33
34#[derive(
36 thiserror::Error, thiserror_ext::ReportDebug, thiserror_ext::Box, thiserror_ext::Construct,
37)]
38#[thiserror_ext(newtype(name = StreamExecutorError, backtrace))]
39#[derive(AsRefStr)]
40pub enum ErrorKind {
41 #[error("Storage error: {0}")]
42 Storage(
43 #[backtrace]
44 #[from]
45 StorageError,
46 ),
47
48 #[error("Chunk operation error: {0}")]
49 ArrayError(
50 #[from]
51 #[backtrace]
52 ArrayError,
53 ),
54
55 #[error("Chunk operation error: {0}")]
56 ExprError(
57 #[from]
58 #[backtrace]
59 ExprError,
60 ),
61
62 #[error("Serialize/deserialize error: {0}")]
64 SerdeError(
65 #[source]
66 #[backtrace]
67 BoxedError,
68 ),
69
70 #[error("Sink error: sink_id={1}, error: {0}")]
71 SinkError(
72 #[source]
73 #[backtrace]
74 SinkError,
75 SinkId,
76 ),
77
78 #[error(transparent)]
79 RpcError(
80 #[from]
81 #[backtrace]
82 RpcError,
83 ),
84
85 #[error("Channel closed: {0}")]
86 ChannelClosed(String),
87
88 #[error(transparent)]
89 ExchangeChannelClosed(
90 #[from]
91 #[backtrace]
92 ExchangeChannelClosed,
93 ),
94
95 #[error("Failed to align barrier: expected `{0:?}` but got `{1:?}`")]
96 AlignBarrier(Box<Barrier>, Box<Barrier>),
97
98 #[error("Connector error: {0}")]
99 ConnectorError(
100 #[source]
101 #[backtrace]
102 BoxedError,
103 ),
104
105 #[error(transparent)]
106 DmlError(
107 #[from]
108 #[backtrace]
109 DmlError,
110 ),
111
112 #[error(transparent)]
113 NotImplemented(#[from] NotImplemented),
114
115 #[error(transparent)]
116 Uncategorized(
117 #[from]
118 #[backtrace]
119 anyhow::Error,
120 ),
121}
122
123impl From<memcomparable::Error> for StreamExecutorError {
125 fn from(m: memcomparable::Error) -> Self {
126 Self::serde_error(m)
127 }
128}
129impl From<ValueEncodingError> for StreamExecutorError {
130 fn from(e: ValueEncodingError) -> Self {
131 Self::serde_error(e)
132 }
133}
134
135impl From<ConnectorError> for StreamExecutorError {
137 fn from(s: ConnectorError) -> Self {
138 Self::connector_error(s)
139 }
140}
141
142impl From<PbFieldNotFound> for StreamExecutorError {
143 fn from(err: PbFieldNotFound) -> Self {
144 Self::from(anyhow::anyhow!(
145 "Failed to decode prost: field not found `{}`",
146 err.0
147 ))
148 }
149}
150
151impl From<String> for StreamExecutorError {
152 fn from(s: String) -> Self {
153 ErrorKind::Uncategorized(anyhow::anyhow!(s)).into()
154 }
155}
156
157impl From<(SinkError, SinkId)> for StreamExecutorError {
158 fn from((err, sink_id): (SinkError, SinkId)) -> Self {
159 ErrorKind::SinkError(err, sink_id).into()
160 }
161}
162
163impl StreamExecutorError {
164 pub fn variant_name(&self) -> &str {
165 self.0.inner().as_ref()
166 }
167}
168
169static_assertions::const_assert_eq!(std::mem::size_of::<StreamExecutorError>(), 8);
170
171#[cfg(test)]
172mod tests {
173 use risingwave_common::bail;
174
175 use super::*;
176
177 fn func_return_error() -> StreamExecutorResult<()> {
178 bail!("test_error")
179 }
180
181 #[test]
182 #[should_panic]
183 #[ignore]
184 fn executor_error_ui_test_1() {
185 func_return_error().unwrap();
187 }
188
189 #[test]
190 #[ignore]
191 fn executor_error_ui_test_2() {
192 func_return_error().map_err(|e| println!("{:?}", e)).ok();
194 }
195}