risingwave_stream/
error.rs1use risingwave_common::error::tonic::extra::{Score, ScoredError};
16use risingwave_common::secret::SecretError;
17use risingwave_connector::error::ConnectorError;
18use risingwave_expr::ExprError;
19use risingwave_pb::PbFieldNotFound;
20use risingwave_rpc_client::error::ToTonicStatus;
21use risingwave_storage::error::StorageError;
22
23use crate::executor::exchange::error::ExchangeChannelClosed;
24use crate::executor::{Barrier, StreamExecutorError};
25use crate::task::ActorId;
26
27pub type StreamResult<T> = std::result::Result<T, StreamError>;
29
30#[derive(
32 thiserror::Error,
33 thiserror_ext::ReportDebug,
34 thiserror_ext::Arc,
35 thiserror_ext::ContextInto,
36 thiserror_ext::Construct,
37)]
38#[thiserror_ext(newtype(name = StreamError, backtrace))]
39pub enum ErrorKind {
40 #[error("Storage error: {0}")]
41 Storage(
42 #[backtrace]
43 #[from]
44 StorageError,
45 ),
46
47 #[error("Expression error: {0}")]
48 Expression(
49 #[from]
50 #[backtrace]
51 ExprError,
52 ),
53
54 #[error("Executor error: {0}")]
55 Executor(
56 #[from]
57 #[backtrace]
58 StreamExecutorError,
59 ),
60
61 #[error("Actor {actor_id} exited unexpectedly: {source}")]
62 UnexpectedExit {
63 actor_id: ActorId,
64 #[backtrace]
65 source: StreamError,
66 },
67
68 #[error("Failed to send barrier with epoch {epoch} to actor {actor_id}: {reason}", epoch = .barrier.epoch.curr)]
69 BarrierSend {
70 barrier: Barrier,
71 actor_id: ActorId,
72 reason: &'static str,
73 },
74
75 #[error("Secret error: {0}")]
76 Secret(
77 #[from]
78 #[backtrace]
79 SecretError,
80 ),
81
82 #[error(transparent)]
83 Uncategorized(
84 #[from]
85 #[backtrace]
86 anyhow::Error,
87 ),
88}
89
90impl From<PbFieldNotFound> for StreamError {
91 fn from(err: PbFieldNotFound) -> Self {
92 Self::from(anyhow::anyhow!(
93 "Failed to decode prost: field not found `{}`",
94 err.0
95 ))
96 }
97}
98
99impl From<ConnectorError> for StreamError {
100 fn from(err: ConnectorError) -> Self {
101 StreamExecutorError::from(err).into()
102 }
103}
104
105impl From<ExchangeChannelClosed> for StreamError {
106 fn from(err: ExchangeChannelClosed) -> Self {
107 StreamExecutorError::from(err).into()
108 }
109}
110
111impl From<StreamError> for tonic::Status {
112 fn from(error: StreamError) -> Self {
113 error.to_status(tonic::Code::Internal, "stream")
114 }
115}
116
117static_assertions::const_assert_eq!(std::mem::size_of::<StreamError>(), 8);
118
119pub type ScoredStreamError = ScoredError<StreamError>;
121
122impl StreamError {
123 pub fn with_score(self) -> ScoredStreamError {
125 fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
129 use crate::executor::error::ErrorKind;
130 match e.inner() {
131 ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,
134
135 ErrorKind::Uncategorized(_)
137 | ErrorKind::Storage(_)
138 | ErrorKind::ArrayError(_)
139 | ErrorKind::ExprError(_)
140 | ErrorKind::SerdeError(_)
141 | ErrorKind::SinkError(_, _)
142 | ErrorKind::RpcError(_)
143 | ErrorKind::AlignBarrier(_, _)
144 | ErrorKind::ConnectorError(_)
145 | ErrorKind::DmlError(_)
146 | ErrorKind::NotImplemented(_) => 999,
147 }
148 }
149
150 fn stream_error_score(e: &StreamError) -> i32 {
151 use crate::error::ErrorKind;
152 match e.inner() {
153 ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),
155
156 ErrorKind::BarrierSend { .. } => 1,
158
159 ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
161
162 ErrorKind::Uncategorized(e) => {
164 for cause in e.chain() {
165 if let Some(e) = cause.downcast_ref::<StreamError>() {
166 return stream_error_score(e);
167 } else if let Some(ee) = cause.downcast_ref::<StreamExecutorError>() {
168 return 2000 + stream_executor_error_score(ee);
169 }
170 }
171 1000
172 }
173
174 ErrorKind::Storage(_) | ErrorKind::Expression(_) | ErrorKind::Secret(_) => 1000,
176 }
177 }
178
179 let score = Score(stream_error_score(&self));
180 ScoredStreamError { error: self, score }
181 }
182}