risingwave_stream/
error.rs1use risingwave_common::array::ArrayError;
16use risingwave_common::error::tonic::extra::{Score, ScoredError};
17use risingwave_common::secret::SecretError;
18use risingwave_connector::error::ConnectorError;
19use risingwave_expr::ExprError;
20use risingwave_pb::PbFieldNotFound;
21use risingwave_rpc_client::error::ToTonicStatus;
22use risingwave_storage::error::StorageError;
23
24use crate::executor::exchange::error::ExchangeChannelClosed;
25use crate::executor::{Barrier, StreamExecutorError};
26use crate::task::ActorId;
27
28pub type StreamResult<T> = std::result::Result<T, StreamError>;
30
31#[derive(
33 thiserror::Error,
34 thiserror_ext::ReportDebug,
35 thiserror_ext::Arc,
36 thiserror_ext::ContextInto,
37 thiserror_ext::Construct,
38)]
39#[thiserror_ext(newtype(name = StreamError, backtrace))]
40pub enum ErrorKind {
41 #[error("Storage error: {0}")]
42 Storage(
43 #[backtrace]
44 #[from]
45 StorageError,
46 ),
47
48 #[error("Expression error: {0}")]
49 Expression(
50 #[from]
51 #[backtrace]
52 ExprError,
53 ),
54
55 #[error("Array/Chunk error: {0}")]
56 Array(
57 #[from]
58 #[backtrace]
59 ArrayError,
60 ),
61
62 #[error("Executor error: {0}")]
63 Executor(
64 #[from]
65 #[backtrace]
66 StreamExecutorError,
67 ),
68
69 #[error("Actor {actor_id} exited unexpectedly: {source}")]
70 UnexpectedExit {
71 actor_id: ActorId,
72 #[backtrace]
73 source: StreamError,
74 },
75
76 #[error("Failed to send barrier with epoch {epoch} to actor {actor_id}: {reason}", epoch = .barrier.epoch.curr)]
77 BarrierSend {
78 barrier: Barrier,
79 actor_id: ActorId,
80 reason: &'static str,
81 },
82
83 #[error("Secret error: {0}")]
84 Secret(
85 #[from]
86 #[backtrace]
87 SecretError,
88 ),
89
90 #[error(transparent)]
91 Uncategorized(
92 #[from]
93 #[backtrace]
94 anyhow::Error,
95 ),
96}
97
98impl From<PbFieldNotFound> for StreamError {
99 fn from(err: PbFieldNotFound) -> Self {
100 Self::from(anyhow::anyhow!(
101 "Failed to decode prost: field not found `{}`",
102 err.0
103 ))
104 }
105}
106
107impl From<ConnectorError> for StreamError {
108 fn from(err: ConnectorError) -> Self {
109 StreamExecutorError::from(err).into()
110 }
111}
112
113impl From<ExchangeChannelClosed> for StreamError {
114 fn from(err: ExchangeChannelClosed) -> Self {
115 StreamExecutorError::from(err).into()
116 }
117}
118
119impl From<StreamError> for tonic::Status {
120 fn from(error: StreamError) -> Self {
121 error.to_status(tonic::Code::Internal, "stream")
122 }
123}
124
125static_assertions::const_assert_eq!(std::mem::size_of::<StreamError>(), 8);
126
127pub type ScoredStreamError = ScoredError<StreamError>;
129
130impl StreamError {
131 pub fn with_score(self) -> ScoredStreamError {
133 fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
137 use crate::executor::error::ErrorKind;
138 match e.inner() {
139 ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,
142
143 ErrorKind::Uncategorized(_)
145 | ErrorKind::Storage(_)
146 | ErrorKind::ArrayError(_)
147 | ErrorKind::ExprError(_)
148 | ErrorKind::SerdeError(_)
149 | ErrorKind::SinkError(_, _)
150 | ErrorKind::RpcError(_)
151 | ErrorKind::AlignBarrier(_, _)
152 | ErrorKind::ConnectorError(_)
153 | ErrorKind::DmlError(_)
154 | ErrorKind::NotImplemented(_) => 999,
155 }
156 }
157
158 fn stream_error_score(e: &StreamError) -> i32 {
159 use crate::error::ErrorKind;
160 match e.inner() {
161 ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),
163
164 ErrorKind::BarrierSend { .. } => 1,
166
167 ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
169
170 ErrorKind::Uncategorized(_)
172 | ErrorKind::Storage(_)
173 | ErrorKind::Expression(_)
174 | ErrorKind::Array(_)
175 | ErrorKind::Secret(_) => 1000,
176 }
177 }
178
179 let score = Score(stream_error_score(&self));
180 ScoredStreamError { error: self, score }
181 }
182}