risingwave_meta/controller/cluster.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
use risingwave_pb::common::worker_node::{
    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::ActiveValue::Set;
use sea_orm::prelude::Expr;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
    TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;

use crate::controller::utils::filter_workers_by_resource_group;
use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
use crate::model::ClusterId;
use crate::{MetaError, MetaResult};

pub type ClusterControllerRef = Arc<ClusterController>;

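/// Manages worker nodes in the cluster on top of the SQL meta store: registration,
/// activation, heartbeats, schedulability updates, and deletion.
///
/// A rough usage sketch, mirroring the tests at the bottom of this file (error handling
/// elided; `host` and `property` are placeholders to be filled in by the caller):
///
/// ```ignore
/// let env = MetaSrvEnv::for_test().await;
/// let ctl = ClusterController::new(env, Duration::from_secs(1)).await?;
/// let worker_id = ctl
///     .add_worker(PbWorkerType::ComputeNode, host, property, PbResource::default())
///     .await?;
/// ctl.activate_worker(worker_id).await?;
/// ```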
pub struct ClusterController {
    env: MetaSrvEnv,
    max_heartbeat_interval: Duration,
    inner: RwLock<ClusterControllerInner>,
    /// The Unix timestamp (in seconds) at which the meta node started.
    started_at: u64,
}

struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);

impl From<WorkerInfo> for PbWorkerNode {
    fn from(info: WorkerInfo) -> Self {
        Self {
            id: info.0.worker_id as _,
            r#type: PbWorkerType::from(info.0.worker_type) as _,
            host: Some(PbHostAddress {
                host: info.0.host,
                port: info.0.port,
            }),
            state: PbState::from(info.0.status) as _,
            property: info.1.as_ref().map(|p| PbProperty {
                is_streaming: p.is_streaming,
                is_serving: p.is_serving,
                is_unschedulable: p.is_unschedulable,
                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
                resource_group: p.resource_group.clone(),
                parallelism: p.parallelism as u32,
            }),
            transactional_id: info.0.transaction_id.map(|id| id as _),
            resource: Some(info.2.resource),
            started_at: info.2.started_at,
        }
    }
}

impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    /// Used in `NotificationService::subscribe`.
    /// Pay attention to the order of lock acquisition to prevent deadlocks.
    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    pub async fn compute_node_total_cpu_count(&self) -> usize {
        self.inner.read().await.compute_node_total_cpu_count()
    }

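    /// Recomputes the total CPU core count across all compute nodes and propagates it to
    /// the local license manager and, via a notification, to all other nodes.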
    async fn update_compute_node_total_cpu_count(&self) -> MetaResult<()> {
        let total_cpu_cores = self.compute_node_total_cpu_count().await;

        // Update local license manager.
        LicenseManager::get().update_cpu_core_count(total_cpu_cores);
        // Notify all other nodes.
        self.env.notification_manager().notify_all_without_version(
            Operation::Update, // unused
            Info::ComputeNodeTotalCpuCount(total_cpu_cores as _),
        );

        Ok(())
    }

    /// A worker node will immediately register itself with the meta node when it bootstraps.
    /// The meta node will assign it a unique ID and set its state to `Starting`.
    /// When the worker node is fully ready to serve, it will request the meta node again
    /// (via `activate_worker_node`) to set its state to `Running`.
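    ///
    /// Illustrative lifecycle using this controller's own methods:
    /// 1. `add_worker` records the worker as `Starting` and returns its `WorkerId`.
    /// 2. `activate_worker` flips the state to `Running` and notifies subscribers.
    /// 3. Periodic `heartbeat` calls refresh the worker's TTL; the heartbeat checker
    ///    eventually deletes workers whose TTL has expired.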
    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        if r#type == PbWorkerType::ComputeNode {
            self.update_compute_node_total_cpu_count().await?;
        }

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        // Notify frontends of new compute nodes and frontend nodes.
        // Always notify because a running worker's property may have been changed.
        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker))
            .await;

        Ok(())
    }

    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;
            if worker.r#type() == PbWorkerType::ComputeNode {
                self.update_compute_node_total_cpu_count().await?;
            }
        }

        // Notify local subscribers.
        // Note: any type of worker may pin some hummock resources, so `HummockManager` expects
        // this local notification.
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()))
            .await;

        Ok(worker)
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

    /// Invoked when the meta node receives a heartbeat from a worker node.
    pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", worker_id = worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval)
    }

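    /// Spawns a background task that, every `check_interval`, scans the in-memory TTLs and
    /// deletes workers whose `expire_at` has passed. Returns the task's `JoinHandle` and a
    /// oneshot `Sender` that stops the checker when a shutdown signal is sent.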
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    // Wait for the interval.
                    _ = min_interval.tick() => {},
                    // Shutdown.
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                // 1. Initialize new workers' TTL.
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                // 2. Collect expired workers.
                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                // 3. Delete expired workers.
                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => {
                                    cluster_controller
                                        .env
                                        .notification_manager()
                                        .delete_sender(worker_type.into(), WorkerKey(host_addr))
                                        .await
                                }
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

    /// Get live nodes with the specified type and status.
    /// # Arguments
    /// * `worker_type` - Filter by this type if it is not `None`; when `None`, the meta node
    ///   itself is also included in the result.
    /// * `worker_status` - Filter by this status if it is not `None`.
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        // Fill in the meta node's own info.
        if worker_type.is_none() {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        // Insert before releasing the read lock to ensure that we don't lose any updates in between.
        self.env
            .notification_manager()
            .insert_local_sender(tx)
            .await;
        drop(inner);
        Ok((worker_nodes, rx))
    }

    /// A convenience method to get all running compute nodes that may have actors on them,
    /// i.e. compute nodes that are `Running` and have the streaming property.
    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    /// Get all running compute nodes that have the serving property, i.e. nodes that can
    /// serve batch queries.
    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    /// Get the cluster info used for scheduling a streaming job.
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}

/// The cluster info used for scheduling a streaming job.
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    /// All **active** compute nodes in the cluster.
    pub worker_nodes: HashMap<u32, WorkerNode>,

    /// All schedulable compute nodes in the cluster. Normally used for resource-group-based
    /// scheduling.
    pub schedulable_workers: HashSet<u32>,

    /// All unschedulable compute nodes in the cluster.
    pub unschedulable_workers: HashSet<u32>,
}

// Helpers that encapsulate parallelism and resource-group filtering over the cluster info.
impl StreamingClusterInfo {
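    /// Total streaming parallelism of the workers that belong to `resource_group`. For
    /// example, two matching workers with parallelism 4 and 8 yield 12.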
    pub fn parallelism(&self, resource_group: &str) -> usize {
        let available_worker_ids =
            filter_workers_by_resource_group(&self.worker_nodes, resource_group);

        self.worker_nodes
            .values()
            .filter(|worker| available_worker_ids.contains(&(worker.id as WorkerId)))
            .map(|worker| worker.compute_node_parallelism())
            .sum()
    }

    pub fn filter_schedulable_workers_by_resource_group(
        &self,
        resource_group: &str,
    ) -> HashMap<u32, WorkerNode> {
        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
        self.worker_nodes
            .iter()
            .filter(|(id, _)| worker_ids.contains(&(**id as WorkerId)))
            .map(|(id, worker)| (*id, worker.clone()))
            .collect()
    }
}

#[derive(Default, Clone)]
pub struct WorkerExtraInfo {
    // Volatile values maintained in memory by the meta node; they are not persisted and are
    // re-initialized with defaults when the meta node restarts.
    //
    // Unix timestamp (in seconds) at which the worker will expire.
    expire_at: Option<u64>,
    started_at: Option<u64>,
    resource: PbResource,
    r#type: PbWorkerType,
}

impl WorkerExtraInfo {
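    /// Extends the expiration time to `now + ttl`, never moving it backwards: taking the max
    /// with the current `expire_at` keeps concurrent refreshes monotonic.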
    fn update_ttl(&mut self, ttl: Duration) {
        let expire = cmp::max(
            self.expire_at.unwrap_or_default(),
            SystemTime::now()
                .add(ttl)
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs(),
        );
        self.expire_at = Some(expire);
    }

    fn update_started_at(&mut self) {
        self.started_at = Some(timestamp_now_sec());
    }
}

fn timestamp_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

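/// Builds the synthetic `PbWorkerNode` that represents the meta node itself in the output of
/// `list_workers`; the meta node is not registered in the worker table.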
fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
    PbWorkerNode {
        id: META_NODE_ID,
        r#type: PbWorkerType::Meta.into(),
        host: HostAddr::try_from(host)
            .as_ref()
            .map(HostAddr::to_protobuf)
            .ok(),
        state: PbState::Running as _,
        property: None,
        transactional_id: None,
        resource: Some(risingwave_pb::common::worker_node::Resource {
            rw_version: RW_VERSION.to_owned(),
            total_memory_bytes: system_memory_available_bytes() as _,
            total_cpu_cores: total_cpu_available() as _,
        }),
        started_at,
    }
}

pub struct ClusterControllerInner {
    db: DatabaseConnection,
    /// Transactional ids that are currently available for assignment to new workers.
    available_transactional_ids: VecDeque<TransactionId>,
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
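    // Transactional ids are drawn from a small, reusable pool of `MAX_WORKER_REUSABLE_ID_COUNT`
    // ids; an id goes back into `available_transactional_ids` when its worker is deleted.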
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }

    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            let expire = cmp::max(
                info.expire_at.unwrap_or_default(),
                SystemTime::now()
                    .add(ttl)
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_secs(),
            );
            info.expire_at = Some(expire);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn update_resource_and_started_at(
        &mut self,
        worker_id: WorkerId,
        resource: PbResource,
    ) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.resource = resource;
            info.update_started_at();
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            // We only assign transactional ids to compute nodes and frontends.
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }

    fn compute_node_total_cpu_count(&self) -> usize {
        self.worker_extra_info
            .values()
            .filter(|info| info.r#type == PbWorkerType::ComputeNode)
            .map(|info| info.resource.total_cpu_cores as usize)
            .sum()
    }

    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
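        // Re-registration of an existing worker refreshes its property (parallelism,
        // serving/streaming flags, resource group) in place; a brand-new worker gets a fresh
        // row plus, for compute and frontend nodes, a transactional id and a property row.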
        let txn = self.db.begin().await?;

        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        // Worker already exists.
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            // Hand over to the subsequent recovery loop for a forced reschedule.
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            // Warn and keep the original parallelism if the worker registered with a
                            // smaller parallelism.
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                // Keep `is_unschedulable` unchanged.
                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }
        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode || r#type == PbWorkerType::Frontend {
            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(add_property.is_streaming),
                is_serving: Set(add_property.is_serving),
                is_unschedulable: Set(add_property.is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: if r#type == PbWorkerType::ComputeNode {
                    Set(add_property.resource_group.clone())
                } else {
                    Set(None)
                },
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
            r#type,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        let is_unschedulable = schedulability == Schedulability::Unschedulable;
        WorkerProperty::update_many()
            .col_expr(
                worker_property::Column::IsUnschedulable,
                Expr::value(is_unschedulable),
            )
            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
            .exec(&self.db)
            .await?;

        Ok(())
    }

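    /// Removes the worker row from the database, returns its transactional id (if any) to the
    /// reusable pool, and hands back the deleted node so the caller can send notifications.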
    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

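    /// Expands each running worker's parallelism into `WorkerSlotId`s: a worker with
    /// parallelism `p` contributes the slots `(worker_id, 0..p)`.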
    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id as u32, idx as usize))
            })
            .collect_vec())
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

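    /// Partitions the active streaming workers into unschedulable ones (as marked via
    /// `update_schedulability`) and schedulable ones, and returns them together with the
    /// full id-to-node map.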
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let mut streaming_workers = self.list_active_streaming_workers().await?;

        let unschedulable_workers: HashSet<_> = streaming_workers
            .extract_if(.., |worker| {
                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
            })
            .map(|w| w.id)
            .collect();

        let schedulable_workers = streaming_workers
            .iter()
            .map(|worker| worker.id)
            .filter(|id| !unschedulable_workers.contains(id))
            .collect();

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
            schedulable_workers,
            unschedulable_workers,
        })
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        let worker = Worker::find_by_id(worker_id)
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        if worker.is_none() {
            return Ok(None);
        }
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
    }

    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        // Since no worker is active yet, there should be no active worker slots.
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        // Re-register an existing worker node with larger parallelism and change its serving mode.
        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        // Delete workers.
        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        // Re-register the existing worker node and change its serving mode; the schedulable
        // state should not change.
        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }
}