use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{Context, anyhow};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::WorkerSlotId;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::StreamingParallelism;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{debug, info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{
    JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
    RescheduleOptions, SourceChange, StreamFragmentGraph,
};
use crate::{MetaResult, model};

impl GlobalBarrierWorkerContextImpl {
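    /// Clean up subscriptions and creating streaming jobs left in a dirty state by an earlier
    /// failure, then notify the source manager to drop the sources associated with the cleaned
    /// jobs. Passing `None` cleans all databases; `Some(database_id)` restricts the cleanup to
    /// that database.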
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

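    /// Hand the full set of in-use state table ids to the Hummock manager's `purge`, so that
    /// state tables no longer referenced by any streaming job can be cleaned up.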
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

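    /// List background (still creating) streaming jobs along with their definitions and job
    /// fragments, which is used to recover creation progress during recovery.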
    async fn list_background_job_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false)
            .await?;

        try_join_all(
            job_info
                .into_iter()
                .map(|(id, definition, _init_at)| async move {
                    let table_id = TableId::new(id as _);
                    let stream_job_fragments =
                        mgr.catalog_controller.get_job_fragments_by_id(id).await?;
                    assert_eq!(stream_job_fragments.stream_job_id(), table_id);
                    Ok((definition, stream_job_fragments))
                }),
        )
        .await
    }

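    /// Resolve the in-flight fragment info of all streaming jobs, grouped first by database and
    /// then by job id. When `database_id` is `Some`, only that database is loaded.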
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors(database_id)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: fragment_infos
                                        .into_iter()
                                        .map(|(fragment_id, info)| (fragment_id as _, info))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

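    /// Resolve, from the given Hummock `version`, the committed epoch of every state table, plus
    /// the change-log epochs that lagging snapshot-backfill jobs in `background_jobs` still need
    /// to replay from their upstream tables. Fails if a downstream job is ahead of its upstream
    /// table or if a required table change log is missing.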
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: &HashMap<TableId, (String, StreamJobFragments)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (_, job) in background_jobs.values() {
            let Ok(job_committed_epoch) = get_table_committed_epoch(job.stream_job_id) else {
                warn!(
                    "background job {} has no committed epoch, skip resolving epochs",
                    job.stream_job_id
                );
                continue;
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    job.fragments()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job has not filled snapshot epoch: {:?}",
                            job
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    return Err(anyhow!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    )
                    .into());
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            return Err(anyhow!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs, upstream_table_id, downstream_committed_epoch).into()
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        return Err(anyhow!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id, upstream_committed_epoch, downstream_committed_epoch).into()
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

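    /// Rebuild the full runtime info snapshot for global recovery: clean dirty jobs, recover
    /// background job progress, migrate or rescale actors as needed, and then collect committed
    /// epochs, subscriptions, actors, fragment relations, source splits and CDC snapshot split
    /// assignments across all databases.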
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    tracing::info!("recovering background job progress");
                    let background_jobs = {
                        let jobs = self
                            .list_background_job_progress()
                            .await
                            .context("recover background job progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            if stream_job_fragments
                                .tracking_progress_actor_ids()
                                .is_empty()
                            {
                                self.metadata_manager
                                    .catalog_controller
                                    .finish_streaming_job(
                                        stream_job_fragments.stream_job_id().table_id as _,
                                    )
                                    .await?;
                            } else {
                                background_jobs
                                    .try_insert(
                                        stream_job_fragments.stream_job_id(),
                                        (definition, stream_job_fragments),
                                    )
                                    .expect("non-duplicate");
                            }
                        }
                        background_jobs
                    };

                    tracing::info!("recovered background job progress");

                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

                    let mut active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
                    info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(&job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    let mut info = if !self.env.opts.disable_automatic_parallelism_control
                        && unreschedulable_jobs.is_empty()
                    {
                        info!("trigger offline scaling");
                        self.scale_actors(&active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "scale actors failed");
                            })?;

                        self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    } else {
                        info!("trigger actor migration");
                        self.migrate_actors(&mut active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "migrate actors failed");
                            })?
                    };

                    if self.scheduled_barriers.pre_apply_drop_cancel(None) {
                        info = self.resolve_graph_info(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "resolve actor info failed");
                        })?
                    }

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, job)| job.existing_table_ids())
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(&background_jobs, version)
                        })
                        .await?;

                    let subscription_infos = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?
                        .into_iter()
                        .map(|(database_id, mv_depended_subscriptions)| {
                            (
                                database_id,
                                InflightSubscriptionInfo {
                                    mv_depended_subscriptions,
                                },
                            )
                        })
                        .collect();

                    let stream_actors = self.load_all_actors().await.inspect_err(|err| {
                        warn!(error = %err.as_report(), "update actors failed");
                    })?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.fragment_infos())
                                .map(|fragment| fragment.fragment_id as _)
                                .collect(),
                        )
                        .await?;

                    let background_jobs = {
                        let jobs = self
                            .list_background_job_progress()
                            .await
                            .context("recover background job progress should not fail")?;
                        let mut background_jobs = HashMap::new();
                        for (definition, stream_job_fragments) in jobs {
                            background_jobs
                                .try_insert(
                                    stream_job_fragments.stream_job_id(),
                                    (definition, stream_job_fragments),
                                )
                                .expect("non-duplicate");
                        }
                        background_jobs
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    let source_splits = self.source_manager.list_assignments().await;
                    let cdc_table_backfill_actors = self
                        .metadata_manager
                        .catalog_controller
                        .cdc_table_backfill_actor_ids()
                        .await?;
                    let cdc_table_snapshot_split_assignment =
                        assign_cdc_table_snapshot_splits_pairs(
                            cdc_table_backfill_actors,
                            self.env.meta_store_ref(),
                        )
                        .await?;
                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        subscription_infos,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_split_assignment,
                    })
                }
            }
        }
    }

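    /// Rebuild the runtime info snapshot of a single database for per-database recovery.
    /// Returns `Ok(None)` when the database has no streaming job to recover.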
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );
        let background_jobs = self
            .list_background_job_progress()
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let _ = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));

        let info = self
            .resolve_graph_info(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;
        assert!(info.len() <= 1);
        let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
            assert_eq!(loaded_database_id, database_id);
            info
        }) else {
            return Ok(None);
        };

        let background_jobs = {
            let jobs = background_jobs;
            let mut background_jobs = HashMap::new();
            for (definition, stream_job_fragments) in jobs {
                if !info.contains_key(&stream_job_fragments.stream_job_id()) {
                    continue;
                }
                if stream_job_fragments
                    .tracking_progress_actor_ids()
                    .is_empty()
                {
                    self.metadata_manager
                        .catalog_controller
                        .finish_streaming_job(stream_job_fragments.stream_job_id().table_id as _)
                        .await?;
                } else {
                    background_jobs
                        .try_insert(
                            stream_job_fragments.stream_job_id(),
                            (definition, stream_job_fragments),
                        )
                        .expect("non-duplicate");
                }
            }
            background_jobs
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(&background_jobs, version)
            })
            .await?;

        let subscription_infos = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;
        assert!(subscription_infos.len() <= 1);
        let mv_depended_subscriptions = subscription_infos
            .into_iter()
            .next()
            .map(|(loaded_database_id, subscriptions)| {
                assert_eq!(loaded_database_id, database_id);
                subscriptions
            })
            .unwrap_or_default();
        let subscription_info = InflightSubscriptionInfo {
            mv_depended_subscriptions,
        };

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|fragment| fragment.fragment_id as _)
                    .collect(),
            )
            .await?;

        let stream_actors = self.load_all_actors().await.inspect_err(|err| {
            warn!(error = %err.as_report(), "update actors failed");
        })?;

        let source_splits = self.source_manager.list_assignments().await;

        let cdc_table_backfill_actors = self
            .metadata_manager
            .catalog_controller
            .cdc_table_backfill_actor_ids()
            .await?;
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
        )
        .await?;
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }
}

impl GlobalBarrierWorkerContextImpl {
    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);

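    /// Migrate actors away from worker slots that are no longer active. A plan is built from
    /// expired worker slots to free slots on active workers; if not enough slots become available
    /// within `RECOVERY_FORCE_MIGRATION_TIMEOUT`, the slots of existing workers are virtually
    /// expanded so that migration can still proceed. The plan is then applied and the graph info
    /// reloaded.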
    async fn migrate_actors(
        &self,
        active_nodes: &mut ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>> {
        let mgr = &self.metadata_manager;

        let all_inuse_worker_slots: HashSet<_> = mgr
            .catalog_controller
            .all_inuse_worker_slots()
            .await?
            .into_iter()
            .collect();

        let active_worker_slots: HashSet<_> = active_nodes
            .current()
            .values()
            .flat_map(|node| {
                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
            })
            .collect();

        let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
            .difference(&active_worker_slots)
            .cloned()
            .collect();

        if expired_worker_slots.is_empty() {
            info!("no expired worker slots, skipping.");
            return self.resolve_graph_info(None).await;
        }

        info!("start migrating actors.");
        let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
        info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);

        let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
            .intersection(&active_worker_slots)
            .cloned()
            .collect();

        let start = Instant::now();
        let mut plan = HashMap::new();
        'discovery: while !to_migrate_worker_slots.is_empty() {
            let mut new_worker_slots = active_nodes
                .current()
                .values()
                .flat_map(|worker| {
                    (0..worker.compute_node_parallelism())
                        .map(move |i| WorkerSlotId::new(worker.id, i as _))
                })
                .collect_vec();

            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
            let to_migration_size = to_migrate_worker_slots.len();
            let mut available_size = new_worker_slots.len();

            if available_size < to_migration_size
                && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
            {
                let mut factor = 2;

                while available_size < to_migration_size {
                    let mut extended_worker_slots = active_nodes
                        .current()
                        .values()
                        .flat_map(|worker| {
                            (0..worker.compute_node_parallelism() * factor)
                                .map(move |i| WorkerSlotId::new(worker.id, i as _))
                        })
                        .collect_vec();

                    extended_worker_slots
                        .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));

                    extended_worker_slots.sort_by(|a, b| {
                        a.slot_idx()
                            .cmp(&b.slot_idx())
                            .then(a.worker_id().cmp(&b.worker_id()))
                    });

                    available_size = extended_worker_slots.len();
                    new_worker_slots = extended_worker_slots;

                    factor *= 2;
                }

                tracing::info!(
                    "migration timed out, extending worker slots to {:?} by factor {}",
                    new_worker_slots,
                    factor,
                );
            }

            if !new_worker_slots.is_empty() {
                debug!("new worker slots found: {:#?}", new_worker_slots);
                for target_worker_slot in new_worker_slots {
                    if let Some(from) = to_migrate_worker_slots.pop() {
                        debug!(
                            "plan to migrate from worker slot {} to {}",
                            from, target_worker_slot
                        );
                        inuse_worker_slots.insert(target_worker_slot);
                        plan.insert(from, target_worker_slot);
                    } else {
                        break 'discovery;
                    }
                }
            }

            if to_migrate_worker_slots.is_empty() {
                break;
            }

            let changed = active_nodes
                .wait_changed(
                    Duration::from_millis(5000),
                    Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
                    |active_nodes| {
                        let current_nodes = active_nodes
                            .current()
                            .values()
                            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
                            .collect_vec();
                        warn!(
                            current_nodes = ?current_nodes,
                            "waiting for new workers to join, elapsed: {}s",
                            start.elapsed().as_secs()
                        );
                    },
                )
                .await;
            warn!(?changed, "worker set changed or wait timed out, retrying migration");
        }

        info!("migration plan {:?}", plan);

        mgr.catalog_controller.migrate_actors(plan).await?;

        info!("migrate actors succeeded.");

        self.resolve_graph_info(None).await
    }

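    /// Perform offline scaling during recovery: acquire the reschedule lock, run an integrity
    /// check, derive a target parallelism for every streaming job from the currently available
    /// parallelism, and apply the generated reschedule plans in batches.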
    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
        let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
            return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
        };

        match self.scale_controller.integrity_check().await {
            Ok(_) => {
                info!("integrity check passed");
            }
            Err(e) => {
                return Err(anyhow!(e).context("integrity check failed").into());
            }
        }

        let mgr = &self.metadata_manager;

        debug!("start resetting actors distribution");

        let available_workers: HashMap<_, _> = active_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker.clone()))
            .collect();

        info!(
            "target worker ids for offline scaling: {:?}",
            available_workers
        );

        let available_parallelism = active_nodes
            .current()
            .values()
            .map(|worker_node| worker_node.compute_node_parallelism())
            .sum();

        let mut table_parallelisms = HashMap::new();

        let reschedule_targets: HashMap<_, _> = {
            let streaming_parallelisms = mgr
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            let mut result = HashMap::new();

            for (object_id, streaming_parallelism) in streaming_parallelisms {
                let actual_fragment_parallelism = mgr
                    .catalog_controller
                    .get_actual_job_fragment_parallelism(object_id)
                    .await?;

                let table_parallelism = match streaming_parallelism {
                    StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                    StreamingParallelism::Custom => model::TableParallelism::Custom,
                    StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                };

                let target_parallelism = Self::derive_target_parallelism(
                    available_parallelism,
                    table_parallelism,
                    actual_fragment_parallelism,
                    self.env.opts.default_parallelism,
                );

                if target_parallelism != table_parallelism {
                    tracing::info!(
                        "resetting table {} parallelism from {:?} to {:?}",
                        object_id,
                        table_parallelism,
                        target_parallelism
                    );
                }

                table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);

                let parallelism_change = JobParallelismTarget::Update(target_parallelism);

                result.insert(
                    object_id as u32,
                    JobRescheduleTarget {
                        parallelism: parallelism_change,
                        resource_group: JobResourceGroupTarget::Keep,
                    },
                );
            }

            result
        };

        info!(
            "target table parallelisms for offline scaling: {:?}",
            reschedule_targets
        );

        let reschedule_targets = reschedule_targets.into_iter().collect_vec();

        for chunk in reschedule_targets
            .chunks(self.env.opts.parallelism_control_batch_size.max(1))
            .map(|c| c.to_vec())
        {
            let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();

            let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();

            info!(jobs = ?reschedule_ids, "generating reschedule plan for jobs in offline scaling");

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(
                    JobReschedulePolicy {
                        targets: local_reschedule_targets,
                    },
                    false,
                )
                .await?;

            if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
                info!(jobs = ?reschedule_ids, "no plan generated for jobs in offline scaling");
                continue;
            };

            let mut compared_table_parallelisms = table_parallelisms.clone();

            let reschedule_fragment = if plan.reschedules.is_empty() {
                HashMap::new()
            } else {
                self.scale_controller
                    .analyze_reschedule_plan(
                        plan.reschedules,
                        RescheduleOptions {
                            resolve_no_shuffle_upstream: true,
                            skip_create_new_actors: true,
                        },
                        &mut compared_table_parallelisms,
                    )
                    .await?
            };

            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

            info!(jobs = ?reschedule_ids, "post applying reschedule for jobs in offline scaling");

            if let Err(e) = self
                .scale_controller
                .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
                .await
            {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to apply reschedule for offline scaling in recovery",
                );

                return Err(e);
            }

            info!(jobs = ?reschedule_ids, "post applied reschedule for jobs in offline scaling");
        }

        info!("scaling actors succeeded.");
        Ok(())
    }

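    /// Derive the target parallelism of a streaming job for offline scaling:
    ///
    /// - `Custom`: pin to `Fixed(actual)` when the actual fragment parallelism is known and below
    ///   the available parallelism; otherwise fall back to `Adaptive`.
    /// - `Adaptive`: stay `Adaptive`, unless the actual fragment parallelism equals the configured
    ///   `DefaultParallelism::Default(n)`, in which case it is pinned to `Fixed(n)`.
    /// - `Fixed(_)`: keep the assigned parallelism unchanged.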
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

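    /// Load all actors of active streaming jobs, keyed by actor id.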
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    #[test]
    fn test_derive_target_parallelism() {
        // Custom with actual parallelism below the available parallelism: pin to Fixed.
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // Custom with actual parallelism equal to the available parallelism: fall back to Adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // Custom with actual parallelism above the available parallelism: fall back to Adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // Custom with unknown actual parallelism: fall back to Adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // Adaptive with unknown actual parallelism: stay Adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // Adaptive with actual parallelism equal to the configured default: pin to Fixed.
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // Adaptive with actual parallelism different from the configured default: stay Adaptive.
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}