use std::cmp::{self, Ordering, max};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
use risingwave_pb::common::worker_node::{
    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{
    ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::ActiveValue::Set;
use sea_orm::prelude::Expr;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
    TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;

use crate::controller::utils::filter_workers_by_resource_group;
use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
use crate::model::ClusterId;
use crate::{MetaError, MetaResult};

pub type ClusterControllerRef = Arc<ClusterController>;

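/// Manages cluster membership for the meta node. Worker rows live in the SQL meta store,
/// while volatile per-worker state (heartbeat leases, reported resources) is cached in
/// [`ClusterControllerInner`] behind an async `RwLock`.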
pub struct ClusterController {
    env: MetaSrvEnv,
    max_heartbeat_interval: Duration,
    inner: RwLock<ClusterControllerInner>,
    started_at: u64,
}

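/// A worker row, its optional property row and the in-memory extra info, bundled so that
/// they can be converted into a [`PbWorkerNode`].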
struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);

impl From<WorkerInfo> for PbWorkerNode {
    fn from(info: WorkerInfo) -> Self {
        Self {
            id: info.0.worker_id as _,
            r#type: PbWorkerType::from(info.0.worker_type) as _,
            host: Some(PbHostAddress {
                host: info.0.host,
                port: info.0.port,
            }),
            state: PbState::from(info.0.status) as _,
            property: info.1.as_ref().map(|p| PbProperty {
                is_streaming: p.is_streaming,
                is_serving: p.is_serving,
                is_unschedulable: p.is_unschedulable,
                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
                resource_group: p.resource_group.clone(),
                parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
            }),
            transactional_id: info.0.transaction_id.map(|id| id as _),
            resource: Some(info.2.resource),
            started_at: info.2.started_at,
        }
    }
}

impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    pub async fn cluster_resource(&self) -> ClusterResource {
        self.inner.read().await.cluster_resource()
    }

    async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
        let resource = self.cluster_resource().await;

        LicenseManager::get().update_cluster_resource(resource);
        self.env.notification_manager().notify_all_without_version(
            Operation::Update,
            Info::ClusterResource(resource),
        );

        Ok(())
    }

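    /// Register a worker, or refresh the registration of a worker that reconnects on the same
    /// host and port. After the inner state is updated, the license manager is refreshed and
    /// the new total cluster resource is broadcast to subscribers.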
    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        self.update_cluster_resource_for_license().await?;

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));

        Ok(())
    }

    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;
            if worker.r#type() == PbWorkerType::ComputeNode {
                self.update_cluster_resource_for_license().await?;
            }
        }

        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));

        Ok(worker)
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

    pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", worker_id = worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval)
    }

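    /// Spawn a background task that periodically expires workers whose heartbeat lease has
    /// lapsed: expired workers are deleted from the meta store and their notification senders
    /// are dropped. Returns the join handle together with a shutdown sender.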
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    _ = min_interval.tick() => {},
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => cluster_controller
                                    .env
                                    .notification_manager()
                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

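    /// List workers, optionally filtered by type and status. When no type filter is given, a
    /// synthetic entry for the meta node itself is prepended.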
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        if worker_type.is_none() {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        self.env.notification_manager().insert_local_sender(tx);
        drop(inner);
        Ok((worker_nodes, rx))
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}

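/// A snapshot of the active streaming compute nodes used for scheduling, together with the
/// sets of schedulable and unschedulable worker ids.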
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    pub worker_nodes: HashMap<u32, WorkerNode>,

    pub schedulable_workers: HashSet<u32>,

    pub unschedulable_workers: HashSet<u32>,
}

impl StreamingClusterInfo {
    pub fn parallelism(&self, resource_group: &str) -> usize {
        let available_worker_ids =
            filter_workers_by_resource_group(&self.worker_nodes, resource_group);

        self.worker_nodes
            .values()
            .filter(|worker| available_worker_ids.contains(&(worker.id as WorkerId)))
            .map(|worker| worker.compute_node_parallelism())
            .sum()
    }

    pub fn filter_schedulable_workers_by_resource_group(
        &self,
        resource_group: &str,
    ) -> HashMap<u32, WorkerNode> {
        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
        self.worker_nodes
            .iter()
            .filter(|(id, _)| worker_ids.contains(&(**id as WorkerId)))
            .map(|(id, worker)| (*id, worker.clone()))
            .collect()
    }
}

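/// Volatile per-worker state that is kept in memory only and never persisted: the heartbeat
/// lease expiration, the start timestamp and the most recently reported resource.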
#[derive(Default, Clone, Debug)]
pub struct WorkerExtraInfo {
    expire_at: Option<u64>,
    started_at: Option<u64>,
    resource: PbResource,
}

impl WorkerExtraInfo {
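    /// Extend the heartbeat lease to `now + ttl`, never moving `expire_at` backwards.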
    fn update_ttl(&mut self, ttl: Duration) {
        let expire = cmp::max(
            self.expire_at.unwrap_or_default(),
            SystemTime::now()
                .add(ttl)
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs(),
        );
        self.expire_at = Some(expire);
    }

    fn update_started_at(&mut self) {
        self.started_at = Some(timestamp_now_sec());
    }
}

fn timestamp_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
    PbWorkerNode {
        id: META_NODE_ID,
        r#type: PbWorkerType::Meta.into(),
        host: HostAddr::try_from(host)
            .as_ref()
            .map(HostAddr::to_protobuf)
            .ok(),
        state: PbState::Running as _,
        property: None,
        transactional_id: None,
        resource: Some(risingwave_pb::common::worker_node::Resource {
            rw_version: RW_VERSION.to_owned(),
            total_memory_bytes: system_memory_available_bytes() as _,
            total_cpu_cores: total_cpu_available() as _,
            hostname: hostname(),
        }),
        started_at,
    }
}

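/// The lock-protected part of the cluster state: the meta store connection, the pool of
/// reusable transactional ids, and the in-memory [`WorkerExtraInfo`] for each worker.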
pub struct ClusterControllerInner {
    db: DatabaseConnection,
    available_transactional_ids: VecDeque<TransactionId>,
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }

    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            let expire = cmp::max(
                info.expire_at.unwrap_or_default(),
                SystemTime::now()
                    .add(ttl)
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_secs(),
            );
            info.expire_at = Some(expire);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn update_resource_and_started_at(
        &mut self,
        worker_id: WorkerId,
        resource: PbResource,
    ) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.resource = resource;
            info.update_started_at();
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

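    /// Pick a reusable transactional id for a new worker. Only compute nodes and frontends
    /// receive one; other worker types get `None`. Fails if the id pool is exhausted.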
    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }

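    /// Aggregate the reported worker resources into a single [`ClusterResource`]: workers that
    /// share a hostname are deduplicated by taking the per-host maximum, and the per-host
    /// values are then summed across hosts.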
    fn cluster_resource(&self) -> ClusterResource {
        let mut per_host = HashMap::new();

        for info in self.worker_extra_info.values() {
            let r = per_host
                .entry(info.resource.hostname.as_str())
                .or_insert_with(ClusterResource::default);

            r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
            r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
        }

        per_host
            .into_values()
            .reduce(|a, b| ClusterResource {
                total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
                total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
            })
            .unwrap_or_default()
    }

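    /// Insert a new worker row (plus a property row for compute nodes and frontends), or update
    /// the existing registration when a worker reconnects on the same host and port. For a
    /// re-registering compute node the stored parallelism follows the reported value, except
    /// that a decrease is ignored when automatic parallelism control is disabled.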
    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }
        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host.clone()),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode || r#type == PbWorkerType::Frontend {
            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(add_property.is_streaming),
                is_serving: Set(add_property.is_serving),
                is_unschedulable: Set(add_property.is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: if r#type == PbWorkerType::ComputeNode {
                    Set(add_property.resource_group.clone())
                } else {
                    Set(None)
                },
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

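    /// Mark the worker as `Running` in the meta store and return its full [`PbWorkerNode`] view.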
    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        let is_unschedulable = schedulability == Schedulability::Unschedulable;
        WorkerProperty::update_many()
            .col_expr(
                worker_property::Column::IsUnschedulable,
                Expr::value(is_unschedulable),
            )
            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
            .exec(&self.db)
            .await?;

        Ok(())
    }

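    /// Remove the worker row from the meta store, recycle its transactional id and drop its
    /// in-memory extra info, returning the deleted node.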
    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

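    /// Expand every running worker that has a property row into one [`WorkerSlotId`] per unit
    /// of its parallelism.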
    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id as u32, idx as usize))
            })
            .collect_vec())
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

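    /// Build a [`StreamingClusterInfo`] snapshot from the active streaming workers, separating
    /// schedulable workers from those marked unschedulable.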
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let mut streaming_workers = self.list_active_streaming_workers().await?;

        let unschedulable_workers: HashSet<_> = streaming_workers
            .extract_if(.., |worker| {
                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
            })
            .map(|w| w.id)
            .collect();

        let schedulable_workers = streaming_workers
            .iter()
            .map(|worker| worker.id)
            .filter(|id| !unschedulable_workers.contains(id))
            .collect();

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
            schedulable_workers,
            unschedulable_workers,
        })
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        let worker = Worker::find_by_id(worker_id)
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        if worker.is_none() {
            return Ok(None);
        }
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
    }

    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }
}