// risingwave_stream/executor/wrapper.rs
use crate::executor::prelude::*;
mod epoch_check;
mod epoch_provide;
mod schema_check;
mod trace;
mod update_check;
/// Executor that runs another executor and passes its message stream through
/// the wrapper layers defined in this module's submodules.
pub struct WrapperExecutor {
    /// The executor whose output stream is wrapped.
    input: Executor,

    /// Actor context; handed to `trace::trace` when the stream is wrapped.
    actor_ctx: ActorContextRef,

    /// Flag forwarded unchanged to `trace::trace`.
    // NOTE(review): presumably toggles per-executor row counting — confirm in `trace`.
    enable_executor_row_count: bool,
}
impl WrapperExecutor {
pub fn new(
input: Executor,
actor_ctx: ActorContextRef,
enable_executor_row_count: bool,
) -> Self {
Self {
input,
actor_ctx,
enable_executor_row_count,
}
}
#[allow(clippy::let_and_return)]
fn wrap_debug(
info: Arc<ExecutorInfo>,
stream: impl MessageStream + 'static,
) -> impl MessageStream + 'static {
let stream = update_check::update_check(info, stream);
stream
}
fn wrap(
enable_executor_row_count: bool,
info: Arc<ExecutorInfo>,
actor_ctx: ActorContextRef,
stream: impl MessageStream + 'static,
) -> BoxedMessageStream {
let stream = trace::instrument_await_tree(info.clone(), stream);
let stream = schema_check::schema_check(info.clone(), stream);
let stream = epoch_check::epoch_check(info.clone(), stream);
let stream = epoch_provide::epoch_provide(stream);
let stream = trace::trace(enable_executor_row_count, info.clone(), actor_ctx, stream);
if cfg!(debug_assertions) {
Self::wrap_debug(info, stream).boxed()
} else {
stream.boxed()
}
}
}
/// Runs the wrapped input executor and returns its stream with all wrapper
/// layers from `WrapperExecutor::wrap` applied.
impl Execute for WrapperExecutor {
    /// Executes the input and wraps the resulting stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Clone the info first: `self.input` is used again below to produce
        // the stream that gets wrapped.
        let info = Arc::new(self.input.info().clone());
        // `wrap` already returns a `BoxedMessageStream`, so its result is
        // returned as-is instead of being boxed a second time.
        Self::wrap(
            self.enable_executor_row_count,
            info,
            self.actor_ctx,
            self.input.execute(),
        )
    }

    /// Same as [`Execute::execute`], but starts the input through its
    /// `execute_with_epoch(epoch)` entry point.
    fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
        // Clone the info first: `self.input` is used again below to produce
        // the stream that gets wrapped.
        let info = Arc::new(self.input.info().clone());
        // `wrap` already returns a `BoxedMessageStream`, so its result is
        // returned as-is instead of being boxed a second time.
        Self::wrap(
            self.enable_executor_row_count,
            info,
            self.actor_ctx,
            self.input.execute_with_epoch(epoch),
        )
    }
}