risingwave_meta/controller/cluster.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{self, Ordering, max};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
use risingwave_pb::common::worker_node::{
    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{
    ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::ActiveValue::Set;
use sea_orm::prelude::Expr;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
    TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;

use crate::controller::utils::filter_workers_by_resource_group;
use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
use crate::model::ClusterId;
use crate::{MetaError, MetaResult};

pub type ClusterControllerRef = Arc<ClusterController>;

pub struct ClusterController {
    env: MetaSrvEnv,
    max_heartbeat_interval: Duration,
    inner: RwLock<ClusterControllerInner>,
    /// Timestamp (in seconds) at which the meta node started.
    started_at: u64,
}

struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);

impl From<WorkerInfo> for PbWorkerNode {
    fn from(info: WorkerInfo) -> Self {
        Self {
            id: info.0.worker_id,
            r#type: PbWorkerType::from(info.0.worker_type) as _,
            host: Some(PbHostAddress {
                host: info.0.host,
                port: info.0.port,
            }),
            state: PbState::from(info.0.status) as _,
            property: info.1.as_ref().map(|p| PbProperty {
                is_streaming: p.is_streaming,
                is_serving: p.is_serving,
                is_unschedulable: p.is_unschedulable,
                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
                resource_group: p.resource_group.clone(),
                parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
                is_iceberg_compactor: p.is_iceberg_compactor,
            }),
            transactional_id: info.0.transaction_id.map(|id| id as _),
            resource: Some(info.2.resource),
            started_at: info.2.started_at,
        }
    }
}

impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    /// Used in `NotificationService::subscribe`.
    /// Pay attention to the order of acquiring locks to prevent deadlocks.
    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    /// Get the total resource of the cluster.
    pub async fn cluster_resource(&self) -> ClusterResource {
        self.inner.read().await.cluster_resource()
    }

    /// Get the total resource of the cluster, then update license manager and notify all other nodes.
    async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
        let resource = self.cluster_resource().await;

        // Update local license manager.
        LicenseManager::get().update_cluster_resource(resource);
        // Notify all other nodes.
        self.env.notification_manager().notify_all_without_version(
            Operation::Update, // unused
            Info::ClusterResource(resource),
        );

        Ok(())
    }

    /// A worker node will immediately register itself to meta when it bootstraps.
    /// The meta will assign it with a unique ID and set its state as `Starting`.
    /// When the worker node is fully ready to serve, it will request meta again
    /// (via `activate_worker_node`) to set its state to `Running`.
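    ///
    /// # Example
    ///
    /// A rough sketch of the registration flow (illustrative only; the `env` value, the
    /// address and the property values are placeholders, not defaults used anywhere):
    ///
    /// ```ignore
    /// let controller = ClusterController::new(env, Duration::from_secs(30)).await?;
    /// let worker_id = controller
    ///     .add_worker(
    ///         PbWorkerType::ComputeNode,
    ///         HostAddress { host: "localhost".to_owned(), port: 5688 },
    ///         AddNodeProperty {
    ///             parallelism: 4,
    ///             is_streaming: true,
    ///             is_serving: true,
    ///             ..Default::default()
    ///         },
    ///         PbResource::default(),
    ///     )
    ///     .await?;
    /// // Once the node is fully ready to serve, mark it as `Running`.
    /// controller.activate_worker(worker_id).await?;
    /// ```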
    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        // Keep license manager in sync with the latest cluster resource.
        self.update_cluster_resource_for_license().await?;

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        // Notify frontends of new compute node and frontend node.
        // Always notify because a running worker's property may have been changed.
        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));

        Ok(())
    }

    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;
        }

        // Keep license manager in sync with the latest cluster resource.
        self.update_cluster_resource_for_license().await?;

        // Notify local subscribers.
        // Note: workers of any type may pin some hummock resources, so `HummockManager` expects
        // this local notification.
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));

        Ok(worker)
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

    /// Invoked when it receives a heartbeat from a worker node.
    pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", %worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval)
    }

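    /// Spawn a background task that periodically checks worker heartbeats and deletes
    /// workers whose heartbeats have expired.
    ///
    /// A rough sketch of how this might be wired up at meta startup (illustrative only;
    /// the interval and the shutdown handling are assumptions):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) =
    ///     ClusterController::start_heartbeat_checker(cluster_controller.clone(), Duration::from_secs(1));
    /// // ... later, on shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```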
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    // Wait for interval
                    _ = min_interval.tick() => {},
                    // Shutdown
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                // 1. Initialize new workers' TTL.
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                // 2. Collect expired workers.
                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                // 3. Delete expired workers.
                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                %worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => cluster_controller
                                    .env
                                    .notification_manager()
                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

    /// Get live nodes with the specified type and status.
    /// # Arguments
    /// * `worker_type` Filter by this type if it is not `None`.
    /// * `worker_status` Filter by this status if it is not `None`.
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        // Meta node is not stored in the cluster manager DB, so we synthesize it here.
        // Include it when listing all workers, or when explicitly listing meta nodes.
        let include_meta = worker_type.is_none() || worker_type == Some(WorkerType::Meta);
        // Meta node is always "running" once the service is up.
        let include_meta = include_meta && worker_status != Some(WorkerStatus::Starting);
        if include_meta {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        // Insert before releasing the read lock to ensure that we don't lose any updates in between.
        self.env.notification_manager().insert_local_sender(tx);
        drop(inner);
        Ok((worker_nodes, rx))
    }

    /// A convenient method to get all running compute nodes that may have actors running on them,
    /// i.e. streaming-enabled compute nodes in the `Running` state.
    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    /// Get all running compute nodes that are serving-enabled.
    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    /// Get the cluster info used for scheduling a streaming job.
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}

/// The cluster info used for scheduling a streaming job.
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    /// All **active** compute nodes in the cluster.
    pub worker_nodes: HashMap<WorkerId, WorkerNode>,

    /// All schedulable compute nodes in the cluster, normally used for resource-group-based scheduling.
    pub schedulable_workers: HashSet<WorkerId>,

    /// All unschedulable compute nodes in the cluster.
    pub unschedulable_workers: HashSet<WorkerId>,
}

// Encapsulates parallelism-related helpers over the cluster info.
impl StreamingClusterInfo {
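    /// Total parallelism of the workers that belong to the given resource group,
    /// i.e. the sum of their compute node parallelisms.
    ///
    /// A minimal usage sketch (illustrative only; `cluster_controller` is assumed to exist):
    ///
    /// ```ignore
    /// let info = cluster_controller.get_streaming_cluster_info().await?;
    /// let available_slots = info.parallelism(DEFAULT_RESOURCE_GROUP);
    /// ```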
    pub fn parallelism(&self, resource_group: &str) -> usize {
        let available_worker_ids =
            filter_workers_by_resource_group(&self.worker_nodes, resource_group);

        self.worker_nodes
            .values()
            .filter(|worker| available_worker_ids.contains(&(worker.id)))
            .map(|worker| worker.compute_node_parallelism())
            .sum()
    }

    pub fn filter_schedulable_workers_by_resource_group(
        &self,
        resource_group: &str,
    ) -> HashMap<WorkerId, WorkerNode> {
        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
        self.worker_nodes
            .iter()
            .filter(|(id, _)| worker_ids.contains(*id))
            .map(|(id, worker)| (*id, worker.clone()))
            .collect()
    }
}

#[derive(Default, Clone, Debug)]
pub struct WorkerExtraInfo {
    // Volatile values updated by meta node as follows.
    //
    // Unix timestamp that the worker will expire at.
    expire_at: Option<u64>,
    started_at: Option<u64>,
    resource: PbResource,
}

impl WorkerExtraInfo {
    fn update_ttl(&mut self, ttl: Duration) {
        let expire = cmp::max(
            self.expire_at.unwrap_or_default(),
            SystemTime::now()
                .add(ttl)
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs(),
        );
        self.expire_at = Some(expire);
    }

    fn update_started_at(&mut self) {
        self.started_at = Some(timestamp_now_sec());
    }
}

fn timestamp_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
    PbWorkerNode {
        id: META_NODE_ID,
        r#type: PbWorkerType::Meta.into(),
        host: HostAddr::try_from(host)
            .as_ref()
            .map(HostAddr::to_protobuf)
            .ok(),
        state: PbState::Running as _,
        property: None,
        transactional_id: None,
        resource: Some(risingwave_pb::common::worker_node::Resource {
            rw_version: RW_VERSION.to_owned(),
            total_memory_bytes: system_memory_available_bytes() as _,
            total_cpu_cores: total_cpu_available() as _,
            hostname: hostname(),
        }),
        started_at,
    }
}

pub struct ClusterControllerInner {
    db: DatabaseConnection,
    /// Transactional ids that are currently available for (re)use by new workers.
    available_transactional_ids: VecDeque<TransactionId>,
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }

    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            let expire = cmp::max(
                info.expire_at.unwrap_or_default(),
                SystemTime::now()
                    .add(ttl)
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_secs(),
            );
            info.expire_at = Some(expire);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn update_resource_and_started_at(
        &mut self,
        worker_id: WorkerId,
        resource: PbResource,
    ) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.resource = resource;
            info.update_started_at();
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            // We only assign transactional ids to compute nodes and frontends.
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }

    /// Get the total resource of the cluster.
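    ///
    /// For each hostname, only the largest reported value of each field is kept (multiple
    /// nodes on one host share the same machine), and the per-host maxima are then summed
    /// across hosts. A worked example with illustrative numbers, ignoring the meta node's
    /// own host entry:
    ///
    /// ```text
    /// host-a: compute node (16 cores, 64 GiB) + compactor (16 cores, 64 GiB) -> 16 cores, 64 GiB
    /// host-b: compute node ( 8 cores, 32 GiB)                                ->  8 cores, 32 GiB
    /// total reported for license enforcement                                 -> 24 cores, 96 GiB
    /// ```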
    fn cluster_resource(&self) -> ClusterResource {
        // For each hostname, we only consider the maximum resource, in case a host has multiple nodes.
        let mut per_host = HashMap::new();

        // Note: Meta node itself is not a "worker" and thus won't register via `add_worker_node`.
        // Still, for license/RWU enforcement we should include the resources used by the meta node.
        per_host.insert(
            hostname(),
            ClusterResource {
                total_cpu_cores: total_cpu_available() as _,
                total_memory_bytes: system_memory_available_bytes() as _,
            },
        );

        for info in self.worker_extra_info.values() {
            let r = per_host
                .entry(info.resource.hostname.clone())
                .or_insert_with(ClusterResource::default);

            r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
            r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
        }

        // For different hostnames, we sum up the resources.
        per_host
            .into_values()
            .reduce(|a, b| ClusterResource {
                total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
                total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
            })
            .unwrap_or_default()
    }

    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        // Worker already exists.
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            // Handing over to the subsequent recovery loop for a forced reschedule.
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            // Warn and keep the original parallelism if the worker registered with a
                            // smaller parallelism.
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                // keep `is_unschedulable` unchanged.
                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                    is_iceberg_compactor: Set(false),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Compactor {
                if let Some(property) = property {
                    let mut property: worker_property::ActiveModel = property.into();
                    property.is_iceberg_compactor = Set(add_property.is_iceberg_compactor);
                    property.internal_rpc_host_addr =
                        Set(Some(add_property.internal_rpc_host_addr));

                    WorkerProperty::update(property).exec(&txn).await?;
                } else {
                    let property = worker_property::ActiveModel {
                        worker_id: Set(worker.worker_id),
                        parallelism: Set(add_property
                            .parallelism
                            .try_into()
                            .expect("invalid parallelism")),
                        is_streaming: Set(false),
                        is_serving: Set(false),
                        is_unschedulable: Set(false),
                        internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                        resource_group: Set(None),
                        is_iceberg_compactor: Set(add_property.is_iceberg_compactor),
                    };

                    WorkerProperty::insert(property).exec(&txn).await?;
                }
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }

        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host.clone()),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode
            || r#type == PbWorkerType::Frontend
            || r#type == PbWorkerType::Compactor
        {
            let (is_serving, is_streaming, is_unschedulable, is_iceberg_compactor, resource_group) =
                match r#type {
                    PbWorkerType::ComputeNode => (
                        add_property.is_serving,
                        add_property.is_streaming,
                        add_property.is_unschedulable,
                        false,
                        add_property.resource_group.clone(),
                    ),
                    PbWorkerType::Frontend => (
                        add_property.is_serving,
                        add_property.is_streaming,
                        add_property.is_unschedulable,
                        false,
                        None,
                    ),
                    PbWorkerType::Compactor => {
                        (false, false, false, add_property.is_iceberg_compactor, None)
                    }
                    _ => unreachable!(),
                };

            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(is_streaming),
                is_serving: Set(is_serving),
                is_unschedulable: Set(is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: Set(resource_group),
                is_iceberg_compactor: Set(is_iceberg_compactor),
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        let is_unschedulable = schedulability == Schedulability::Unschedulable;
        WorkerProperty::update_many()
            .col_expr(
                worker_property::Column::IsUnschedulable,
                Expr::value(is_unschedulable),
            )
            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
            .exec(&self.db)
            .await?;

        Ok(())
    }

    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id, idx as usize))
            })
            .collect_vec())
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let mut streaming_workers = self.list_active_streaming_workers().await?;

        let unschedulable_workers: HashSet<_> = streaming_workers
            .extract_if(.., |worker| {
                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
            })
            .map(|w| w.id)
            .collect();

        let schedulable_workers = streaming_workers
            .iter()
            .map(|worker| worker.id)
            .filter(|id| !unschedulable_workers.contains(id))
            .collect();

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
            schedulable_workers,
            unschedulable_workers,
        })
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        let worker = Worker::find_by_id(worker_id)
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        if worker.is_none() {
            return Ok(None);
        }
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
    }

    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        // Since no worker is active, the parallelism should be 0.
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        // re-register existing worker node with larger parallelism and change its serving mode.
        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        // delete workers.
        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        // Re-register the existing worker node and change its serving mode; the schedulability should remain unchanged.
        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_list_workers_include_meta_node() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        // Listing all workers should include the synthesized meta node.
        let workers = cluster_ctl.list_workers(None, None).await?;
        assert!(workers.iter().any(|w| w.r#type() == PbWorkerType::Meta));

        // Explicitly listing meta workers should also include it.
        let workers = cluster_ctl
            .list_workers(Some(WorkerType::Meta), None)
            .await?;
        assert_eq!(workers.len(), 1);
        assert_eq!(workers[0].r#type(), PbWorkerType::Meta);

        // Listing starting workers should not include meta.
        let workers = cluster_ctl
            .list_workers(Some(WorkerType::Meta), Some(WorkerStatus::Starting))
            .await?;
        assert!(workers.is_empty());

        Ok(())
    }
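
    // A minimal sketch test for `WorkerExtraInfo::update_ttl`: since it takes the max of the
    // existing and the newly computed deadline, a later call with a shorter TTL must not move
    // the expiration time backwards.
    #[test]
    fn test_worker_extra_info_update_ttl_monotonic() {
        let mut info = WorkerExtraInfo::default();
        info.update_ttl(Duration::from_secs(60));
        let first = info.expire_at.expect("expire_at should be set after update_ttl");
        info.update_ttl(Duration::from_secs(1));
        assert!(info.expire_at.unwrap() >= first);
    }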
}