risingwave_stream/executor/
wrapper.rs1use crate::executor::prelude::*;
16
17mod epoch_check;
18mod epoch_provide;
19mod schema_check;
20mod stream_node_metrics;
21mod trace;
22mod update_check;
23
24pub struct WrapperExecutor {
26 input: Executor,
27
28 actor_ctx: ActorContextRef,
29
30 enable_executor_row_count: bool,
31
32 enable_explain_analyze_stats: bool,
33}
34
35impl WrapperExecutor {
36 pub fn new(
37 input: Executor,
38 actor_ctx: ActorContextRef,
39 enable_executor_row_count: bool,
40 enable_explain_analyze_stats: bool,
41 ) -> Self {
42 Self {
43 input,
44 actor_ctx,
45 enable_executor_row_count,
46 enable_explain_analyze_stats: enable_explain_analyze_stats
47 && cfg!(all(not(test), not(madsim))),
48 }
49 }
50
51 #[allow(clippy::let_and_return)]
52 fn wrap_debug(
53 info: Arc<ExecutorInfo>,
54 stream: impl MessageStream + 'static,
55 ) -> impl MessageStream + 'static {
56 let stream = update_check::update_check(info, stream);
58
59 stream
60 }
61
62 fn wrap(
63 enable_executor_row_count: bool,
64 enable_explain_analyze_stats: bool,
65 info: Arc<ExecutorInfo>,
66 actor_ctx: ActorContextRef,
67 stream: impl MessageStream + 'static,
68 ) -> BoxedMessageStream {
69 let stream = trace::instrument_await_tree(info.clone(), stream);
73
74 let stream = schema_check::schema_check(info.clone(), stream);
76 let stream = epoch_check::epoch_check(info.clone(), stream);
78
79 let stream = epoch_provide::epoch_provide(stream);
81
82 let stream = trace::trace(
84 enable_executor_row_count,
85 info.clone(),
86 actor_ctx.clone(),
87 stream,
88 );
89
90 let stream = stream_node_metrics::stream_node_metrics(
92 info.clone(),
93 enable_explain_analyze_stats,
94 stream,
95 actor_ctx.clone(),
96 );
97
98 if cfg!(debug_assertions) {
99 Self::wrap_debug(info, stream).boxed()
100 } else {
101 stream.boxed()
102 }
103 }
104}
105
106impl Execute for WrapperExecutor {
107 fn execute(self: Box<Self>) -> BoxedMessageStream {
108 let info = Arc::new(self.input.info().clone());
109 Self::wrap(
110 self.enable_executor_row_count,
111 self.enable_explain_analyze_stats,
112 info,
113 self.actor_ctx,
114 self.input.execute(),
115 )
116 .boxed()
117 }
118
119 fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
120 let info = Arc::new(self.input.info().clone());
121 Self::wrap(
122 self.enable_executor_row_count,
123 self.enable_explain_analyze_stats,
124 info,
125 self.actor_ctx,
126 self.input.execute_with_epoch(epoch),
127 )
128 .boxed()
129 }
130}