use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
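    /// Clean up dirty subscriptions and dirty creating streaming jobs in the catalog, then notify
    /// the source manager to drop the sources associated with the cleaned-up creating jobs.
    /// A `None` database id means cleaning across all databases.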
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

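    /// Purge hummock metadata of state tables that are no longer in use, retaining only the state
    /// tables listed in `all_state_table_ids`.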
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

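    /// List materialized views that are still being created in the background, returning each
    /// job's definition together with its recovered stream job fragments.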
    async fn list_background_mv_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let mviews = mgr
            .catalog_controller
            .list_background_creating_mviews(false)
            .await?;

        try_join_all(mviews.into_iter().map(|mview| async move {
            let table_id = TableId::new(mview.table_id as _);
            let stream_job_fragments = mgr
                .catalog_controller
                .get_job_fragments_by_id(mview.table_id)
                .await?;
            assert_eq!(stream_job_fragments.stream_job_id(), table_id);
            Ok((mview.definition, stream_job_fragments))
        }))
        .await
    }

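    /// Resolve the in-flight streaming job info from the catalog, grouped by database and then by
    /// job id. Passing a `database_id` restricts the result to that single database.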
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

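    /// Resolve, from the given hummock `version`, the committed epoch of every state table, plus
    /// the change-log epochs that snapshot-backfill background jobs still need to replay from
    /// their upstream tables (keyed by upstream table id).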
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let job_committed_epoch = get_table_committed_epoch(job.stream_job_id())?;
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

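    /// Rebuild the global runtime info snapshot used for recovery: clean dirty jobs, recover
    /// background mview progress, migrate or rescale actors if necessary, and collect the
    /// committed epochs, subscriptions, actors, fragment relations and source splits.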
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    tracing::info!("recovering mview progress");
                    let background_jobs = {
                        let jobs = self
                            .list_background_mv_progress()
                            .await
                            .context("recover mview progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            if stream_job_fragments
                                .tracking_progress_actor_ids()
                                .is_empty()
                            {
                                self.metadata_manager
                                    .catalog_controller
                                    .finish_streaming_job(
                                        stream_job_fragments.stream_job_id().table_id as _,
                                        None,
                                    )
                                    .await?;
                            } else {
                                background_jobs
                                    .try_insert(
                                        stream_job_fragments.stream_job_id(),
                                        (definition, stream_job_fragments),
                                    )
                                    .expect("non-duplicate");
                            }
                        }
                        background_jobs
                    };

                    tracing::info!("recovered mview progress");

                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

                    let mut active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
                    info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(&job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    let mut info = if !self.env.opts.disable_automatic_parallelism_control
                        && unreschedulable_jobs.is_empty()
                    {
                        info!("trigger offline scaling");
                        self.scale_actors(&active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "scale actors failed");
                            })?;

                        self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    } else {
                        info!("trigger actor migration");
                        self.migrate_actors(&mut active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "migrate actors failed");
                            })?
                    };

                    if self.scheduled_barriers.pre_apply_drop_cancel(None) {
                        info = self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    }

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, job)| job.existing_table_ids())
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(&background_jobs, version)
                        })
                        .await?;

                    let subscription_infos = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?
                        .into_iter()
                        .map(|(database_id, mv_depended_subscriptions)| {
                            (
                                database_id,
                                InflightSubscriptionInfo {
                                    mv_depended_subscriptions,
                                },
                            )
                        })
                        .collect();

                    let stream_actors = self.load_all_actors().await.inspect_err(|err| {
                        warn!(error = %err.as_report(), "update actors failed");
                    })?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.fragment_infos())
                                .map(|fragment| fragment.fragment_id as _)
                                .collect(),
                        )
                        .await?;

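                    // Re-list background job progress after scaling/migration so that the
                    // snapshot carries the latest stream job fragments.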
                    let background_jobs = {
                        let jobs = self
                            .list_background_mv_progress()
                            .await
                            .context("recover mview progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            background_jobs
                                .try_insert(
                                    stream_job_fragments.stream_job_id(),
                                    (definition, stream_job_fragments),
                                )
                                .expect("non-duplicate");
                        }
                        background_jobs
                    };

                    let source_splits = self.source_manager.list_assignments().await;

                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        subscription_infos,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                    })
                }
            }
        }
    }

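    /// Rebuild the runtime info snapshot of a single database for per-database recovery.
    /// Returns `None` if the database has no in-flight streaming jobs.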
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(?database_id, "recovering mview progress of database");
        let background_jobs = self
            .list_background_mv_progress()
            .await
            .context("recover mview progress of database should not fail")?;
        tracing::info!(?database_id, "recovered mview progress");

        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;
        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(
                            stream_job_fragments.stream_job_id().table_id as _,
                            None,
                        )
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        let source_splits = self.source_manager.list_assignments().await;

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
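    /// How long `migrate_actors` waits for replacement workers to join before it falls back to
    /// over-subscribing the worker slots of the remaining active workers.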
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

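    /// Migrate actors off expired worker slots (slots belonging to workers that are no longer
    /// active) onto free slots of active workers, waiting for new workers to join when capacity
    /// is insufficient, and return the refreshed graph info afterwards.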
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrate actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "worker nodes changed or wait timed out, retrying migration");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeeded.");

        self.resolve_graph_info(None).await
    }

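    /// Offline scaling during recovery: derive a target parallelism for every streaming job based
    /// on the currently available workers, then generate and apply reschedule plans in batches.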
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs = ?reschedule_ids, "generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: local_reschedule_targets,
                })
                .await?;

            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs = ?reschedule_ids, "no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs = ?reschedule_ids, "post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs = ?reschedule_ids, "post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeeded.");
        Ok(())
    }

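    /// Derive the target parallelism of a job for offline scaling:
    /// - `Custom`: becomes `Fixed(actual)` if the actual fragment parallelism is strictly less
    ///   than the available parallelism, otherwise `Adaptive`.
    /// - `Adaptive`: stays `Adaptive`, except that a job whose actual parallelism equals the
    ///   configured default parallelism is pinned to `Fixed` of that value.
    /// - anything else keeps the assigned parallelism unchanged.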
882 fn derive_target_parallelism(
890 available_parallelism: usize,
891 assigned_parallelism: TableParallelism,
892 actual_fragment_parallelism: Option<usize>,
893 default_parallelism: DefaultParallelism,
894 ) -> TableParallelism {
895 match assigned_parallelism {
896 TableParallelism::Custom => {
897 if let Some(fragment_parallelism) = actual_fragment_parallelism {
898 if fragment_parallelism >= available_parallelism {
899 TableParallelism::Adaptive
900 } else {
901 TableParallelism::Fixed(fragment_parallelism)
902 }
903 } else {
904 TableParallelism::Adaptive
905 }
906 }
907 TableParallelism::Adaptive => {
908 match (default_parallelism, actual_fragment_parallelism) {
909 (DefaultParallelism::Default(n), Some(fragment_parallelism))
910 if fragment_parallelism == n.get() =>
911 {
912 TableParallelism::Fixed(fragment_parallelism)
913 }
914 _ => TableParallelism::Adaptive,
915 }
916 }
917 _ => assigned_parallelism,
918 }
919 }
920
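    /// Load the stream actors of all active streaming jobs, keyed by actor id.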
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_derive_target_parallelism() {
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}
1011}