risingwave_stream/executor/
wrapper.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::executor::prelude::*;
16
17mod epoch_check;
18mod epoch_provide;
19mod schema_check;
20mod stream_node_metrics;
21mod trace;
22mod update_check;
23
24/// [`WrapperExecutor`] will do some sanity checks and logging for the wrapped executor.
25pub struct WrapperExecutor {
26    input: Executor,
27
28    actor_ctx: ActorContextRef,
29
30    enable_executor_row_count: bool,
31
32    enable_explain_analyze_stats: bool,
33}
34
35impl WrapperExecutor {
36    pub fn new(
37        input: Executor,
38        actor_ctx: ActorContextRef,
39        enable_executor_row_count: bool,
40        enable_explain_analyze_stats: bool,
41    ) -> Self {
42        Self {
43            input,
44            actor_ctx,
45            enable_executor_row_count,
46            enable_explain_analyze_stats,
47        }
48    }
49
50    #[allow(clippy::let_and_return)]
51    fn wrap_debug(
52        info: Arc<ExecutorInfo>,
53        stream: impl MessageStream + 'static,
54    ) -> impl MessageStream + 'static {
55        // Update check
56        let stream = update_check::update_check(info, stream);
57
58        stream
59    }
60
61    fn wrap(
62        enable_executor_row_count: bool,
63        enable_explain_analyze_stats: bool,
64        info: Arc<ExecutorInfo>,
65        actor_ctx: ActorContextRef,
66        stream: impl MessageStream + 'static,
67    ) -> BoxedMessageStream {
68        // -- Shared wrappers --
69
70        // Await tree
71        let stream = trace::instrument_await_tree(info.clone(), stream);
72
73        // Schema check
74        let stream = schema_check::schema_check(info.clone(), stream);
75        // Epoch check
76        let stream = epoch_check::epoch_check(info.clone(), stream);
77
78        // Epoch provide
79        let stream = epoch_provide::epoch_provide(stream);
80
81        // Trace
82        let stream = trace::trace(
83            enable_executor_row_count,
84            info.clone(),
85            actor_ctx.clone(),
86            stream,
87        );
88
89        // operator-level metrics
90        let stream = stream_node_metrics::stream_node_metrics(
91            info.clone(),
92            enable_explain_analyze_stats,
93            stream,
94            actor_ctx.clone(),
95        );
96
97        if cfg!(debug_assertions) {
98            Self::wrap_debug(info, stream).boxed()
99        } else {
100            stream.boxed()
101        }
102    }
103}
104
105impl Execute for WrapperExecutor {
106    fn execute(self: Box<Self>) -> BoxedMessageStream {
107        let info = Arc::new(self.input.info().clone());
108        Self::wrap(
109            self.enable_executor_row_count,
110            self.enable_explain_analyze_stats,
111            info,
112            self.actor_ctx,
113            self.input.execute(),
114        )
115        .boxed()
116    }
117
118    fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
119        let info = Arc::new(self.input.info().clone());
120        Self::wrap(
121            self.enable_executor_row_count,
122            self.enable_explain_analyze_stats,
123            info,
124            self.actor_ctx,
125            self.input.execute_with_epoch(epoch),
126        )
127        .boxed()
128    }
129}