risingwave_frontend/scheduler/local.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Local execution for batch query.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use anyhow::anyhow;
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use pgwire::pg_server::BoxedError;
use risingwave_batch::error::BatchError;
use risingwave_batch::executor::ExecutorBuilder;
use risingwave_batch::task::{ShutdownToken, TaskId};
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::array::DataChunk;
use risingwave_common::bail;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::tracing::{InstrumentStream, TracingContext};
use risingwave_connector::source::SplitMetaData;
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{
    ExchangeInfo, ExchangeSource, LocalExecutePlan, PbTaskId, PlanFragment, PlanNode as PbPlanNode,
    TaskOutputId,
};
use risingwave_pb::common::WorkerNode;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;

use super::plan_fragmenter::{PartitionInfo, QueryStage};
use crate::catalog::{FragmentId, TableId};
use crate::error::RwError;
use crate::optimizer::plan_node::BatchPlanNodeType;
use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId};
use crate::scheduler::task_context::FrontendBatchTaskContext;
use crate::scheduler::{SchedulerError, SchedulerResult};
use crate::session::{FrontendEnv, SessionImpl};

// TODO(error-handling): use a concrete error type.
pub type LocalQueryStream = ReceiverStream<Result<DataChunk, BoxedError>>;
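
/// Runs a batch query in local execution mode: the root stage is executed directly on the
/// frontend, while any child stage is embedded into exchange sources and pushed down to
/// compute nodes.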
pub struct LocalQueryExecution {
    query: Query,
    front_env: FrontendEnv,
    session: Arc<SessionImpl>,
    worker_node_manager: WorkerNodeSelector,
    timeout: Option<Duration>,
}

impl LocalQueryExecution {
    pub fn new(
        query: Query,
        front_env: FrontendEnv,
        support_barrier_read: bool,
        session: Arc<SessionImpl>,
        timeout: Option<Duration>,
    ) -> Self {
        let worker_node_manager =
            WorkerNodeSelector::new(front_env.worker_node_manager_ref(), support_barrier_read);

        Self {
            query,
            front_env,
            session,
            worker_node_manager,
            timeout,
        }
    }

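    /// Obtain a fresh shutdown token for this query by resetting the session's cancel-query flag.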
    fn shutdown_rx(&self) -> ShutdownToken {
        self.session.reset_cancel_query_flag()
    }

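    /// Build the plan fragment, construct the executor locally on the frontend, and yield the
    /// resulting data chunks.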
    #[try_stream(ok = DataChunk, error = RwError)]
    pub async fn run_inner(self) {
        debug!(
            query_id = %self.query.query_id,
            "Starting to run query"
        );
        let context = FrontendBatchTaskContext::create(self.session.clone());
        let task_id = TaskId {
            query_id: self.query.query_id.id.clone(),
            stage_id: 0,
            task_id: 0,
        };

        let plan_fragment = self.create_plan_fragment()?;
        let plan_node = plan_fragment.root.unwrap();

        let executor = ExecutorBuilder::new(&plan_node, &task_id, context, self.shutdown_rx());
        let executor = executor.build().await?;
        // The following loop can be slow, so release potentially large objects
        // held by the query and plan node early.
        drop(plan_node);
        drop(self);

        #[for_await]
        for chunk in executor.execute() {
            yield chunk?;
        }
    }

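    /// Run the query within a tracing span keyed by the query id.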
    fn run(self) -> BoxStream<'static, Result<DataChunk, RwError>> {
        let span = tracing::info_span!("local_execute", query_id = self.query.query_id.id,);
        Box::pin(self.run_inner().instrument(span))
    }

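    /// Spawn the local execution onto the frontend's compute runtime, wiring up the
    /// session-scoped expression contexts and the optional statement timeout, and return the
    /// receiving end of the result channel as a stream.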
    pub fn stream_rows(self) -> LocalQueryStream {
        let compute_runtime = self.front_env.compute_runtime();
        let (sender, receiver) = mpsc::channel(10);
        let shutdown_rx = self.shutdown_rx();

        let catalog_reader = self.front_env.catalog_reader().clone();
        let user_info_reader = self.front_env.user_info_reader().clone();
        let auth_context = self.session.auth_context();
        let db_name = self.session.database();
        let search_path = self.session.config().search_path();
        let time_zone = self.session.config().timezone();
        let strict_mode = self.session.config().batch_expr_strict_mode();
        let timeout = self.timeout;
        let meta_client = self.front_env.meta_client_ref();

        let sender1 = sender.clone();
        let exec = async move {
            let mut data_stream = self.run().map(|r| r.map_err(|e| Box::new(e) as BoxedError));
            while let Some(mut r) = data_stream.next().await {
                // Replace the error with a query-cancelled error if the query has been cancelled.
                if r.is_err() && shutdown_rx.is_cancelled() {
                    r = Err(Box::new(SchedulerError::QueryCancelled(
                        "Cancelled by user".to_owned(),
                    )) as BoxedError);
                }
                if sender1.send(r).await.is_err() {
                    tracing::info!("Receiver closed.");
                    return;
                }
            }
        };

        use risingwave_expr::expr_context::*;

        use crate::expr::function_impl::context::{
            AUTH_CONTEXT, CATALOG_READER, DB_NAME, META_CLIENT, SEARCH_PATH, USER_INFO_READER,
        };

        // Boxing is necessary here; otherwise the size of the `exec` future would double with
        // each level of nesting.
        let exec = async move { CATALOG_READER::scope(catalog_reader, exec).await }.boxed();
        let exec = async move { USER_INFO_READER::scope(user_info_reader, exec).await }.boxed();
        let exec = async move { DB_NAME::scope(db_name, exec).await }.boxed();
        let exec = async move { SEARCH_PATH::scope(search_path, exec).await }.boxed();
        let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await }.boxed();
        let exec = async move { TIME_ZONE::scope(time_zone, exec).await }.boxed();
        let exec = async move { STRICT_MODE::scope(strict_mode, exec).await }.boxed();
        let exec = async move { META_CLIENT::scope(meta_client, exec).await }.boxed();

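        // If a statement timeout is configured, race the execution against a timer and report a
        // cancellation error to the client when it expires.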
        if let Some(timeout) = timeout {
            let exec = async move {
                if let Err(_e) = tokio::time::timeout(timeout, exec).await {
                    tracing::error!(
                        "Local query execution timed out after {} seconds",
                        timeout.as_secs()
                    );
                    if sender
                        .send(Err(Box::new(SchedulerError::QueryCancelled(format!(
                            "timeout after {} seconds",
                            timeout.as_secs(),
                        ))) as BoxedError))
                        .await
                        .is_err()
                    {
                        tracing::info!("Receiver closed.");
                    }
                }
            };
            compute_runtime.spawn(exec);
        } else {
            compute_runtime.spawn(exec);
        }

        ReceiverStream::new(receiver)
    }

    /// Convert the query to a plan fragment.
    ///
    /// We can convert a query to a plan fragment because in local execution mode there are at
    /// most two layers, i.e. the root stage and its optional input stage. If there is an input
    /// stage, it is embedded into the exchange sources, so a query can always be converted into
    /// a single plan fragment.
    ///
    /// Note that the boundary between the part executed on the frontend and the part executed
    /// on the backend is the first exchange operator encountered when walking from the root of
    /// the plan to the leaves; that exchange operator carries the pushed-down plan fragment.
    fn create_plan_fragment(&self) -> SchedulerResult<PlanFragment> {
        let next_executor_id = Arc::new(AtomicU32::new(0));
        let root_stage_id = self.query.root_stage_id();
        let root_stage = self.query.stage_graph.stages.get(&root_stage_id).unwrap();
        assert_eq!(root_stage.parallelism.unwrap(), 1);
        let second_stage_id = self.query.stage_graph.get_child_stages(&root_stage_id);
        let plan_node_prost = match second_stage_id {
            None => {
                debug!("Local execution mode converts a plan with a single stage");
                self.convert_plan_node(&root_stage.root, &mut None, None, next_executor_id)?
            }
            Some(second_stage_ids) => {
                debug!("Local execution mode converts a plan with two stages");
                if second_stage_ids.is_empty() {
                    // This branch is defensive programming. The semantics should be the same as
                    // `None`.
                    self.convert_plan_node(&root_stage.root, &mut None, None, next_executor_id)?
                } else {
                    let mut second_stages = HashMap::new();
                    for second_stage_id in second_stage_ids {
                        let second_stage =
                            self.query.stage_graph.stages.get(second_stage_id).unwrap();
                        second_stages.insert(*second_stage_id, second_stage);
                    }
                    let mut stage_id_to_plan = Some(second_stages);
                    let res = self.convert_plan_node(
                        &root_stage.root,
                        &mut stage_id_to_plan,
                        None,
                        next_executor_id,
                    )?;
                    assert!(
                        stage_id_to_plan.as_ref().unwrap().is_empty(),
                        "We expect that all the child stage plan fragments have been used"
                    );
                    res
                }
            }
        };

        Ok(PlanFragment {
            root: Some(plan_node_prost),
            // Intentionally left as `None`: this is the last stage, whose output goes directly
            // to the frontend. Its distribution is single, so there is no need to specify an
            // exchange info explicitly.
            exchange_info: None,
        })
    }

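    /// Recursively convert an `ExecutionPlanNode` into a protobuf `PlanNode`.
    ///
    /// An exchange node consumes its child stage from `second_stages` and embeds it as
    /// `LocalExecutePlan` exchange sources; scan-like nodes bind the given `partition`
    /// (vnode bitmap, source splits, or file locations) into their node body.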
    fn convert_plan_node<'a>(
        &'a self,
        execution_plan_node: &ExecutionPlanNode,
        second_stages: &mut Option<HashMap<StageId, &'a QueryStage>>,
        partition: Option<PartitionInfo>,
        next_executor_id: Arc<AtomicU32>,
    ) -> SchedulerResult<PbPlanNode> {
        let identity = format!(
            "{:?}-{}",
            execution_plan_node.plan_node_type,
            next_executor_id.fetch_add(1, Ordering::Relaxed)
        );
        match execution_plan_node.plan_node_type {
            BatchPlanNodeType::BatchExchange => {
                let exchange_source_stage_id = execution_plan_node
                    .source_stage_id
                    .expect("We expect stage id for Exchange Operator");
                let Some(second_stages) = second_stages.as_mut() else {
                    bail!(
                        "Unexpected exchange detected. We are either converting a single stage plan or converting the second stage of the plan."
                    )
                };
                let second_stage = second_stages.remove(&exchange_source_stage_id).expect(
                    "We expect child stage fragment for Exchange Operator running in the frontend",
                );
                let mut node_body = execution_plan_node.node.clone();
                let sources = match &mut node_body {
                    NodeBody::Exchange(exchange_node) => &mut exchange_node.sources,
                    NodeBody::MergeSortExchange(merge_sort_exchange_node) => {
                        &mut merge_sort_exchange_node
                            .exchange
                            .as_mut()
                            .expect("MergeSortExchangeNode must have an exchange node")
                            .sources
                    }
                    _ => unreachable!(),
                };
                assert!(sources.is_empty());

                let tracing_context = TracingContext::from_current_span().to_protobuf();

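                // Each partition of the child stage becomes one `ExchangeSource` whose embedded
                // `LocalExecutePlan` is sent to a worker node: table scans are partitioned by
                // vnode bitmap, source reads by split, and file scans by file location.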
                if let Some(table_scan_info) = second_stage.table_scan_info.clone()
                    && let Some(vnode_bitmaps) = table_scan_info.partitions()
                {
                    // Similar to the distributed case (StageRunner::schedule_tasks).
                    // Set `vnode_ranges` of the scan node in `local_execute_plan` of each
                    // `exchange_source`.
                    let (worker_ids, vnode_bitmaps): (Vec<_>, Vec<_>) =
                        vnode_bitmaps.clone().into_iter().unzip();
                    let workers = self
                        .worker_node_manager
                        .manager
                        .get_workers_by_worker_slot_ids(&worker_ids)?;
                    for (idx, (worker_node, partition)) in
                        (workers.into_iter().zip_eq_fast(vnode_bitmaps.into_iter())).enumerate()
                    {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::Table(partition)),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            tracing_context: tracing_context.clone(),
                        };
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: idx as u64,
                                    stage_id: exchange_source_stage_id.into(),
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else if let Some(source_info) = &second_stage.source_info {
                    // For a batch read of a file source, the files to be read are divided into
                    // several chunks so that a single task does not take up too many resources.
                    let chunk_size = (source_info.split_info().unwrap().len() as f32
                        / (self.worker_node_manager.schedule_unit_count()) as f32)
                        .ceil() as usize;
                    for (id, split) in source_info
                        .split_info()
                        .unwrap()
                        .chunks(chunk_size)
                        .enumerate()
                    {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::Source(split.to_vec())),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            tracing_context: tracing_context.clone(),
                        };
                        // NOTE: select a random worker node here.
                        let worker_node = self.worker_node_manager.next_random_worker()?;
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: id as u64,
                                    stage_id: exchange_source_stage_id.into(),
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else if let Some(file_scan_info) = &second_stage.file_scan_info {
                    let chunk_size = (file_scan_info.file_location.len() as f32
                        / (self.worker_node_manager.schedule_unit_count()) as f32)
                        .ceil() as usize;
                    for (id, files) in file_scan_info.file_location.chunks(chunk_size).enumerate() {
                        let second_stage_plan_node = self.convert_plan_node(
                            &second_stage.root,
                            &mut None,
                            Some(PartitionInfo::File(files.to_vec())),
                            next_executor_id.clone(),
                        )?;
                        let second_stage_plan_fragment = PlanFragment {
                            root: Some(second_stage_plan_node),
                            exchange_info: Some(ExchangeInfo {
                                mode: DistributionMode::Single as i32,
                                ..Default::default()
                            }),
                        };
                        let local_execute_plan = LocalExecutePlan {
                            plan: Some(second_stage_plan_fragment),
                            tracing_context: tracing_context.clone(),
                        };
                        // NOTE: select a random worker node here.
                        let worker_node = self.worker_node_manager.next_random_worker()?;
                        let exchange_source = ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: id as u64,
                                    stage_id: exchange_source_stage_id.into(),
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan)),
                        };
                        sources.push(exchange_source);
                    }
                } else {
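                    // No partition info on the child stage: build a single second-stage fragment
                    // and fan the same plan out to every chosen worker.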
                    let second_stage_plan_node = self.convert_plan_node(
                        &second_stage.root,
                        &mut None,
                        None,
                        next_executor_id,
                    )?;
                    let second_stage_plan_fragment = PlanFragment {
                        root: Some(second_stage_plan_node),
                        exchange_info: Some(ExchangeInfo {
                            mode: DistributionMode::Single as i32,
                            ..Default::default()
                        }),
                    };

                    let local_execute_plan = LocalExecutePlan {
                        plan: Some(second_stage_plan_fragment),
                        tracing_context,
                    };

                    let workers = self.choose_worker(second_stage)?;
                    *sources = workers
                        .iter()
                        .enumerate()
                        .map(|(idx, worker_node)| ExchangeSource {
                            task_output_id: Some(TaskOutputId {
                                task_id: Some(PbTaskId {
                                    task_id: idx as u64,
                                    stage_id: exchange_source_stage_id.into(),
                                    query_id: self.query.query_id.id.clone(),
                                }),
                                output_id: 0,
                            }),
                            host: Some(worker_node.host.as_ref().unwrap().clone()),
                            local_execute_plan: Some(Plan(local_execute_plan.clone())),
                        })
                        .collect();
                }

                Ok(PbPlanNode {
                    // Since the rest of the plan is embedded into the exchange node,
                    // there are no children anymore.
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchSeqScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::RowSeqScan(scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_table()
                                .expect("PartitionInfo should be TablePartitionInfo here");
                            scan_node.vnode_bitmap = Some(partition.vnode_bitmap.to_protobuf());
                            scan_node.scan_ranges = partition.scan_ranges;
                        }
                    }
                    NodeBody::SysRowSeqScan(_) => {}
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchLogSeqScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::LogRowSeqScan(scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_table()
                                .expect("PartitionInfo should be TablePartitionInfo here");
                            scan_node.vnode_bitmap = Some(partition.vnode_bitmap.to_protobuf());
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchFileScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::FileScan(file_scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_file()
                                .expect("PartitionInfo should be FilePartitionInfo here");
                            file_scan_node.file_location = partition;
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchSource | BatchPlanNodeType::BatchKafkaScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::Source(source_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_source()
                                .expect("PartitionInfo should be SourcePartitionInfo here");
                            source_node.split = partition
                                .into_iter()
                                .map(|split| split.encode_to_bytes().into())
                                .collect_vec();
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchIcebergScan => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::IcebergScan(iceberg_scan_node) => {
                        if let Some(partition) = partition {
                            let partition = partition
                                .into_source()
                                .expect("PartitionInfo should be SourcePartitionInfo here");
                            iceberg_scan_node.split = partition
                                .into_iter()
                                .map(|split| split.encode_to_bytes().into())
                                .collect_vec();
                        }
                    }
                    _ => unreachable!(),
                }

                Ok(PbPlanNode {
                    children: vec![],
                    identity,
                    node_body: Some(node_body),
                })
            }
            BatchPlanNodeType::BatchLookupJoin => {
                let mut node_body = execution_plan_node.node.clone();
                match &mut node_body {
                    NodeBody::LocalLookupJoin(node) => {
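                        // The local lookup join executor needs the inner side's vnode-to-worker
                        // mapping and the current compute nodes so it can route point lookups.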
                        let side_table_desc = node
                            .inner_side_table_desc
                            .as_ref()
                            .expect("no side table desc");
                        let mapping = self.worker_node_manager.fragment_mapping(
                            self.get_fragment_id(&side_table_desc.table_id.into())?,
                        )?;

                        // TODO: should we use `pb::WorkerSlotMapping` here?
                        node.inner_side_vnode_mapping =
                            mapping.to_expanded().into_iter().map(u64::from).collect();
                        node.worker_nodes = self.worker_node_manager.manager.list_compute_nodes();
                    }
                    _ => unreachable!(),
                }

                let left_child = self.convert_plan_node(
                    &execution_plan_node.children[0],
                    second_stages,
                    partition,
                    next_executor_id,
                )?;

                Ok(PbPlanNode {
                    children: vec![left_child],
                    identity,
                    node_body: Some(node_body),
                })
            }
            _ => {
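                // All other node types: recursively convert the children and keep the node body
                // unchanged.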
                let children = execution_plan_node
                    .children
                    .iter()
                    .map(|e| {
                        self.convert_plan_node(
                            e,
                            second_stages,
                            partition.clone(),
                            next_executor_id.clone(),
                        )
                    })
                    .collect::<SchedulerResult<Vec<PbPlanNode>>>()?;

                Ok(PbPlanNode {
                    children,
                    identity,
                    node_body: Some(execution_plan_node.node.clone()),
                })
            }
        }
    }

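    /// Look up the fragment id of the given table in the catalog.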
    #[inline(always)]
    fn get_fragment_id(&self, table_id: &TableId) -> SchedulerResult<FragmentId> {
        let reader = self.front_env.catalog_reader().read_guard();
        reader
            .get_any_table_by_id(table_id)
            .map(|table| table.fragment_id)
            .map_err(|e| SchedulerError::Internal(anyhow!(e)))
    }

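    /// Resolve the worker-slot mapping of the table's DML fragment, falling back to the table's
    /// `fragment_id` for backward compatibility with tables that have no `dml_fragment_id`.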
    #[inline(always)]
    fn get_table_dml_vnode_mapping(
        &self,
        table_id: &TableId,
    ) -> SchedulerResult<WorkerSlotMapping> {
        let guard = self.front_env.catalog_reader().read_guard();

        let table = guard
            .get_any_table_by_id(table_id)
            .map_err(|e| SchedulerError::Internal(anyhow!(e)))?;

        let fragment_id = match table.dml_fragment_id.as_ref() {
            Some(dml_fragment_id) => dml_fragment_id,
            // Backward compatibility for tables without `dml_fragment_id`.
            None => &table.fragment_id,
        };

        self.worker_node_manager
            .manager
            .get_streaming_fragment_mapping(fragment_id)
            .map_err(|e| e.into())
    }

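    /// Choose the workers for a second stage that has no partition info: a DML stage is pinned
    /// to a single worker derived from the table's streaming vnode mapping (selected by session
    /// id), while other stages pick `parallelism` random workers.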
    fn choose_worker(&self, stage: &QueryStage) -> SchedulerResult<Vec<WorkerNode>> {
        if let Some(table_id) = stage.dml_table_id.as_ref() {
            // DML should use the streaming vnode mapping.
            let vnode_mapping = self.get_table_dml_vnode_mapping(table_id)?;
            let worker_node = {
                let worker_ids = vnode_mapping.iter_unique().collect_vec();
                let candidates = self
                    .worker_node_manager
                    .manager
                    .get_workers_by_worker_slot_ids(&worker_ids)?;
                if candidates.is_empty() {
                    return Err(BatchError::EmptyWorkerNodes.into());
                }
                candidates[stage.session_id.0 as usize % candidates.len()].clone()
            };
            Ok(vec![worker_node])
        } else {
            let mut workers = Vec::with_capacity(stage.parallelism.unwrap() as usize);
            for _ in 0..stage.parallelism.unwrap() {
                workers.push(self.worker_node_manager.next_random_worker()?);
            }
            Ok(workers)
        }
    }
}