risingwave_stream/executor/
wrapper.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::future::Either;
16
17use crate::executor::prelude::*;
18
19mod epoch_check;
20mod epoch_provide;
21mod schema_check;
22mod stream_node_metrics;
23mod trace;
24mod update_check;
25
26/// [`WrapperExecutor`] will do some sanity checks and logging for the wrapped executor.
27pub struct WrapperExecutor {
28    input: Executor,
29    actor_ctx: ActorContextRef,
30}
31
32impl WrapperExecutor {
33    pub fn new(input: Executor, actor_ctx: ActorContextRef) -> Self {
34        Self { input, actor_ctx }
35    }
36
37    #[allow(clippy::let_and_return)]
38    fn wrap_debug(
39        info: Arc<ExecutorInfo>,
40        stream: impl MessageStream + 'static,
41    ) -> impl MessageStream + 'static {
42        // Update check
43        let stream = update_check::update_check(info, stream);
44
45        stream
46    }
47
48    fn wrap(
49        info: Arc<ExecutorInfo>,
50        actor_ctx: ActorContextRef,
51        stream: impl MessageStream + 'static,
52    ) -> BoxedMessageStream {
53        // -- Shared wrappers --
54
55        // Schema check
56        let stream = schema_check::schema_check(info.clone(), stream);
57        // Epoch check
58        let stream = epoch_check::epoch_check(info.clone(), stream);
59
60        // Epoch provide
61        let stream = epoch_provide::epoch_provide(stream);
62
63        // Trace
64        let stream = trace::trace(info.clone(), actor_ctx.clone(), stream);
65
66        // operator-level metrics
67        let stream = stream_node_metrics::stream_node_metrics(info.clone(), stream, actor_ctx);
68
69        // -- Debug-only wrappers --
70        let stream = if cfg!(debug_assertions) {
71            Either::Left(Self::wrap_debug(info.clone(), stream))
72        } else {
73            Either::Right(stream)
74        };
75
76        // Await tree
77        // This should be the last wrapper, so that code in other wrappers are also instrumented with
78        // the span of the current executor.
79        trace::instrument_await_tree(info, stream).boxed()
80    }
81}
82
83impl Execute for WrapperExecutor {
84    fn execute(self: Box<Self>) -> BoxedMessageStream {
85        let info = Arc::new(self.input.info().clone());
86        Self::wrap(info, self.actor_ctx, self.input.execute()).boxed()
87    }
88
89    fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
90        let info = Arc::new(self.input.info().clone());
91        Self::wrap(info, self.actor_ctx, self.input.execute_with_epoch(epoch)).boxed()
92    }
93}