use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
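    /// Clean up dirty subscriptions and dirty creating streaming jobs left over from the previous
    /// run, reset refreshing tables, and drop the source connectors associated with the cleaned
    /// creating jobs. `None` means the cleanup applies to all databases.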
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .catalog_controller
            .reset_refreshing_tables(database_id)
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

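    /// Purge state tables from Hummock, retaining only the tables listed in `all_state_table_ids`.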
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

    async fn list_background_job_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false)
            .await?;

        try_join_all(
            job_info
                .into_iter()
                .map(|(id, definition, _init_at)| async move {
                    let table_id = TableId::new(id as _);
                    let stream_job_fragments =
                        mgr.catalog_controller.get_job_fragments_by_id(id).await?;
                    assert_eq!(stream_job_fragments.stream_job_id(), table_id);
                    Ok((definition, stream_job_fragments))
                }),
        )
        .await
    }

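    /// Resolve the in-flight streaming job graph by loading all actors from the catalog, grouped
    /// as database -> job -> [`InflightStreamingJobInfo`]. When `database_id` is given, only that
    /// database is loaded.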
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

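    /// From the current Hummock version, resolve the committed epoch of every state table, plus,
    /// for snapshot-backfill background jobs, the change-log epochs of each upstream table that a
    /// lagging downstream job still needs to replay (from its committed epoch up to the
    /// upstream's committed epoch).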
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let Ok(job_committed_epoch) = get_table_committed_epoch(job.stream_job_id) else {
                warn!(
                    "background job {} has no committed epoch, skip resolving epochs",
                    job.stream_job_id
                );
                continue;
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

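    /// For every user table whose fragment graph contains an `UpstreamSinkUnion` node, reload the
    /// upstream sink information from the catalog and refill that node in the recovered
    /// fragments. Tables created by older versions without such a node are skipped (sinks cannot
    /// be created or dropped on them until migrated).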
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (tid, job) in table_map {
                    if acc.insert(tid.table_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate table id found: {:?}", tid).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let job_ids = jobs.keys().cloned().collect_vec();
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(job_ids.into_iter().map(|id| id as _).collect())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragments = jobs.get_mut(&table.id).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragments.fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "table {} was created by an older version and has not been migrated yet, so sinks cannot be created or dropped on it",
                    table.id
                );
                continue;
            };
            let target_fragment = fragments
                .fragment_infos
                .get_mut(&target_fragment_id)
                .unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

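    /// Rebuild the full [`BarrierWorkerRuntimeInfoSnapshot`] for recovery: clean up dirty jobs,
    /// recover background job progress, migrate or rescale actors onto the currently active
    /// workers, and reload graph info, epochs, subscriptions, source splits and CDC backfill
    /// split assignments from the catalog and Hummock.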
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    tracing::info!("recovering background job progress");
                    let background_jobs = {
                        let jobs = self
                            .list_background_job_progress()
                            .await
                            .context("recover background job progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            if stream_job_fragments
                                .tracking_progress_actor_ids()
                                .is_empty()
                            {
                                self.metadata_manager
                                    .catalog_controller
                                    .finish_streaming_job(
                                        stream_job_fragments.stream_job_id().table_id as _,
                                    )
                                    .await?;
                            } else {
                                background_jobs
                                    .try_insert(
                                        stream_job_fragments.stream_job_id(),
                                        (definition, stream_job_fragments),
                                    )
                                    .expect("non-duplicate");
                            }
                        }
                        background_jobs
                    };

                    tracing::info!("recovered background job progress");

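                    // Pre-apply any drop/cancel commands scheduled before recovery and clean up
                    // already-dropped tables from the catalog.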
                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    self.metadata_manager
                        .catalog_controller
                        .cleanup_dropped_tables()
                        .await;

                    let mut active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
                    info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(&job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

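                    // Prefer offline scaling when automatic parallelism control is enabled and
                    // every background job can be rescheduled; otherwise only migrate actors away
                    // from expired workers.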
                    let mut info = if !self.env.opts.disable_automatic_parallelism_control
                        && unreschedulable_jobs.is_empty()
                    {
                        info!("trigger offline scaling");
                        self.scale_actors(&active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "scale actors failed");
                            })?;

                        self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    } else {
                        info!("trigger actor migration");
                        self.migrate_actors(&mut active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "migrate actors failed");
                            })?
                    };

                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    if !dropped_table_ids.is_empty() {
                        self.metadata_manager
                            .catalog_controller
                            .complete_dropped_tables(
                                dropped_table_ids.into_iter().map(|id| id.table_id as _),
                            )
                            .await;
                        info = self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    }

                    self.recovery_table_with_upstream_sinks(&mut info).await?;

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, job)| job.existing_table_ids())
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(&background_jobs, version)
                        })
                        .await?;

                    let subscription_infos = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?
                        .into_iter()
                        .map(|(database_id, mv_depended_subscriptions)| {
                            (
                                database_id,
                                InflightSubscriptionInfo {
                                    mv_depended_subscriptions,
                                },
                            )
                        })
                        .collect();

                    let stream_actors = self.load_all_actors().await.inspect_err(|err| {
                        warn!(error = %err.as_report(), "update actors failed");
                    })?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.fragment_infos())
                                .map(|fragment| fragment.fragment_id as _)
                                .collect(),
                        )
                        .await?;

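                    // Re-list background job progress so the tracked fragments reflect any actor
                    // reassignment done by the scaling/migration above.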
                    let background_jobs = {
                        let jobs = self
                            .list_background_job_progress()
                            .await
                            .context("recover background job progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            background_jobs
                                .try_insert(
                                    stream_job_fragments.stream_job_id(),
                                    (definition, stream_job_fragments),
                                )
                                .expect("non-duplicate");
                        }
                        background_jobs
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    let mut source_splits = HashMap::new();
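                    // Collect the current source split assignment of every actor from the
                    // recovered fragment info.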
                    for (_, job) in info.values().flatten() {
                        for fragment in job.fragment_infos.values() {
                            for (actor_id, info) in &fragment.actors {
                                source_splits.insert(*actor_id, info.splits.clone());
                            }
                        }
                    }

                    let cdc_table_backfill_actors = self
                        .metadata_manager
                        .catalog_controller
                        .cdc_table_backfill_actor_ids()
                        .await?;
                    let cdc_table_ids = cdc_table_backfill_actors
                        .keys()
                        .cloned()
                        .collect::<Vec<_>>();
                    let cdc_table_snapshot_split_assignment =
                        assign_cdc_table_snapshot_splits_pairs(
                            cdc_table_backfill_actors,
                            self.env.meta_store_ref(),
                            self.env.cdc_table_backfill_tracker.completed_job_ids(),
                        )
                        .await?;
                    let cdc_table_snapshot_split_assignment =
                        if cdc_table_snapshot_split_assignment.is_empty() {
                            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
                        } else {
                            let generation = self
                                .env
                                .cdc_table_backfill_tracker
                                .next_generation(cdc_table_ids.into_iter());
                            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                                cdc_table_snapshot_split_assignment,
                                generation,
                            )
                        };
                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        subscription_infos,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_split_assignment,
                    })
                }
            }
        }
    }

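    /// Rebuild the [`DatabaseRuntimeInfoSnapshot`] for a single database during per-database
    /// recovery. Returns `Ok(None)` if no in-flight graph info is resolved for the database.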
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );
        let background_jobs = self
            .list_background_job_progress()
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids.into_iter().map(|id| id.table_id as _))
            .await;

        let mut info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(stream_job_fragments.stream_job_id().table_id as _)
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        let mut source_splits = HashMap::new();
        for fragment in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
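    /// Upper bound on how long recovery waits for replacement workers during actor migration
    /// before falling back to over-subscribing the worker slots that are still available.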
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

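    /// Migrate actors off expired worker slots (slots in use whose workers are no longer active)
    /// onto free slots of the active workers, waiting for new workers to join and eventually
    /// over-subscribing existing workers once [`Self::RECOVERY_FORCE_MIGRATION_TIMEOUT`] elapses.
    /// Returns the re-resolved graph info.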
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrating actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("worker slots to migrate: {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "worker set changed or wait timed out, retrying migration");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrating actors succeeded.");

        self.resolve_graph_info(None).await
    }

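    /// Reset the parallelism of all streaming jobs to fit the currently available workers
    /// (offline scaling): derive a target parallelism per job, generate reschedule plans in
    /// batches, and apply them without creating new actors.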
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs = ?reschedule_ids, "generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: local_reschedule_targets,
                    },
                    false,
                )
                .await?;

            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs = ?reschedule_ids, "no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs = ?reschedule_ids, "post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs = ?reschedule_ids, "post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeeded.");
        Ok(())
    }

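    /// Derive the parallelism to apply to a job during offline scaling.
    ///
    /// - `Custom`: pin the actual fragment parallelism as `Fixed` if it is below the available
    ///   parallelism; otherwise (or if the actual parallelism is unknown) fall back to `Adaptive`.
    /// - `Adaptive`: normally stays `Adaptive`, but if the actual fragment parallelism equals the
    ///   configured `default_parallelism`, it is pinned to that `Fixed` value.
    /// - `Fixed(_)`: left unchanged.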
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

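    /// Load all actors that are currently active on the streaming workers, keyed by actor id.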
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_derive_target_parallelism() {
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}