risingwave_stream/executor/
wrapper.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::executor::prelude::*;
16
17mod epoch_check;
18mod epoch_provide;
19mod schema_check;
20mod stream_node_metrics;
21mod trace;
22mod update_check;
23
24/// [`WrapperExecutor`] will do some sanity checks and logging for the wrapped executor.
25pub struct WrapperExecutor {
26    input: Executor,
27
28    actor_ctx: ActorContextRef,
29
30    enable_executor_row_count: bool,
31
32    enable_explain_analyze_stats: bool,
33}
34
35impl WrapperExecutor {
36    pub fn new(
37        input: Executor,
38        actor_ctx: ActorContextRef,
39        enable_executor_row_count: bool,
40        enable_explain_analyze_stats: bool,
41    ) -> Self {
42        Self {
43            input,
44            actor_ctx,
45            enable_executor_row_count,
46            enable_explain_analyze_stats: enable_explain_analyze_stats
47                && cfg!(all(not(test), not(madsim))),
48        }
49    }
50
51    #[allow(clippy::let_and_return)]
52    fn wrap_debug(
53        info: Arc<ExecutorInfo>,
54        stream: impl MessageStream + 'static,
55    ) -> impl MessageStream + 'static {
56        // Update check
57        let stream = update_check::update_check(info, stream);
58
59        stream
60    }
61
62    fn wrap(
63        enable_executor_row_count: bool,
64        enable_explain_analyze_stats: bool,
65        info: Arc<ExecutorInfo>,
66        actor_ctx: ActorContextRef,
67        stream: impl MessageStream + 'static,
68    ) -> BoxedMessageStream {
69        // -- Shared wrappers --
70
71        // Await tree
72        let stream = trace::instrument_await_tree(info.clone(), stream);
73
74        // Schema check
75        let stream = schema_check::schema_check(info.clone(), stream);
76        // Epoch check
77        let stream = epoch_check::epoch_check(info.clone(), stream);
78
79        // Epoch provide
80        let stream = epoch_provide::epoch_provide(stream);
81
82        // Trace
83        let stream = trace::trace(
84            enable_executor_row_count,
85            info.clone(),
86            actor_ctx.clone(),
87            stream,
88        );
89
90        // operator-level metrics
91        let stream = stream_node_metrics::stream_node_metrics(
92            info.clone(),
93            enable_explain_analyze_stats,
94            stream,
95            actor_ctx.clone(),
96        );
97
98        if cfg!(debug_assertions) {
99            Self::wrap_debug(info, stream).boxed()
100        } else {
101            stream.boxed()
102        }
103    }
104}
105
106impl Execute for WrapperExecutor {
107    fn execute(self: Box<Self>) -> BoxedMessageStream {
108        let info = Arc::new(self.input.info().clone());
109        Self::wrap(
110            self.enable_executor_row_count,
111            self.enable_explain_analyze_stats,
112            info,
113            self.actor_ctx,
114            self.input.execute(),
115        )
116        .boxed()
117    }
118
119    fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
120        let info = Arc::new(self.input.info().clone());
121        Self::wrap(
122            self.enable_executor_row_count,
123            self.enable_explain_analyze_stats,
124            info,
125            self.actor_ctx,
126            self.input.execute_with_epoch(epoch),
127        )
128        .boxed()
129    }
130}