1use std::cmp::{self, Ordering, max};
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::ops::Add;
18use std::sync::Arc;
19use std::time::{Duration, SystemTime};
20
21use itertools::Itertools;
22use risingwave_common::RW_VERSION;
23use risingwave_common::hash::WorkerSlotId;
24use risingwave_common::util::addr::HostAddr;
25use risingwave_common::util::resource_util::cpu::total_cpu_available;
26use risingwave_common::util::resource_util::hostname;
27use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_license::LicenseManager;
30use risingwave_meta_model::prelude::{Worker, WorkerProperty};
31use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
32use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
33use risingwave_pb::common::worker_node::{
34 PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
35};
36use risingwave_pb::common::{
37 ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
38};
39use risingwave_pb::meta::subscribe_response::{Info, Operation};
40use sea_orm::ActiveValue::Set;
41use sea_orm::{
42 ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
43 TransactionTrait,
44};
45use thiserror_ext::AsReport;
46use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
47use tokio::sync::oneshot::Sender;
48use tokio::sync::{RwLock, RwLockReadGuard};
49use tokio::task::JoinHandle;
50
51use crate::controller::utils::filter_workers_by_resource_group;
52use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
53use crate::model::ClusterId;
54use crate::{MetaError, MetaResult};
55
/// Shared, reference-counted handle to a [`ClusterController`].
pub type ClusterControllerRef = Arc<ClusterController>;
57
/// Public facade for cluster membership operations.
///
/// All worker state lives in [`ClusterControllerInner`], which is guarded by an async
/// `RwLock`; the methods on this type acquire the lock, delegate, and send notifications.
pub struct ClusterController {
    /// Meta service environment; used in this file for options, the notification manager,
    /// the cluster id and the meta store handle.
    env: MetaSrvEnv,
    /// TTL passed to the inner controller whenever a worker registers or sends a heartbeat.
    max_heartbeat_interval: Duration,
    /// Lock-protected worker bookkeeping.
    inner: RwLock<ClusterControllerInner>,
    /// Unix timestamp in seconds captured when this controller was constructed; reported
    /// as the meta node's `started_at` by `list_workers`.
    started_at: u64,
}
65
/// Everything needed to build a [`PbWorkerNode`]: the worker row, its optional property
/// row, and the in-memory extra info (in that order).
struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);
71
72impl From<WorkerInfo> for PbWorkerNode {
73 fn from(info: WorkerInfo) -> Self {
74 Self {
75 id: info.0.worker_id,
76 r#type: PbWorkerType::from(info.0.worker_type) as _,
77 host: Some(PbHostAddress {
78 host: info.0.host,
79 port: info.0.port,
80 }),
81 state: PbState::from(info.0.status) as _,
82 property: info.1.as_ref().map(|p| PbProperty {
83 is_streaming: p.is_streaming,
84 is_serving: p.is_serving,
85 internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
86 resource_group: p.resource_group.clone(),
87 parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
88 is_iceberg_compactor: p.is_iceberg_compactor,
89 }),
90 transactional_id: info.0.transaction_id.map(|id| id as _),
91 resource: Some(info.2.resource),
92 started_at: info.2.started_at,
93 }
94 }
95}
96
97impl ClusterController {
98 pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
99 let inner = ClusterControllerInner::new(
100 env.meta_store_ref().conn.clone(),
101 env.opts.disable_automatic_parallelism_control,
102 )
103 .await?;
104 Ok(Self {
105 env,
106 max_heartbeat_interval,
107 inner: RwLock::new(inner),
108 started_at: timestamp_now_sec(),
109 })
110 }
111
112 pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
115 self.inner.read().await
116 }
117
118 pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
119 self.inner.read().await.count_worker_by_type().await
120 }
121
122 pub async fn cluster_resource(&self) -> ClusterResource {
124 self.inner.read().await.cluster_resource()
125 }
126
127 async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
129 let resource = self.cluster_resource().await;
130
131 LicenseManager::get().update_cluster_resource(resource);
133 self.env.notification_manager().notify_all_without_version(
135 Operation::Update, Info::ClusterResource(resource),
137 );
138
139 Ok(())
140 }
141
142 pub async fn add_worker(
147 &self,
148 r#type: PbWorkerType,
149 host_address: HostAddress,
150 property: AddNodeProperty,
151 resource: PbResource,
152 ) -> MetaResult<WorkerId> {
153 let worker_id = self
154 .inner
155 .write()
156 .await
157 .add_worker(
158 r#type,
159 host_address,
160 property,
161 resource,
162 self.max_heartbeat_interval,
163 )
164 .await?;
165
166 self.update_cluster_resource_for_license().await?;
168
169 Ok(worker_id)
170 }
171
172 pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
173 let inner = self.inner.write().await;
174 let worker = inner.activate_worker(worker_id).await?;
175
176 if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
179 {
180 self.env
181 .notification_manager()
182 .notify_frontend(Operation::Add, Info::Node(worker.clone()))
183 .await;
184 }
185 self.env
186 .notification_manager()
187 .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));
188
189 Ok(())
190 }
191
192 pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
193 let worker = self.inner.write().await.delete_worker(host_address).await?;
194
195 if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
196 {
197 self.env
198 .notification_manager()
199 .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
200 .await;
201 }
202
203 self.update_cluster_resource_for_license().await?;
205
206 self.env
210 .notification_manager()
211 .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));
212
213 Ok(worker)
214 }
215
216 pub async fn heartbeat(
218 &self,
219 worker_id: WorkerId,
220 resource: Option<PbResource>,
221 ) -> MetaResult<()> {
222 tracing::trace!(target: "events::meta::server_heartbeat", %worker_id, "receive heartbeat");
223 self.inner
224 .write()
225 .await
226 .heartbeat(worker_id, self.max_heartbeat_interval, resource)
227 }
228
    /// Spawns a background task that periodically deletes workers whose TTL has expired.
    ///
    /// Returns the task's join handle and a oneshot sender; the task stops when the
    /// shutdown receiver resolves.
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    // Wait for the next check tick.
                    _ = min_interval.tick() => {},
                    // Shutdown requested.
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                // Workers that have no expiry yet (e.g. freshly registered with
                // `expire_at: None`) get an initial TTL counted from now.
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                let now = timestamp_now_sec();
                // The `unwrap` cannot fail: every `None` expiry was filled in just above.
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();
                if worker_to_delete.is_empty() {
                    drop(inner);
                    continue;
                }

                // Load the type and address of each expired worker; deletion is keyed by address.
                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expire worker info from db");
                        continue;
                    }
                };
                // Release the write lock: `delete_worker` below acquires it itself.
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                %worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            // Drop the notification sender registered for this worker,
                            // for the worker types listed here.
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => cluster_controller
                                    .env
                                    .notification_manager()
                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expire worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }
322
323 pub async fn list_workers(
328 &self,
329 worker_type: Option<WorkerType>,
330 worker_status: Option<WorkerStatus>,
331 ) -> MetaResult<Vec<PbWorkerNode>> {
332 let mut workers = vec![];
333 let include_meta = worker_type.is_none() || worker_type == Some(WorkerType::Meta);
336 let include_meta = include_meta && worker_status != Some(WorkerStatus::Starting);
338 if include_meta {
339 workers.push(meta_node_info(
340 &self.env.opts.advertise_addr,
341 Some(self.started_at),
342 ));
343 }
344 workers.extend(
345 self.inner
346 .read()
347 .await
348 .list_workers(worker_type, worker_status)
349 .await?,
350 );
351 Ok(workers)
352 }
353
354 pub(crate) async fn subscribe_active_streaming_compute_nodes(
355 &self,
356 ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
357 let inner = self.inner.read().await;
358 let worker_nodes = inner.list_active_streaming_workers().await?;
359 let (tx, rx) = unbounded_channel();
360
361 self.env.notification_manager().insert_local_sender(tx);
363 drop(inner);
364 Ok((worker_nodes, rx))
365 }
366
367 pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
370 self.inner
371 .read()
372 .await
373 .list_active_streaming_workers()
374 .await
375 }
376
377 pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
378 self.inner.read().await.list_active_worker_slots().await
379 }
380
381 pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
383 self.inner.read().await.list_active_serving_workers().await
384 }
385
386 pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
388 self.inner.read().await.get_streaming_cluster_info().await
389 }
390
391 pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
392 self.inner.read().await.get_worker_by_id(worker_id).await
393 }
394
395 pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
396 self.inner
397 .read()
398 .await
399 .get_worker_extra_info_by_id(worker_id)
400 }
401
402 pub fn cluster_id(&self) -> &ClusterId {
403 self.env.cluster_id()
404 }
405
406 pub fn meta_store_endpoint(&self) -> String {
407 self.env.meta_store_ref().endpoint.clone()
408 }
409}
410
/// Snapshot of the streaming cluster as produced by `get_streaming_cluster_info`.
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    /// Active streaming compute nodes, keyed by worker id.
    pub worker_nodes: HashMap<WorkerId, WorkerNode>,
}
417
418impl StreamingClusterInfo {
420 pub fn parallelism(&self, resource_group: &str) -> usize {
421 let available_worker_ids =
422 filter_workers_by_resource_group(&self.worker_nodes, resource_group);
423
424 self.worker_nodes
425 .values()
426 .filter(|worker| available_worker_ids.contains(&(worker.id)))
427 .map(|worker| worker.compute_node_parallelism())
428 .sum()
429 }
430
431 pub fn filter_workers_by_resource_group(
432 &self,
433 resource_group: &str,
434 ) -> HashMap<WorkerId, WorkerNode> {
435 let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
436 self.worker_nodes
437 .iter()
438 .filter(|(id, _)| worker_ids.contains(*id))
439 .map(|(id, worker)| (*id, worker.clone()))
440 .collect()
441 }
442}
443
/// Per-worker state that is kept only in memory.
#[derive(Default, Clone, Debug)]
pub struct WorkerExtraInfo {
    /// Unix timestamp (seconds) after which the heartbeat checker treats the worker as
    /// expired. `None` until the first TTL update.
    expire_at: Option<u64>,
    /// Unix timestamp (seconds) recorded when the worker last registered.
    started_at: Option<u64>,
    /// Resource description most recently reported by the worker.
    resource: PbResource,
}
453
454impl WorkerExtraInfo {
455 fn update_ttl(&mut self, ttl: Duration) {
456 let expire = cmp::max(
457 self.expire_at.unwrap_or_default(),
458 SystemTime::now()
459 .add(ttl)
460 .duration_since(SystemTime::UNIX_EPOCH)
461 .expect("Clock may have gone backwards")
462 .as_secs(),
463 );
464 self.expire_at = Some(expire);
465 }
466
467 fn update_started_at(&mut self) {
468 self.started_at = Some(timestamp_now_sec());
469 }
470}
471
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn timestamp_now_sec() -> u64 {
    let now = SystemTime::now();
    let since_epoch = now
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards");
    since_epoch.as_secs()
}
478
479fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
480 PbWorkerNode {
481 id: META_NODE_ID,
482 r#type: PbWorkerType::Meta.into(),
483 host: HostAddr::try_from(host)
484 .as_ref()
485 .map(HostAddr::to_protobuf)
486 .ok(),
487 state: PbState::Running as _,
488 property: None,
489 transactional_id: None,
490 resource: Some(risingwave_pb::common::worker_node::Resource {
491 rw_version: RW_VERSION.to_owned(),
492 total_memory_bytes: system_memory_available_bytes() as _,
493 total_cpu_cores: total_cpu_available() as _,
494 hostname: hostname(),
495 }),
496 started_at,
497 }
498}
499
/// Lock-protected state behind [`ClusterController`].
pub struct ClusterControllerInner {
    /// Connection to the meta store database holding worker and worker-property rows.
    db: DatabaseConnection,
    /// Pool of transactional ids not currently assigned to any worker; ids are handed
    /// out from the front and returned to the back.
    available_transactional_ids: VecDeque<TransactionId>,
    /// In-memory extra info (expiry, start time, resource) per registered worker.
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    /// When set, a re-registering compute node reporting a lower parallelism keeps its
    /// stored parallelism.
    disable_automatic_parallelism_control: bool,
}
507
508impl ClusterControllerInner {
    /// Number of bits that bound the reusable transactional id space.
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    /// Size of the reusable transactional id pool (`1 << MAX_WORKER_REUSABLE_ID_BITS`);
    /// valid ids are `0..MAX_WORKER_REUSABLE_ID_COUNT`.
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;
511
512 pub async fn new(
513 db: DatabaseConnection,
514 disable_automatic_parallelism_control: bool,
515 ) -> MetaResult<Self> {
516 let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
517 .select_only()
518 .column(worker::Column::WorkerId)
519 .column(worker::Column::TransactionId)
520 .into_tuple()
521 .all(&db)
522 .await?;
523 let inuse_txn_ids: HashSet<_> = workers
524 .iter()
525 .cloned()
526 .filter_map(|(_, txn_id)| txn_id)
527 .collect();
528 let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
529 .filter(|id| !inuse_txn_ids.contains(id))
530 .collect();
531
532 let worker_extra_info = workers
533 .into_iter()
534 .map(|(w, _)| (w, WorkerExtraInfo::default()))
535 .collect();
536
537 Ok(Self {
538 db,
539 available_transactional_ids,
540 worker_extra_info,
541 disable_automatic_parallelism_control,
542 })
543 }
544
545 pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
546 let workers: Vec<(WorkerType, i64)> = Worker::find()
547 .select_only()
548 .column(worker::Column::WorkerType)
549 .column_as(worker::Column::WorkerId.count(), "count")
550 .group_by(worker::Column::WorkerType)
551 .into_tuple()
552 .all(&self.db)
553 .await?;
554
555 Ok(workers.into_iter().collect())
556 }
557
558 pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
559 if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
560 let expire = cmp::max(
561 info.expire_at.unwrap_or_default(),
562 SystemTime::now()
563 .add(ttl)
564 .duration_since(SystemTime::UNIX_EPOCH)
565 .expect("Clock may have gone backwards")
566 .as_secs(),
567 );
568 info.expire_at = Some(expire);
569 Ok(())
570 } else {
571 Err(MetaError::invalid_worker(worker_id, "worker not found"))
572 }
573 }
574
575 fn update_resource_and_started_at(
576 &mut self,
577 worker_id: WorkerId,
578 resource: PbResource,
579 ) -> MetaResult<()> {
580 if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
581 info.resource = resource;
582 info.update_started_at();
583 Ok(())
584 } else {
585 Err(MetaError::invalid_worker(worker_id, "worker not found"))
586 }
587 }
588
589 fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
590 self.worker_extra_info
591 .get(&worker_id)
592 .cloned()
593 .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
594 }
595
596 fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
597 match (self.available_transactional_ids.front(), r#type) {
598 (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
599 (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
601 _ => Ok(None),
602 }
603 }
604
605 fn cluster_resource(&self) -> ClusterResource {
607 let mut per_host = HashMap::new();
609
610 per_host.insert(
613 hostname(),
614 ClusterResource {
615 total_cpu_cores: total_cpu_available() as _,
616 total_memory_bytes: system_memory_available_bytes() as _,
617 },
618 );
619
620 for info in self.worker_extra_info.values() {
621 let r = per_host
622 .entry(info.resource.hostname.clone())
623 .or_insert_with(ClusterResource::default);
624
625 r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
626 r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
627 }
628
629 per_host
631 .into_values()
632 .reduce(|a, b| ClusterResource {
633 total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
634 total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
635 })
636 .unwrap_or_default()
637 }
638
639 #[await_tree::instrument]
640 pub async fn add_worker(
641 &mut self,
642 r#type: PbWorkerType,
643 host_address: HostAddress,
644 add_property: AddNodeProperty,
645 resource: PbResource,
646 ttl: Duration,
647 ) -> MetaResult<WorkerId> {
648 let txn = self.db.begin().await?;
649
650 let worker = Worker::find()
651 .filter(
652 worker::Column::Host
653 .eq(host_address.host.clone())
654 .and(worker::Column::Port.eq(host_address.port)),
655 )
656 .find_also_related(WorkerProperty)
657 .one(&txn)
658 .await?;
659 if let Some((worker, property)) = worker {
661 assert_eq!(worker.worker_type, r#type.into());
662 return if worker.worker_type == WorkerType::ComputeNode {
663 let property = property.unwrap();
664 let mut current_parallelism = property.parallelism as usize;
665 let new_parallelism = add_property.parallelism as usize;
666 match new_parallelism.cmp(¤t_parallelism) {
667 Ordering::Less => {
668 if !self.disable_automatic_parallelism_control {
669 tracing::info!(
671 "worker {} parallelism reduced from {} to {}",
672 worker.worker_id,
673 current_parallelism,
674 new_parallelism
675 );
676 current_parallelism = new_parallelism;
677 } else {
678 tracing::warn!(
681 "worker {} parallelism is less than current, current is {}, but received {}",
682 worker.worker_id,
683 current_parallelism,
684 new_parallelism
685 );
686 }
687 }
688 Ordering::Greater => {
689 tracing::info!(
690 "worker {} parallelism updated from {} to {}",
691 worker.worker_id,
692 current_parallelism,
693 new_parallelism
694 );
695 current_parallelism = new_parallelism;
696 }
697 Ordering::Equal => {}
698 }
699 let mut property: worker_property::ActiveModel = property.into();
700
701 property.is_streaming = Set(add_property.is_streaming);
702 property.is_serving = Set(add_property.is_serving);
703 property.parallelism = Set(current_parallelism as _);
704 property.resource_group =
705 Set(Some(add_property.resource_group.unwrap_or_else(|| {
706 tracing::warn!(
707 "resource_group is not set for worker {}, fallback to `default`",
708 worker.worker_id
709 );
710 DEFAULT_RESOURCE_GROUP.to_owned()
711 })));
712
713 WorkerProperty::update(property).exec(&txn).await?;
714 txn.commit().await?;
715 self.update_worker_ttl(worker.worker_id, ttl)?;
716 self.update_resource_and_started_at(worker.worker_id, resource)?;
717 Ok(worker.worker_id)
718 } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
719 let worker_property = worker_property::ActiveModel {
720 worker_id: Set(worker.worker_id),
721 parallelism: Set(add_property
722 .parallelism
723 .try_into()
724 .expect("invalid parallelism")),
725 is_streaming: Set(add_property.is_streaming),
726 is_serving: Set(add_property.is_serving),
727 is_unschedulable: Set(false),
728 internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
729 resource_group: Set(None),
730 is_iceberg_compactor: Set(false),
731 };
732 WorkerProperty::insert(worker_property).exec(&txn).await?;
733 txn.commit().await?;
734 self.update_worker_ttl(worker.worker_id, ttl)?;
735 self.update_resource_and_started_at(worker.worker_id, resource)?;
736 Ok(worker.worker_id)
737 } else if worker.worker_type == WorkerType::Compactor {
738 if let Some(property) = property {
739 let mut property: worker_property::ActiveModel = property.into();
740 property.is_iceberg_compactor = Set(add_property.is_iceberg_compactor);
741 property.internal_rpc_host_addr =
742 Set(Some(add_property.internal_rpc_host_addr));
743
744 WorkerProperty::update(property).exec(&txn).await?;
745 } else {
746 let property = worker_property::ActiveModel {
747 worker_id: Set(worker.worker_id),
748 parallelism: Set(add_property
749 .parallelism
750 .try_into()
751 .expect("invalid parallelism")),
752 is_streaming: Set(false),
753 is_serving: Set(false),
754 is_unschedulable: Set(false),
755 internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
756 resource_group: Set(None),
757 is_iceberg_compactor: Set(add_property.is_iceberg_compactor),
758 };
759
760 WorkerProperty::insert(property).exec(&txn).await?;
761 }
762 txn.commit().await?;
763 self.update_worker_ttl(worker.worker_id, ttl)?;
764 self.update_resource_and_started_at(worker.worker_id, resource)?;
765 Ok(worker.worker_id)
766 } else {
767 self.update_worker_ttl(worker.worker_id, ttl)?;
768 self.update_resource_and_started_at(worker.worker_id, resource)?;
769 Ok(worker.worker_id)
770 };
771 }
772
773 let txn_id = self.apply_transaction_id(r#type)?;
774
775 let worker = worker::ActiveModel {
776 worker_id: Default::default(),
777 worker_type: Set(r#type.into()),
778 host: Set(host_address.host.clone()),
779 port: Set(host_address.port),
780 status: Set(WorkerStatus::Starting),
781 transaction_id: Set(txn_id),
782 };
783 let insert_res = Worker::insert(worker).exec(&txn).await?;
784 let worker_id = insert_res.last_insert_id as WorkerId;
785 if r#type == PbWorkerType::ComputeNode
786 || r#type == PbWorkerType::Frontend
787 || r#type == PbWorkerType::Compactor
788 {
789 let (is_serving, is_streaming, is_iceberg_compactor, resource_group) = match r#type {
790 PbWorkerType::ComputeNode => (
791 add_property.is_serving,
792 add_property.is_streaming,
793 false,
794 add_property.resource_group.clone(),
795 ),
796 PbWorkerType::Frontend => (
797 add_property.is_serving,
798 add_property.is_streaming,
799 false,
800 None,
801 ),
802 PbWorkerType::Compactor => (false, false, add_property.is_iceberg_compactor, None),
803 _ => unreachable!(),
804 };
805
806 let property = worker_property::ActiveModel {
807 worker_id: Set(worker_id),
808 parallelism: Set(add_property
809 .parallelism
810 .try_into()
811 .expect("invalid parallelism")),
812 is_streaming: Set(is_streaming),
813 is_serving: Set(is_serving),
814 is_unschedulable: Set(false),
815 internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
816 resource_group: Set(resource_group),
817 is_iceberg_compactor: Set(is_iceberg_compactor),
818 };
819 WorkerProperty::insert(property).exec(&txn).await?;
820 }
821
822 txn.commit().await?;
823 if let Some(txn_id) = txn_id {
824 self.available_transactional_ids.retain(|id| *id != txn_id);
825 }
826 let extra_info = WorkerExtraInfo {
827 started_at: Some(timestamp_now_sec()),
828 expire_at: None,
829 resource,
830 };
831 self.worker_extra_info.insert(worker_id, extra_info);
832
833 Ok(worker_id)
834 }
835
836 pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
837 let worker = worker::ActiveModel {
838 worker_id: Set(worker_id),
839 status: Set(WorkerStatus::Running),
840 ..Default::default()
841 };
842
843 let worker = worker.update(&self.db).await?;
844 let worker_property = WorkerProperty::find_by_id(worker.worker_id)
845 .one(&self.db)
846 .await?;
847 let extra_info = self.get_extra_info_checked(worker_id)?;
848 Ok(WorkerInfo(worker, worker_property, extra_info).into())
849 }
850
851 pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
852 let worker = Worker::find()
853 .filter(
854 worker::Column::Host
855 .eq(host_addr.host)
856 .and(worker::Column::Port.eq(host_addr.port)),
857 )
858 .find_also_related(WorkerProperty)
859 .one(&self.db)
860 .await?;
861 let Some((worker, property)) = worker else {
862 return Err(MetaError::invalid_parameter("worker not found!"));
863 };
864
865 let res = Worker::delete_by_id(worker.worker_id)
866 .exec(&self.db)
867 .await?;
868 if res.rows_affected == 0 {
869 return Err(MetaError::invalid_parameter("worker not found!"));
870 }
871
872 let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
873 if let Some(txn_id) = &worker.transaction_id {
874 self.available_transactional_ids.push_back(*txn_id);
875 }
876 let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();
877
878 Ok(worker)
879 }
880
881 pub fn heartbeat(
882 &mut self,
883 worker_id: WorkerId,
884 ttl: Duration,
885 resource: Option<PbResource>,
886 ) -> MetaResult<()> {
887 if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
888 worker_info.update_ttl(ttl);
889 if let Some(resource) = resource {
890 worker_info.resource = resource;
891 }
892 Ok(())
893 } else {
894 Err(MetaError::invalid_worker(worker_id, "worker not found"))
895 }
896 }
897
898 pub async fn list_workers(
899 &self,
900 worker_type: Option<WorkerType>,
901 worker_status: Option<WorkerStatus>,
902 ) -> MetaResult<Vec<PbWorkerNode>> {
903 let mut find = Worker::find();
904 if let Some(worker_type) = worker_type {
905 find = find.filter(worker::Column::WorkerType.eq(worker_type));
906 }
907 if let Some(worker_status) = worker_status {
908 find = find.filter(worker::Column::Status.eq(worker_status));
909 }
910 let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
911 Ok(workers
912 .into_iter()
913 .map(|(worker, property)| {
914 let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
915 WorkerInfo(worker, property, extra_info).into()
916 })
917 .collect_vec())
918 }
919
920 pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
921 let workers = Worker::find()
922 .filter(
923 worker::Column::WorkerType
924 .eq(WorkerType::ComputeNode)
925 .and(worker::Column::Status.eq(WorkerStatus::Running)),
926 )
927 .inner_join(WorkerProperty)
928 .select_also(WorkerProperty)
929 .filter(worker_property::Column::IsStreaming.eq(true))
930 .all(&self.db)
931 .await?;
932
933 Ok(workers
934 .into_iter()
935 .map(|(worker, property)| {
936 let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
937 WorkerInfo(worker, property, extra_info).into()
938 })
939 .collect_vec())
940 }
941
942 pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
943 let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
944 .select_only()
945 .column(worker_property::Column::WorkerId)
946 .column(worker_property::Column::Parallelism)
947 .inner_join(Worker)
948 .filter(worker::Column::Status.eq(WorkerStatus::Running))
949 .into_tuple()
950 .all(&self.db)
951 .await?;
952 Ok(worker_parallelisms
953 .into_iter()
954 .flat_map(|(worker_id, parallelism)| {
955 (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id, idx as usize))
956 })
957 .collect_vec())
958 }
959
960 pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
961 let workers = Worker::find()
962 .filter(
963 worker::Column::WorkerType
964 .eq(WorkerType::ComputeNode)
965 .and(worker::Column::Status.eq(WorkerStatus::Running)),
966 )
967 .inner_join(WorkerProperty)
968 .select_also(WorkerProperty)
969 .filter(worker_property::Column::IsServing.eq(true))
970 .all(&self.db)
971 .await?;
972
973 Ok(workers
974 .into_iter()
975 .map(|(worker, property)| {
976 let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
977 WorkerInfo(worker, property, extra_info).into()
978 })
979 .collect_vec())
980 }
981
982 pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
983 let streaming_workers = self.list_active_streaming_workers().await?;
984
985 let active_workers: HashMap<_, _> =
986 streaming_workers.into_iter().map(|w| (w.id, w)).collect();
987
988 Ok(StreamingClusterInfo {
989 worker_nodes: active_workers,
990 })
991 }
992
993 pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
994 let worker = Worker::find_by_id(worker_id)
995 .find_also_related(WorkerProperty)
996 .one(&self.db)
997 .await?;
998 if worker.is_none() {
999 return Ok(None);
1000 }
1001 let extra_info = self.get_extra_info_checked(worker_id)?;
1002 Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
1003 }
1004
1005 pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
1006 self.worker_extra_info.get(&worker_id).cloned()
1007 }
1008}
1009
1010#[cfg(test)]
1011mod tests {
1012 use super::*;
1013
1014 fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
1015 (0..count)
1016 .map(|i| HostAddress {
1017 host: "localhost".to_owned(),
1018 port: 5000 + i as i32,
1019 })
1020 .collect_vec()
1021 }
1022
1023 #[tokio::test]
1024 async fn test_cluster_controller() -> MetaResult<()> {
1025 let env = MetaSrvEnv::for_test().await;
1026 let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;
1027
1028 let parallelism_num = 4_usize;
1029 let worker_count = 5_usize;
1030 let property = AddNodeProperty {
1031 parallelism: parallelism_num as _,
1032 is_streaming: true,
1033 is_serving: true,
1034 ..Default::default()
1035 };
1036 let hosts = mock_worker_hosts_for_test(worker_count);
1037 let mut worker_ids = vec![];
1038 for host in &hosts {
1039 worker_ids.push(
1040 cluster_ctl
1041 .add_worker(
1042 PbWorkerType::ComputeNode,
1043 host.clone(),
1044 property.clone(),
1045 PbResource::default(),
1046 )
1047 .await?,
1048 );
1049 }
1050
1051 assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);
1053
1054 for id in &worker_ids {
1055 cluster_ctl.activate_worker(*id).await?;
1056 }
1057 let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
1058 assert_eq!(
1059 *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
1060 worker_count
1061 );
1062 assert_eq!(
1063 cluster_ctl.list_active_streaming_workers().await?.len(),
1064 worker_count
1065 );
1066 assert_eq!(
1067 cluster_ctl.list_active_serving_workers().await?.len(),
1068 worker_count
1069 );
1070 assert_eq!(
1071 cluster_ctl.list_active_worker_slots().await?.len(),
1072 parallelism_num * worker_count
1073 );
1074
1075 let mut new_property = property.clone();
1077 new_property.parallelism = (parallelism_num * 2) as _;
1078 new_property.is_serving = false;
1079 cluster_ctl
1080 .add_worker(
1081 PbWorkerType::ComputeNode,
1082 hosts[0].clone(),
1083 new_property,
1084 PbResource::default(),
1085 )
1086 .await?;
1087
1088 assert_eq!(
1089 cluster_ctl.list_active_streaming_workers().await?.len(),
1090 worker_count
1091 );
1092 assert_eq!(
1093 cluster_ctl.list_active_serving_workers().await?.len(),
1094 worker_count - 1
1095 );
1096 let worker_slots = cluster_ctl.list_active_worker_slots().await?;
1097 assert!(worker_slots.iter().all_unique());
1098 assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));
1099
1100 for host in hosts {
1102 cluster_ctl.delete_worker(host).await?;
1103 }
1104 assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
1105 assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
1106 assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);
1107
1108 Ok(())
1109 }
1110
1111 #[tokio::test]
1112 async fn test_list_workers_include_meta_node() -> MetaResult<()> {
1113 let env = MetaSrvEnv::for_test().await;
1114 let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;
1115
1116 let workers = cluster_ctl.list_workers(None, None).await?;
1118 assert!(workers.iter().any(|w| w.r#type() == PbWorkerType::Meta));
1119
1120 let workers = cluster_ctl
1122 .list_workers(Some(WorkerType::Meta), None)
1123 .await?;
1124 assert_eq!(workers.len(), 1);
1125 assert_eq!(workers[0].r#type(), PbWorkerType::Meta);
1126
1127 let workers = cluster_ctl
1129 .list_workers(Some(WorkerType::Meta), Some(WorkerStatus::Starting))
1130 .await?;
1131 assert!(workers.is_empty());
1132
1133 Ok(())
1134 }
1135
1136 #[tokio::test]
1137 async fn test_heartbeat_updates_resource() -> MetaResult<()> {
1138 let env = MetaSrvEnv::for_test().await;
1139 let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;
1140
1141 let host = HostAddress {
1142 host: "localhost".to_owned(),
1143 port: 5010,
1144 };
1145 let property = AddNodeProperty {
1146 is_streaming: true,
1147 is_serving: true,
1148 parallelism: 4,
1149 ..Default::default()
1150 };
1151
1152 let resource_v1 = PbResource {
1153 rw_version: "rw-v1".to_owned(),
1154 total_memory_bytes: 1024,
1155 total_cpu_cores: 4,
1156 hostname: "host-v1".to_owned(),
1157 };
1158 let worker_id = cluster_ctl
1159 .add_worker(
1160 PbWorkerType::ComputeNode,
1161 host.clone(),
1162 property,
1163 resource_v1,
1164 )
1165 .await?;
1166
1167 let resource_v2 = PbResource {
1168 rw_version: "rw-v2".to_owned(),
1169 total_memory_bytes: 2048,
1170 total_cpu_cores: 8,
1171 hostname: "host-v2".to_owned(),
1172 };
1173 cluster_ctl
1174 .heartbeat(worker_id, Some(resource_v2.clone()))
1175 .await?;
1176
1177 let worker = cluster_ctl
1178 .get_worker_by_id(worker_id)
1179 .await?
1180 .expect("worker should exist");
1181 assert_eq!(
1182 worker.resource.expect("worker resource should exist"),
1183 resource_v2
1184 );
1185
1186 cluster_ctl.delete_worker(host).await?;
1187 Ok(())
1188 }
1189
1190 #[tokio::test]
1191 async fn test_reregister_compute_node_updates_resource() -> MetaResult<()> {
1192 let env = MetaSrvEnv::for_test().await;
1193 let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;
1194
1195 let host = HostAddress {
1196 host: "localhost".to_owned(),
1197 port: 5011,
1198 };
1199 let property = AddNodeProperty {
1200 is_streaming: true,
1201 is_serving: true,
1202 parallelism: 4,
1203 ..Default::default()
1204 };
1205
1206 let resource_v1 = PbResource {
1207 rw_version: "rw-v1".to_owned(),
1208 total_memory_bytes: 1024,
1209 total_cpu_cores: 4,
1210 hostname: "host-v1".to_owned(),
1211 };
1212 let worker_id = cluster_ctl
1213 .add_worker(
1214 PbWorkerType::ComputeNode,
1215 host.clone(),
1216 property.clone(),
1217 resource_v1,
1218 )
1219 .await?;
1220
1221 let resource_v2 = PbResource {
1222 rw_version: "rw-v2".to_owned(),
1223 total_memory_bytes: 2048,
1224 total_cpu_cores: 8,
1225 hostname: "host-v2".to_owned(),
1226 };
1227 cluster_ctl
1228 .add_worker(
1229 PbWorkerType::ComputeNode,
1230 host.clone(),
1231 property,
1232 resource_v2.clone(),
1233 )
1234 .await?;
1235
1236 let worker = cluster_ctl
1237 .get_worker_by_id(worker_id)
1238 .await?
1239 .expect("worker should exist");
1240 assert_eq!(
1241 worker.resource.expect("worker resource should exist"),
1242 resource_v2
1243 );
1244
1245 cluster_ctl.delete_worker(host).await?;
1246 Ok(())
1247 }
1248}