risingwave_batch/executor/
mod.rs1mod fast_insert;
16mod managed;
17pub mod test_utils;
18
19use std::future::Future;
20use std::sync::Arc;
21
22use anyhow::Context;
23use async_recursion::async_recursion;
24pub use fast_insert::*;
25use futures::future::BoxFuture;
26use futures::stream::BoxStream;
27pub use managed::*;
28use risingwave_common::array::DataChunk;
29use risingwave_common::catalog::Schema;
30use risingwave_pb::batch_plan::PlanNode;
31use risingwave_pb::batch_plan::plan_node::NodeBodyDiscriminants;
32use thiserror_ext::AsReport;
33
34use crate::error::Result;
35use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
36
37pub type BoxedExecutor = Box<dyn Executor>;
38pub type BoxedDataChunkStream = BoxStream<'static, Result<DataChunk>>;
39
40pub struct ExecutorInfo {
41 pub schema: Schema,
42 pub id: String,
43}
44
45pub trait Executor: Send + 'static {
47 fn schema(&self) -> &Schema;
51
52 fn identity(&self) -> &str;
54
55 fn execute(self: Box<Self>) -> BoxedDataChunkStream;
59}
60
61impl std::fmt::Debug for BoxedExecutor {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 f.write_str(self.identity())
64 }
65}
66
67pub trait BoxedExecutorBuilder {
70 fn new_boxed_executor(
71 source: &ExecutorBuilder<'_>,
72 inputs: Vec<BoxedExecutor>,
73 ) -> impl Future<Output = Result<BoxedExecutor>> + Send;
74}
75
76pub struct ExecutorBuilder<'a> {
77 pub plan_node: &'a PlanNode,
78 pub task_id: &'a TaskId,
79 context: Arc<dyn BatchTaskContext>,
80 shutdown_rx: ShutdownToken,
81}
82
83impl<'a> ExecutorBuilder<'a> {
84 pub fn new(
85 plan_node: &'a PlanNode,
86 task_id: &'a TaskId,
87 context: Arc<dyn BatchTaskContext>,
88 shutdown_rx: ShutdownToken,
89 ) -> Self {
90 Self {
91 plan_node,
92 task_id,
93 context,
94 shutdown_rx,
95 }
96 }
97
98 #[must_use]
99 pub fn clone_for_plan(&self, plan_node: &'a PlanNode) -> Self {
100 ExecutorBuilder::new(
101 plan_node,
102 self.task_id,
103 self.context.clone(),
104 self.shutdown_rx.clone(),
105 )
106 }
107
108 pub fn plan_node(&self) -> &PlanNode {
109 self.plan_node
110 }
111
112 pub fn context(&self) -> &Arc<dyn BatchTaskContext> {
113 &self.context
114 }
115
116 pub fn shutdown_rx(&self) -> &ShutdownToken {
117 &self.shutdown_rx
118 }
119}
120
121pub struct ExecutorBuilderDescriptor {
125 pub node_body: NodeBodyDiscriminants,
126
127 pub builder: for<'a> fn(
129 source: &'a ExecutorBuilder<'a>,
130 inputs: Vec<BoxedExecutor>,
131 ) -> BoxFuture<'a, Result<BoxedExecutor>>,
132}
133
134#[linkme::distributed_slice]
136pub static BUILDER_DESCS: [ExecutorBuilderDescriptor];
137
138#[macro_export]
140macro_rules! register_executor {
141 ($node_body:ident, $builder:ty) => {
142 const _: () = {
143 use futures::FutureExt;
144 use risingwave_batch::executor::{BUILDER_DESCS, ExecutorBuilderDescriptor};
145 use risingwave_pb::batch_plan::plan_node::NodeBodyDiscriminants;
146
147 #[linkme::distributed_slice(BUILDER_DESCS)]
148 static BUILDER: ExecutorBuilderDescriptor = ExecutorBuilderDescriptor {
149 node_body: NodeBodyDiscriminants::$node_body,
150 builder: |a, b| <$builder>::new_boxed_executor(a, b).boxed(),
151 };
152 };
153 };
154}
155pub use register_executor;
156
157impl ExecutorBuilder<'_> {
158 pub async fn build(&self) -> Result<BoxedExecutor> {
159 self.try_build()
160 .await
161 .inspect_err(|e| {
162 let plan_node = self.plan_node.get_node_body();
163 error!(error = %e.as_report(), ?plan_node, "failed to build executor");
164 })
165 .context("failed to build executor")
166 .map_err(Into::into)
167 }
168
169 #[async_recursion]
170 async fn try_build(&self) -> Result<BoxedExecutor> {
171 let mut inputs = Vec::with_capacity(self.plan_node.children.len());
172 for input_node in &self.plan_node.children {
173 let input = self.clone_for_plan(input_node).build().await?;
174 inputs.push(input);
175 }
176
177 let node_body_discriminants: NodeBodyDiscriminants =
178 self.plan_node.get_node_body().unwrap().into();
179
180 let builder = BUILDER_DESCS
181 .iter()
182 .find(|x| x.node_body == node_body_discriminants)
183 .with_context(|| {
184 format!(
185 "no executor builder found for {:?}",
186 node_body_discriminants
187 )
188 })?
189 .builder;
190
191 let real_executor = builder(self, inputs).await?;
192
193 Ok(Box::new(ManagedExecutor::new(
194 real_executor,
195 self.shutdown_rx.clone(),
196 )) as BoxedExecutor)
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use risingwave_pb::batch_plan::PlanNode;
203
204 use crate::executor::ExecutorBuilder;
205 use crate::task::{ComputeNodeContext, ShutdownToken, TaskId};
206
207 #[tokio::test]
208 async fn test_clone_for_plan() {
209 let plan_node = PlanNode::default();
210 let task_id = &TaskId {
211 task_id: 1,
212 stage_id: 1,
213 query_id: "test_query_id".to_owned(),
214 };
215 let builder = ExecutorBuilder::new(
216 &plan_node,
217 task_id,
218 ComputeNodeContext::for_test(),
219 ShutdownToken::empty(),
220 );
221 let child_plan = &PlanNode::default();
222 let cloned_builder = builder.clone_for_plan(child_plan);
223 assert_eq!(builder.task_id, cloned_builder.task_id);
224 }
225}