risingwave_stream/
error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::error::tonic::extra::{Score, ScoredError};
16use risingwave_common::secret::SecretError;
17use risingwave_connector::error::ConnectorError;
18use risingwave_expr::ExprError;
19use risingwave_pb::PbFieldNotFound;
20use risingwave_rpc_client::error::ToTonicStatus;
21use risingwave_storage::error::StorageError;
22
23use crate::executor::exchange::error::ExchangeChannelClosed;
24use crate::executor::{Barrier, StreamExecutorError};
25use crate::task::ActorId;
26
27/// A specialized Result type for streaming tasks.
28pub type StreamResult<T> = std::result::Result<T, StreamError>;
29
30/// The error type for streaming tasks.
31#[derive(
32    thiserror::Error,
33    thiserror_ext::ReportDebug,
34    thiserror_ext::Arc,
35    thiserror_ext::ContextInto,
36    thiserror_ext::Construct,
37)]
38#[thiserror_ext(newtype(name = StreamError, backtrace))]
39pub enum ErrorKind {
40    #[error("Storage error: {0}")]
41    Storage(
42        #[backtrace]
43        #[from]
44        StorageError,
45    ),
46
47    #[error("Expression error: {0}")]
48    Expression(
49        #[from]
50        #[backtrace]
51        ExprError,
52    ),
53
54    #[error("Executor error: {0}")]
55    Executor(
56        #[from]
57        #[backtrace]
58        StreamExecutorError,
59    ),
60
61    #[error("Actor {actor_id} exited unexpectedly: {source}")]
62    UnexpectedExit {
63        actor_id: ActorId,
64        #[backtrace]
65        source: StreamError,
66    },
67
68    #[error("Failed to send barrier with epoch {epoch} to actor {actor_id}: {reason}", epoch = .barrier.epoch.curr)]
69    BarrierSend {
70        barrier: Barrier,
71        actor_id: ActorId,
72        reason: &'static str,
73    },
74
75    #[error("Secret error: {0}")]
76    Secret(
77        #[from]
78        #[backtrace]
79        SecretError,
80    ),
81
82    #[error(transparent)]
83    Uncategorized(
84        #[from]
85        #[backtrace]
86        anyhow::Error,
87    ),
88}
89
90impl From<PbFieldNotFound> for StreamError {
91    fn from(err: PbFieldNotFound) -> Self {
92        Self::from(anyhow::anyhow!(
93            "Failed to decode prost: field not found `{}`",
94            err.0
95        ))
96    }
97}
98
99impl From<ConnectorError> for StreamError {
100    fn from(err: ConnectorError) -> Self {
101        StreamExecutorError::from(err).into()
102    }
103}
104
105impl From<ExchangeChannelClosed> for StreamError {
106    fn from(err: ExchangeChannelClosed) -> Self {
107        StreamExecutorError::from(err).into()
108    }
109}
110
111impl From<StreamError> for tonic::Status {
112    fn from(error: StreamError) -> Self {
113        error.to_status(tonic::Code::Internal, "stream")
114    }
115}
116
// Compile-time guard: `StreamError` must stay exactly 8 bytes. If a change makes the type grow
// (or shrink), the build fails here instead of the size change going unnoticed.
static_assertions::const_assert_eq!(std::mem::size_of::<StreamError>(), 8);
118
/// A [`StreamError`] with a score, used to find the root cause of actor failures.
///
/// This is [`ScoredError`] specialized to [`StreamError`]; values of this type are produced by
/// [`StreamError::with_score`].
pub type ScoredStreamError = ScoredError<StreamError>;
121
122impl StreamError {
123    /// Score the given error based on hard-coded rules.
124    pub fn with_score(self) -> ScoredStreamError {
125        // Explicitly list all error kinds here to notice developers to update this function when
126        // there are changes in error kinds.
127
128        fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
129            use crate::executor::error::ErrorKind;
130            match e.inner() {
131                // `ChannelClosed` or `ExchangeChannelClosed` is likely to be caused by actor exit
132                // and not the root cause.
133                ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,
134
135                // Normal errors.
136                ErrorKind::Uncategorized(_)
137                | ErrorKind::Storage(_)
138                | ErrorKind::ArrayError(_)
139                | ErrorKind::ExprError(_)
140                | ErrorKind::SerdeError(_)
141                | ErrorKind::SinkError(_, _)
142                | ErrorKind::RpcError(_)
143                | ErrorKind::AlignBarrier(_, _)
144                | ErrorKind::ConnectorError(_)
145                | ErrorKind::DmlError(_)
146                | ErrorKind::NotImplemented(_) => 999,
147            }
148        }
149
150        fn stream_error_score(e: &StreamError) -> i32 {
151            use crate::error::ErrorKind;
152            match e.inner() {
153                // `UnexpectedExit` wraps the original error. Score on the inner error.
154                ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),
155
156                // `BarrierSend` is likely to be caused by actor exit and not the root cause.
157                ErrorKind::BarrierSend { .. } => 1,
158
159                // Executor errors first.
160                ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
161
162                // Uncategorized: try to downcast to known types and reuse their scores. Fallback to 1000.
163                ErrorKind::Uncategorized(e) => {
164                    for cause in e.chain() {
165                        if let Some(e) = cause.downcast_ref::<StreamError>() {
166                            return stream_error_score(e);
167                        } else if let Some(ee) = cause.downcast_ref::<StreamExecutorError>() {
168                            return 2000 + stream_executor_error_score(ee);
169                        }
170                    }
171                    1000
172                }
173
174                // Then other errors.
175                ErrorKind::Storage(_) | ErrorKind::Expression(_) | ErrorKind::Secret(_) => 1000,
176            }
177        }
178
179        let score = Score(stream_error_score(&self));
180        ScoredStreamError { error: self, score }
181    }
182}