risingwave_stream/executor/
wrapper.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::executor::prelude::*;

mod epoch_check;
mod epoch_provide;
mod schema_check;
mod trace;
mod update_check;

/// [`WrapperExecutor`] will do some sanity checks and logging for the wrapped executor.
pub struct WrapperExecutor {
    /// The executor being wrapped; the stream it produces is the one that gets decorated.
    input: Executor,

    /// Actor context, handed over to the trace wrapper when the stream is built.
    actor_ctx: ActorContextRef,

    /// Flag forwarded to the trace wrapper.
    /// NOTE(review): presumably toggles per-executor row counting — confirm in `trace`.
    enable_executor_row_count: bool,
}

impl WrapperExecutor {
    pub fn new(
        input: Executor,
        actor_ctx: ActorContextRef,
        enable_executor_row_count: bool,
    ) -> Self {
        Self {
            input,
            actor_ctx,
            enable_executor_row_count,
        }
    }

    #[allow(clippy::let_and_return)]
    fn wrap_debug(
        info: Arc<ExecutorInfo>,
        stream: impl MessageStream + 'static,
    ) -> impl MessageStream + 'static {
        // Update check
        let stream = update_check::update_check(info, stream);

        stream
    }

    fn wrap(
        enable_executor_row_count: bool,
        info: Arc<ExecutorInfo>,
        actor_ctx: ActorContextRef,
        stream: impl MessageStream + 'static,
    ) -> BoxedMessageStream {
        // -- Shared wrappers --

        // Await tree
        let stream = trace::instrument_await_tree(info.clone(), stream);

        // Schema check
        let stream = schema_check::schema_check(info.clone(), stream);
        // Epoch check
        let stream = epoch_check::epoch_check(info.clone(), stream);

        // Epoch provide
        let stream = epoch_provide::epoch_provide(stream);

        // Trace
        let stream = trace::trace(enable_executor_row_count, info.clone(), actor_ctx, stream);

        if cfg!(debug_assertions) {
            Self::wrap_debug(info, stream).boxed()
        } else {
            stream.boxed()
        }
    }
}

impl Execute for WrapperExecutor {
    /// Runs the wrapped executor and returns its message stream decorated by `Self::wrap`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Snapshot the executor info before `input` is consumed by `execute()`.
        let info = Arc::new(self.input.info().clone());
        // `wrap` already yields a `BoxedMessageStream`; boxing it a second time would only add
        // another heap allocation and an extra layer of dynamic dispatch on every poll.
        Self::wrap(
            self.enable_executor_row_count,
            info,
            self.actor_ctx,
            self.input.execute(),
        )
    }

    /// Same as [`Execute::execute`], but starts the wrapped executor through its
    /// `execute_with_epoch`, forwarding `epoch` unchanged.
    fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
        // Snapshot the executor info before `input` is consumed by `execute_with_epoch()`.
        let info = Arc::new(self.input.info().clone());
        // No trailing `.boxed()`: the stream returned by `wrap` is already boxed.
        Self::wrap(
            self.enable_executor_row_count,
            info,
            self.actor_ctx,
            self.input.execute_with_epoch(epoch),
        )
    }
}