use std::cmp;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
use risingwave_pb::common::worker_node::{
    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::ActiveValue::Set;
use sea_orm::prelude::Expr;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
    TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;

use crate::controller::utils::filter_workers_by_resource_group;
use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
use crate::model::ClusterId;
use crate::{MetaError, MetaResult};

pub type ClusterControllerRef = Arc<ClusterController>;

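/// Manages worker nodes in the cluster: registration, heartbeat tracking, activation,
/// schedulability updates, and deletion. Durable state lives in the meta store behind
/// `ClusterControllerInner`; this wrapper adds locking and notifications.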
pub struct ClusterController {
    env: MetaSrvEnv,
    max_heartbeat_interval: Duration,
    inner: RwLock<ClusterControllerInner>,
    started_at: u64,
}

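/// Bundles a persisted worker row, its optional property row, and the volatile
/// `WorkerExtraInfo` needed to assemble a `PbWorkerNode`.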
struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);

impl From<WorkerInfo> for PbWorkerNode {
    fn from(info: WorkerInfo) -> Self {
        Self {
            id: info.0.worker_id as _,
            r#type: PbWorkerType::from(info.0.worker_type) as _,
            host: Some(PbHostAddress {
                host: info.0.host,
                port: info.0.port,
            }),
            state: PbState::from(info.0.status) as _,
            property: info.1.as_ref().map(|p| PbProperty {
                is_streaming: p.is_streaming,
                is_serving: p.is_serving,
                is_unschedulable: p.is_unschedulable,
                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
                resource_group: p.resource_group.clone(),
                parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
            }),
            transactional_id: info.0.transaction_id.map(|id| id as _),
            resource: Some(info.2.resource),
            started_at: info.2.started_at,
        }
    }
}

impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    pub async fn compute_node_total_cpu_count(&self) -> usize {
        self.inner.read().await.compute_node_total_cpu_count()
    }

    async fn update_compute_node_total_cpu_count(&self) -> MetaResult<()> {
        let total_cpu_cores = self.compute_node_total_cpu_count().await;

        LicenseManager::get().update_cpu_core_count(total_cpu_cores);
        self.env.notification_manager().notify_all_without_version(
            Operation::Update,
            Info::ComputeNodeTotalCpuCount(total_cpu_cores as _),
        );

        Ok(())
    }

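    /// Registers a worker (or refreshes an existing registration at the same host/port)
    /// and returns its id. Registering a compute node also refreshes the licensed CPU
    /// core count.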
    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        if r#type == PbWorkerType::ComputeNode {
            self.update_compute_node_total_cpu_count().await?;
        }

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker))
            .await;

        Ok(())
    }

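    /// Deletes the worker bound to `host_address`, notifying frontends and local
    /// subscribers. Deleting a compute node also refreshes the licensed CPU core count.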
    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;
            if worker.r#type() == PbWorkerType::ComputeNode {
                self.update_compute_node_total_cpu_count().await?;
            }
        }

        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()))
            .await;

        Ok(worker)
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

    pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", worker_id = worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval)
    }

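    /// Spawns a background task that ticks every `check_interval`, grants an initial TTL
    /// to workers that have not yet reported a heartbeat, and deletes workers whose TTL
    /// has expired. Returns the task's join handle and a shutdown sender.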
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    _ = min_interval.tick() => {},
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => {
                                    cluster_controller
                                        .env
                                        .notification_manager()
                                        .delete_sender(worker_type.into(), WorkerKey(host_addr))
                                        .await
                                }
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

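    /// Lists workers, optionally filtered by type and status. When no type filter is
    /// given, a synthetic entry for this meta node itself is included.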
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        if worker_type.is_none() {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

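    /// Returns the current active streaming compute nodes plus a receiver of
    /// `LocalNotification`s for subsequent cluster changes; the read guard is held until
    /// the local subscriber is registered so no update is missed in between.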
    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(tx)
            .await;
        drop(inner);
        Ok((worker_nodes, rx))
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}

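/// Snapshot of active streaming compute nodes used for scheduling. `worker_nodes` holds
/// the schedulable nodes keyed by worker id, while the two id sets record which workers
/// are schedulable and which were excluded as unschedulable.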
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    pub worker_nodes: HashMap<u32, WorkerNode>,

    pub schedulable_workers: HashSet<u32>,

    pub unschedulable_workers: HashSet<u32>,
}

impl StreamingClusterInfo {
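    /// Total parallelism (sum of per-node compute parallelism) of the workers in this
    /// snapshot that belong to the given resource group.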
    pub fn parallelism(&self, resource_group: &str) -> usize {
        let available_worker_ids =
            filter_workers_by_resource_group(&self.worker_nodes, resource_group);

        self.worker_nodes
            .values()
            .filter(|worker| available_worker_ids.contains(&(worker.id as WorkerId)))
            .map(|worker| worker.compute_node_parallelism())
            .sum()
    }

    pub fn filter_schedulable_workers_by_resource_group(
        &self,
        resource_group: &str,
    ) -> HashMap<u32, WorkerNode> {
        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
        self.worker_nodes
            .iter()
            .filter(|(id, _)| worker_ids.contains(&(**id as WorkerId)))
            .map(|(id, worker)| (*id, worker.clone()))
            .collect()
    }
}

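/// Volatile, in-memory info tracked per worker and never persisted: the Unix timestamp
/// (in seconds) at which its heartbeat lease expires, when it started, its reported
/// resources, and its worker type.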
#[derive(Default, Clone)]
pub struct WorkerExtraInfo {
    expire_at: Option<u64>,
    started_at: Option<u64>,
    resource: PbResource,
    r#type: PbWorkerType,
}

impl WorkerExtraInfo {
    fn update_ttl(&mut self, ttl: Duration) {
        let expire = cmp::max(
            self.expire_at.unwrap_or_default(),
            SystemTime::now()
                .add(ttl)
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs(),
        );
        self.expire_at = Some(expire);
    }

    fn update_started_at(&mut self) {
        self.started_at = Some(timestamp_now_sec());
    }
}

fn timestamp_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
    PbWorkerNode {
        id: META_NODE_ID,
        r#type: PbWorkerType::Meta.into(),
        host: HostAddr::try_from(host)
            .as_ref()
            .map(HostAddr::to_protobuf)
            .ok(),
        state: PbState::Running as _,
        property: None,
        transactional_id: None,
        resource: Some(risingwave_pb::common::worker_node::Resource {
            rw_version: RW_VERSION.to_owned(),
            total_memory_bytes: system_memory_available_bytes() as _,
            total_cpu_cores: total_cpu_available() as _,
        }),
        started_at,
    }
}

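/// State guarded by the controller's `RwLock`: the meta store connection, the pool of
/// reusable transactional ids, and the volatile per-worker extra info.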
pub struct ClusterControllerInner {
    db: DatabaseConnection,
    available_transactional_ids: VecDeque<TransactionId>,
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
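    /// Reusable transactional ids are drawn from a fixed pool of
    /// `1 << MAX_WORKER_REUSABLE_ID_BITS` values shared by compute and frontend nodes.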
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }

    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            let expire = cmp::max(
                info.expire_at.unwrap_or_default(),
                SystemTime::now()
                    .add(ttl)
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_secs(),
            );
            info.expire_at = Some(expire);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn update_resource_and_started_at(
        &mut self,
        worker_id: WorkerId,
        resource: PbResource,
    ) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.resource = resource;
            info.update_started_at();
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

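    /// Picks the next reusable transactional id for compute and frontend nodes, returns
    /// `None` for other worker types, and errors if the pool is exhausted.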
    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }

    fn compute_node_total_cpu_count(&self) -> usize {
        self.worker_extra_info
            .values()
            .filter(|info| info.r#type == PbWorkerType::ComputeNode)
            .map(|info| info.resource.total_cpu_cores as usize)
            .sum()
    }

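    /// Registers a worker inside a transaction. If a worker is already bound to the same
    /// host/port, its reported parallelism and properties are reconciled and its TTL,
    /// resources, and start time refreshed; otherwise a new row (plus a property row for
    /// compute and frontend nodes) is inserted and a reusable transactional id assigned
    /// where applicable.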
    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }
        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode || r#type == PbWorkerType::Frontend {
            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(add_property.is_streaming),
                is_serving: Set(add_property.is_serving),
                is_unschedulable: Set(add_property.is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: if r#type == PbWorkerType::ComputeNode {
                    Set(add_property.resource_group.clone())
                } else {
                    Set(None)
                },
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
            r#type,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        let is_unschedulable = schedulability == Schedulability::Unschedulable;
        WorkerProperty::update_many()
            .col_expr(
                worker_property::Column::IsUnschedulable,
                Expr::value(is_unschedulable),
            )
            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
            .exec(&self.db)
            .await?;

        Ok(())
    }

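    /// Deletes the worker bound to `host_addr`, returns its transactional id (if any) to
    /// the reusable pool, and drops its volatile extra info.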
    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id as u32, idx as usize))
            })
            .collect_vec())
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

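    /// Builds a `StreamingClusterInfo` from the active streaming workers, splitting the
    /// unschedulable ones out of `worker_nodes` and recording both id sets.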
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let mut streaming_workers = self.list_active_streaming_workers().await?;

        let unschedulable_workers: HashSet<_> = streaming_workers
            .extract_if(.., |worker| {
                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
            })
            .map(|w| w.id)
            .collect();

        let schedulable_workers = streaming_workers
            .iter()
            .map(|worker| worker.id)
            .filter(|id| !unschedulable_workers.contains(id))
            .collect();

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
            schedulable_workers,
            unschedulable_workers,
        })
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        let worker = Worker::find_by_id(worker_id)
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        if worker.is_none() {
            return Ok(None);
        }
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
    }

    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }
}