risingwave_stream/
error.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::ArrayError;
16use risingwave_common::secret::SecretError;
17use risingwave_connector::error::ConnectorError;
18use risingwave_expr::ExprError;
19use risingwave_pb::PbFieldNotFound;
20use risingwave_rpc_client::error::ToTonicStatus;
21use risingwave_storage::error::StorageError;
22
23use crate::executor::exchange::error::ExchangeChannelClosed;
24use crate::executor::{Barrier, StreamExecutorError};
25use crate::task::ActorId;
26
27/// A specialized Result type for streaming tasks.
28pub type StreamResult<T> = std::result::Result<T, StreamError>;
29
30/// The error type for streaming tasks.
31#[derive(
32    thiserror::Error,
33    thiserror_ext::ReportDebug,
34    thiserror_ext::Arc,
35    thiserror_ext::ContextInto,
36    thiserror_ext::Construct,
37)]
38#[thiserror_ext(newtype(name = StreamError, backtrace))]
39pub enum ErrorKind {
40    #[error("Storage error: {0}")]
41    Storage(
42        #[backtrace]
43        #[from]
44        StorageError,
45    ),
46
47    #[error("Expression error: {0}")]
48    Expression(
49        #[from]
50        #[backtrace]
51        ExprError,
52    ),
53
54    #[error("Array/Chunk error: {0}")]
55    Array(
56        #[from]
57        #[backtrace]
58        ArrayError,
59    ),
60
61    #[error("Executor error: {0}")]
62    Executor(
63        #[from]
64        #[backtrace]
65        StreamExecutorError,
66    ),
67
68    #[error("Actor {actor_id} exited unexpectedly: {source}")]
69    UnexpectedExit {
70        actor_id: ActorId,
71        #[backtrace]
72        source: StreamError,
73    },
74
75    #[error("Failed to send barrier with epoch {epoch} to actor {actor_id}: {reason}", epoch = .barrier.epoch.curr)]
76    BarrierSend {
77        barrier: Barrier,
78        actor_id: ActorId,
79        reason: &'static str,
80    },
81
82    #[error("Secret error: {0}")]
83    Secret(
84        #[from]
85        #[backtrace]
86        SecretError,
87    ),
88
89    #[error(transparent)]
90    Uncategorized(
91        #[from]
92        #[backtrace]
93        anyhow::Error,
94    ),
95}
96
97impl From<PbFieldNotFound> for StreamError {
98    fn from(err: PbFieldNotFound) -> Self {
99        Self::from(anyhow::anyhow!(
100            "Failed to decode prost: field not found `{}`",
101            err.0
102        ))
103    }
104}
105
106impl From<ConnectorError> for StreamError {
107    fn from(err: ConnectorError) -> Self {
108        StreamExecutorError::from(err).into()
109    }
110}
111
112impl From<ExchangeChannelClosed> for StreamError {
113    fn from(err: ExchangeChannelClosed) -> Self {
114        StreamExecutorError::from(err).into()
115    }
116}
117
118impl From<StreamError> for tonic::Status {
119    fn from(error: StreamError) -> Self {
120        error.to_status(tonic::Code::Internal, "stream")
121    }
122}
123
124static_assertions::const_assert_eq!(std::mem::size_of::<StreamError>(), 8);