Skip to main content

risingwave_meta/controller/
cluster.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{self, Ordering, max};
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::ops::Add;
18use std::sync::Arc;
19use std::time::{Duration, SystemTime};
20
21use itertools::Itertools;
22use risingwave_common::RW_VERSION;
23use risingwave_common::hash::WorkerSlotId;
24use risingwave_common::util::addr::HostAddr;
25use risingwave_common::util::resource_util::cpu::total_cpu_available;
26use risingwave_common::util::resource_util::hostname;
27use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_license::LicenseManager;
30use risingwave_meta_model::prelude::{Worker, WorkerProperty};
31use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
32use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
33use risingwave_pb::common::worker_node::{
34    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
35};
36use risingwave_pb::common::{
37    ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
38};
39use risingwave_pb::meta::subscribe_response::{Info, Operation};
40use sea_orm::ActiveValue::Set;
41use sea_orm::{
42    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
43    TransactionTrait,
44};
45use thiserror_ext::AsReport;
46use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
47use tokio::sync::oneshot::Sender;
48use tokio::sync::{RwLock, RwLockReadGuard};
49use tokio::task::JoinHandle;
50
51use crate::controller::utils::filter_workers_by_resource_group;
52use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
53use crate::model::ClusterId;
54use crate::{MetaError, MetaResult};
55
/// Shared, reference-counted handle to a [`ClusterController`].
pub type ClusterControllerRef = Arc<ClusterController>;
57
/// Lock-guarded front end for the worker bookkeeping kept by the meta node.
///
/// The actual state lives in [`ClusterControllerInner`]; the methods on this type
/// acquire the async `RwLock` around it and delegate.
pub struct ClusterController {
    /// Meta service environment; used here for options, the meta store handle,
    /// the cluster id and the notification manager.
    env: MetaSrvEnv,
    /// TTL handed to the inner controller when a worker registers or heartbeats,
    /// and used by the heartbeat checker to initialize workers without a TTL.
    max_heartbeat_interval: Duration,
    /// The lock-protected worker state.
    inner: RwLock<ClusterControllerInner>,
    /// Unix timestamp in seconds taken when this controller was constructed;
    /// reported as `started_at` of the synthesized meta node entry.
    started_at: u64,
}
65
/// Bundle of everything needed to build a [`PbWorkerNode`]:
/// the worker row (`.0`), its optional property row (`.1`),
/// and the volatile extra info tracked by the meta node (`.2`).
struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);
71
72impl From<WorkerInfo> for PbWorkerNode {
73    fn from(info: WorkerInfo) -> Self {
74        Self {
75            id: info.0.worker_id,
76            r#type: PbWorkerType::from(info.0.worker_type) as _,
77            host: Some(PbHostAddress {
78                host: info.0.host,
79                port: info.0.port,
80            }),
81            state: PbState::from(info.0.status) as _,
82            property: info.1.as_ref().map(|p| PbProperty {
83                is_streaming: p.is_streaming,
84                is_serving: p.is_serving,
85                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
86                resource_group: p.resource_group.clone(),
87                parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
88                is_iceberg_compactor: p.is_iceberg_compactor,
89            }),
90            transactional_id: info.0.transaction_id.map(|id| id as _),
91            resource: Some(info.2.resource),
92            started_at: info.2.started_at,
93        }
94    }
95}
96
97impl ClusterController {
98    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
99        let inner = ClusterControllerInner::new(
100            env.meta_store_ref().conn.clone(),
101            env.opts.disable_automatic_parallelism_control,
102        )
103        .await?;
104        Ok(Self {
105            env,
106            max_heartbeat_interval,
107            inner: RwLock::new(inner),
108            started_at: timestamp_now_sec(),
109        })
110    }
111
112    /// Used in `NotificationService::subscribe`.
113    /// Need to pay attention to the order of acquiring locks to prevent deadlock problems.
114    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
115        self.inner.read().await
116    }
117
118    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
119        self.inner.read().await.count_worker_by_type().await
120    }
121
122    /// Get the total resource of the cluster.
123    pub async fn cluster_resource(&self) -> ClusterResource {
124        self.inner.read().await.cluster_resource()
125    }
126
127    /// Get the total resource of the cluster, then update license manager and notify all other nodes.
128    async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
129        let resource = self.cluster_resource().await;
130
131        // Update local license manager.
132        LicenseManager::get().update_cluster_resource(resource);
133        // Notify all other nodes.
134        self.env.notification_manager().notify_all_without_version(
135            Operation::Update, // unused
136            Info::ClusterResource(resource),
137        );
138
139        Ok(())
140    }
141
142    /// A worker node will immediately register itself to meta when it bootstraps.
143    /// The meta will assign it with a unique ID and set its state as `Starting`.
144    /// When the worker node is fully ready to serve, it will request meta again
145    /// (via `activate_worker_node`) to set its state to `Running`.
146    pub async fn add_worker(
147        &self,
148        r#type: PbWorkerType,
149        host_address: HostAddress,
150        property: AddNodeProperty,
151        resource: PbResource,
152    ) -> MetaResult<WorkerId> {
153        let worker_id = self
154            .inner
155            .write()
156            .await
157            .add_worker(
158                r#type,
159                host_address,
160                property,
161                resource,
162                self.max_heartbeat_interval,
163            )
164            .await?;
165
166        // Keep license manager in sync with the latest cluster resource.
167        self.update_cluster_resource_for_license().await?;
168
169        Ok(worker_id)
170    }
171
172    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
173        let inner = self.inner.write().await;
174        let worker = inner.activate_worker(worker_id).await?;
175
176        // Notify frontends of new compute node and frontend node.
177        // Always notify because a running worker's property may have been changed.
178        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
179        {
180            self.env
181                .notification_manager()
182                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
183                .await;
184        }
185        self.env
186            .notification_manager()
187            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));
188
189        Ok(())
190    }
191
192    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
193        let worker = self.inner.write().await.delete_worker(host_address).await?;
194
195        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
196        {
197            self.env
198                .notification_manager()
199                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
200                .await;
201        }
202
203        // Keep license manager in sync with the latest cluster resource.
204        self.update_cluster_resource_for_license().await?;
205
206        // Notify local subscribers.
207        // Note: Any type of workers may pin some hummock resource. So `HummockManager` expect this
208        // local notification.
209        self.env
210            .notification_manager()
211            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));
212
213        Ok(worker)
214    }
215
216    /// Invoked when it receives a heartbeat from a worker node.
217    pub async fn heartbeat(
218        &self,
219        worker_id: WorkerId,
220        resource: Option<PbResource>,
221    ) -> MetaResult<()> {
222        tracing::trace!(target: "events::meta::server_heartbeat", %worker_id, "receive heartbeat");
223        self.inner
224            .write()
225            .await
226            .heartbeat(worker_id, self.max_heartbeat_interval, resource)
227    }
228
    /// Spawns a background task that periodically removes workers whose TTL has expired.
    ///
    /// Every `check_interval` the task:
    /// 1. assigns a TTL to workers that do not have one yet,
    /// 2. collects the ids of workers whose `expire_at` is in the past,
    /// 3. looks their host/port up in the database and deletes them through
    ///    [`ClusterController::delete_worker`], also dropping the notification
    ///    sender of frontend / compute / compactor / risectl workers.
    ///
    /// Returns the task's join handle together with a oneshot sender; the task
    /// stops once the receiving half of that channel resolves.
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            // If a round overruns, delay subsequent ticks instead of bursting to catch up.
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    // Wait for interval
                    _ = min_interval.tick() => {},
                    // Shutdown
                    // NOTE(review): a oneshot receiver also resolves (with an error) when the
                    // sender is dropped, so dropping the returned sender presumably stops the
                    // checker as well.
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                // 1. Initialize new workers' TTL.
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                // 2. Collect expired workers.
                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    // The `unwrap` cannot fail: step 1 just set `expire_at` on every entry
                    // that lacked one, and the write lock has been held since.
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();
                if worker_to_delete.is_empty() {
                    drop(inner);
                    continue;
                }

                // 3. Delete expired workers.
                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expire worker info from db");
                        // `inner` goes out of scope here, releasing the write lock; the
                        // expired workers are picked up again on the next tick.
                        continue;
                    }
                };
                // Release the write lock before deleting: `delete_worker` acquires it itself.
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                %worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            // Drop the notification sender registered for the removed worker.
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => cluster_controller
                                    .env
                                    .notification_manager()
                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
                                _ => {}
                            };
                        }
                        Err(err) => {
                            // Best effort: log and move on to the next expired worker.
                            tracing::warn!(error = %err.as_report(), "Failed to delete expire worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }
322
323    /// Get live nodes with the specified type and state.
324    /// # Arguments
325    /// * `worker_type` `WorkerType` of the nodes
326    /// * `worker_state` Filter by this state if it is not None.
327    pub async fn list_workers(
328        &self,
329        worker_type: Option<WorkerType>,
330        worker_status: Option<WorkerStatus>,
331    ) -> MetaResult<Vec<PbWorkerNode>> {
332        let mut workers = vec![];
333        // Meta node is not stored in the cluster manager DB, so we synthesize it here.
334        // Include it when listing all workers, or when explicitly listing meta nodes.
335        let include_meta = worker_type.is_none() || worker_type == Some(WorkerType::Meta);
336        // Meta node is always "running" once the service is up.
337        let include_meta = include_meta && worker_status != Some(WorkerStatus::Starting);
338        if include_meta {
339            workers.push(meta_node_info(
340                &self.env.opts.advertise_addr,
341                Some(self.started_at),
342            ));
343        }
344        workers.extend(
345            self.inner
346                .read()
347                .await
348                .list_workers(worker_type, worker_status)
349                .await?,
350        );
351        Ok(workers)
352    }
353
354    pub(crate) async fn subscribe_active_streaming_compute_nodes(
355        &self,
356    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
357        let inner = self.inner.read().await;
358        let worker_nodes = inner.list_active_streaming_workers().await?;
359        let (tx, rx) = unbounded_channel();
360
361        // insert before release the read lock to ensure that we don't lose any update in between
362        self.env.notification_manager().insert_local_sender(tx);
363        drop(inner);
364        Ok((worker_nodes, rx))
365    }
366
367    /// A convenient method to get all running compute nodes that may have running actors on them
368    /// i.e. CNs which are running
369    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
370        self.inner
371            .read()
372            .await
373            .list_active_streaming_workers()
374            .await
375    }
376
377    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
378        self.inner.read().await.list_active_worker_slots().await
379    }
380
381    /// Get the cluster info used for scheduling a streaming job, containing all active serving nodes.
382    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
383        self.inner.read().await.list_active_serving_workers().await
384    }
385
386    /// Get the cluster info used for scheduling a streaming job.
387    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
388        self.inner.read().await.get_streaming_cluster_info().await
389    }
390
391    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
392        self.inner.read().await.get_worker_by_id(worker_id).await
393    }
394
395    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
396        self.inner
397            .read()
398            .await
399            .get_worker_extra_info_by_id(worker_id)
400    }
401
402    pub fn cluster_id(&self) -> &ClusterId {
403        self.env.cluster_id()
404    }
405
406    pub fn meta_store_endpoint(&self) -> String {
407        self.env.meta_store_ref().endpoint.clone()
408    }
409}
410
/// The cluster info used for scheduling a streaming job.
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    /// All **active** compute nodes in the cluster, keyed by worker id.
    pub worker_nodes: HashMap<WorkerId, WorkerNode>,
}
417
418// Encapsulating the use of parallelism
419impl StreamingClusterInfo {
420    pub fn parallelism(&self, resource_group: &str) -> usize {
421        let available_worker_ids =
422            filter_workers_by_resource_group(&self.worker_nodes, resource_group);
423
424        self.worker_nodes
425            .values()
426            .filter(|worker| available_worker_ids.contains(&(worker.id)))
427            .map(|worker| worker.compute_node_parallelism())
428            .sum()
429    }
430
431    pub fn filter_workers_by_resource_group(
432        &self,
433        resource_group: &str,
434    ) -> HashMap<WorkerId, WorkerNode> {
435        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
436        self.worker_nodes
437            .iter()
438            .filter(|(id, _)| worker_ids.contains(*id))
439            .map(|(id, worker)| (*id, worker.clone()))
440            .collect()
441    }
442}
443
/// Volatile per-worker values maintained by the meta node.
#[derive(Default, Clone, Debug)]
pub struct WorkerExtraInfo {
    /// Unix timestamp (seconds) at which the worker expires; `None` until a TTL
    /// has been assigned.
    expire_at: Option<u64>,
    /// Unix timestamp (seconds) set by `update_started_at`; `None` until then.
    started_at: Option<u64>,
    /// Resource description most recently stored for the worker.
    resource: PbResource,
}
453
454impl WorkerExtraInfo {
455    fn update_ttl(&mut self, ttl: Duration) {
456        let expire = cmp::max(
457            self.expire_at.unwrap_or_default(),
458            SystemTime::now()
459                .add(ttl)
460                .duration_since(SystemTime::UNIX_EPOCH)
461                .expect("Clock may have gone backwards")
462                .as_secs(),
463        );
464        self.expire_at = Some(expire);
465    }
466
467    fn update_started_at(&mut self) {
468        self.started_at = Some(timestamp_now_sec());
469    }
470}
471
/// Returns the current Unix time in whole seconds.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn timestamp_now_sec() -> u64 {
    let since_epoch = SystemTime::UNIX_EPOCH
        .elapsed()
        .expect("Clock may have gone backwards");
    since_epoch.as_secs()
}
478
479fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
480    PbWorkerNode {
481        id: META_NODE_ID,
482        r#type: PbWorkerType::Meta.into(),
483        host: HostAddr::try_from(host)
484            .as_ref()
485            .map(HostAddr::to_protobuf)
486            .ok(),
487        state: PbState::Running as _,
488        property: None,
489        transactional_id: None,
490        resource: Some(risingwave_pb::common::worker_node::Resource {
491            rw_version: RW_VERSION.to_owned(),
492            total_memory_bytes: system_memory_available_bytes() as _,
493            total_cpu_cores: total_cpu_available() as _,
494            hostname: hostname(),
495        }),
496        started_at,
497    }
498}
499
/// Worker state guarded by the lock in [`ClusterController`].
pub struct ClusterControllerInner {
    /// Connection to the meta store database.
    db: DatabaseConnection,
    /// Pool of reusable transactional (machine) ids that are currently unassigned.
    available_transactional_ids: VecDeque<TransactionId>,
    /// Volatile per-worker info, keyed by worker id.
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    /// Copied from the meta options at construction; read when an existing
    /// compute node re-registers with a smaller parallelism.
    disable_automatic_parallelism_control: bool,
}
507
508impl ClusterControllerInner {
    /// Bit width of the reusable transactional id space.
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    /// Size of the reusable transactional id pool: `1 << MAX_WORKER_REUSABLE_ID_BITS` (1024).
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;
511
512    pub async fn new(
513        db: DatabaseConnection,
514        disable_automatic_parallelism_control: bool,
515    ) -> MetaResult<Self> {
516        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
517            .select_only()
518            .column(worker::Column::WorkerId)
519            .column(worker::Column::TransactionId)
520            .into_tuple()
521            .all(&db)
522            .await?;
523        let inuse_txn_ids: HashSet<_> = workers
524            .iter()
525            .cloned()
526            .filter_map(|(_, txn_id)| txn_id)
527            .collect();
528        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
529            .filter(|id| !inuse_txn_ids.contains(id))
530            .collect();
531
532        let worker_extra_info = workers
533            .into_iter()
534            .map(|(w, _)| (w, WorkerExtraInfo::default()))
535            .collect();
536
537        Ok(Self {
538            db,
539            available_transactional_ids,
540            worker_extra_info,
541            disable_automatic_parallelism_control,
542        })
543    }
544
545    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
546        let workers: Vec<(WorkerType, i64)> = Worker::find()
547            .select_only()
548            .column(worker::Column::WorkerType)
549            .column_as(worker::Column::WorkerId.count(), "count")
550            .group_by(worker::Column::WorkerType)
551            .into_tuple()
552            .all(&self.db)
553            .await?;
554
555        Ok(workers.into_iter().collect())
556    }
557
558    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
559        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
560            let expire = cmp::max(
561                info.expire_at.unwrap_or_default(),
562                SystemTime::now()
563                    .add(ttl)
564                    .duration_since(SystemTime::UNIX_EPOCH)
565                    .expect("Clock may have gone backwards")
566                    .as_secs(),
567            );
568            info.expire_at = Some(expire);
569            Ok(())
570        } else {
571            Err(MetaError::invalid_worker(worker_id, "worker not found"))
572        }
573    }
574
575    fn update_resource_and_started_at(
576        &mut self,
577        worker_id: WorkerId,
578        resource: PbResource,
579    ) -> MetaResult<()> {
580        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
581            info.resource = resource;
582            info.update_started_at();
583            Ok(())
584        } else {
585            Err(MetaError::invalid_worker(worker_id, "worker not found"))
586        }
587    }
588
589    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
590        self.worker_extra_info
591            .get(&worker_id)
592            .cloned()
593            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
594    }
595
596    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
597        match (self.available_transactional_ids.front(), r#type) {
598            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
599            // We only assign transactional id to compute node and frontend.
600            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
601            _ => Ok(None),
602        }
603    }
604
605    /// Get the total resource of the cluster.
606    fn cluster_resource(&self) -> ClusterResource {
607        // For each hostname, we only consider the maximum resource, in case a host has multiple nodes.
608        let mut per_host = HashMap::new();
609
610        // Note: Meta node itself is not a "worker" and thus won't register via `add_worker_node`.
611        // Still, for license/RWU enforcement we should include the resources used by the meta node.
612        per_host.insert(
613            hostname(),
614            ClusterResource {
615                total_cpu_cores: total_cpu_available() as _,
616                total_memory_bytes: system_memory_available_bytes() as _,
617            },
618        );
619
620        for info in self.worker_extra_info.values() {
621            let r = per_host
622                .entry(info.resource.hostname.clone())
623                .or_insert_with(ClusterResource::default);
624
625            r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
626            r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
627        }
628
629        // For different hostnames, we sum up the resources.
630        per_host
631            .into_values()
632            .reduce(|a, b| ClusterResource {
633                total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
634                total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
635            })
636            .unwrap_or_default()
637    }
638
639    #[await_tree::instrument]
640    pub async fn add_worker(
641        &mut self,
642        r#type: PbWorkerType,
643        host_address: HostAddress,
644        add_property: AddNodeProperty,
645        resource: PbResource,
646        ttl: Duration,
647    ) -> MetaResult<WorkerId> {
648        let txn = self.db.begin().await?;
649
650        let worker = Worker::find()
651            .filter(
652                worker::Column::Host
653                    .eq(host_address.host.clone())
654                    .and(worker::Column::Port.eq(host_address.port)),
655            )
656            .find_also_related(WorkerProperty)
657            .one(&txn)
658            .await?;
659        // Worker already exist.
660        if let Some((worker, property)) = worker {
661            assert_eq!(worker.worker_type, r#type.into());
662            return if worker.worker_type == WorkerType::ComputeNode {
663                let property = property.unwrap();
664                let mut current_parallelism = property.parallelism as usize;
665                let new_parallelism = add_property.parallelism as usize;
666                match new_parallelism.cmp(&current_parallelism) {
667                    Ordering::Less => {
668                        if !self.disable_automatic_parallelism_control {
669                            // Handing over to the subsequent recovery loop for a forced reschedule.
670                            tracing::info!(
671                                "worker {} parallelism reduced from {} to {}",
672                                worker.worker_id,
673                                current_parallelism,
674                                new_parallelism
675                            );
676                            current_parallelism = new_parallelism;
677                        } else {
678                            // Warn and keep the original parallelism if the worker registered with a
679                            // smaller parallelism.
680                            tracing::warn!(
681                                "worker {} parallelism is less than current, current is {}, but received {}",
682                                worker.worker_id,
683                                current_parallelism,
684                                new_parallelism
685                            );
686                        }
687                    }
688                    Ordering::Greater => {
689                        tracing::info!(
690                            "worker {} parallelism updated from {} to {}",
691                            worker.worker_id,
692                            current_parallelism,
693                            new_parallelism
694                        );
695                        current_parallelism = new_parallelism;
696                    }
697                    Ordering::Equal => {}
698                }
699                let mut property: worker_property::ActiveModel = property.into();
700
701                property.is_streaming = Set(add_property.is_streaming);
702                property.is_serving = Set(add_property.is_serving);
703                property.parallelism = Set(current_parallelism as _);
704                property.resource_group =
705                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
706                        tracing::warn!(
707                            "resource_group is not set for worker {}, fallback to `default`",
708                            worker.worker_id
709                        );
710                        DEFAULT_RESOURCE_GROUP.to_owned()
711                    })));
712
713                WorkerProperty::update(property).exec(&txn).await?;
714                txn.commit().await?;
715                self.update_worker_ttl(worker.worker_id, ttl)?;
716                self.update_resource_and_started_at(worker.worker_id, resource)?;
717                Ok(worker.worker_id)
718            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
719                let worker_property = worker_property::ActiveModel {
720                    worker_id: Set(worker.worker_id),
721                    parallelism: Set(add_property
722                        .parallelism
723                        .try_into()
724                        .expect("invalid parallelism")),
725                    is_streaming: Set(add_property.is_streaming),
726                    is_serving: Set(add_property.is_serving),
727                    is_unschedulable: Set(false),
728                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
729                    resource_group: Set(None),
730                    is_iceberg_compactor: Set(false),
731                };
732                WorkerProperty::insert(worker_property).exec(&txn).await?;
733                txn.commit().await?;
734                self.update_worker_ttl(worker.worker_id, ttl)?;
735                self.update_resource_and_started_at(worker.worker_id, resource)?;
736                Ok(worker.worker_id)
737            } else if worker.worker_type == WorkerType::Compactor {
738                if let Some(property) = property {
739                    let mut property: worker_property::ActiveModel = property.into();
740                    property.is_iceberg_compactor = Set(add_property.is_iceberg_compactor);
741                    property.internal_rpc_host_addr =
742                        Set(Some(add_property.internal_rpc_host_addr));
743
744                    WorkerProperty::update(property).exec(&txn).await?;
745                } else {
746                    let property = worker_property::ActiveModel {
747                        worker_id: Set(worker.worker_id),
748                        parallelism: Set(add_property
749                            .parallelism
750                            .try_into()
751                            .expect("invalid parallelism")),
752                        is_streaming: Set(false),
753                        is_serving: Set(false),
754                        is_unschedulable: Set(false),
755                        internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
756                        resource_group: Set(None),
757                        is_iceberg_compactor: Set(add_property.is_iceberg_compactor),
758                    };
759
760                    WorkerProperty::insert(property).exec(&txn).await?;
761                }
762                txn.commit().await?;
763                self.update_worker_ttl(worker.worker_id, ttl)?;
764                self.update_resource_and_started_at(worker.worker_id, resource)?;
765                Ok(worker.worker_id)
766            } else {
767                self.update_worker_ttl(worker.worker_id, ttl)?;
768                self.update_resource_and_started_at(worker.worker_id, resource)?;
769                Ok(worker.worker_id)
770            };
771        }
772
773        let txn_id = self.apply_transaction_id(r#type)?;
774
775        let worker = worker::ActiveModel {
776            worker_id: Default::default(),
777            worker_type: Set(r#type.into()),
778            host: Set(host_address.host.clone()),
779            port: Set(host_address.port),
780            status: Set(WorkerStatus::Starting),
781            transaction_id: Set(txn_id),
782        };
783        let insert_res = Worker::insert(worker).exec(&txn).await?;
784        let worker_id = insert_res.last_insert_id as WorkerId;
785        if r#type == PbWorkerType::ComputeNode
786            || r#type == PbWorkerType::Frontend
787            || r#type == PbWorkerType::Compactor
788        {
789            let (is_serving, is_streaming, is_iceberg_compactor, resource_group) = match r#type {
790                PbWorkerType::ComputeNode => (
791                    add_property.is_serving,
792                    add_property.is_streaming,
793                    false,
794                    add_property.resource_group.clone(),
795                ),
796                PbWorkerType::Frontend => (
797                    add_property.is_serving,
798                    add_property.is_streaming,
799                    false,
800                    None,
801                ),
802                PbWorkerType::Compactor => (false, false, add_property.is_iceberg_compactor, None),
803                _ => unreachable!(),
804            };
805
806            let property = worker_property::ActiveModel {
807                worker_id: Set(worker_id),
808                parallelism: Set(add_property
809                    .parallelism
810                    .try_into()
811                    .expect("invalid parallelism")),
812                is_streaming: Set(is_streaming),
813                is_serving: Set(is_serving),
814                is_unschedulable: Set(false),
815                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
816                resource_group: Set(resource_group),
817                is_iceberg_compactor: Set(is_iceberg_compactor),
818            };
819            WorkerProperty::insert(property).exec(&txn).await?;
820        }
821
822        txn.commit().await?;
823        if let Some(txn_id) = txn_id {
824            self.available_transactional_ids.retain(|id| *id != txn_id);
825        }
826        let extra_info = WorkerExtraInfo {
827            started_at: Some(timestamp_now_sec()),
828            expire_at: None,
829            resource,
830        };
831        self.worker_extra_info.insert(worker_id, extra_info);
832
833        Ok(worker_id)
834    }
835
836    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
837        let worker = worker::ActiveModel {
838            worker_id: Set(worker_id),
839            status: Set(WorkerStatus::Running),
840            ..Default::default()
841        };
842
843        let worker = worker.update(&self.db).await?;
844        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
845            .one(&self.db)
846            .await?;
847        let extra_info = self.get_extra_info_checked(worker_id)?;
848        Ok(WorkerInfo(worker, worker_property, extra_info).into())
849    }
850
851    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
852        let worker = Worker::find()
853            .filter(
854                worker::Column::Host
855                    .eq(host_addr.host)
856                    .and(worker::Column::Port.eq(host_addr.port)),
857            )
858            .find_also_related(WorkerProperty)
859            .one(&self.db)
860            .await?;
861        let Some((worker, property)) = worker else {
862            return Err(MetaError::invalid_parameter("worker not found!"));
863        };
864
865        let res = Worker::delete_by_id(worker.worker_id)
866            .exec(&self.db)
867            .await?;
868        if res.rows_affected == 0 {
869            return Err(MetaError::invalid_parameter("worker not found!"));
870        }
871
872        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
873        if let Some(txn_id) = &worker.transaction_id {
874            self.available_transactional_ids.push_back(*txn_id);
875        }
876        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();
877
878        Ok(worker)
879    }
880
881    pub fn heartbeat(
882        &mut self,
883        worker_id: WorkerId,
884        ttl: Duration,
885        resource: Option<PbResource>,
886    ) -> MetaResult<()> {
887        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
888            worker_info.update_ttl(ttl);
889            if let Some(resource) = resource {
890                worker_info.resource = resource;
891            }
892            Ok(())
893        } else {
894            Err(MetaError::invalid_worker(worker_id, "worker not found"))
895        }
896    }
897
898    pub async fn list_workers(
899        &self,
900        worker_type: Option<WorkerType>,
901        worker_status: Option<WorkerStatus>,
902    ) -> MetaResult<Vec<PbWorkerNode>> {
903        let mut find = Worker::find();
904        if let Some(worker_type) = worker_type {
905            find = find.filter(worker::Column::WorkerType.eq(worker_type));
906        }
907        if let Some(worker_status) = worker_status {
908            find = find.filter(worker::Column::Status.eq(worker_status));
909        }
910        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
911        Ok(workers
912            .into_iter()
913            .map(|(worker, property)| {
914                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
915                WorkerInfo(worker, property, extra_info).into()
916            })
917            .collect_vec())
918    }
919
920    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
921        let workers = Worker::find()
922            .filter(
923                worker::Column::WorkerType
924                    .eq(WorkerType::ComputeNode)
925                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
926            )
927            .inner_join(WorkerProperty)
928            .select_also(WorkerProperty)
929            .filter(worker_property::Column::IsStreaming.eq(true))
930            .all(&self.db)
931            .await?;
932
933        Ok(workers
934            .into_iter()
935            .map(|(worker, property)| {
936                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
937                WorkerInfo(worker, property, extra_info).into()
938            })
939            .collect_vec())
940    }
941
942    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
943        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
944            .select_only()
945            .column(worker_property::Column::WorkerId)
946            .column(worker_property::Column::Parallelism)
947            .inner_join(Worker)
948            .filter(worker::Column::Status.eq(WorkerStatus::Running))
949            .into_tuple()
950            .all(&self.db)
951            .await?;
952        Ok(worker_parallelisms
953            .into_iter()
954            .flat_map(|(worker_id, parallelism)| {
955                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id, idx as usize))
956            })
957            .collect_vec())
958    }
959
960    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
961        let workers = Worker::find()
962            .filter(
963                worker::Column::WorkerType
964                    .eq(WorkerType::ComputeNode)
965                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
966            )
967            .inner_join(WorkerProperty)
968            .select_also(WorkerProperty)
969            .filter(worker_property::Column::IsServing.eq(true))
970            .all(&self.db)
971            .await?;
972
973        Ok(workers
974            .into_iter()
975            .map(|(worker, property)| {
976                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
977                WorkerInfo(worker, property, extra_info).into()
978            })
979            .collect_vec())
980    }
981
982    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
983        let streaming_workers = self.list_active_streaming_workers().await?;
984
985        let active_workers: HashMap<_, _> =
986            streaming_workers.into_iter().map(|w| (w.id, w)).collect();
987
988        Ok(StreamingClusterInfo {
989            worker_nodes: active_workers,
990        })
991    }
992
993    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
994        let worker = Worker::find_by_id(worker_id)
995            .find_also_related(WorkerProperty)
996            .one(&self.db)
997            .await?;
998        if worker.is_none() {
999            return Ok(None);
1000        }
1001        let extra_info = self.get_extra_info_checked(worker_id)?;
1002        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
1003    }
1004
1005    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
1006        self.worker_extra_info.get(&worker_id).cloned()
1007    }
1008}
1009
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a `localhost` address on the given port.
    fn localhost_at(port: i32) -> HostAddress {
        HostAddress {
            host: "localhost".to_owned(),
            port,
        }
    }

    /// Returns `count` distinct `localhost` addresses on consecutive ports from 5000.
    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|offset| localhost_at(5000 + offset as i32))
            .collect_vec()
    }

    /// Property of a node that is both streaming and serving, with the given
    /// parallelism; all other fields keep their defaults.
    fn streaming_serving_property(parallelism: usize) -> AddNodeProperty {
        AddNodeProperty {
            parallelism: parallelism as _,
            is_streaming: true,
            is_serving: true,
            ..Default::default()
        }
    }

    /// Resource reported when a worker first registers in the resource tests.
    fn resource_v1() -> PbResource {
        PbResource {
            rw_version: "rw-v1".to_owned(),
            total_memory_bytes: 1024,
            total_cpu_cores: 4,
            hostname: "host-v1".to_owned(),
        }
    }

    /// Resource reported afterwards; differs from `resource_v1` in every field.
    fn resource_v2() -> PbResource {
        PbResource {
            rw_version: "rw-v2".to_owned(),
            total_memory_bytes: 2048,
            total_cpu_cores: 8,
            hostname: "host-v2".to_owned(),
        }
    }

    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = streaming_serving_property(parallelism_num);
        let hosts = mock_worker_hosts_for_test(worker_count);

        let mut worker_ids = Vec::with_capacity(hosts.len());
        for host in &hosts {
            let worker_id = cluster_ctl
                .add_worker(
                    PbWorkerType::ComputeNode,
                    host.clone(),
                    property.clone(),
                    PbResource::default(),
                )
                .await?;
            worker_ids.push(worker_id);
        }

        // Nothing has been activated yet, so no slot may be reported.
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for worker_id in &worker_ids {
            cluster_ctl.activate_worker(*worker_id).await?;
        }

        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        let compute_node_cnt = *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap();
        assert_eq!(compute_node_cnt as usize, worker_count);

        let streaming_workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(streaming_workers.len(), worker_count);

        let serving_workers = cluster_ctl.list_active_serving_workers().await?;
        assert_eq!(serving_workers.len(), worker_count);

        let active_slots = cluster_ctl.list_active_worker_slots().await?;
        assert_eq!(active_slots.len(), parallelism_num * worker_count);

        // Re-register the first worker with doubled parallelism and serving disabled.
        let new_property = AddNodeProperty {
            parallelism: (parallelism_num * 2) as _,
            is_serving: false,
            ..property.clone()
        };
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        let streaming_workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(streaming_workers.len(), worker_count);

        let serving_workers = cluster_ctl.list_active_serving_workers().await?;
        assert_eq!(serving_workers.len(), worker_count - 1);

        // One worker now contributes twice the slots of the others.
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        // Removing every worker leaves nothing active.
        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_list_workers_include_meta_node() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        // An unfiltered listing must contain the synthesized meta node.
        let all_workers = cluster_ctl.list_workers(None, None).await?;
        let has_meta = all_workers
            .iter()
            .any(|node| node.r#type() == PbWorkerType::Meta);
        assert!(has_meta);

        // Asking for meta workers only must return exactly that node.
        let meta_workers = cluster_ctl
            .list_workers(Some(WorkerType::Meta), None)
            .await?;
        assert_eq!(meta_workers.len(), 1);
        assert_eq!(meta_workers[0].r#type(), PbWorkerType::Meta);

        // The meta node must not show up among starting workers.
        let starting_meta_workers = cluster_ctl
            .list_workers(Some(WorkerType::Meta), Some(WorkerStatus::Starting))
            .await?;
        assert!(starting_meta_workers.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_heartbeat_updates_resource() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = localhost_at(5010);
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                streaming_serving_property(4),
                resource_v1(),
            )
            .await?;

        // A heartbeat carrying a resource must replace the one given at registration.
        let updated_resource = resource_v2();
        cluster_ctl
            .heartbeat(worker_id, Some(updated_resource.clone()))
            .await?;

        let worker = cluster_ctl
            .get_worker_by_id(worker_id)
            .await?
            .expect("worker should exist");
        let reported = worker.resource.expect("worker resource should exist");
        assert_eq!(reported, updated_resource);

        cluster_ctl.delete_worker(host).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_reregister_compute_node_updates_resource() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = localhost_at(5011);
        let property = streaming_serving_property(4);
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                resource_v1(),
            )
            .await?;

        // Registering the same address again must replace the stored resource.
        let updated_resource = resource_v2();
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                updated_resource.clone(),
            )
            .await?;

        let worker = cluster_ctl
            .get_worker_by_id(worker_id)
            .await?
            .expect("worker should exist");
        let reported = worker.resource.expect("worker resource should exist");
        assert_eq!(reported, updated_resource);

        cluster_ctl.delete_worker(host).await?;
        Ok(())
    }
}