use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use anyhow::anyhow;
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use pgwire::pg_server::BoxedError;
use risingwave_batch::error::BatchError;
use risingwave_batch::executor::ExecutorBuilder;
use risingwave_batch::task::{ShutdownToken, TaskId};
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::array::DataChunk;
use risingwave_common::bail;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::tracing::{InstrumentStream, TracingContext};
use risingwave_connector::source::SplitMetaData;
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{
    ExchangeInfo, ExchangeSource, LocalExecutePlan, PbTaskId, PlanFragment, PlanNode as PbPlanNode,
    TaskOutputId,
};
use risingwave_pb::common::WorkerNode;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;

use super::plan_fragmenter::{PartitionInfo, QueryStage};
use crate::catalog::{FragmentId, TableId};
use crate::error::RwError;
use crate::optimizer::plan_node::BatchPlanNodeType;
use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId};
use crate::scheduler::task_context::FrontendBatchTaskContext;
use crate::scheduler::{SchedulerError, SchedulerResult};
use crate::session::{FrontendEnv, SessionImpl};

pub type LocalQueryStream = ReceiverStream<Result<DataChunk, BoxedError>>;
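
/// A batch query executed directly on the frontend node ("local execution mode").
/// The root stage runs on the frontend's compute runtime; at most one level of child
/// stages is shipped to compute nodes as embedded `LocalExecutePlan`s.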
pub struct LocalQueryExecution {
    query: Query,
    front_env: FrontendEnv,
    session: Arc<SessionImpl>,
    worker_node_manager: WorkerNodeSelector,
    timeout: Option<Duration>,
}

impl LocalQueryExecution {
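    /// Creates a local execution for `query`; `support_barrier_read` is forwarded to the
    /// [`WorkerNodeSelector`] used to pick compute nodes for pushed-down stages.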
    pub fn new(
        query: Query,
        front_env: FrontendEnv,
        support_barrier_read: bool,
        session: Arc<SessionImpl>,
        timeout: Option<Duration>,
    ) -> Self {
        let worker_node_manager =
            WorkerNodeSelector::new(front_env.worker_node_manager_ref(), support_barrier_read);

        Self {
            query,
            front_env,
            session,
            worker_node_manager,
            timeout,
        }
    }

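    /// Resets the session's cancel flag and returns the [`ShutdownToken`] observed by the
    /// running executor.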
    fn shutdown_rx(&self) -> ShutdownToken {
        self.session.reset_cancel_query_flag()
    }

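    /// Builds the local plan fragment into a batch executor and yields its output chunks.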
    #[try_stream(ok = DataChunk, error = RwError)]
    pub async fn run_inner(self) {
        debug!(
            query_id = %self.query.query_id,
            "Starting to run query"
        );
        let context = FrontendBatchTaskContext::create(self.session.clone());
        let task_id = TaskId {
            query_id: self.query.query_id.id.clone(),
            stage_id: 0,
            task_id: 0,
        };

        let plan_fragment = self.create_plan_fragment()?;
        let plan_node = plan_fragment.root.unwrap();

        let executor = ExecutorBuilder::new(&plan_node, &task_id, context, self.shutdown_rx());
        let executor = executor.build().await?;
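        // The executor has been built, so the plan and `self` are no longer needed; drop
        // them now so whatever they hold is released before streaming starts.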
        drop(plan_node);
        drop(self);

        #[for_await]
        for chunk in executor.execute() {
            yield chunk?;
        }
    }

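    /// Runs [`Self::run_inner`] under a `local_execute` tracing span, boxed as a stream.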
    fn run(self) -> BoxStream<'static, Result<DataChunk, RwError>> {
        let span = tracing::info_span!("local_execute", query_id = self.query.query_id.id);
        Box::pin(self.run_inner().instrument(span))
    }

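    /// Spawns the query on the frontend compute runtime and returns a stream of result
    /// chunks. Session state is captured and scoped around the execution, and an optional
    /// timeout cancels long-running queries.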
    pub fn stream_rows(self) -> LocalQueryStream {
        let compute_runtime = self.front_env.compute_runtime();
        let (sender, receiver) = mpsc::channel(10);
        let shutdown_rx = self.shutdown_rx();

        let catalog_reader = self.front_env.catalog_reader().clone();
        let user_info_reader = self.front_env.user_info_reader().clone();
        let auth_context = self.session.auth_context();
        let db_name = self.session.database();
        let search_path = self.session.config().search_path();
        let time_zone = self.session.config().timezone();
        let strict_mode = self.session.config().batch_expr_strict_mode();
        let timeout = self.timeout;
        let meta_client = self.front_env.meta_client_ref();

        let sender1 = sender.clone();
        let exec = async move {
            let mut data_stream = self.run().map(|r| r.map_err(|e| Box::new(e) as BoxedError));
            while let Some(mut r) = data_stream.next().await {
                if r.is_err() && shutdown_rx.is_cancelled() {
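                    // Report cancellation to the client instead of the underlying error.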
                    r = Err(Box::new(SchedulerError::QueryCancelled(
                        "Cancelled by user".to_owned(),
                    )) as BoxedError);
                }
                if sender1.send(r).await.is_err() {
                    tracing::info!("Receiver closed.");
                    return;
                }
            }
        };

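        // Execution below relies on task-local variables from `expr_context` and
        // `function_impl::context`; scope the captured session state around `exec` so that
        // expressions evaluated during the query can read it.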
        use risingwave_expr::expr_context::*;

        use crate::expr::function_impl::context::{
            AUTH_CONTEXT, CATALOG_READER, DB_NAME, META_CLIENT, SEARCH_PATH, USER_INFO_READER,
        };

        let exec = async move { CATALOG_READER::scope(catalog_reader, exec).await }.boxed();
        let exec = async move { USER_INFO_READER::scope(user_info_reader, exec).await }.boxed();
        let exec = async move { DB_NAME::scope(db_name, exec).await }.boxed();
        let exec = async move { SEARCH_PATH::scope(search_path, exec).await }.boxed();
        let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await }.boxed();
        let exec = async move { TIME_ZONE::scope(time_zone, exec).await }.boxed();
        let exec = async move { STRICT_MODE::scope(strict_mode, exec).await }.boxed();
        let exec = async move { META_CLIENT::scope(meta_client, exec).await }.boxed();

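        // Enforce the statement timeout, if any: on expiry, log the event and push a
        // `QueryCancelled` error to the receiver.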
        if let Some(timeout) = timeout {
            let exec = async move {
                if let Err(_e) = tokio::time::timeout(timeout, exec).await {
                    tracing::error!(
                        "Local query execution timeout after {} seconds",
                        timeout.as_secs()
                    );
                    if sender
                        .send(Err(Box::new(SchedulerError::QueryCancelled(format!(
                            "timeout after {} seconds",
                            timeout.as_secs(),
                        ))) as BoxedError))
                        .await
                        .is_err()
                    {
                        tracing::info!("Receiver closed.");
                    }
                }
            };
            compute_runtime.spawn(exec);
        } else {
            compute_runtime.spawn(exec);
        }

        ReceiverStream::new(receiver)
    }

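    /// Converts the fragmented query into a single [`PlanFragment`] for local execution.
    /// The root stage becomes the fragment root; any child stages are embedded into the
    /// exchange sources as per-worker `LocalExecutePlan`s by [`Self::convert_plan_node`].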
    fn create_plan_fragment(&self) -> SchedulerResult<PlanFragment> {
        let next_executor_id = Arc::new(AtomicU32::new(0));
        let root_stage_id = self.query.root_stage_id();
        let root_stage = self.query.stage_graph.stages.get(&root_stage_id).unwrap();
        assert_eq!(root_stage.parallelism.unwrap(), 1);
        let second_stage_id = self.query.stage_graph.get_child_stages(&root_stage_id);
        let plan_node_prost = match second_stage_id {
            None => {
                debug!("Local execution mode converts a plan with a single stage");
                self.convert_plan_node(&root_stage.root, &mut None, None, next_executor_id)?
            }
            Some(second_stage_ids) => {
                debug!("Local execution mode converts a plan with two stages");
                if second_stage_ids.is_empty() {
                    self.convert_plan_node(&root_stage.root, &mut None, None, next_executor_id)?
                } else {
                    let mut second_stages = HashMap::new();
                    for second_stage_id in second_stage_ids {
                        let second_stage =
                            self.query.stage_graph.stages.get(second_stage_id).unwrap();
                        second_stages.insert(*second_stage_id, second_stage);
                    }
                    let mut stage_id_to_plan = Some(second_stages);
                    let res = self.convert_plan_node(
                        &root_stage.root,
                        &mut stage_id_to_plan,
                        None,
                        next_executor_id,
                    )?;
                    assert!(
                        stage_id_to_plan.as_ref().unwrap().is_empty(),
                        "We expect that all the child stage plan fragments have been used"
                    );
                    res
                }
            }
        };

        Ok(PlanFragment {
            root: Some(plan_node_prost),
            exchange_info: None,
        })
    }

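    /// Recursively converts an [`ExecutionPlanNode`] into a protobuf [`PbPlanNode`].
    /// Exchange nodes are rewritten to read from compute-node tasks that carry the child
    /// stage as a `LocalExecutePlan`; scan nodes get their partition (vnode bitmap, source
    /// splits, or files) filled in from `partition`.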
    fn convert_plan_node<'a>(
        &'a self,
        execution_plan_node: &ExecutionPlanNode,
        second_stages: &mut Option<HashMap<StageId, &'a QueryStage>>,
        partition: Option<PartitionInfo>,
        next_executor_id: Arc<AtomicU32>,
    ) -> SchedulerResult<PbPlanNode> {
        let identity = format!(
            "{:?}-{}",
            execution_plan_node.plan_node_type,
            next_executor_id.fetch_add(1, Ordering::Relaxed)
        );
        match execution_plan_node.plan_node_type {
            BatchPlanNodeType::BatchExchange => {
                let exchange_source_stage_id = execution_plan_node
                    .source_stage_id
                    .expect("We expect stage id for Exchange Operator");
                let Some(second_stages) = second_stages.as_mut() else {
                    bail!(
                        "Unexpected exchange detected. We are either converting a single stage plan or converting the second stage of the plan."
                    )
                };
                let second_stage = second_stages.remove(&exchange_source_stage_id).expect(
                    "We expect child stage fragment for Exchange Operator running in the frontend",
                );
                let mut node_body = execution_plan_node.node.clone();
                let sources = match &mut node_body {
                    NodeBody::Exchange(exchange_node) => &mut exchange_node.sources,
                    NodeBody::MergeSortExchange(merge_sort_exchange_node) => {
                        &mut merge_sort_exchange_node
                            .exchange
                            .as_mut()
                            .expect("MergeSortExchangeNode must have an exchange node")
                            .sources
                    }
                    _ => unreachable!(),
                };
                assert!(sources.is_empty());

                let tracing_context = TracingContext::from_current_span().to_protobuf();

                if let Some(table_scan_info) = second_stage.table_scan_info.clone()
                    && let Some(vnode_bitmaps) = table_scan_info.partitions()
                {
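                    // Table scan: build one exchange source per vnode-bitmap partition and
                    // pin each child task to the worker that owns those vnodes.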
                    let (worker_ids, vnode_bitmaps): (Vec<_>, Vec<_>) =
                        vnode_bitmaps.clone().into_iter().unzip();
                    let workers = self
                        .worker_node_manager
                        .manager
                        .get_workers_by_worker_slot_ids(&worker_ids)?;
                    for (idx, (worker_node, partition)) in
                        (workers.into_iter().zip_eq_fast(vnode_bitmaps.into_iter())).enumerate()
                    {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::Table(partition)),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            tracing_context: tracing_context.clone(),
                        };
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: idx as u64,
                                    stage_id: exchange_source_stage_id.into(),
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else if let Some(source_info) = &second_stage.source_info {
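                    // Connector source scan: split the source splits into roughly equal
                    // chunks and assign each chunk to a randomly chosen worker.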
                    let chunk_size = (source_info.split_info().unwrap().len() as f32
                        / (self.worker_node_manager.schedule_unit_count()) as f32)
                        .ceil() as usize;
                    for (id, split) in source_info
                        .split_info()
                        .unwrap()
                        .chunks(chunk_size)
                        .enumerate()
                    {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::Source(split.to_vec())),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            tracing_context: tracing_context.clone(),
                        };
                        let worker_node = self.worker_node_manager.next_random_worker()?;
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: id as u64,
                                    stage_id: exchange_source_stage_id.into(),
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else if let Some(file_scan_info) = &second_stage.file_scan_info {
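                    // File scan: partition the file list the same way and assign each chunk
                    // of files to a randomly chosen worker.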
                    let chunk_size = (file_scan_info.file_location.len() as f32
                        / (self.worker_node_manager.schedule_unit_count()) as f32)
                        .ceil() as usize;
                    for (id, files) in file_scan_info.file_location.chunks(chunk_size).enumerate() {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::File(files.to_vec())),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            tracing_context: tracing_context.clone(),
                        };
                        let worker_node = self.worker_node_manager.next_random_worker()?;
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: id as u64,
                                    stage_id: exchange_source_stage_id.into(),
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else {
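                    // No partition info: send the same child plan to every chosen worker
                    // (a single session-pinned worker for DML, random workers otherwise).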
                    let second_stage_plan_node = self.convert_plan_node(
                        &second_stage.root,
                        &mut None,
                        None,
                        next_executor_id,
                    )?;
                    let second_stage_plan_fragment = PlanFragment {
                        root: Some(second_stage_plan_node),
                        exchange_info: Some(ExchangeInfo {
                            mode: DistributionMode::Single as i32,
                            ..Default::default()
                        }),
                    };

                    let local_execute_plan = LocalExecutePlan {
                        plan: Some(second_stage_plan_fragment),
                        tracing_context,
                    };

                    let workers = self.choose_worker(second_stage)?;
                    *sources = workers
                        .iter()
                        .enumerate()
                        .map(|(idx, worker_node)| ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: idx as u64,
                                    stage_id: exchange_source_stage_id.into(),
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan.clone())),
                        })
                        .collect();
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchSeqScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::RowSeqScan(scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_table()
                                .expect("PartitionInfo should be TablePartitionInfo here");
                            scan_node.vnode_bitmap = Some(partition.vnode_bitmap.to_protobuf());
                            scan_node.scan_ranges = partition.scan_ranges;
                        }
                    }
                    NodeBody::SysRowSeqScan(_) => {}
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchLogSeqScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::LogRowSeqScan(scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_table()
                                .expect("PartitionInfo should be TablePartitionInfo here");
                            scan_node.vnode_bitmap = Some(partition.vnode_bitmap.to_protobuf());
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchFileScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::FileScan(file_scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_file()
                                .expect("PartitionInfo should be FilePartitionInfo here");
                            file_scan_node.file_location = partition;
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchSource | BatchPlanNodeType::BatchKafkaScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::Source(source_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_source()
                                .expect("PartitionInfo should be SourcePartitionInfo here");
                            source_node.split = partition
                                .into_iter()
                                .map(|split| split.encode_to_bytes().into())
                                .collect_vec();
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchIcebergScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::IcebergScan(iceberg_scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_source()
                                .expect("PartitionInfo should be SourcePartitionInfo here");
                            iceberg_scan_node.split = partition
                                .into_iter()
                                .map(|split| split.encode_to_bytes().into())
                                .collect_vec();
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
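            // A local lookup join executed from the frontend needs the inner side's
            // vnode-to-worker mapping and the current compute nodes resolved here.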
            BatchPlanNodeType::BatchLookupJoin => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::LocalLookupJoin(node) => {
                        let side_table_desc = node
                            .inner_side_table_desc
                            .as_ref()
                            .expect("no side table desc");
                        let mapping = self.worker_node_manager.fragment_mapping(
                            self.get_fragment_id(&side_table_desc.table_id.into())?,
                        )?;

                        node.inner_side_vnode_mapping =
                            mapping.to_expanded().into_iter().map(u64::from).collect();
                        node.worker_nodes = self.worker_node_manager.manager.list_compute_nodes();
                    }
                    _ => unreachable!(),
                }

                let left_child = self.convert_plan_node(
                    &execution_plan_node.children[0],
                    second_stages,
                    partition,
                    next_executor_id,
                )?;

                Ok(PbPlanNode {
                    children: vec![left_child],
                    identity,
                    node_body: Some(node_body),
                })
            }
            _ => {
                let children = execution_plan_node
                    .children
                    .iter()
                    .map(|e| {
                        self.convert_plan_node(
                            e,
                            second_stages,
                            partition.clone(),
                            next_executor_id.clone(),
                        )
                    })
                    .collect::<SchedulerResult<Vec<PbPlanNode>>>()?;

                Ok(PbPlanNode {
                    children,
                    identity,
                    node_body: Some(execution_plan_node.node.clone()),
                })
            }
        }
    }

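    /// Looks up the fragment id of the given table via the catalog.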
    #[inline(always)]
    fn get_fragment_id(&self, table_id: &TableId) -> SchedulerResult<FragmentId> {
        let reader = self.front_env.catalog_reader().read_guard();
        reader
            .get_any_table_by_id(table_id)
            .map(|table| table.fragment_id)
            .map_err(|e| SchedulerError::Internal(anyhow!(e)))
    }

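    /// Returns the worker-slot mapping of the table's DML fragment, falling back to the
    /// table's own fragment when no dedicated DML fragment exists.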
    #[inline(always)]
    fn get_table_dml_vnode_mapping(
        &self,
        table_id: &TableId,
    ) -> SchedulerResult<WorkerSlotMapping> {
        let guard = self.front_env.catalog_reader().read_guard();

        let table = guard
            .get_any_table_by_id(table_id)
            .map_err(|e| SchedulerError::Internal(anyhow!(e)))?;

        let fragment_id = match table.dml_fragment_id.as_ref() {
            Some(dml_fragment_id) => dml_fragment_id,
            None => &table.fragment_id,
        };

        self.worker_node_manager
            .manager
            .get_streaming_fragment_mapping(fragment_id)
            .map_err(|e| e.into())
    }

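    /// Chooses the workers for a pushed-down stage: DML goes to a single worker pinned by
    /// session id among those hosting the table's (DML) fragment; otherwise `parallelism`
    /// workers are picked at random.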
    fn choose_worker(&self, stage: &QueryStage) -> SchedulerResult<Vec<WorkerNode>> {
        if let Some(table_id) = stage.dml_table_id.as_ref() {
            let vnode_mapping = self.get_table_dml_vnode_mapping(table_id)?;
            let worker_node = {
                let worker_ids = vnode_mapping.iter_unique().collect_vec();
                let candidates = self
                    .worker_node_manager
                    .manager
                    .get_workers_by_worker_slot_ids(&worker_ids)?;
                if candidates.is_empty() {
                    return Err(BatchError::EmptyWorkerNodes.into());
                }
                candidates[stage.session_id.0 as usize % candidates.len()].clone()
            };
            Ok(vec![worker_node])
        } else {
            let mut workers = Vec::with_capacity(stage.parallelism.unwrap() as usize);
            for _ in 0..stage.parallelism.unwrap() {
                workers.push(self.worker_node_manager.next_random_worker()?);
            }
            Ok(workers)
        }
    }
}