1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use futures::future::try_join_all;
20use itertools::Itertools;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::WorkerSlotId;
24use risingwave_meta_model::StreamingParallelism;
25use thiserror_ext::AsReport;
26use tokio::time::Instant;
27use tracing::{debug, info, warn};
28
29use super::BarrierWorkerRuntimeInfoSnapshot;
30use crate::barrier::context::GlobalBarrierWorkerContextImpl;
31use crate::barrier::info::InflightDatabaseInfo;
32use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
33use crate::controller::fragment::InflightFragmentInfo;
34use crate::manager::ActiveStreamingWorkerNodes;
35use crate::model::{ActorId, StreamActor, StreamJobFragments, TableParallelism};
36use crate::stream::{
37 JobParallelismTarget, JobReschedulePolicy, JobRescheduleTarget, JobResourceGroupTarget,
38 RescheduleOptions, SourceChange,
39};
40use crate::{MetaResult, model};
41
42impl GlobalBarrierWorkerContextImpl {
43 async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
45 let database_id = database_id.map(|database_id| database_id.database_id as _);
46 self.metadata_manager
47 .catalog_controller
48 .clean_dirty_subscription(database_id)
49 .await?;
50 let dirty_associated_source_ids = self
51 .metadata_manager
52 .catalog_controller
53 .clean_dirty_creating_jobs(database_id)
54 .await?;
55
56 self.source_manager
58 .apply_source_change(SourceChange::DropSource {
59 dropped_source_ids: dirty_associated_source_ids,
60 })
61 .await;
62
63 Ok(())
64 }
65
    /// Forwards `all_state_table_ids` to `hummock_manager.purge` and propagates its error.
    ///
    /// NOTE(review): presumably hummock removes state tables that are absent from the given
    /// set — confirm against the `purge` implementation.
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }
73
74 async fn list_background_mv_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
75 let mgr = &self.metadata_manager;
76 let mviews = mgr
77 .catalog_controller
78 .list_background_creating_mviews(false)
79 .await?;
80
81 try_join_all(mviews.into_iter().map(|mview| async move {
82 let table_id = TableId::new(mview.table_id as _);
83 let stream_job_fragments = mgr
84 .catalog_controller
85 .get_job_fragments_by_id(mview.table_id)
86 .await?;
87 assert_eq!(stream_job_fragments.stream_job_id(), table_id);
88 Ok((mview.definition, stream_job_fragments))
89 }))
90 .await
91 }
93
94 async fn resolve_graph_info(
98 &self,
99 database_id: Option<DatabaseId>,
100 ) -> MetaResult<HashMap<DatabaseId, InflightDatabaseInfo>> {
101 let database_id = database_id.map(|database_id| database_id.database_id as _);
102 let all_actor_infos = self
103 .metadata_manager
104 .catalog_controller
105 .load_all_actors(database_id)
106 .await?;
107
108 Ok(all_actor_infos
109 .into_iter()
110 .map(|(loaded_database_id, actor_infos)| {
111 if let Some(database_id) = database_id {
112 assert_eq!(database_id, loaded_database_id);
113 }
114 (
115 DatabaseId::new(loaded_database_id as _),
116 InflightDatabaseInfo::new(actor_infos.into_iter().map(
117 |(job_id, actor_infos)| {
118 (
119 TableId::new(job_id as _),
120 actor_infos
121 .into_iter()
122 .map(|(fragment_id, info)| (fragment_id as _, info)),
123 )
124 },
125 )),
126 )
127 })
128 .collect())
129 }
130
131 pub(super) async fn reload_runtime_info_impl(
132 &self,
133 ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
134 {
135 {
136 {
137 self.clean_dirty_streaming_jobs(None)
138 .await
139 .context("clean dirty streaming jobs")?;
140
141 tracing::info!("recovering mview progress");
143 let background_jobs = {
144 let jobs = self
145 .list_background_mv_progress()
146 .await
147 .context("recover mview progress should not fail")?;
148 let mut background_jobs = HashMap::new();
149 for (definition, stream_job_fragments) in jobs {
150 if stream_job_fragments
151 .tracking_progress_actor_ids()
152 .is_empty()
153 {
154 self.metadata_manager
156 .catalog_controller
157 .finish_streaming_job(
158 stream_job_fragments.stream_job_id().table_id as _,
159 None,
160 )
161 .await?;
162 } else {
163 background_jobs
164 .try_insert(
165 stream_job_fragments.stream_job_id(),
166 (definition, stream_job_fragments),
167 )
168 .expect("non-duplicate");
169 }
170 }
171 background_jobs
172 };
173
174 tracing::info!("recovered mview progress");
175
176 let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
178
179 let mut active_streaming_nodes =
180 ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
181 .await?;
182
183 let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
184 info!(
185 "background streaming jobs: {:?} total {}",
186 background_streaming_jobs,
187 background_streaming_jobs.len()
188 );
189
190 let unreschedulable_jobs = {
191 let mut unreschedulable_jobs = HashSet::new();
192
193 for job_id in background_streaming_jobs {
194 let scan_types = self
195 .metadata_manager
196 .get_job_backfill_scan_types(&job_id)
197 .await?;
198
199 if scan_types
200 .values()
201 .any(|scan_type| !scan_type.is_reschedulable())
202 {
203 unreschedulable_jobs.insert(job_id);
204 }
205 }
206
207 unreschedulable_jobs
208 };
209
210 if !unreschedulable_jobs.is_empty() {
211 tracing::info!(
212 "unreschedulable background jobs: {:?}",
213 unreschedulable_jobs
214 );
215 }
216
217 let mut info = if !self.env.opts.disable_automatic_parallelism_control
222 && unreschedulable_jobs.is_empty()
223 {
224 info!("trigger offline scaling");
225 self.scale_actors(&active_streaming_nodes)
226 .await
227 .inspect_err(|err| {
228 warn!(error = %err.as_report(), "scale actors failed");
229 })?;
230
231 self.resolve_graph_info(None).await.inspect_err(|err| {
232 warn!(error = %err.as_report(), "resolve actor info failed");
233 })?
234 } else {
235 info!("trigger actor migration");
236 self.migrate_actors(&mut active_streaming_nodes)
238 .await
239 .inspect_err(|err| {
240 warn!(error = %err.as_report(), "migrate actors failed");
241 })?
242 };
243
244 if self.scheduled_barriers.pre_apply_drop_cancel(None) {
245 info = self.resolve_graph_info(None).await.inspect_err(|err| {
246 warn!(error = %err.as_report(), "resolve actor info failed");
247 })?
248 }
249
250 let info = info;
251
252 self.purge_state_table_from_hummock(
253 &InflightFragmentInfo::existing_table_ids(
254 info.values().flat_map(|database| database.fragment_infos()),
255 )
256 .collect(),
257 )
258 .await
259 .context("purge state table from hummock")?;
260
261 let state_table_committed_epochs: HashMap<_, _> = self
262 .hummock_manager
263 .on_current_version(|version| {
264 version
265 .state_table_info
266 .info()
267 .iter()
268 .map(|(table_id, info)| (*table_id, info.committed_epoch))
269 .collect()
270 })
271 .await;
272
273 let subscription_infos = self
274 .metadata_manager
275 .get_mv_depended_subscriptions(None)
276 .await?
277 .into_iter()
278 .map(|(database_id, mv_depended_subscriptions)| {
279 (
280 database_id,
281 InflightSubscriptionInfo {
282 mv_depended_subscriptions,
283 },
284 )
285 })
286 .collect();
287
288 let stream_actors = self.load_all_actors().await.inspect_err(|err| {
290 warn!(error = %err.as_report(), "update actors failed");
291 })?;
292
293 let fragment_relations = self
294 .metadata_manager
295 .catalog_controller
296 .get_fragment_downstream_relations(
297 info.values()
298 .flat_map(|database| database.fragment_infos())
299 .map(|fragment| fragment.fragment_id as _)
300 .collect(),
301 )
302 .await?;
303
304 let background_jobs = {
305 let jobs = self
306 .list_background_mv_progress()
307 .await
308 .context("recover mview progress should not fail")?;
309 let mut background_jobs = HashMap::new();
310 for (definition, stream_job_fragments) in jobs {
311 background_jobs
312 .try_insert(
313 stream_job_fragments.stream_job_id(),
314 (definition, stream_job_fragments),
315 )
316 .expect("non-duplicate");
317 }
318 background_jobs
319 };
320
321 let source_splits = self.source_manager.list_assignments().await;
323 Ok(BarrierWorkerRuntimeInfoSnapshot {
324 active_streaming_nodes,
325 database_fragment_infos: info,
326 state_table_committed_epochs,
327 subscription_infos,
328 stream_actors,
329 fragment_relations,
330 source_splits,
331 background_jobs,
332 hummock_version_stats: self.hummock_manager.get_version_stats().await,
333 })
334 }
335 }
336 }
337 }
338
339 pub(super) async fn reload_database_runtime_info_impl(
340 &self,
341 database_id: DatabaseId,
342 ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
343 self.clean_dirty_streaming_jobs(Some(database_id))
344 .await
345 .context("clean dirty streaming jobs")?;
346
347 tracing::info!(?database_id, "recovering mview progress of database");
349 let background_jobs = self
350 .list_background_mv_progress()
351 .await
352 .context("recover mview progress of database should not fail")?;
353 tracing::info!(?database_id, "recovered mview progress");
354
355 let _ = self
357 .scheduled_barriers
358 .pre_apply_drop_cancel(Some(database_id));
359
360 let info = self
361 .resolve_graph_info(Some(database_id))
362 .await
363 .inspect_err(|err| {
364 warn!(error = %err.as_report(), "resolve actor info failed");
365 })?;
366 assert!(info.len() <= 1);
367 let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
368 assert_eq!(loaded_database_id, database_id);
369 info
370 }) else {
371 return Ok(None);
372 };
373
374 let background_jobs = {
375 let jobs = background_jobs;
376 let mut background_jobs = HashMap::new();
377 for (definition, stream_job_fragments) in jobs {
378 if !info.contains_job(stream_job_fragments.stream_job_id()) {
379 continue;
380 }
381 if stream_job_fragments
382 .tracking_progress_actor_ids()
383 .is_empty()
384 {
385 self.metadata_manager
387 .catalog_controller
388 .finish_streaming_job(
389 stream_job_fragments.stream_job_id().table_id as _,
390 None,
391 )
392 .await?;
393 } else {
394 background_jobs
395 .try_insert(
396 stream_job_fragments.stream_job_id(),
397 (definition, stream_job_fragments),
398 )
399 .expect("non-duplicate");
400 }
401 }
402 background_jobs
403 };
404
405 let state_table_committed_epochs: HashMap<_, _> = self
406 .hummock_manager
407 .on_current_version(|version| {
408 version
409 .state_table_info
410 .info()
411 .iter()
412 .map(|(table_id, info)| (*table_id, info.committed_epoch))
413 .collect()
414 })
415 .await;
416
417 let subscription_infos = self
418 .metadata_manager
419 .get_mv_depended_subscriptions(Some(database_id))
420 .await?;
421 assert!(subscription_infos.len() <= 1);
422 let mv_depended_subscriptions = subscription_infos
423 .into_iter()
424 .next()
425 .map(|(loaded_database_id, subscriptions)| {
426 assert_eq!(loaded_database_id, database_id);
427 subscriptions
428 })
429 .unwrap_or_default();
430 let subscription_info = InflightSubscriptionInfo {
431 mv_depended_subscriptions,
432 };
433
434 let fragment_relations = self
435 .metadata_manager
436 .catalog_controller
437 .get_fragment_downstream_relations(
438 info.fragment_infos()
439 .map(|fragment| fragment.fragment_id as _)
440 .collect(),
441 )
442 .await?;
443
444 let stream_actors = self.load_all_actors().await.inspect_err(|err| {
446 warn!(error = %err.as_report(), "update actors failed");
447 })?;
448
449 let source_splits = self.source_manager.list_assignments().await;
451 Ok(Some(DatabaseRuntimeInfoSnapshot {
452 database_fragment_info: info,
453 state_table_committed_epochs,
454 subscription_info,
455 stream_actors,
456 fragment_relations,
457 source_splits,
458 background_jobs,
459 }))
460 }
461}
462
463impl GlobalBarrierWorkerContextImpl {
464 const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(300);
466
467 async fn migrate_actors(
469 &self,
470 active_nodes: &mut ActiveStreamingWorkerNodes,
471 ) -> MetaResult<HashMap<DatabaseId, InflightDatabaseInfo>> {
472 let mgr = &self.metadata_manager;
473
474 let all_inuse_worker_slots: HashSet<_> = mgr
476 .catalog_controller
477 .all_inuse_worker_slots()
478 .await?
479 .into_iter()
480 .collect();
481
482 let active_worker_slots: HashSet<_> = active_nodes
483 .current()
484 .values()
485 .flat_map(|node| {
486 (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
487 })
488 .collect();
489
490 let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
491 .difference(&active_worker_slots)
492 .cloned()
493 .collect();
494
495 if expired_worker_slots.is_empty() {
496 info!("no expired worker slots, skipping.");
497 return self.resolve_graph_info(None).await;
498 }
499
500 info!("start migrate actors.");
501 let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
502 info!("got to migrate worker slots {:#?}", to_migrate_worker_slots);
503
504 let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
505 .intersection(&active_worker_slots)
506 .cloned()
507 .collect();
508
509 let start = Instant::now();
510 let mut plan = HashMap::new();
511 'discovery: while !to_migrate_worker_slots.is_empty() {
512 let mut new_worker_slots = active_nodes
513 .current()
514 .values()
515 .flat_map(|worker| {
516 (0..worker.compute_node_parallelism())
517 .map(move |i| WorkerSlotId::new(worker.id, i as _))
518 })
519 .collect_vec();
520
521 new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
522 let to_migration_size = to_migrate_worker_slots.len();
523 let mut available_size = new_worker_slots.len();
524
525 if available_size < to_migration_size
526 && start.elapsed() > Self::RECOVERY_FORCE_MIGRATION_TIMEOUT
527 {
528 let mut factor = 2;
529
530 while available_size < to_migration_size {
531 let mut extended_worker_slots = active_nodes
532 .current()
533 .values()
534 .flat_map(|worker| {
535 (0..worker.compute_node_parallelism() * factor)
536 .map(move |i| WorkerSlotId::new(worker.id, i as _))
537 })
538 .collect_vec();
539
540 extended_worker_slots
541 .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
542
543 extended_worker_slots.sort_by(|a, b| {
544 a.slot_idx()
545 .cmp(&b.slot_idx())
546 .then(a.worker_id().cmp(&b.worker_id()))
547 });
548
549 available_size = extended_worker_slots.len();
550 new_worker_slots = extended_worker_slots;
551
552 factor *= 2;
553 }
554
555 tracing::info!(
556 "migration timed out, extending worker slots to {:?} by factor {}",
557 new_worker_slots,
558 factor,
559 );
560 }
561
562 if !new_worker_slots.is_empty() {
563 debug!("new worker slots found: {:#?}", new_worker_slots);
564 for target_worker_slot in new_worker_slots {
565 if let Some(from) = to_migrate_worker_slots.pop() {
566 debug!(
567 "plan to migrate from worker slot {} to {}",
568 from, target_worker_slot
569 );
570 inuse_worker_slots.insert(target_worker_slot);
571 plan.insert(from, target_worker_slot);
572 } else {
573 break 'discovery;
574 }
575 }
576 }
577
578 if to_migrate_worker_slots.is_empty() {
579 break;
580 }
581
582 let changed = active_nodes
584 .wait_changed(
585 Duration::from_millis(5000),
586 Self::RECOVERY_FORCE_MIGRATION_TIMEOUT,
587 |active_nodes| {
588 let current_nodes = active_nodes
589 .current()
590 .values()
591 .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
592 .collect_vec();
593 warn!(
594 current_nodes = ?current_nodes,
595 "waiting for new workers to join, elapsed: {}s",
596 start.elapsed().as_secs()
597 );
598 },
599 )
600 .await;
601 warn!(?changed, "get worker changed or timed out. Retry migrate");
602 }
603
604 info!("migration plan {:?}", plan);
605
606 mgr.catalog_controller.migrate_actors(plan).await?;
607
608 info!("migrate actors succeed.");
609
610 self.resolve_graph_info(None).await
611 }
612
613 async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
614 let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else {
615 return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into());
616 };
617
618 match self.scale_controller.integrity_check().await {
619 Ok(_) => {
620 info!("integrity check passed");
621 }
622 Err(e) => {
623 return Err(anyhow!(e).context("integrity check failed").into());
624 }
625 }
626
627 let mgr = &self.metadata_manager;
628
629 debug!("start resetting actors distribution");
630
631 let available_workers: HashMap<_, _> = active_nodes
632 .current()
633 .values()
634 .filter(|worker| worker.is_streaming_schedulable())
635 .map(|worker| (worker.id, worker.clone()))
636 .collect();
637
638 info!(
639 "target worker ids for offline scaling: {:?}",
640 available_workers
641 );
642
643 let available_parallelism = active_nodes
644 .current()
645 .values()
646 .map(|worker_node| worker_node.compute_node_parallelism())
647 .sum();
648
649 let mut table_parallelisms = HashMap::new();
650
651 let reschedule_targets: HashMap<_, _> = {
652 let streaming_parallelisms = mgr
653 .catalog_controller
654 .get_all_streaming_parallelisms()
655 .await?;
656
657 let mut result = HashMap::new();
658
659 for (object_id, streaming_parallelism) in streaming_parallelisms {
660 let actual_fragment_parallelism = mgr
661 .catalog_controller
662 .get_actual_job_fragment_parallelism(object_id)
663 .await?;
664
665 let table_parallelism = match streaming_parallelism {
666 StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
667 StreamingParallelism::Custom => model::TableParallelism::Custom,
668 StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
669 };
670
671 let target_parallelism = Self::derive_target_parallelism(
672 available_parallelism,
673 table_parallelism,
674 actual_fragment_parallelism,
675 self.env.opts.default_parallelism,
676 );
677
678 if target_parallelism != table_parallelism {
679 tracing::info!(
680 "resetting table {} parallelism from {:?} to {:?}",
681 object_id,
682 table_parallelism,
683 target_parallelism
684 );
685 }
686
687 table_parallelisms.insert(TableId::new(object_id as u32), target_parallelism);
688
689 let parallelism_change = JobParallelismTarget::Update(target_parallelism);
690
691 result.insert(
692 object_id as u32,
693 JobRescheduleTarget {
694 parallelism: parallelism_change,
695 resource_group: JobResourceGroupTarget::Keep,
696 },
697 );
698 }
699
700 result
701 };
702
703 info!(
704 "target table parallelisms for offline scaling: {:?}",
705 reschedule_targets
706 );
707
708 let reschedule_targets = reschedule_targets.into_iter().collect_vec();
709
710 for chunk in reschedule_targets
711 .chunks(self.env.opts.parallelism_control_batch_size.max(1))
712 .map(|c| c.to_vec())
713 {
714 let local_reschedule_targets: HashMap<u32, _> = chunk.into_iter().collect();
715
716 let reschedule_ids = local_reschedule_targets.keys().copied().collect_vec();
717
718 info!(jobs=?reschedule_ids,"generating reschedule plan for jobs in offline scaling");
719
720 let plan = self
721 .scale_controller
722 .generate_job_reschedule_plan(JobReschedulePolicy {
723 targets: local_reschedule_targets,
724 })
725 .await?;
726
727 if plan.reschedules.is_empty() && plan.post_updates.parallelism_updates.is_empty() {
729 info!(jobs=?reschedule_ids,"no plan generated for jobs in offline scaling");
730 continue;
731 };
732
733 let mut compared_table_parallelisms = table_parallelisms.clone();
734
735 let reschedule_fragment = if plan.reschedules.is_empty() {
737 HashMap::new()
738 } else {
739 self.scale_controller
740 .analyze_reschedule_plan(
741 plan.reschedules,
742 RescheduleOptions {
743 resolve_no_shuffle_upstream: true,
744 skip_create_new_actors: true,
745 },
746 &mut compared_table_parallelisms,
747 )
748 .await?
749 };
750
751 debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
753
754 info!(jobs=?reschedule_ids,"post applying reschedule for jobs in offline scaling");
755
756 if let Err(e) = self
757 .scale_controller
758 .post_apply_reschedule(&reschedule_fragment, &plan.post_updates)
759 .await
760 {
761 tracing::error!(
762 error = %e.as_report(),
763 "failed to apply reschedule for offline scaling in recovery",
764 );
765
766 return Err(e);
767 }
768
769 info!(jobs=?reschedule_ids,"post applied reschedule for jobs in offline scaling");
770 }
771
772 info!("scaling actors succeed.");
773 Ok(())
774 }
775
776 fn derive_target_parallelism(
784 available_parallelism: usize,
785 assigned_parallelism: TableParallelism,
786 actual_fragment_parallelism: Option<usize>,
787 default_parallelism: DefaultParallelism,
788 ) -> TableParallelism {
789 match assigned_parallelism {
790 TableParallelism::Custom => {
791 if let Some(fragment_parallelism) = actual_fragment_parallelism {
792 if fragment_parallelism >= available_parallelism {
793 TableParallelism::Adaptive
794 } else {
795 TableParallelism::Fixed(fragment_parallelism)
796 }
797 } else {
798 TableParallelism::Adaptive
799 }
800 }
801 TableParallelism::Adaptive => {
802 match (default_parallelism, actual_fragment_parallelism) {
803 (DefaultParallelism::Default(n), Some(fragment_parallelism))
804 if fragment_parallelism == n.get() =>
805 {
806 TableParallelism::Fixed(fragment_parallelism)
807 }
808 _ => TableParallelism::Adaptive,
809 }
810 }
811 _ => assigned_parallelism,
812 }
813 }
814
    /// Returns the stream actors keyed by actor id, exactly as reported by
    /// `metadata_manager.all_active_actors()`.
    ///
    /// The call takes no database filter, so callers reloading a single database receive the
    /// same map as a global reload.
    async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>> {
        self.metadata_manager.all_active_actors().await
    }
819}
820
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    /// Table-driven check of `derive_target_parallelism`; every case runs with an available
    /// parallelism of 10.
    #[test]
    fn test_derive_target_parallelism() {
        let default_of_five = || DefaultParallelism::Default(NonZeroUsize::new(5).unwrap());

        // (assigned, actual fragment parallelism, default parallelism, expected)
        let cases = [
            // Custom below the available parallelism is pinned to the actual value.
            (
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
                TableParallelism::Fixed(5),
            ),
            // Custom at or above the available parallelism becomes adaptive.
            (
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
                TableParallelism::Adaptive,
            ),
            (
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
                TableParallelism::Adaptive,
            ),
            // Custom without a known fragment parallelism becomes adaptive.
            (
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
                TableParallelism::Adaptive,
            ),
            // Adaptive without a known fragment parallelism stays adaptive.
            (
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
                TableParallelism::Adaptive,
            ),
            // Adaptive matching the configured default is pinned to it.
            (
                TableParallelism::Adaptive,
                Some(5),
                default_of_five(),
                TableParallelism::Fixed(5),
            ),
            // Adaptive differing from the configured default stays adaptive.
            (
                TableParallelism::Adaptive,
                Some(6),
                default_of_five(),
                TableParallelism::Adaptive,
            ),
        ];

        for (idx, (assigned, actual, default, expected)) in cases.into_iter().enumerate() {
            assert_eq!(
                expected,
                GlobalBarrierWorkerContextImpl::derive_target_parallelism(
                    10, assigned, actual, default,
                ),
                "case #{idx}"
            );
        }
    }
}