use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
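    /// Clean up dirty (partially created or cancelled) streaming jobs and
    /// subscriptions left over before recovery, optionally scoped to a single
    /// database.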
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;

        // Unregister the sources that were associated with the cleaned-up jobs.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

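    /// Purge stale state tables from hummock, passing the full set of state
    /// tables that are still in use.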
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

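    /// List the materialized views that are still being created in the background,
    /// returning each job's definition together with its fragments.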
    async fn list_background_mv_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let mviews = mgr
            .catalog_controller
            .list_background_creating_mviews(false)
            .await?;

        try_join_all(mviews.into_iter().map(|mview| async move {
            let table_id = TableId::new(mview.table_id as _);
            let stream_job_fragments = mgr
                .catalog_controller
                .get_job_fragments_by_id(mview.table_id)
                .await?;
            assert_eq!(stream_job_fragments.stream_job_id(), table_id);
            Ok((mview.definition, stream_job_fragments))
        }))
        .await
    }

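    /// Resolve the in-flight fragment info of all streaming jobs, keyed by
    /// database id and job id. When `database_id` is given, only that database is
    /// loaded.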
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

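    /// Resolve, from the given hummock `version`, the committed epoch of every
    /// state table, and for each upstream table of a lagging snapshot-backfill
    /// job, the change-log epochs the downstream still needs to replay.
    ///
    /// Returns `(committed epoch per table, (non-checkpoint epochs, checkpoint
    /// epoch) list per upstream table)`.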
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let job_committed_epoch = get_table_committed_epoch(job.stream_job_id())?;
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    // The upstream table must retain data from the snapshot epoch or
                    // the job's own committed epoch, whichever is later.
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} is later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        // The first log entry must start exactly at the downstream
                        // committed epoch, otherwise part of the log is missing.
                        if !matches!(
                            epochs.first(),
                            Some((_, first_checkpoint_epoch))
                                if *first_checkpoint_epoch == downstream_committed_epoch
                        ) {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} does not match downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

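    /// Reload the full runtime info snapshot for global recovery: clean dirty
    /// jobs, recover background job progress, migrate or rescale actors onto the
    /// active workers, and rebuild epochs, actors, subscriptions and source splits.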
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!("recovering mview progress");
        let background_jobs = {
            let jobs = self
                .list_background_mv_progress()
                .await
                .context("recover mview progress should not fail")?;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // The job has no actors to track, so it can be regarded as
                    // finished right away.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(
                            stream_job_fragments.stream_job_id().table_id as _,
                            None,
                        )
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        tracing::info!("recovered mview progress");

        // Apply any scheduled drop or cancel commands before resolving the graph.
        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

        let mut active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
        info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        // A background job is unreschedulable if any of its backfill fragments uses
        // a scan type that does not support rescheduling.
        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(&job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        // Offline scaling can only be used when automatic parallelism control is
        // enabled and every background job is reschedulable; otherwise fall back to
        // migrating the actors of expired workers as-is.
        let mut info = if !self.env.opts.disable_automatic_parallelism_control
            && unreschedulable_jobs.is_empty()
        {
            info!("trigger offline scaling");
            self.scale_actors(&active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "scale actors failed");
                })?;

            self.resolve_graph_info(None).await.inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?
        } else {
            info!("trigger actor migration");
            self.migrate_actors(&mut active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "migrate actors failed");
                })?
        };

        // Jobs may have been dropped or cancelled while the steps above were
        // running; if so, resolve the graph info again.
        if self.scheduled_barriers.pre_apply_drop_cancel(None) {
            info = self.resolve_graph_info(None).await.inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?
        }

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, job)| job.existing_table_ids())
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?
            .into_iter()
            .map(|(database_id, mv_depended_subscriptions)| {
                (
                    database_id,
                    InflightSubscriptionInfo {
                        mv_depended_subscriptions,
                    },
                )
            })
            .collect();

        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "load actors failed");
        })?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.fragment_infos())
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        // Reload background job progress: scaling or migration above may have
        // changed the jobs' fragments since the first load.
        let background_jobs = {
            let jobs = self
                .list_background_mv_progress()
                .await
                .context("recover mview progress should not fail")?;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                background_jobs
                    .try_insert(
                        stream_job_fragments.stream_job_id(),
                        (definition, stream_job_fragments),
                    )
                    .expect("non-duplicate");
            }
            background_jobs
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let source_splits = self.source_manager.list_assignments().await;
        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_infos,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
        })
    }

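    /// Rebuild the runtime info snapshot of a single database for per-database
    /// recovery. Returns `Ok(None)` if the database has no streaming jobs.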
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(?database_id, "recovering mview progress of database");
        let background_jobs = self
            .list_background_mv_progress()
            .await
            .context("recover mview progress of database should not fail")?;
        tracing::info!(?database_id, "recovered mview progress");

        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;
        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                // `list_background_mv_progress` is not scoped by database, so skip
                // jobs that do not belong to this one.
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    // The job has no actors to track, so it can be regarded as
                    // finished right away.
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(
                            stream_job_fragments.stream_job_id().table_id as _,
                            None,
                        )
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flat_map(|job| job.fragment_infos())
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "load actors failed");
        })?;

        let source_splits = self.source_manager.list_assignments().await;
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
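    /// How long to wait for new workers to join before actor migration is forced
    /// onto the worker slots that are already available.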
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

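    /// Migrate actors off expired (no longer active) worker slots onto free slots
    /// of the active workers, waiting for new workers to join when capacity is
    /// insufficient, and then re-resolve the graph info.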
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrating actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("worker slots to migrate: {:#?}", to_migrate_worker_slots);
616
617 let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
618 .intersection(&active_worker_slots)
619 .cloned()
620 .collect();
621
622 let start = Instant::now();
623 let mut plan = HashMap::new();
624 'discovery: while !to_migrate_worker_slots.is_empty() {
625 let mut new_worker_slots = active_nodes
626 .current()
627 .values()
628 .flat_map(|worker| {
629 (0..worker.compute_node_parallelism())
630 .map(move |i| WorkerSlotId::new(worker.id, i as _))
631 })
632 .collect_vec();
633
634 new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
635 let to_migration_size = to_migrate_worker_slots.len();
636 let mut available_size = new_worker_slots.len();
637
638 if available_size < to_migration_size
639 && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
640 {
641 let mut factor = 2;
642
643 while available_size < to_migration_size {
644 let mut extended_worker_slots = active_nodes
645 .current()
646 .values()
647 .flat_map(|worker| {
648 (0..worker.compute_node_parallelism() * factor)
649 .map(move |i| WorkerSlotId::new(worker.id, i as _))
650 })
651 .collect_vec();
652
653 extended_worker_slots
654 .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
655
656 extended_worker_slots.sort_by(|a, b| {
657 a.slot_idx()
658 .cmp(&b.slot_idx())
659 .then(a.worker_id().cmp(&b.worker_id()))
660 });
661
662 available_size = extended_worker_slots.len();
663 new_worker_slots = extended_worker_slots;
664
665 factor *= 2;
666 }
667
668 tracing::info!(
669 "migration timed out, extending worker slots to {:?} by factor {}",
670 new_worker_slots,
671 factor,
672 );
673 }
674
            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            // Wait for the worker set to change (or the timeout) before retrying.
            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "worker set changed or wait timed out, retrying migration");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeeded.");

        self.resolve_graph_info(None).await
    }

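    /// Offline scaling during recovery: re-derive the target parallelism of every
    /// streaming job against the currently available workers and apply the
    /// resulting reschedule plans in batches.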
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs=?reschedule_ids, "generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy {
                    targets: local_reschedule_targets,
                })
                .await?;

            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs=?reschedule_ids, "no plan generated for jobs in offline scaling");
                continue;
            }

            let mut compared_table_parallelisms = table_parallelisms.clone();

            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            // Applying the plan must not change the parallelisms we just derived.
            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs=?reschedule_ids, "post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs=?reschedule_ids, "post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeeded.");
        Ok(())
    }

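    /// Decide the parallelism a job should be reset to for offline scaling:
    ///
    /// - `Custom`: pinned to `Fixed(actual parallelism)` while it still fits
    ///   within `available_parallelism`, otherwise reset to `Adaptive`.
    /// - `Adaptive`: kept, except converted back to `Fixed(n)` when the actual
    ///   parallelism equals a configured `DefaultParallelism::Default(n)`.
    /// - `Fixed`: kept as-is.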
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

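    /// Load all active actors, keyed by actor id.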
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_derive_target_parallelism() {
        // Custom parallelism below the available 10 slots is pinned to Fixed.
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // Custom parallelism that meets or exceeds the available parallelism
        // falls back to Adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // Custom parallelism without actual fragment info becomes Adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // Adaptive stays Adaptive under DefaultParallelism::Full.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // Adaptive is converted back to Fixed(5) when the actual parallelism
        // equals DefaultParallelism::Default(5)...
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // ...but stays Adaptive when it differs.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}
1018}