use std::cmp::{Ordering, min};
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use num_integer::Integer;
use num_traits::abs;
use risingwave_common::bail;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::hash::ActorMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::FragmentWorkerSlotMappings;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{self, State};
use risingwave_pb::stream_plan::{
    Dispatcher, FragmentTypeFlag, PbDispatcher, PbDispatcherType, StreamNode,
};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule};
use crate::controller::scale::RescheduleWorkingSet;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
use crate::model::{
    ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
};
use crate::serving::{
    ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
};
use crate::stream::{GlobalStreamManager, SourceManagerRef};
use crate::{MetaError, MetaResult};

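/// A per-fragment reschedule request expressed as actor-count deltas per worker.
/// For example (illustrative values), `{ worker 1: +2, worker 3: -1 }` asks to
/// create two actors on worker 1 and remove one actor from worker 3.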
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}

pub struct CustomFragmentInfo {
    pub fragment_id: u32,
    pub fragment_type_mask: u32,
    pub distribution_type: PbFragmentDistributionType,
    pub state_table_ids: Vec<u32>,
    pub node: StreamNode,
    pub actor_template: StreamActorWithDispatchers,
    pub actors: Vec<CustomActorInfo>,
}

#[derive(Default, Clone)]
pub struct CustomActorInfo {
    pub actor_id: u32,
    pub fragment_id: u32,
    pub dispatcher: Vec<Dispatcher>,
    pub vnode_bitmap: Option<Bitmap>,
}

impl CustomFragmentInfo {
    pub fn get_fragment_type_mask(&self) -> u32 {
        self.fragment_type_mask
    }

    pub fn distribution_type(&self) -> FragmentDistributionType {
        self.distribution_type
    }
}

use educe::Educe;
use futures::future::try_join_all;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_meta_model::DispatcherType;
use risingwave_pb::stream_plan::stream_node::NodeBody;

use super::SourceChange;
use crate::controller::id::IdCategory;
use crate::controller::utils::filter_workers_by_resource_group;

#[derive(Educe)]
#[educe(Debug)]
pub struct RescheduleContext {
    /// Metadata for all actors touched by this reschedule.
    #[educe(Debug(ignore))]
    actor_map: HashMap<ActorId, CustomActorInfo>,
    /// Location (worker) of every actor.
    actor_status: BTreeMap<ActorId, WorkerId>,
    /// Metadata for all fragments touched by this reschedule.
    #[educe(Debug(ignore))]
    fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
    /// Fragments containing a `StreamSource` node.
    stream_source_fragment_ids: HashSet<FragmentId>,
    /// Fragments containing a `SourceBackfill` node, mapped to their upstream source fragment.
    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
    /// Downstream (target) fragments of `NoShuffle` edges.
    no_shuffle_target_fragment_ids: HashSet<FragmentId>,
    /// Upstream (source) fragments of `NoShuffle` edges.
    no_shuffle_source_fragment_ids: HashSet<FragmentId>,
    /// Index of dispatcher types from each fragment to its downstream fragments.
    fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    fragment_upstreams: HashMap<
        risingwave_meta_model::FragmentId,
        HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
    >,
}

impl RescheduleContext {
    fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
        self.actor_status
            .get(actor_id)
            .cloned()
            .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
    }
}

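/// Rebalances vnode ownership across actors after a scaling decision. A sketch
/// of the approach, as recovered from the code below:
///
/// 1. Compute the target actor count and split `vnode_count` into an `expected`
///    share per actor plus a remainder.
/// 2. Give every actor a signed `balance`: actors to remove carry their whole
///    bitmap as surplus, retained actors carry `count_ones - expected`, and new
///    actors start at `-expected`.
/// 3. Distribute the remainder, preferring newly created actors.
/// 4. Treat the actors as a deque sorted by balance and move vnodes from the
///    most overloaded actor (front) to the most underloaded one (back) until
///    every balance reaches zero.
///
/// For example (illustrative numbers), scaling 3 actors over 16 vnodes up to
/// 4 actors yields `expected = 4` and `remain = 0`, so every surviving actor
/// ends up owning exactly 4 vnodes.
///
/// Only hash-distributed fragments carry vnode bitmaps; single-distribution
/// fragments must be handled by the caller instead of this function.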
pub fn rebalance_actor_vnode(
    actors: &[CustomActorInfo],
    actors_to_remove: &BTreeSet<ActorId>,
    actors_to_create: &BTreeSet<ActorId>,
) -> HashMap<ActorId, Bitmap> {
    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();

    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);

    assert!(actors.len() >= actors_to_remove.len());

    let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
    assert!(target_actor_count > 0);

    // All actors of a hash-distributed fragment share the same vnode count.
    let vnode_count = actors[0]
        .vnode_bitmap
        .as_ref()
        .expect("vnode bitmap unset")
        .len();

    #[derive(Debug)]
    struct Balance {
        actor_id: ActorId,
        balance: i32,
        builder: BitmapBuilder,
    }
    let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);

    tracing::debug!(
        "expected {}, remain {}, prev actors {}, target actors {}",
        expected,
        remain,
        actors.len(),
        target_actor_count,
    );

    let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
        .iter()
        .map(|actor| {
            (
                actor.actor_id as ActorId,
                actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
            )
        })
        .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));

    let order_by_bitmap_desc =
        |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
            bitmap_a
                .count_ones()
                .cmp(&bitmap_b.count_ones())
                .reverse()
                .then(id_a.cmp(id_b))
        };

    let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
        let mut builder = BitmapBuilder::default();
        builder.append_bitmap(bitmap);
        builder
    };

    let (prev_expected, _) = vnode_count.div_rem(&actors.len());

    let prev_remain = removed
        .iter()
        .map(|(_, bitmap)| {
            assert!(bitmap.count_ones() >= prev_expected);
            bitmap.count_ones() - prev_expected
        })
        .sum::<usize>();

    removed.sort_by(order_by_bitmap_desc);
    rest.sort_by(order_by_bitmap_desc);

    // Actors to remove must give away their entire bitmap, so their balance is
    // the number of vnodes they currently own.
    let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
        actor_id,
        balance: bitmap.count_ones() as i32,
        builder: builder_from_bitmap(&bitmap),
    });

    let mut rest_balances = rest
        .into_iter()
        .map(|(actor_id, bitmap)| Balance {
            actor_id,
            balance: bitmap.count_ones() as i32 - expected as i32,
            builder: builder_from_bitmap(&bitmap),
        })
        .collect_vec();

    let mut created_balances = actors_to_create
        .iter()
        .map(|actor_id| Balance {
            actor_id: *actor_id,
            balance: -(expected as i32),
            builder: BitmapBuilder::zeroed(vnode_count),
        })
        .collect_vec();

    // Hand out the remainder, preferring newly created actors.
    for balance in created_balances
        .iter_mut()
        .rev()
        .take(prev_remain)
        .chain(rest_balances.iter_mut())
    {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    for balance in &mut created_balances {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    assert_eq!(remain, 0);

    let mut v: VecDeque<_> = removed_balances
        .chain(rest_balances)
        .chain(created_balances)
        .collect();

    let mut result = HashMap::with_capacity(target_actor_count);

    for balance in &v {
        tracing::debug!(
            "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
            balance.actor_id,
            balance.balance,
            actors_to_remove.contains(&balance.actor_id),
            actors_to_create.contains(&balance.actor_id)
        );
    }

    // Repeatedly move vnodes from the most overloaded actor (front of the
    // deque) to the most underloaded one (back) until all balances are zero.
    while !v.is_empty() {
        if v.len() == 1 {
            let single = v.pop_front().unwrap();
            assert_eq!(single.balance, 0);
            if !actors_to_remove.contains(&single.actor_id) {
                result.insert(single.actor_id, single.builder.finish());
            }

            continue;
        }

        let mut src = v.pop_front().unwrap();
        let mut dst = v.pop_back().unwrap();

        let n = min(abs(src.balance), abs(dst.balance));

        let mut moved = 0;
        for idx in (0..vnode_count).rev() {
            if moved >= n {
                break;
            }

            if src.builder.is_set(idx) {
                src.builder.set(idx, false);
                assert!(!dst.builder.is_set(idx));
                dst.builder.set(idx, true);
                moved += 1;
            }
        }

        src.balance -= n;
        dst.balance += n;

        if src.balance != 0 {
            v.push_front(src);
        } else if !actors_to_remove.contains(&src.actor_id) {
            result.insert(src.actor_id, src.builder.finish());
        }

        if dst.balance != 0 {
            v.push_back(dst);
        } else {
            result.insert(dst.actor_id, dst.builder.finish());
        }
    }

    result
}

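// A minimal, self-contained sanity check sketching how `rebalance_actor_vnode`
// behaves on scale-out; the concrete actor ids and vnode count are illustrative.
#[cfg(test)]
mod rebalance_actor_vnode_example {
    use super::*;

    #[test]
    fn scale_out_keeps_every_vnode_covered_once() {
        let vnode_count = 16;

        // Two existing actors, each owning a contiguous half of the vnodes.
        let actors: Vec<CustomActorInfo> = (0..2usize)
            .map(|i| {
                let mut builder = BitmapBuilder::zeroed(vnode_count);
                for idx in (i * 8)..(i * 8 + 8) {
                    builder.set(idx, true);
                }
                CustomActorInfo {
                    actor_id: i as u32,
                    fragment_id: 1,
                    vnode_bitmap: Some(builder.finish()),
                    ..Default::default()
                }
            })
            .collect();

        // Scale out by one actor (id 2), removing none.
        let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &BTreeSet::from([2]));

        // All three actors appear in the result, and every vnode is owned by
        // exactly one of them.
        assert_eq!(result.len(), 3);
        for idx in 0..vnode_count {
            let owners = result.values().filter(|bitmap| bitmap.is_set(idx)).count();
            assert_eq!(owners, 1);
        }
    }
}
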
#[derive(Debug, Clone, Copy)]
pub struct RescheduleOptions {
    /// Whether to rewrite reschedules of `NoShuffle` downstream fragments onto
    /// the root of their `NoShuffle` dependency tree.
    pub resolve_no_shuffle_upstream: bool,

    /// Whether to skip creating new actors (i.e., only compute the plan).
    pub skip_create_new_actors: bool,
}

pub type ScaleControllerRef = Arc<ScaleController>;

pub struct ScaleController {
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,

    /// Serializes reschedule operations; taken for the duration of a reschedule.
    pub reschedule_lock: RwLock<()>,
}

impl ScaleController {
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

    pub async fn integrity_check(&self) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .integrity_check()
            .await
    }

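    /// Builds the [`RescheduleContext`] for a reschedule request and validates
    /// it: active workers, fragment states, and `NoShuffle` constraints.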
    async fn build_reschedule_context(
        &self,
        reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<RescheduleContext> {
        let worker_nodes: HashMap<WorkerId, WorkerNode> = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .map(|worker_node| (worker_node.id as _, worker_node))
            .collect();

        if worker_nodes.is_empty() {
            bail!("no available compute node in the cluster");
        }

        // Check whether the plan tries to add actors on an unschedulable worker.
        let unschedulable_worker_ids: HashSet<_> = worker_nodes
            .values()
            .filter(|w| {
                w.property
                    .as_ref()
                    .map(|property| property.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|worker| worker.id as WorkerId)
            .collect();

        for (fragment_id, reschedule) in &*reschedule {
            for (worker_id, change) in &reschedule.worker_actor_diff {
                if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
                    bail!(
                        "unable to move fragment {} to unschedulable worker {}",
                        fragment_id,
                        worker_id
                    );
                }
            }
        }

        // Index actor -> worker, fragment -> actors, and related job state for
        // the fragments involved in this reschedule.
        let mut actor_map = HashMap::new();
        let mut fragment_map = HashMap::new();
        let mut actor_status = BTreeMap::new();
        let mut fragment_state = HashMap::new();
        let mut fragment_to_table = HashMap::new();
        fn fulfill_index_by_fragment_ids(
            actor_map: &mut HashMap<u32, CustomActorInfo>,
            fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
            actor_status: &mut BTreeMap<ActorId, WorkerId>,
            fragment_state: &mut HashMap<FragmentId, State>,
            fragment_to_table: &mut HashMap<FragmentId, TableId>,
            fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
            actors: HashMap<ActorId, actor::Model>,
            mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
            related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
        ) {
            let mut fragment_actors: HashMap<
                risingwave_meta_model::FragmentId,
                Vec<CustomActorInfo>,
            > = HashMap::new();

            let mut expr_contexts = HashMap::new();
            for (
                _,
                actor::Model {
                    actor_id,
                    fragment_id,
                    status: _,
                    splits: _,
                    worker_id,
                    vnode_bitmap,
                    expr_context,
                    ..
                },
            ) in actors
            {
                let dispatchers = actor_dispatchers
                    .remove(&(actor_id as _))
                    .unwrap_or_default();

                let actor_info = CustomActorInfo {
                    actor_id: actor_id as _,
                    fragment_id: fragment_id as _,
                    dispatcher: dispatchers,
                    vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
                };

                actor_map.insert(actor_id as _, actor_info.clone());

                fragment_actors
                    .entry(fragment_id as _)
                    .or_default()
                    .push(actor_info);

                actor_status.insert(actor_id as _, worker_id as WorkerId);

                expr_contexts.insert(actor_id as u32, expr_context);
            }

            for (
                _,
                fragment::Model {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    stream_node,
                    state_table_ids,
                    ..
                },
            ) in fragments
            {
                let actors = fragment_actors
                    .remove(&(fragment_id as _))
                    .unwrap_or_default();

                let CustomActorInfo {
                    actor_id,
                    fragment_id,
                    dispatcher,
                    vnode_bitmap,
                } = actors.first().unwrap().clone();

                let (related_job, job_definition) =
                    related_jobs.get(&job_id).expect("job not found");

                let fragment = CustomFragmentInfo {
                    fragment_id: fragment_id as _,
                    fragment_type_mask: fragment_type_mask as _,
                    distribution_type: distribution_type.into(),
                    state_table_ids: state_table_ids.into_u32_array(),
                    node: stream_node.to_protobuf(),
                    actor_template: (
                        StreamActor {
                            actor_id,
                            fragment_id: fragment_id as _,
                            vnode_bitmap,
                            mview_definition: job_definition.to_owned(),
                            expr_context: expr_contexts
                                .get(&actor_id)
                                .cloned()
                                .map(|expr_context| expr_context.to_protobuf()),
                        },
                        dispatcher,
                    ),
                    actors,
                };

                fragment_map.insert(fragment_id as _, fragment);

                fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));

                fragment_state.insert(
                    fragment_id,
                    table_fragments::PbState::from(related_job.job_status),
                );
            }
        }
        let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
        let working_set = self
            .metadata_manager
            .catalog_controller
            .resolve_working_set_for_reschedule_fragments(fragment_ids)
            .await?;

        fulfill_index_by_fragment_ids(
            &mut actor_map,
            &mut fragment_map,
            &mut actor_status,
            &mut fragment_state,
            &mut fragment_to_table,
            working_set.fragments,
            working_set.actors,
            working_set.actor_dispatchers,
            working_set.related_jobs,
        );

        // Index the fragments on both ends of `NoShuffle` edges.
        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        Self::build_no_shuffle_relation_index(
            &actor_map,
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
        );

        if options.resolve_no_shuffle_upstream {
            let original_reschedule_keys = reschedule.keys().cloned().collect();

            Self::resolve_no_shuffle_upstream_fragments(
                reschedule,
                &no_shuffle_source_fragment_ids,
                &no_shuffle_target_fragment_ids,
                &working_set.fragment_upstreams,
            )?;

            if !table_parallelisms.is_empty() {
                Self::resolve_no_shuffle_upstream_tables(
                    original_reschedule_keys,
                    &no_shuffle_source_fragment_ids,
                    &no_shuffle_target_fragment_ids,
                    &fragment_to_table,
                    &working_set.fragment_upstreams,
                    table_parallelisms,
                )?;
            }
        }

        let mut fragment_dispatcher_map = HashMap::new();
        Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);

        let mut stream_source_fragment_ids = HashSet::new();
        let mut stream_source_backfill_fragment_ids = HashMap::new();
        let mut no_shuffle_reschedule = HashMap::new();
        for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
            let fragment = fragment_map
                .get(fragment_id)
                .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;

            // Check the state of the fragment before rescheduling.
            match fragment_state[fragment_id] {
                table_fragments::State::Unspecified => unreachable!(),
                state @ table_fragments::State::Initial => {
                    bail!(
                        "the materialized view of fragment {fragment_id} is in state {}",
                        state.as_str_name()
                    )
                }
                state @ table_fragments::State::Creating => {
                    let stream_node = &fragment.node;

                    let mut is_reschedulable = true;
                    visit_stream_node_cont(stream_node, |body| {
                        if let Some(NodeBody::StreamScan(node)) = &body.node_body {
                            if !node.stream_scan_type().is_reschedulable() {
                                is_reschedulable = false;

                                // Stop visiting the rest of the graph.
                                return false;
                            }

                            // This node is reschedulable; keep visiting.
                            return true;
                        }

                        true
                    });

                    if !is_reschedulable {
                        bail!(
                            "the materialized view of fragment {fragment_id} is in state {}",
                            state.as_str_name()
                        )
                    }
                }
                table_fragments::State::Created => {}
            }

            if no_shuffle_target_fragment_ids.contains(fragment_id) {
                bail!(
                    "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
                );
            }

            // For a `NoShuffle` upstream, propagate the same reschedule to its
            // entire downstream `NoShuffle` subtree, which must stay aligned.
            if no_shuffle_source_fragment_ids.contains(fragment_id) {
                let mut queue: VecDeque<_> = fragment_dispatcher_map
                    .get(fragment_id)
                    .unwrap()
                    .keys()
                    .cloned()
                    .collect();

                while let Some(downstream_id) = queue.pop_front() {
                    if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
                        continue;
                    }

                    if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
                    {
                        let no_shuffle_downstreams = downstream_fragments
                            .iter()
                            .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                            .map(|(fragment_id, _)| fragment_id);

                        queue.extend(no_shuffle_downstreams.copied());
                    }

                    no_shuffle_reschedule.insert(
                        downstream_id,
                        WorkerReschedule {
                            worker_actor_diff: worker_actor_diff.clone(),
                        },
                    );
                }
            }

            if (fragment.fragment_type_mask & FragmentTypeFlag::Source as u32) != 0
                && fragment.node.find_stream_source().is_some()
            {
                stream_source_fragment_ids.insert(*fragment_id);
            }

            // Check if the reschedule plan is valid.
            let current_worker_ids = fragment
                .actors
                .iter()
                .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
                .collect::<HashSet<_>>();

            for (removed, change) in worker_actor_diff {
                if !current_worker_ids.contains(removed) && change.is_negative() {
                    bail!(
                        "no actor on the worker {} of fragment {}",
                        removed,
                        fragment_id
                    );
                }
            }

            let added_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_positive())
                .cloned()
                .map(|change| change as usize)
                .sum();

            let removed_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_negative())
                .cloned()
                .map(|v| v.unsigned_abs())
                .sum();

            match fragment.distribution_type() {
                FragmentDistributionType::Hash => {
                    if fragment.actors.len() + added_actor_count <= removed_actor_count {
                        bail!("can't remove all actors from fragment {}", fragment_id);
                    }
                }
                FragmentDistributionType::Single => {
                    if added_actor_count != removed_actor_count {
                        bail!("single distribution fragment only supports migration");
                    }
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        if !no_shuffle_reschedule.is_empty() {
            tracing::info!(
                "reschedule plan rewritten with NoShuffle reschedule {:?}",
                no_shuffle_reschedule
            );

            for noshuffle_downstream in no_shuffle_reschedule.keys() {
                let fragment = fragment_map.get(noshuffle_downstream).unwrap();
                // `SourceScan` fragments depend on their upstream `Source` fragments.
                if (fragment.fragment_type_mask & FragmentTypeFlag::SourceScan as u32) != 0 {
                    let stream_node = &fragment.node;
                    if let Some((_source_id, upstream_source_fragment_id)) =
                        stream_node.find_source_backfill()
                    {
                        stream_source_backfill_fragment_ids
                            .insert(fragment.fragment_id, upstream_source_fragment_id);
                    }
                }
            }
        }

        // Modifications for NoShuffle downstream fragments are derived from their
        // upstreams, so merge them into the reschedule plan.
        reschedule.extend(no_shuffle_reschedule.into_iter());

        Ok(RescheduleContext {
            actor_map,
            actor_status,
            fragment_map,
            stream_source_fragment_ids,
            stream_source_backfill_fragment_ids,
            no_shuffle_target_fragment_ids,
            no_shuffle_source_fragment_ids,
            fragment_dispatcher_map,
            fragment_upstreams: working_set.fragment_upstreams,
        })
    }

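    /// Converts the high-level [`WorkerReschedule`]s into low-level per-fragment
    /// [`Reschedule`] plans: arranges which actors to add or remove, rebalances
    /// vnode bitmaps, rewrites dispatchers, and migrates source splits.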
    pub(crate) async fn analyze_reschedule_plan(
        &self,
        mut reschedules: HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
        tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
        let ctx = self
            .build_reschedule_context(&mut reschedules, options, table_parallelisms)
            .await?;
        tracing::debug!("reschedule context: {:#?}", ctx);
        let reschedules = reschedules;

        // Index of actors to create/remove per fragment: FragmentId => (ActorId => WorkerId).
        let (fragment_actors_to_remove, fragment_actors_to_create) =
            self.arrange_reschedules(&reschedules, &ctx)?;

        let mut fragment_actor_bitmap = HashMap::new();
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                // The vnode bitmaps of NoShuffle targets are derived from their
                // upstreams later, so skip them here.
                continue;
            }

            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .map(|map| map.iter().map(|(actor_id, _)| *actor_id).collect())
                .unwrap_or_default();

            let actors_to_remove = fragment_actors_to_remove
                .get(fragment_id)
                .map(|map| map.iter().map(|(actor_id, _)| *actor_id).collect())
                .unwrap_or_default();

            let fragment = ctx.fragment_map.get(fragment_id).unwrap();

            match fragment.distribution_type() {
                FragmentDistributionType::Single => {
                    // Single-distribution fragments carry no vnode bitmaps.
                    fragment_actor_bitmap
                        .insert(fragment.fragment_id as FragmentId, Default::default());
                }
                FragmentDistributionType::Hash => {
                    let actor_vnode = rebalance_actor_vnode(
                        &fragment.actors,
                        &actors_to_remove,
                        &actors_to_create,
                    );

                    fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
                }

                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        // Compute the set of actors (and their workers) that each fragment will
        // have after this reschedule is applied.
        let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
        for fragment_id in reschedules.keys() {
            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
            let mut new_actor_ids = BTreeMap::new();
            for actor in &fragment.actors {
                if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id) {
                    if actors_to_remove.contains_key(&actor.actor_id) {
                        continue;
                    }
                }
                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
                new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
            }

            if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
                for (actor_id, worker_id) in actors_to_create {
                    new_actor_ids.insert(*actor_id, *worker_id);
                }
            }

            assert!(
                !new_actor_ids.is_empty(),
                "should be at least one actor in fragment {} after rescheduling",
                fragment_id
            );

            fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
        }

        let fragment_actors_after_reschedule = fragment_actors_after_reschedule;

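        // Recursively derives vnode bitmaps and actor pairings for every fragment
        // inside a `NoShuffle` dependency tree from its root upstream, keeping
        // upstream and downstream actors paired one-to-one on the same workers.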
        fn arrange_no_shuffle_relation(
            ctx: &RescheduleContext,
            fragment_id: &FragmentId,
            upstream_fragment_id: &FragmentId,
            fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
            actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
            fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
            no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
            no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
        ) {
            if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                return;
            }

            let fragment = &ctx.fragment_map[fragment_id];

            let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];

            // Group actors along NoShuffle edges under the root actor of their tree.
            for upstream_actor in &upstream_fragment.actors {
                for dispatcher in &upstream_actor.dispatcher {
                    if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
                        let downstream_actor_id =
                            *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();

                        // If the upstream is not itself a NoShuffle target, it is the root.
                        if !ctx
                            .no_shuffle_target_fragment_ids
                            .contains(upstream_fragment_id)
                        {
                            actor_group_map.insert(
                                upstream_actor.actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                            actor_group_map.insert(
                                downstream_actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                        } else {
                            let root_actor_id = actor_group_map[&upstream_actor.actor_id];

                            actor_group_map.insert(downstream_actor_id, root_actor_id);
                        }
                    }
                }
            }

            // The upstream bitmaps are either already rebalanced or empty (for Single).
            let upstream_fragment_bitmap = fragment_updated_bitmap
                .get(upstream_fragment_id)
                .cloned()
                .unwrap_or_default();

            if upstream_fragment.distribution_type() == FragmentDistributionType::Single {
                assert!(
                    upstream_fragment_bitmap.is_empty(),
                    "single fragment should have no bitmap updates"
                );
            }

            let upstream_fragment_actor_map = fragment_actors_after_reschedule
                .get(upstream_fragment_id)
                .cloned()
                .unwrap();

            let fragment_actor_map = fragment_actors_after_reschedule
                .get(fragment_id)
                .cloned()
                .unwrap();

            let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            // First, populate the bitmaps of actors that already belong to a NoShuffle group.
            let mut fragment_bitmap = HashMap::new();

            for (actor_id, worker_id) in &fragment_actor_map {
                if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
                    let root_bitmap = fragment_updated_bitmap
                        .get(root_fragment)
                        .expect("root fragment bitmap not found")
                        .get(root_actor_id)
                        .cloned()
                        .expect("root actor bitmap not found");

                    // Copy the bitmap from the root of the group.
                    fragment_bitmap.insert(*actor_id, root_bitmap);

                    no_shuffle_upstream_actor_map
                        .entry(*actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, *root_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(*root_actor_id)
                        .or_default()
                        .insert(*fragment_id, *actor_id);
                } else {
                    worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            for (actor_id, worker_id) in &upstream_fragment_actor_map {
                if !actor_group_map.contains_key(actor_id) {
                    upstream_worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            // Then, pair the remaining actors with upstream actors on the same worker.
            for (worker_id, actor_ids) in worker_reverse_index {
                let upstream_actor_ids = upstream_worker_reverse_index
                    .get(&worker_id)
                    .unwrap()
                    .clone();

                assert_eq!(actor_ids.len(), upstream_actor_ids.len());

                for (actor_id, upstream_actor_id) in actor_ids
                    .into_iter()
                    .zip_eq_debug(upstream_actor_ids.into_iter())
                {
                    match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
                        None => {
                            // If the upstream bitmap is missing, the upstream must be Single.
                            assert_eq!(
                                upstream_fragment.distribution_type(),
                                FragmentDistributionType::Single
                            );
                        }
                        Some(bitmap) => {
                            // Copy the bitmap from the paired upstream actor.
                            fragment_bitmap.insert(actor_id, bitmap);
                        }
                    }

                    no_shuffle_upstream_actor_map
                        .entry(actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, upstream_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(upstream_actor_id)
                        .or_default()
                        .insert(*fragment_id, actor_id);
                }
            }

            match fragment.distribution_type() {
                FragmentDistributionType::Hash => {}
                FragmentDistributionType::Single => {
                    // Single-distribution fragments should derive no bitmaps.
                    assert!(fragment_bitmap.is_empty());
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }

            if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
                assert_eq!(
                    e.entry.get(),
                    &e.value,
                    "bitmaps derived from different no-shuffle upstreams mismatch"
                );
            }

            // Recurse into this fragment's own NoShuffle downstreams.
            if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
                let no_shuffle_downstreams = downstream_fragments
                    .iter()
                    .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| fragment_id);

                for downstream_fragment_id in no_shuffle_downstreams {
                    arrange_no_shuffle_relation(
                        ctx,
                        downstream_fragment_id,
                        fragment_id,
                        fragment_actors_after_reschedule,
                        actor_group_map,
                        fragment_updated_bitmap,
                        no_shuffle_upstream_actor_map,
                        no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

        let mut no_shuffle_upstream_actor_map = HashMap::new();
        let mut no_shuffle_downstream_actors_map = HashMap::new();
        let mut actor_group_map = HashMap::new();
        // Start the NoShuffle derivation from every root: a fragment that is a
        // NoShuffle source but not a target itself.
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
                && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
            {
                if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
                    for downstream_fragment_id in downstream_fragments.keys() {
                        arrange_no_shuffle_relation(
                            &ctx,
                            downstream_fragment_id,
                            fragment_id,
                            &fragment_actors_after_reschedule,
                            &mut actor_group_map,
                            &mut fragment_actor_bitmap,
                            &mut no_shuffle_upstream_actor_map,
                            &mut no_shuffle_downstream_actors_map,
                        );
                    }
                }
            }
        }

        tracing::debug!("actor group map {:?}", actor_group_map);

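        // Instantiate the new actors from each fragment's actor template, then
        // patch their dispatchers and vnode bitmaps to match the new topology.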
        let mut new_created_actors = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .cloned()
                .unwrap_or_default();

            let fragment = &ctx.fragment_map[fragment_id];

            assert!(!fragment.actors.is_empty());

            for actor_to_create in &actors_to_create {
                let new_actor_id = actor_to_create.0;
                let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();

                // This must be assigned before `modify_actor_upstream_and_downstream`,
                // which looks up the NoShuffle relationship by the new actor id.
                new_actor.actor_id = *new_actor_id;

                Self::modify_actor_upstream_and_downstream(
                    &ctx,
                    &fragment_actors_to_remove,
                    &fragment_actors_to_create,
                    &fragment_actor_bitmap,
                    &no_shuffle_downstream_actors_map,
                    &mut new_actor,
                    &mut dispatchers,
                )?;

                if let Some(bitmap) = fragment_actor_bitmap
                    .get(fragment_id)
                    .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
                {
                    new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
                }

                new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
            }
        }

        // For stream source and source backfill fragments, reallocate their splits.
        // Two passes are used so that source actors are migrated before the
        // backfill actors that depend on them.
        let mut fragment_actor_splits = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

            if ctx.stream_source_fragment_ids.contains(fragment_id) {
                let fragment = &ctx.fragment_map[fragment_id];

                let prev_actor_ids = fragment
                    .actors
                    .iter()
                    .map(|actor| actor.actor_id)
                    .collect_vec();

                let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                let actor_splits = self
                    .source_manager
                    .migrate_splits_for_source_actors(
                        *fragment_id,
                        &prev_actor_ids,
                        &curr_actor_ids,
                    )
                    .await?;

                tracing::debug!(
                    "source actor splits: {:?}, fragment_id: {}",
                    actor_splits,
                    fragment_id
                );
                fragment_actor_splits.insert(*fragment_id, actor_splits);
            }
        }
        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
            for fragment_id in reschedules.keys() {
                let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

                if let Some(upstream_source_fragment_id) =
                    ctx.stream_source_backfill_fragment_ids.get(fragment_id)
                {
                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
                        *fragment_id,
                        *upstream_source_fragment_id,
                        &curr_actor_ids,
                        &fragment_actor_splits,
                        &no_shuffle_upstream_actor_map,
                    )?;
                    tracing::debug!(
                        "source backfill actor splits: {:?}, fragment_id: {}",
                        actor_splits,
                        fragment_id
                    );
                    fragment_actor_splits.insert(*fragment_id, actor_splits);
                }
            }
        }

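        // Assemble the final per-fragment `Reschedule`: added/removed actors,
        // vnode bitmap updates, dispatcher updates, and split assignments.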
        let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
            HashMap::with_capacity(reschedules.len());

        for (fragment_id, _) in reschedules {
            let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();

            if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
                for (actor_id, worker_id) in actor_worker_maps {
                    actors_to_create
                        .entry(worker_id)
                        .or_default()
                        .push(actor_id);
                }
            }

            let actors_to_remove = fragment_actors_to_remove
                .get(&fragment_id)
                .cloned()
                .unwrap_or_default()
                .into_keys()
                .collect();

            let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];

            assert!(!actors_after_reschedule.is_empty());

            let fragment = &ctx.fragment_map[&fragment_id];

            let in_degree_types: HashSet<_> = ctx
                .fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.values())
                .into_iter()
                .flatten()
                .cloned()
                .collect();

            let upstream_dispatcher_mapping = match fragment.distribution_type() {
                FragmentDistributionType::Hash => {
                    if !in_degree_types.contains(&DispatcherType::Hash) {
                        None
                    } else {
                        // Bitmap changes only occur under hash distribution.
                        Some(ActorMapping::from_bitmaps(
                            &fragment_actor_bitmap[&fragment_id],
                        ))
                    }
                }

                FragmentDistributionType::Single => {
                    assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
                    None
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            };

            let mut upstream_fragment_dispatcher_set = BTreeSet::new();

            {
                if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
                    for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
                        match upstream_dispatcher_type {
                            DispatcherType::NoShuffle => {}
                            _ => {
                                upstream_fragment_dispatcher_set.insert((
                                    *upstream_fragment_id as FragmentId,
                                    fragment.fragment_id as DispatcherId,
                                ));
                            }
                        }
                    }
                }
            }

            let downstream_fragment_ids = if let Some(downstream_fragments) =
                ctx.fragment_dispatcher_map.get(&fragment_id)
            {
                // Skip NoShuffle downstreams: their dispatchers are handled by
                // their respective upstreams.
                downstream_fragments
                    .iter()
                    .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| *fragment_id)
                    .collect_vec()
            } else {
                vec![]
            };

            let vnode_bitmap_updates = match fragment.distribution_type() {
                FragmentDistributionType::Hash => {
                    let mut vnode_bitmap_updates =
                        fragment_actor_bitmap.remove(&fragment_id).unwrap();

                    // Keep only the bitmaps that actually changed, to avoid
                    // bloating the barrier with unchanged actors.
                    for actor_id in actors_after_reschedule.keys() {
                        assert!(vnode_bitmap_updates.contains_key(actor_id));

                        // Retained actor: drop its update if the bitmap is unchanged.
                        if let Some(actor) = ctx.actor_map.get(actor_id) {
                            let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();

                            if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref() {
                                if prev_bitmap.eq(bitmap) {
                                    vnode_bitmap_updates.remove(actor_id);
                                }
                            }
                        }
                    }

                    vnode_bitmap_updates
                }
                FragmentDistributionType::Single => HashMap::new(),
                FragmentDistributionType::Unspecified => unreachable!(),
            };

            let upstream_fragment_dispatcher_ids =
                upstream_fragment_dispatcher_set.into_iter().collect_vec();

            let actor_splits = fragment_actor_splits
                .get(&fragment_id)
                .cloned()
                .unwrap_or_default();

            reschedule_fragment.insert(
                fragment_id,
                Reschedule {
                    added_actors: actors_to_create,
                    removed_actors: actors_to_remove,
                    vnode_bitmap_updates,
                    upstream_fragment_dispatcher_ids,
                    upstream_dispatcher_mapping,
                    downstream_fragment_ids,
                    actor_splits,
                    newly_created_actors: Default::default(),
                },
            );
        }

        let mut fragment_created_actors = HashMap::new();
        for (fragment_id, actors_to_create) in &fragment_actors_to_create {
            let mut created_actors = HashMap::new();
            for (actor_id, worker_id) in actors_to_create {
                let actor = new_created_actors.get(actor_id).cloned().unwrap();
                created_actors.insert(*actor_id, (actor, *worker_id));
            }

            fragment_created_actors.insert(*fragment_id, created_actors);
        }

        for (fragment_id, to_create) in fragment_created_actors {
            let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
            reschedule.newly_created_actors = to_create;
        }
        tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);

        Ok(reschedule_fragment)
    }

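    /// Decides, per fragment, exactly which actor ids to remove on which workers
    /// and which freshly generated actor ids to create, following the per-worker
    /// deltas of the [`WorkerReschedule`]s.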
    #[expect(clippy::type_complexity)]
    fn arrange_reschedules(
        &self,
        reschedule: &HashMap<FragmentId, WorkerReschedule>,
        ctx: &RescheduleContext,
    ) -> MetaResult<(
        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
    )> {
        let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
        let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());

        for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
            let fragment = ctx.fragment_map.get(fragment_id).unwrap();

            // ActorId => WorkerId
            let mut actors_to_remove = BTreeMap::new();
            let mut actors_to_create = BTreeMap::new();

            // NOTE: a BTreeSet keeps the actors on each worker sorted in ascending id order.
            let mut worker_to_actors = HashMap::new();

            for actor in &fragment.actors {
                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
                worker_to_actors
                    .entry(worker_id)
                    .or_insert(BTreeSet::new())
                    .insert(actor.actor_id as ActorId);
            }

            let decreased_actor_count = worker_actor_diff
                .iter()
                .filter(|(_, change)| change.is_negative())
                .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));

            for (worker_id, n) in decreased_actor_count {
                if let Some(actor_ids) = worker_to_actors.get(worker_id) {
                    if actor_ids.len() < n {
                        bail!(
                            "plan illegal, for fragment {}, worker {} only has {} actors, but needs to reduce {}",
                            fragment_id,
                            worker_id,
                            actor_ids.len(),
                            n
                        );
                    }

                    // Remove the actors with the largest ids first.
                    let removed_actors: Vec<_> = actor_ids
                        .iter()
                        .skip(actor_ids.len().saturating_sub(n))
                        .cloned()
                        .collect();

                    for actor in removed_actors {
                        actors_to_remove.insert(actor, *worker_id);
                    }
                }
            }

            let increased_actor_count = worker_actor_diff
                .iter()
                .filter(|(_, change)| change.is_positive());

            for (worker, n) in increased_actor_count {
                for _ in 0..*n {
                    let id = self
                        .env
                        .id_gen_manager()
                        .generate_interval::<{ IdCategory::Actor }>(1)
                        as ActorId;
                    actors_to_create.insert(id, *worker);
                }
            }

            if !actors_to_remove.is_empty() {
                fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
            }

            if !actors_to_create.is_empty() {
                fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
            }
        }

        // Sanity check: removing an actor from a NoShuffle upstream must be
        // accompanied by removing its paired downstream actor.
        for actors_to_remove in fragment_actors_to_remove.values() {
            for actor_id in actors_to_remove.keys() {
                let actor = ctx.actor_map.get(actor_id).unwrap();
                for dispatcher in &actor.dispatcher {
                    if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
                        let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");

                        let _should_exists = fragment_actors_to_remove
                            .get(&(dispatcher.dispatcher_id as FragmentId))
                            .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
                            .get(downstream_actor_id)
                            .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
                    }
                }
            }
        }

        Ok((fragment_actors_to_remove, fragment_actors_to_create))
    }

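    /// Rewrites the dispatchers of a newly created actor to point at the
    /// post-reschedule downstream actors, refreshing hash mappings to match the
    /// rebalanced vnode bitmaps.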
    fn modify_actor_upstream_and_downstream(
        ctx: &RescheduleContext,
        fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
        no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
        new_actor: &mut StreamActor,
        dispatchers: &mut Vec<PbDispatcher>,
    ) -> MetaResult<()> {
        // Update the downstream actor ids of each dispatcher.
        for dispatcher in dispatchers {
            let downstream_fragment_id = dispatcher
                .downstream_actor_id
                .iter()
                .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
                .dedup()
                .exactly_one()
                .unwrap() as FragmentId;

            let downstream_fragment_actors_to_remove =
                fragment_actors_to_remove.get(&downstream_fragment_id);
            let downstream_fragment_actors_to_create =
                fragment_actors_to_create.get(&downstream_fragment_id);

            match dispatcher.r#type() {
                d @ (PbDispatcherType::Hash
                | PbDispatcherType::Simple
                | PbDispatcherType::Broadcast) => {
                    if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
                    {
                        dispatcher
                            .downstream_actor_id
                            .retain(|id| !downstream_actors_to_remove.contains_key(id));
                    }

                    if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
                    {
                        dispatcher
                            .downstream_actor_id
                            .extend(downstream_actors_to_create.keys().cloned())
                    }

                    // A simple dispatcher must still have exactly one downstream actor.
                    if d == PbDispatcherType::Simple {
                        assert_eq!(dispatcher.downstream_actor_id.len(), 1);
                    }
                }
                PbDispatcherType::NoShuffle => {
                    assert_eq!(dispatcher.downstream_actor_id.len(), 1);
                    let downstream_actor_id = no_shuffle_downstream_actors_map
                        .get(&new_actor.actor_id)
                        .and_then(|map| map.get(&downstream_fragment_id))
                        .unwrap();
                    dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
                }
                PbDispatcherType::Unspecified => unreachable!(),
            }

            if let Some(mapping) = dispatcher.hash_mapping.as_mut() {
                if let Some(downstream_updated_bitmap) =
                    fragment_actor_bitmap.get(&downstream_fragment_id)
                {
                    // The downstream scaled in or out; refresh the hash mapping.
                    *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
                }
            }
        }

        Ok(())
    }

    pub async fn post_apply_reschedule(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        // Update fragment info in the meta store after rescheduling.
        self.metadata_manager
            .post_apply_reschedules(reschedules.clone(), post_updates)
            .await?;

        // Update the serving vnode mappings and notify the frontends.
        if !reschedules.is_empty() {
            let workers = self
                .metadata_manager
                .list_active_serving_compute_nodes()
                .await?;
            let streaming_parallelisms = self
                .metadata_manager
                .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))
                .await?;
            let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
            let max_serving_parallelism = self
                .env
                .session_params_manager_impl_ref()
                .get_params()
                .await
                .batch_parallelism()
                .map(|p| p.get());
            let (upserted, failed) = serving_worker_slot_mapping.upsert(
                streaming_parallelisms,
                &workers,
                max_serving_parallelism,
            );
            if !upserted.is_empty() {
                tracing::debug!(
                    "Update serving vnode mapping for fragments {:?}.",
                    upserted.keys()
                );
                self.env
                    .notification_manager()
                    .notify_frontend_without_version(
                        Operation::Update,
                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                            mappings: to_fragment_worker_slot_mapping(&upserted),
                        }),
                    );
            }
            if !failed.is_empty() {
                tracing::debug!(
                    "Fail to update serving vnode mapping for fragments {:?}.",
                    failed
                );
                self.env
                    .notification_manager()
                    .notify_frontend_without_version(
                        Operation::Delete,
                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                            mappings: to_deleted_fragment_worker_slot_mapping(&failed),
                        }),
                    );
            }
        }

        let mut stream_source_actor_splits = HashMap::new();
        let mut stream_source_dropped_actors = HashSet::new();

        // Collect the split assignments and dropped actors of source fragments.
        for (fragment_id, reschedule) in reschedules {
            if !reschedule.actor_splits.is_empty() {
                stream_source_actor_splits
                    .insert(*fragment_id as FragmentId, reschedule.actor_splits.clone());
                stream_source_dropped_actors.extend(reschedule.removed_actors.clone());
            }
        }

        if !stream_source_actor_splits.is_empty() {
            self.source_manager
                .apply_source_change(SourceChange::Reschedule {
                    split_assignment: stream_source_actor_splits,
                    dropped_actors: stream_source_dropped_actors,
                })
                .await;
        }

        Ok(())
    }

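    /// Generates a [`JobReschedulePlan`] for the targeted jobs: resolves each
    /// job's target parallelism and resource group, then derives per-fragment
    /// [`WorkerReschedule`]s against the schedulable workers.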
    pub async fn generate_job_reschedule_plan(
        &self,
        policy: JobReschedulePolicy,
    ) -> MetaResult<JobReschedulePlan> {
        type VnodeCount = usize;

        let JobReschedulePolicy { targets } = policy;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?;

        // Only schedulable workers may receive new actors.
        let workers: HashMap<_, _> = workers
            .into_iter()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        #[derive(Debug)]
        struct JobUpdate {
            filtered_worker_ids: BTreeSet<WorkerId>,
            parallelism: TableParallelism,
        }

        let mut job_parallelism_updates = HashMap::new();

        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
            parallelism_updates: Default::default(),
            resource_group_updates: Default::default(),
        };

        for (
            job_id,
            JobRescheduleTarget {
                parallelism: parallelism_update,
                resource_group: resource_group_update,
            },
        ) in &targets
        {
            let parallelism = match parallelism_update {
                JobParallelismTarget::Update(parallelism) => *parallelism,
                JobParallelismTarget::Refresh => {
                    let parallelism = self
                        .metadata_manager
                        .catalog_controller
                        .get_job_streaming_parallelisms(*job_id as _)
                        .await?;

                    parallelism.into()
                }
            };

            job_reschedule_post_updates
                .parallelism_updates
                .insert(TableId::from(*job_id), parallelism);

            let current_resource_group = match resource_group_update {
                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
                    job_reschedule_post_updates.resource_group_updates.insert(
                        *job_id as ObjectId,
                        Some(specific_resource_group.to_owned()),
                    );

                    specific_resource_group.to_owned()
                }
                JobResourceGroupTarget::Update(None) => {
                    let database_resource_group = self
                        .metadata_manager
                        .catalog_controller
                        .get_existing_job_database_resource_group(*job_id as _)
                        .await?;

                    job_reschedule_post_updates
                        .resource_group_updates
                        .insert(*job_id as ObjectId, None);
                    database_resource_group
                }
                JobResourceGroupTarget::Keep => {
                    self.metadata_manager
                        .catalog_controller
                        .get_existing_job_resource_group(*job_id as _)
                        .await?
                }
            };

            let filtered_worker_ids =
                filter_workers_by_resource_group(&workers, current_resource_group.as_str());

            if filtered_worker_ids.is_empty() {
                bail!("Cannot resize streaming_job {job_id} to empty worker set")
            }

            job_parallelism_updates.insert(
                *job_id,
                JobUpdate {
                    filtered_worker_ids,
                    parallelism,
                },
            );
        }

        // Index of NoShuffle edges: upstream (source) and downstream (target) fragments.
        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        // FragmentId => (distribution type, vnode count)
        let mut fragment_distribution_map = HashMap::new();
        // ActorId => WorkerId
        let mut actor_location = HashMap::new();
        // JobId => FragmentIds
        let mut table_fragment_id_map = HashMap::new();
        // FragmentId => ActorIds
        let mut fragment_actor_id_map = HashMap::new();

        async fn build_index(
            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
            fragment_distribution_map: &mut HashMap<
                FragmentId,
                (FragmentDistributionType, VnodeCount),
            >,
            actor_location: &mut HashMap<ActorId, WorkerId>,
            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
            mgr: &MetadataManager,
            table_ids: Vec<ObjectId>,
        ) -> Result<(), MetaError> {
            let RescheduleWorkingSet {
                fragments,
                actors,
                actor_dispatchers: _actor_dispatchers,
                fragment_downstreams,
                fragment_upstreams: _fragment_upstreams,
                related_jobs: _related_jobs,
                job_resource_groups: _job_resource_groups,
            } = mgr
                .catalog_controller
                .resolve_working_set_for_reschedule_tables(table_ids)
                .await?;

            for (fragment_id, downstreams) in fragment_downstreams {
                for (downstream_fragment_id, dispatcher_type) in downstreams {
                    if let risingwave_meta_model::DispatcherType::NoShuffle = dispatcher_type {
                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
                    }
                }
            }

            for (fragment_id, fragment) in fragments {
                fragment_distribution_map.insert(
                    fragment_id as FragmentId,
                    (
                        FragmentDistributionType::from(fragment.distribution_type),
                        fragment.vnode_count as _,
                    ),
                );

                table_fragment_id_map
                    .entry(fragment.job_id as u32)
                    .or_default()
                    .insert(fragment_id as FragmentId);
            }

            for (actor_id, actor) in actors {
                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
                fragment_actor_id_map
                    .entry(actor.fragment_id as FragmentId)
                    .or_default()
                    .insert(actor_id as ActorId);
            }

            Ok(())
        }

        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();

        build_index(
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
            &mut fragment_distribution_map,
            &mut actor_location,
            &mut table_fragment_id_map,
            &mut fragment_actor_id_map,
            &self.metadata_manager,
            table_ids,
        )
        .await?;
        tracing::debug!(
            ?job_reschedule_post_updates,
            ?job_parallelism_updates,
            ?no_shuffle_source_fragment_ids,
            ?no_shuffle_target_fragment_ids,
            ?fragment_distribution_map,
            ?actor_location,
            ?table_fragment_id_map,
            ?fragment_actor_id_map,
            "generate_table_resize_plan, after build_index"
        );

        let adaptive_parallelism_strategy = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy();

        let mut target_plan = HashMap::new();

        for (
            table_id,
            JobUpdate {
                filtered_worker_ids,
                parallelism,
            },
        ) in job_parallelism_updates
        {
            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();

            let available_worker_slots = workers
                .iter()
                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
                .map(|(_, worker)| (worker.id as WorkerId, worker.compute_node_parallelism()))
                .collect::<BTreeMap<_, _>>();

            for fragment_id in fragment_map {
                // NoShuffle targets follow their upstreams and need no plan of their own.
                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
                    continue;
                }

                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();

                for actor_id in &fragment_actor_id_map[&fragment_id] {
                    let worker_id = actor_location[actor_id];
                    *fragment_slots.entry(worker_id).or_default() += 1;
                }

                let available_slot_count: usize = available_worker_slots.values().cloned().sum();

                if available_slot_count == 0 {
                    bail!(
                        "No schedulable slots available for fragment {}",
                        fragment_id
                    );
                }

                let (dist, vnode_count) = fragment_distribution_map[&fragment_id];
                let max_parallelism = vnode_count;

                match dist {
                    FragmentDistributionType::Unspecified => unreachable!(),
                    FragmentDistributionType::Single => {
                        let (single_worker_id, should_be_one) = fragment_slots
                            .iter()
                            .exactly_one()
                            .expect("single fragment should have only one worker slot");

                        assert_eq!(*should_be_one, 1);

                        let units = schedule_units_for_slots(&available_worker_slots, 1, table_id)?;

                        let (chosen_target_worker_id, should_be_one) =
                            units.iter().exactly_one().ok().with_context(|| {
                                format!(
                                    "Cannot find a single target worker for fragment {fragment_id}"
                                )
                            })?;

                        assert_eq!(*should_be_one, 1);

                        if *chosen_target_worker_id == *single_worker_id {
                            tracing::debug!(
                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
                            );
                            continue;
                        }

                        target_plan.insert(
                            fragment_id,
                            WorkerReschedule {
                                worker_actor_diff: BTreeMap::from_iter(vec![
                                    (*chosen_target_worker_id, 1),
                                    (*single_worker_id, -1),
                                ]),
                            },
                        );
                    }
                    FragmentDistributionType::Hash => match parallelism {
                        TableParallelism::Adaptive => {
                            let target_slot_count = adaptive_parallelism_strategy
                                .compute_target_parallelism(available_slot_count);

                            if target_slot_count > max_parallelism {
                                tracing::warn!(
                                    "available parallelism for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );
                                // Force limit to `max_parallelism`.
                                let target_worker_slots = schedule_units_for_slots(
                                    &available_worker_slots,
                                    max_parallelism,
                                    table_id,
                                )?;

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else if available_slot_count != target_slot_count {
                                tracing::info!(
                                    "available parallelism for table {table_id} is limited by adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
                                );
                                let target_worker_slots = schedule_units_for_slots(
                                    &available_worker_slots,
                                    target_slot_count,
                                    table_id,
                                )?;

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else {
                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &available_worker_slots,
                                    ),
                                );
                            }
                        }
                        TableParallelism::Fixed(mut n) => {
                            if n > max_parallelism {
                                tracing::warn!(
                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );
                                n = max_parallelism
                            }

                            let target_worker_slots =
                                schedule_units_for_slots(&available_worker_slots, n, table_id)?;

                            target_plan.insert(
                                fragment_id,
                                Self::diff_worker_slot_changes(
                                    &fragment_slots,
                                    &target_worker_slots,
                                ),
                            );
                        }
                        TableParallelism::Custom => {
                            // Custom parallelism is preserved as-is; no plan is generated.
                        }
                    },
                }
            }
        }

        target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
        tracing::debug!(
            ?target_plan,
            "generate_table_resize_plan finished target_plan"
        );

        Ok(JobReschedulePlan {
            reschedules: target_plan,
            post_updates: job_reschedule_post_updates,
        })
    }

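    /// Computes the per-worker actor delta between the current and the target
    /// slot distribution. For example (illustrative values), current `{1: 2, 2: 3}`
    /// and target `{1: 4, 3: 1}` yield `{1: +2, 2: -3, 3: +1}`.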
    fn diff_worker_slot_changes(
        fragment_worker_slots: &BTreeMap<WorkerId, usize>,
        target_worker_slots: &BTreeMap<WorkerId, usize>,
    ) -> WorkerReschedule {
        let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
        let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();

        for (&worker_id, &target_slots) in target_worker_slots {
            let &current_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);

            if target_slots > current_slots {
                increased_actor_count.insert(worker_id, target_slots - current_slots);
            }
        }

        for (&worker_id, &current_slots) in fragment_worker_slots {
            let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);

            if current_slots > target_slots {
                decreased_actor_count.insert(worker_id, current_slots - target_slots);
            }
        }

        let worker_ids: HashSet<_> = increased_actor_count
            .keys()
            .chain(decreased_actor_count.keys())
            .cloned()
            .collect();

        let mut worker_actor_diff = BTreeMap::new();

        for worker_id in worker_ids {
            let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
            let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
            let change = increased - decreased;

            assert_ne!(change, 0);

            worker_actor_diff.insert(worker_id, change);
        }

        WorkerReschedule { worker_actor_diff }
    }

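    /// Indexes the `NoShuffle` relations among fragments: for every `NoShuffle`
    /// dispatcher edge, the upstream fragment is recorded as a source and the
    /// downstream fragment as a target. Each fragment is scanned only once, via
    /// the first of its actors encountered.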
    fn build_no_shuffle_relation_index(
        actor_map: &HashMap<ActorId, CustomActorInfo>,
        no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
    ) {
        let mut fragment_cache = HashSet::new();
        for actor in actor_map.values() {
            if fragment_cache.contains(&actor.fragment_id) {
                continue;
            }

            for dispatcher in &actor.dispatcher {
                for downstream_actor_id in &dispatcher.downstream_actor_id {
                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
                        // Only NoShuffle dispatchers establish a source/target relation.
                        if dispatcher.r#type() == PbDispatcherType::NoShuffle {
                            no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId);
                            no_shuffle_target_fragment_ids
                                .insert(downstream_actor.fragment_id as FragmentId);
                        }
                    }
                }
            }

            fragment_cache.insert(actor.fragment_id);
        }
    }

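    /// Builds an index from each fragment to its downstream fragments together
    /// with the dispatcher type of each edge, by walking every actor's dispatchers.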
    fn build_fragment_dispatcher_index(
        actor_map: &HashMap<ActorId, CustomActorInfo>,
        fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    ) {
        for actor in actor_map.values() {
            for dispatcher in &actor.dispatcher {
                for downstream_actor_id in &dispatcher.downstream_actor_id {
                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
                        fragment_dispatcher_map
                            .entry(actor.fragment_id as FragmentId)
                            .or_default()
                            .insert(
                                downstream_actor.fragment_id as FragmentId,
                                dispatcher.r#type().into(),
                            );
                    }
                }
            }
        }
    }

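    /// Propagates parallelism constraints upwards across `NoShuffle` edges (BFS):
    /// upstream tables of a `Custom`-parallelism table must themselves be `Custom`,
    /// and tables whose fragments sit downstream of a `NoShuffle` edge are dropped
    /// from `table_parallelisms`, since they must follow their upstreams.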
    pub fn resolve_no_shuffle_upstream_tables(
        fragment_ids: HashSet<FragmentId>,
        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
        fragment_to_table: &HashMap<FragmentId, TableId>,
        fragment_upstreams: &HashMap<
            risingwave_meta_model::FragmentId,
            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
        >,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<()> {
        let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();

        let mut fragment_ids = fragment_ids;

        while let Some(fragment_id) = queue.pop_front() {
            // Only fragments downstream of a NoShuffle edge need their upstreams resolved.
            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
                continue;
            }

            // Walk every NoShuffle upstream of this fragment.
            for upstream_fragment_id in fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.keys())
                .into_iter()
                .flatten()
            {
                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
                let upstream_fragment_id = &upstream_fragment_id;
                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
                    continue;
                }

                let table_id = &fragment_to_table[&fragment_id];
                let upstream_table_id = &fragment_to_table[upstream_fragment_id];

                // Custom parallelism must also hold for (or be applied to) the upstream table.
                if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
                    if let Some(upstream_table_parallelism) =
                        table_parallelisms.get(upstream_table_id)
                    {
                        if upstream_table_parallelism != &TableParallelism::Custom {
                            bail!(
                                "Cannot change upstream table {} from {:?} to {:?}",
                                upstream_table_id,
                                upstream_table_parallelism,
                                TableParallelism::Custom
                            )
                        }
                    } else {
                        table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
                    }
                }

                fragment_ids.insert(*upstream_fragment_id);
                queue.push_back(*upstream_fragment_id);
            }
        }

        let downstream_fragment_ids = fragment_ids
            .iter()
            .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));

        let downstream_table_ids = downstream_fragment_ids
            .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
            .collect::<HashSet<_>>();

        table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));

        Ok(())
    }

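    /// Extends a per-fragment reschedule map across `NoShuffle` edges: each
    /// upstream fragment inherits its downstream's plan (the two must move in
    /// lockstep), conflicting plans within one chain are rejected, and entries
    /// for downstream fragments are finally removed, as they follow implicitly.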
    pub fn resolve_no_shuffle_upstream_fragments<T>(
        reschedule: &mut HashMap<FragmentId, T>,
        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
        fragment_upstreams: &HashMap<
            risingwave_meta_model::FragmentId,
            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
        >,
    ) -> MetaResult<()>
    where
        T: Clone + Eq,
    {
        let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();

        while let Some(fragment_id) = queue.pop_front() {
            // Only fragments downstream of a NoShuffle edge need resolving.
            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
                continue;
            }

            // Propagate the plan to every NoShuffle upstream of this fragment.
            for upstream_fragment_id in fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.keys())
                .into_iter()
                .flatten()
            {
                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
                let upstream_fragment_id = &upstream_fragment_id;
                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
                    continue;
                }

                let reschedule_plan = &reschedule[&fragment_id];

                if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
                    if upstream_reschedule_plan != reschedule_plan {
                        bail!(
                            "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
                            fragment_id,
                            upstream_fragment_id
                        );
                    }

                    continue;
                }

                reschedule.insert(*upstream_fragment_id, reschedule_plan.clone());

                queue.push_back(*upstream_fragment_id);
            }
        }

        reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));

        Ok(())
    }

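    /// Resolves the closure of jobs connected to `jobs` through `NoShuffle`
    /// relations, so that related jobs can be rescheduled (or skipped) together.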
    pub async fn resolve_related_no_shuffle_jobs(
        &self,
        jobs: &[TableId],
    ) -> MetaResult<HashSet<TableId>> {
        let RescheduleWorkingSet { related_jobs, .. } = self
            .metadata_manager
            .catalog_controller
            .resolve_working_set_for_reschedule_tables(
                jobs.iter().map(|id| id.table_id as _).collect(),
            )
            .await?;

        Ok(related_jobs
            .keys()
            .map(|id| TableId::new(*id as _))
            .collect())
    }
}

#[derive(Debug, Clone)]
pub enum JobParallelismTarget {
    Update(TableParallelism),
    Refresh,
}

#[derive(Debug, Clone)]
pub enum JobResourceGroupTarget {
    Update(Option<String>),
    Keep,
}

#[derive(Debug, Clone)]
pub struct JobRescheduleTarget {
    pub parallelism: JobParallelismTarget,
    pub resource_group: JobResourceGroupTarget,
}

#[derive(Debug)]
pub struct JobReschedulePolicy {
    pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
}

/// Catalog-level updates to apply once the reschedule has taken effect.
#[derive(Debug, Clone)]
pub struct JobReschedulePostUpdates {
    pub parallelism_updates: HashMap<TableId, TableParallelism>,
    pub resource_group_updates: HashMap<ObjectId, Option<String>>,
}

#[derive(Debug)]
pub struct JobReschedulePlan {
    pub reschedules: HashMap<FragmentId, WorkerReschedule>,
    pub post_updates: JobReschedulePostUpdates,
}

impl GlobalStreamManager {
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }

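    /// Reschedules actors for one database according to `plan`: analyzes the
    /// worker-level reschedules into concrete fragment reschedules, collects the
    /// running actors of all affected upstream/downstream fragments, pauses the
    /// source manager's tick, and executes a `RescheduleFragment` barrier command.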
    pub async fn reschedule_actors(
        &self,
        database_id: DatabaseId,
        plan: JobReschedulePlan,
        options: RescheduleOptions,
    ) -> MetaResult<()> {
        let JobReschedulePlan {
            reschedules,
            mut post_updates,
        } = plan;

        let reschedule_fragment = self
            .scale_controller
            .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
            .await?;

        tracing::debug!("reschedule plan: {:?}", reschedule_fragment);

        let up_down_stream_fragment: HashSet<_> = reschedule_fragment
            .iter()
            .flat_map(|(_, reschedule)| {
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id)
                    .chain(reschedule.downstream_fragment_ids.iter().cloned())
            })
            .collect();

        let fragment_actors =
            try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async {
                let actor_ids = self
                    .metadata_manager
                    .get_running_actors_of_fragment(*fragment_id)
                    .await?;
                Result::<_, MetaError>::Ok((*fragment_id, actor_ids))
            }))
            .await?
            .into_iter()
            .collect();

        let command = Command::RescheduleFragment {
            reschedules: reschedule_fragment,
            fragment_actors,
            post_updates,
        };

        let _guard = self.source_manager.pause_tick().await;

        self.barrier_scheduler
            .run_command(database_id, command)
            .await?;

        tracing::info!("reschedule done");

        Ok(())
    }

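    /// Runs one pass of automatic parallelism control: re-evaluates every
    /// streaming job (except background-creating jobs and their `NoShuffle`
    /// relatives) against the schedulable workers, batch by batch, and applies
    /// the first non-empty reschedule plan. Returns `Ok(true)` if a plan was
    /// applied and another pass may be needed, `Ok(false)` if there is nothing to do.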
    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
        tracing::info!("trigger parallelism control");

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_streaming_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        let skipped_jobs = if !background_streaming_jobs.is_empty() {
            let jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
                .await?;

            tracing::info!(
                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
                background_streaming_jobs,
                jobs
            );

            jobs
        } else {
            HashSet::new()
        };

        let job_ids: HashSet<_> = {
            let streaming_parallelisms = self
                .metadata_manager
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            streaming_parallelisms
                .into_iter()
                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
                .map(|(table_id, _)| table_id)
                .collect()
        };

        let workers = self
            .metadata_manager
            .cluster_controller
            .list_active_streaming_workers()
            .await?;

        let schedulable_worker_ids: BTreeSet<_> = workers
            .iter()
            .filter(|worker| {
                !worker
                    .property
                    .as_ref()
                    .map(|p| p.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|worker| worker.id as WorkerId)
            .collect();

        if job_ids.is_empty() {
            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
            return Ok(false);
        }

        let batch_size = match self.env.opts.parallelism_control_batch_size {
            0 => job_ids.len(),
            n => n,
        };

        tracing::info!(
            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
            job_ids.len(),
            batch_size,
            schedulable_worker_ids
        );

        let batches: Vec<_> = job_ids
            .into_iter()
            .chunks(batch_size)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect();

        let mut reschedules = None;

        for batch in batches {
            let targets: HashMap<_, _> = batch
                .into_iter()
                .map(|job_id| {
                    (
                        job_id as u32,
                        JobRescheduleTarget {
                            parallelism: JobParallelismTarget::Refresh,
                            resource_group: JobResourceGroupTarget::Keep,
                        },
                    )
                })
                .collect();

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy { targets })
                .await?;

            if !plan.reschedules.is_empty() {
                tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
                reschedules = Some(plan);
                break;
            }
        }

        let Some(plan) = reschedules else {
            tracing::info!("no reschedule plan generated");
            return Ok(false);
        };

        // Barrier commands are scoped to a database, so apply the plan per database.
        for (database_id, reschedules) in self
            .metadata_manager
            .split_fragment_map_by_database(plan.reschedules)
            .await?
        {
            self.reschedule_actors(
                database_id,
                JobReschedulePlan {
                    reschedules,
                    post_updates: plan.post_updates.clone(),
                },
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(true)
    }

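    /// Event loop of the automatic parallelism monitor: ticks at the configured
    /// period and re-runs parallelism control while needed, and watches local
    /// notifications to react to worker joins, parallelism or resource-group
    /// changes, worker deletions, and adaptive-strategy updates.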
    async fn run(&self, mut shutdown_rx: Receiver<()>) {
        tracing::info!("starting automatic parallelism control monitor");

        let check_period =
            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);

        let mut ticker = tokio::time::interval_at(
            Instant::now()
                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
            check_period,
        );
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        // Consume the initial tick so the first pass waits for the configured delay.
        ticker.tick().await;

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .expect("list active streaming compute nodes");

        let mut worker_cache: BTreeMap<_, _> = worker_nodes
            .into_iter()
            .map(|worker| (worker.id, worker))
            .collect();

        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();

        let mut should_trigger = false;

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Stream manager is stopped");
                    break;
                }

                _ = ticker.tick(), if should_trigger => {
                    let include_workers = worker_cache.keys().copied().collect_vec();

                    if include_workers.is_empty() {
                        tracing::debug!("no available worker nodes");
                        should_trigger = false;
                        continue;
                    }

                    match self.trigger_parallelism_control().await {
                        Ok(cont) => {
                            should_trigger = cont;
                        }
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
                            ticker.reset();
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.expect("local notification channel closed in loop of stream manager");

                    // Only streaming compute nodes are relevant for parallelism control.
                    let worker_is_streaming_compute = |worker: &WorkerNode| {
                        worker.get_type() == Ok(WorkerType::ComputeNode)
                            && worker.property.as_ref().unwrap().is_streaming
                    };

                    match notification {
                        LocalNotification::SystemParamsChange(reader) => {
                            let new_strategy = reader.adaptive_parallelism_strategy();
                            if new_strategy != previous_adaptive_parallelism_strategy {
                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
                                should_trigger = true;
                                previous_adaptive_parallelism_strategy = new_strategy;
                            }
                        }
                        LocalNotification::WorkerNodeActivated(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            tracing::info!(worker = worker.id, "worker activated notification received");

                            let prev_worker = worker_cache.insert(worker.id, worker.clone());

                            match prev_worker {
                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                                    tracing::info!(worker = worker.id, "worker parallelism changed");
                                    should_trigger = true;
                                }
                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
                                    tracing::info!(worker = worker.id, "worker label changed");
                                    should_trigger = true;
                                }
                                None => {
                                    tracing::info!(worker = worker.id, "new worker joined");
                                    should_trigger = true;
                                }
                                _ => {}
                            }
                        }

                        // On deletion we only evict the cached entry; the actual
                        // scale-in is handled elsewhere, along the recovery path.
                        LocalNotification::WorkerNodeDeleted(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            match worker_cache.remove(&worker.id) {
                                Some(prev_worker) => {
                                    tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
                                }
                                None => {
                                    tracing::warn!(worker = worker.id, "deletion notified for a worker missing from the stream manager cache");
                                }
                            }
                        }

                        _ => {}
                    }
                }
            }
        }
    }

    pub fn start_auto_parallelism_monitor(
        self: Arc<Self>,
    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}

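/// Distributes `total_unit_size` units across the given workers in proportion to
/// their slot counts, using a consistent-hash ring salted with `salt` (e.g. a
/// table id) so that repeated calls with the same inputs are deterministic.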
pub fn schedule_units_for_slots(
    slots: &BTreeMap<WorkerId, usize>,
    total_unit_size: usize,
    salt: u32,
) -> MetaResult<BTreeMap<WorkerId, usize>> {
    let mut ch = ConsistentHashRing::new(salt);

    for (worker_id, parallelism) in slots {
        ch.add_worker(*worker_id as _, *parallelism as u32);
    }

    let target_distribution = ch.distribute_tasks(total_unit_size as u32)?;

    Ok(target_distribution
        .into_iter()
        .map(|(worker_id, task_count)| (worker_id as WorkerId, task_count as usize))
        .collect())
}

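/// A weighted consistent-hash ring. Every worker is inserted as `virtual_nodes`
/// (1024) points on the ring, hashed together with `salt`; `weights` caps how
/// many tasks each worker may receive during distribution.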
pub struct ConsistentHashRing {
    ring: BTreeMap<u64, u32>,
    weights: BTreeMap<u32, u32>,
    virtual_nodes: u32,
    salt: u32,
}

impl ConsistentHashRing {
    fn new(salt: u32) -> Self {
        ConsistentHashRing {
            ring: BTreeMap::new(),
            weights: BTreeMap::new(),
            virtual_nodes: 1024,
            salt,
        }
    }

    fn hash<T: Hash, S: Hash>(key: T, salt: S) -> u64 {
        let mut hasher = DefaultHasher::new();
        salt.hash(&mut hasher);
        key.hash(&mut hasher);
        hasher.finish()
    }

    fn add_worker(&mut self, id: u32, weight: u32) {
        let virtual_nodes_count = self.virtual_nodes;

        for i in 0..virtual_nodes_count {
            let virtual_node_key = (id, i);
            let hash = Self::hash(virtual_node_key, self.salt);
            self.ring.insert(hash, id);
        }

        self.weights.insert(id, weight);
    }

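    /// Assigns `total_tasks` tasks by hashing each task index onto the ring and
    /// walking clockwise to the first worker still under its soft limit of
    /// `ceil(total_tasks * weight / total_weight)`. For example, with weights
    /// `{2, 3, 5}` and 10 tasks the limits are exactly `{2, 3, 5}`, so the
    /// result matches the weights precisely.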
    fn distribute_tasks(&self, total_tasks: u32) -> MetaResult<BTreeMap<u32, u32>> {
        let total_weight = self.weights.values().sum::<u32>();

        let mut soft_limits = HashMap::new();
        for (worker_id, worker_capacity) in &self.weights {
            soft_limits.insert(
                *worker_id,
                (total_tasks as f64 * (*worker_capacity as f64 / total_weight as f64)).ceil()
                    as u32,
            );
        }

        let mut task_distribution: BTreeMap<u32, u32> = BTreeMap::new();
        let mut task_hashes = (0..total_tasks)
            .map(|task_idx| Self::hash(task_idx, self.salt))
            .collect_vec();

        // Sort the task hashes to disperse them around the hash ring.
        task_hashes.sort();

        for task_hash in task_hashes {
            let mut assigned = false;

            // Start at the task's hash and wrap around the ring if necessary.
            let ring_range = self.ring.range(task_hash..).chain(self.ring.iter());

            for (_, &worker_id) in ring_range {
                let task_limit = soft_limits[&worker_id];

                let worker_task_count = task_distribution.entry(worker_id).or_insert(0);

                if *worker_task_count < task_limit {
                    *worker_task_count += 1;
                    assigned = true;
                    break;
                }
            }

            if !assigned {
                bail!("Could not distribute tasks due to capacity constraints.");
            }
        }

        Ok(task_distribution)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    const DEFAULT_SALT: u32 = 42;

    #[test]
    fn test_single_worker_capacity() {
        let mut ch = ConsistentHashRing::new(DEFAULT_SALT);
        ch.add_worker(1, 10);

        let total_tasks = 5;
        let task_distribution = ch.distribute_tasks(total_tasks).unwrap();

        assert_eq!(task_distribution.get(&1).cloned().unwrap_or(0), 5);
    }

    #[test]
    fn test_multiple_workers_even_distribution() {
        let mut ch = ConsistentHashRing::new(DEFAULT_SALT);

        ch.add_worker(1, 1);
        ch.add_worker(2, 1);
        ch.add_worker(3, 1);

        let total_tasks = 3;
        let task_distribution = ch.distribute_tasks(total_tasks).unwrap();

        for id in 1..=3 {
            assert_eq!(task_distribution.get(&id).cloned().unwrap_or(0), 1);
        }
    }

    #[test]
    fn test_weighted_distribution() {
        let mut ch = ConsistentHashRing::new(DEFAULT_SALT);

        ch.add_worker(1, 2);
        ch.add_worker(2, 3);
        ch.add_worker(3, 5);

        let total_tasks = 10;
        let task_distribution = ch.distribute_tasks(total_tasks).unwrap();

        assert_eq!(task_distribution.get(&1).cloned().unwrap_or(0), 2);
        assert_eq!(task_distribution.get(&2).cloned().unwrap_or(0), 3);
        assert_eq!(task_distribution.get(&3).cloned().unwrap_or(0), 5);
    }

    #[test]
    fn test_over_capacity() {
        let mut ch = ConsistentHashRing::new(DEFAULT_SALT);

        ch.add_worker(1, 1);
        ch.add_worker(2, 2);
        ch.add_worker(3, 3);

        // More tasks than the total weight; the ceiled soft limits still
        // permit a complete assignment.
        let total_tasks = 10;
        let task_distribution = ch.distribute_tasks(total_tasks);

        assert!(task_distribution.is_ok());
    }

    #[test]
    fn test_balance_distribution() {
        for mut worker_capacity in 1..10 {
            for workers in 3..10 {
                let mut ring = ConsistentHashRing::new(DEFAULT_SALT);

                for worker_id in 0..workers {
                    ring.add_worker(worker_id, worker_capacity);
                }

                // Simulate the realistic case where the number of tasks is
                // smaller than the total capacity of the workers.
                if worker_capacity % 2 == 0 {
                    worker_capacity /= 2;
                }

                let total_tasks = worker_capacity * workers;

                let task_distribution = ring.distribute_tasks(total_tasks).unwrap();

                for (_, v) in task_distribution {
                    assert_eq!(v, worker_capacity);
                }
            }
        }
    }
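
    // Sanity check: `schedule_units_for_slots` should place exactly the requested
    // number of units, and only on workers present in the input map.
    #[test]
    fn test_schedule_units_for_slots_total() {
        let slots: BTreeMap<WorkerId, usize> = BTreeMap::from([(1, 4), (2, 4), (3, 4)]);

        let distribution = schedule_units_for_slots(&slots, 6, DEFAULT_SALT).unwrap();

        // Every requested unit is placed exactly once in total...
        assert_eq!(distribution.values().sum::<usize>(), 6);
        // ...and only on workers that were offered.
        assert!(distribution.keys().all(|id| slots.contains_key(id)));
    }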
}