risingwave_frontend/scheduler/local.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Local execution for batch query.
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use anyhow::anyhow;
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use pgwire::pg_server::BoxedError;
use risingwave_batch::error::BatchError;
use risingwave_batch::executor::ExecutorBuilder;
use risingwave_batch::task::{ShutdownToken, TaskId};
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::array::DataChunk;
use risingwave_common::bail;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::tracing::{InstrumentStream, TracingContext};
use risingwave_connector::source::SplitMetaData;
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{
    ExchangeInfo, ExchangeSource, LocalExecutePlan, PbTaskId, PlanFragment, PlanNode as PbPlanNode,
    TaskOutputId,
};
use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;

use super::plan_fragmenter::{PartitionInfo, QueryStage, QueryStageRef};
use crate::catalog::{FragmentId, TableId};
use crate::error::RwError;
use crate::optimizer::plan_node::PlanNodeType;
use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId};
use crate::scheduler::task_context::FrontendBatchTaskContext;
use crate::scheduler::{SchedulerError, SchedulerResult};
use crate::session::{FrontendEnv, SessionImpl};

// TODO(error-handling): use a concrete error type.
pub type LocalQueryStream = ReceiverStream<Result<DataChunk, BoxedError>>;
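
/// Executes a batch query in local mode: the root plan fragment runs directly on the frontend
/// node, while any child stage is embedded as a `LocalExecutePlan` in the exchange sources and
/// executed on compute nodes.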
pub struct LocalQueryExecution {
    query: Query,
    front_env: FrontendEnv,
    batch_query_epoch: BatchQueryEpoch,
    session: Arc<SessionImpl>,
    worker_node_manager: WorkerNodeSelector,
    timeout: Option<Duration>,
}

impl LocalQueryExecution {
    pub fn new(
        query: Query,
        front_env: FrontendEnv,
        support_barrier_read: bool,
        batch_query_epoch: BatchQueryEpoch,
        session: Arc<SessionImpl>,
        timeout: Option<Duration>,
    ) -> Self {
        let worker_node_manager =
            WorkerNodeSelector::new(front_env.worker_node_manager_ref(), support_barrier_read);

        Self {
            query,
            front_env,
            batch_query_epoch,
            session,
            worker_node_manager,
            timeout,
        }
    }

    fn shutdown_rx(&self) -> ShutdownToken {
        self.session.reset_cancel_query_flag()
    }

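    /// Builds the executor for the root plan fragment and yields its output chunks.
    ///
    /// `self` is dropped as soon as the executor is built, so that large plan objects are
    /// released before the (potentially slow) execution loop starts.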
    #[try_stream(ok = DataChunk, error = RwError)]
    pub async fn run_inner(self) {
        debug!(
            query_id = %self.query.query_id,
            "Starting to run query"
        );
        let context = FrontendBatchTaskContext::create(self.session.clone());
        let task_id = TaskId {
            query_id: self.query.query_id.id.clone(),
            stage_id: 0,
            task_id: 0,
        };

        let plan_fragment = self.create_plan_fragment()?;
        let plan_node = plan_fragment.root.unwrap();

        let executor = ExecutorBuilder::new(
            &plan_node,
            &task_id,
            context,
            self.batch_query_epoch,
            self.shutdown_rx().clone(),
        );
        let executor = executor.build().await?;
        // The following loop can be slow.
        // Release potentially large objects in `Query` and `PlanNode` early.
        drop(plan_node);
        drop(self);

        #[for_await]
        for chunk in executor.execute() {
            yield chunk?;
        }
    }

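    /// Wraps [`Self::run_inner`] in a `local_execute` tracing span carrying the query id and
    /// epoch, and boxes the resulting stream.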
    fn run(self) -> BoxStream<'static, Result<DataChunk, RwError>> {
        let span = tracing::info_span!(
            "local_execute",
            query_id = self.query.query_id.id,
            epoch = ?self.batch_query_epoch,
        );
        Box::pin(self.run_inner().instrument(span))
    }

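    /// Spawns the query on the frontend's compute runtime and returns a stream of result chunks.
    ///
    /// Results are forwarded through a bounded mpsc channel; execution is wrapped with the
    /// task-local contexts required by expression evaluation and, if configured, a timeout that
    /// cancels the query with a `QueryCancelled` error.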
    pub fn stream_rows(self) -> LocalQueryStream {
        let compute_runtime = self.front_env.compute_runtime();
        let (sender, receiver) = mpsc::channel(10);
        let shutdown_rx = self.shutdown_rx().clone();

        let catalog_reader = self.front_env.catalog_reader().clone();
        let user_info_reader = self.front_env.user_info_reader().clone();
        let auth_context = self.session.auth_context().clone();
        let db_name = self.session.database();
        let search_path = self.session.config().search_path();
        let time_zone = self.session.config().timezone();
        let strict_mode = self.session.config().batch_expr_strict_mode();
        let timeout = self.timeout;
        let meta_client = self.front_env.meta_client_ref();

        let sender1 = sender.clone();
        let exec = async move {
            let mut data_stream = self.run().map(|r| r.map_err(|e| Box::new(e) as BoxedError));
            while let Some(mut r) = data_stream.next().await {
                // Append a query-cancelled error if the query has been cancelled.
                if r.is_err() && shutdown_rx.is_cancelled() {
                    r = Err(Box::new(SchedulerError::QueryCancelled(
                        "Cancelled by user".to_owned(),
                    )) as BoxedError);
                }
                if sender1.send(r).await.is_err() {
                    tracing::info!("Receiver closed.");
                    return;
                }
            }
        };

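        // Provide the task-local contexts (catalog, user info, auth, search path, time zone,
        // strict mode, meta client) that expression evaluation on the frontend relies on.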
        use risingwave_expr::expr_context::*;

        use crate::expr::function_impl::context::{
            AUTH_CONTEXT, CATALOG_READER, DB_NAME, META_CLIENT, SEARCH_PATH, USER_INFO_READER,
        };

        // Boxing is necessary; otherwise the size of `exec` would double each time it is nested.
        let exec = async move { CATALOG_READER::scope(catalog_reader, exec).await }.boxed();
        let exec = async move { USER_INFO_READER::scope(user_info_reader, exec).await }.boxed();
        let exec = async move { DB_NAME::scope(db_name, exec).await }.boxed();
        let exec = async move { SEARCH_PATH::scope(search_path, exec).await }.boxed();
        let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await }.boxed();
        let exec = async move { TIME_ZONE::scope(time_zone, exec).await }.boxed();
        let exec = async move { STRICT_MODE::scope(strict_mode, exec).await }.boxed();
        let exec = async move { META_CLIENT::scope(meta_client, exec).await }.boxed();

        if let Some(timeout) = timeout {
            let exec = async move {
                if let Err(_e) = tokio::time::timeout(timeout, exec).await {
                    tracing::error!(
                        "Local query execution timed out after {} seconds",
                        timeout.as_secs()
                    );
                    if sender
                        .send(Err(Box::new(SchedulerError::QueryCancelled(format!(
                            "timeout after {} seconds",
                            timeout.as_secs(),
                        ))) as BoxedError))
                        .await
                        .is_err()
                    {
                        tracing::info!("Receiver closed.");
                    }
                }
            };
            compute_runtime.spawn(exec);
        } else {
            compute_runtime.spawn(exec);
        }

        ReceiverStream::new(receiver)
    }

    /// Convert the query to a plan fragment.
    ///
    /// We can convert a query to a single plan fragment because in local execution mode there are
    /// at most two layers, i.e. the root stage and its optional input stage. If there is an input
    /// stage, it is embedded in an exchange source, so the query can always be converted into one
    /// plan fragment.
    ///
    /// We remark that the boundary determining which part is executed on the frontend and which
    /// part is executed on the backend is the first exchange operator when looking from the root
    /// of the plan to the leaves. The first exchange operator contains the pushed-down plan
    /// fragment.
    fn create_plan_fragment(&self) -> SchedulerResult<PlanFragment> {
        let next_executor_id = Arc::new(AtomicU32::new(0));
        let root_stage_id = self.query.root_stage_id();
        let root_stage = self.query.stage_graph.stages.get(&root_stage_id).unwrap();
        assert_eq!(root_stage.parallelism.unwrap(), 1);
        let second_stage_id = self.query.stage_graph.get_child_stages(&root_stage_id);
        let plan_node_prost = match second_stage_id {
            None => {
                debug!("Local execution mode converts a plan with a single stage");
                self.convert_plan_node(&root_stage.root, &mut None, None, next_executor_id)?
            }
            Some(second_stage_ids) => {
                debug!("Local execution mode converts a plan with two stages");
                if second_stage_ids.is_empty() {
                    // This branch is defensive programming. The semantics should be the same as
                    // `None`.
                    self.convert_plan_node(&root_stage.root, &mut None, None, next_executor_id)?
                } else {
                    let mut second_stages = HashMap::new();
                    for second_stage_id in second_stage_ids {
                        let second_stage =
                            self.query.stage_graph.stages.get(second_stage_id).unwrap();
                        second_stages.insert(*second_stage_id, second_stage.clone());
                    }
                    let mut stage_id_to_plan = Some(second_stages);
                    let res = self.convert_plan_node(
                        &root_stage.root,
                        &mut stage_id_to_plan,
                        None,
                        next_executor_id,
                    )?;
                    assert!(
                        stage_id_to_plan.as_ref().unwrap().is_empty(),
                        "We expect that all the child stage plan fragments have been used"
                    );
                    res
                }
            }
        };

        Ok(PlanFragment {
            root: Some(plan_node_prost),
            // Intentionally leave this as `None`: this is the last stage, whose output is
            // consumed directly by the frontend. Its distribution is single, but we do not
            // need to specify it explicitly.
            exchange_info: None,
        })
    }

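    /// Recursively converts an `ExecutionPlanNode` into a protobuf `PlanNode`.
    ///
    /// For exchange nodes, the corresponding child stage is taken out of `second_stages` and
    /// embedded into the exchange sources as `LocalExecutePlan`s. For scan-like nodes, the given
    /// `partition` (vnode bitmap, source splits, or file locations) is filled into the node body.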
    fn convert_plan_node(
        &self,
        execution_plan_node: &ExecutionPlanNode,
        second_stages: &mut Option<HashMap<StageId, QueryStageRef>>,
        partition: Option<PartitionInfo>,
        next_executor_id: Arc<AtomicU32>,
    ) -> SchedulerResult<PbPlanNode> {
        let identity = format!(
            "{:?}-{}",
            execution_plan_node.plan_node_type,
            next_executor_id.fetch_add(1, Ordering::Relaxed)
        );
        match execution_plan_node.plan_node_type {
            PlanNodeType::BatchExchange => {
                let exchange_source_stage_id = execution_plan_node
                    .source_stage_id
                    .expect("We expect stage id for Exchange Operator");
                let Some(second_stages) = second_stages.as_mut() else {
                    bail!(
                        "Unexpected exchange detected. We are either converting a single stage plan or converting the second stage of the plan."
                    )
                };
                let second_stage = second_stages.remove(&exchange_source_stage_id).expect(
                    "We expect child stage fragment for Exchange Operator running in the frontend",
                );
                let mut node_body = execution_plan_node.node.clone();
                let sources = match &mut node_body {
                    NodeBody::Exchange(exchange_node) => &mut exchange_node.sources,
                    NodeBody::MergeSortExchange(merge_sort_exchange_node) => {
                        &mut merge_sort_exchange_node
                            .exchange
                            .as_mut()
                            .expect("MergeSortExchangeNode must have an exchange node")
                            .sources
                    }
                    _ => unreachable!(),
                };
                assert!(sources.is_empty());

                let tracing_context = TracingContext::from_current_span().to_protobuf();

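                // Fill in the exchange sources. Each branch below materializes the child stage
                // as a `LocalExecutePlan`, partitioned by what the stage reads: table vnodes,
                // source splits, file locations, or no partitioning at all.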
                if let Some(table_scan_info) = second_stage.table_scan_info.clone()
                    && let Some(vnode_bitmaps) = table_scan_info.partitions()
                {
                    // Similar to the distributed case (StageRunner::schedule_tasks).
                    // Set `vnode_ranges` of the scan node in `local_execute_plan` of each
                    // `exchange_source`.
                    let (worker_ids, vnode_bitmaps): (Vec<_>, Vec<_>) =
                        vnode_bitmaps.clone().into_iter().unzip();
                    let workers = self
                        .worker_node_manager
                        .manager
                        .get_workers_by_worker_slot_ids(&worker_ids)?;
                    for (idx, (worker_node, partition)) in
                        (workers.into_iter().zip_eq_fast(vnode_bitmaps.into_iter())).enumerate()
                    {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::Table(partition)),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            epoch: Some(self.batch_query_epoch),
                            tracing_context: tracing_context.clone(),
                        };
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: idx as u64,
                                    stage_id: exchange_source_stage_id,
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else if let Some(source_info) = &second_stage.source_info {
                    // For a file source batch read, all the files to be read are divided into
                    // several parts to prevent a single task from taking up too many resources.

                    let chunk_size = (source_info.split_info().unwrap().len() as f32
                        / (self.worker_node_manager.schedule_unit_count()) as f32)
                        .ceil() as usize;
                    for (id, split) in source_info
                        .split_info()
                        .unwrap()
                        .chunks(chunk_size)
                        .enumerate()
                    {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::Source(split.to_vec())),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            epoch: Some(self.batch_query_epoch),
                            tracing_context: tracing_context.clone(),
                        };
                        // NOTE: select a random worker node here.
                        let worker_node = self.worker_node_manager.next_random_worker()?;
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: id as u64,
                                    stage_id: exchange_source_stage_id,
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else if let Some(file_scan_info) = &second_stage.file_scan_info {
                    let chunk_size = (file_scan_info.file_location.len() as f32
                        / (self.worker_node_manager.schedule_unit_count()) as f32)
                        .ceil() as usize;
                    for (id, files) in file_scan_info.file_location.chunks(chunk_size).enumerate() {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::File(files.to_vec())),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            epoch: Some(self.batch_query_epoch),
                            tracing_context: tracing_context.clone(),
                        };
                        // NOTE: select a random worker node here.
                        let worker_node = self.worker_node_manager.next_random_worker()?;
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: id as u64,
                                    stage_id: exchange_source_stage_id,
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else {
                    let second_stage_plan_node = self.convert_plan_node(
                        &second_stage.root,
                        &mut None,
                        None,
                        next_executor_id,
                    )?;
                    let second_stage_plan_fragment = PlanFragment {
                        root: Some(second_stage_plan_node),
                        exchange_info: Some(ExchangeInfo {
                            mode: DistributionMode::Single as i32,
                            ..Default::default()
                        }),
                    };

                    let local_execute_plan = LocalExecutePlan {
                        plan: Some(second_stage_plan_fragment),
                        epoch: Some(self.batch_query_epoch),
                        tracing_context,
                    };

                    let workers = self.choose_worker(&second_stage)?;
                    *sources = workers
                        .iter()
                        .enumerate()
                        .map(|(idx, worker_node)| ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: idx as u64,
                                    stage_id: exchange_source_stage_id,
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan.clone())),
                        })
                        .collect();
                }

                Ok(PbPlanNode {
                    // Since the rest of the plan is embedded into the exchange node,
                    // there are no children any more.
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchSeqScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::RowSeqScan(scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_table()
                                .expect("PartitionInfo should be TablePartitionInfo here");
                            scan_node.vnode_bitmap = Some(partition.vnode_bitmap.to_protobuf());
                            scan_node.scan_ranges = partition.scan_ranges;
                        }
                    }
                    NodeBody::SysRowSeqScan(_) => {}
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchLogSeqScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::LogRowSeqScan(scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_table()
                                .expect("PartitionInfo should be TablePartitionInfo here");
                            scan_node.vnode_bitmap = Some(partition.vnode_bitmap.to_protobuf());
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchFileScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::FileScan(file_scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_file()
                                .expect("PartitionInfo should be FilePartitionInfo here");
                            file_scan_node.file_location = partition;
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::Source(source_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_source()
                                .expect("PartitionInfo should be SourcePartitionInfo here");
                            source_node.split = partition
                                .into_iter()
                                .map(|split| split.encode_to_bytes().into())
                                .collect_vec();
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchIcebergScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::IcebergScan(iceberg_scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_source()
                                .expect("PartitionInfo should be SourcePartitionInfo here");
                            iceberg_scan_node.split = partition
                                .into_iter()
                                .map(|split| split.encode_to_bytes().into())
                                .collect_vec();
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            PlanNodeType::BatchLookupJoin => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::LocalLookupJoin(node) => {
                        let side_table_desc = node
                            .inner_side_table_desc
                            .as_ref()
                            .expect("no side table desc");
                        let mapping = self.worker_node_manager.fragment_mapping(
                            self.get_fragment_id(&side_table_desc.table_id.into())?,
                        )?;

                        // TODO: should we use `pb::WorkerSlotMapping` here?
                        node.inner_side_vnode_mapping =
                            mapping.to_expanded().into_iter().map(u64::from).collect();
                        node.worker_nodes = self.worker_node_manager.manager.list_worker_nodes();
                    }
                    _ => unreachable!(),
                }

                let left_child = self.convert_plan_node(
                    &execution_plan_node.children[0],
                    second_stages,
                    partition,
                    next_executor_id,
                )?;

                Ok(PbPlanNode {
                    children: vec![left_child],
                    identity,
                    node_body: Some(node_body),
                })
            }
            _ => {
                let children = execution_plan_node
                    .children
                    .iter()
                    .map(|e| {
                        self.convert_plan_node(
                            e,
                            second_stages,
                            partition.clone(),
                            next_executor_id.clone(),
                        )
                    })
                    .collect::<SchedulerResult<Vec<PbPlanNode>>>()?;

                Ok(PbPlanNode {
                    children,
                    identity,
                    node_body: Some(execution_plan_node.node.clone()),
                })
            }
        }
    }

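    /// Looks up the fragment id of the given table through the catalog.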
    #[inline(always)]
    fn get_fragment_id(&self, table_id: &TableId) -> SchedulerResult<FragmentId> {
        let reader = self.front_env.catalog_reader().read_guard();
        reader
            .get_any_table_by_id(table_id)
            .map(|table| table.fragment_id)
            .map_err(|e| SchedulerError::Internal(anyhow!(e)))
    }

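    /// Resolves the worker slot mapping of the table's DML fragment (falling back to the table's
    /// main fragment for older tables without a dedicated DML fragment).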
    #[inline(always)]
    fn get_table_dml_vnode_mapping(
        &self,
        table_id: &TableId,
    ) -> SchedulerResult<WorkerSlotMapping> {
        let guard = self.front_env.catalog_reader().read_guard();

        let table = guard
            .get_any_table_by_id(table_id)
            .map_err(|e| SchedulerError::Internal(anyhow!(e)))?;

        let fragment_id = match table.dml_fragment_id.as_ref() {
            Some(dml_fragment_id) => dml_fragment_id,
            // Backward compatibility for tables without a `dml_fragment_id`.
            None => &table.fragment_id,
        };

        self.worker_node_manager
            .manager
            .get_streaming_fragment_mapping(fragment_id)
            .map_err(|e| e.into())
    }

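    /// Chooses the worker nodes to execute the child stage on: DML stages are pinned to a single
    /// worker derived from the table's streaming vnode mapping (keyed by session id), while other
    /// stages pick `parallelism` random workers.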
    fn choose_worker(&self, stage: &Arc<QueryStage>) -> SchedulerResult<Vec<WorkerNode>> {
        if let Some(table_id) = stage.dml_table_id.as_ref() {
            // DML should use the streaming vnode mapping.
            let vnode_mapping = self.get_table_dml_vnode_mapping(table_id)?;
            let worker_node = {
                let worker_ids = vnode_mapping.iter_unique().collect_vec();
                let candidates = self
                    .worker_node_manager
                    .manager
                    .get_workers_by_worker_slot_ids(&worker_ids)?;
                if candidates.is_empty() {
                    return Err(BatchError::EmptyWorkerNodes.into());
                }
                candidates[stage.session_id.0 as usize % candidates.len()].clone()
            };
            Ok(vec![worker_node])
        } else {
            let mut workers = Vec::with_capacity(stage.parallelism.unwrap() as usize);
            for _ in 0..stage.parallelism.unwrap() {
                workers.push(self.worker_node_manager.next_random_worker()?);
            }
            Ok(workers)
        }
    }
}