use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
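    /// Clean up catalog state left over by dirty (partially created) streaming jobs: drop
    /// their dirty subscriptions and creating-job records, then unregister the source
    /// connectors associated with the cleaned-up jobs.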
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

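    /// Purge state tables from hummock, given the full set of state table ids that still
    /// exist after recovery.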
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

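    /// List background materialized views that are still being created, together with each
    /// job's definition and its stream job fragments.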
    async fn list_background_mv_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let mviews = mgr
            .catalog_controller
            .list_background_creating_mviews(false)
            .await?;

        try_join_all(mviews.into_iter().map(|mview| async move {
            let table_id = TableId::new(mview.table_id as _);
            let stream_job_fragments = mgr
                .catalog_controller
                .get_job_fragments_by_id(mview.table_id)
                .await?;
            assert_eq!(stream_job_fragments.stream_job_id(), table_id);
            Ok((mview.definition, stream_job_fragments))
        }))
        .await
    }

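    /// Resolve the in-flight streaming job info (fragment and actor infos) from the catalog,
    /// grouped by database and then by job id. When `database_id` is given, only that
    /// database is loaded.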
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

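    /// Resolve, from the given hummock `version`, (1) the committed epoch of every state table
    /// and (2) for snapshot-backfill background jobs, the change-log epochs of each upstream
    /// table between the minimal downstream committed epoch and the upstream committed epoch.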
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(HashMap<TableId, u64>, HashMap<TableId, Vec<Vec<u64>>>)> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let job_committed_epoch = get_table_committed_epoch(job.stream_job_id)?;
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| epoch_log.epochs.clone())
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some(first_epochs) = &first_epochs
                            && first_epochs.last() == Some(&downstream_committed_epoch)
                        {
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

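    /// Reload the global runtime info snapshot used for full recovery: clean dirty streaming
    /// jobs, recover background mview progress, migrate or rescale actors as needed, and
    /// collect committed epochs, subscriptions, actors, fragment relations and source splits.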
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    tracing::info!("recovering mview progress");
                    let background_jobs = {
                        let jobs = self
                            .list_background_mv_progress()
                            .await
                            .context("recover mview progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            if stream_job_fragments
                                .tracking_progress_actor_ids()
                                .is_empty()
                            {
                                self.metadata_manager
                                    .catalog_controller
                                    .finish_streaming_job(
                                        stream_job_fragments.stream_job_id().table_id as _,
                                        None,
                                    )
                                    .await?;
                            } else {
                                background_jobs
                                    .try_insert(
                                        stream_job_fragments.stream_job_id(),
                                        (definition, stream_job_fragments),
                                    )
                                    .expect("non-duplicate");
                            }
                        }
                        background_jobs
                    };

                    tracing::info!("recovered mview progress");

                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

                    let mut active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
                    info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(&job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    let mut info = if !self.env.opts.disable_automatic_parallelism_control
                        && unreschedulable_jobs.is_empty()
                    {
                        info!("trigger offline scaling");
                        self.scale_actors(&active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "scale actors failed");
                            })?;

                        self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    } else {
                        info!("trigger actor migration");
                        self.migrate_actors(&mut active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "migrate actors failed");
                            })?
                    };

                    if self.scheduled_barriers.pre_apply_drop_cancel(None) {
                        info = self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    }

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, job)| job.existing_table_ids())
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(&background_jobs, version)
                        })
                        .await?;

                    let subscription_infos = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?
                        .into_iter()
                        .map(|(database_id, mv_depended_subscriptions)| {
                            (
                                database_id,
                                InflightSubscriptionInfo {
                                    mv_depended_subscriptions,
                                },
                            )
                        })
                        .collect();

                    let stream_actors = self.load_all_actors().await.inspect_err(|err| {
                        warn!(error = %err.as_report(), "update actors failed");
                    })?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.fragment_infos())
                                .map(|fragment| fragment.fragment_id as _)
                                .collect(),
                        )
                        .await?;

                    let background_jobs = {
                        let jobs = self
                            .list_background_mv_progress()
                            .await
                            .context("recover mview progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            background_jobs
                                .try_insert(
                                    stream_job_fragments.stream_job_id(),
                                    (definition, stream_job_fragments),
                                )
                                .expect("non-duplicate");
                        }
                        background_jobs
                    };

                    let source_splits = self.source_manager.list_assignments().await;

                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        subscription_infos,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                    })
                }
            }
        }
    }

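    /// Reload the runtime info snapshot of a single database for per-database recovery.
    /// Returns `Ok(None)` if the database has no in-flight streaming jobs.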
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(?database_id, "recovering mview progress of database");
        let background_jobs = self
            .list_background_mv_progress()
            .await
            .context("recover mview progress of database should not fail")?;
        tracing::info!(?database_id, "recovered mview progress");

        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;
        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(
                            stream_job_fragments.stream_job_id().table_id as _,
                            None,
                        )
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        let source_splits = self.source_manager.list_assignments().await;

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
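    /// How long actor migration waits for replacement workers to join before it forcibly
    /// packs the expired worker slots onto the workers that are still available.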
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

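    /// Migrate actors off expired worker slots (slots on workers that are no longer active)
    /// onto available worker slots, waiting for new workers to join if necessary, then
    /// re-resolve the streaming job graph info.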
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrate actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "get worker changed or timed out. Retry migrate");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeed.");

        self.resolve_graph_info(None).await
    }

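    /// Offline scaling during recovery: re-derive the target parallelism of every streaming
    /// job against the currently available workers and apply the resulting reschedule plans
    /// in batches.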
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs=?reschedule_ids, "generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: local_reschedule_targets,
                })
                .await?;

            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs=?reschedule_ids, "no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs=?reschedule_ids, "post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs=?reschedule_ids, "post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeed.");
        Ok(())
    }

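    /// Derive the target parallelism of a job for offline scaling.
    ///
    /// - `Custom`: becomes `Fixed(actual)` when the actual fragment parallelism is below the
    ///   available parallelism, otherwise `Adaptive`.
    /// - `Adaptive`: stays `Adaptive`, except when the actual fragment parallelism equals the
    ///   configured default parallelism, in which case it is pinned to `Fixed`.
    /// - `Fixed(_)`: kept as is.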
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

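    /// Load the `StreamActor` of every actor on active streaming workers.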
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    #[test]
    fn test_derive_target_parallelism() {
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}