risingwave_batch/executor/
mod.rs1mod fast_insert;
16mod managed;
17pub mod test_utils;
18
19use std::future::Future;
20use std::sync::Arc;
21
22use anyhow::Context;
23use async_recursion::async_recursion;
24pub use fast_insert::*;
25use futures::future::BoxFuture;
26use futures::stream::BoxStream;
27pub use managed::*;
28use risingwave_common::array::DataChunk;
29use risingwave_common::catalog::Schema;
30use risingwave_pb::batch_plan::PlanNode;
31use risingwave_pb::batch_plan::plan_node::NodeBodyDiscriminants;
32use thiserror_ext::AsReport;
33
34use crate::error::Result;
35use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
36
/// A heap-allocated, dynamically dispatched [`Executor`].
pub type BoxedExecutor = Box<dyn Executor>;
/// A boxed `'static` stream whose items are `Result<DataChunk>`; this is what
/// [`Executor::execute`] returns.
pub type BoxedDataChunkStream = BoxStream<'static, Result<DataChunk>>;
39
/// Pairs a [`Schema`] with a string identifier for an executor.
pub struct ExecutorInfo {
    /// Schema associated with the executor (presumably its output schema —
    /// confirm against the users of this struct).
    pub schema: Schema,
    /// String identifier of the executor.
    pub id: String,
}
44
/// Common interface of batch executors.
///
/// Implementors must be `Send + 'static` so that a [`BoxedExecutor`] can be
/// moved across threads and into `'static` streams.
pub trait Executor: Send + 'static {
    /// Returns the schema of this executor (presumably the schema of the
    /// chunks it yields — confirm with implementors).
    fn schema(&self) -> &Schema;

    /// Returns a string identifying this executor. The `Debug` impl of
    /// [`BoxedExecutor`] prints exactly this string.
    fn identity(&self) -> &str;

    /// Consumes the boxed executor and returns the stream of data chunks (or
    /// errors) it produces.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream;
}
60
61impl std::fmt::Debug for BoxedExecutor {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        f.write_str(self.identity())
64    }
65}
66
/// Factory trait implemented by types that know how to construct one kind of
/// [`BoxedExecutor`].
pub trait BoxedExecutorBuilder {
    /// Asynchronously creates an executor.
    ///
    /// `source` carries the plan node, task id, task context and shutdown
    /// token; `inputs` are the executors that feed the new one (the builder in
    /// this file passes the executors built from the plan node's children, in
    /// order). The returned future must be `Send`.
    fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> impl Future<Output = Result<BoxedExecutor>> + Send;
}
75
/// State needed to turn one [`PlanNode`] (and, recursively, its children) into
/// an executor.
pub struct ExecutorBuilder<'a> {
    /// The plan node this builder will build an executor for.
    pub plan_node: &'a PlanNode,
    /// Identifier of the task the executor belongs to.
    pub task_id: &'a TaskId,
    /// Shared batch task context, cloned into every per-child builder.
    context: Arc<dyn BatchTaskContext>,
    /// Shutdown token handed to each built executor's `ManagedExecutor` wrapper.
    shutdown_rx: ShutdownToken,
}
82
83impl<'a> ExecutorBuilder<'a> {
84    pub fn new(
85        plan_node: &'a PlanNode,
86        task_id: &'a TaskId,
87        context: Arc<dyn BatchTaskContext>,
88        shutdown_rx: ShutdownToken,
89    ) -> Self {
90        Self {
91            plan_node,
92            task_id,
93            context,
94            shutdown_rx,
95        }
96    }
97
98    #[must_use]
99    pub fn clone_for_plan(&self, plan_node: &'a PlanNode) -> Self {
100        ExecutorBuilder::new(
101            plan_node,
102            self.task_id,
103            self.context.clone(),
104            self.shutdown_rx.clone(),
105        )
106    }
107
108    pub fn plan_node(&self) -> &PlanNode {
109        self.plan_node
110    }
111
112    pub fn context(&self) -> &Arc<dyn BatchTaskContext> {
113        &self.context
114    }
115
116    pub fn shutdown_rx(&self) -> &ShutdownToken {
117        &self.shutdown_rx
118    }
119}
120
/// One entry of the executor-builder registry: associates a plan node body
/// kind with the function that builds the executor for it.
pub struct ExecutorBuilderDescriptor {
    /// The kind of plan node body this descriptor handles.
    pub node_body: NodeBodyDiscriminants,

    /// Builder function for that kind of node. It receives the
    /// [`ExecutorBuilder`] and the input executors, and returns a boxed future
    /// (borrowing from the builder) that resolves to the new executor.
    pub builder: for<'a> fn(
        source: &'a ExecutorBuilder<'a>,
        inputs: Vec<BoxedExecutor>,
    ) -> BoxFuture<'a, Result<BoxedExecutor>>,
}
133
/// Registry of executor builder descriptors, declared as a `linkme`
/// distributed slice. Entries are contributed by the `register_executor!`
/// macro and looked up by node body kind when building an executor.
#[linkme::distributed_slice]
pub static BUILDER_DESCS: [ExecutorBuilderDescriptor];
137
/// Registers `$builder` as the executor builder for the plan node body kind
/// `NodeBodyDiscriminants::$node_body`.
///
/// Expands to an anonymous `const` item containing a static
/// `ExecutorBuilderDescriptor` that is added to the `BUILDER_DESCS`
/// distributed slice. `$builder` must provide `new_boxed_executor`; the future
/// it returns is boxed with `FutureExt::boxed`.
#[macro_export]
macro_rules! register_executor {
    ($node_body:ident, $builder:ty) => {
        // Anonymous const scope so the imports and the `BUILDER` static do not
        // leak into (or clash within) the invoking module.
        const _: () = {
            use futures::FutureExt;
            use risingwave_batch::executor::{BUILDER_DESCS, ExecutorBuilderDescriptor};
            use risingwave_pb::batch_plan::plan_node::NodeBodyDiscriminants;

            #[linkme::distributed_slice(BUILDER_DESCS)]
            static BUILDER: ExecutorBuilderDescriptor = ExecutorBuilderDescriptor {
                node_body: NodeBodyDiscriminants::$node_body,
                // Non-capturing closure, used as the descriptor's `fn` pointer.
                builder: |a, b| <$builder>::new_boxed_executor(a, b).boxed(),
            };
        };
    };
}
155pub use register_executor;
156
157impl ExecutorBuilder<'_> {
158    pub async fn build(&self) -> Result<BoxedExecutor> {
159        self.try_build()
160            .await
161            .inspect_err(|e| {
162                let plan_node = self.plan_node.get_node_body();
163                error!(error = %e.as_report(), ?plan_node, "failed to build executor");
164            })
165            .context("failed to build executor")
166            .map_err(Into::into)
167    }
168
169    #[async_recursion]
170    async fn try_build(&self) -> Result<BoxedExecutor> {
171        let mut inputs = Vec::with_capacity(self.plan_node.children.len());
172        for input_node in &self.plan_node.children {
173            let input = self.clone_for_plan(input_node).build().await?;
174            inputs.push(input);
175        }
176
177        let node_body_discriminants: NodeBodyDiscriminants =
178            self.plan_node.get_node_body().unwrap().into();
179
180        let builder = BUILDER_DESCS
181            .iter()
182            .find(|x| x.node_body == node_body_discriminants)
183            .with_context(|| {
184                format!(
185                    "no executor builder found for {:?}",
186                    node_body_discriminants
187                )
188            })?
189            .builder;
190
191        let real_executor = builder(self, inputs).await?;
192
193        Ok(Box::new(ManagedExecutor::new(
194            real_executor,
195            self.shutdown_rx.clone(),
196        )) as BoxedExecutor)
197    }
198}
199
#[cfg(test)]
mod tests {
    use risingwave_pb::batch_plan::PlanNode;

    use crate::executor::ExecutorBuilder;
    use crate::task::{ComputeNodeContext, ShutdownToken, TaskId};

    /// `clone_for_plan` must keep the task id and retarget the clone at the
    /// given plan node, leaving the original builder untouched.
    #[tokio::test]
    async fn test_clone_for_plan() {
        let plan_node = PlanNode::default();
        let task_id = &TaskId {
            task_id: 1,
            stage_id: 1,
            query_id: "test_query_id".to_owned(),
        };
        let builder = ExecutorBuilder::new(
            &plan_node,
            task_id,
            ComputeNodeContext::for_test(),
            ShutdownToken::empty(),
        );
        let child_plan = &PlanNode::default();
        let cloned_builder = builder.clone_for_plan(child_plan);

        // The task id is shared between the original and the clone.
        assert_eq!(builder.task_id, cloned_builder.task_id);
        // The clone points at the child plan node (same reference, not a copy)...
        assert!(std::ptr::eq(cloned_builder.plan_node(), child_plan));
        // ...while the original builder still points at its own plan node.
        assert!(std::ptr::eq(builder.plan_node(), &plan_node));
    }
}