use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use anyhow::anyhow;
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use pgwire::pg_server::BoxedError;
use risingwave_batch::error::BatchError;
use risingwave_batch::executor::ExecutorBuilder;
use risingwave_batch::task::{ShutdownToken, TaskId};
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::array::DataChunk;
use risingwave_common::bail;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::tracing::{InstrumentStream, TracingContext};
use risingwave_connector::source::SplitMetaData;
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{
    ExchangeInfo, ExchangeSource, LocalExecutePlan, PbTaskId, PlanFragment, PlanNode as PbPlanNode,
    TaskOutputId,
};
use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;

use super::plan_fragmenter::{PartitionInfo, QueryStage, QueryStageRef};
use crate::catalog::{FragmentId, TableId};
use crate::error::RwError;
use crate::optimizer::plan_node::PlanNodeType;
use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId};
use crate::scheduler::task_context::FrontendBatchTaskContext;
use crate::scheduler::{SchedulerError, SchedulerResult};
use crate::session::{FrontendEnv, SessionImpl};

pub type LocalQueryStream = ReceiverStream<Result<DataChunk, BoxedError>>;
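
/// Executes a batch query locally on the frontend node, with the root stage
/// running in-process and any child stage dispatched to compute nodes as
/// embedded exchange sources.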
pub struct LocalQueryExecution {
    query: Query,
    front_env: FrontendEnv,
    batch_query_epoch: BatchQueryEpoch,
    session: Arc<SessionImpl>,
    worker_node_manager: WorkerNodeSelector,
    timeout: Option<Duration>,
}

impl LocalQueryExecution {
    pub fn new(
        query: Query,
        front_env: FrontendEnv,
        support_barrier_read: bool,
        batch_query_epoch: BatchQueryEpoch,
        session: Arc<SessionImpl>,
        timeout: Option<Duration>,
    ) -> Self {
        let worker_node_manager =
            WorkerNodeSelector::new(front_env.worker_node_manager_ref(), support_barrier_read);

        Self {
            query,
            front_env,
            batch_query_epoch,
            session,
            worker_node_manager,
            timeout,
        }
    }

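    /// Resets the session's cancel-query flag and returns the [`ShutdownToken`]
    /// this query watches for cancellation.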
    fn shutdown_rx(&self) -> ShutdownToken {
        self.session.reset_cancel_query_flag()
    }

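    /// Builds the local plan fragment, constructs the root executor, and yields
    /// the result chunks as an async stream.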
    #[try_stream(ok = DataChunk, error = RwError)]
    pub async fn run_inner(self) {
        debug!(
            query_id = %self.query.query_id,
            "Starting to run query"
        );
        let context = FrontendBatchTaskContext::create(self.session.clone());
        let task_id = TaskId {
            query_id: self.query.query_id.id.clone(),
            stage_id: 0,
            task_id: 0,
        };

        let plan_fragment = self.create_plan_fragment()?;
        let plan_node = plan_fragment.root.unwrap();

        let executor = ExecutorBuilder::new(
            &plan_node,
            &task_id,
            context,
            self.batch_query_epoch,
            self.shutdown_rx().clone(),
        );
        let executor = executor.build().await?;
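        // The executor now owns everything it needs; drop the plan and `self`
        // (including the `Query`) early instead of holding these potentially
        // large objects across the streaming loop.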
        drop(plan_node);
        drop(self);

        #[for_await]
        for chunk in executor.execute() {
            yield chunk?;
        }
    }

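    /// Wraps [`Self::run_inner`] in a tracing span carrying the query id and
    /// the batch query epoch.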
    fn run(self) -> BoxStream<'static, Result<DataChunk, RwError>> {
        let span = tracing::info_span!(
            "local_execute",
            query_id = self.query.query_id.id,
            epoch = ?self.batch_query_epoch,
        );
        Box::pin(self.run_inner().instrument(span))
    }

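    /// Spawns the execution onto the frontend's compute runtime and returns a
    /// stream over a bounded channel, applying the query timeout if one is set.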
    pub fn stream_rows(self) -> LocalQueryStream {
        let compute_runtime = self.front_env.compute_runtime();
        let (sender, receiver) = mpsc::channel(10);
        let shutdown_rx = self.shutdown_rx().clone();

        let catalog_reader = self.front_env.catalog_reader().clone();
        let user_info_reader = self.front_env.user_info_reader().clone();
        let auth_context = self.session.auth_context().clone();
        let db_name = self.session.database();
        let search_path = self.session.config().search_path();
        let time_zone = self.session.config().timezone();
        let strict_mode = self.session.config().batch_expr_strict_mode();
        let timeout = self.timeout;
        let meta_client = self.front_env.meta_client_ref();

        let sender1 = sender.clone();
        let exec = async move {
            let mut data_stream = self.run().map(|r| r.map_err(|e| Box::new(e) as BoxedError));
            while let Some(mut r) = data_stream.next().await {
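                // If the failure coincides with a user cancellation, report a
                // clear "query cancelled" error instead of the underlying one.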
                if r.is_err() && shutdown_rx.is_cancelled() {
                    r = Err(Box::new(SchedulerError::QueryCancelled(
                        "Cancelled by user".to_owned(),
                    )) as BoxedError);
                }
                if sender1.send(r).await.is_err() {
                    tracing::info!("Receiver closed.");
                    return;
                }
            }
        };

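        // Provide the session context (catalog, user info, time zone, ...) as
        // task-locals, so that expressions evaluated on the compute runtime can
        // still access them.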
        use risingwave_expr::expr_context::*;

        use crate::expr::function_impl::context::{
            AUTH_CONTEXT, CATALOG_READER, DB_NAME, META_CLIENT, SEARCH_PATH, USER_INFO_READER,
        };

        let exec = async move { CATALOG_READER::scope(catalog_reader, exec).await }.boxed();
        let exec = async move { USER_INFO_READER::scope(user_info_reader, exec).await }.boxed();
        let exec = async move { DB_NAME::scope(db_name, exec).await }.boxed();
        let exec = async move { SEARCH_PATH::scope(search_path, exec).await }.boxed();
        let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await }.boxed();
        let exec = async move { TIME_ZONE::scope(time_zone, exec).await }.boxed();
        let exec = async move { STRICT_MODE::scope(strict_mode, exec).await }.boxed();
        let exec = async move { META_CLIENT::scope(meta_client, exec).await }.boxed();

        if let Some(timeout) = timeout {
            let exec = async move {
                if let Err(_e) = tokio::time::timeout(timeout, exec).await {
                    tracing::error!(
                        "Local query execution timeout after {} seconds",
                        timeout.as_secs()
                    );
                    if sender
                        .send(Err(Box::new(SchedulerError::QueryCancelled(format!(
                            "timeout after {} seconds",
                            timeout.as_secs(),
                        ))) as BoxedError))
                        .await
                        .is_err()
                    {
                        tracing::info!("Receiver closed.");
                    }
                }
            };
            compute_runtime.spawn(exec);
        } else {
            compute_runtime.spawn(exec);
        }

        ReceiverStream::new(receiver)
    }

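    /// Converts the query into a single [`PlanFragment`] to execute locally.
    ///
    /// In local execution mode a plan has at most two stages: the root stage,
    /// which runs in the frontend with parallelism 1, and optionally a set of
    /// child ("second") stages whose plan fragments are embedded into the
    /// exchange sources of the root plan and executed on compute nodes.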
    fn create_plan_fragment(&self) -> SchedulerResult<PlanFragment> {
        let next_executor_id = Arc::new(AtomicU32::new(0));
        let root_stage_id = self.query.root_stage_id();
        let root_stage = self.query.stage_graph.stages.get(&root_stage_id).unwrap();
        assert_eq!(root_stage.parallelism.unwrap(), 1);
        let second_stage_id = self.query.stage_graph.get_child_stages(&root_stage_id);
        let plan_node_prost = match second_stage_id {
            None => {
                debug!("Local execution mode converts a plan with a single stage");
                self.convert_plan_node(&root_stage.root, &mut None, None, next_executor_id)?
            }
            Some(second_stage_ids) => {
                debug!("Local execution mode converts a plan with two stages");
                if second_stage_ids.is_empty() {
                    self.convert_plan_node(&root_stage.root, &mut None, None, next_executor_id)?
                } else {
                    let mut second_stages = HashMap::new();
                    for second_stage_id in second_stage_ids {
                        let second_stage =
                            self.query.stage_graph.stages.get(second_stage_id).unwrap();
                        second_stages.insert(*second_stage_id, second_stage.clone());
                    }
                    let mut stage_id_to_plan = Some(second_stages);
                    let res = self.convert_plan_node(
                        &root_stage.root,
                        &mut stage_id_to_plan,
                        None,
                        next_executor_id,
                    )?;
                    assert!(
                        stage_id_to_plan.as_ref().unwrap().is_empty(),
                        "We expect that all the child stage plan fragments have been used"
                    );
                    res
                }
            }
        };

        Ok(PlanFragment {
            root: Some(plan_node_prost),
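            // The root fragment streams its output directly to the frontend, so
            // there is no need to specify an exchange distribution here.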
            exchange_info: None,
        })
    }

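    /// Recursively converts an [`ExecutionPlanNode`] into its protobuf form.
    ///
    /// For exchange nodes, the corresponding second-stage plan fragment is taken
    /// out of `second_stages` and embedded into the exchange sources; for scan
    /// nodes, the given `partition` is pushed down into the node body.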
    fn convert_plan_node(
        &self,
        execution_plan_node: &ExecutionPlanNode,
        second_stages: &mut Option<HashMap<StageId, QueryStageRef>>,
        partition: Option<PartitionInfo>,
        next_executor_id: Arc<AtomicU32>,
    ) -> SchedulerResult<PbPlanNode> {
        let identity = format!(
            "{:?}-{}",
            execution_plan_node.plan_node_type,
            next_executor_id.fetch_add(1, Ordering::Relaxed)
        );
        match execution_plan_node.plan_node_type {
            PlanNodeType::BatchExchange => {
                let exchange_source_stage_id = execution_plan_node
                    .source_stage_id
                    .expect("We expect stage id for Exchange Operator");
                let Some(second_stages) = second_stages.as_mut() else {
                    bail!(
                        "Unexpected exchange detected. We are either converting a single stage plan or converting the second stage of the plan."
                    )
                };
                let second_stage = second_stages.remove(&exchange_source_stage_id).expect(
                    "We expect child stage fragment for Exchange Operator running in the frontend",
                );
                let mut node_body = execution_plan_node.node.clone();
                let sources = match &mut node_body {
                    NodeBody::Exchange(exchange_node) => &mut exchange_node.sources,
                    NodeBody::MergeSortExchange(merge_sort_exchange_node) => {
                        &mut merge_sort_exchange_node
                            .exchange
                            .as_mut()
                            .expect("MergeSortExchangeNode must have an exchange node")
                            .sources
                    }
                    _ => unreachable!(),
                };
                assert!(sources.is_empty());

                let tracing_context = TracingContext::from_current_span().to_protobuf();

                if let Some(table_scan_info) = second_stage.table_scan_info.clone()
                    && let Some(vnode_bitmaps) = table_scan_info.partitions()
                {
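                    // One exchange source per partition: each worker scans the
                    // vnodes it owns, similar to task scheduling in distributed
                    // mode.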
                    let (worker_ids, vnode_bitmaps): (Vec<_>, Vec<_>) =
                        vnode_bitmaps.clone().into_iter().unzip();
                    let workers = self
                        .worker_node_manager
                        .manager
                        .get_workers_by_worker_slot_ids(&worker_ids)?;
                    for (idx, (worker_node, partition)) in
                        (workers.into_iter().zip_eq_fast(vnode_bitmaps.into_iter())).enumerate()
                    {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::Table(partition)),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            epoch: Some(self.batch_query_epoch),
                            tracing_context: tracing_context.clone(),
                        };
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: idx as u64,
                                    stage_id: exchange_source_stage_id,
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else if let Some(source_info) = &second_stage.source_info {
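                    // Divide the source splits evenly across the available
                    // scheduling units; each chunk of splits becomes one task on
                    // a randomly chosen worker.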
                    let chunk_size = (source_info.split_info().unwrap().len() as f32
                        / (self.worker_node_manager.schedule_unit_count()) as f32)
                        .ceil() as usize;
                    for (id, split) in source_info
                        .split_info()
                        .unwrap()
                        .chunks(chunk_size)
                        .enumerate()
                    {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::Source(split.to_vec())),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            epoch: Some(self.batch_query_epoch),
                            tracing_context: tracing_context.clone(),
                        };
                        let worker_node = self.worker_node_manager.next_random_worker()?;
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: id as u64,
                                    stage_id: exchange_source_stage_id,
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else if let Some(file_scan_info) = &second_stage.file_scan_info {
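                    // Likewise, spread the files to scan evenly across the
                    // available scheduling units.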
                    let chunk_size = (file_scan_info.file_location.len() as f32
                        / (self.worker_node_manager.schedule_unit_count()) as f32)
                        .ceil() as usize;
                    for (id, files) in file_scan_info.file_location.chunks(chunk_size).enumerate() {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::File(files.to_vec())),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            epoch: Some(self.batch_query_epoch),
                            tracing_context: tracing_context.clone(),
                        };
                        let worker_node = self.worker_node_manager.next_random_worker()?;
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: id as u64,
                                    stage_id: exchange_source_stage_id,
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else {
                    let second_stage_plan_node = self.convert_plan_node(
                        &second_stage.root,
                        &mut None,
                        None,
                        next_executor_id,
                    )?;
                    let second_stage_plan_fragment = PlanFragment {
                        root: Some(second_stage_plan_node),
                        exchange_info: Some(ExchangeInfo {
                            mode: DistributionMode::Single as i32,
                            ..Default::default()
                        }),
                    };

                    let local_execute_plan = LocalExecutePlan {
                        plan: Some(second_stage_plan_fragment),
                        epoch: Some(self.batch_query_epoch),
                        tracing_context,
                    };

                    let workers = self.choose_worker(&second_stage)?;
                    *sources = workers
                        .iter()
                        .enumerate()
                        .map(|(idx, worker_node)| ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: idx as u64,
                                    stage_id: exchange_source_stage_id,
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan.clone())),
                        })
                        .collect();
                }

                Ok(PbPlanNode {
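                    // The whole child stage is embedded into the exchange
                    // sources above, so this node keeps no children.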
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchSeqScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::RowSeqScan(scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_table()
                                .expect("PartitionInfo should be TablePartitionInfo here");
                            scan_node.vnode_bitmap = Some(partition.vnode_bitmap.to_protobuf());
                            scan_node.scan_ranges = partition.scan_ranges;
                        }
                    }
                    NodeBody::SysRowSeqScan(_) => {}
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchLogSeqScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::LogRowSeqScan(scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_table()
                                .expect("PartitionInfo should be TablePartitionInfo here");
                            scan_node.vnode_bitmap = Some(partition.vnode_bitmap.to_protobuf());
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchFileScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::FileScan(file_scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_file()
                                .expect("PartitionInfo should be FilePartitionInfo here");
                            file_scan_node.file_location = partition;
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::Source(source_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_source()
                                .expect("PartitionInfo should be SourcePartitionInfo here");
                            source_node.split = partition
                                .into_iter()
                                .map(|split| split.encode_to_bytes().into())
                                .collect_vec();
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchIcebergScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::IcebergScan(iceberg_scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_source()
                                .expect("PartitionInfo should be SourcePartitionInfo here");
                            iceberg_scan_node.split = partition
                                .into_iter()
                                .map(|split| split.encode_to_bytes().into())
                                .collect_vec();
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchLookupJoin => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::LocalLookupJoin(node) => {
                        let side_table_desc = node
                            .inner_side_table_desc
                            .as_ref()
                            .expect("no side table desc");
                        let mapping = self.worker_node_manager.fragment_mapping(
                            self.get_fragment_id(&side_table_desc.table_id.into())?,
                        )?;

                        node.inner_side_vnode_mapping =
                            mapping.to_expanded().into_iter().map(u64::from).collect();
                        node.worker_nodes = self.worker_node_manager.manager.list_worker_nodes();
                    }
                    _ => unreachable!(),
                }

                let left_child = self.convert_plan_node(
                    &execution_plan_node.children[0],
                    second_stages,
                    partition,
                    next_executor_id,
                )?;

                Ok(PbPlanNode {
                    children: vec![left_child],
                    identity,
                    node_body: Some(node_body),
                })
            }
            _ => {
                let children = execution_plan_node
                    .children
                    .iter()
                    .map(|e| {
                        self.convert_plan_node(
                            e,
                            second_stages,
                            partition.clone(),
                            next_executor_id.clone(),
                        )
                    })
                    .collect::<SchedulerResult<Vec<PbPlanNode>>>()?;

                Ok(PbPlanNode {
                    children,
                    identity,
                    node_body: Some(execution_plan_node.node.clone()),
                })
            }
        }
    }

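    /// Looks up the fragment id of the given table via the catalog.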
    #[inline(always)]
    fn get_fragment_id(&self, table_id: &TableId) -> SchedulerResult<FragmentId> {
        let reader = self.front_env.catalog_reader().read_guard();
        reader
            .get_any_table_by_id(table_id)
            .map(|table| table.fragment_id)
            .map_err(|e| SchedulerError::Internal(anyhow!(e)))
    }

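    /// Returns the worker slot mapping of the fragment that handles DML for the
    /// given table.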
    #[inline(always)]
    fn get_table_dml_vnode_mapping(
        &self,
        table_id: &TableId,
    ) -> SchedulerResult<WorkerSlotMapping> {
        let guard = self.front_env.catalog_reader().read_guard();

        let table = guard
            .get_any_table_by_id(table_id)
            .map_err(|e| SchedulerError::Internal(anyhow!(e)))?;

        let fragment_id = match table.dml_fragment_id.as_ref() {
            Some(dml_fragment_id) => dml_fragment_id,
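            // Fall back to the table's own fragment when no dedicated DML
            // fragment is recorded.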
            None => &table.fragment_id,
        };

        self.worker_node_manager
            .manager
            .get_streaming_fragment_mapping(fragment_id)
            .map_err(|e| e.into())
    }

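    /// Chooses the workers a second stage should run on: a single fixed worker
    /// for DML stages, otherwise `parallelism` randomly selected workers.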
    fn choose_worker(&self, stage: &Arc<QueryStage>) -> SchedulerResult<Vec<WorkerNode>> {
        if let Some(table_id) = stage.dml_table_id.as_ref() {
            let vnode_mapping = self.get_table_dml_vnode_mapping(table_id)?;
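            // Among the workers that host the table's streaming fragment, pick
            // one deterministically by session id.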
            let worker_node = {
                let worker_ids = vnode_mapping.iter_unique().collect_vec();
                let candidates = self
                    .worker_node_manager
                    .manager
                    .get_workers_by_worker_slot_ids(&worker_ids)?;
                if candidates.is_empty() {
                    return Err(BatchError::EmptyWorkerNodes.into());
                }
                candidates[stage.session_id.0 as usize % candidates.len()].clone()
            };
            Ok(vec![worker_node])
        } else {
            let mut workers = Vec::with_capacity(stage.parallelism.unwrap() as usize);
            for _ in 0..stage.parallelism.unwrap() {
                workers.push(self.worker_node_manager.next_random_worker()?);
            }
            Ok(workers)
        }
    }
}