risingwave_meta/controller/cluster.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{self, Ordering, max};
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::ops::Add;
18use std::sync::Arc;
19use std::time::{Duration, SystemTime};
20
21use itertools::Itertools;
22use risingwave_common::RW_VERSION;
23use risingwave_common::hash::WorkerSlotId;
24use risingwave_common::util::addr::HostAddr;
25use risingwave_common::util::resource_util::cpu::total_cpu_available;
26use risingwave_common::util::resource_util::hostname;
27use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_license::LicenseManager;
30use risingwave_meta_model::prelude::{Worker, WorkerProperty};
31use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
32use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
33use risingwave_pb::common::worker_node::{
34    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
35};
36use risingwave_pb::common::{
37    ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
38};
39use risingwave_pb::meta::subscribe_response::{Info, Operation};
40use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
41use sea_orm::ActiveValue::Set;
42use sea_orm::prelude::Expr;
43use sea_orm::{
44    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
45    TransactionTrait,
46};
47use thiserror_ext::AsReport;
48use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
49use tokio::sync::oneshot::Sender;
50use tokio::sync::{RwLock, RwLockReadGuard};
51use tokio::task::JoinHandle;
52
53use crate::controller::utils::filter_workers_by_resource_group;
54use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
55use crate::model::ClusterId;
56use crate::{MetaError, MetaResult};
57
58pub type ClusterControllerRef = Arc<ClusterController>;
59
60pub struct ClusterController {
61    env: MetaSrvEnv,
62    max_heartbeat_interval: Duration,
63    inner: RwLock<ClusterControllerInner>,
    /// The Unix timestamp (in seconds) at which the meta node started.
65    started_at: u64,
66}
67
68struct WorkerInfo(
69    worker::Model,
70    Option<worker_property::Model>,
71    WorkerExtraInfo,
72);
73
74impl From<WorkerInfo> for PbWorkerNode {
75    fn from(info: WorkerInfo) -> Self {
76        Self {
77            id: info.0.worker_id,
78            r#type: PbWorkerType::from(info.0.worker_type) as _,
79            host: Some(PbHostAddress {
80                host: info.0.host,
81                port: info.0.port,
82            }),
83            state: PbState::from(info.0.status) as _,
84            property: info.1.as_ref().map(|p| PbProperty {
85                is_streaming: p.is_streaming,
86                is_serving: p.is_serving,
87                is_unschedulable: p.is_unschedulable,
88                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
89                resource_group: p.resource_group.clone(),
                parallelism: p.parallelism as u32,
91                is_iceberg_compactor: p.is_iceberg_compactor,
92            }),
93            transactional_id: info.0.transaction_id.map(|id| id as _),
94            resource: Some(info.2.resource),
95            started_at: info.2.started_at,
96        }
97    }
98}
99
100impl ClusterController {
101    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
102        let inner = ClusterControllerInner::new(
103            env.meta_store_ref().conn.clone(),
104            env.opts.disable_automatic_parallelism_control,
105        )
106        .await?;
107        Ok(Self {
108            env,
109            max_heartbeat_interval,
110            inner: RwLock::new(inner),
111            started_at: timestamp_now_sec(),
112        })
113    }
114
    /// Used in `NotificationService::subscribe`.
    /// Pay attention to the order in which locks are acquired to prevent deadlocks.
117    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
118        self.inner.read().await
119    }
120
121    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
122        self.inner.read().await.count_worker_by_type().await
123    }
124
125    /// Get the total resource of the cluster.
126    pub async fn cluster_resource(&self) -> ClusterResource {
127        self.inner.read().await.cluster_resource()
128    }
129
    /// Get the total resource of the cluster, then update the license manager and notify all other nodes.
131    async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
132        let resource = self.cluster_resource().await;
133
134        // Update local license manager.
135        LicenseManager::get().update_cluster_resource(resource);
136        // Notify all other nodes.
137        self.env.notification_manager().notify_all_without_version(
138            Operation::Update, // unused
139            Info::ClusterResource(resource),
140        );
141
142        Ok(())
143    }
144
    /// A worker node registers itself with the meta node as soon as it bootstraps.
    /// The meta node assigns it a unique ID and sets its state to `Starting`.
    /// When the worker node is fully ready to serve, it requests the meta node again
    /// (via `activate_worker_node`) to set its state to `Running`.
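    ///
    /// # Example
    ///
    /// A minimal sketch of the registration flow (not compiled as a doctest; it assumes an
    /// already constructed `ClusterController` named `cluster_ctl`, mirroring the tests below):
    ///
    /// ```ignore
    /// let worker_id = cluster_ctl
    ///     .add_worker(
    ///         PbWorkerType::ComputeNode,
    ///         HostAddress { host: "localhost".to_owned(), port: 5688 },
    ///         AddNodeProperty { parallelism: 4, is_streaming: true, is_serving: true, ..Default::default() },
    ///         PbResource::default(),
    ///     )
    ///     .await?;
    /// // The worker stays in the `Starting` state until it reports readiness.
    /// cluster_ctl.activate_worker(worker_id).await?;
    /// ```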
149    pub async fn add_worker(
150        &self,
151        r#type: PbWorkerType,
152        host_address: HostAddress,
153        property: AddNodeProperty,
154        resource: PbResource,
155    ) -> MetaResult<WorkerId> {
156        let worker_id = self
157            .inner
158            .write()
159            .await
160            .add_worker(
161                r#type,
162                host_address,
163                property,
164                resource,
165                self.max_heartbeat_interval,
166            )
167            .await?;
168
169        // Keep license manager in sync with the latest cluster resource.
170        self.update_cluster_resource_for_license().await?;
171
172        Ok(worker_id)
173    }
174
175    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
176        let inner = self.inner.write().await;
177        let worker = inner.activate_worker(worker_id).await?;
178
        // Notify frontends of the new compute node or frontend node.
        // Always notify, because a running worker's properties may have changed.
181        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
182        {
183            self.env
184                .notification_manager()
185                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
186                .await;
187        }
188        self.env
189            .notification_manager()
190            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));
191
192        Ok(())
193    }
194
195    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
196        let worker = self.inner.write().await.delete_worker(host_address).await?;
197
198        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
199        {
200            self.env
201                .notification_manager()
202                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
203                .await;
204        }
205
206        // Keep license manager in sync with the latest cluster resource.
207        self.update_cluster_resource_for_license().await?;
208
209        // Notify local subscribers.
        // Note: workers of any type may pin hummock resources, so `HummockManager` expects
        // this local notification.
212        self.env
213            .notification_manager()
214            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));
215
216        Ok(worker)
217    }
218
219    pub async fn update_schedulability(
220        &self,
221        worker_ids: Vec<WorkerId>,
222        schedulability: Schedulability,
223    ) -> MetaResult<()> {
224        self.inner
225            .write()
226            .await
227            .update_schedulability(worker_ids, schedulability)
228            .await
229    }
230
    /// Invoked when the meta node receives a heartbeat from a worker node.
232    pub async fn heartbeat(
233        &self,
234        worker_id: WorkerId,
235        resource: Option<PbResource>,
236    ) -> MetaResult<()> {
237        tracing::trace!(target: "events::meta::server_heartbeat", %worker_id, "receive heartbeat");
238        self.inner
239            .write()
240            .await
241            .heartbeat(worker_id, self.max_heartbeat_interval, resource)
242    }
243
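    /// Spawn a background task that periodically checks worker heartbeats and deletes the
    /// workers whose TTL has expired. Returns the join handle and a shutdown sender.
    ///
    /// A minimal wiring sketch (not compiled as a doctest; `cluster_ctl` and the one-second
    /// check interval are placeholders for illustration):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) =
    ///     ClusterController::start_heartbeat_checker(cluster_ctl.clone(), Duration::from_secs(1));
    /// // ... later, on shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```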
244    pub fn start_heartbeat_checker(
245        cluster_controller: ClusterControllerRef,
246        check_interval: Duration,
247    ) -> (JoinHandle<()>, Sender<()>) {
248        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
249        let join_handle = tokio::spawn(async move {
250            let mut min_interval = tokio::time::interval(check_interval);
251            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
252            loop {
253                tokio::select! {
254                    // Wait for interval
255                    _ = min_interval.tick() => {},
256                    // Shutdown
257                    _ = &mut shutdown_rx => {
258                        tracing::info!("Heartbeat checker is stopped");
259                        return;
260                    }
261                }
262
263                let mut inner = cluster_controller.inner.write().await;
264                // 1. Initialize new workers' TTL.
265                for worker in inner
266                    .worker_extra_info
267                    .values_mut()
268                    .filter(|worker| worker.expire_at.is_none())
269                {
270                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
271                }
272
273                // 2. Collect expired workers.
274                let now = timestamp_now_sec();
275                let worker_to_delete = inner
276                    .worker_extra_info
277                    .iter()
278                    .filter(|(_, info)| info.expire_at.unwrap() < now)
279                    .map(|(id, _)| *id)
280                    .collect_vec();
281
282                // 3. Delete expired workers.
283                let worker_infos = match Worker::find()
284                    .select_only()
285                    .column(worker::Column::WorkerId)
286                    .column(worker::Column::WorkerType)
287                    .column(worker::Column::Host)
288                    .column(worker::Column::Port)
289                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
290                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
291                    .all(&inner.db)
292                    .await
293                {
294                    Ok(keys) => keys,
295                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
297                        continue;
298                    }
299                };
300                drop(inner);
301
302                for (worker_id, worker_type, host, port) in worker_infos {
303                    let host_addr = PbHostAddress { host, port };
304                    match cluster_controller.delete_worker(host_addr.clone()).await {
305                        Ok(_) => {
306                            tracing::warn!(
307                                %worker_id,
308                                ?host_addr,
309                                %now,
310                                "Deleted expired worker"
311                            );
312                            match worker_type {
313                                WorkerType::Frontend
314                                | WorkerType::ComputeNode
315                                | WorkerType::Compactor
316                                | WorkerType::RiseCtl => cluster_controller
317                                    .env
318                                    .notification_manager()
319                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
320                                _ => {}
321                            };
322                        }
323                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
325                        }
326                    }
327                }
328            }
329        });
330
331        (join_handle, shutdown_tx)
332    }
333
    /// Get the worker nodes matching the specified type and status.
    /// # Arguments
    /// * `worker_type` Filter by this worker type if it is not `None`.
    /// * `worker_status` Filter by this status if it is not `None`.
338    pub async fn list_workers(
339        &self,
340        worker_type: Option<WorkerType>,
341        worker_status: Option<WorkerStatus>,
342    ) -> MetaResult<Vec<PbWorkerNode>> {
343        let mut workers = vec![];
344        // Meta node is not stored in the cluster manager DB, so we synthesize it here.
345        // Include it when listing all workers, or when explicitly listing meta nodes.
346        let include_meta = worker_type.is_none() || worker_type == Some(WorkerType::Meta);
347        // Meta node is always "running" once the service is up.
348        let include_meta = include_meta && worker_status != Some(WorkerStatus::Starting);
349        if include_meta {
350            workers.push(meta_node_info(
351                &self.env.opts.advertise_addr,
352                Some(self.started_at),
353            ));
354        }
355        workers.extend(
356            self.inner
357                .read()
358                .await
359                .list_workers(worker_type, worker_status)
360                .await?,
361        );
362        Ok(workers)
363    }
364
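    /// Take a snapshot of the currently active streaming compute nodes and subscribe to
    /// subsequent local notifications. The local sender is registered before the read lock is
    /// released, so no update between the snapshot and the subscription is lost.
    ///
    /// A usage sketch (not compiled as a doctest; error handling elided):
    ///
    /// ```ignore
    /// let (workers, mut rx) = controller.subscribe_active_streaming_compute_nodes().await?;
    /// // `workers` is the snapshot; later changes arrive as local notifications.
    /// while let Some(notification) = rx.recv().await {
    ///     // Handle LocalNotification::WorkerNodeActivated / WorkerNodeDeleted, etc.
    /// }
    /// ```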
365    pub(crate) async fn subscribe_active_streaming_compute_nodes(
366        &self,
367    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
368        let inner = self.inner.read().await;
369        let worker_nodes = inner.list_active_streaming_workers().await?;
370        let (tx, rx) = unbounded_channel();
371
        // Insert before releasing the read lock to ensure that we don't lose any updates in between.
373        self.env.notification_manager().insert_local_sender(tx);
374        drop(inner);
375        Ok((worker_nodes, rx))
376    }
377
    /// A convenience method to get all running compute nodes that may have actors running on
    /// them, i.e. compute nodes in streaming mode.
380    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
381        self.inner
382            .read()
383            .await
384            .list_active_streaming_workers()
385            .await
386    }
387
388    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
389        self.inner.read().await.list_active_worker_slots().await
390    }
391
    /// Get all running compute nodes that are in serving mode.
394    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
395        self.inner.read().await.list_active_serving_workers().await
396    }
397
398    /// Get the cluster info used for scheduling a streaming job.
399    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
400        self.inner.read().await.get_streaming_cluster_info().await
401    }
402
403    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
404        self.inner.read().await.get_worker_by_id(worker_id).await
405    }
406
407    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
408        self.inner
409            .read()
410            .await
411            .get_worker_extra_info_by_id(worker_id)
412    }
413
414    pub fn cluster_id(&self) -> &ClusterId {
415        self.env.cluster_id()
416    }
417
418    pub fn meta_store_endpoint(&self) -> String {
419        self.env.meta_store_ref().endpoint.clone()
420    }
421}
422
423/// The cluster info used for scheduling a streaming job.
424#[derive(Debug, Clone)]
425pub struct StreamingClusterInfo {
426    /// All **active** compute nodes in the cluster.
427    pub worker_nodes: HashMap<WorkerId, WorkerNode>,
428
    /// All schedulable compute nodes in the cluster, typically used for resource-group-based scheduling.
430    pub schedulable_workers: HashSet<WorkerId>,
431
432    /// All unschedulable compute nodes in the cluster.
433    pub unschedulable_workers: HashSet<WorkerId>,
434}
435
// Helpers for computing parallelism and filtering workers by resource group.
437impl StreamingClusterInfo {
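    /// Total parallelism of the worker nodes that belong to the given resource group.
    ///
    /// A usage sketch (not compiled as a doctest; it assumes a `StreamingClusterInfo` obtained
    /// from `get_streaming_cluster_info`):
    ///
    /// ```ignore
    /// let info = cluster_ctl.get_streaming_cluster_info().await?;
    /// // Total number of slots available to jobs scheduled in the default resource group.
    /// let parallelism = info.parallelism(DEFAULT_RESOURCE_GROUP);
    /// // The schedulable workers backing that parallelism.
    /// let workers = info.filter_schedulable_workers_by_resource_group(DEFAULT_RESOURCE_GROUP);
    /// ```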
438    pub fn parallelism(&self, resource_group: &str) -> usize {
439        let available_worker_ids =
440            filter_workers_by_resource_group(&self.worker_nodes, resource_group);
441
442        self.worker_nodes
443            .values()
444            .filter(|worker| available_worker_ids.contains(&(worker.id)))
445            .map(|worker| worker.compute_node_parallelism())
446            .sum()
447    }
448
449    pub fn filter_schedulable_workers_by_resource_group(
450        &self,
451        resource_group: &str,
452    ) -> HashMap<WorkerId, WorkerNode> {
453        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
454        self.worker_nodes
455            .iter()
456            .filter(|(id, _)| worker_ids.contains(*id))
457            .map(|(id, worker)| (*id, worker.clone()))
458            .collect()
459    }
460}
461
462#[derive(Default, Clone, Debug)]
463pub struct WorkerExtraInfo {
    // Volatile values maintained by the meta node.
    //
    // The Unix timestamp (in seconds) at which the worker expires.
467    expire_at: Option<u64>,
468    started_at: Option<u64>,
469    resource: PbResource,
470}
471
472impl WorkerExtraInfo {
473    fn update_ttl(&mut self, ttl: Duration) {
474        let expire = cmp::max(
475            self.expire_at.unwrap_or_default(),
476            SystemTime::now()
477                .add(ttl)
478                .duration_since(SystemTime::UNIX_EPOCH)
479                .expect("Clock may have gone backwards")
480                .as_secs(),
481        );
482        self.expire_at = Some(expire);
483    }
484
485    fn update_started_at(&mut self) {
486        self.started_at = Some(timestamp_now_sec());
487    }
488}
489
490fn timestamp_now_sec() -> u64 {
491    SystemTime::now()
492        .duration_since(SystemTime::UNIX_EPOCH)
493        .expect("Clock may have gone backwards")
494        .as_secs()
495}
496
497fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
498    PbWorkerNode {
499        id: META_NODE_ID,
500        r#type: PbWorkerType::Meta.into(),
501        host: HostAddr::try_from(host)
502            .as_ref()
503            .map(HostAddr::to_protobuf)
504            .ok(),
505        state: PbState::Running as _,
506        property: None,
507        transactional_id: None,
508        resource: Some(risingwave_pb::common::worker_node::Resource {
509            rw_version: RW_VERSION.to_owned(),
510            total_memory_bytes: system_memory_available_bytes() as _,
511            total_cpu_cores: total_cpu_available() as _,
512            hostname: hostname(),
513        }),
514        started_at,
515    }
516}
517
518pub struct ClusterControllerInner {
519    db: DatabaseConnection,
    /// Tracks the reusable transactional (machine) ids that are currently available.
521    available_transactional_ids: VecDeque<TransactionId>,
522    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
523    disable_automatic_parallelism_control: bool,
524}
525
526impl ClusterControllerInner {
527    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
528    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;
529
530    pub async fn new(
531        db: DatabaseConnection,
532        disable_automatic_parallelism_control: bool,
533    ) -> MetaResult<Self> {
534        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
535            .select_only()
536            .column(worker::Column::WorkerId)
537            .column(worker::Column::TransactionId)
538            .into_tuple()
539            .all(&db)
540            .await?;
541        let inuse_txn_ids: HashSet<_> = workers
542            .iter()
543            .cloned()
544            .filter_map(|(_, txn_id)| txn_id)
545            .collect();
546        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
547            .filter(|id| !inuse_txn_ids.contains(id))
548            .collect();
549
550        let worker_extra_info = workers
551            .into_iter()
552            .map(|(w, _)| (w, WorkerExtraInfo::default()))
553            .collect();
554
555        Ok(Self {
556            db,
557            available_transactional_ids,
558            worker_extra_info,
559            disable_automatic_parallelism_control,
560        })
561    }
562
563    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
564        let workers: Vec<(WorkerType, i64)> = Worker::find()
565            .select_only()
566            .column(worker::Column::WorkerType)
567            .column_as(worker::Column::WorkerId.count(), "count")
568            .group_by(worker::Column::WorkerType)
569            .into_tuple()
570            .all(&self.db)
571            .await?;
572
573        Ok(workers.into_iter().collect())
574    }
575
    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            // Reuse the monotonic update so that `expire_at` never moves backwards.
            info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }
592
593    fn update_resource_and_started_at(
594        &mut self,
595        worker_id: WorkerId,
596        resource: PbResource,
597    ) -> MetaResult<()> {
598        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
599            info.resource = resource;
600            info.update_started_at();
601            Ok(())
602        } else {
603            Err(MetaError::invalid_worker(worker_id, "worker not found"))
604        }
605    }
606
607    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
608        self.worker_extra_info
609            .get(&worker_id)
610            .cloned()
611            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
612    }
613
614    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
615        match (self.available_transactional_ids.front(), r#type) {
616            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
617            // We only assign transactional id to compute node and frontend.
618            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
619            _ => Ok(None),
620        }
621    }
622
623    /// Get the total resource of the cluster.
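    ///
    /// For illustration: if host `a` runs two nodes reporting 8 and 6 CPU cores and host `b`
    /// runs one node reporting 4 cores, the cluster total is `max(8, 6) + 4 = 12` cores, since
    /// only the per-host maximum is counted before summing across hosts (the meta node's own
    /// host is included in the same way).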
624    fn cluster_resource(&self) -> ClusterResource {
625        // For each hostname, we only consider the maximum resource, in case a host has multiple nodes.
626        let mut per_host = HashMap::new();
627
628        // Note: Meta node itself is not a "worker" and thus won't register via `add_worker_node`.
629        // Still, for license/RWU enforcement we should include the resources used by the meta node.
630        per_host.insert(
631            hostname(),
632            ClusterResource {
633                total_cpu_cores: total_cpu_available() as _,
634                total_memory_bytes: system_memory_available_bytes() as _,
635            },
636        );
637
638        for info in self.worker_extra_info.values() {
639            let r = per_host
640                .entry(info.resource.hostname.clone())
641                .or_insert_with(ClusterResource::default);
642
643            r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
644            r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
645        }
646
647        // For different hostnames, we sum up the resources.
648        per_host
649            .into_values()
650            .reduce(|a, b| ClusterResource {
651                total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
652                total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
653            })
654            .unwrap_or_default()
655    }
656
657    pub async fn add_worker(
658        &mut self,
659        r#type: PbWorkerType,
660        host_address: HostAddress,
661        add_property: AddNodeProperty,
662        resource: PbResource,
663        ttl: Duration,
664    ) -> MetaResult<WorkerId> {
665        let txn = self.db.begin().await?;
666
667        let worker = Worker::find()
668            .filter(
669                worker::Column::Host
670                    .eq(host_address.host.clone())
671                    .and(worker::Column::Port.eq(host_address.port)),
672            )
673            .find_also_related(WorkerProperty)
674            .one(&txn)
675            .await?;
        // The worker already exists.
677        if let Some((worker, property)) = worker {
678            assert_eq!(worker.worker_type, r#type.into());
679            return if worker.worker_type == WorkerType::ComputeNode {
680                let property = property.unwrap();
681                let mut current_parallelism = property.parallelism as usize;
682                let new_parallelism = add_property.parallelism as usize;
683                match new_parallelism.cmp(&current_parallelism) {
684                    Ordering::Less => {
685                        if !self.disable_automatic_parallelism_control {
686                            // Handing over to the subsequent recovery loop for a forced reschedule.
687                            tracing::info!(
688                                "worker {} parallelism reduced from {} to {}",
689                                worker.worker_id,
690                                current_parallelism,
691                                new_parallelism
692                            );
693                            current_parallelism = new_parallelism;
694                        } else {
695                            // Warn and keep the original parallelism if the worker registered with a
696                            // smaller parallelism.
                            tracing::warn!(
                                "worker {} requested a parallelism ({}) smaller than the current one ({}); keeping the current value",
                                worker.worker_id,
                                new_parallelism,
                                current_parallelism
                            );
703                        }
704                    }
705                    Ordering::Greater => {
706                        tracing::info!(
707                            "worker {} parallelism updated from {} to {}",
708                            worker.worker_id,
709                            current_parallelism,
710                            new_parallelism
711                        );
712                        current_parallelism = new_parallelism;
713                    }
714                    Ordering::Equal => {}
715                }
716                let mut property: worker_property::ActiveModel = property.into();
717
718                // keep `is_unschedulable` unchanged.
719                property.is_streaming = Set(add_property.is_streaming);
720                property.is_serving = Set(add_property.is_serving);
721                property.parallelism = Set(current_parallelism as _);
722                property.resource_group =
723                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
724                        tracing::warn!(
725                            "resource_group is not set for worker {}, fallback to `default`",
726                            worker.worker_id
727                        );
728                        DEFAULT_RESOURCE_GROUP.to_owned()
729                    })));
730
731                WorkerProperty::update(property).exec(&txn).await?;
732                txn.commit().await?;
733                self.update_worker_ttl(worker.worker_id, ttl)?;
734                self.update_resource_and_started_at(worker.worker_id, resource)?;
735                Ok(worker.worker_id)
736            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
737                let worker_property = worker_property::ActiveModel {
738                    worker_id: Set(worker.worker_id),
739                    parallelism: Set(add_property
740                        .parallelism
741                        .try_into()
742                        .expect("invalid parallelism")),
743                    is_streaming: Set(add_property.is_streaming),
744                    is_serving: Set(add_property.is_serving),
745                    is_unschedulable: Set(add_property.is_unschedulable),
746                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
747                    resource_group: Set(None),
748                    is_iceberg_compactor: Set(false),
749                };
750                WorkerProperty::insert(worker_property).exec(&txn).await?;
751                txn.commit().await?;
752                self.update_worker_ttl(worker.worker_id, ttl)?;
753                self.update_resource_and_started_at(worker.worker_id, resource)?;
754                Ok(worker.worker_id)
755            } else if worker.worker_type == WorkerType::Compactor {
756                if let Some(property) = property {
757                    let mut property: worker_property::ActiveModel = property.into();
758                    property.is_iceberg_compactor = Set(add_property.is_iceberg_compactor);
759                    property.internal_rpc_host_addr =
760                        Set(Some(add_property.internal_rpc_host_addr));
761
762                    WorkerProperty::update(property).exec(&txn).await?;
763                } else {
764                    let property = worker_property::ActiveModel {
765                        worker_id: Set(worker.worker_id),
766                        parallelism: Set(add_property
767                            .parallelism
768                            .try_into()
769                            .expect("invalid parallelism")),
770                        is_streaming: Set(false),
771                        is_serving: Set(false),
772                        is_unschedulable: Set(false),
773                        internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
774                        resource_group: Set(None),
775                        is_iceberg_compactor: Set(add_property.is_iceberg_compactor),
776                    };
777
778                    WorkerProperty::insert(property).exec(&txn).await?;
779                }
780                txn.commit().await?;
781                self.update_worker_ttl(worker.worker_id, ttl)?;
782                self.update_resource_and_started_at(worker.worker_id, resource)?;
783                Ok(worker.worker_id)
784            } else {
785                self.update_worker_ttl(worker.worker_id, ttl)?;
786                self.update_resource_and_started_at(worker.worker_id, resource)?;
787                Ok(worker.worker_id)
788            };
789        }
790
791        let txn_id = self.apply_transaction_id(r#type)?;
792
793        let worker = worker::ActiveModel {
794            worker_id: Default::default(),
795            worker_type: Set(r#type.into()),
796            host: Set(host_address.host.clone()),
797            port: Set(host_address.port),
798            status: Set(WorkerStatus::Starting),
799            transaction_id: Set(txn_id),
800        };
801        let insert_res = Worker::insert(worker).exec(&txn).await?;
802        let worker_id = insert_res.last_insert_id as WorkerId;
803        if r#type == PbWorkerType::ComputeNode
804            || r#type == PbWorkerType::Frontend
805            || r#type == PbWorkerType::Compactor
806        {
807            let (is_serving, is_streaming, is_unschedulable, is_iceberg_compactor, resource_group) =
808                match r#type {
809                    PbWorkerType::ComputeNode => (
810                        add_property.is_serving,
811                        add_property.is_streaming,
812                        add_property.is_unschedulable,
813                        false,
814                        add_property.resource_group.clone(),
815                    ),
816                    PbWorkerType::Frontend => (
817                        add_property.is_serving,
818                        add_property.is_streaming,
819                        add_property.is_unschedulable,
820                        false,
821                        None,
822                    ),
823                    PbWorkerType::Compactor => {
824                        (false, false, false, add_property.is_iceberg_compactor, None)
825                    }
826                    _ => unreachable!(),
827                };
828
829            let property = worker_property::ActiveModel {
830                worker_id: Set(worker_id),
831                parallelism: Set(add_property
832                    .parallelism
833                    .try_into()
834                    .expect("invalid parallelism")),
835                is_streaming: Set(is_streaming),
836                is_serving: Set(is_serving),
837                is_unschedulable: Set(is_unschedulable),
838                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
839                resource_group: Set(resource_group),
840                is_iceberg_compactor: Set(is_iceberg_compactor),
841            };
842            WorkerProperty::insert(property).exec(&txn).await?;
843        }
844
845        txn.commit().await?;
846        if let Some(txn_id) = txn_id {
847            self.available_transactional_ids.retain(|id| *id != txn_id);
848        }
849        let extra_info = WorkerExtraInfo {
850            started_at: Some(timestamp_now_sec()),
851            expire_at: None,
852            resource,
853        };
854        self.worker_extra_info.insert(worker_id, extra_info);
855
856        Ok(worker_id)
857    }
858
859    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
860        let worker = worker::ActiveModel {
861            worker_id: Set(worker_id),
862            status: Set(WorkerStatus::Running),
863            ..Default::default()
864        };
865
866        let worker = worker.update(&self.db).await?;
867        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
868            .one(&self.db)
869            .await?;
870        let extra_info = self.get_extra_info_checked(worker_id)?;
871        Ok(WorkerInfo(worker, worker_property, extra_info).into())
872    }
873
874    pub async fn update_schedulability(
875        &self,
876        worker_ids: Vec<WorkerId>,
877        schedulability: Schedulability,
878    ) -> MetaResult<()> {
879        let is_unschedulable = schedulability == Schedulability::Unschedulable;
880        WorkerProperty::update_many()
881            .col_expr(
882                worker_property::Column::IsUnschedulable,
883                Expr::value(is_unschedulable),
884            )
885            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
886            .exec(&self.db)
887            .await?;
888
889        Ok(())
890    }
891
892    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
893        let worker = Worker::find()
894            .filter(
895                worker::Column::Host
896                    .eq(host_addr.host)
897                    .and(worker::Column::Port.eq(host_addr.port)),
898            )
899            .find_also_related(WorkerProperty)
900            .one(&self.db)
901            .await?;
902        let Some((worker, property)) = worker else {
903            return Err(MetaError::invalid_parameter("worker not found!"));
904        };
905
906        let res = Worker::delete_by_id(worker.worker_id)
907            .exec(&self.db)
908            .await?;
909        if res.rows_affected == 0 {
910            return Err(MetaError::invalid_parameter("worker not found!"));
911        }
912
913        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
914        if let Some(txn_id) = &worker.transaction_id {
915            self.available_transactional_ids.push_back(*txn_id);
916        }
917        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();
918
919        Ok(worker)
920    }
921
922    pub fn heartbeat(
923        &mut self,
924        worker_id: WorkerId,
925        ttl: Duration,
926        resource: Option<PbResource>,
927    ) -> MetaResult<()> {
928        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
929            worker_info.update_ttl(ttl);
930            if let Some(resource) = resource {
931                worker_info.resource = resource;
932            }
933            Ok(())
934        } else {
935            Err(MetaError::invalid_worker(worker_id, "worker not found"))
936        }
937    }
938
939    pub async fn list_workers(
940        &self,
941        worker_type: Option<WorkerType>,
942        worker_status: Option<WorkerStatus>,
943    ) -> MetaResult<Vec<PbWorkerNode>> {
944        let mut find = Worker::find();
945        if let Some(worker_type) = worker_type {
946            find = find.filter(worker::Column::WorkerType.eq(worker_type));
947        }
948        if let Some(worker_status) = worker_status {
949            find = find.filter(worker::Column::Status.eq(worker_status));
950        }
951        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
952        Ok(workers
953            .into_iter()
954            .map(|(worker, property)| {
955                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
956                WorkerInfo(worker, property, extra_info).into()
957            })
958            .collect_vec())
959    }
960
961    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
962        let workers = Worker::find()
963            .filter(
964                worker::Column::WorkerType
965                    .eq(WorkerType::ComputeNode)
966                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
967            )
968            .inner_join(WorkerProperty)
969            .select_also(WorkerProperty)
970            .filter(worker_property::Column::IsStreaming.eq(true))
971            .all(&self.db)
972            .await?;
973
974        Ok(workers
975            .into_iter()
976            .map(|(worker, property)| {
977                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
978                WorkerInfo(worker, property, extra_info).into()
979            })
980            .collect_vec())
981    }
982
983    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
984        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
985            .select_only()
986            .column(worker_property::Column::WorkerId)
987            .column(worker_property::Column::Parallelism)
988            .inner_join(Worker)
989            .filter(worker::Column::Status.eq(WorkerStatus::Running))
990            .into_tuple()
991            .all(&self.db)
992            .await?;
993        Ok(worker_parallelisms
994            .into_iter()
995            .flat_map(|(worker_id, parallelism)| {
996                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id, idx as usize))
997            })
998            .collect_vec())
999    }
1000
1001    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
1002        let workers = Worker::find()
1003            .filter(
1004                worker::Column::WorkerType
1005                    .eq(WorkerType::ComputeNode)
1006                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
1007            )
1008            .inner_join(WorkerProperty)
1009            .select_also(WorkerProperty)
1010            .filter(worker_property::Column::IsServing.eq(true))
1011            .all(&self.db)
1012            .await?;
1013
1014        Ok(workers
1015            .into_iter()
1016            .map(|(worker, property)| {
1017                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
1018                WorkerInfo(worker, property, extra_info).into()
1019            })
1020            .collect_vec())
1021    }
1022
1023    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
1024        let mut streaming_workers = self.list_active_streaming_workers().await?;
1025
1026        let unschedulable_workers: HashSet<_> = streaming_workers
1027            .extract_if(.., |worker| {
1028                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
1029            })
1030            .map(|w| w.id)
1031            .collect();
1032
1033        let schedulable_workers = streaming_workers
1034            .iter()
1035            .map(|worker| worker.id)
1036            .filter(|id| !unschedulable_workers.contains(id))
1037            .collect();
1038
1039        let active_workers: HashMap<_, _> =
1040            streaming_workers.into_iter().map(|w| (w.id, w)).collect();
1041
1042        Ok(StreamingClusterInfo {
1043            worker_nodes: active_workers,
1044            schedulable_workers,
1045            unschedulable_workers,
1046        })
1047    }
1048
1049    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
1050        let worker = Worker::find_by_id(worker_id)
1051            .find_also_related(WorkerProperty)
1052            .one(&self.db)
1053            .await?;
1054        if worker.is_none() {
1055            return Ok(None);
1056        }
1057        let extra_info = self.get_extra_info_checked(worker_id)?;
1058        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
1059    }
1060
1061    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
1062        self.worker_extra_info.get(&worker_id).cloned()
1063    }
1064}
1065
1066#[cfg(test)]
1067mod tests {
1068    use super::*;
1069
1070    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
1071        (0..count)
1072            .map(|i| HostAddress {
1073                host: "localhost".to_owned(),
1074                port: 5000 + i as i32,
1075            })
1076            .collect_vec()
1077    }
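
    // A small sanity check for the synthesized meta node entry: `meta_node_info` is a pure
    // helper, so it can be exercised directly without a database.
    #[test]
    fn test_meta_node_info_fields() {
        let info = meta_node_info("127.0.0.1:5690", Some(42));
        assert_eq!(info.id, META_NODE_ID);
        assert_eq!(info.r#type, PbWorkerType::Meta as i32);
        assert_eq!(info.state, PbState::Running as i32);
        assert_eq!(info.started_at, Some(42));
        assert!(info.host.is_some());
    }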
1078
1079    #[tokio::test]
1080    async fn test_cluster_controller() -> MetaResult<()> {
1081        let env = MetaSrvEnv::for_test().await;
1082        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;
1083
1084        let parallelism_num = 4_usize;
1085        let worker_count = 5_usize;
1086        let property = AddNodeProperty {
1087            parallelism: parallelism_num as _,
1088            is_streaming: true,
1089            is_serving: true,
1090            is_unschedulable: false,
1091            ..Default::default()
1092        };
1093        let hosts = mock_worker_hosts_for_test(worker_count);
1094        let mut worker_ids = vec![];
1095        for host in &hosts {
1096            worker_ids.push(
1097                cluster_ctl
1098                    .add_worker(
1099                        PbWorkerType::ComputeNode,
1100                        host.clone(),
1101                        property.clone(),
1102                        PbResource::default(),
1103                    )
1104                    .await?,
1105            );
1106        }
1107
        // Since no worker is active yet, there should be no active worker slots.
1109        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);
1110
1111        for id in &worker_ids {
1112            cluster_ctl.activate_worker(*id).await?;
1113        }
1114        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
1115        assert_eq!(
1116            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
1117            worker_count
1118        );
1119        assert_eq!(
1120            cluster_ctl.list_active_streaming_workers().await?.len(),
1121            worker_count
1122        );
1123        assert_eq!(
1124            cluster_ctl.list_active_serving_workers().await?.len(),
1125            worker_count
1126        );
1127        assert_eq!(
1128            cluster_ctl.list_active_worker_slots().await?.len(),
1129            parallelism_num * worker_count
1130        );
1131
1132        // re-register existing worker node with larger parallelism and change its serving mode.
1133        let mut new_property = property.clone();
1134        new_property.parallelism = (parallelism_num * 2) as _;
1135        new_property.is_serving = false;
1136        cluster_ctl
1137            .add_worker(
1138                PbWorkerType::ComputeNode,
1139                hosts[0].clone(),
1140                new_property,
1141                PbResource::default(),
1142            )
1143            .await?;
1144
1145        assert_eq!(
1146            cluster_ctl.list_active_streaming_workers().await?.len(),
1147            worker_count
1148        );
1149        assert_eq!(
1150            cluster_ctl.list_active_serving_workers().await?.len(),
1151            worker_count - 1
1152        );
1153        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
1154        assert!(worker_slots.iter().all_unique());
1155        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));
1156
1157        // delete workers.
1158        for host in hosts {
1159            cluster_ctl.delete_worker(host).await?;
1160        }
1161        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
1162        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
1163        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);
1164
1165        Ok(())
1166    }
1167
1168    #[tokio::test]
1169    async fn test_update_schedulability() -> MetaResult<()> {
1170        let env = MetaSrvEnv::for_test().await;
1171        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;
1172
1173        let host = HostAddress {
1174            host: "localhost".to_owned(),
1175            port: 5001,
1176        };
1177        let mut property = AddNodeProperty {
1178            is_streaming: true,
1179            is_serving: true,
1180            is_unschedulable: false,
1181            parallelism: 4,
1182            ..Default::default()
1183        };
1184        let worker_id = cluster_ctl
1185            .add_worker(
1186                PbWorkerType::ComputeNode,
1187                host.clone(),
1188                property.clone(),
1189                PbResource::default(),
1190            )
1191            .await?;
1192
1193        cluster_ctl.activate_worker(worker_id).await?;
1194        cluster_ctl
1195            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
1196            .await?;
1197
1198        let workers = cluster_ctl.list_active_streaming_workers().await?;
1199        assert_eq!(workers.len(), 1);
1200        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);
1201
        // Re-register the existing worker node and change its serving mode; the schedulable state should not change.
1203        property.is_unschedulable = false;
1204        property.is_serving = false;
1205        let new_worker_id = cluster_ctl
1206            .add_worker(
1207                PbWorkerType::ComputeNode,
1208                host.clone(),
1209                property,
1210                PbResource::default(),
1211            )
1212            .await?;
1213        assert_eq!(worker_id, new_worker_id);
1214
1215        let workers = cluster_ctl.list_active_streaming_workers().await?;
1216        assert_eq!(workers.len(), 1);
1217        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);
1218
1219        cluster_ctl.delete_worker(host).await?;
1220
1221        Ok(())
1222    }
1223
1224    #[tokio::test]
1225    async fn test_list_workers_include_meta_node() -> MetaResult<()> {
1226        let env = MetaSrvEnv::for_test().await;
1227        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;
1228
        // Listing all workers should include the synthesized meta node.
1230        let workers = cluster_ctl.list_workers(None, None).await?;
1231        assert!(workers.iter().any(|w| w.r#type() == PbWorkerType::Meta));
1232
1233        // Explicitly listing meta workers should also include it.
1234        let workers = cluster_ctl
1235            .list_workers(Some(WorkerType::Meta), None)
1236            .await?;
1237        assert_eq!(workers.len(), 1);
1238        assert_eq!(workers[0].r#type(), PbWorkerType::Meta);
1239
1240        // Listing starting workers should not include meta.
1241        let workers = cluster_ctl
1242            .list_workers(Some(WorkerType::Meta), Some(WorkerStatus::Starting))
1243            .await?;
1244        assert!(workers.is_empty());
1245
1246        Ok(())
1247    }
1248
1249    #[tokio::test]
1250    async fn test_heartbeat_updates_resource() -> MetaResult<()> {
1251        let env = MetaSrvEnv::for_test().await;
1252        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;
1253
1254        let host = HostAddress {
1255            host: "localhost".to_owned(),
1256            port: 5010,
1257        };
1258        let property = AddNodeProperty {
1259            is_streaming: true,
1260            is_serving: true,
1261            is_unschedulable: false,
1262            parallelism: 4,
1263            ..Default::default()
1264        };
1265
1266        let resource_v1 = PbResource {
1267            rw_version: "rw-v1".to_owned(),
1268            total_memory_bytes: 1024,
1269            total_cpu_cores: 4,
1270            hostname: "host-v1".to_owned(),
1271        };
1272        let worker_id = cluster_ctl
1273            .add_worker(
1274                PbWorkerType::ComputeNode,
1275                host.clone(),
1276                property,
1277                resource_v1,
1278            )
1279            .await?;
1280
1281        let resource_v2 = PbResource {
1282            rw_version: "rw-v2".to_owned(),
1283            total_memory_bytes: 2048,
1284            total_cpu_cores: 8,
1285            hostname: "host-v2".to_owned(),
1286        };
1287        cluster_ctl
1288            .heartbeat(worker_id, Some(resource_v2.clone()))
1289            .await?;
1290
1291        let worker = cluster_ctl
1292            .get_worker_by_id(worker_id)
1293            .await?
1294            .expect("worker should exist");
1295        assert_eq!(
1296            worker.resource.expect("worker resource should exist"),
1297            resource_v2
1298        );
1299
1300        cluster_ctl.delete_worker(host).await?;
1301        Ok(())
1302    }
1303
1304    #[tokio::test]
1305    async fn test_reregister_compute_node_updates_resource() -> MetaResult<()> {
1306        let env = MetaSrvEnv::for_test().await;
1307        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;
1308
1309        let host = HostAddress {
1310            host: "localhost".to_owned(),
1311            port: 5011,
1312        };
1313        let property = AddNodeProperty {
1314            is_streaming: true,
1315            is_serving: true,
1316            is_unschedulable: false,
1317            parallelism: 4,
1318            ..Default::default()
1319        };
1320
1321        let resource_v1 = PbResource {
1322            rw_version: "rw-v1".to_owned(),
1323            total_memory_bytes: 1024,
1324            total_cpu_cores: 4,
1325            hostname: "host-v1".to_owned(),
1326        };
1327        let worker_id = cluster_ctl
1328            .add_worker(
1329                PbWorkerType::ComputeNode,
1330                host.clone(),
1331                property.clone(),
1332                resource_v1,
1333            )
1334            .await?;
1335
1336        let resource_v2 = PbResource {
1337            rw_version: "rw-v2".to_owned(),
1338            total_memory_bytes: 2048,
1339            total_cpu_cores: 8,
1340            hostname: "host-v2".to_owned(),
1341        };
1342        cluster_ctl
1343            .add_worker(
1344                PbWorkerType::ComputeNode,
1345                host.clone(),
1346                property,
1347                resource_v2.clone(),
1348            )
1349            .await?;
1350
1351        let worker = cluster_ctl
1352            .get_worker_by_id(worker_id)
1353            .await?
1354            .expect("worker should exist");
1355        assert_eq!(
1356            worker.resource.expect("worker resource should exist"),
1357            resource_v2
1358        );
1359
1360        cluster_ctl.delete_worker(host).await?;
1361        Ok(())
1362    }
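
    // A minimal, database-free sketch of a unit test for `WorkerExtraInfo::update_ttl`: a later
    // heartbeat carrying a shorter TTL must never move `expire_at` backwards.
    #[test]
    fn test_worker_extra_info_update_ttl_is_monotonic() {
        let mut info = WorkerExtraInfo::default();
        info.update_ttl(Duration::from_secs(100));
        let first_expire = info.expire_at.expect("expire_at should be set");
        // Re-arming with a much shorter TTL keeps the later expiration timestamp.
        info.update_ttl(Duration::from_secs(1));
        assert!(info.expire_at.expect("expire_at should be set") >= first_expire);
    }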
1363}