risingwave_batch/executor/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod fast_insert;
16mod managed;
17pub mod test_utils;
18
19use std::future::Future;
20use std::sync::Arc;
21
22use anyhow::Context;
23use async_recursion::async_recursion;
24pub use fast_insert::*;
25use futures::future::BoxFuture;
26use futures::stream::BoxStream;
27pub use managed::*;
28use risingwave_common::array::DataChunk;
29use risingwave_common::catalog::Schema;
30use risingwave_pb::batch_plan::PlanNode;
31use risingwave_pb::batch_plan::plan_node::NodeBodyDiscriminants;
32use risingwave_pb::common::BatchQueryEpoch;
33use thiserror_ext::AsReport;
34
35use crate::error::Result;
36use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
37
38pub type BoxedExecutor = Box<dyn Executor>;
39pub type BoxedDataChunkStream = BoxStream<'static, Result<DataChunk>>;
40
41pub struct ExecutorInfo {
42    pub schema: Schema,
43    pub id: String,
44}
45
46/// Refactoring of `Executor` using `Stream`.
47pub trait Executor: Send + 'static {
48    /// Returns the schema of the executor's return data.
49    ///
50    /// Schema must be available before `init`.
51    fn schema(&self) -> &Schema;
52
53    /// Identity string of the executor
54    fn identity(&self) -> &str;
55
56    /// Executes to return the data chunk stream.
57    ///
58    /// The implementation should guaranteed that each `DataChunk`'s cardinality is not zero.
59    fn execute(self: Box<Self>) -> BoxedDataChunkStream;
60}
61
62impl std::fmt::Debug for BoxedExecutor {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.write_str(self.identity())
65    }
66}
67
68/// Every Executor should impl this trait to provide a static method to build a `BoxedExecutor`
69/// from proto and global environment.
70pub trait BoxedExecutorBuilder {
71    fn new_boxed_executor(
72        source: &ExecutorBuilder<'_>,
73        inputs: Vec<BoxedExecutor>,
74    ) -> impl Future<Output = Result<BoxedExecutor>> + Send;
75}
76
77pub struct ExecutorBuilder<'a> {
78    pub plan_node: &'a PlanNode,
79    pub task_id: &'a TaskId,
80    context: Arc<dyn BatchTaskContext>,
81    epoch: BatchQueryEpoch,
82    shutdown_rx: ShutdownToken,
83}
84
85impl<'a> ExecutorBuilder<'a> {
86    pub fn new(
87        plan_node: &'a PlanNode,
88        task_id: &'a TaskId,
89        context: Arc<dyn BatchTaskContext>,
90        epoch: BatchQueryEpoch,
91        shutdown_rx: ShutdownToken,
92    ) -> Self {
93        Self {
94            plan_node,
95            task_id,
96            context,
97            epoch,
98            shutdown_rx,
99        }
100    }
101
102    #[must_use]
103    pub fn clone_for_plan(&self, plan_node: &'a PlanNode) -> Self {
104        ExecutorBuilder::new(
105            plan_node,
106            self.task_id,
107            self.context.clone(),
108            self.epoch,
109            self.shutdown_rx.clone(),
110        )
111    }
112
113    pub fn plan_node(&self) -> &PlanNode {
114        self.plan_node
115    }
116
117    pub fn context(&self) -> &Arc<dyn BatchTaskContext> {
118        &self.context
119    }
120
121    pub fn epoch(&self) -> BatchQueryEpoch {
122        self.epoch
123    }
124
125    pub fn shutdown_rx(&self) -> &ShutdownToken {
126        &self.shutdown_rx
127    }
128}
129
130/// Descriptor for executor builder.
131///
132/// We will call `builder` to build the executor if the `node_body` matches.
133pub struct ExecutorBuilderDescriptor {
134    pub node_body: NodeBodyDiscriminants,
135
136    /// Typically from [`BoxedExecutorBuilder::new_boxed_executor`].
137    pub builder: for<'a> fn(
138        source: &'a ExecutorBuilder<'a>,
139        inputs: Vec<BoxedExecutor>,
140    ) -> BoxFuture<'a, Result<BoxedExecutor>>,
141}
142
143/// All registered executor builders.
144#[linkme::distributed_slice]
145pub static BUILDER_DESCS: [ExecutorBuilderDescriptor];
146
147/// Register an executor builder so that it can be used to build the executor from protobuf.
148#[macro_export]
149macro_rules! register_executor {
150    ($node_body:ident, $builder:ty) => {
151        const _: () = {
152            use futures::FutureExt;
153            use risingwave_batch::executor::{BUILDER_DESCS, ExecutorBuilderDescriptor};
154            use risingwave_pb::batch_plan::plan_node::NodeBodyDiscriminants;
155
156            #[linkme::distributed_slice(BUILDER_DESCS)]
157            static BUILDER: ExecutorBuilderDescriptor = ExecutorBuilderDescriptor {
158                node_body: NodeBodyDiscriminants::$node_body,
159                builder: |a, b| <$builder>::new_boxed_executor(a, b).boxed(),
160            };
161        };
162    };
163}
164pub use register_executor;
165
166impl ExecutorBuilder<'_> {
167    pub async fn build(&self) -> Result<BoxedExecutor> {
168        self.try_build()
169            .await
170            .inspect_err(|e| {
171                let plan_node = self.plan_node.get_node_body();
172                error!(error = %e.as_report(), ?plan_node, "failed to build executor");
173            })
174            .context("failed to build executor")
175            .map_err(Into::into)
176    }
177
178    #[async_recursion]
179    async fn try_build(&self) -> Result<BoxedExecutor> {
180        let mut inputs = Vec::with_capacity(self.plan_node.children.len());
181        for input_node in &self.plan_node.children {
182            let input = self.clone_for_plan(input_node).build().await?;
183            inputs.push(input);
184        }
185
186        let node_body_discriminants: NodeBodyDiscriminants =
187            self.plan_node.get_node_body().unwrap().into();
188
189        let builder = BUILDER_DESCS
190            .iter()
191            .find(|x| x.node_body == node_body_discriminants)
192            .with_context(|| {
193                format!(
194                    "no executor builder found for {:?}",
195                    node_body_discriminants
196                )
197            })?
198            .builder;
199
200        let real_executor = builder(self, inputs).await?;
201
202        Ok(Box::new(ManagedExecutor::new(
203            real_executor,
204            self.shutdown_rx.clone(),
205        )) as BoxedExecutor)
206    }
207}
208
#[cfg(test)]
mod tests {
    use risingwave_hummock_sdk::test_batch_query_epoch;
    use risingwave_pb::batch_plan::PlanNode;

    use crate::executor::ExecutorBuilder;
    use crate::task::{ComputeNodeContext, ShutdownToken, TaskId};

    /// `clone_for_plan` must swap in the given plan node while keeping the task
    /// id and epoch of the original builder.
    #[tokio::test]
    async fn test_clone_for_plan() {
        let plan_node = PlanNode::default();
        let task_id = &TaskId {
            task_id: 1,
            stage_id: 1,
            query_id: "test_query_id".to_owned(),
        };
        let builder = ExecutorBuilder::new(
            &plan_node,
            task_id,
            ComputeNodeContext::for_test(),
            test_batch_query_epoch(),
            ShutdownToken::empty(),
        );
        let child_plan = &PlanNode::default();
        let cloned_builder = builder.clone_for_plan(child_plan);

        assert_eq!(builder.task_id, cloned_builder.task_id);
        assert_eq!(builder.epoch(), cloned_builder.epoch());
        // The clone must point at the child plan, not the parent's plan node.
        assert!(std::ptr::eq(cloned_builder.plan_node, child_plan));
        assert!(!std::ptr::eq(cloned_builder.plan_node, builder.plan_node));
    }
}