risingwave_frontend/scheduler/
fast_insert.rs1use anyhow::anyhow;
16use itertools::Itertools;
17use risingwave_batch::error::BatchError;
18use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
19use risingwave_common::hash::WorkerSlotMapping;
20use risingwave_pb::common::WorkerNode;
21use risingwave_rpc_client::ComputeClient;
22
23use crate::catalog::TableId;
24use crate::scheduler::{SchedulerError, SchedulerResult};
25use crate::session::FrontendEnv;
26
27pub async fn choose_fast_insert_client(
28 table_id: &TableId,
29 frontend_env: &FrontendEnv,
30 request_id: u32,
31) -> SchedulerResult<ComputeClient> {
32 let worker = choose_worker(table_id, frontend_env, request_id)?;
33 let client = frontend_env.client_pool().get(&worker).await?;
34 Ok(client)
35}
36
37fn get_table_dml_vnode_mapping(
38 table_id: &TableId,
39 frontend_env: &FrontendEnv,
40 worker_node_manager: &WorkerNodeSelector,
41) -> SchedulerResult<WorkerSlotMapping> {
42 let guard = frontend_env.catalog_reader().read_guard();
43
44 let table = guard
45 .get_any_table_by_id(table_id)
46 .map_err(|e| SchedulerError::Internal(anyhow!(e)))?;
47
48 let fragment_id = match table.dml_fragment_id.as_ref() {
49 Some(dml_fragment_id) => dml_fragment_id,
50 None => &table.fragment_id,
52 };
53
54 worker_node_manager
55 .manager
56 .get_streaming_fragment_mapping(fragment_id)
57 .map_err(|e| e.into())
58}
59
60fn choose_worker(
61 table_id: &TableId,
62 frontend_env: &FrontendEnv,
63 request_id: u32,
64) -> SchedulerResult<WorkerNode> {
65 let worker_node_manager =
66 WorkerNodeSelector::new(frontend_env.worker_node_manager_ref(), false);
67
68 let vnode_mapping = get_table_dml_vnode_mapping(table_id, frontend_env, &worker_node_manager)?;
70 let worker_node = {
71 let worker_ids = vnode_mapping.iter_unique().collect_vec();
72 let candidates = worker_node_manager
73 .manager
74 .get_workers_by_worker_slot_ids(&worker_ids)?;
75 if candidates.is_empty() {
76 return Err(BatchError::EmptyWorkerNodes.into());
77 }
78 candidates[request_id as usize % candidates.len()].clone()
79 };
80 Ok(worker_node)
81}