use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
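    /// Clean up subscriptions and creating jobs left dirty by a previous failure, then
    /// unregister the sources that were associated with the cleaned-up jobs.
    /// `None` means cleaning across all databases.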
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;

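        // Unregister the sources that belonged to the dirty creating jobs cleaned above.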
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

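    /// Ask hummock to purge the state tables that are absent from `all_state_table_ids`,
    /// i.e. tables that no longer exist in the catalog.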
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

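    /// List background creating jobs together with their definitions and fragments,
    /// used to recover creation progress after a restart.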
    async fn list_background_job_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false)
            .await?;

        try_join_all(
            job_info
                .into_iter()
                .map(|(id, definition, _init_at)| async move {
                    let table_id = TableId::new(id as _);
                    let stream_job_fragments =
                        mgr.catalog_controller.get_job_fragments_by_id(id).await?;
                    assert_eq!(stream_job_fragments.stream_job_id(), table_id);
                    Ok((definition, stream_job_fragments))
                }),
        )
        .await
    }

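    /// Resolve the in-flight fragment infos of all databases (or a single one), keyed
    /// by database id and then by streaming job id.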
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

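    /// From the current hummock `version`, resolve the committed epoch of every state
    /// table, plus the change-log epoch ranges that snapshot-backfill background jobs
    /// still need to replay from their upstream tables.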
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
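        // For each snapshot-backfill background job, pin its upstream tables at
        // max(snapshot epoch, job committed epoch), and keep the minimum pinned epoch
        // per upstream table across all downstream jobs.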
        for (_, job) in background_jobs.values() {
            let Ok(job_committed_epoch) = get_table_committed_epoch(job.stream_job_id) else {
                warn!(
                    "background job {} has no committed epoch, skip resolving epochs",
                    job.stream_job_id
                );
                continue;
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
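        // Validate each pinned upstream table and collect the change-log epochs the
        // downstream still needs: the upstream must not be behind its downstream, and
        // if it is ahead, the change log must start exactly at the downstream's
        // committed epoch.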
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                            // The change log starts exactly at the downstream's
                            // committed epoch; nothing to do.
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch
                            )
                            .into());
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch
                        )
                        .into());
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

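    /// For every recovered table that contains an `UpstreamSinkUnion` fragment, refill
    /// the union node with the current upstream sink infos from the catalog. Tables
    /// created by older versions may lack this fragment and are skipped.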
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (tid, job) in table_map {
                    if acc.insert(tid.table_id, job).is_some() {
                        return Err(anyhow!("duplicate table id found: {:?}", tid).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let job_ids = jobs.keys().cloned().collect_vec();
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(job_ids.into_iter().map(|id| id as _).collect())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragments = jobs.get_mut(&table.id).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragments.fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "table {} was created by an older version and has not been migrated yet, so sinks cannot be created or dropped on it",
                    table.id
                );
                continue;
            };
            let target_fragment = fragments
                .fragment_infos
                .get_mut(&target_fragment_id)
                .unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

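    /// Rebuild the full runtime info snapshot for global recovery: clean dirty jobs,
    /// recover background job progress, migrate or re-scale actors, then resolve
    /// epochs, subscriptions, actors, source splits and CDC split assignments.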
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!("recovering background job progress");
        let background_jobs = {
            let jobs = self
                .list_background_job_progress()
                .await
                .context("recover background job progress should not fail")?;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // If no actor progress remains to be tracked, the job has
                    // effectively finished; mark it as such.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(stream_job_fragments.stream_job_id().table_id as _)
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        tracing::info!("recovered background job progress");

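        // Pre-apply any scheduled drop/cancel commands so that jobs being dropped or
        // cancelled are not recovered.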
        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

        let mut active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
        info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(&job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!("unreschedulable background jobs: {:?}", unreschedulable_jobs);
        }

        let mut info = if !self.env.opts.disable_automatic_parallelism_control
            && unreschedulable_jobs.is_empty()
        {
            info!("trigger offline scaling");
            self.scale_actors(&active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "scale actors failed");
                })?;

            self.resolve_graph_info(None).await.inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?
        } else {
            info!("trigger actor migration");
            self.migrate_actors(&mut active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "migrate actors failed");
                })?
        };

        if self.scheduled_barriers.pre_apply_drop_cancel(None) {
            info = self.resolve_graph_info(None).await.inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, job)| job.existing_table_ids())
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?
            .into_iter()
            .map(|(database_id, mv_depended_subscriptions)| {
                (
                    database_id,
                    InflightSubscriptionInfo {
                        mv_depended_subscriptions,
                    },
                )
            })
            .collect();

        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.fragment_infos())
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

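        // Re-list background jobs: jobs with no remaining progress to track were
        // finished above, so refresh the set before taking the snapshot.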
        let background_jobs = {
            let jobs = self
                .list_background_job_progress()
                .await
                .context("recover background job progress should not fail")?;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                background_jobs
                    .try_insert(
                        stream_job_fragments.stream_job_id(),
                        (definition, stream_job_fragments),
                    )
                    .expect("non-duplicate");
            }
            background_jobs
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let source_splits = self.source_manager.list_assignments().await;

        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_ids = cdc_table_backfill_actors.keys().cloned().collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            let generation = self
                .env
                .cdc_table_backfill_tracker
                .next_generation(cdc_table_ids.into_iter());
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                generation,
            )
        };

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_infos,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_split_assignment,
        })
    }

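    /// Rebuild the runtime info snapshot of a single database for per-database recovery.
    /// Returns `Ok(None)` when the database has no in-flight streaming jobs.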
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(?database_id, "recovering background job progress of database");
        let background_jobs = self
            .list_background_job_progress()
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let mut info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(stream_job_fragments.stream_job_id().table_id as _)
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        let source_splits = self.source_manager.list_assignments().await;

        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_ids = cdc_table_backfill_actors.keys().cloned().collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
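    /// How long migration waits for new workers to join before it falls back to
    /// overcommitting the slots of the workers that are still alive.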
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

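    /// Migrate actors from expired worker slots to active ones, waiting for new workers
    /// to join when there are not enough free slots, then re-resolve the graph info.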
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrating actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("worker slots to migrate: {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
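        // Greedily match expired slots to free slots on active workers. If the wait
        // exceeds `RECOVERY_FORCE_MIGRATION_TIMEOUT`, overcommit the remaining workers
        // by doubling their slot count until every expired slot has a target.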
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "worker set changed or wait timed out, retrying migration");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeeded.");

        self.resolve_graph_info(None).await
    }

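    /// Offline scaling during recovery: derive a target parallelism for every streaming
    /// job from the current cluster, then generate and apply reschedule plans in batches.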
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs = ?reschedule_ids, "generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: local_reschedule_targets,
                    },
                    false,
                )
                .await?;

            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs = ?reschedule_ids, "no plan generated for jobs in offline scaling");
                continue;
            }

            let mut compared_table_parallelisms = table_parallelisms.clone();

            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs = ?reschedule_ids, "post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs = ?reschedule_ids, "post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeeded.");
        Ok(())
    }

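    /// Derive the target parallelism of a job for offline scaling.
    ///
    /// - `Custom`: pin to `Fixed(actual)` when the actual fragment parallelism is known
    ///   and fits within the available parallelism, otherwise fall back to `Adaptive`.
    /// - `Adaptive`: stay `Adaptive`, unless the actual parallelism equals the
    ///   configured default parallelism, in which case pin it to `Fixed` so such jobs
    ///   keep their parallelism.
    /// - `Fixed(_)`: keep as is.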
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

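    /// Load the metadata of all actors belonging to active streaming jobs.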
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_derive_target_parallelism() {
        // Custom parallelism below the available parallelism is pinned to fixed.
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // Custom parallelism that saturates the available parallelism becomes adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // Custom parallelism exceeding the available parallelism becomes adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // Custom parallelism with unknown actual parallelism becomes adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // Adaptive stays adaptive when the actual parallelism is unknown.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // Adaptive matching the configured default parallelism is pinned to fixed.
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // Adaptive not matching the configured default parallelism stays adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}