use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
use risingwave_common::id::JobId;
use risingwave_meta_model::{SinkId, SourceId, WorkerId};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode, PbStreamScanType};
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot;
use tracing::warn;

use crate::MetaResult;
use crate::barrier::SharedFragmentInfo;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::{ClusterControllerRef, StreamingClusterInfo, WorkerExtraInfo};
use crate::controller::fragment::FragmentParallelismInfo;
use crate::manager::{LocalNotification, NotificationVersion};
use crate::model::{ActorId, ClusterId, FragmentId, StreamJobFragments, SubscriptionId};
use crate::stream::SplitAssignment;
use crate::telemetry::MetaTelemetryJobDesc;

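/// Entry point for metadata access on the meta node, bundling the cluster controller and the
/// catalog controller behind one cheaply cloneable handle.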
#[derive(Clone)]
pub struct MetadataManager {
    pub cluster_controller: ClusterControllerRef,
    pub catalog_controller: CatalogControllerRef,
}

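/// A single change to the set of active streaming compute workers, as yielded by
/// [`ActiveStreamingWorkerNodes::changed`].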
#[derive(Debug)]
pub(crate) enum ActiveStreamingWorkerChange {
    Add(WorkerNode),
    Remove(WorkerNode),
    Update(WorkerNode),
}

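/// Locally maintained view of the active streaming compute nodes, kept in sync by consuming
/// [`LocalNotification`]s from the cluster controller.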
pub struct ActiveStreamingWorkerNodes {
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    rx: UnboundedReceiver<LocalNotification>,
    #[cfg_attr(not(debug_assertions), expect(dead_code))]
    meta_manager: Option<MetadataManager>,
}

impl Debug for ActiveStreamingWorkerNodes {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ActiveStreamingWorkerNodes")
            .field("worker_nodes", &self.worker_nodes)
            .finish()
    }
}

impl ActiveStreamingWorkerNodes {
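    /// Placeholder instance with no underlying subscription; awaiting [`Self::changed`] on it
    /// panics because the sender side of the channel is already dropped.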
    pub(crate) fn uninitialized() -> Self {
        Self {
            worker_nodes: Default::default(),
            rx: unbounded_channel().1,
            meta_manager: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn for_test(worker_nodes: HashMap<WorkerId, WorkerNode>) -> Self {
        let (tx, rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await
        });
        Self {
            worker_nodes,
            rx,
            meta_manager: None,
        }
    }

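    /// Takes a snapshot of the currently active streaming compute nodes and subscribes to
    /// subsequent membership change notifications.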
    pub(crate) async fn new_snapshot(meta_manager: MetadataManager) -> MetaResult<Self> {
        let (nodes, rx) = meta_manager
            .subscribe_active_streaming_compute_nodes()
            .await?;
        Ok(Self {
            worker_nodes: nodes.into_iter().map(|node| (node.id, node)).collect(),
            rx,
            meta_manager: Some(meta_manager),
        })
    }

    pub(crate) fn current(&self) -> &HashMap<WorkerId, WorkerNode> {
        &self.worker_nodes
    }

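    /// Waits for the next relevant change among the active streaming compute nodes, applies it
    /// to the local snapshot, and returns it.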
    pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange {
        loop {
            let notification = self
                .rx
                .recv()
                .await
                .expect("notification stopped or uninitialized");
            match notification {
                LocalNotification::WorkerNodeDeleted(worker) => {
                    let is_streaming_compute_node = worker.r#type == WorkerType::ComputeNode as i32
                        && worker.property.as_ref().unwrap().is_streaming;
                    let Some(prev_worker) = self.worker_nodes.remove(&worker.id) else {
                        if is_streaming_compute_node {
                            warn!(
                                ?worker,
                                "notified to delete a non-existent streaming compute worker"
                            );
                        }
                        continue;
                    };
                    if !is_streaming_compute_node {
                        warn!(
                            ?worker,
                            ?prev_worker,
                            "deleted worker is not a streaming compute node but was tracked as one"
                        );
                    }
                    if worker.state == State::Starting as i32 {
                        warn!(
                            id = %worker.id,
                            host = ?worker.host,
                            state = worker.state,
                            "a starting streaming worker is deleted"
                        );
                    }
                    break ActiveStreamingWorkerChange::Remove(prev_worker);
                }
                LocalNotification::WorkerNodeActivated(worker) => {
                    if worker.r#type != WorkerType::ComputeNode as i32
                        || !worker.property.as_ref().unwrap().is_streaming
                    {
                        if let Some(prev_worker) = self.worker_nodes.remove(&worker.id) {
                            warn!(
                                ?worker,
                                ?prev_worker,
                                "the type of a streaming worker has changed"
                            );
                            break ActiveStreamingWorkerChange::Remove(prev_worker);
                        } else {
                            continue;
                        }
                    }
                    assert_eq!(
                        worker.state,
                        State::Running as i32,
                        "worker activated while not in Running state: {:?}",
                        worker
                    );
                    if let Some(prev_worker) = self.worker_nodes.insert(worker.id, worker.clone()) {
                        assert_eq!(prev_worker.host, worker.host);
                        assert_eq!(prev_worker.r#type, worker.r#type);
                        warn!(
                            ?prev_worker,
                            ?worker,
                            eq = prev_worker == worker,
                            "notified to update an existing active worker"
                        );
                        if prev_worker == worker {
                            continue;
                        } else {
                            break ActiveStreamingWorkerChange::Update(worker);
                        }
                    } else {
                        break ActiveStreamingWorkerChange::Add(worker);
                    }
                }
                _ => {
                    continue;
                }
            }
        }
    }

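    /// Debug-only consistency check: fetches the global list of active streaming compute nodes
    /// and warns if it differs from the locally maintained snapshot.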
    #[cfg(debug_assertions)]
    pub(crate) async fn validate_change(&self) {
        use risingwave_pb::common::WorkerNode;
        use thiserror_ext::AsReport;
        let Some(meta_manager) = &self.meta_manager else {
            return;
        };
        match meta_manager.list_active_streaming_compute_nodes().await {
            Ok(worker_nodes) => {
                let ignore_irrelevant_info = |node: &WorkerNode| {
                    (
                        node.id,
                        WorkerNode {
                            id: node.id,
                            r#type: node.r#type,
                            host: node.host.clone(),
                            property: node.property.clone(),
                            resource: node.resource.clone(),
                            ..Default::default()
                        },
                    )
                };
                let worker_nodes: HashMap<_, _> =
                    worker_nodes.iter().map(ignore_irrelevant_info).collect();
                let curr_worker_nodes: HashMap<_, _> = self
                    .current()
                    .values()
                    .map(ignore_irrelevant_info)
                    .collect();
                if worker_nodes != curr_worker_nodes {
                    warn!(
                        ?worker_nodes,
                        ?curr_worker_nodes,
                        "local snapshot differs from the global snapshot"
                    );
                }
            }
            Err(e) => {
                warn!(e = ?e.as_report(), "failed to list_active_streaming_compute_nodes to compare with the local snapshot");
            }
        }
    }
}

impl MetadataManager {
    pub fn new(
        cluster_controller: ClusterControllerRef,
        catalog_controller: CatalogControllerRef,
    ) -> Self {
        Self {
            cluster_controller,
            catalog_controller,
        }
    }

    pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> MetaResult<Option<PbWorkerNode>> {
        self.cluster_controller.get_worker_by_id(worker_id).await
    }

    pub async fn count_worker_node(&self) -> MetaResult<HashMap<WorkerType, u64>> {
        let node_map = self.cluster_controller.count_worker_by_type().await?;
        Ok(node_map
            .into_iter()
            .map(|(ty, cnt)| (ty.into(), cnt as u64))
            .collect())
    }

    pub async fn get_worker_info_by_id(&self, worker_id: WorkerId) -> Option<WorkerExtraInfo> {
        self.cluster_controller
            .get_worker_info_by_id(worker_id as _)
            .await
    }

    pub async fn add_worker_node(
        &self,
        r#type: PbWorkerType,
        host_address: HostAddress,
        property: AddNodeProperty,
        resource: PbResource,
    ) -> MetaResult<WorkerId> {
        self.cluster_controller
            .add_worker(r#type, host_address, property, resource)
            .await
            .map(|id| id as WorkerId)
    }

    pub async fn list_worker_node(
        &self,
        worker_type: Option<WorkerType>,
        worker_state: Option<State>,
    ) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_workers(worker_type.map(Into::into), worker_state.map(Into::into))
            .await
    }

    pub async fn subscribe_active_streaming_compute_nodes(
        &self,
    ) -> MetaResult<(Vec<WorkerNode>, UnboundedReceiver<LocalNotification>)> {
        self.cluster_controller
            .subscribe_active_streaming_compute_nodes()
            .await
    }

    pub async fn list_active_streaming_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller
            .list_active_streaming_workers()
            .await
    }

    pub async fn list_active_serving_compute_nodes(&self) -> MetaResult<Vec<PbWorkerNode>> {
        self.cluster_controller.list_active_serving_workers().await
    }

    pub async fn list_active_database_ids(&self) -> MetaResult<HashSet<DatabaseId>> {
        Ok(self
            .catalog_controller
            .list_fragment_database_ids(None)
            .await?
            .into_iter()
            .map(|(_, database_id)| database_id)
            .collect())
    }

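    /// Regroups a fragment-keyed map into one map per database, resolving each fragment's
    /// database id through the catalog; fails if any fragment's database cannot be found.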
    pub async fn split_fragment_map_by_database<T: Debug>(
        &self,
        fragment_map: HashMap<FragmentId, T>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<FragmentId, T>>> {
        let fragment_to_database_map: HashMap<_, _> = self
            .catalog_controller
            .list_fragment_database_ids(Some(
                fragment_map
                    .keys()
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            ))
            .await?
            .into_iter()
            .map(|(fragment_id, database_id)| (fragment_id as FragmentId, database_id))
            .collect();
        let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new();
        for (fragment_id, value) in fragment_map {
            let database_id = *fragment_to_database_map
                .get(&fragment_id)
                .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?;
            ret.entry(database_id)
                .or_default()
                .try_insert(fragment_id, value)
                .expect("non duplicate");
        }
        Ok(ret)
    }

    pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<JobId>> {
        let jobs = self
            .catalog_controller
            .list_background_creating_jobs(false, None)
            .await?;

        let jobs = jobs.into_iter().map(|(id, _, _)| id).collect();

        Ok(jobs)
    }

    pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
        self.catalog_controller.list_sources().await
    }

    pub fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
        Ok(self
            .catalog_controller
            .running_fragment_parallelisms(id_filter)?
            .into_iter()
            .map(|(k, v)| (k as FragmentId, v))
            .collect())
    }

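    /// Returns the root fragment of each upstream table's streaming job, i.e. the fragment that
    /// downstream jobs attach to, along with a mapping from actor id to the worker running it.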
    pub async fn get_upstream_root_fragments(
        &self,
        upstream_table_ids: &HashSet<TableId>,
    ) -> MetaResult<(
        HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (upstream_root_fragments, actors) = self
            .catalog_controller
            .get_root_fragments(upstream_table_ids.iter().map(|id| id.as_job_id()).collect())
            .await?;

        Ok((upstream_root_fragments, actors))
    }

    pub async fn get_streaming_cluster_info(&self) -> MetaResult<StreamingClusterInfo> {
        self.cluster_controller.get_streaming_cluster_info().await
    }

    pub async fn get_all_table_options(&self) -> MetaResult<HashMap<TableId, TableOption>> {
        self.catalog_controller.get_all_table_options().await
    }

    pub async fn get_table_name_type_mapping(
        &self,
    ) -> MetaResult<HashMap<TableId, (String, String)>> {
        self.catalog_controller.get_table_name_type_mapping().await
    }

    pub async fn get_created_table_ids(&self) -> MetaResult<Vec<TableId>> {
        self.catalog_controller.get_created_table_ids().await
    }

    pub async fn get_table_associated_source_id(
        &self,
        table_id: TableId,
    ) -> MetaResult<Option<SourceId>> {
        self.catalog_controller
            .get_table_associated_source_id(table_id)
            .await
    }

    pub async fn get_table_catalog_by_ids(&self, ids: &[TableId]) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_ids(ids.to_vec(), false)
            .await
    }

    pub async fn get_table_incoming_sinks(&self, table_id: TableId) -> MetaResult<Vec<PbSink>> {
        self.catalog_controller
            .get_table_incoming_sinks(table_id)
            .await
    }

    pub async fn get_sink_state_table_ids(&self, sink_id: SinkId) -> MetaResult<Vec<TableId>> {
        self.catalog_controller
            .get_sink_state_table_ids(sink_id)
            .await
    }

    pub async fn get_table_catalog_by_cdc_table_id(
        &self,
        cdc_table_id: &String,
    ) -> MetaResult<Vec<PbTable>> {
        self.catalog_controller
            .get_table_by_cdc_table_id(cdc_table_id)
            .await
    }

    pub async fn get_downstream_fragments(
        &self,
        job_id: JobId,
    ) -> MetaResult<(
        Vec<(PbDispatcherType, SharedFragmentInfo, PbStreamNode)>,
        HashMap<ActorId, WorkerId>,
    )> {
        let (fragments, actors) = self
            .catalog_controller
            .get_downstream_fragments(job_id)
            .await?;

        Ok((fragments, actors))
    }

    pub async fn get_job_id_to_internal_table_ids_mapping(
        &self,
    ) -> Option<Vec<(JobId, Vec<TableId>)>> {
        self.catalog_controller.get_job_internal_table_ids().await
    }

    pub async fn get_job_fragments_by_id(&self, job_id: JobId) -> MetaResult<StreamJobFragments> {
        self.catalog_controller
            .get_job_fragments_by_id(job_id)
            .await
    }

    pub fn get_running_actors_of_fragment(&self, id: FragmentId) -> MetaResult<HashSet<ActorId>> {
        self.catalog_controller
            .get_running_actors_of_fragment(id as _)
    }

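    /// Returns the running `(source backfill actor, upstream source actor)` pairs that connect
    /// the given source backfill fragment to its source fragment.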
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<HashSet<(ActorId, ActorId)>> {
        let actor_ids = self
            .catalog_controller
            .get_running_actors_for_source_backfill(
                source_backfill_fragment_id as _,
                source_fragment_id as _,
            )
            .await?;
        Ok(actor_ids
            .into_iter()
            .map(|(id, upstream)| (id as ActorId, upstream as ActorId))
            .collect())
    }

    pub fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
        let actor_cnt = self.catalog_controller.worker_actor_count()?;
        Ok(actor_cnt
            .into_iter()
            .map(|(id, cnt)| (id as WorkerId, cnt))
            .collect())
    }

    pub async fn count_streaming_job(&self) -> MetaResult<usize> {
        self.catalog_controller
            .list_streaming_job_infos()
            .await
            .map(|x| x.len())
    }

    pub async fn list_stream_job_desc(&self) -> MetaResult<Vec<MetaTelemetryJobDesc>> {
        self.catalog_controller
            .list_stream_job_desc_for_telemetry()
            .await
    }

    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_source_rate_limit_by_source_id(source_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_backfill_rate_limit_by_job_id(job_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_rate_limit_by_sink_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_sink_rate_limit_by_job_id(sink_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_dml_rate_limit_by_job_id(job_id, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let new_props = self
            .catalog_controller
            .update_sink_props_by_sink_id(sink_id, props)
            .await?;
        Ok(new_props)
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let (new_props, sink_id) = self
            .catalog_controller
            .update_iceberg_table_props_by_table_id(table_id, props, alter_iceberg_table_props)
            .await?;
        Ok((new_props, sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
            .update_fragment_rate_limit_by_fragment_id(fragment_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
            .map(|(id, actors)| (id as _, actors.into_iter().map(|id| id as _).collect()))
            .collect())
    }

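    /// Persists the given split assignment: each fragment's per-actor splits are flattened into
    /// one list per fragment and written through the catalog controller.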
    #[await_tree::instrument]
    pub async fn update_fragment_splits(
        &self,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        let fragment_splits = split_assignment
            .iter()
            .map(|(fragment_id, splits)| {
                (
                    *fragment_id as _,
                    splits.values().flatten().cloned().collect_vec(),
                )
            })
            .collect();

        let inner = self.catalog_controller.inner.write().await;

        self.catalog_controller
            .update_fragment_splits(&inner.db, &fragment_splits)
            .await
    }

    pub async fn get_mv_depended_subscriptions(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, HashMap<SubscriptionId, u64>>> {
        Ok(self
            .catalog_controller
            .get_mv_depended_subscriptions(database_id)
            .await?
            .into_iter()
            .map(|(table_id, subscriptions)| {
                (
                    table_id,
                    subscriptions
                        .into_iter()
                        .map(|(subscription_id, retention_time)| {
                            (subscription_id as SubscriptionId, retention_time)
                        })
                        .collect(),
                )
            })
            .collect())
    }

    pub async fn get_job_max_parallelism(&self, job_id: JobId) -> MetaResult<usize> {
        self.catalog_controller
            .get_max_parallelism_by_id(job_id)
            .await
    }

    pub async fn get_existing_job_resource_group(
        &self,
        streaming_job_id: JobId,
    ) -> MetaResult<String> {
        self.catalog_controller
            .get_existing_job_resource_group(streaming_job_id)
            .await
    }

    pub async fn get_database_resource_group(&self, database_id: DatabaseId) -> MetaResult<String> {
        self.catalog_controller
            .get_database_resource_group(database_id)
            .await
    }

    pub fn cluster_id(&self) -> &ClusterId {
        self.cluster_controller.cluster_id()
    }

    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let rate_limits = self.catalog_controller.list_rate_limits().await?;
        Ok(rate_limits)
    }

    pub async fn get_job_backfill_scan_types(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashMap<FragmentId, PbStreamScanType>> {
        let backfill_types = self
            .catalog_controller
            .get_job_fragment_backfill_scan_type(job_id)
            .await?;
        Ok(backfill_types)
    }
}

impl MetadataManager {
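    /// Waits until the streaming job `id` in `database_id` has finished creating, returning the
    /// notification version at completion; resolves immediately if the job is already finished.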
    #[await_tree::instrument]
    pub async fn wait_streaming_job_finished(
        &self,
        database_id: DatabaseId,
        id: JobId,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("wait_streaming_job_finished: {id:?}");
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        if mgr.streaming_job_is_finished(id).await? {
            return Ok(self.catalog_controller.current_notification_version().await);
        }
        let (tx, rx) = oneshot::channel();

        mgr.register_finish_notifier(database_id, id, tx);
        drop(mgr);
        rx.await
            .map_err(|_| "no reason received".to_owned())
            .and_then(|result| result)
            .map_err(|reason| anyhow!("failed to wait for streaming job finish: {}", reason).into())
    }

    pub(crate) async fn notify_finish_failed(&self, database_id: Option<DatabaseId>, err: String) {
        let mut mgr = self.catalog_controller.get_inner_write_guard().await;
        mgr.notify_finish_failed(database_id, err);
    }
}