1use std::cmp;
16use std::cmp::Ordering;
17use std::collections::{HashMap, HashSet, VecDeque};
18use std::ops::Add;
19use std::sync::Arc;
20use std::time::{Duration, SystemTime};
21
22use itertools::Itertools;
23use risingwave_common::RW_VERSION;
24use risingwave_common::hash::WorkerSlotId;
25use risingwave_common::util::addr::HostAddr;
26use risingwave_common::util::resource_util::cpu::total_cpu_available;
27use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
28use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
29use risingwave_license::LicenseManager;
30use risingwave_meta_model::prelude::{Worker, WorkerProperty};
31use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
32use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
33use risingwave_pb::common::worker_node::{
34 PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
35};
36use risingwave_pb::common::{HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode};
37use risingwave_pb::meta::subscribe_response::{Info, Operation};
38use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
39use sea_orm::ActiveValue::Set;
40use sea_orm::prelude::Expr;
41use sea_orm::{
42 ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
43 TransactionTrait,
44};
45use thiserror_ext::AsReport;
46use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
47use tokio::sync::oneshot::Sender;
48use tokio::sync::{RwLock, RwLockReadGuard};
49use tokio::task::JoinHandle;
50
51use crate::controller::utils::filter_workers_by_resource_group;
52use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
53use crate::model::ClusterId;
54use crate::{MetaError, MetaResult};
55
/// Shared, reference-counted handle to the [`ClusterController`].
pub type ClusterControllerRef = Arc<ClusterController>;
57
/// Manages cluster membership: worker registration, liveness (heartbeat
/// leases), activation and deletion, backed by the SQL meta store.
pub struct ClusterController {
    // Shared meta-service environment (meta store handle, notification
    // manager, server options).
    env: MetaSrvEnv,
    // A worker is considered expired when no heartbeat arrives within this
    // interval; the background checker then deletes it.
    max_heartbeat_interval: Duration,
    // In-memory state guarded by an async RwLock; the DB remains the source
    // of truth for persisted workers.
    inner: RwLock<ClusterControllerInner>,
    // Unix timestamp (seconds) when this meta node started; reported in the
    // synthetic meta-node entry of `list_workers`.
    started_at: u64,
}
65
/// Everything needed to render one worker as a `PbWorkerNode`: the worker
/// row, its optional property row, and the in-memory extra info.
struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);
71
72impl From<WorkerInfo> for PbWorkerNode {
73 fn from(info: WorkerInfo) -> Self {
74 Self {
75 id: info.0.worker_id as _,
76 r#type: PbWorkerType::from(info.0.worker_type) as _,
77 host: Some(PbHostAddress {
78 host: info.0.host,
79 port: info.0.port,
80 }),
81 state: PbState::from(info.0.status) as _,
82 property: info.1.as_ref().map(|p| PbProperty {
83 is_streaming: p.is_streaming,
84 is_serving: p.is_serving,
85 is_unschedulable: p.is_unschedulable,
86 internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
87 resource_group: p.resource_group.clone(),
88 parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
89 }),
90 transactional_id: info.0.transaction_id.map(|id| id as _),
91 resource: Some(info.2.resource),
92 started_at: info.2.started_at,
93 }
94 }
95}
96
97impl ClusterController {
98 pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
99 let inner = ClusterControllerInner::new(
100 env.meta_store_ref().conn.clone(),
101 env.opts.disable_automatic_parallelism_control,
102 )
103 .await?;
104 Ok(Self {
105 env,
106 max_heartbeat_interval,
107 inner: RwLock::new(inner),
108 started_at: timestamp_now_sec(),
109 })
110 }
111
    /// Returns a read guard over the inner state for callers that need
    /// direct read-only access.
    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    /// Counts persisted workers in the meta store, grouped by worker type.
    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    /// Sums the CPU cores reported by all registered compute nodes.
    pub async fn compute_node_total_cpu_count(&self) -> usize {
        self.inner.read().await.compute_node_total_cpu_count()
    }
125
126 async fn update_compute_node_total_cpu_count(&self) -> MetaResult<()> {
127 let total_cpu_cores = self.compute_node_total_cpu_count().await;
128
129 LicenseManager::get().update_cpu_core_count(total_cpu_cores);
131 self.env.notification_manager().notify_all_without_version(
133 Operation::Update, Info::ComputeNodeTotalCpuCount(total_cpu_cores as _),
135 );
136
137 Ok(())
138 }
139
140 pub async fn add_worker(
145 &self,
146 r#type: PbWorkerType,
147 host_address: HostAddress,
148 property: AddNodeProperty,
149 resource: PbResource,
150 ) -> MetaResult<WorkerId> {
151 let worker_id = self
152 .inner
153 .write()
154 .await
155 .add_worker(
156 r#type,
157 host_address,
158 property,
159 resource,
160 self.max_heartbeat_interval,
161 )
162 .await?;
163
164 if r#type == PbWorkerType::ComputeNode {
165 self.update_compute_node_total_cpu_count().await?;
166 }
167
168 Ok(worker_id)
169 }
170
    /// Marks a worker as running and fans out notifications.
    ///
    /// Frontend observers only care about compute nodes, so the frontend
    /// notification is sent for compute nodes only; local subscribers are
    /// always notified.
    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        // The write guard is held across the notifications so activation and
        // its fan-out are not interleaved with other membership changes.
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        if worker.r#type() == PbWorkerType::ComputeNode {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker))
            .await;

        Ok(())
    }

    /// Removes the worker registered at `host_address`, fans out
    /// notifications, and returns the deleted worker's last snapshot.
    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;

            // The deleted node no longer contributes CPU cores.
            self.update_compute_node_total_cpu_count().await?;
        }

        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()))
            .await;

        Ok(worker)
    }
213
    /// Sets the (un)schedulable flag for the given workers.
    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

    /// Renews the in-memory heartbeat lease of `worker_id` by
    /// `max_heartbeat_interval`.
    pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", worker_id = worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval)
    }
234
    /// Spawns the background task that deletes workers whose heartbeat lease
    /// has expired. Returns the task's join handle and a oneshot sender that
    /// requests shutdown.
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    _ = min_interval.tick() => {},
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                // Workers loaded from the DB at startup have no lease yet
                // (`expire_at: None`); grant them an initial lease instead of
                // expiring them immediately.
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                let now = timestamp_now_sec();
                // `unwrap()` is safe: the loop above just ensured every entry
                // has `expire_at: Some(_)`.
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                // Resolve the expired ids to addresses before releasing the
                // lock; deletion below re-acquires it per worker.
                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expire worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            // Drop the notification sender registered for the
                            // expired node, if its type has one.
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => {
                                    cluster_controller
                                        .env
                                        .notification_manager()
                                        .delete_sender(worker_type.into(), WorkerKey(host_addr))
                                        .await
                                }
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expire worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }
327
328 pub async fn list_workers(
333 &self,
334 worker_type: Option<WorkerType>,
335 worker_status: Option<WorkerStatus>,
336 ) -> MetaResult<Vec<PbWorkerNode>> {
337 let mut workers = vec![];
338 if worker_type.is_none() {
340 workers.push(meta_node_info(
341 &self.env.opts.advertise_addr,
342 Some(self.started_at),
343 ));
344 }
345 workers.extend(
346 self.inner
347 .read()
348 .await
349 .list_workers(worker_type, worker_status)
350 .await?,
351 );
352 Ok(workers)
353 }
354
    /// Atomically snapshots the active streaming compute nodes and registers
    /// a local-notification subscriber.
    ///
    /// The read guard is held until the sender has been inserted, so no
    /// membership change can slip in between the snapshot and the
    /// subscription.
    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(tx)
            .await;
        drop(inner);
        Ok((worker_nodes, rx))
    }

    /// Lists running compute nodes whose property has `is_streaming` set.
    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    /// Lists one `WorkerSlotId` per parallelism unit of each running worker.
    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    /// Lists running compute nodes whose property has `is_serving` set.
    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    /// Snapshots the streaming cluster, split into schedulable and
    /// unschedulable workers.
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    /// Fetches a single worker as protobuf, or `None` if it is not persisted.
    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    /// Returns the in-memory extra info of a worker, if known.
    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }
406
    /// Returns the unique id of this cluster.
    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    /// Returns the endpoint of the backing meta store.
    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
414}
415
/// Point-in-time snapshot of the streaming part of the cluster, produced by
/// `get_streaming_cluster_info`.
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    /// Active streaming compute nodes that are schedulable, keyed by worker
    /// id. Unschedulable workers are *not* included here (they are split out
    /// into `unschedulable_workers` when the snapshot is built).
    pub worker_nodes: HashMap<u32, WorkerNode>,

    /// Ids of the workers that may receive new streaming work.
    pub schedulable_workers: HashSet<u32>,

    /// Ids of active streaming workers whose property marks them
    /// unschedulable.
    pub unschedulable_workers: HashSet<u32>,
}
428
429impl StreamingClusterInfo {
431 pub fn parallelism(&self, resource_group: String) -> usize {
432 let available_worker_ids =
433 filter_workers_by_resource_group(&self.worker_nodes, resource_group.as_str());
434
435 self.worker_nodes
436 .values()
437 .filter(|worker| available_worker_ids.contains(&(worker.id as WorkerId)))
438 .map(|worker| worker.compute_node_parallelism())
439 .sum()
440 }
441
442 pub fn filter_schedulable_workers_by_resource_group(
443 &self,
444 resource_group: &str,
445 ) -> HashMap<u32, WorkerNode> {
446 let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
447 self.worker_nodes
448 .iter()
449 .filter(|(id, _)| worker_ids.contains(&(**id as WorkerId)))
450 .map(|(id, worker)| (*id, worker.clone()))
451 .collect()
452 }
453}
454
/// Volatile, per-worker bookkeeping kept only in memory (never persisted).
#[derive(Default, Clone)]
pub struct WorkerExtraInfo {
    // Unix timestamp (seconds) at which the worker's heartbeat lease
    // expires; `None` until the first TTL update after (re)start.
    expire_at: Option<u64>,
    // Unix timestamp (seconds) when the worker (re)registered.
    started_at: Option<u64>,
    // Resources the worker reported at registration.
    resource: PbResource,
    // Worker type recorded at registration.
    r#type: PbWorkerType,
}
465
466impl WorkerExtraInfo {
467 fn update_ttl(&mut self, ttl: Duration) {
468 let expire = cmp::max(
469 self.expire_at.unwrap_or_default(),
470 SystemTime::now()
471 .add(ttl)
472 .duration_since(SystemTime::UNIX_EPOCH)
473 .expect("Clock may have gone backwards")
474 .as_secs(),
475 );
476 self.expire_at = Some(expire);
477 }
478
479 fn update_started_at(&mut self) {
480 self.started_at = Some(timestamp_now_sec());
481 }
482}
483
/// Current wall-clock time as whole seconds since the Unix epoch.
fn timestamp_now_sec() -> u64 {
    SystemTime::UNIX_EPOCH
        .elapsed()
        .expect("Clock may have gone backwards")
        .as_secs()
}
490
491fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
492 PbWorkerNode {
493 id: META_NODE_ID,
494 r#type: PbWorkerType::Meta.into(),
495 host: HostAddr::try_from(host)
496 .as_ref()
497 .map(HostAddr::to_protobuf)
498 .ok(),
499 state: PbState::Running as _,
500 property: None,
501 transactional_id: None,
502 resource: Some(risingwave_pb::common::worker_node::Resource {
503 rw_version: RW_VERSION.to_owned(),
504 total_memory_bytes: system_memory_available_bytes() as _,
505 total_cpu_cores: total_cpu_available() as _,
506 }),
507 started_at,
508 }
509}
510
/// The lock-protected core state of the cluster controller.
pub struct ClusterControllerInner {
    // Connection to the SQL meta store (source of truth for workers).
    db: DatabaseConnection,
    // Pool of reusable transactional ids not currently assigned to a worker.
    available_transactional_ids: VecDeque<TransactionId>,
    // In-memory-only info (lease, resources, start time) per worker.
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    // When true, a *reduced* parallelism reported at re-registration is
    // ignored instead of applied.
    disable_automatic_parallelism_control: bool,
}
518
519impl ClusterControllerInner {
    /// Number of bits used for reusable transactional ids.
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    /// Size of the reusable transactional-id pool (1 << 10 = 1024).
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    /// Loads all persisted workers and rebuilds the in-memory state: the
    /// free transactional-id pool and a default extra-info entry per worker.
    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        // Ids not claimed by any persisted worker are free for reuse.
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        // Extra info starts out default (`expire_at: None`); the heartbeat
        // checker grants an initial lease before expiring anyone.
        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    /// Counts persisted workers grouped by type (single `GROUP BY` query).
    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }
568
569 pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
570 if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
571 let expire = cmp::max(
572 info.expire_at.unwrap_or_default(),
573 SystemTime::now()
574 .add(ttl)
575 .duration_since(SystemTime::UNIX_EPOCH)
576 .expect("Clock may have gone backwards")
577 .as_secs(),
578 );
579 info.expire_at = Some(expire);
580 Ok(())
581 } else {
582 Err(MetaError::invalid_worker(worker_id, "worker not found"))
583 }
584 }
585
586 fn update_resource_and_started_at(
587 &mut self,
588 worker_id: WorkerId,
589 resource: PbResource,
590 ) -> MetaResult<()> {
591 if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
592 info.resource = resource;
593 info.update_started_at();
594 Ok(())
595 } else {
596 Err(MetaError::invalid_worker(worker_id, "worker not found"))
597 }
598 }
599
    /// Returns a clone of the worker's extra info, or an error if the worker
    /// is unknown.
    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

    /// Picks (but does not consume) a transactional id for a new worker.
    /// Only compute nodes and frontends get one; other types get `None`.
    ///
    /// NOTE(review): this errors when the pool is empty even for worker
    /// types that would not consume an id — confirm that is intended.
    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }
615
616 fn compute_node_total_cpu_count(&self) -> usize {
617 self.worker_extra_info
618 .values()
619 .filter(|info| info.r#type == PbWorkerType::ComputeNode)
620 .map(|info| info.resource.total_cpu_cores as usize)
621 .sum()
622 }
623
    /// Registers a worker, or refreshes the existing registration for the
    /// same `host:port`.
    ///
    /// On re-registration of a compute node its parallelism may change: an
    /// increase is always applied, a decrease only when automatic
    /// parallelism control is enabled. All re-registration paths refresh the
    /// in-memory lease, resources and start time. Returns the (possibly
    /// pre-existing) worker id.
    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        // Check whether a worker is already registered at this address.
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        if let Some((worker, property)) = worker {
            // A given address must always host the same worker type.
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            // Scale-in: apply the reduced parallelism.
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            // Control disabled: keep the old value, just warn.
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                // Keep the property row in sync with what the node reported.
                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                // A frontend row may lack a property row; create one.
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                // Other worker types: nothing to persist, only refresh
                // in-memory state.
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }
        // Brand-new worker: allocate a transactional id (where applicable)
        // and insert the row(s).
        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode || r#type == PbWorkerType::Frontend {
            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(add_property.is_streaming),
                is_serving: Set(add_property.is_serving),
                is_unschedulable: Set(add_property.is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                // Only compute nodes carry a resource group.
                resource_group: if r#type == PbWorkerType::ComputeNode {
                    Set(add_property.resource_group.clone())
                } else {
                    Set(None)
                },
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        // Mark the allocated transactional id as in use.
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            // The heartbeat checker grants the initial lease.
            expire_at: None,
            resource,
            r#type,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

    /// Transitions a worker to `Running` in the meta store and returns its
    /// protobuf representation.
    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }
788
    /// Sets `is_unschedulable` on all given workers in one bulk `UPDATE`.
    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        let is_unschedulable = schedulability == Schedulability::Unschedulable;
        WorkerProperty::update_many()
            .col_expr(
                worker_property::Column::IsUnschedulable,
                Expr::value(is_unschedulable),
            )
            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
            .exec(&self.db)
            .await?;

        Ok(())
    }

    /// Deletes the worker registered at `host_addr`, releases its
    /// transactional id back to the pool, and returns its last snapshot.
    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            // Lost a race with a concurrent delete.
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        // Every persisted worker has an extra-info entry, so this must exist.
        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        // Return the transactional id to the reusable pool.
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    /// Renews the worker's heartbeat lease by `ttl` (in-memory only).
    pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    /// Lists persisted workers, optionally filtered by type and/or status.
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                // Extra info exists for every persisted worker.
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }
867
    /// Lists running compute nodes whose property has `is_streaming` set.
    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                // Extra info exists for every persisted worker.
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    /// Expands each running worker into one `WorkerSlotId` per parallelism
    /// unit.
    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id as u32, idx as usize))
            })
            .collect_vec())
    }

    /// Lists running compute nodes whose property has `is_serving` set.
    /// Mirrors `list_active_streaming_workers` except for the property
    /// filter.
    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }
929
    /// Builds a `StreamingClusterInfo` snapshot: active streaming compute
    /// nodes split into schedulable and unschedulable sets. Note that
    /// `worker_nodes` keeps only the schedulable ones.
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let mut streaming_workers = self.list_active_streaming_workers().await?;

        // Pull the unschedulable workers out of the list in place, leaving
        // only the schedulable ones behind.
        let unschedulable_workers: HashSet<_> = streaming_workers
            .extract_if(.., |worker| {
                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
            })
            .map(|w| w.id)
            .collect();

        let schedulable_workers = streaming_workers
            .iter()
            .map(|worker| worker.id)
            .filter(|id| !unschedulable_workers.contains(id))
            .collect();

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
            schedulable_workers,
            unschedulable_workers,
        })
    }
955
956 pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
957 let worker = Worker::find_by_id(worker_id)
958 .find_also_related(WorkerProperty)
959 .one(&self.db)
960 .await?;
961 if worker.is_none() {
962 return Ok(None);
963 }
964 let extra_info = self.get_extra_info_checked(worker_id)?;
965 Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
966 }
967
    /// Returns a clone of the worker's in-memory extra info, if present.
    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
971}
972
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` distinct localhost addresses (ports 5000..5000+count).
    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    // End-to-end exercise: register, activate, re-register with changed
    // parallelism, then delete.
    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        // Registered but not yet activated workers expose no active slots.
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        // Re-register the first worker with doubled parallelism and serving
        // disabled; the existing registration should be updated in place.
        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        // Worker 0 now contributes 2x parallelism, hence (worker_count + 1).
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    // The unschedulable flag set via `update_schedulability` must survive a
    // re-registration of the same worker.
    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        // Re-registering the same address keeps the unschedulable flag.
        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }
}