risingwave_stream/executor/
wrapper.rs1use crate::executor::prelude::*;
16
17mod epoch_check;
18mod epoch_provide;
19mod schema_check;
20mod stream_node_metrics;
21mod trace;
22mod update_check;
23
24pub struct WrapperExecutor {
26 input: Executor,
27
28 actor_ctx: ActorContextRef,
29
30 enable_executor_row_count: bool,
31
32 enable_explain_analyze_stats: bool,
33}
34
35impl WrapperExecutor {
36 pub fn new(
37 input: Executor,
38 actor_ctx: ActorContextRef,
39 enable_executor_row_count: bool,
40 enable_explain_analyze_stats: bool,
41 ) -> Self {
42 Self {
43 input,
44 actor_ctx,
45 enable_executor_row_count,
46 enable_explain_analyze_stats,
47 }
48 }
49
50 #[allow(clippy::let_and_return)]
51 fn wrap_debug(
52 info: Arc<ExecutorInfo>,
53 stream: impl MessageStream + 'static,
54 ) -> impl MessageStream + 'static {
55 let stream = update_check::update_check(info, stream);
57
58 stream
59 }
60
61 fn wrap(
62 enable_executor_row_count: bool,
63 enable_explain_analyze_stats: bool,
64 info: Arc<ExecutorInfo>,
65 actor_ctx: ActorContextRef,
66 stream: impl MessageStream + 'static,
67 ) -> BoxedMessageStream {
68 let stream = trace::instrument_await_tree(info.clone(), stream);
72
73 let stream = schema_check::schema_check(info.clone(), stream);
75 let stream = epoch_check::epoch_check(info.clone(), stream);
77
78 let stream = epoch_provide::epoch_provide(stream);
80
81 let stream = trace::trace(
83 enable_executor_row_count,
84 info.clone(),
85 actor_ctx.clone(),
86 stream,
87 );
88
89 let stream = stream_node_metrics::stream_node_metrics(
91 info.clone(),
92 enable_explain_analyze_stats,
93 stream,
94 actor_ctx.clone(),
95 );
96
97 if cfg!(debug_assertions) {
98 Self::wrap_debug(info, stream).boxed()
99 } else {
100 stream.boxed()
101 }
102 }
103}
104
105impl Execute for WrapperExecutor {
106 fn execute(self: Box<Self>) -> BoxedMessageStream {
107 let info = Arc::new(self.input.info().clone());
108 Self::wrap(
109 self.enable_executor_row_count,
110 self.enable_explain_analyze_stats,
111 info,
112 self.actor_ctx,
113 self.input.execute(),
114 )
115 .boxed()
116 }
117
118 fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
119 let info = Arc::new(self.input.info().clone());
120 Self::wrap(
121 self.enable_executor_row_count,
122 self.enable_explain_analyze_stats,
123 info,
124 self.actor_ctx,
125 self.input.execute_with_epoch(epoch),
126 )
127 .boxed()
128 }
129}