risingwave_meta/controller/cluster.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{self, Ordering, max};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
use risingwave_pb::common::worker_node::{
    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{
    ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::ActiveValue::Set;
use sea_orm::prelude::Expr;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
    TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;

use crate::controller::utils::filter_workers_by_resource_group;
use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
use crate::model::ClusterId;
use crate::{MetaError, MetaResult};

pub type ClusterControllerRef = Arc<ClusterController>;

pub struct ClusterController {
    env: MetaSrvEnv,
    max_heartbeat_interval: Duration,
    inner: RwLock<ClusterControllerInner>,
    /// Timestamp (in seconds) at which the meta node started.
    started_at: u64,
}

struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);

impl From<WorkerInfo> for PbWorkerNode {
    fn from(info: WorkerInfo) -> Self {
        Self {
            id: info.0.worker_id as _,
            r#type: PbWorkerType::from(info.0.worker_type) as _,
            host: Some(PbHostAddress {
                host: info.0.host,
                port: info.0.port,
            }),
            state: PbState::from(info.0.status) as _,
            property: info.1.as_ref().map(|p| PbProperty {
                is_streaming: p.is_streaming,
                is_serving: p.is_serving,
                is_unschedulable: p.is_unschedulable,
                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
                resource_group: p.resource_group.clone(),
                parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
            }),
            transactional_id: info.0.transaction_id.map(|id| id as _),
            resource: Some(info.2.resource),
            started_at: info.2.started_at,
        }
    }
}

impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    /// Used in `NotificationService::subscribe`.
    /// Pay attention to the order in which locks are acquired to avoid deadlocks.
    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    /// Get the total resource of the cluster.
    pub async fn cluster_resource(&self) -> ClusterResource {
        self.inner.read().await.cluster_resource()
    }

    /// Get the total resource of the cluster, then update license manager and notify all other nodes.
    async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
        let resource = self.cluster_resource().await;

        // Update local license manager.
        LicenseManager::get().update_cluster_resource(resource);
        // Notify all other nodes.
        self.env.notification_manager().notify_all_without_version(
            Operation::Update, // unused
            Info::ClusterResource(resource),
        );

        Ok(())
    }

    /// A worker node will immediately register itself to meta when it bootstraps.
    /// The meta will assign it with a unique ID and set its state as `Starting`.
    /// When the worker node is fully ready to serve, it will request meta again
    /// (via `activate_worker_node`) to set its state to `Running`.
    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        self.update_cluster_resource_for_license().await?;

        Ok(worker_id)
    }
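
    // A registration sketch in comment form (not part of the original code): `controller` is a
    // hypothetical `ClusterControllerRef`, and the property values are placeholders. A compute
    // node first calls `add_worker` to obtain its id in the `Starting` state, then calls
    // `activate_worker` once it is ready to serve:
    //
    //     let host = HostAddress { host: "compute-0".to_owned(), port: 5688 };
    //     let worker_id = controller
    //         .add_worker(
    //             PbWorkerType::ComputeNode,
    //             host,
    //             AddNodeProperty { parallelism: 4, is_streaming: true, is_serving: true, ..Default::default() },
    //             PbResource::default(),
    //         )
    //         .await?;
    //     controller.activate_worker(worker_id).await?;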

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        // Notify frontends of new compute node and frontend node.
        // Always notify because a running worker's property may have been changed.
        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));

        Ok(())
    }

    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;
            if worker.r#type() == PbWorkerType::ComputeNode {
                self.update_cluster_resource_for_license().await?;
            }
        }

        // Notify local subscribers.
        // Note: any type of worker may pin some hummock resources, so `HummockManager` expects
        // this local notification.
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));

        Ok(worker)
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

    /// Invoked when the meta node receives a heartbeat from a worker node.
    pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", worker_id = worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval)
    }

    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    // Wait for interval
                    _ = min_interval.tick() => {},
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                // 1. Initialize new workers' TTL.
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                // 2. Collect expired workers.
                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                // 3. Delete expired workers.
                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => cluster_controller
                                    .env
                                    .notification_manager()
                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }
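
    // Usage sketch in comment form (not part of the original code): `controller` is a
    // hypothetical `ClusterControllerRef`. The returned join handle and shutdown sender are
    // typically kept by the meta service so the checker can be stopped gracefully on exit:
    //
    //     let (handle, shutdown) =
    //         ClusterController::start_heartbeat_checker(controller, Duration::from_secs(1));
    //     // ... on meta node shutdown:
    //     let _ = shutdown.send(());
    //     let _ = handle.await;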

    /// Get live nodes with the specified type and state.
    /// # Arguments
    /// * `worker_type`: `WorkerType` of the nodes.
    /// * `worker_status`: filter by this status if it is not `None`.
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        // fill meta info.
        if worker_type.is_none() {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        // Insert before releasing the read lock to ensure that we don't lose any updates in between.
        self.env.notification_manager().insert_local_sender(tx);
        drop(inner);
        Ok((worker_nodes, rx))
    }

    /// A convenience method to get all running compute nodes that may have actors on them,
    /// i.e. compute nodes in the `Running` state with streaming enabled.
    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    /// Get all compute nodes that are running and have serving enabled.
    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    /// Get the cluster info used for scheduling a streaming job.
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}

/// The cluster info used for scheduling a streaming job.
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    /// All **active** compute nodes in the cluster.
    pub worker_nodes: HashMap<u32, WorkerNode>,

    /// All schedulable compute nodes in the cluster, typically used for resource-group-based scheduling.
    pub schedulable_workers: HashSet<u32>,

    /// All unschedulable compute nodes in the cluster.
    pub unschedulable_workers: HashSet<u32>,
}

// Encapsulates parallelism-related queries over the cluster info.
impl StreamingClusterInfo {
    pub fn parallelism(&self, resource_group: &str) -> usize {
        let available_worker_ids =
            filter_workers_by_resource_group(&self.worker_nodes, resource_group);

        self.worker_nodes
            .values()
            .filter(|worker| available_worker_ids.contains(&(worker.id as WorkerId)))
            .map(|worker| worker.compute_node_parallelism())
            .sum()
    }

    pub fn filter_schedulable_workers_by_resource_group(
        &self,
        resource_group: &str,
    ) -> HashMap<u32, WorkerNode> {
        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
        self.worker_nodes
            .iter()
            .filter(|(id, _)| worker_ids.contains(&(**id as WorkerId)))
            .map(|(id, worker)| (*id, worker.clone()))
            .collect()
    }
}
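
// Comment-only worked example for `StreamingClusterInfo::parallelism` (hypothetical numbers,
// assuming `filter_workers_by_resource_group` matches workers by their resource group label):
// with two workers in resource group "rg1" exposing 4 and 8 compute slots and one worker in
// "rg2" exposing 2, `parallelism("rg1")` sums only the matching workers and returns 12, while
// `parallelism("rg2")` returns 2.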

#[derive(Default, Clone, Debug)]
pub struct WorkerExtraInfo {
    // Volatile values maintained in memory by the meta node (not persisted to the meta store).
    //
    // Unix timestamp (in seconds) at which the worker will expire.
    expire_at: Option<u64>,
    started_at: Option<u64>,
    resource: PbResource,
}

impl WorkerExtraInfo {
    fn update_ttl(&mut self, ttl: Duration) {
        let expire = cmp::max(
            self.expire_at.unwrap_or_default(),
            SystemTime::now()
                .add(ttl)
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs(),
        );
        self.expire_at = Some(expire);
    }

    fn update_started_at(&mut self) {
        self.started_at = Some(timestamp_now_sec());
    }
}

fn timestamp_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
    PbWorkerNode {
        id: META_NODE_ID,
        r#type: PbWorkerType::Meta.into(),
        host: HostAddr::try_from(host)
            .as_ref()
            .map(HostAddr::to_protobuf)
            .ok(),
        state: PbState::Running as _,
        property: None,
        transactional_id: None,
        resource: Some(risingwave_pb::common::worker_node::Resource {
            rw_version: RW_VERSION.to_owned(),
            total_memory_bytes: system_memory_available_bytes() as _,
            total_cpu_cores: total_cpu_available() as _,
            hostname: hostname(),
        }),
        started_at,
    }
}

pub struct ClusterControllerInner {
    db: DatabaseConnection,
    /// Tracks the transactional (machine) ids that are currently available for reuse.
    available_transactional_ids: VecDeque<TransactionId>,
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }

    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            let expire = cmp::max(
                info.expire_at.unwrap_or_default(),
                SystemTime::now()
                    .add(ttl)
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_secs(),
            );
            info.expire_at = Some(expire);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn update_resource_and_started_at(
        &mut self,
        worker_id: WorkerId,
        resource: PbResource,
    ) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.resource = resource;
            info.update_started_at();
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            // We only assign transactional id to compute node and frontend.
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }
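
    // Note on the transactional id lifecycle (descriptive comment added for clarity):
    // `apply_transaction_id` only peeks at the front of `available_transactional_ids`; the id is
    // actually removed from the pool in `add_worker` once the insert transaction commits, and it
    // is returned to the pool in `delete_worker` via `push_back`.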

    /// Get the total resource of the cluster.
    fn cluster_resource(&self) -> ClusterResource {
        // For each hostname, we only consider the maximum resource, in case a host has multiple nodes.
        let mut per_host = HashMap::new();

        for info in self.worker_extra_info.values() {
            let r = per_host
                .entry(info.resource.hostname.as_str())
                .or_insert_with(ClusterResource::default);

            r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
            r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
        }

        // For different hostnames, we sum up the resources.
        per_host
            .into_values()
            .reduce(|a, b| ClusterResource {
                total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
                total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
            })
            .unwrap_or_default()
    }
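
    // Comment-only worked example (hypothetical numbers): with two nodes on host "a" reporting
    // (8 cores, 32 GiB) and (4 cores, 16 GiB), plus one node on host "b" reporting
    // (8 cores, 32 GiB), the per-host maxima are (8, 32 GiB) and (8, 32 GiB), so the cluster
    // total is (16 cores, 64 GiB). See also `test_cluster_resource_aggregation` below.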

    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        // Worker already exists.
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            // Handing over to the subsequent recovery loop for a forced reschedule.
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            // Warn and keep the original parallelism if the worker registered with a
                            // smaller parallelism.
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                // keep `is_unschedulable` unchanged.
                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }
        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host.clone()),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode || r#type == PbWorkerType::Frontend {
            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(add_property.is_streaming),
                is_serving: Set(add_property.is_serving),
                is_unschedulable: Set(add_property.is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: if r#type == PbWorkerType::ComputeNode {
                    Set(add_property.resource_group.clone())
                } else {
                    Set(None)
                },
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        let is_unschedulable = schedulability == Schedulability::Unschedulable;
        WorkerProperty::update_many()
            .col_expr(
                worker_property::Column::IsUnschedulable,
                Expr::value(is_unschedulable),
            )
            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
            .exec(&self.db)
            .await?;

        Ok(())
    }

    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id as u32, idx as usize))
            })
            .collect_vec())
    }
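
    // Comment-only example: a running worker 1 with parallelism 2 and a running worker 2 with
    // parallelism 1 expand to the slots [(1, 0), (1, 1), (2, 0)].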

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let mut streaming_workers = self.list_active_streaming_workers().await?;

        let unschedulable_workers: HashSet<_> = streaming_workers
            .extract_if(.., |worker| {
                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
            })
            .map(|w| w.id)
            .collect();

        let schedulable_workers = streaming_workers
            .iter()
            .map(|worker| worker.id)
            .filter(|id| !unschedulable_workers.contains(id))
            .collect();

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
            schedulable_workers,
            unschedulable_workers,
        })
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        let worker = Worker::find_by_id(worker_id)
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        if worker.is_none() {
            return Ok(None);
        }
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
    }

    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        // Since no worker is active yet, there should be no active worker slots.
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        // re-register existing worker node with larger parallelism and change its serving mode.
        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        // delete workers.
        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        // re-register the existing worker node and change its serving mode; the schedulability
        // should not change.
        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }
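
    // The two tests below are additional sketches, not part of the original suite. They assume
    // `MetaSrvEnv::for_test()` behaves as in the tests above and that `cluster_resource` groups
    // workers by the `hostname` reported in `PbResource`; the resource numbers are hypothetical.

    #[test]
    fn test_worker_extra_info_update_ttl_is_monotonic() {
        let mut info = WorkerExtraInfo::default();
        info.update_ttl(Duration::from_secs(10));
        let first = info.expire_at.unwrap();
        // A shorter TTL must never move the expiration backwards.
        info.update_ttl(Duration::from_secs(0));
        assert!(info.expire_at.unwrap() >= first);
    }

    #[tokio::test]
    async fn test_cluster_resource_aggregation() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let property = AddNodeProperty {
            parallelism: 4,
            is_streaming: true,
            is_serving: true,
            ..Default::default()
        };
        // Two nodes on the same host and one node on another host: per host the maximum is
        // taken, across hosts the maxima are summed.
        let nodes = [
            ("host-a", 5001, 8u64, 32u64 << 30),
            ("host-a", 5002, 4, 16 << 30),
            ("host-b", 5003, 8, 32 << 30),
        ];
        for (hostname, port, cpu, memory) in nodes {
            cluster_ctl
                .add_worker(
                    PbWorkerType::ComputeNode,
                    HostAddress {
                        host: hostname.to_owned(),
                        port,
                    },
                    property.clone(),
                    PbResource {
                        total_cpu_cores: cpu,
                        total_memory_bytes: memory,
                        hostname: hostname.to_owned(),
                        ..Default::default()
                    },
                )
                .await?;
        }

        let resource = cluster_ctl.cluster_resource().await;
        assert_eq!(resource.total_cpu_cores, 16);
        assert_eq!(resource.total_memory_bytes, 64 << 30);

        Ok(())
    }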
}