risingwave_batch/executor/
mod.rs1mod fast_insert;
16mod managed;
17pub mod test_utils;
18
19use std::future::Future;
20use std::sync::Arc;
21
22use anyhow::Context;
23use async_recursion::async_recursion;
24pub use fast_insert::*;
25use futures::future::BoxFuture;
26use futures::stream::BoxStream;
27pub use managed::*;
28use risingwave_common::array::DataChunk;
29use risingwave_common::catalog::Schema;
30use risingwave_pb::batch_plan::PlanNode;
31use risingwave_pb::batch_plan::plan_node::NodeBodyDiscriminants;
32use risingwave_pb::common::BatchQueryEpoch;
33use thiserror_ext::AsReport;
34
35use crate::error::Result;
36use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
37
38pub type BoxedExecutor = Box<dyn Executor>;
39pub type BoxedDataChunkStream = BoxStream<'static, Result<DataChunk>>;
40
41pub struct ExecutorInfo {
42 pub schema: Schema,
43 pub id: String,
44}
45
46pub trait Executor: Send + 'static {
48 fn schema(&self) -> &Schema;
52
53 fn identity(&self) -> &str;
55
56 fn execute(self: Box<Self>) -> BoxedDataChunkStream;
60}
61
62impl std::fmt::Debug for BoxedExecutor {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.write_str(self.identity())
65 }
66}
67
68pub trait BoxedExecutorBuilder {
71 fn new_boxed_executor(
72 source: &ExecutorBuilder<'_>,
73 inputs: Vec<BoxedExecutor>,
74 ) -> impl Future<Output = Result<BoxedExecutor>> + Send;
75}
76
77pub struct ExecutorBuilder<'a> {
78 pub plan_node: &'a PlanNode,
79 pub task_id: &'a TaskId,
80 context: Arc<dyn BatchTaskContext>,
81 epoch: BatchQueryEpoch,
82 shutdown_rx: ShutdownToken,
83}
84
85impl<'a> ExecutorBuilder<'a> {
86 pub fn new(
87 plan_node: &'a PlanNode,
88 task_id: &'a TaskId,
89 context: Arc<dyn BatchTaskContext>,
90 epoch: BatchQueryEpoch,
91 shutdown_rx: ShutdownToken,
92 ) -> Self {
93 Self {
94 plan_node,
95 task_id,
96 context,
97 epoch,
98 shutdown_rx,
99 }
100 }
101
102 #[must_use]
103 pub fn clone_for_plan(&self, plan_node: &'a PlanNode) -> Self {
104 ExecutorBuilder::new(
105 plan_node,
106 self.task_id,
107 self.context.clone(),
108 self.epoch,
109 self.shutdown_rx.clone(),
110 )
111 }
112
113 pub fn plan_node(&self) -> &PlanNode {
114 self.plan_node
115 }
116
117 pub fn context(&self) -> &Arc<dyn BatchTaskContext> {
118 &self.context
119 }
120
121 pub fn epoch(&self) -> BatchQueryEpoch {
122 self.epoch
123 }
124
125 pub fn shutdown_rx(&self) -> &ShutdownToken {
126 &self.shutdown_rx
127 }
128}
129
130pub struct ExecutorBuilderDescriptor {
134 pub node_body: NodeBodyDiscriminants,
135
136 pub builder: for<'a> fn(
138 source: &'a ExecutorBuilder<'a>,
139 inputs: Vec<BoxedExecutor>,
140 ) -> BoxFuture<'a, Result<BoxedExecutor>>,
141}
142
143#[linkme::distributed_slice]
145pub static BUILDER_DESCS: [ExecutorBuilderDescriptor];
146
/// Registers `$builder` as the executor builder for the plan node body
/// `NodeBodyDiscriminants::$node_body`.
///
/// Expands to an anonymous constant that adds an `ExecutorBuilderDescriptor`
/// to `BUILDER_DESCS`. `$builder` must provide `new_boxed_executor`, and the
/// trait that declares it must be in scope where the macro is invoked.
#[macro_export]
macro_rules! register_executor {
    ($node_body:ident, $builder:ty) => {
        // The anonymous const keeps the `use` items and the `BUILDER` static
        // out of the invoking module's namespace.
        const _: () = {
            use futures::FutureExt;
            use risingwave_batch::executor::{BUILDER_DESCS, ExecutorBuilderDescriptor};
            use risingwave_pb::batch_plan::plan_node::NodeBodyDiscriminants;

            #[linkme::distributed_slice(BUILDER_DESCS)]
            static BUILDER: ExecutorBuilderDescriptor = ExecutorBuilderDescriptor {
                node_body: NodeBodyDiscriminants::$node_body,
                // Box the future so it fits the descriptor's fn-pointer type.
                builder: |source, inputs| <$builder>::new_boxed_executor(source, inputs).boxed(),
            };
        };
    };
}
164pub use register_executor;
165
166impl ExecutorBuilder<'_> {
167 pub async fn build(&self) -> Result<BoxedExecutor> {
168 self.try_build()
169 .await
170 .inspect_err(|e| {
171 let plan_node = self.plan_node.get_node_body();
172 error!(error = %e.as_report(), ?plan_node, "failed to build executor");
173 })
174 .context("failed to build executor")
175 .map_err(Into::into)
176 }
177
178 #[async_recursion]
179 async fn try_build(&self) -> Result<BoxedExecutor> {
180 let mut inputs = Vec::with_capacity(self.plan_node.children.len());
181 for input_node in &self.plan_node.children {
182 let input = self.clone_for_plan(input_node).build().await?;
183 inputs.push(input);
184 }
185
186 let node_body_discriminants: NodeBodyDiscriminants =
187 self.plan_node.get_node_body().unwrap().into();
188
189 let builder = BUILDER_DESCS
190 .iter()
191 .find(|x| x.node_body == node_body_discriminants)
192 .with_context(|| {
193 format!(
194 "no executor builder found for {:?}",
195 node_body_discriminants
196 )
197 })?
198 .builder;
199
200 let real_executor = builder(self, inputs).await?;
201
202 Ok(Box::new(ManagedExecutor::new(
203 real_executor,
204 self.shutdown_rx.clone(),
205 )) as BoxedExecutor)
206 }
207}
208
#[cfg(test)]
mod tests {
    use risingwave_hummock_sdk::test_batch_query_epoch;
    use risingwave_pb::batch_plan::PlanNode;

    use crate::executor::ExecutorBuilder;
    use crate::task::{ComputeNodeContext, ShutdownToken, TaskId};

    /// A builder derived with `clone_for_plan` keeps the task id of the
    /// builder it was derived from.
    #[tokio::test]
    async fn test_clone_for_plan() {
        let root_plan = PlanNode::default();
        let child_plan = PlanNode::default();
        let task_id = TaskId {
            task_id: 1,
            stage_id: 1,
            query_id: "test_query_id".to_owned(),
        };

        let original = ExecutorBuilder::new(
            &root_plan,
            &task_id,
            ComputeNodeContext::for_test(),
            test_batch_query_epoch(),
            ShutdownToken::empty(),
        );
        let derived = original.clone_for_plan(&child_plan);

        assert_eq!(original.task_id, derived.task_id);
    }
}