risingwave_batch/executor/
managed.rs1use std::pin::pin;
16
17use futures::future::{Either, select};
18use futures::stream::StreamExt;
19use futures_async_stream::try_stream;
20use risingwave_common::array::DataChunk;
21use risingwave_common::catalog::Schema;
22use tracing::Instrument;
23
24use crate::error::BatchError;
25use crate::executor::{BoxedExecutor, Executor};
26use crate::task::{ShutdownMsg, ShutdownToken};
27
28pub struct ManagedExecutor {
32 child: BoxedExecutor,
33 shutdown_rx: ShutdownToken,
34}
35
36impl ManagedExecutor {
37 pub fn new(child: BoxedExecutor, shutdown_rx: ShutdownToken) -> Self {
38 Self { child, shutdown_rx }
39 }
40}
41
42impl Executor for ManagedExecutor {
43 fn schema(&self) -> &Schema {
44 self.child.schema()
45 }
46
47 fn identity(&self) -> &str {
48 self.child.identity()
49 }
50
51 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
52 async fn execute(mut self: Box<Self>) {
53 let input_desc = self.child.identity().to_owned();
54 let span = tracing::info_span!("batch_executor", "otel.name" = input_desc);
55
56 let mut child_stream = self.child.execute();
57
58 loop {
59 let shutdown = pin!(self.shutdown_rx.cancelled());
60
61 match select(shutdown, child_stream.next().instrument(span.clone())).await {
62 Either::Left(_) => break,
63 Either::Right((res, _)) => {
64 if let Some(chunk) = res {
65 yield chunk?;
66 } else {
67 return Ok(());
68 }
69 }
70 }
71 }
72
73 match self.shutdown_rx.message() {
74 ShutdownMsg::Abort(reason) => {
75 return Err(BatchError::aborted(reason));
76 }
77 ShutdownMsg::Cancel => {
78 return Err(BatchError::aborted("cancelled"));
79 }
80 ShutdownMsg::Init => {}
81 }
82 }
83}