use std::cmp::{self, Ordering, max};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
use risingwave_pb::common::worker_node::{
    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{
    ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::ActiveValue::Set;
use sea_orm::prelude::Expr;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
    TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;

use crate::controller::utils::filter_workers_by_resource_group;
use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
use crate::model::ClusterId;
use crate::{MetaError, MetaResult};

pub type ClusterControllerRef = Arc<ClusterController>;

pub struct ClusterController {
    env: MetaSrvEnv,
    max_heartbeat_interval: Duration,
    inner: RwLock<ClusterControllerInner>,
    /// Timestamp (seconds since the UNIX epoch) when this meta node started.
    started_at: u64,
}

/// Bundles a worker row, its optional property row, and the in-memory extra info kept
/// by the controller; used to assemble a `PbWorkerNode`.
struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);

impl From<WorkerInfo> for PbWorkerNode {
    fn from(info: WorkerInfo) -> Self {
        Self {
            id: info.0.worker_id,
            r#type: PbWorkerType::from(info.0.worker_type) as _,
            host: Some(PbHostAddress {
                host: info.0.host,
                port: info.0.port,
            }),
            state: PbState::from(info.0.status) as _,
            property: info.1.as_ref().map(|p| PbProperty {
                is_streaming: p.is_streaming,
                is_serving: p.is_serving,
                is_unschedulable: p.is_unschedulable,
                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
                resource_group: p.resource_group.clone(),
                parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
                is_iceberg_compactor: p.is_iceberg_compactor,
            }),
            transactional_id: info.0.transaction_id.map(|id| id as _),
            resource: Some(info.2.resource),
            started_at: info.2.started_at,
        }
    }
}

impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    /// Aggregate the total resource of the cluster from all registered workers.
    pub async fn cluster_resource(&self) -> ClusterResource {
        self.inner.read().await.cluster_resource()
    }

    /// Refresh the license manager and notify subscribers with the latest cluster resource.
    async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
        let resource = self.cluster_resource().await;

        LicenseManager::get().update_cluster_resource(resource);
        self.env.notification_manager().notify_all_without_version(
            Operation::Update,
            Info::ClusterResource(resource),
        );

        Ok(())
    }

    /// Register a new worker, or update the properties of an existing worker that
    /// re-registers from the same host and port, and return its id.
    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        self.update_cluster_resource_for_license().await?;

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));

        Ok(())
    }

    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;
        }

        self.update_cluster_resource_for_license().await?;

        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));

        Ok(worker)
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        self.inner
            .write()
            .await
            .update_schedulability(worker_ids, schedulability)
            .await
    }

    pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", %worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval)
    }

    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    _ = min_interval.tick() => {},
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                // Initialize the TTL for workers that have not reported a heartbeat yet.
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                // Collect workers whose heartbeat lease has expired.
                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                %worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => cluster_controller
                                    .env
                                    .notification_manager()
                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

    /// List worker nodes, optionally filtered by type and status. An entry for this meta
    /// node itself is synthesized into the result when it matches the filters.
    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        let include_meta = worker_type.is_none() || worker_type == Some(WorkerType::Meta);
        let include_meta = include_meta && worker_status != Some(WorkerStatus::Starting);
        if include_meta {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        self.env.notification_manager().insert_local_sender(tx);
        drop(inner);
        Ok((worker_nodes, rx))
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}

#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    /// All active streaming compute nodes in the cluster, keyed by worker id.
    pub worker_nodes: HashMap<WorkerId, WorkerNode>,

    /// Ids of workers that can be scheduled for streaming jobs.
    pub schedulable_workers: HashSet<WorkerId>,

    /// Ids of workers that are marked as unschedulable via `update_schedulability`.
    pub unschedulable_workers: HashSet<WorkerId>,
}

impl StreamingClusterInfo {
    /// Total streaming parallelism provided by the workers in the given resource group.
    pub fn parallelism(&self, resource_group: &str) -> usize {
        let available_worker_ids =
            filter_workers_by_resource_group(&self.worker_nodes, resource_group);

        self.worker_nodes
            .values()
            .filter(|worker| available_worker_ids.contains(&(worker.id)))
            .map(|worker| worker.compute_node_parallelism())
            .sum()
    }

    pub fn filter_schedulable_workers_by_resource_group(
        &self,
        resource_group: &str,
    ) -> HashMap<WorkerId, WorkerNode> {
        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
        self.worker_nodes
            .iter()
            .filter(|(id, _)| worker_ids.contains(*id))
            .map(|(id, worker)| (*id, worker.clone()))
            .collect()
    }
}

#[derive(Default, Clone, Debug)]
pub struct WorkerExtraInfo {
    /// Expiration time (seconds since the UNIX epoch) of the worker's heartbeat lease.
    /// `None` until the first TTL update after registration.
    expire_at: Option<u64>,
    started_at: Option<u64>,
    resource: PbResource,
}

impl WorkerExtraInfo {
    fn update_ttl(&mut self, ttl: Duration) {
        let expire = cmp::max(
            self.expire_at.unwrap_or_default(),
            SystemTime::now()
                .add(ttl)
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs(),
        );
        self.expire_at = Some(expire);
    }

    fn update_started_at(&mut self) {
        self.started_at = Some(timestamp_now_sec());
    }
}

fn timestamp_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
    PbWorkerNode {
        id: META_NODE_ID,
        r#type: PbWorkerType::Meta.into(),
        host: HostAddr::try_from(host)
            .as_ref()
            .map(HostAddr::to_protobuf)
            .ok(),
        state: PbState::Running as _,
        property: None,
        transactional_id: None,
        resource: Some(risingwave_pb::common::worker_node::Resource {
            rw_version: RW_VERSION.to_owned(),
            total_memory_bytes: system_memory_available_bytes() as _,
            total_cpu_cores: total_cpu_available() as _,
            hostname: hostname(),
        }),
        started_at,
    }
}

pub struct ClusterControllerInner {
    db: DatabaseConnection,
    /// Pool of reusable transactional ids that are not currently assigned to any worker.
    available_transactional_ids: VecDeque<TransactionId>,
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }

    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            let expire = cmp::max(
                info.expire_at.unwrap_or_default(),
                SystemTime::now()
                    .add(ttl)
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_secs(),
            );
            info.expire_at = Some(expire);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn update_resource_and_started_at(
        &mut self,
        worker_id: WorkerId,
        resource: PbResource,
    ) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.resource = resource;
            info.update_started_at();
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

    /// Assign a reusable transactional id to compute and frontend nodes; other worker
    /// types don't get one.
    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }

    /// Aggregate the resource of the cluster. Workers reporting the same hostname are
    /// de-duplicated by taking the per-host maximum, and the per-host values are then
    /// summed. The host of this meta node is always accounted for as well.
    fn cluster_resource(&self) -> ClusterResource {
        let mut per_host = HashMap::new();

        per_host.insert(
            hostname(),
            ClusterResource {
                total_cpu_cores: total_cpu_available() as _,
                total_memory_bytes: system_memory_available_bytes() as _,
            },
        );

        for info in self.worker_extra_info.values() {
            let r = per_host
                .entry(info.resource.hostname.clone())
                .or_insert_with(ClusterResource::default);

            r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
            r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
        }

        per_host
            .into_values()
            .reduce(|a, b| ClusterResource {
                total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
                total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
            })
            .unwrap_or_default()
    }

    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        // If the worker already exists, update its properties instead of inserting a new row.
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(add_property.is_unschedulable),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                    is_iceberg_compactor: Set(false),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Compactor {
                if let Some(property) = property {
                    let mut property: worker_property::ActiveModel = property.into();
                    property.is_iceberg_compactor = Set(add_property.is_iceberg_compactor);
                    property.internal_rpc_host_addr =
                        Set(Some(add_property.internal_rpc_host_addr));

                    WorkerProperty::update(property).exec(&txn).await?;
                } else {
                    let property = worker_property::ActiveModel {
                        worker_id: Set(worker.worker_id),
                        parallelism: Set(add_property
                            .parallelism
                            .try_into()
                            .expect("invalid parallelism")),
                        is_streaming: Set(false),
                        is_serving: Set(false),
                        is_unschedulable: Set(false),
                        internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                        resource_group: Set(None),
                        is_iceberg_compactor: Set(add_property.is_iceberg_compactor),
                    };

                    WorkerProperty::insert(property).exec(&txn).await?;
                }
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }

        // Otherwise, insert a new worker row (plus a property row for compute, frontend
        // and compactor nodes).
        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host.clone()),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode
            || r#type == PbWorkerType::Frontend
            || r#type == PbWorkerType::Compactor
        {
            let (is_serving, is_streaming, is_unschedulable, is_iceberg_compactor, resource_group) =
                match r#type {
                    PbWorkerType::ComputeNode => (
                        add_property.is_serving,
                        add_property.is_streaming,
                        add_property.is_unschedulable,
                        false,
                        add_property.resource_group.clone(),
                    ),
                    PbWorkerType::Frontend => (
                        add_property.is_serving,
                        add_property.is_streaming,
                        add_property.is_unschedulable,
                        false,
                        None,
                    ),
                    PbWorkerType::Compactor => {
                        (false, false, false, add_property.is_iceberg_compactor, None)
                    }
                    _ => unreachable!(),
                };

            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(is_streaming),
                is_serving: Set(is_serving),
                is_unschedulable: Set(is_unschedulable),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: Set(resource_group),
                is_iceberg_compactor: Set(is_iceberg_compactor),
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }

    pub async fn update_schedulability(
        &self,
        worker_ids: Vec<WorkerId>,
        schedulability: Schedulability,
    ) -> MetaResult<()> {
        let is_unschedulable = schedulability == Schedulability::Unschedulable;
        WorkerProperty::update_many()
            .col_expr(
                worker_property::Column::IsUnschedulable,
                Expr::value(is_unschedulable),
            )
            .filter(worker_property::Column::WorkerId.is_in(worker_ids))
            .exec(&self.db)
            .await?;

        Ok(())
    }

    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id, idx as usize))
            })
            .collect_vec())
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let mut streaming_workers = self.list_active_streaming_workers().await?;

        let unschedulable_workers: HashSet<_> = streaming_workers
            .extract_if(.., |worker| {
                worker.property.as_ref().is_some_and(|p| p.is_unschedulable)
            })
            .map(|w| w.id)
            .collect();

        let schedulable_workers = streaming_workers
            .iter()
            .map(|worker| worker.id)
            .filter(|id| !unschedulable_workers.contains(id))
            .collect();

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
            schedulable_workers,
            unschedulable_workers,
        })
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        let worker = Worker::find_by_id(worker_id)
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        if worker.is_none() {
            return Ok(None);
        }
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
    }

    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

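    // The following test is an added minimal sketch (not part of the original suite):
    // it exercises only the in-memory TTL bookkeeping of `WorkerExtraInfo`, checking
    // that `update_ttl` sets an expiration at least `ttl` in the future and never
    // moves an already granted expiration backwards.
    #[test]
    fn test_worker_extra_info_update_ttl() {
        let mut info = WorkerExtraInfo::default();
        assert!(info.expire_at.is_none());

        let before = timestamp_now_sec();
        let ttl = Duration::from_secs(30);
        info.update_ttl(ttl);
        let first = info.expire_at.expect("expire_at should be set after update_ttl");
        assert!(first >= before + ttl.as_secs());

        // A shorter (here: zero) TTL must not shrink the already granted lease.
        info.update_ttl(Duration::from_secs(0));
        assert!(info.expire_at.unwrap() >= first);
    }
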
    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        // Workers expose no slots until they are explicitly activated.
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        // Re-register the first worker with doubled parallelism and serving disabled.
        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

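    // Added illustrative test (not part of the original suite): it sketches how
    // `ClusterControllerInner::cluster_resource` de-duplicates workers sharing a
    // hostname by taking the per-host maximum before summing across hosts. The
    // hostnames "host-a"/"host-b" and the worker ids are arbitrary; the meta node's
    // own host is also included, so assertions are relative to the baseline value.
    #[tokio::test]
    async fn test_cluster_resource_dedups_by_hostname() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let mut inner =
            ClusterControllerInner::new(env.meta_store_ref().conn.clone(), false).await?;
        let baseline = inner.cluster_resource();

        let resource_on = |host: &str| PbResource {
            rw_version: RW_VERSION.to_owned(),
            total_cpu_cores: 4,
            total_memory_bytes: 1 << 30,
            hostname: host.to_owned(),
        };

        // Two workers on the same host should be counted only once.
        inner.worker_extra_info.insert(
            1,
            WorkerExtraInfo {
                resource: resource_on("host-a"),
                ..Default::default()
            },
        );
        inner.worker_extra_info.insert(
            2,
            WorkerExtraInfo {
                resource: resource_on("host-a"),
                ..Default::default()
            },
        );
        assert_eq!(
            inner.cluster_resource().total_cpu_cores,
            baseline.total_cpu_cores + 4
        );

        // A worker on a different host contributes its own capacity.
        inner.worker_extra_info.insert(
            3,
            WorkerExtraInfo {
                resource: resource_on("host-b"),
                ..Default::default()
            },
        );
        assert_eq!(
            inner.cluster_resource().total_cpu_cores,
            baseline.total_cpu_cores + 8
        );

        Ok(())
    }
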
    #[tokio::test]
    async fn test_update_schedulability() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5001,
        };
        let mut property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            is_unschedulable: false,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                PbResource::default(),
            )
            .await?;

        cluster_ctl.activate_worker(worker_id).await?;
        cluster_ctl
            .update_schedulability(vec![worker_id], Schedulability::Unschedulable)
            .await?;

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        // Re-registering the worker must not reset the unschedulable flag.
        property.is_unschedulable = false;
        property.is_serving = false;
        let new_worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                PbResource::default(),
            )
            .await?;
        assert_eq!(worker_id, new_worker_id);

        let workers = cluster_ctl.list_active_streaming_workers().await?;
        assert_eq!(workers.len(), 1);
        assert!(workers[0].property.as_ref().unwrap().is_unschedulable);

        cluster_ctl.delete_worker(host).await?;

        Ok(())
    }

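    // Added smoke test (not part of the original suite) for the synthesized meta node
    // entry returned by `meta_node_info`; the advertise address used here is arbitrary.
    #[test]
    fn test_meta_node_info_smoke() {
        let node = meta_node_info("127.0.0.1:5690", Some(42));
        assert_eq!(node.id, META_NODE_ID);
        assert_eq!(node.r#type(), PbWorkerType::Meta);
        assert_eq!(node.started_at, Some(42));
        // A well-formed advertise address yields a host entry; the resource is always filled.
        assert!(node.host.is_some());
        assert!(node.resource.is_some());
    }
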
    #[tokio::test]
    async fn test_list_workers_include_meta_node() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        // Listing without a filter includes the synthesized meta node.
        let workers = cluster_ctl.list_workers(None, None).await?;
        assert!(workers.iter().any(|w| w.r#type() == PbWorkerType::Meta));

        // Filtering by the meta type returns exactly the meta node itself.
        let workers = cluster_ctl
            .list_workers(Some(WorkerType::Meta), None)
            .await?;
        assert_eq!(workers.len(), 1);
        assert_eq!(workers[0].r#type(), PbWorkerType::Meta);

        // The meta node is never reported in the `Starting` state.
        let workers = cluster_ctl
            .list_workers(Some(WorkerType::Meta), Some(WorkerStatus::Starting))
            .await?;
        assert!(workers.is_empty());

        Ok(())
    }
}