use std::cmp::{self, Ordering, max};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::RW_VERSION;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::hostname;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{TransactionId, WorkerId, worker, worker_property};
use risingwave_pb::common::worker_node::{
    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{
    ClusterResource, HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use sea_orm::ActiveValue::Set;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
    TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;

use crate::controller::utils::filter_workers_by_resource_group;
use crate::manager::{LocalNotification, META_NODE_ID, MetaSrvEnv, WorkerKey};
use crate::model::ClusterId;
use crate::{MetaError, MetaResult};

pub type ClusterControllerRef = Arc<ClusterController>;

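/// Controller for cluster membership, backed by the SQL meta store. It tracks
/// worker nodes, their heartbeats, and the resources they report.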
pub struct ClusterController {
    env: MetaSrvEnv,
    max_heartbeat_interval: Duration,
    inner: RwLock<ClusterControllerInner>,
    /// Seconds since the UNIX epoch when this controller started.
    started_at: u64,
}

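/// A worker's database rows (`worker` plus its optional `worker_property`)
/// bundled with the in-memory [`WorkerExtraInfo`], ready for conversion into a
/// [`PbWorkerNode`].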
struct WorkerInfo(
    worker::Model,
    Option<worker_property::Model>,
    WorkerExtraInfo,
);

impl From<WorkerInfo> for PbWorkerNode {
    fn from(info: WorkerInfo) -> Self {
        Self {
            id: info.0.worker_id as _,
            r#type: PbWorkerType::from(info.0.worker_type) as _,
            host: Some(PbHostAddress {
                host: info.0.host,
                port: info.0.port,
            }),
            state: PbState::from(info.0.status) as _,
            property: info.1.as_ref().map(|p| PbProperty {
                is_streaming: p.is_streaming,
                is_serving: p.is_serving,
                internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
                resource_group: p.resource_group.clone(),
                parallelism: p.parallelism as u32,
                is_iceberg_compactor: p.is_iceberg_compactor,
            }),
            transactional_id: info.0.transaction_id.map(|id| id as _),
            resource: Some(info.2.resource),
            started_at: info.2.started_at,
        }
    }
}

impl ClusterController {
    pub async fn new(env: MetaSrvEnv, max_heartbeat_interval: Duration) -> MetaResult<Self> {
        let inner = ClusterControllerInner::new(
            env.meta_store_ref().conn.clone(),
            env.opts.disable_automatic_parallelism_control,
        )
        .await?;
        Ok(Self {
            env,
            max_heartbeat_interval,
            inner: RwLock::new(inner),
            started_at: timestamp_now_sec(),
        })
    }

    pub async fn get_inner_read_guard(&self) -> RwLockReadGuard<'_, ClusterControllerInner> {
        self.inner.read().await
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        self.inner.read().await.count_worker_by_type().await
    }

    pub async fn cluster_resource(&self) -> ClusterResource {
        self.inner.read().await.cluster_resource()
    }

    /// Refresh the license manager with the current cluster resource and
    /// broadcast the update to all other nodes.
    async fn update_cluster_resource_for_license(&self) -> MetaResult<()> {
        let resource = self.cluster_resource().await;

        LicenseManager::get().update_cluster_resource(resource.clone());
        self.env.notification_manager().notify_all_without_version(
            Operation::Update,
            Info::ClusterResource(resource),
        );

        Ok(())
    }

    pub async fn add_worker(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        let worker_id = self
            .inner
            .write()
            .await
            .add_worker(
                r#type,
                host_address,
                property,
                resource,
                self.max_heartbeat_interval,
            )
            .await?;

        self.update_cluster_resource_for_license().await?;

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let worker = inner.activate_worker(worker_id).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Add, Info::Node(worker.clone()))
                .await;
        }
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker));

        Ok(())
    }

    pub async fn delete_worker(&self, host_address: HostAddress) -> MetaResult<WorkerNode> {
        let worker = self.inner.write().await.delete_worker(host_address).await?;

        if worker.r#type() == PbWorkerType::ComputeNode || worker.r#type() == PbWorkerType::Frontend
        {
            self.env
                .notification_manager()
                .notify_frontend(Operation::Delete, Info::Node(worker.clone()))
                .await;
        }

        self.update_cluster_resource_for_license().await?;

        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker.clone()));

        Ok(worker)
    }

    pub async fn heartbeat(
        &self,
        worker_id: WorkerId,
        resource: Option<PbResource>,
    ) -> MetaResult<()> {
        tracing::trace!(target: "events::meta::server_heartbeat", %worker_id, "receive heartbeat");
        self.inner
            .write()
            .await
            .heartbeat(worker_id, self.max_heartbeat_interval, resource)
    }

    pub fn start_heartbeat_checker(
        cluster_controller: ClusterControllerRef,
        check_interval: Duration,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            let mut min_interval = tokio::time::interval(check_interval);
            min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                tokio::select! {
                    _ = min_interval.tick() => {},
                    _ = &mut shutdown_rx => {
                        tracing::info!("Heartbeat checker is stopped");
                        return;
                    }
                }

                let mut inner = cluster_controller.inner.write().await;
                // Initialize `expire_at` for workers that have never sent a
                // heartbeat, so that they also expire eventually.
                for worker in inner
                    .worker_extra_info
                    .values_mut()
                    .filter(|worker| worker.expire_at.is_none())
                {
                    worker.update_ttl(cluster_controller.max_heartbeat_interval);
                }

                let now = timestamp_now_sec();
                let worker_to_delete = inner
                    .worker_extra_info
                    .iter()
                    // Unwrap is safe: `expire_at` was initialized above for all workers.
                    .filter(|(_, info)| info.expire_at.unwrap() < now)
                    .map(|(id, _)| *id)
                    .collect_vec();

                let worker_infos = match Worker::find()
                    .select_only()
                    .column(worker::Column::WorkerId)
                    .column(worker::Column::WorkerType)
                    .column(worker::Column::Host)
                    .column(worker::Column::Port)
                    .filter(worker::Column::WorkerId.is_in(worker_to_delete.clone()))
                    .into_tuple::<(WorkerId, WorkerType, String, i32)>()
                    .all(&inner.db)
                    .await
                {
                    Ok(keys) => keys,
                    Err(err) => {
                        tracing::warn!(error = %err.as_report(), "Failed to load expired worker info from db");
                        continue;
                    }
                };
                drop(inner);

                for (worker_id, worker_type, host, port) in worker_infos {
                    let host_addr = PbHostAddress { host, port };
                    match cluster_controller.delete_worker(host_addr.clone()).await {
                        Ok(_) => {
                            tracing::warn!(
                                %worker_id,
                                ?host_addr,
                                %now,
                                "Deleted expired worker"
                            );
                            match worker_type {
                                WorkerType::Frontend
                                | WorkerType::ComputeNode
                                | WorkerType::Compactor
                                | WorkerType::RiseCtl => cluster_controller
                                    .env
                                    .notification_manager()
                                    .delete_sender(worker_type.into(), WorkerKey(host_addr)),
                                _ => {}
                            };
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to delete expired worker from db");
                        }
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }
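
    // Usage sketch for the heartbeat checker (illustrative wiring, not taken
    // from this crate's callers; variable names are hypothetical):
    //
    //     let controller: ClusterControllerRef = Arc::new(controller);
    //     let (join_handle, shutdown) = ClusterController::start_heartbeat_checker(
    //         controller.clone(),
    //         Duration::from_secs(1),
    //     );
    //     // ... later, on shutdown:
    //     let _ = shutdown.send(());
    //     let _ = join_handle.await;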

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut workers = vec![];
        // The meta node itself is not stored in the database, so it is appended
        // manually. It is never in the `Starting` state, so skip it when that
        // status is requested.
        let include_meta = (worker_type.is_none() || worker_type == Some(WorkerType::Meta))
            && worker_status != Some(WorkerStatus::Starting);
        if include_meta {
            workers.push(meta_node_info(
                &self.env.opts.advertise_addr,
                Some(self.started_at),
            ));
        }
        workers.extend(
            self.inner
                .read()
                .await
                .list_workers(worker_type, worker_status)
                .await?,
        );
        Ok(workers)
    }

    pub(crate) async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        let inner = self.inner.read().await;
        let worker_nodes = inner.list_active_streaming_workers().await?;
        let (tx, rx) = unbounded_channel();

        self.env.notification_manager().insert_local_sender(tx);
        drop(inner);
        Ok((worker_nodes, rx))
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner
            .read()
            .await
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        self.inner.read().await.list_active_worker_slots().await
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.inner.read().await.list_active_serving_workers().await
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.inner.read().await.get_streaming_cluster_info().await
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.inner.read().await.get_worker_by_id(worker_id).await
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.inner
            .read()
            .await
            .get_worker_extra_info_by_id(worker_id)
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.env.cluster_id()
    }

    pub fn meta_store_endpoint(&self) -> String {
        self.env.meta_store_ref().endpoint.clone()
    }
}

#[derive(Debug, Clone)]
pub struct StreamingClusterInfo {
    /// All active compute nodes in the cluster, keyed by worker id.
    pub worker_nodes: HashMap<WorkerId, WorkerNode>,
}

impl StreamingClusterInfo {
    /// Total parallelism of all workers in the given resource group.
    pub fn parallelism(&self, resource_group: &str) -> usize {
        let available_worker_ids =
            filter_workers_by_resource_group(&self.worker_nodes, resource_group);

        self.worker_nodes
            .values()
            .filter(|worker| available_worker_ids.contains(&(worker.id)))
            .map(|worker| worker.compute_node_parallelism())
            .sum()
    }

    pub fn filter_workers_by_resource_group(
        &self,
        resource_group: &str,
    ) -> HashMap<WorkerId, WorkerNode> {
        let worker_ids = filter_workers_by_resource_group(&self.worker_nodes, resource_group);
        self.worker_nodes
            .iter()
            .filter(|(id, _)| worker_ids.contains(*id))
            .map(|(id, worker)| (*id, worker.clone()))
            .collect()
    }
}

#[derive(Default, Clone, Debug)]
pub struct WorkerExtraInfo {
    /// Unix timestamp (in seconds) after which the worker is considered
    /// expired; refreshed by heartbeats. `None` means no heartbeat yet.
    expire_at: Option<u64>,
    started_at: Option<u64>,
    resource: PbResource,
}

impl WorkerExtraInfo {
    fn update_ttl(&mut self, ttl: Duration) {
        // Never move the expiration backwards: keep the later of the current
        // deadline and `now + ttl`.
        let expire = cmp::max(
            self.expire_at.unwrap_or_default(),
            SystemTime::now()
                .add(ttl)
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Clock may have gone backwards")
                .as_secs(),
        );
        self.expire_at = Some(expire);
    }

    fn update_started_at(&mut self) {
        self.started_at = Some(timestamp_now_sec());
    }
}

fn timestamp_now_sec() -> u64 {
    SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("Clock may have gone backwards")
        .as_secs()
}

fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
    PbWorkerNode {
        id: META_NODE_ID,
        r#type: PbWorkerType::Meta.into(),
        host: HostAddr::try_from(host)
            .as_ref()
            .map(HostAddr::to_protobuf)
            .ok(),
        state: PbState::Running as _,
        property: None,
        transactional_id: None,
        resource: Some(risingwave_pb::common::worker_node::Resource {
            rw_version: RW_VERSION.to_owned(),
            total_memory_bytes: system_memory_available_bytes() as _,
            total_cpu_cores: total_cpu_available() as _,
            hostname: hostname(),
        }),
        started_at,
    }
}

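/// Mutable state behind [`ClusterController`]: the database connection, the
/// pool of reusable transaction ids, and volatile per-worker info that is not
/// persisted in the meta store.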
pub struct ClusterControllerInner {
    db: DatabaseConnection,
    /// Transaction ids that are currently not assigned to any worker.
    available_transactional_ids: VecDeque<TransactionId>,
    worker_extra_info: HashMap<WorkerId, WorkerExtraInfo>,
    disable_automatic_parallelism_control: bool,
}

impl ClusterControllerInner {
    pub const MAX_WORKER_REUSABLE_ID_BITS: usize = 10;
    pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS;

    pub async fn new(
        db: DatabaseConnection,
        disable_automatic_parallelism_control: bool,
    ) -> MetaResult<Self> {
        let workers: Vec<(WorkerId, Option<TransactionId>)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerId)
            .column(worker::Column::TransactionId)
            .into_tuple()
            .all(&db)
            .await?;
        let inuse_txn_ids: HashSet<_> = workers
            .iter()
            .cloned()
            .filter_map(|(_, txn_id)| txn_id)
            .collect();
        let available_transactional_ids = (0..Self::MAX_WORKER_REUSABLE_ID_COUNT as TransactionId)
            .filter(|id| !inuse_txn_ids.contains(id))
            .collect();

        let worker_extra_info = workers
            .into_iter()
            .map(|(w, _)| (w, WorkerExtraInfo::default()))
            .collect();

        Ok(Self {
            db,
            available_transactional_ids,
            worker_extra_info,
            disable_automatic_parallelism_control,
        })
    }

    pub async fn count_worker_by_type(&self) -> MetaResult<HashMap<WorkerType, i64>> {
        let workers: Vec<(WorkerType, i64)> = Worker::find()
            .select_only()
            .column(worker::Column::WorkerType)
            .column_as(worker::Column::WorkerId.count(), "count")
            .group_by(worker::Column::WorkerType)
            .into_tuple()
            .all(&self.db)
            .await?;

        Ok(workers.into_iter().collect())
    }

    pub fn update_worker_ttl(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.update_ttl(ttl);
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn update_resource_and_started_at(
        &mut self,
        worker_id: WorkerId,
        resource: PbResource,
    ) -> MetaResult<()> {
        if let Some(info) = self.worker_extra_info.get_mut(&worker_id) {
            info.resource = resource;
            info.update_started_at();
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    fn get_extra_info_checked(&self, worker_id: WorkerId) -> MetaResult<WorkerExtraInfo> {
        self.worker_extra_info
            .get(&worker_id)
            .cloned()
            .ok_or_else(|| MetaError::invalid_worker(worker_id, "worker not found"))
    }

    /// Pick a reusable transaction id for a new worker. Only compute nodes and
    /// frontends need one; other worker types get `None`.
    fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult<Option<TransactionId>> {
        match (self.available_transactional_ids.front(), r#type) {
            (None, _) => Err(MetaError::unavailable("no available reusable machine id")),
            (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)),
            _ => Ok(None),
        }
    }

    /// Aggregate the resources reported by all workers, counting each hostname
    /// only once (taking the maximum report per host), then summing across
    /// hosts. The meta node's own host is always included.
    fn cluster_resource(&self) -> ClusterResource {
        let mut per_host = HashMap::new();

        per_host.insert(
            hostname(),
            ClusterResource {
                total_cpu_cores: total_cpu_available() as _,
                total_memory_bytes: system_memory_available_bytes() as _,
            },
        );

        for info in self.worker_extra_info.values() {
            let r = per_host
                .entry(info.resource.hostname.clone())
                .or_insert_with(ClusterResource::default);

            r.total_cpu_cores = max(r.total_cpu_cores, info.resource.total_cpu_cores);
            r.total_memory_bytes = max(r.total_memory_bytes, info.resource.total_memory_bytes);
        }

        per_host
            .into_values()
            .reduce(|a, b| ClusterResource {
                total_cpu_cores: a.total_cpu_cores + b.total_cpu_cores,
                total_memory_bytes: a.total_memory_bytes + b.total_memory_bytes,
            })
            .unwrap_or_default()
    }

    /// Register a worker, or update its properties in place if one is already
    /// registered at the same host address.
    pub async fn add_worker(
        &mut self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        add_property: AddNodeProperty,
        resource: PbResource,
        ttl: Duration,
    ) -> MetaResult<WorkerId> {
        let txn = self.db.begin().await?;

        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_address.host.clone())
                    .and(worker::Column::Port.eq(host_address.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&txn)
            .await?;
        if let Some((worker, property)) = worker {
            assert_eq!(worker.worker_type, r#type.into());
            return if worker.worker_type == WorkerType::ComputeNode {
                let property = property.unwrap();
                let mut current_parallelism = property.parallelism as usize;
                let new_parallelism = add_property.parallelism as usize;
                match new_parallelism.cmp(&current_parallelism) {
                    Ordering::Less => {
                        if !self.disable_automatic_parallelism_control {
                            tracing::info!(
                                "worker {} parallelism reduced from {} to {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                            current_parallelism = new_parallelism;
                        } else {
                            tracing::warn!(
                                "worker {} parallelism is less than current, current is {}, but received {}",
                                worker.worker_id,
                                current_parallelism,
                                new_parallelism
                            );
                        }
                    }
                    Ordering::Greater => {
                        tracing::info!(
                            "worker {} parallelism updated from {} to {}",
                            worker.worker_id,
                            current_parallelism,
                            new_parallelism
                        );
                        current_parallelism = new_parallelism;
                    }
                    Ordering::Equal => {}
                }
                let mut property: worker_property::ActiveModel = property.into();

                property.is_streaming = Set(add_property.is_streaming);
                property.is_serving = Set(add_property.is_serving);
                property.parallelism = Set(current_parallelism as _);
                property.resource_group =
                    Set(Some(add_property.resource_group.unwrap_or_else(|| {
                        tracing::warn!(
                            "resource_group is not set for worker {}, fallback to `default`",
                            worker.worker_id
                        );
                        DEFAULT_RESOURCE_GROUP.to_owned()
                    })));

                WorkerProperty::update(property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Frontend && property.is_none() {
                let worker_property = worker_property::ActiveModel {
                    worker_id: Set(worker.worker_id),
                    parallelism: Set(add_property
                        .parallelism
                        .try_into()
                        .expect("invalid parallelism")),
                    is_streaming: Set(add_property.is_streaming),
                    is_serving: Set(add_property.is_serving),
                    is_unschedulable: Set(false),
                    internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                    resource_group: Set(None),
                    is_iceberg_compactor: Set(false),
                };
                WorkerProperty::insert(worker_property).exec(&txn).await?;
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else if worker.worker_type == WorkerType::Compactor {
                if let Some(property) = property {
                    let mut property: worker_property::ActiveModel = property.into();
                    property.is_iceberg_compactor = Set(add_property.is_iceberg_compactor);
                    property.internal_rpc_host_addr =
                        Set(Some(add_property.internal_rpc_host_addr));

                    WorkerProperty::update(property).exec(&txn).await?;
                } else {
                    let property = worker_property::ActiveModel {
                        worker_id: Set(worker.worker_id),
                        parallelism: Set(add_property
                            .parallelism
                            .try_into()
                            .expect("invalid parallelism")),
                        is_streaming: Set(false),
                        is_serving: Set(false),
                        is_unschedulable: Set(false),
                        internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                        resource_group: Set(None),
                        is_iceberg_compactor: Set(add_property.is_iceberg_compactor),
                    };

                    WorkerProperty::insert(property).exec(&txn).await?;
                }
                txn.commit().await?;
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            } else {
                self.update_worker_ttl(worker.worker_id, ttl)?;
                self.update_resource_and_started_at(worker.worker_id, resource)?;
                Ok(worker.worker_id)
            };
        }

        // Otherwise, this is a brand-new worker: insert it and its properties.
        let txn_id = self.apply_transaction_id(r#type)?;

        let worker = worker::ActiveModel {
            worker_id: Default::default(),
            worker_type: Set(r#type.into()),
            host: Set(host_address.host.clone()),
            port: Set(host_address.port),
            status: Set(WorkerStatus::Starting),
            transaction_id: Set(txn_id),
        };
        let insert_res = Worker::insert(worker).exec(&txn).await?;
        let worker_id = insert_res.last_insert_id as WorkerId;
        if r#type == PbWorkerType::ComputeNode
            || r#type == PbWorkerType::Frontend
            || r#type == PbWorkerType::Compactor
        {
            let (is_serving, is_streaming, is_iceberg_compactor, resource_group) = match r#type {
                PbWorkerType::ComputeNode => (
                    add_property.is_serving,
                    add_property.is_streaming,
                    false,
                    add_property.resource_group.clone(),
                ),
                PbWorkerType::Frontend => (
                    add_property.is_serving,
                    add_property.is_streaming,
                    false,
                    None,
                ),
                PbWorkerType::Compactor => (false, false, add_property.is_iceberg_compactor, None),
                _ => unreachable!(),
            };

            let property = worker_property::ActiveModel {
                worker_id: Set(worker_id),
                parallelism: Set(add_property
                    .parallelism
                    .try_into()
                    .expect("invalid parallelism")),
                is_streaming: Set(is_streaming),
                is_serving: Set(is_serving),
                is_unschedulable: Set(false),
                internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
                resource_group: Set(resource_group),
                is_iceberg_compactor: Set(is_iceberg_compactor),
            };
            WorkerProperty::insert(property).exec(&txn).await?;
        }

        txn.commit().await?;
        if let Some(txn_id) = txn_id {
            self.available_transactional_ids.retain(|id| *id != txn_id);
        }
        let extra_info = WorkerExtraInfo {
            started_at: Some(timestamp_now_sec()),
            expire_at: None,
            resource,
        };
        self.worker_extra_info.insert(worker_id, extra_info);

        Ok(worker_id)
    }

    pub async fn activate_worker(&self, worker_id: WorkerId) -> MetaResult<PbWorkerNode> {
        let worker = worker::ActiveModel {
            worker_id: Set(worker_id),
            status: Set(WorkerStatus::Running),
            ..Default::default()
        };

        let worker = worker.update(&self.db).await?;
        let worker_property = WorkerProperty::find_by_id(worker.worker_id)
            .one(&self.db)
            .await?;
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(WorkerInfo(worker, worker_property, extra_info).into())
    }

    pub async fn delete_worker(&mut self, host_addr: HostAddress) -> MetaResult<PbWorkerNode> {
        let worker = Worker::find()
            .filter(
                worker::Column::Host
                    .eq(host_addr.host)
                    .and(worker::Column::Port.eq(host_addr.port)),
            )
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        let Some((worker, property)) = worker else {
            return Err(MetaError::invalid_parameter("worker not found!"));
        };

        let res = Worker::delete_by_id(worker.worker_id)
            .exec(&self.db)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::invalid_parameter("worker not found!"));
        }

        let extra_info = self.worker_extra_info.remove(&worker.worker_id).unwrap();
        // Return the worker's transaction id to the pool for reuse.
        if let Some(txn_id) = &worker.transaction_id {
            self.available_transactional_ids.push_back(*txn_id);
        }
        let worker: PbWorkerNode = WorkerInfo(worker, property, extra_info).into();

        Ok(worker)
    }

    pub fn heartbeat(
        &mut self,
        worker_id: WorkerId,
        ttl: Duration,
        resource: Option<PbResource>,
    ) -> MetaResult<()> {
        if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
            worker_info.update_ttl(ttl);
            if let Some(resource) = resource {
                worker_info.resource = resource;
            }
            Ok(())
        } else {
            Err(MetaError::invalid_worker(worker_id, "worker not found"))
        }
    }

    pub async fn list_workers(
        &self,
        worker_type: Option<WorkerType>,
        worker_status: Option<WorkerStatus>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        let mut find = Worker::find();
        if let Some(worker_type) = worker_type {
            find = find.filter(worker::Column::WorkerType.eq(worker_type));
        }
        if let Some(worker_status) = worker_status {
            find = find.filter(worker::Column::Status.eq(worker_status));
        }
        let workers = find.find_also_related(WorkerProperty).all(&self.db).await?;
        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_streaming_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsStreaming.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn list_active_worker_slots(&self) -> MetaResult<Vec<WorkerSlotId>> {
        let worker_parallelisms: Vec<(WorkerId, i32)> = WorkerProperty::find()
            .select_only()
            .column(worker_property::Column::WorkerId)
            .column(worker_property::Column::Parallelism)
            .inner_join(Worker)
            .filter(worker::Column::Status.eq(WorkerStatus::Running))
            .into_tuple()
            .all(&self.db)
            .await?;
        Ok(worker_parallelisms
            .into_iter()
            .flat_map(|(worker_id, parallelism)| {
                (0..parallelism).map(move |idx| WorkerSlotId::new(worker_id, idx as usize))
            })
            .collect_vec())
    }

    pub async fn list_active_serving_workers(&self) -> MetaResult<Vec<PbWorkerNode>> {
        let workers = Worker::find()
            .filter(
                worker::Column::WorkerType
                    .eq(WorkerType::ComputeNode)
                    .and(worker::Column::Status.eq(WorkerStatus::Running)),
            )
            .inner_join(WorkerProperty)
            .select_also(WorkerProperty)
            .filter(worker_property::Column::IsServing.eq(true))
            .all(&self.db)
            .await?;

        Ok(workers
            .into_iter()
            .map(|(worker, property)| {
                let extra_info = self.get_extra_info_checked(worker.worker_id).unwrap();
                WorkerInfo(worker, property, extra_info).into()
            })
            .collect_vec())
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        let streaming_workers = self.list_active_streaming_workers().await?;

        let active_workers: HashMap<_, _> =
            streaming_workers.into_iter().map(|w| (w.id, w)).collect();

        Ok(StreamingClusterInfo {
            worker_nodes: active_workers,
        })
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        let worker = Worker::find_by_id(worker_id)
            .find_also_related(WorkerProperty)
            .one(&self.db)
            .await?;
        if worker.is_none() {
            return Ok(None);
        }
        let extra_info = self.get_extra_info_checked(worker_id)?;
        Ok(worker.map(|(w, p)| WorkerInfo(w, p, extra_info).into()))
    }

    pub fn get_worker_extra_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.worker_extra_info.get(&worker_id).cloned()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mock_worker_hosts_for_test(count: usize) -> Vec<HostAddress> {
        (0..count)
            .map(|i| HostAddress {
                host: "localhost".to_owned(),
                port: 5000 + i as i32,
            })
            .collect_vec()
    }

    #[tokio::test]
    async fn test_cluster_controller() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let parallelism_num = 4_usize;
        let worker_count = 5_usize;
        let property = AddNodeProperty {
            parallelism: parallelism_num as _,
            is_streaming: true,
            is_serving: true,
            ..Default::default()
        };
        let hosts = mock_worker_hosts_for_test(worker_count);
        let mut worker_ids = vec![];
        for host in &hosts {
            worker_ids.push(
                cluster_ctl
                    .add_worker(
                        PbWorkerType::ComputeNode,
                        host.clone(),
                        property.clone(),
                        PbResource::default(),
                    )
                    .await?,
            );
        }

        // Workers are still in `Starting` state, so there are no active slots yet.
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        for id in &worker_ids {
            cluster_ctl.activate_worker(*id).await?;
        }
        let worker_cnt_map = cluster_ctl.count_worker_by_type().await?;
        assert_eq!(
            *worker_cnt_map.get(&WorkerType::ComputeNode).unwrap() as usize,
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_worker_slots().await?.len(),
            parallelism_num * worker_count
        );

        // Re-register the first worker with doubled parallelism and serving disabled.
        let mut new_property = property.clone();
        new_property.parallelism = (parallelism_num * 2) as _;
        new_property.is_serving = false;
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                hosts[0].clone(),
                new_property,
                PbResource::default(),
            )
            .await?;

        assert_eq!(
            cluster_ctl.list_active_streaming_workers().await?.len(),
            worker_count
        );
        assert_eq!(
            cluster_ctl.list_active_serving_workers().await?.len(),
            worker_count - 1
        );
        let worker_slots = cluster_ctl.list_active_worker_slots().await?;
        assert!(worker_slots.iter().all_unique());
        assert_eq!(worker_slots.len(), parallelism_num * (worker_count + 1));

        for host in hosts {
            cluster_ctl.delete_worker(host).await?;
        }
        assert_eq!(cluster_ctl.list_active_streaming_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_serving_workers().await?.len(), 0);
        assert_eq!(cluster_ctl.list_active_worker_slots().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_list_workers_include_meta_node() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        // Listing without filters should include the meta node itself.
        let workers = cluster_ctl.list_workers(None, None).await?;
        assert!(workers.iter().any(|w| w.r#type() == PbWorkerType::Meta));

        let workers = cluster_ctl
            .list_workers(Some(WorkerType::Meta), None)
            .await?;
        assert_eq!(workers.len(), 1);
        assert_eq!(workers[0].r#type(), PbWorkerType::Meta);

        // The meta node is never in the `Starting` state.
        let workers = cluster_ctl
            .list_workers(Some(WorkerType::Meta), Some(WorkerStatus::Starting))
            .await?;
        assert!(workers.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_heartbeat_updates_resource() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5010,
        };
        let property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            parallelism: 4,
            ..Default::default()
        };

        let resource_v1 = PbResource {
            rw_version: "rw-v1".to_owned(),
            total_memory_bytes: 1024,
            total_cpu_cores: 4,
            hostname: "host-v1".to_owned(),
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                resource_v1,
            )
            .await?;

        let resource_v2 = PbResource {
            rw_version: "rw-v2".to_owned(),
            total_memory_bytes: 2048,
            total_cpu_cores: 8,
            hostname: "host-v2".to_owned(),
        };
        cluster_ctl
            .heartbeat(worker_id, Some(resource_v2.clone()))
            .await?;

        let worker = cluster_ctl
            .get_worker_by_id(worker_id)
            .await?
            .expect("worker should exist");
        assert_eq!(
            worker.resource.expect("worker resource should exist"),
            resource_v2
        );

        cluster_ctl.delete_worker(host).await?;
        Ok(())
    }
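
    // A minimal sketch of `WorkerExtraInfo::update_ttl`'s monotonicity: a
    // shorter TTL must never move the expiration deadline backwards. Assumes
    // the wall clock does not jump backwards between the two calls.
    #[test]
    fn test_update_ttl_never_moves_expiration_backwards() {
        let mut info = WorkerExtraInfo::default();
        info.update_ttl(Duration::from_secs(10));
        let first = info.expire_at.expect("expire_at should be set");
        info.update_ttl(Duration::from_secs(1));
        let second = info.expire_at.expect("expire_at should still be set");
        assert!(second >= first);
    }

    // A small check of `meta_node_info`: the synthesized node should carry the
    // meta node id, the `Meta` worker type, and the given start timestamp. The
    // address string below is illustrative.
    #[test]
    fn test_meta_node_info_basic_fields() {
        let node = meta_node_info("127.0.0.1:5690", Some(42));
        assert_eq!(node.id, META_NODE_ID);
        assert_eq!(node.r#type(), PbWorkerType::Meta);
        assert_eq!(node.started_at, Some(42));
        assert!(node.host.is_some());
    }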

    #[tokio::test]
    async fn test_reregister_compute_node_updates_resource() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let host = HostAddress {
            host: "localhost".to_owned(),
            port: 5011,
        };
        let property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            parallelism: 4,
            ..Default::default()
        };

        let resource_v1 = PbResource {
            rw_version: "rw-v1".to_owned(),
            total_memory_bytes: 1024,
            total_cpu_cores: 4,
            hostname: "host-v1".to_owned(),
        };
        let worker_id = cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property.clone(),
                resource_v1,
            )
            .await?;

        let resource_v2 = PbResource {
            rw_version: "rw-v2".to_owned(),
            total_memory_bytes: 2048,
            total_cpu_cores: 8,
            hostname: "host-v2".to_owned(),
        };
        cluster_ctl
            .add_worker(
                PbWorkerType::ComputeNode,
                host.clone(),
                property,
                resource_v2.clone(),
            )
            .await?;

        let worker = cluster_ctl
            .get_worker_by_id(worker_id)
            .await?
            .expect("worker should exist");
        assert_eq!(
            worker.resource.expect("worker resource should exist"),
            resource_v2
        );

        cluster_ctl.delete_worker(host).await?;
        Ok(())
    }
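
    // A sketch asserting that `cluster_resource` deduplicates by hostname: two
    // workers reporting the same hostname should not double the totals.
    // Assumes the test machine's own resources stay constant between the two
    // reads; the ports and hostnames below are illustrative.
    #[tokio::test]
    async fn test_cluster_resource_deduplicates_by_hostname() -> MetaResult<()> {
        let env = MetaSrvEnv::for_test().await;
        let cluster_ctl = ClusterController::new(env, Duration::from_secs(1)).await?;

        let property = AddNodeProperty {
            is_streaming: true,
            is_serving: true,
            parallelism: 4,
            ..Default::default()
        };
        let shared = PbResource {
            rw_version: "rw-test".to_owned(),
            total_memory_bytes: 1024,
            total_cpu_cores: 4,
            hostname: "shared-host".to_owned(),
        };

        let mut hosts = vec![];
        for port in [5020, 5021] {
            let host = HostAddress {
                host: "localhost".to_owned(),
                port,
            };
            cluster_ctl
                .add_worker(
                    PbWorkerType::ComputeNode,
                    host.clone(),
                    property.clone(),
                    shared.clone(),
                )
                .await?;
            hosts.push(host);
        }

        // Both workers report identical resources under one hostname, so the
        // aggregate should match what a single such worker contributes.
        let with_two = cluster_ctl.cluster_resource().await;
        cluster_ctl.delete_worker(hosts.pop().unwrap()).await?;
        let with_one = cluster_ctl.cluster_resource().await;
        assert_eq!(with_two.total_cpu_cores, with_one.total_cpu_cores);
        assert_eq!(with_two.total_memory_bytes, with_one.total_memory_bytes);

        cluster_ctl.delete_worker(hosts.pop().unwrap()).await?;
        Ok(())
    }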
}