risingwave_frontend/scheduler/
fast_insert.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use itertools::Itertools;
17use risingwave_batch::error::BatchError;
18use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
19use risingwave_common::hash::WorkerSlotMapping;
20use risingwave_pb::common::WorkerNode;
21use risingwave_rpc_client::ComputeClient;
22
23use crate::catalog::TableId;
24use crate::scheduler::{SchedulerError, SchedulerResult};
25use crate::session::FrontendEnv;
26
27pub async fn choose_fast_insert_client(
28    table_id: &TableId,
29    frontend_env: &FrontendEnv,
30    request_id: u32,
31) -> SchedulerResult<ComputeClient> {
32    let worker = choose_worker(table_id, frontend_env, request_id)?;
33    let client = frontend_env.client_pool().get(&worker).await?;
34    Ok(client)
35}
36
37fn get_table_dml_vnode_mapping(
38    table_id: &TableId,
39    frontend_env: &FrontendEnv,
40    worker_node_manager: &WorkerNodeSelector,
41) -> SchedulerResult<WorkerSlotMapping> {
42    let guard = frontend_env.catalog_reader().read_guard();
43
44    let table = guard
45        .get_any_table_by_id(table_id)
46        .map_err(|e| SchedulerError::Internal(anyhow!(e)))?;
47
48    let fragment_id = match table.dml_fragment_id.as_ref() {
49        Some(dml_fragment_id) => dml_fragment_id,
50        // Backward compatibility for those table without `dml_fragment_id`.
51        None => &table.fragment_id,
52    };
53
54    worker_node_manager
55        .manager
56        .get_streaming_fragment_mapping(fragment_id)
57        .map_err(|e| e.into())
58}
59
60fn choose_worker(
61    table_id: &TableId,
62    frontend_env: &FrontendEnv,
63    request_id: u32,
64) -> SchedulerResult<WorkerNode> {
65    let worker_node_manager =
66        WorkerNodeSelector::new(frontend_env.worker_node_manager_ref(), false);
67
68    // dml should use streaming vnode mapping
69    let vnode_mapping = get_table_dml_vnode_mapping(table_id, frontend_env, &worker_node_manager)?;
70    let worker_node = {
71        let worker_ids = vnode_mapping.iter_unique().collect_vec();
72        let candidates = worker_node_manager
73            .manager
74            .get_workers_by_worker_slot_ids(&worker_ids)?;
75        if candidates.is_empty() {
76            return Err(BatchError::EmptyWorkerNodes.into());
77        }
78        candidates[request_id as usize % candidates.len()].clone()
79    };
80    Ok(worker_node)
81}