risingwave_stream/
error.rs1use risingwave_common::array::ArrayError;
16use risingwave_common::secret::SecretError;
17use risingwave_connector::error::ConnectorError;
18use risingwave_expr::ExprError;
19use risingwave_pb::PbFieldNotFound;
20use risingwave_rpc_client::error::ToTonicStatus;
21use risingwave_storage::error::StorageError;
22
23use crate::executor::exchange::error::ExchangeChannelClosed;
24use crate::executor::{Barrier, StreamExecutorError};
25use crate::task::ActorId;
26
27pub type StreamResult<T> = std::result::Result<T, StreamError>;
29
30#[derive(
32 thiserror::Error,
33 thiserror_ext::ReportDebug,
34 thiserror_ext::Arc,
35 thiserror_ext::ContextInto,
36 thiserror_ext::Construct,
37)]
38#[thiserror_ext(newtype(name = StreamError, backtrace))]
39pub enum ErrorKind {
40 #[error("Storage error: {0}")]
41 Storage(
42 #[backtrace]
43 #[from]
44 StorageError,
45 ),
46
47 #[error("Expression error: {0}")]
48 Expression(
49 #[from]
50 #[backtrace]
51 ExprError,
52 ),
53
54 #[error("Array/Chunk error: {0}")]
55 Array(
56 #[from]
57 #[backtrace]
58 ArrayError,
59 ),
60
61 #[error("Executor error: {0}")]
62 Executor(
63 #[from]
64 #[backtrace]
65 StreamExecutorError,
66 ),
67
68 #[error("Actor {actor_id} exited unexpectedly: {source}")]
69 UnexpectedExit {
70 actor_id: ActorId,
71 #[backtrace]
72 source: StreamError,
73 },
74
75 #[error("Failed to send barrier with epoch {epoch} to actor {actor_id}: {reason}", epoch = .barrier.epoch.curr)]
76 BarrierSend {
77 barrier: Barrier,
78 actor_id: ActorId,
79 reason: &'static str,
80 },
81
82 #[error("Secret error: {0}")]
83 Secret(
84 #[from]
85 #[backtrace]
86 SecretError,
87 ),
88
89 #[error(transparent)]
90 Uncategorized(
91 #[from]
92 #[backtrace]
93 anyhow::Error,
94 ),
95}
96
97impl From<PbFieldNotFound> for StreamError {
98 fn from(err: PbFieldNotFound) -> Self {
99 Self::from(anyhow::anyhow!(
100 "Failed to decode prost: field not found `{}`",
101 err.0
102 ))
103 }
104}
105
106impl From<ConnectorError> for StreamError {
107 fn from(err: ConnectorError) -> Self {
108 StreamExecutorError::from(err).into()
109 }
110}
111
112impl From<ExchangeChannelClosed> for StreamError {
113 fn from(err: ExchangeChannelClosed) -> Self {
114 StreamExecutorError::from(err).into()
115 }
116}
117
118impl From<StreamError> for tonic::Status {
119 fn from(error: StreamError) -> Self {
120 error.to_status(tonic::Code::Internal, "stream")
121 }
122}
123
124static_assertions::const_assert_eq!(std::mem::size_of::<StreamError>(), 8);