risingwave_meta/controller/cluster.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{self, Ordering, max};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
use risingwave_pb::common::worker_node::{
    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{
    ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::ActiveValue::Set;
use sea_orm::prelude::Expr;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
    TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;

use crate::controller::utils::filter_workers_by_resource_group;
use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
use crate::model::ClusterId;
use crate::{MetaError, MetaResult};

pub type ClusterControllerRef = Arc<ClusterController>;

pub struct ClusterController {
    env: MetaSrvEnv,
    max_heartbeat_interval: Duration,
    inner: RwLock<ClusterControllerInner>,
    /// Timestamp (in seconds) at which the meta node started.
    started_at: u64,
}

struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);

impl From<WorkerInfo> for PbWorkerNode {
    fn from(info: WorkerInfo) -> Self {
        Self {
            id: info.0.worker_id,
            r#type: PbWorkerType::from(info.0.worker_type) as _,
            host: Some(PbHostAddress {
                host: info.0.host,
                port: info.0.port,
            }),
            state: PbState::from(info.0.status) as _,
            property: info.1.as_ref().map(|p| PbProperty {
                is_streaming: p.is_streaming,
                is_serving: p.is_serving,
                is_unschedulable: p.is_unschedulable,
                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
                resource_group: p.resource_group.clone(),
                parallelism: p.parallelism as u32,
                is_iceberg_compactor: p.is_iceberg_compactor,
            }),
            transactional_id: info.0.transaction_id.map(|id| id as _),
            resource: Some(info.2.resource),
            started_at: info.2.started_at,
        }
    }
}

impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    /// Used in `NotificationService::subscribe`.
    /// Pay attention to the order of acquiring locks to prevent deadlocks.
    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    /// Get the total resource of the cluster.
    pub async fn cluster_resource(&self) -> ClusterResource {
        self.inner.read().await.cluster_resource()
    }

    /// Get the total resource of the cluster, then update the license manager and notify all other nodes.
    async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
        let resource = self.cluster_resource().await;

        // Update local license manager.
        LicenseManager::get().update_cluster_resource(resource);
        // Notify all other nodes.
        self.env.notification_manager().notify_all_without_version(
            Operation::Update, // unused
            Info::ClusterResource(resource),
        );

        Ok(())
    }

    /// A worker node will immediately register itself to the meta node when it bootstraps.
    /// The meta node will assign it a unique ID and set its state to `Starting`.
    /// When the worker node is fully ready to serve, it will request the meta node again
    /// (via `activate_worker_node`) to set its state to `Running`.
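    ///
    /// A rough sketch of the expected flow from a worker's point of view (illustrative
    /// only: the RPC layer is omitted and `controller` is a placeholder for a
    /// `ClusterControllerRef`):
    ///
    /// ```ignore
    /// let worker_id = controller
    ///     .add_worker(PbWorkerType::ComputeNode, host, property, resource)
    ///     .await?; // registered as `Starting`
    /// controller.activate_worker(worker_id).await?; // now `Running`
    /// controller.heartbeat(worker_id).await?; // keep the TTL fresh afterwards
    /// ```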
    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        self.update_cluster_resource_for_license().await?;

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        // Notify frontends of the new compute node or frontend node.
        // Always notify because a running worker's property may have been changed.
        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));

        Ok(())
    }

    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;
            if worker.r#type() == PbWorkerType::ComputeNode {
                self.update_cluster_resource_for_license().await?;
            }
        }

        // Notify local subscribers.
        // Note: any type of worker may pin some hummock resource, so `HummockManager` expects this
        // local notification.
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));

        Ok(worker)
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

    /// Invoked when the meta node receives a heartbeat from a worker node.
    pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", %worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval)
    }

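    /// Spawns a background task that, every `check_interval`, initializes the TTL of newly
    /// registered workers and deletes workers whose heartbeat TTL has expired. Returns the
    /// task's join handle together with a shutdown sender.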
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    // Wait for interval
                    _ = min_interval.tick() => {},
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                // 1. Initialize new workers' TTL.
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                // 2. Collect expired workers.
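                // (`expire_at` is always `Some` here, because step 1 just initialized it
                // for any worker that did not have a TTL yet.)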
                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                // 3. Delete expired workers.
                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                %worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => cluster_controller
                                    .env
                                    .notification_manager()
                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

    /// Get live nodes with the specified type and state.
    /// # Arguments
    /// * `worker_type` `WorkerType` of the nodes.
    /// * `worker_status` Filter by this status if it is not `None`.
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        // Fill in the meta node's own info.
        if worker_type.is_none() {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        // Insert before releasing the read lock to ensure that we don't lose any update in between.
        self.env.notification_manager().insert_local_sender(tx);
        drop(inner);
        Ok((worker_nodes, rx))
    }

    /// A convenience method to get all running compute nodes that may have actors on them,
    /// i.e. compute nodes that are `Running` and have streaming enabled.
    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    /// Get all running compute nodes that serve batch queries, i.e. with `is_serving` enabled.
    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    /// Get the cluster info used for scheduling a streaming job.
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}

/// The cluster info used for scheduling a streaming job.
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    /// All **active** compute nodes in the cluster.
    pub worker_nodes: HashMap<WorkerId, WorkerNode>,

    /// All schedulable compute nodes in the cluster. Typically used for resource-group-based
    /// scheduling.
    pub schedulable_workers: HashSet<WorkerId>,

    /// All unschedulable compute nodes in the cluster.
    pub unschedulable_workers: HashSet<WorkerId>,
}

// Helpers encapsulating parallelism computation and resource-group filtering.
impl StreamingClusterInfo {
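    /// Total parallelism available to `resource_group`: the sum of parallelism over all
    /// active compute nodes that belong to that resource group.
    ///
    /// A usage sketch (illustrative only; `info` stands for a fetched `StreamingClusterInfo`):
    ///
    /// ```ignore
    /// let info = cluster_controller.get_streaming_cluster_info().await?;
    /// let slots = info.parallelism(DEFAULT_RESOURCE_GROUP);
    /// ```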
    pub fn parallelism(&self, resource_group: &str) -> usize {
        let available_worker_ids =
            filter_workers_by_resource_group(&self.worker_nodes, resource_group);

        self.worker_nodes
            .values()
            .filter(|worker| available_worker_ids.contains(&(worker.id)))
            .map(|worker| worker.compute_node_parallelism())
            .sum()
    }

    pub fn filter_schedulable_workers_by_resource_group(
        &self,
        resource_group: &str,
    ) -> HashMap<WorkerId, WorkerNode> {
        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
        self.worker_nodes
            .iter()
            .filter(|(id, _)| worker_ids.contains(*id))
            .map(|(id, worker)| (*id, worker.clone()))
            .collect()
    }
}

#[derive(Default, Clone, Debug)]
pub struct WorkerExtraInfo {
    // Volatile values maintained in memory by the meta node.
    //
    // Unix timestamp (in seconds) at which the worker will expire.
    expire_at: Option<u64>,
    started_at: Option<u64>,
    resource: PbResource,
}

impl WorkerExtraInfo {
    fn update_ttl(&mut self, ttl: Duration) {
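        // Never shrink the deadline: take the later of the current expiry and `now + ttl`.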
        let expire = cmp::max(
            self.expire_at.unwrap_or_default(),
            SystemTime::now()
                .add(ttl)
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs(),
        );
        self.expire_at = Some(expire);
    }

    fn update_started_at(&mut self) {
        self.started_at = Some(timestamp_now_sec());
    }
}

fn timestamp_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

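/// Synthesizes a `PbWorkerNode` for the meta node itself, which is not stored in the worker
/// table but is reported by `list_workers` alongside the persisted workers.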
fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
    PbWorkerNode {
        id: META_NODE_ID,
        r#type: PbWorkerType::Meta.into(),
        host: HostAddr::try_from(host)
            .as_ref()
            .map(HostAddr::to_protobuf)
            .ok(),
        state: PbState::Running as _,
        property: None,
        transactional_id: None,
        resource: Some(risingwave_pb::common::worker_node::Resource {
            rw_version: RW_VERSION.to_owned(),
            total_memory_bytes: system_memory_available_bytes() as _,
            total_cpu_cores: total_cpu_available() as _,
            hostname: hostname(),
        }),
        started_at,
    }
}

pub struct ClusterControllerInner {
    db: DatabaseConnection,
    /// Pool of transactional ids that are currently available for reuse.
    available_transactional_ids: VecDeque<TransactionId>,
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
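    /// Reusable transactional ids are drawn from `0..(1 << MAX_WORKER_REUSABLE_ID_BITS)`;
    /// ids released by deleted workers are returned to the pool (see `delete_worker`).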
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }

    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            let expire = cmp::max(
                info.expire_at.unwrap_or_default(),
                SystemTime::now()
                    .add(ttl)
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_secs(),
            );
            info.expire_at = Some(expire);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn update_resource_and_started_at(
        &mut self,
        worker_id: WorkerId,
        resource: PbResource,
    ) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.resource = resource;
            info.update_started_at();
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
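        // Only peek at the front of the pool here; the id is actually removed from
        // `available_transactional_ids` in `add_worker`, after the worker row is inserted.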
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            // We only assign transactional ids to compute nodes and frontends.
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }

    /// Get the total resource of the cluster.
    fn cluster_resource(&self) -> ClusterResource {
        // For each hostname, we only consider the maximum resource, in case a host has multiple nodes.
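        // An illustrative example: two workers reporting hostname "host-a" with 8 and 16 CPU
        // cores plus one worker on "host-b" with 8 cores yield 16 + 8 = 24 total CPU cores.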
        let mut per_host = HashMap::new();

        for info in self.worker_extra_info.values() {
            let r = per_host
                .entry(info.resource.hostname.as_str())
                .or_insert_with(ClusterResource::default);

            r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
            r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
        }

        // For different hostnames, we sum up the resources.
        per_host
            .into_values()
            .reduce(|a, b| ClusterResource {
                total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
                total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
            })
            .unwrap_or_default()
    }

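    /// Registers the worker identified by `host_address`, or refreshes the registration of
    /// an existing one: for compute nodes the persisted parallelism and properties are
    /// updated (keeping `is_unschedulable` unchanged), and the in-memory TTL, resource and
    /// `started_at` are refreshed for every worker type.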
    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        // Worker already exists.
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            // Hand over to the subsequent recovery loop for a forced reschedule.
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            // Warn and keep the original parallelism if the worker registered with a
                            // smaller parallelism.
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                // Keep `is_unschedulable` unchanged.
                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                    is_iceberg_compactor: Set(false),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Compactor {
                if let Some(property) = property {
                    let mut property: worker_property::ActiveModel = property.into();
                    property.is_iceberg_compactor = Set(add_property.is_iceberg_compactor);
                    property.internal_rpc_host_addr =
                        Set(Some(add_property.internal_rpc_host_addr));

                    WorkerProperty::update(property).exec(&txn).await?;
                } else {
                    let property = worker_property::ActiveModel {
                        worker_id: Set(worker.worker_id),
                        parallelism: Set(add_property
                            .parallelism
                            .try_into()
                            .expect("invalid parallelism")),
                        is_streaming: Set(false),
                        is_serving: Set(false),
                        is_unschedulable: Set(false),
                        internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                        resource_group: Set(None),
                        is_iceberg_compactor: Set(add_property.is_iceberg_compactor),
                    };

                    WorkerProperty::insert(property).exec(&txn).await?;
                }
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }

        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host.clone()),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode
            || r#type == PbWorkerType::Frontend
            || r#type == PbWorkerType::Compactor
        {
            let (is_serving, is_streaming, is_unschedulable, is_iceberg_compactor, resource_group) =
                match r#type {
                    PbWorkerType::ComputeNode => (
                        add_property.is_serving,
                        add_property.is_streaming,
                        add_property.is_unschedulable,
                        false,
                        add_property.resource_group.clone(),
                    ),
                    PbWorkerType::Frontend => (
                        add_property.is_serving,
                        add_property.is_streaming,
                        add_property.is_unschedulable,
                        false,
                        None,
                    ),
                    PbWorkerType::Compactor => {
                        (false, false, false, add_property.is_iceberg_compactor, None)
                    }
                    _ => unreachable!(),
                };

            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(is_streaming),
                is_serving: Set(is_serving),
                is_unschedulable: Set(is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: Set(resource_group),
                is_iceberg_compactor: Set(is_iceberg_compactor),
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        let is_unschedulable = schedulability == Schedulability::Unschedulable;
        WorkerProperty::update_many()
            .col_expr(
                worker_property::Column::IsUnschedulable,
                Expr::value(is_unschedulable),
            )
            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
            .exec(&self.db)
            .await?;

        Ok(())
    }

    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id, idx as usize))
            })
            .collect_vec())
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

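    /// Builds a `StreamingClusterInfo` snapshot by partitioning the active streaming compute
    /// nodes into schedulable and unschedulable sets.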
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let mut streaming_workers = self.list_active_streaming_workers().await?;

        let unschedulable_workers: HashSet<_> = streaming_workers
            .extract_if(.., |worker| {
                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
            })
            .map(|w| w.id)
            .collect();

        let schedulable_workers = streaming_workers
            .iter()
            .map(|worker| worker.id)
            .filter(|id| !unschedulable_workers.contains(id))
            .collect();

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
            schedulable_workers,
            unschedulable_workers,
        })
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        let worker = Worker::find_by_id(worker_id)
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        if worker.is_none() {
            return Ok(None);
        }
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
    }

    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        // Since no worker is active, the parallelism should be 0.
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        // Re-register an existing worker node with larger parallelism and change its serving mode.
        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        // Delete workers.
        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        // Re-register the existing worker node and change its serving mode; the schedulability
        // should not change.
        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }
}