risingwave_meta/controller/
cluster.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp;
16use std::cmp::Ordering;
17use std::collections::{HashMap, HashSet, VecDeque};
18use std::ops::Add;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime};
21
22use itertools::Itertools;
23use risingwave_common::RW_VERSION;
24use risingwave_common::hash::WorkerSlotId;
25use risingwave_common::util::addr::HostAddr;
26use risingwave_common::util::resource_util::cpu::total_cpu_available;
27use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_license::LicenseManager;
30use risingwave_meta_model::prelude::{Worker, WorkerProperty};
31use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
32use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
33use risingwave_pb::common::worker_node::{
34    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
35};
36use risingwave_pb::common::{HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode};
37use risingwave_pb::meta::subscribe_response::{Info, Operation};
38use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
39use sea_orm::ActiveValue::Set;
40use sea_orm::prelude::Expr;
41use sea_orm::{
42    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
43    TransactionTrait,
44};
45use thiserror_ext::AsReport;
46use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
47use tokio::sync::oneshot::Sender;
48use tokio::sync::{RwLock, RwLockReadGuard};
49use tokio::task::JoinHandle;
50
51use crate::controller::utils::filter_workers_by_resource_group;
52use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
53use crate::model::ClusterId;
54use crate::{MetaError, MetaResult};
55
/// Shared, reference-counted handle to the [`ClusterController`].
pub type ClusterControllerRef = Arc<ClusterController>;
57
/// Manages worker nodes' lifecycle: registration, activation, heartbeat
/// expiration and deletion, persisting state via the meta store and
/// broadcasting changes through the notification manager.
pub struct ClusterController {
    env: MetaSrvEnv,
    /// A worker whose lease is not renewed within this interval is considered
    /// expired and gets deleted by the heartbeat checker.
    max_heartbeat_interval: Duration,
    /// In-memory state guarded by a lock; see `get_inner_read_guard` for the
    /// lock-ordering caveat.
    inner: RwLock<ClusterControllerInner>,
    /// Used as timestamp when meta node starts in sec.
    started_at: u64,
}
65
/// Everything needed to assemble a [`PbWorkerNode`]: the persistent worker row,
/// its optional property row, and the volatile in-memory extra info.
struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);
71
72impl From<WorkerInfo> for PbWorkerNode {
73    fn from(info: WorkerInfo) -> Self {
74        Self {
75            id: info.0.worker_id as _,
76            r#type: PbWorkerType::from(info.0.worker_type) as _,
77            host: Some(PbHostAddress {
78                host: info.0.host,
79                port: info.0.port,
80            }),
81            state: PbState::from(info.0.status) as _,
82            property: info.1.as_ref().map(|p| PbProperty {
83                is_streaming: p.is_streaming,
84                is_serving: p.is_serving,
85                is_unschedulable: p.is_unschedulable,
86                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
87                resource_group: p.resource_group.clone(),
88                parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
89            }),
90            transactional_id: info.0.transaction_id.map(|id| id as _),
91            resource: Some(info.2.resource),
92            started_at: info.2.started_at,
93        }
94    }
95}
96
impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    /// Used in `NotificationService::subscribe`.
    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    pub async fn compute_node_total_cpu_count(&self) -> usize {
        self.inner.read().await.compute_node_total_cpu_count()
    }

    /// Recomputes the total CPU core count over all compute nodes, feeds it to
    /// the local license manager and broadcasts it to all other nodes.
    /// Called whenever a compute node is added or deleted.
    async fn update_compute_node_total_cpu_count(&self) -> MetaResult<()> {
        let total_cpu_cores = self.compute_node_total_cpu_count().await;

        // Update local license manager.
        LicenseManager::get().update_cpu_core_count(total_cpu_cores);
        // Notify all other nodes.
        self.env.notification_manager().notify_all_without_version(
            Operation::Update, // unused
            Info::ComputeNodeTotalCpuCount(total_cpu_cores as _),
        );

        Ok(())
    }

    /// A worker node will immediately register itself to meta when it bootstraps.
    /// The meta will assign it with a unique ID and set its state as `Starting`.
    /// When the worker node is fully ready to serve, it will request meta again
    /// (via `activate_worker_node`) to set its state to `Running`.
    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        if r#type == PbWorkerType::ComputeNode {
            self.update_compute_node_total_cpu_count().await?;
        }

        Ok(worker_id)
    }

    /// Marks the worker as `Running` and notifies frontends and local
    /// subscribers about the (possibly updated) worker node.
    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        // Notify frontends of new compute node.
        // Always notify because a running worker's property may have been changed.
        if worker.r#type() == PbWorkerType::ComputeNode {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker))
            .await;

        Ok(())
    }

    /// Deletes the worker identified by `host_address` and fans out the
    /// corresponding notifications. Returns the deleted worker node.
    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;

            self.update_compute_node_total_cpu_count().await?;
        }

        // Notify local subscribers.
        // Note: Any type of workers may pin some hummock resource. So `HummockManager` expect this
        // local notification.
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()))
            .await;

        Ok(worker)
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

    /// Invoked when it receives a heartbeat from a worker node.
    pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", worker_id = worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval)
    }

    /// Spawns the background task that periodically expires workers whose TTL
    /// has lapsed. Returns the task's join handle and a shutdown trigger.
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    // Wait for interval
                    _ = min_interval.tick() => {},
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                // 1. Initialize new workers' TTL.
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                // 2. Collect expired workers.
                // `unwrap` is safe here: step 1 above guarantees every entry has
                // `expire_at` set before we reach this point.
                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                // 3. Delete expired workers.
                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expire worker info from db");
                        continue;
                    }
                };
                // Release the write lock before calling `delete_worker`, which
                // re-acquires it internally.
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => {
                                    cluster_controller
                                        .env
                                        .notification_manager()
                                        .delete_sender(worker_type.into(), WorkerKey(host_addr))
                                        .await
                                }
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expire worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

    /// Get live nodes with the specified type and state.
    /// # Arguments
    /// * `worker_type` `WorkerType` of the nodes
    /// * `worker_state` Filter by this state if it is not None.
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        // fill meta info.
        if worker_type.is_none() {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

    /// Returns the current active streaming compute nodes together with a
    /// channel receiving subsequent local notifications.
    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        // insert before release the read lock to ensure that we don't lose any update in between
        self.env
            .notification_manager()
            .insert_local_sender(tx)
            .await;
        drop(inner);
        Ok((worker_nodes, rx))
    }

    /// A convenient method to get all running compute nodes that may have running actors on them
    /// i.e. CNs which are running
    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    /// Get the cluster info used for scheduling a streaming job, containing all nodes that are
    /// running and schedulable
    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    /// Get the cluster info used for scheduling a streaming job.
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}
415
/// The cluster info used for scheduling a streaming job.
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    /// All **active** compute nodes in the cluster, keyed by worker id.
    pub worker_nodes: HashMap<u32, WorkerNode>,

    /// All schedulable compute nodes in the cluster. Normally for resource group based scheduling.
    pub schedulable_workers: HashSet<u32>,

    /// All unschedulable compute nodes in the cluster.
    pub unschedulable_workers: HashSet<u32>,
}
428
429// Encapsulating the use of parallelism
430impl StreamingClusterInfo {
431    pub fn parallelism(&self, resource_group: String) -> usize {
432        let available_worker_ids =
433            filter_workers_by_resource_group(&self.worker_nodes, resource_group.as_str());
434
435        self.worker_nodes
436            .values()
437            .filter(|worker| available_worker_ids.contains(&(worker.id as WorkerId)))
438            .map(|worker| worker.compute_node_parallelism())
439            .sum()
440    }
441
442    pub fn filter_schedulable_workers_by_resource_group(
443        &self,
444        resource_group: &str,
445    ) -> HashMap<u32, WorkerNode> {
446        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
447        self.worker_nodes
448            .iter()
449            .filter(|(id, _)| worker_ids.contains(&(**id as WorkerId)))
450            .map(|(id, worker)| (*id, worker.clone()))
451            .collect()
452    }
453}
454
/// Volatile, in-memory-only state per worker. Not persisted in the meta store;
/// rebuilt from registration and heartbeats (see `ClusterControllerInner::new`,
/// which initializes it with `Default`).
#[derive(Default, Clone)]
pub struct WorkerExtraInfo {
    // Volatile values updated by meta node as follows.
    //
    // Unix timestamp that the worker will expire at.
    expire_at: Option<u64>,
    // Unix timestamp (sec) when the worker (re-)registered; `None` until known.
    started_at: Option<u64>,
    // Resources reported by the worker at registration.
    resource: PbResource,
    r#type: PbWorkerType,
}
465
466impl WorkerExtraInfo {
467    fn update_ttl(&mut self, ttl: Duration) {
468        let expire = cmp::max(
469            self.expire_at.unwrap_or_default(),
470            SystemTime::now()
471                .add(ttl)
472                .duration_since(SystemTime::UNIX_EPOCH)
473                .expect("Clock may have gone backwards")
474                .as_secs(),
475        );
476        self.expire_at = Some(expire);
477    }
478
479    fn update_started_at(&mut self) {
480        self.started_at = Some(timestamp_now_sec());
481    }
482}
483
/// Current Unix timestamp in whole seconds.
fn timestamp_now_sec() -> u64 {
    SystemTime::UNIX_EPOCH
        .elapsed()
        .expect("Clock may have gone backwards")
        .as_secs()
}
490
491fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
492    PbWorkerNode {
493        id: META_NODE_ID,
494        r#type: PbWorkerType::Meta.into(),
495        host: HostAddr::try_from(host)
496            .as_ref()
497            .map(HostAddr::to_protobuf)
498            .ok(),
499        state: PbState::Running as _,
500        property: None,
501        transactional_id: None,
502        resource: Some(risingwave_pb::common::worker_node::Resource {
503            rw_version: RW_VERSION.to_owned(),
504            total_memory_bytes: system_memory_available_bytes() as _,
505            total_cpu_cores: total_cpu_available() as _,
506        }),
507        started_at,
508    }
509}
510
/// State guarded by the [`ClusterController`]'s lock: the DB handle plus
/// in-memory bookkeeping that must stay consistent with the worker table.
pub struct ClusterControllerInner {
    db: DatabaseConnection,
    /// Pool of transactional ids not currently assigned to any worker;
    /// ids are taken from the front and returned when a worker is deleted.
    available_transactional_ids: VecDeque<TransactionId>,
    /// Volatile per-worker info (TTL, resources), keyed by worker id.
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}
518
519impl ClusterControllerInner {
520    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
521    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;
522
523    pub async fn new(
524        db: DatabaseConnection,
525        disable_automatic_parallelism_control: bool,
526    ) -> MetaResult<Self> {
527        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
528            .select_only()
529            .column(worker::Column::WorkerId)
530            .column(worker::Column::TransactionId)
531            .into_tuple()
532            .all(&db)
533            .await?;
534        let inuse_txn_ids: HashSet<_> = workers
535            .iter()
536            .cloned()
537            .filter_map(|(_, txn_id)| txn_id)
538            .collect();
539        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
540            .filter(|id| !inuse_txn_ids.contains(id))
541            .collect();
542
543        let worker_extra_info = workers
544            .into_iter()
545            .map(|(w, _)| (w, WorkerExtraInfo::default()))
546            .collect();
547
548        Ok(Self {
549            db,
550            available_transactional_ids,
551            worker_extra_info,
552            disable_automatic_parallelism_control,
553        })
554    }
555
556    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
557        let workers: Vec<(WorkerType, i64)> = Worker::find()
558            .select_only()
559            .column(worker::Column::WorkerType)
560            .column_as(worker::Column::WorkerId.count(), "count")
561            .group_by(worker::Column::WorkerType)
562            .into_tuple()
563            .all(&self.db)
564            .await?;
565
566        Ok(workers.into_iter().collect())
567    }
568
569    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
570        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
571            let expire = cmp::max(
572                info.expire_at.unwrap_or_default(),
573                SystemTime::now()
574                    .add(ttl)
575                    .duration_since(SystemTime::UNIX_EPOCH)
576                    .expect("Clock may have gone backwards")
577                    .as_secs(),
578            );
579            info.expire_at = Some(expire);
580            Ok(())
581        } else {
582            Err(MetaError::invalid_worker(worker_id, "worker not found"))
583        }
584    }
585
586    fn update_resource_and_started_at(
587        &mut self,
588        worker_id: WorkerId,
589        resource: PbResource,
590    ) -> MetaResult<()> {
591        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
592            info.resource = resource;
593            info.update_started_at();
594            Ok(())
595        } else {
596            Err(MetaError::invalid_worker(worker_id, "worker not found"))
597        }
598    }
599
600    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
601        self.worker_extra_info
602            .get(&worker_id)
603            .cloned()
604            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
605    }
606
607    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
608        match (self.available_transactional_ids.front(), r#type) {
609            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
610            // We only assign transactional id to compute node and frontend.
611            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
612            _ => Ok(None),
613        }
614    }
615
616    fn compute_node_total_cpu_count(&self) -> usize {
617        self.worker_extra_info
618            .values()
619            .filter(|info| info.r#type == PbWorkerType::ComputeNode)
620            .map(|info| info.resource.total_cpu_cores as usize)
621            .sum()
622    }
623
    /// Registers a new worker, or refreshes the registration of an existing one
    /// (matched by advertised `host:port`), inside a single DB transaction.
    /// Returns the worker's id. Volatile info (TTL, resources, start time) is
    /// updated only after the transaction commits.
    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        // A restarting worker re-registers with the same address; reuse its row.
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        // Worker already exist.
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                // Compute nodes are inserted together with a property row below,
                // so it must exist here.
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            // Handing over to the subsequent recovery loop for a forced reschedule.
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            // Warn and keep the original parallelism if the worker registered with a
                            // smaller parallelism.
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                // keep `is_unschedulable` unchanged.
                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                // Existing frontend without a property row: backfill one.
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                // Other worker types: nothing persistent to refresh, only volatile info.
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }
        // Brand-new worker: possibly reserve a transactional id, then insert.
        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode || r#type == PbWorkerType::Frontend {
            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(add_property.is_streaming),
                is_serving: Set(add_property.is_serving),
                is_unschedulable: Set(add_property.is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: if r#type == PbWorkerType::ComputeNode {
                    Set(add_property.resource_group.clone())
                } else {
                    Set(None)
                },
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        // Only after a successful commit: remove the id from the free pool and
        // start tracking volatile info for the new worker.
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
            r#type,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }
773
774    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
775        let worker = worker::ActiveModel {
776            worker_id: Set(worker_id),
777            status: Set(WorkerStatus::Running),
778            ..Default::default()
779        };
780
781        let worker = worker.update(&self.db).await?;
782        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
783            .one(&self.db)
784            .await?;
785        let extra_info = self.get_extra_info_checked(worker_id)?;
786        Ok(WorkerInfo(worker, worker_property, extra_info).into())
787    }
788
789    pub async fn update_schedulability(
790        &self,
791        worker_ids: Vec<WorkerId>,
792        schedulability: Schedulability,
793    ) -> MetaResult<()> {
794        let is_unschedulable = schedulability == Schedulability::Unschedulable;
795        WorkerProperty::update_many()
796            .col_expr(
797                worker_property::Column::IsUnschedulable,
798                Expr::value(is_unschedulable),
799            )
800            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
801            .exec(&self.db)
802            .await?;
803
804        Ok(())
805    }
806
807    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
808        let worker = Worker::find()
809            .filter(
810                worker::Column::Host
811                    .eq(host_addr.host)
812                    .and(worker::Column::Port.eq(host_addr.port)),
813            )
814            .find_also_related(WorkerProperty)
815            .one(&self.db)
816            .await?;
817        let Some((worker, property)) = worker else {
818            return Err(MetaError::invalid_parameter("worker not found!"));
819        };
820
821        let res = Worker::delete_by_id(worker.worker_id)
822            .exec(&self.db)
823            .await?;
824        if res.rows_affected == 0 {
825            return Err(MetaError::invalid_parameter("worker not found!"));
826        }
827
828        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
829        if let Some(txn_id) = &worker.transaction_id {
830            self.available_transactional_ids.push_back(*txn_id);
831        }
832        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();
833
834        Ok(worker)
835    }
836
837    pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
838        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
839            worker_info.update_ttl(ttl);
840            Ok(())
841        } else {
842            Err(MetaError::invalid_worker(worker_id, "worker not found"))
843        }
844    }
845
846    pub async fn list_workers(
847        &self,
848        worker_type: Option<WorkerType>,
849        worker_status: Option<WorkerStatus>,
850    ) -> MetaResult<Vec<PbWorkerNode>> {
851        let mut find = Worker::find();
852        if let Some(worker_type) = worker_type {
853            find = find.filter(worker::Column::WorkerType.eq(worker_type));
854        }
855        if let Some(worker_status) = worker_status {
856            find = find.filter(worker::Column::Status.eq(worker_status));
857        }
858        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
859        Ok(workers
860            .into_iter()
861            .map(|(worker, property)| {
862                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
863                WorkerInfo(worker, property, extra_info).into()
864            })
865            .collect_vec())
866    }
867
868    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
869        let workers = Worker::find()
870            .filter(
871                worker::Column::WorkerType
872                    .eq(WorkerType::ComputeNode)
873                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
874            )
875            .inner_join(WorkerProperty)
876            .select_also(WorkerProperty)
877            .filter(worker_property::Column::IsStreaming.eq(true))
878            .all(&self.db)
879            .await?;
880
881        Ok(workers
882            .into_iter()
883            .map(|(worker, property)| {
884                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
885                WorkerInfo(worker, property, extra_info).into()
886            })
887            .collect_vec())
888    }
889
890    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
891        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
892            .select_only()
893            .column(worker_property::Column::WorkerId)
894            .column(worker_property::Column::Parallelism)
895            .inner_join(Worker)
896            .filter(worker::Column::Status.eq(WorkerStatus::Running))
897            .into_tuple()
898            .all(&self.db)
899            .await?;
900        Ok(worker_parallelisms
901            .into_iter()
902            .flat_map(|(worker_id, parallelism)| {
903                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id as u32, idx as usize))
904            })
905            .collect_vec())
906    }
907
908    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
909        let workers = Worker::find()
910            .filter(
911                worker::Column::WorkerType
912                    .eq(WorkerType::ComputeNode)
913                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
914            )
915            .inner_join(WorkerProperty)
916            .select_also(WorkerProperty)
917            .filter(worker_property::Column::IsServing.eq(true))
918            .all(&self.db)
919            .await?;
920
921        Ok(workers
922            .into_iter()
923            .map(|(worker, property)| {
924                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
925                WorkerInfo(worker, property, extra_info).into()
926            })
927            .collect_vec())
928    }
929
930    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
931        let mut streaming_workers = self.list_active_streaming_workers().await?;
932
933        let unschedulable_workers: HashSet<_> = streaming_workers
934            .extract_if(.., |worker| {
935                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
936            })
937            .map(|w| w.id)
938            .collect();
939
940        let schedulable_workers = streaming_workers
941            .iter()
942            .map(|worker| worker.id)
943            .filter(|id| !unschedulable_workers.contains(id))
944            .collect();
945
946        let active_workers: HashMap<_, _> =
947            streaming_workers.into_iter().map(|w| (w.id, w)).collect();
948
949        Ok(StreamingClusterInfo {
950            worker_nodes: active_workers,
951            schedulable_workers,
952            unschedulable_workers,
953        })
954    }
955
956    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
957        let worker = Worker::find_by_id(worker_id)
958            .find_also_related(WorkerProperty)
959            .one(&self.db)
960            .await?;
961        if worker.is_none() {
962            return Ok(None);
963        }
964        let extra_info = self.get_extra_info_checked(worker_id)?;
965        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
966    }
967
968    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
969        self.worker_extra_info.get(&worker_id).cloned()
970    }
971}
972
#[cfg(test)]
mod tests {
    use super::*;

    // Builds `count` distinct localhost addresses on ports 5000..5000+count.
    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    // Exercises the full worker lifecycle against a test meta environment:
    // register -> activate -> list -> re-register with new properties -> delete.
    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        // Register all workers; newly added workers are not active yet.
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        // Since no worker is active, the parallelism should be 0.
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        // re-register existing worker node with larger parallelism and change its serving mode.
        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        // hosts[0] is no longer serving, so one fewer serving worker is listed.
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        // (worker_count - 1) workers keep `parallelism_num` slots each, while
        // hosts[0] now has 2 * parallelism_num: total parallelism_num * (worker_count + 1).
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        // delete workers.
        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    // Verifies that the unschedulable flag set via `update_schedulability`
    // survives a re-registration of the same worker with changed properties.
    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        // re-register existing worker node and change its serving mode, the schedulable state should not be changed.
        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        // Re-registering the same host:port keeps the same worker id.
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }
}