use std::cmp::{self, Ordering, max};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
use risingwave_pb::common::worker_node::{
    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{
    ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::ActiveValue::Set;
use sea_orm::prelude::Expr;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
    TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;

use crate::controller::utils::filter_workers_by_resource_group;
use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
use crate::model::ClusterId;
use crate::{MetaError, MetaResult};

pub type ClusterControllerRef = Arc<ClusterController>;

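/// Top-level handle to cluster membership state on the meta node.
///
/// All mutations go through the `inner` lock, and `max_heartbeat_interval`
/// bounds how long a worker may stay silent before the heartbeat checker
/// expires it.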
pub struct ClusterController {
    env: MetaSrvEnv,
    max_heartbeat_interval: Duration,
    inner: RwLock<ClusterControllerInner>,
    started_at: u64,
}

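/// A worker row joined with its optional property row and its in-memory extra
/// info, convertible into a `PbWorkerNode` for RPC responses.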
struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);

impl From<WorkerInfo> for PbWorkerNode {
    fn from(info: WorkerInfo) -> Self {
        Self {
            id: info.0.worker_id,
            r#type: PbWorkerType::from(info.0.worker_type) as _,
            host: Some(PbHostAddress {
                host: info.0.host,
                port: info.0.port,
            }),
            state: PbState::from(info.0.status) as _,
            property: info.1.as_ref().map(|p| PbProperty {
                is_streaming: p.is_streaming,
                is_serving: p.is_serving,
                is_unschedulable: p.is_unschedulable,
                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
                resource_group: p.resource_group.clone(),
                parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
                is_iceberg_compactor: p.is_iceberg_compactor,
            }),
            transactional_id: info.0.transaction_id.map(|id| id as _),
            resource: Some(info.2.resource),
            started_at: info.2.started_at,
        }
    }
}

impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    pub async fn cluster_resource(&self) -> ClusterResource {
        self.inner.read().await.cluster_resource()
    }

    async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
        let resource = self.cluster_resource().await;

        LicenseManager::get().update_cluster_resource(resource);
        self.env.notification_manager().notify_all_without_version(
            Operation::Update,
            Info::ClusterResource(resource),
        );

        Ok(())
    }

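    /// Registers a worker, or refreshes the registration of an existing worker
    /// at the same host/port, then reports the updated cluster resource to the
    /// license manager.
    ///
    /// A minimal registration sketch; the concrete values are illustrative
    /// only:
    ///
    /// ```ignore
    /// let worker_id = cluster_controller
    ///     .add_worker(
    ///         PbWorkerType::ComputeNode,
    ///         HostAddress {
    ///             host: "localhost".to_owned(),
    ///             port: 5688,
    ///         },
    ///         AddNodeProperty {
    ///             parallelism: 4,
    ///             is_streaming: true,
    ///             is_serving: true,
    ///             ..Default::default()
    ///         },
    ///         PbResource::default(),
    ///     )
    ///     .await?;
    /// // The worker stays in the `Starting` state until it is activated.
    /// cluster_controller.activate_worker(worker_id).await?;
    /// ```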
    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        self.update_cluster_resource_for_license().await?;

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));

        Ok(())
    }

    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;
        }

        self.update_cluster_resource_for_license().await?;

        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));

        Ok(worker)
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

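    /// Renews the worker's heartbeat lease and, when a resource is provided,
    /// refreshes the worker's reported resource.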
    pub async fn heartbeat(
        &self,
        worker_id: WorkerId,
        resource: Option<PbResource>,
    ) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", %worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval, resource)
    }

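    /// Spawns a background task that, every `check_interval`, expires workers
    /// whose last heartbeat is older than `max_heartbeat_interval` and deletes
    /// them from the metadata store. Returns the task handle together with a
    /// one-shot sender that stops the checker.
    ///
    /// A minimal wiring sketch; the surrounding variable names are
    /// illustrative:
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) = ClusterController::start_heartbeat_checker(
    ///     cluster_controller.clone(),
    ///     Duration::from_secs(1),
    /// );
    /// // ... later, on meta node shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```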
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    _ = min_interval.tick() => {},
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                %worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => cluster_controller
                                    .env
                                    .notification_manager()
                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

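    /// Lists workers filtered by type and status. The meta node itself is not
    /// stored in the worker table, so it is appended here whenever the filters
    /// do not exclude it (it is always reported as `Running`).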
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        let include_meta = worker_type.is_none() || worker_type == Some(WorkerType::Meta);
        let include_meta = include_meta && worker_status != Some(WorkerStatus::Starting);
        if include_meta {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        self.env.notification_manager().insert_local_sender(tx);
        drop(inner);
        Ok((worker_nodes, rx))
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}

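/// Snapshot of the active streaming compute nodes, with worker ids split into
/// schedulable and unschedulable sets according to the `is_unschedulable`
/// property.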
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    pub worker_nodes: HashMap<WorkerId, WorkerNode>,

    pub schedulable_workers: HashSet<WorkerId>,

    pub unschedulable_workers: HashSet<WorkerId>,
}

impl StreamingClusterInfo {
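    /// Total compute parallelism of the workers belonging to `resource_group`.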
    pub fn parallelism(&self, resource_group: &str) -> usize {
        let available_worker_ids =
            filter_workers_by_resource_group(&self.worker_nodes, resource_group);

        self.worker_nodes
            .values()
            .filter(|worker| available_worker_ids.contains(&(worker.id)))
            .map(|worker| worker.compute_node_parallelism())
            .sum()
    }

    pub fn filter_schedulable_workers_by_resource_group(
        &self,
        resource_group: &str,
    ) -> HashMap<WorkerId, WorkerNode> {
        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
        self.worker_nodes
            .iter()
            .filter(|(id, _)| worker_ids.contains(*id))
            .map(|(id, worker)| (*id, worker.clone()))
            .collect()
    }
}

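/// Volatile, per-worker state kept only in memory: the heartbeat expiration
/// time, the start time, and the most recently reported resource.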
#[derive(Default, Clone, Debug)]
pub struct WorkerExtraInfo {
    expire_at: Option<u64>,
    started_at: Option<u64>,
    resource: PbResource,
}

impl WorkerExtraInfo {
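    /// Extends the lease to `now + ttl`, never moving the expiration backwards.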
    fn update_ttl(&mut self, ttl: Duration) {
        let expire = cmp::max(
            self.expire_at.unwrap_or_default(),
            SystemTime::now()
                .add(ttl)
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs(),
        );
        self.expire_at = Some(expire);
    }

    fn update_started_at(&mut self) {
        self.started_at = Some(timestamp_now_sec());
    }
}

fn timestamp_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

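/// Builds a synthetic `PbWorkerNode` describing the meta node itself, which is
/// not stored in the worker table but is included in `list_workers` output.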
fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
    PbWorkerNode {
        id: META_NODE_ID,
        r#type: PbWorkerType::Meta.into(),
        host: HostAddr::try_from(host)
            .as_ref()
            .map(HostAddr::to_protobuf)
            .ok(),
        state: PbState::Running as _,
        property: None,
        transactional_id: None,
        resource: Some(risingwave_pb::common::worker_node::Resource {
            rw_version: RW_VERSION.to_owned(),
            total_memory_bytes: system_memory_available_bytes() as _,
            total_cpu_cores: total_cpu_available() as _,
            hostname: hostname(),
        }),
        started_at,
    }
}

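/// Database-backed core of the cluster controller: the SQL connection, the
/// pool of reusable transactional ids, and the in-memory per-worker extra
/// info.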
pub struct ClusterControllerInner {
    db: DatabaseConnection,
    available_transactional_ids: VecDeque<TransactionId>,
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
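    /// Transactional (reusable) worker ids are drawn from a fixed pool of
    /// `1 << MAX_WORKER_REUSABLE_ID_BITS` slots and handed back on worker
    /// deletion.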
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }

    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            let expire = cmp::max(
                info.expire_at.unwrap_or_default(),
                SystemTime::now()
                    .add(ttl)
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_secs(),
            );
            info.expire_at = Some(expire);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn update_resource_and_started_at(
        &mut self,
        worker_id: WorkerId,
        resource: PbResource,
    ) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.resource = resource;
            info.update_started_at();
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }

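    /// Aggregates the cluster resource by hostname: the per-host maximum of
    /// the reported CPU and memory figures is taken first, then the per-host
    /// figures are summed. The meta node's own host is always included.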
    fn cluster_resource(&self) -> ClusterResource {
        let mut per_host = HashMap::new();

        per_host.insert(
            hostname(),
            ClusterResource {
                total_cpu_cores: total_cpu_available() as _,
                total_memory_bytes: system_memory_available_bytes() as _,
            },
        );

        for info in self.worker_extra_info.values() {
            let r = per_host
                .entry(info.resource.hostname.clone())
                .or_insert_with(ClusterResource::default);

            r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
            r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
        }

        per_host
            .into_values()
            .reduce(|a, b| ClusterResource {
                total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
                total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
            })
            .unwrap_or_default()
    }

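    /// Inserts a new worker row (plus a property row for compute, frontend and
    /// compactor nodes), or updates the existing rows when a worker at the
    /// same host/port is already registered.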
    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                    is_iceberg_compactor: Set(false),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Compactor {
                if let Some(property) = property {
                    let mut property: worker_property::ActiveModel = property.into();
                    property.is_iceberg_compactor = Set(add_property.is_iceberg_compactor);
                    property.internal_rpc_host_addr =
                        Set(Some(add_property.internal_rpc_host_addr));

                    WorkerProperty::update(property).exec(&txn).await?;
                } else {
                    let property = worker_property::ActiveModel {
                        worker_id: Set(worker.worker_id),
                        parallelism: Set(add_property
                            .parallelism
                            .try_into()
                            .expect("invalid parallelism")),
                        is_streaming: Set(false),
                        is_serving: Set(false),
                        is_unschedulable: Set(false),
                        internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                        resource_group: Set(None),
                        is_iceberg_compactor: Set(add_property.is_iceberg_compactor),
                    };

                    WorkerProperty::insert(property).exec(&txn).await?;
                }
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }

        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host.clone()),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode
            || r#type == PbWorkerType::Frontend
            || r#type == PbWorkerType::Compactor
        {
            let (is_serving, is_streaming, is_unschedulable, is_iceberg_compactor, resource_group) =
                match r#type {
                    PbWorkerType::ComputeNode => (
                        add_property.is_serving,
                        add_property.is_streaming,
                        add_property.is_unschedulable,
                        false,
                        add_property.resource_group.clone(),
                    ),
                    PbWorkerType::Frontend => (
                        add_property.is_serving,
                        add_property.is_streaming,
                        add_property.is_unschedulable,
                        false,
                        None,
                    ),
                    PbWorkerType::Compactor => {
                        (false, false, false, add_property.is_iceberg_compactor, None)
                    }
                    _ => unreachable!(),
                };

            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(is_streaming),
                is_serving: Set(is_serving),
                is_unschedulable: Set(is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: Set(resource_group),
                is_iceberg_compactor: Set(is_iceberg_compactor),
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        let is_unschedulable = schedulability == Schedulability::Unschedulable;
        WorkerProperty::update_many()
            .col_expr(
                worker_property::Column::IsUnschedulable,
                Expr::value(is_unschedulable),
            )
            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
            .exec(&self.db)
            .await?;

        Ok(())
    }

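    /// Deletes the worker identified by `host_addr`, returning its
    /// transactional id to the reusable pool and dropping its in-memory extra
    /// info.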
    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    pub fn heartbeat(
        &mut self,
        worker_id: WorkerId,
        ttl: Duration,
        resource: Option<PbResource>,
    ) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            if let Some(resource) = resource {
                worker_info.resource = resource;
            }
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id, idx as usize))
            })
            .collect_vec())
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

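    /// Splits the active streaming compute nodes into schedulable and
    /// unschedulable sets, keyed by worker id.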
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let mut streaming_workers = self.list_active_streaming_workers().await?;

        let unschedulable_workers: HashSet<_> = streaming_workers
            .extract_if(.., |worker| {
                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
            })
            .map(|w| w.id)
            .collect();

        let schedulable_workers = streaming_workers
            .iter()
            .map(|worker| worker.id)
            .filter(|id| !unschedulable_workers.contains(id))
            .collect();

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
            schedulable_workers,
            unschedulable_workers,
        })
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        let worker = Worker::find_by_id(worker_id)
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        if worker.is_none() {
            return Ok(None);
        }
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
    }

    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_list_workers_include_meta_node() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let workers = cluster_ctl.list_workers(None, None).await?;
        assert!(workers.iter().any(|w| w.r#type() == PbWorkerType::Meta));

        let workers = cluster_ctl
            .list_workers(Some(WorkerType::Meta), None)
            .await?;
        assert_eq!(workers.len(), 1);
        assert_eq!(workers[0].r#type(), PbWorkerType::Meta);

        let workers = cluster_ctl
            .list_workers(Some(WorkerType::Meta), Some(WorkerStatus::Starting))
            .await?;
        assert!(workers.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_heartbeat_updates_resource() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5010,
        };
        let property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };

        let resource_v1 = PbResource {
            rw_version: "rw-v1".to_owned(),
            total_memory_bytes: 1024,
            total_cpu_cores: 4,
            hostname: "host-v1".to_owned(),
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                resource_v1,
            )
            .await?;

        let resource_v2 = PbResource {
            rw_version: "rw-v2".to_owned(),
            total_memory_bytes: 2048,
            total_cpu_cores: 8,
            hostname: "host-v2".to_owned(),
        };
        cluster_ctl
            .heartbeat(worker_id, Some(resource_v2.clone()))
            .await?;

        let worker = cluster_ctl
            .get_worker_by_id(worker_id)
            .await?
            .expect("worker should exist");
        assert_eq!(
            worker.resource.expect("worker resource should exist"),
            resource_v2
        );

        cluster_ctl.delete_worker(host).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_reregister_compute_node_updates_resource() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5011,
        };
        let property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };

        let resource_v1 = PbResource {
            rw_version: "rw-v1".to_owned(),
            total_memory_bytes: 1024,
            total_cpu_cores: 4,
            hostname: "host-v1".to_owned(),
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                resource_v1,
            )
            .await?;

        let resource_v2 = PbResource {
            rw_version: "rw-v2".to_owned(),
            total_memory_bytes: 2048,
            total_cpu_cores: 8,
            hostname: "host-v2".to_owned(),
        };
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                resource_v2.clone(),
            )
            .await?;

        let worker = cluster_ctl
            .get_worker_by_id(worker_id)
            .await?
            .expect("worker should exist");
        assert_eq!(
            worker.resource.expect("worker resource should exist"),
            resource_v2
        );

        cluster_ctl.delete_worker(host).await?;
        Ok(())
    }
}
1363}