risingwave_stream/executor/
wrapper.rs1use futures::future::Either;
16
17use crate::executor::prelude::*;
18
19mod epoch_check;
20mod epoch_provide;
21mod schema_check;
22mod stream_node_metrics;
23mod trace;
24mod update_check;
25
26pub struct WrapperExecutor {
28 input: Executor,
29 actor_ctx: ActorContextRef,
30}
31
32impl WrapperExecutor {
33 pub fn new(input: Executor, actor_ctx: ActorContextRef) -> Self {
34 Self { input, actor_ctx }
35 }
36
37 #[allow(clippy::let_and_return)]
38 fn wrap_debug(
39 info: Arc<ExecutorInfo>,
40 stream: impl MessageStream + 'static,
41 ) -> impl MessageStream + 'static {
42 let stream = update_check::update_check(info, stream);
44
45 stream
46 }
47
48 fn wrap(
49 info: Arc<ExecutorInfo>,
50 actor_ctx: ActorContextRef,
51 stream: impl MessageStream + 'static,
52 ) -> BoxedMessageStream {
53 let stream = schema_check::schema_check(info.clone(), stream);
57 let stream = epoch_check::epoch_check(info.clone(), stream);
59
60 let stream = epoch_provide::epoch_provide(stream);
62
63 let stream = trace::trace(info.clone(), actor_ctx.clone(), stream);
65
66 let stream = stream_node_metrics::stream_node_metrics(info.clone(), stream, actor_ctx);
68
69 let stream = if cfg!(debug_assertions) {
71 Either::Left(Self::wrap_debug(info.clone(), stream))
72 } else {
73 Either::Right(stream)
74 };
75
76 trace::instrument_await_tree(info, stream).boxed()
80 }
81}
82
83impl Execute for WrapperExecutor {
84 fn execute(self: Box<Self>) -> BoxedMessageStream {
85 let info = Arc::new(self.input.info().clone());
86 Self::wrap(info, self.actor_ctx, self.input.execute()).boxed()
87 }
88
89 fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
90 let info = Arc::new(self.input.info().clone());
91 Self::wrap(info, self.actor_ctx, self.input.execute_with_epoch(epoch)).boxed()
92 }
93}