risingwave_stream/
error.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::array::ArrayError;
16use risingwave_common::error::tonic::extra::{Score, ScoredError};
17use risingwave_common::secret::SecretError;
18use risingwave_connector::error::ConnectorError;
19use risingwave_expr::ExprError;
20use risingwave_pb::PbFieldNotFound;
21use risingwave_rpc_client::error::ToTonicStatus;
22use risingwave_storage::error::StorageError;
23
24use crate::executor::exchange::error::ExchangeChannelClosed;
25use crate::executor::{Barrier, StreamExecutorError};
26use crate::task::ActorId;
27
28/// A specialized Result type for streaming tasks.
29pub type StreamResult<T> = std::result::Result<T, StreamError>;
30
31/// The error type for streaming tasks.
32#[derive(
33    thiserror::Error,
34    thiserror_ext::ReportDebug,
35    thiserror_ext::Arc,
36    thiserror_ext::ContextInto,
37    thiserror_ext::Construct,
38)]
39#[thiserror_ext(newtype(name = StreamError, backtrace))]
40pub enum ErrorKind {
41    #[error("Storage error: {0}")]
42    Storage(
43        #[backtrace]
44        #[from]
45        StorageError,
46    ),
47
48    #[error("Expression error: {0}")]
49    Expression(
50        #[from]
51        #[backtrace]
52        ExprError,
53    ),
54
55    #[error("Array/Chunk error: {0}")]
56    Array(
57        #[from]
58        #[backtrace]
59        ArrayError,
60    ),
61
62    #[error("Executor error: {0}")]
63    Executor(
64        #[from]
65        #[backtrace]
66        StreamExecutorError,
67    ),
68
69    #[error("Actor {actor_id} exited unexpectedly: {source}")]
70    UnexpectedExit {
71        actor_id: ActorId,
72        #[backtrace]
73        source: StreamError,
74    },
75
76    #[error("Failed to send barrier with epoch {epoch} to actor {actor_id}: {reason}", epoch = .barrier.epoch.curr)]
77    BarrierSend {
78        barrier: Barrier,
79        actor_id: ActorId,
80        reason: &'static str,
81    },
82
83    #[error("Secret error: {0}")]
84    Secret(
85        #[from]
86        #[backtrace]
87        SecretError,
88    ),
89
90    #[error(transparent)]
91    Uncategorized(
92        #[from]
93        #[backtrace]
94        anyhow::Error,
95    ),
96}
97
98impl From<PbFieldNotFound> for StreamError {
99    fn from(err: PbFieldNotFound) -> Self {
100        Self::from(anyhow::anyhow!(
101            "Failed to decode prost: field not found `{}`",
102            err.0
103        ))
104    }
105}
106
107impl From<ConnectorError> for StreamError {
108    fn from(err: ConnectorError) -> Self {
109        StreamExecutorError::from(err).into()
110    }
111}
112
113impl From<ExchangeChannelClosed> for StreamError {
114    fn from(err: ExchangeChannelClosed) -> Self {
115        StreamExecutorError::from(err).into()
116    }
117}
118
119impl From<StreamError> for tonic::Status {
120    fn from(error: StreamError) -> Self {
121        error.to_status(tonic::Code::Internal, "stream")
122    }
123}
124
125static_assertions::const_assert_eq!(std::mem::size_of::<StreamError>(), 8);
126
127/// A [`StreamError`] with a score, used to find the root cause of actor failures.
128pub type ScoredStreamError = ScoredError<StreamError>;
129
130impl StreamError {
131    /// Score the given error based on hard-coded rules.
132    pub fn with_score(self) -> ScoredStreamError {
133        // Explicitly list all error kinds here to notice developers to update this function when
134        // there are changes in error kinds.
135
136        fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
137            use crate::executor::error::ErrorKind;
138            match e.inner() {
139                // `ChannelClosed` or `ExchangeChannelClosed` is likely to be caused by actor exit
140                // and not the root cause.
141                ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,
142
143                // Normal errors.
144                ErrorKind::Uncategorized(_)
145                | ErrorKind::Storage(_)
146                | ErrorKind::ArrayError(_)
147                | ErrorKind::ExprError(_)
148                | ErrorKind::SerdeError(_)
149                | ErrorKind::SinkError(_, _)
150                | ErrorKind::RpcError(_)
151                | ErrorKind::AlignBarrier(_, _)
152                | ErrorKind::ConnectorError(_)
153                | ErrorKind::DmlError(_)
154                | ErrorKind::NotImplemented(_) => 999,
155            }
156        }
157
158        fn stream_error_score(e: &StreamError) -> i32 {
159            use crate::error::ErrorKind;
160            match e.inner() {
161                // `UnexpectedExit` wraps the original error. Score on the inner error.
162                ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),
163
164                // `BarrierSend` is likely to be caused by actor exit and not the root cause.
165                ErrorKind::BarrierSend { .. } => 1,
166
167                // Executor errors first.
168                ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
169
170                // Then other errors.
171                ErrorKind::Uncategorized(_)
172                | ErrorKind::Storage(_)
173                | ErrorKind::Expression(_)
174                | ErrorKind::Array(_)
175                | ErrorKind::Secret(_) => 1000,
176            }
177        }
178
179        let score = Score(stream_error_score(&self));
180        ScoredStreamError { error: self, score }
181    }
182}