use std::cmp::{self, Ordering, max};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
use risingwave_pb::common::worker_node::{
    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{
    ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::ActiveValue::Set;
use sea_orm::prelude::Expr;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
    TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;

use crate::controller::utils::filter_workers_by_resource_group;
use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
use crate::model::ClusterId;
use crate::{MetaError, MetaResult};

pub type ClusterControllerRef = Arc<ClusterController>;

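/// Manages worker nodes in the cluster: registration, heartbeat-based liveness tracking,
/// schedulability updates, and deletion, backed by the SQL meta store.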
pub struct ClusterController {
    env: MetaSrvEnv,
    max_heartbeat_interval: Duration,
    inner: RwLock<ClusterControllerInner>,
    started_at: u64,
}

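/// Bundles a worker's database row, its optional property row, and its in-memory extra
/// info so that the three can be converted into a single `PbWorkerNode`.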
struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);

impl From<WorkerInfo> for PbWorkerNode {
    fn from(info: WorkerInfo) -> Self {
        Self {
            id: info.0.worker_id,
            r#type: PbWorkerType::from(info.0.worker_type) as _,
            host: Some(PbHostAddress {
                host: info.0.host,
                port: info.0.port,
            }),
            state: PbState::from(info.0.status) as _,
            property: info.1.as_ref().map(|p| PbProperty {
                is_streaming: p.is_streaming,
                is_serving: p.is_serving,
                is_unschedulable: p.is_unschedulable,
                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
                resource_group: p.resource_group.clone(),
                parallelism: p.parallelism as u32,
                is_iceberg_compactor: p.is_iceberg_compactor,
            }),
            transactional_id: info.0.transaction_id.map(|id| id as _),
            resource: Some(info.2.resource),
            started_at: info.2.started_at,
        }
    }
}

impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    pub async fn cluster_resource(&self) -> ClusterResource {
        self.inner.read().await.cluster_resource()
    }

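    /// Recomputes the total cluster resource, reports it to the license manager, and
    /// broadcasts the new value to all subscribers.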
    async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
        let resource = self.cluster_resource().await;

        LicenseManager::get().update_cluster_resource(resource);
        self.env.notification_manager().notify_all_without_version(
            Operation::Update,
            Info::ClusterResource(resource),
        );

        Ok(())
    }

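    /// Registers a worker at the given host address (or re-registers an existing one),
    /// then refreshes the license manager's view of the cluster resource.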
    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        self.update_cluster_resource_for_license().await?;

        Ok(worker_id)
    }

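    /// Marks the worker as `Running` and notifies frontend nodes and local subscribers.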
    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));

        Ok(())
    }

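    /// Deletes the worker registered at the given host address and notifies frontend
    /// nodes and local subscribers. Removing a compute node also triggers a license
    /// resource update.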
    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;
            if worker.r#type() == PbWorkerType::ComputeNode {
                self.update_cluster_resource_for_license().await?;
            }
        }

        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));

        Ok(worker)
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

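    /// Records a heartbeat for the worker and extends its lease by `max_heartbeat_interval`.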
    pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", %worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval)
    }

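    /// Spawns a background task that periodically scans for workers whose heartbeat
    /// lease has expired and deletes them from the cluster. Returns the task's join
    /// handle together with a oneshot sender used to shut the checker down.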
    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    _ = min_interval.tick() => {},
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                %worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => cluster_controller
                                    .env
                                    .notification_manager()
                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

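    /// Lists workers matching the optional type and status filters. When no type filter
    /// is given, a synthetic entry for this meta node itself is included as well.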
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        if worker_type.is_none() {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

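    /// Returns the current set of active streaming compute nodes together with a channel
    /// of subsequent local notifications. The local sender is registered before the read
    /// guard is released, so the snapshot and the notification stream line up.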
    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        self.env.notification_manager().insert_local_sender(tx);
        drop(inner);
        Ok((worker_nodes, rx))
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}

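/// A snapshot of the active streaming compute nodes, used when scheduling streaming jobs.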
#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    pub worker_nodes: HashMap<WorkerId, WorkerNode>,

    pub schedulable_workers: HashSet<WorkerId>,

    pub unschedulable_workers: HashSet<WorkerId>,
}

impl StreamingClusterInfo {
    pub fn parallelism(&self, resource_group: &str) -> usize {
        let available_worker_ids =
            filter_workers_by_resource_group(&self.worker_nodes, resource_group);

        self.worker_nodes
            .values()
            .filter(|worker| available_worker_ids.contains(&(worker.id)))
            .map(|worker| worker.compute_node_parallelism())
            .sum()
    }

    pub fn filter_schedulable_workers_by_resource_group(
        &self,
        resource_group: &str,
    ) -> HashMap<WorkerId, WorkerNode> {
        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
        self.worker_nodes
            .iter()
            .filter(|(id, _)| worker_ids.contains(*id))
            .map(|(id, worker)| (*id, worker.clone()))
            .collect()
    }
}

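/// In-memory, non-persisted bookkeeping for a worker: heartbeat lease expiration, start
/// time, and the resources it reported at registration.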
#[derive(Default, Clone, Debug)]
pub struct WorkerExtraInfo {
    expire_at: Option<u64>,
    started_at: Option<u64>,
    resource: PbResource,
}

impl WorkerExtraInfo {
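    /// Extends the heartbeat lease to at least `now + ttl`, never moving it backwards.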
    fn update_ttl(&mut self, ttl: Duration) {
        let expire = cmp::max(
            self.expire_at.unwrap_or_default(),
            SystemTime::now()
                .add(ttl)
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs(),
        );
        self.expire_at = Some(expire);
    }

    fn update_started_at(&mut self) {
        self.started_at = Some(timestamp_now_sec());
    }
}

fn timestamp_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

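/// Builds a synthetic `PbWorkerNode` describing this meta node itself, so it can be
/// listed alongside the workers registered in the database.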
fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
    PbWorkerNode {
        id: META_NODE_ID,
        r#type: PbWorkerType::Meta.into(),
        host: HostAddr::try_from(host)
            .as_ref()
            .map(HostAddr::to_protobuf)
            .ok(),
        state: PbState::Running as _,
        property: None,
        transactional_id: None,
        resource: Some(risingwave_pb::common::worker_node::Resource {
            rw_version: RW_VERSION.to_owned(),
            total_memory_bytes: system_memory_available_bytes() as _,
            total_cpu_cores: total_cpu_available() as _,
            hostname: hostname(),
        }),
        started_at,
    }
}

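/// The state guarded by `ClusterController`'s lock: the meta store connection, the pool
/// of reusable transactional ids, and the per-worker in-memory extra info.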
pub struct ClusterControllerInner {
    db: DatabaseConnection,
    available_transactional_ids: VecDeque<TransactionId>,
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
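    /// Number of bits used for reusable transactional ids, which bounds how many
    /// compute/frontend nodes can hold such an id at the same time.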
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }

    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            let expire = cmp::max(
                info.expire_at.unwrap_or_default(),
                SystemTime::now()
                    .add(ttl)
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_secs(),
            );
            info.expire_at = Some(expire);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn update_resource_and_started_at(
        &mut self,
        worker_id: WorkerId,
        resource: PbResource,
    ) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.resource = resource;
            info.update_started_at();
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

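    /// Picks a reusable transactional id for compute and frontend nodes; other worker
    /// types do not get one. Fails if the pool is exhausted.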
    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }

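    /// Aggregates the resources reported by all workers. For each hostname the maximum
    /// reported CPU/memory is taken, so multiple workers co-located on one host are not
    /// double-counted; the per-host values are then summed.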
    fn cluster_resource(&self) -> ClusterResource {
        let mut per_host = HashMap::new();

        for info in self.worker_extra_info.values() {
            let r = per_host
                .entry(info.resource.hostname.as_str())
                .or_insert_with(ClusterResource::default);

            r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
            r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
        }

        per_host
            .into_values()
            .reduce(|a, b| ClusterResource {
                total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
                total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
            })
            .unwrap_or_default()
    }

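    /// Inserts a new worker row, or updates the properties of an existing worker
    /// registered at the same host and port, within a single database transaction.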
    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                    is_iceberg_compactor: Set(false),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Compactor {
                if let Some(property) = property {
                    let mut property: worker_property::ActiveModel = property.into();
                    property.is_iceberg_compactor = Set(add_property.is_iceberg_compactor);
                    property.internal_rpc_host_addr =
                        Set(Some(add_property.internal_rpc_host_addr));

                    WorkerProperty::update(property).exec(&txn).await?;
                } else {
                    let property = worker_property::ActiveModel {
                        worker_id: Set(worker.worker_id),
                        parallelism: Set(add_property
                            .parallelism
                            .try_into()
                            .expect("invalid parallelism")),
                        is_streaming: Set(false),
                        is_serving: Set(false),
                        is_unschedulable: Set(false),
                        internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                        resource_group: Set(None),
                        is_iceberg_compactor: Set(add_property.is_iceberg_compactor),
                    };

                    WorkerProperty::insert(property).exec(&txn).await?;
                }
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }

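        // No worker is registered at this address yet: allocate a transactional id if the
        // worker type needs one and insert a fresh row together with its properties.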
        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host.clone()),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode
            || r#type == PbWorkerType::Frontend
            || r#type == PbWorkerType::Compactor
        {
            let (is_serving, is_streaming, is_unschedulable, is_iceberg_compactor, resource_group) =
                match r#type {
                    PbWorkerType::ComputeNode => (
                        add_property.is_serving,
                        add_property.is_streaming,
                        add_property.is_unschedulable,
                        false,
                        add_property.resource_group.clone(),
                    ),
                    PbWorkerType::Frontend => (
                        add_property.is_serving,
                        add_property.is_streaming,
                        add_property.is_unschedulable,
                        false,
                        None,
                    ),
                    PbWorkerType::Compactor => {
                        (false, false, false, add_property.is_iceberg_compactor, None)
                    }
                    _ => unreachable!(),
                };

            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(is_streaming),
                is_serving: Set(is_serving),
                is_unschedulable: Set(is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: Set(resource_group),
                is_iceberg_compactor: Set(is_iceberg_compactor),
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        let is_unschedulable = schedulability == Schedulability::Unschedulable;
        WorkerProperty::update_many()
            .col_expr(
                worker_property::Column::IsUnschedulable,
                Expr::value(is_unschedulable),
            )
            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
            .exec(&self.db)
            .await?;

        Ok(())
    }

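    /// Deletes the worker identified by its host address, returns its transactional id
    /// (if any) to the reusable pool, and drops its in-memory extra info.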
    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

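    /// Expands every running worker's parallelism into individual `WorkerSlotId`s.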
    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id, idx as usize))
            })
            .collect_vec())
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

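    /// Builds a `StreamingClusterInfo` snapshot of the active streaming compute nodes.
    /// Unschedulable workers are split out into `unschedulable_workers`, so
    /// `worker_nodes` and `schedulable_workers` only cover schedulable ones.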
    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let mut streaming_workers = self.list_active_streaming_workers().await?;

        let unschedulable_workers: HashSet<_> = streaming_workers
            .extract_if(.., |worker| {
                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
            })
            .map(|w| w.id)
            .collect();

        let schedulable_workers = streaming_workers
            .iter()
            .map(|worker| worker.id)
            .filter(|id| !unschedulable_workers.contains(id))
            .collect();

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
            schedulable_workers,
            unschedulable_workers,
        })
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        let worker = Worker::find_by_id(worker_id)
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        if worker.is_none() {
            return Ok(None);
        }
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
    }

    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

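        // Re-register the first worker with doubled parallelism and serving disabled.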
        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

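        // Re-register the same worker; re-registration must not reset the persisted
        // `is_unschedulable` flag.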
        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }
}