use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
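    /// Clean up dirty subscriptions and dirty creating streaming jobs (optionally scoped to one
    /// database), reset tables stuck in the refreshing state, and unregister the sources
    /// associated with the cleaned-up jobs.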
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .catalog_controller
            .reset_refreshing_tables(database_id)
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

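    /// Purge state tables from Hummock, given the full set of state tables that are still
    /// referenced by in-flight streaming jobs.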
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

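    /// List all background creating jobs, returning each job's definition together with its
    /// recovered fragments.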
    async fn list_background_job_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false)
            .await?;

        try_join_all(
            job_info
                .into_iter()
                .map(|(id, definition, _init_at)| async move {
                    let table_id = TableId::new(id as _);
                    let stream_job_fragments =
                        mgr.catalog_controller.get_job_fragments_by_id(id).await?;
                    assert_eq!(stream_job_fragments.stream_job_id(), table_id);
                    Ok((definition, stream_job_fragments))
                }),
        )
        .await
    }

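    /// Resolve the in-flight fragment/actor info of all streaming jobs, grouped by database and
    /// job id. When `database_id` is given, only that database is loaded.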
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

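    /// Resolve, from the current Hummock `version`:
    /// - the committed epoch of every state table, and
    /// - for each upstream table of a recovered snapshot-backfill job, the change-log epochs
    ///   between the downstream's pinned epoch and the upstream's committed epoch.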
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let Ok(job_committed_epoch) = get_table_committed_epoch(job.stream_job_id) else {
                warn!(
                    "background job {} has no committed epoch, skip resolving epochs",
                    job.stream_job_id
                );
                continue;
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} is later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} does not match downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} at epoch {} is ahead of downstream at epoch {}, but has no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

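    /// For every user table that contains an `UpstreamSinkUnion` fragment, reload the upstream
    /// sink infos from the catalog and refill them into the fragment's stream node, so that
    /// sink-into-table unions are restored after recovery.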
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (tid, job) in table_map {
                    if acc.insert(tid.table_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate table id found: {:?}", tid).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let job_ids = jobs.keys().cloned().collect_vec();
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(job_ids.into_iter().map(|id| id as _).collect())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragments = jobs.get_mut(&table.id).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragments.fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "Table {} was created by an older version and has not yet been migrated, so sinks cannot be created or dropped on it.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragments
                .fragment_infos
                .get_mut(&target_fragment_id)
                .unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

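    /// Rebuild the full [`BarrierWorkerRuntimeInfoSnapshot`] during global recovery: clean up
    /// dirty jobs, recover background job progress, migrate or rescale actors onto the active
    /// worker nodes, and reload epochs, subscriptions, actors, source splits and CDC backfill
    /// assignments.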
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!("recovering background job progress");
        let background_jobs = {
            let jobs = self
                .list_background_job_progress()
                .await
                .context("recover background job progress should not fail")?;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(stream_job_fragments.stream_job_id().table_id as _)
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        tracing::info!("recovered background job progress");

        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

        let mut active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
        info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(&job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        let mut info = if !self.env.opts.disable_automatic_parallelism_control
            && unreschedulable_jobs.is_empty()
        {
            info!("trigger offline scaling");
            self.scale_actors(&active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "scale actors failed");
                })?;

            self.resolve_graph_info(None).await.inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?
        } else {
            info!("trigger actor migration");
            self.migrate_actors(&mut active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "migrate actors failed");
                })?
        };

        if self.scheduled_barriers.pre_apply_drop_cancel(None) {
            info = self.resolve_graph_info(None).await.inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, job)| job.existing_table_ids())
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?
            .into_iter()
            .map(|(database_id, mv_depended_subscriptions)| {
                (
                    database_id,
                    InflightSubscriptionInfo {
                        mv_depended_subscriptions,
                    },
                )
            })
            .collect();

        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.fragment_infos())
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        let background_jobs = {
            let jobs = self
                .list_background_job_progress()
                .await
                .context("recover background job progress should not fail")?;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                background_jobs
                    .try_insert(
                        stream_job_fragments.stream_job_id(),
                        (definition, stream_job_fragments),
                    )
                    .expect("non-duplicate");
            }
            background_jobs
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let mut source_splits = HashMap::new();
        for (_, job) in info.values().flatten() {
            for fragment in job.fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment =
            if cdc_table_snapshot_split_assignment.is_empty() {
                CdcTableSnapshotSplitAssignmentWithGeneration::empty()
            } else {
                let generation = self
                    .env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter());
                CdcTableSnapshotSplitAssignmentWithGeneration::new(
                    cdc_table_snapshot_split_assignment,
                    generation,
                )
            };
        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_infos,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_split_assignment,
        })
    }

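    /// Rebuild the [`DatabaseRuntimeInfoSnapshot`] for a single database during per-database
    /// recovery. Returns `Ok(None)` if the database has no in-flight streaming jobs.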
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );
        let background_jobs = self
            .list_background_job_progress()
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let mut info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(stream_job_fragments.stream_job_id().table_id as _)
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        let mut source_splits = HashMap::new();
        for fragment in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
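    /// How long to wait for lost worker slots to come back before migration is forced onto
    /// whatever slots are currently available.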
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

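    /// Migrate actors off expired worker slots (slots still in use whose workers are no longer
    /// active) onto free slots of the active workers, waiting for new workers to join and
    /// force-migrating after [`Self::RECOVERY_FORCE_MIGRATION_TIMEOUT`]. Returns the refreshed
    /// graph info.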
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrating actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("worker slots to migrate: {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "workers changed or wait timed out, retrying migration");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeeded.");

        self.resolve_graph_info(None).await
    }

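    /// Offline scaling during recovery: re-derive the target parallelism of every streaming job
    /// against the currently available workers and apply the resulting reschedule plans in
    /// batches.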
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs = ?reschedule_ids, "generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: local_reschedule_targets,
                    },
                    false,
                )
                .await?;

            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs = ?reschedule_ids, "no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs = ?reschedule_ids, "post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs = ?reschedule_ids, "post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeeded.");
        Ok(())
    }

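    /// Decide the post-recovery parallelism of a job.
    ///
    /// - `Custom` jobs become `Fixed(actual)` when the actual fragment parallelism is strictly
    ///   smaller than the available parallelism, otherwise `Adaptive`.
    /// - `Adaptive` jobs stay adaptive, except when the actual parallelism equals the configured
    ///   `DefaultParallelism::Default(n)`, in which case they are pinned to `Fixed(n)`.
    /// - `Fixed` jobs keep their assigned parallelism.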
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

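    /// Load the [`StreamActor`]s of all active streaming jobs, keyed by actor id.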
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_derive_target_parallelism() {
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}