1use std::cmp::{Ordering, min};
16use std::collections::hash_map::DefaultHasher;
17use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::hash::{Hash, Hasher};
20use std::sync::Arc;
21use std::time::Duration;
22
23use anyhow::{Context, anyhow};
24use itertools::Itertools;
25use num_integer::Integer;
26use num_traits::abs;
27use risingwave_common::bail;
28use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
29use risingwave_common::catalog::{DatabaseId, TableId};
30use risingwave_common::hash::ActorMapping;
31use risingwave_common::util::iter_util::ZipEqDebug;
32use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
33use risingwave_pb::common::{WorkerNode, WorkerType};
34use risingwave_pb::meta::FragmentWorkerSlotMappings;
35use risingwave_pb::meta::subscribe_response::{Info, Operation};
36use risingwave_pb::meta::table_fragments::fragment::{
37 FragmentDistributionType, PbFragmentDistributionType,
38};
39use risingwave_pb::meta::table_fragments::{self, State};
40use risingwave_pb::stream_plan::{
41 Dispatcher, FragmentTypeFlag, PbDispatcher, PbDispatcherType, StreamNode,
42};
43use thiserror_ext::AsReport;
44use tokio::sync::oneshot::Receiver;
45use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
46use tokio::task::JoinHandle;
47use tokio::time::{Instant, MissedTickBehavior};
48
49use crate::barrier::{Command, Reschedule};
50use crate::controller::scale::RescheduleWorkingSet;
51use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
52use crate::model::{
53 ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
54};
55use crate::serving::{
56 ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
57};
58use crate::stream::{GlobalStreamManager, SourceManagerRef};
59use crate::{MetaError, MetaResult};
60
61#[derive(Debug, Clone, Eq, PartialEq)]
62pub struct WorkerReschedule {
63 pub worker_actor_diff: BTreeMap<WorkerId, isize>,
64}
65
66pub struct CustomFragmentInfo {
67 pub fragment_id: u32,
68 pub fragment_type_mask: u32,
69 pub distribution_type: PbFragmentDistributionType,
70 pub state_table_ids: Vec<u32>,
71 pub node: StreamNode,
72 pub actor_template: StreamActorWithDispatchers,
73 pub actors: Vec<CustomActorInfo>,
74}
75
76#[derive(Default, Clone)]
77pub struct CustomActorInfo {
78 pub actor_id: u32,
79 pub fragment_id: u32,
80 pub dispatcher: Vec<Dispatcher>,
81 pub vnode_bitmap: Option<Bitmap>,
83}
84
85impl CustomFragmentInfo {
86 pub fn get_fragment_type_mask(&self) -> u32 {
87 self.fragment_type_mask
88 }
89
90 pub fn distribution_type(&self) -> FragmentDistributionType {
91 self.distribution_type
92 }
93}
94
95use educe::Educe;
96use futures::future::try_join_all;
97use risingwave_common::system_param::AdaptiveParallelismStrategy;
98use risingwave_common::system_param::reader::SystemParamsRead;
99use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
100use risingwave_meta_model::actor_dispatcher::DispatcherType;
101use risingwave_pb::stream_plan::stream_node::NodeBody;
102
103use super::SourceChange;
104use crate::controller::id::IdCategory;
105use crate::controller::utils::filter_workers_by_resource_group;
106
107#[derive(Educe)]
109#[educe(Debug)]
110pub struct RescheduleContext {
111 #[educe(Debug(ignore))]
113 actor_map: HashMap<ActorId, CustomActorInfo>,
114 actor_status: BTreeMap<ActorId, WorkerId>,
116 #[educe(Debug(ignore))]
118 fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
119 stream_source_fragment_ids: HashSet<FragmentId>,
121 stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
123 no_shuffle_target_fragment_ids: HashSet<FragmentId>,
125 no_shuffle_source_fragment_ids: HashSet<FragmentId>,
127 fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
129 fragment_upstreams: HashMap<
130 risingwave_meta_model::FragmentId,
131 HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
132 >,
133}
134
135impl RescheduleContext {
136 fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
137 self.actor_status
138 .get(actor_id)
139 .cloned()
140 .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
141 }
142}
143
144pub fn rebalance_actor_vnode(
171 actors: &[CustomActorInfo],
172 actors_to_remove: &BTreeSet<ActorId>,
173 actors_to_create: &BTreeSet<ActorId>,
174) -> HashMap<ActorId, Bitmap> {
175 let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();
176
177 assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
178 assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);
179
180 assert!(actors.len() >= actors_to_remove.len());
181
182 let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
183 assert!(target_actor_count > 0);
184
185 let vnode_count = actors[0]
187 .vnode_bitmap
188 .as_ref()
189 .expect("vnode bitmap unset")
190 .len();
191
192 #[derive(Debug)]
194 struct Balance {
195 actor_id: ActorId,
196 balance: i32,
197 builder: BitmapBuilder,
198 }
199 let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);
200
201 tracing::debug!(
202 "expected {}, remain {}, prev actors {}, target actors {}",
203 expected,
204 remain,
205 actors.len(),
206 target_actor_count,
207 );
208
209 let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
210 .iter()
211 .map(|actor| {
212 (
213 actor.actor_id as ActorId,
214 actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
215 )
216 })
217 .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));
218
219 let order_by_bitmap_desc =
220 |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
221 bitmap_a
222 .count_ones()
223 .cmp(&bitmap_b.count_ones())
224 .reverse()
225 .then(id_a.cmp(id_b))
226 };
227
228 let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
229 let mut builder = BitmapBuilder::default();
230 builder.append_bitmap(bitmap);
231 builder
232 };
233
234 let (prev_expected, _) = vnode_count.div_rem(&actors.len());
235
236 let prev_remain = removed
237 .iter()
238 .map(|(_, bitmap)| {
239 assert!(bitmap.count_ones() >= prev_expected);
240 bitmap.count_ones() - prev_expected
241 })
242 .sum::<usize>();
243
244 removed.sort_by(order_by_bitmap_desc);
245 rest.sort_by(order_by_bitmap_desc);
246
247 let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
248 actor_id,
249 balance: bitmap.count_ones() as i32,
250 builder: builder_from_bitmap(&bitmap),
251 });
252
253 let mut rest_balances = rest
254 .into_iter()
255 .map(|(actor_id, bitmap)| Balance {
256 actor_id,
257 balance: bitmap.count_ones() as i32 - expected as i32,
258 builder: builder_from_bitmap(&bitmap),
259 })
260 .collect_vec();
261
262 let mut created_balances = actors_to_create
263 .iter()
264 .map(|actor_id| Balance {
265 actor_id: *actor_id,
266 balance: -(expected as i32),
267 builder: BitmapBuilder::zeroed(vnode_count),
268 })
269 .collect_vec();
270
271 for balance in created_balances
272 .iter_mut()
273 .rev()
274 .take(prev_remain)
275 .chain(rest_balances.iter_mut())
276 {
277 if remain > 0 {
278 balance.balance -= 1;
279 remain -= 1;
280 }
281 }
282
283 for balance in &mut created_balances {
285 if remain > 0 {
286 balance.balance -= 1;
287 remain -= 1;
288 }
289 }
290
291 assert_eq!(remain, 0);
292
293 let mut v: VecDeque<_> = removed_balances
294 .chain(rest_balances)
295 .chain(created_balances)
296 .collect();
297
298 let mut result = HashMap::with_capacity(target_actor_count);
301
302 for balance in &v {
303 tracing::debug!(
304 "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
305 balance.actor_id,
306 balance.balance,
307 actors_to_remove.contains(&balance.actor_id),
308 actors_to_create.contains(&balance.actor_id)
309 );
310 }
311
312 while !v.is_empty() {
313 if v.len() == 1 {
314 let single = v.pop_front().unwrap();
315 assert_eq!(single.balance, 0);
316 if !actors_to_remove.contains(&single.actor_id) {
317 result.insert(single.actor_id, single.builder.finish());
318 }
319
320 continue;
321 }
322
323 let mut src = v.pop_front().unwrap();
324 let mut dst = v.pop_back().unwrap();
325
326 let n = min(abs(src.balance), abs(dst.balance));
327
328 let mut moved = 0;
329 for idx in (0..vnode_count).rev() {
330 if moved >= n {
331 break;
332 }
333
334 if src.builder.is_set(idx) {
335 src.builder.set(idx, false);
336 assert!(!dst.builder.is_set(idx));
337 dst.builder.set(idx, true);
338 moved += 1;
339 }
340 }
341
342 src.balance -= n;
343 dst.balance += n;
344
345 if src.balance != 0 {
346 v.push_front(src);
347 } else if !actors_to_remove.contains(&src.actor_id) {
348 result.insert(src.actor_id, src.builder.finish());
349 }
350
351 if dst.balance != 0 {
352 v.push_back(dst);
353 } else {
354 result.insert(dst.actor_id, dst.builder.finish());
355 }
356 }
357
358 result
359}
360
/// Switches that tune how a reschedule plan is processed.
#[derive(Clone, Copy, Debug)]
pub struct RescheduleOptions {
    /// When set, `build_reschedule_context` first rewrites the plan through
    /// `resolve_no_shuffle_upstream_fragments` (and the table parallelisms through
    /// `resolve_no_shuffle_upstream_tables` if any were supplied).
    pub resolve_no_shuffle_upstream: bool,

    /// NOTE(review): not read in the visible part of this file; presumably skips the
    /// creation of new actors. Confirm against the code that consumes it.
    pub skip_create_new_actors: bool,
}
369
370pub type ScaleControllerRef = Arc<ScaleController>;
371
372pub struct ScaleController {
373 pub metadata_manager: MetadataManager,
374
375 pub source_manager: SourceManagerRef,
376
377 pub env: MetaSrvEnv,
378
379 pub reschedule_lock: RwLock<()>,
382}
383
384impl ScaleController {
385 pub fn new(
386 metadata_manager: &MetadataManager,
387 source_manager: SourceManagerRef,
388 env: MetaSrvEnv,
389 ) -> Self {
390 Self {
391 metadata_manager: metadata_manager.clone(),
392 source_manager,
393 env,
394 reschedule_lock: RwLock::new(()),
395 }
396 }
397
398 pub async fn integrity_check(&self) -> MetaResult<()> {
399 self.metadata_manager
400 .catalog_controller
401 .integrity_check()
402 .await
403 }
404
405 async fn build_reschedule_context(
407 &self,
408 reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
409 options: RescheduleOptions,
410 table_parallelisms: &mut HashMap<TableId, TableParallelism>,
411 ) -> MetaResult<RescheduleContext> {
412 let worker_nodes: HashMap<WorkerId, WorkerNode> = self
413 .metadata_manager
414 .list_active_streaming_compute_nodes()
415 .await?
416 .into_iter()
417 .map(|worker_node| (worker_node.id as _, worker_node))
418 .collect();
419
420 if worker_nodes.is_empty() {
421 bail!("no available compute node in the cluster");
422 }
423
424 let unschedulable_worker_ids: HashSet<_> = worker_nodes
426 .values()
427 .filter(|w| {
428 w.property
429 .as_ref()
430 .map(|property| property.is_unschedulable)
431 .unwrap_or(false)
432 })
433 .map(|worker| worker.id as WorkerId)
434 .collect();
435
436 for (fragment_id, reschedule) in &*reschedule {
437 for (worker_id, change) in &reschedule.worker_actor_diff {
438 if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
439 bail!(
440 "unable to move fragment {} to unschedulable worker {}",
441 fragment_id,
442 worker_id
443 );
444 }
445 }
446 }
447
448 let mut actor_map = HashMap::new();
451 let mut fragment_map = HashMap::new();
453 let mut actor_status = BTreeMap::new();
455 let mut fragment_state = HashMap::new();
456 let mut fragment_to_table = HashMap::new();
457
458 fn fulfill_index_by_fragment_ids(
459 actor_map: &mut HashMap<u32, CustomActorInfo>,
460 fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
461 actor_status: &mut BTreeMap<ActorId, WorkerId>,
462 fragment_state: &mut HashMap<FragmentId, State>,
463 fragment_to_table: &mut HashMap<FragmentId, TableId>,
464 fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
465 actors: HashMap<ActorId, actor::Model>,
466 mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
467 related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
468 ) {
469 let mut fragment_actors: HashMap<
470 risingwave_meta_model::FragmentId,
471 Vec<CustomActorInfo>,
472 > = HashMap::new();
473
474 let mut expr_contexts = HashMap::new();
475 for (
476 _,
477 actor::Model {
478 actor_id,
479 fragment_id,
480 status: _,
481 splits: _,
482 worker_id,
483 vnode_bitmap,
484 expr_context,
485 ..
486 },
487 ) in actors
488 {
489 let dispatchers = actor_dispatchers
490 .remove(&(actor_id as _))
491 .unwrap_or_default();
492
493 let actor_info = CustomActorInfo {
494 actor_id: actor_id as _,
495 fragment_id: fragment_id as _,
496 dispatcher: dispatchers,
497 vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
498 };
499
500 actor_map.insert(actor_id as _, actor_info.clone());
501
502 fragment_actors
503 .entry(fragment_id as _)
504 .or_default()
505 .push(actor_info);
506
507 actor_status.insert(actor_id as _, worker_id as WorkerId);
508
509 expr_contexts.insert(actor_id as u32, expr_context);
510 }
511
512 for (
513 _,
514 fragment::Model {
515 fragment_id,
516 job_id,
517 fragment_type_mask,
518 distribution_type,
519 stream_node,
520 state_table_ids,
521 ..
522 },
523 ) in fragments
524 {
525 let actors = fragment_actors
526 .remove(&(fragment_id as _))
527 .unwrap_or_default();
528
529 let CustomActorInfo {
530 actor_id,
531 fragment_id,
532 dispatcher,
533 vnode_bitmap,
534 } = actors.first().unwrap().clone();
535
536 let (related_job, job_definition) =
537 related_jobs.get(&job_id).expect("job not found");
538
539 let fragment = CustomFragmentInfo {
540 fragment_id: fragment_id as _,
541 fragment_type_mask: fragment_type_mask as _,
542 distribution_type: distribution_type.into(),
543 state_table_ids: state_table_ids.into_u32_array(),
544 node: stream_node.to_protobuf(),
545 actor_template: (
546 StreamActor {
547 actor_id,
548 fragment_id: fragment_id as _,
549 vnode_bitmap,
550 mview_definition: job_definition.to_owned(),
551 expr_context: expr_contexts
552 .get(&actor_id)
553 .cloned()
554 .map(|expr_context| expr_context.to_protobuf()),
555 },
556 dispatcher,
557 ),
558 actors,
559 };
560
561 fragment_map.insert(fragment_id as _, fragment);
562
563 fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));
564
565 fragment_state.insert(
566 fragment_id,
567 table_fragments::PbState::from(related_job.job_status),
568 );
569 }
570 }
571 let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
572 let working_set = self
573 .metadata_manager
574 .catalog_controller
575 .resolve_working_set_for_reschedule_fragments(fragment_ids)
576 .await?;
577
578 fulfill_index_by_fragment_ids(
579 &mut actor_map,
580 &mut fragment_map,
581 &mut actor_status,
582 &mut fragment_state,
583 &mut fragment_to_table,
584 working_set.fragments,
585 working_set.actors,
586 working_set.actor_dispatchers,
587 working_set.related_jobs,
588 );
589
590 let mut no_shuffle_source_fragment_ids = HashSet::new();
592 let mut no_shuffle_target_fragment_ids = HashSet::new();
593
594 Self::build_no_shuffle_relation_index(
595 &actor_map,
596 &mut no_shuffle_source_fragment_ids,
597 &mut no_shuffle_target_fragment_ids,
598 );
599
600 if options.resolve_no_shuffle_upstream {
601 let original_reschedule_keys = reschedule.keys().cloned().collect();
602
603 Self::resolve_no_shuffle_upstream_fragments(
604 reschedule,
605 &no_shuffle_source_fragment_ids,
606 &no_shuffle_target_fragment_ids,
607 &working_set.fragment_upstreams,
608 )?;
609
610 if !table_parallelisms.is_empty() {
611 Self::resolve_no_shuffle_upstream_tables(
613 original_reschedule_keys,
614 &no_shuffle_source_fragment_ids,
615 &no_shuffle_target_fragment_ids,
616 &fragment_to_table,
617 &working_set.fragment_upstreams,
618 table_parallelisms,
619 )?;
620 }
621 }
622
623 let mut fragment_dispatcher_map = HashMap::new();
624 Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);
625
626 let mut stream_source_fragment_ids = HashSet::new();
627 let mut stream_source_backfill_fragment_ids = HashMap::new();
628 let mut no_shuffle_reschedule = HashMap::new();
629 for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
630 let fragment = fragment_map
631 .get(fragment_id)
632 .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;
633
634 match fragment_state[fragment_id] {
636 table_fragments::State::Unspecified => unreachable!(),
637 state @ table_fragments::State::Initial => {
638 bail!(
639 "the materialized view of fragment {fragment_id} is in state {}",
640 state.as_str_name()
641 )
642 }
643 state @ table_fragments::State::Creating => {
644 let stream_node = &fragment.node;
645
646 let mut is_reschedulable = true;
647 visit_stream_node_cont(stream_node, |body| {
648 if let Some(NodeBody::StreamScan(node)) = &body.node_body {
649 if !node.stream_scan_type().is_reschedulable() {
650 is_reschedulable = false;
651
652 return false;
654 }
655
656 return true;
658 }
659
660 true
662 });
663
664 if !is_reschedulable {
665 bail!(
666 "the materialized view of fragment {fragment_id} is in state {}",
667 state.as_str_name()
668 )
669 }
670 }
671 table_fragments::State::Created => {}
672 }
673
674 if no_shuffle_target_fragment_ids.contains(fragment_id) {
675 bail!(
676 "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
677 );
678 }
679
680 if no_shuffle_source_fragment_ids.contains(fragment_id) {
685 let mut queue: VecDeque<_> = fragment_dispatcher_map
687 .get(fragment_id)
688 .unwrap()
689 .keys()
690 .cloned()
691 .collect();
692
693 while let Some(downstream_id) = queue.pop_front() {
694 if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
695 continue;
696 }
697
698 if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
699 {
700 let no_shuffle_downstreams = downstream_fragments
701 .iter()
702 .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
703 .map(|(fragment_id, _)| fragment_id);
704
705 queue.extend(no_shuffle_downstreams.copied());
706 }
707
708 no_shuffle_reschedule.insert(
709 downstream_id,
710 WorkerReschedule {
711 worker_actor_diff: worker_actor_diff.clone(),
712 },
713 );
714 }
715 }
716
717 if (fragment.fragment_type_mask & FragmentTypeFlag::Source as u32) != 0
718 && fragment.node.find_stream_source().is_some()
719 {
720 stream_source_fragment_ids.insert(*fragment_id);
721 }
722
723 let current_worker_ids = fragment
725 .actors
726 .iter()
727 .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
728 .collect::<HashSet<_>>();
729
730 for (removed, change) in worker_actor_diff {
731 if !current_worker_ids.contains(removed) && change.is_negative() {
732 bail!(
733 "no actor on the worker {} of fragment {}",
734 removed,
735 fragment_id
736 );
737 }
738 }
739
740 let added_actor_count: usize = worker_actor_diff
741 .values()
742 .filter(|change| change.is_positive())
743 .cloned()
744 .map(|change| change as usize)
745 .sum();
746
747 let removed_actor_count: usize = worker_actor_diff
748 .values()
749 .filter(|change| change.is_positive())
750 .cloned()
751 .map(|v| v.unsigned_abs())
752 .sum();
753
754 match fragment.distribution_type() {
755 FragmentDistributionType::Hash => {
756 if fragment.actors.len() + added_actor_count <= removed_actor_count {
757 bail!("can't remove all actors from fragment {}", fragment_id);
758 }
759 }
760 FragmentDistributionType::Single => {
761 if added_actor_count != removed_actor_count {
762 bail!("single distribution fragment only support migration");
763 }
764 }
765 FragmentDistributionType::Unspecified => unreachable!(),
766 }
767 }
768
769 if !no_shuffle_reschedule.is_empty() {
770 tracing::info!(
771 "reschedule plan rewritten with NoShuffle reschedule {:?}",
772 no_shuffle_reschedule
773 );
774
775 for noshuffle_downstream in no_shuffle_reschedule.keys() {
776 let fragment = fragment_map.get(noshuffle_downstream).unwrap();
777 if (fragment.fragment_type_mask & FragmentTypeFlag::SourceScan as u32) != 0 {
779 let stream_node = &fragment.node;
780 if let Some((_source_id, upstream_source_fragment_id)) =
781 stream_node.find_source_backfill()
782 {
783 stream_source_backfill_fragment_ids
784 .insert(fragment.fragment_id, upstream_source_fragment_id);
785 }
786 }
787 }
788 }
789
790 reschedule.extend(no_shuffle_reschedule.into_iter());
792
793 Ok(RescheduleContext {
794 actor_map,
795 actor_status,
796 fragment_map,
797 stream_source_fragment_ids,
798 stream_source_backfill_fragment_ids,
799 no_shuffle_target_fragment_ids,
800 no_shuffle_source_fragment_ids,
801 fragment_dispatcher_map,
802 fragment_upstreams: working_set.fragment_upstreams,
803 })
804 }
805
806 pub(crate) async fn analyze_reschedule_plan(
818 &self,
819 mut reschedules: HashMap<FragmentId, WorkerReschedule>,
820 options: RescheduleOptions,
821 table_parallelisms: &mut HashMap<TableId, TableParallelism>,
822 ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
823 tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
824 let ctx = self
825 .build_reschedule_context(&mut reschedules, options, table_parallelisms)
826 .await?;
827 tracing::debug!("reschedule context: {:#?}", ctx);
828 let reschedules = reschedules;
829
830 let (fragment_actors_to_remove, fragment_actors_to_create) =
835 self.arrange_reschedules(&reschedules, &ctx)?;
836
837 let mut fragment_actor_bitmap = HashMap::new();
838 for fragment_id in reschedules.keys() {
839 if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
840 continue;
843 }
844
845 let actors_to_create = fragment_actors_to_create
846 .get(fragment_id)
847 .map(|map| map.iter().map(|(actor_id, _)| *actor_id).collect())
848 .unwrap_or_default();
849
850 let actors_to_remove = fragment_actors_to_remove
851 .get(fragment_id)
852 .map(|map| map.iter().map(|(actor_id, _)| *actor_id).collect())
853 .unwrap_or_default();
854
855 let fragment = ctx.fragment_map.get(fragment_id).unwrap();
856
857 match fragment.distribution_type() {
858 FragmentDistributionType::Single => {
859 fragment_actor_bitmap
861 .insert(fragment.fragment_id as FragmentId, Default::default());
862 }
863 FragmentDistributionType::Hash => {
864 let actor_vnode = rebalance_actor_vnode(
865 &fragment.actors,
866 &actors_to_remove,
867 &actors_to_create,
868 );
869
870 fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
871 }
872
873 FragmentDistributionType::Unspecified => unreachable!(),
874 }
875 }
876
877 let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
881 for fragment_id in reschedules.keys() {
882 let fragment = ctx.fragment_map.get(fragment_id).unwrap();
883 let mut new_actor_ids = BTreeMap::new();
884 for actor in &fragment.actors {
885 if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id) {
886 if actors_to_remove.contains_key(&actor.actor_id) {
887 continue;
888 }
889 }
890 let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
891 new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
892 }
893
894 if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
895 for (actor_id, worker_id) in actors_to_create {
896 new_actor_ids.insert(*actor_id, *worker_id);
897 }
898 }
899
900 assert!(
901 !new_actor_ids.is_empty(),
902 "should be at least one actor in fragment {} after rescheduling",
903 fragment_id
904 );
905
906 fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
907 }
908
909 let fragment_actors_after_reschedule = fragment_actors_after_reschedule;
910
911 fn arrange_no_shuffle_relation(
917 ctx: &RescheduleContext,
918 fragment_id: &FragmentId,
919 upstream_fragment_id: &FragmentId,
920 fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
921 actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
922 fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
923 no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
924 no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
925 ) {
926 if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
927 return;
928 }
929
930 let fragment = &ctx.fragment_map[fragment_id];
931
932 let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];
933
934 for upstream_actor in &upstream_fragment.actors {
936 for dispatcher in &upstream_actor.dispatcher {
937 if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
938 let downstream_actor_id =
939 *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();
940
941 if !ctx
943 .no_shuffle_target_fragment_ids
944 .contains(upstream_fragment_id)
945 {
946 actor_group_map.insert(
947 upstream_actor.actor_id,
948 (upstream_fragment.fragment_id, upstream_actor.actor_id),
949 );
950 actor_group_map.insert(
951 downstream_actor_id,
952 (upstream_fragment.fragment_id, upstream_actor.actor_id),
953 );
954 } else {
955 let root_actor_id = actor_group_map[&upstream_actor.actor_id];
956
957 actor_group_map.insert(downstream_actor_id, root_actor_id);
958 }
959 }
960 }
961 }
962
963 let upstream_fragment_bitmap = fragment_updated_bitmap
965 .get(upstream_fragment_id)
966 .cloned()
967 .unwrap_or_default();
968
969 if upstream_fragment.distribution_type() == FragmentDistributionType::Single {
971 assert!(
972 upstream_fragment_bitmap.is_empty(),
973 "single fragment should have no bitmap updates"
974 );
975 }
976
977 let upstream_fragment_actor_map = fragment_actors_after_reschedule
978 .get(upstream_fragment_id)
979 .cloned()
980 .unwrap();
981
982 let fragment_actor_map = fragment_actors_after_reschedule
983 .get(fragment_id)
984 .cloned()
985 .unwrap();
986
987 let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();
988
989 let mut fragment_bitmap = HashMap::new();
991
992 for (actor_id, worker_id) in &fragment_actor_map {
993 if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
994 let root_bitmap = fragment_updated_bitmap
995 .get(root_fragment)
996 .expect("root fragment bitmap not found")
997 .get(root_actor_id)
998 .cloned()
999 .expect("root actor bitmap not found");
1000
1001 fragment_bitmap.insert(*actor_id, root_bitmap);
1003
1004 no_shuffle_upstream_actor_map
1005 .entry(*actor_id as ActorId)
1006 .or_default()
1007 .insert(*upstream_fragment_id, *root_actor_id);
1008 no_shuffle_downstream_actors_map
1009 .entry(*root_actor_id)
1010 .or_default()
1011 .insert(*fragment_id, *actor_id);
1012 } else {
1013 worker_reverse_index
1014 .entry(*worker_id)
1015 .or_default()
1016 .insert(*actor_id);
1017 }
1018 }
1019
1020 let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();
1021
1022 for (actor_id, worker_id) in &upstream_fragment_actor_map {
1023 if !actor_group_map.contains_key(actor_id) {
1024 upstream_worker_reverse_index
1025 .entry(*worker_id)
1026 .or_default()
1027 .insert(*actor_id);
1028 }
1029 }
1030
1031 for (worker_id, actor_ids) in worker_reverse_index {
1033 let upstream_actor_ids = upstream_worker_reverse_index
1034 .get(&worker_id)
1035 .unwrap()
1036 .clone();
1037
1038 assert_eq!(actor_ids.len(), upstream_actor_ids.len());
1039
1040 for (actor_id, upstream_actor_id) in actor_ids
1041 .into_iter()
1042 .zip_eq_debug(upstream_actor_ids.into_iter())
1043 {
1044 match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
1045 None => {
1046 assert_eq!(
1048 upstream_fragment.distribution_type(),
1049 FragmentDistributionType::Single
1050 );
1051 }
1052 Some(bitmap) => {
1053 fragment_bitmap.insert(actor_id, bitmap);
1055 }
1056 }
1057
1058 no_shuffle_upstream_actor_map
1059 .entry(actor_id as ActorId)
1060 .or_default()
1061 .insert(*upstream_fragment_id, upstream_actor_id);
1062 no_shuffle_downstream_actors_map
1063 .entry(upstream_actor_id)
1064 .or_default()
1065 .insert(*fragment_id, actor_id);
1066 }
1067 }
1068
1069 match fragment.distribution_type() {
1070 FragmentDistributionType::Hash => {}
1071 FragmentDistributionType::Single => {
1072 assert!(fragment_bitmap.is_empty());
1074 }
1075 FragmentDistributionType::Unspecified => unreachable!(),
1076 }
1077
1078 if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
1079 assert_eq!(
1080 e.entry.get(),
1081 &e.value,
1082 "bitmaps derived from different no-shuffle upstreams mismatch"
1083 );
1084 }
1085
1086 if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
1088 let no_shuffle_downstreams = downstream_fragments
1089 .iter()
1090 .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
1091 .map(|(fragment_id, _)| fragment_id);
1092
1093 for downstream_fragment_id in no_shuffle_downstreams {
1094 arrange_no_shuffle_relation(
1095 ctx,
1096 downstream_fragment_id,
1097 fragment_id,
1098 fragment_actors_after_reschedule,
1099 actor_group_map,
1100 fragment_updated_bitmap,
1101 no_shuffle_upstream_actor_map,
1102 no_shuffle_downstream_actors_map,
1103 );
1104 }
1105 }
1106 }
1107
1108 let mut no_shuffle_upstream_actor_map = HashMap::new();
1109 let mut no_shuffle_downstream_actors_map = HashMap::new();
1110 let mut actor_group_map = HashMap::new();
1111 for fragment_id in reschedules.keys() {
1114 if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
1115 && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
1116 {
1117 if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
1118 for downstream_fragment_id in downstream_fragments.keys() {
1119 arrange_no_shuffle_relation(
1120 &ctx,
1121 downstream_fragment_id,
1122 fragment_id,
1123 &fragment_actors_after_reschedule,
1124 &mut actor_group_map,
1125 &mut fragment_actor_bitmap,
1126 &mut no_shuffle_upstream_actor_map,
1127 &mut no_shuffle_downstream_actors_map,
1128 );
1129 }
1130 }
1131 }
1132 }
1133
1134 tracing::debug!("actor group map {:?}", actor_group_map);
1135
1136 let mut new_created_actors = HashMap::new();
1137 for fragment_id in reschedules.keys() {
1138 let actors_to_create = fragment_actors_to_create
1139 .get(fragment_id)
1140 .cloned()
1141 .unwrap_or_default();
1142
1143 let fragment = &ctx.fragment_map[fragment_id];
1144
1145 assert!(!fragment.actors.is_empty());
1146
1147 for actor_to_create in &actors_to_create {
1148 let new_actor_id = actor_to_create.0;
1149 let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();
1150
1151 new_actor.actor_id = *new_actor_id;
1155
1156 Self::modify_actor_upstream_and_downstream(
1157 &ctx,
1158 &fragment_actors_to_remove,
1159 &fragment_actors_to_create,
1160 &fragment_actor_bitmap,
1161 &no_shuffle_downstream_actors_map,
1162 &mut new_actor,
1163 &mut dispatchers,
1164 )?;
1165
1166 if let Some(bitmap) = fragment_actor_bitmap
1167 .get(fragment_id)
1168 .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
1169 {
1170 new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
1171 }
1172
1173 new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
1174 }
1175 }
1176
1177 let mut fragment_actor_splits = HashMap::new();
1180 for fragment_id in reschedules.keys() {
1181 let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1182
1183 if ctx.stream_source_fragment_ids.contains(fragment_id) {
1184 let fragment = &ctx.fragment_map[fragment_id];
1185
1186 let prev_actor_ids = fragment
1187 .actors
1188 .iter()
1189 .map(|actor| actor.actor_id)
1190 .collect_vec();
1191
1192 let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1193
1194 let actor_splits = self
1195 .source_manager
1196 .migrate_splits_for_source_actors(
1197 *fragment_id,
1198 &prev_actor_ids,
1199 &curr_actor_ids,
1200 )
1201 .await?;
1202
1203 tracing::debug!(
1204 "source actor splits: {:?}, fragment_id: {}",
1205 actor_splits,
1206 fragment_id
1207 );
1208 fragment_actor_splits.insert(*fragment_id, actor_splits);
1209 }
1210 }
1211 if !ctx.stream_source_backfill_fragment_ids.is_empty() {
1213 for fragment_id in reschedules.keys() {
1214 let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1215
1216 if let Some(upstream_source_fragment_id) =
1217 ctx.stream_source_backfill_fragment_ids.get(fragment_id)
1218 {
1219 let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1220
1221 let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
1222 *fragment_id,
1223 *upstream_source_fragment_id,
1224 &curr_actor_ids,
1225 &fragment_actor_splits,
1226 &no_shuffle_upstream_actor_map,
1227 )?;
1228 tracing::debug!(
1229 "source backfill actor splits: {:?}, fragment_id: {}",
1230 actor_splits,
1231 fragment_id
1232 );
1233 fragment_actor_splits.insert(*fragment_id, actor_splits);
1234 }
1235 }
1236 }
1237
1238 let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
1240 HashMap::with_capacity(reschedules.len());
1241
1242 for (fragment_id, _) in reschedules {
1243 let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();
1244
1245 if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
1246 for (actor_id, worker_id) in actor_worker_maps {
1247 actors_to_create
1248 .entry(worker_id)
1249 .or_default()
1250 .push(actor_id);
1251 }
1252 }
1253
1254 let actors_to_remove = fragment_actors_to_remove
1255 .get(&fragment_id)
1256 .cloned()
1257 .unwrap_or_default()
1258 .into_keys()
1259 .collect();
1260
1261 let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];
1262
1263 assert!(!actors_after_reschedule.is_empty());
1264
1265 let fragment = &ctx.fragment_map[&fragment_id];
1266
1267 let in_degree_types: HashSet<_> = ctx
1268 .fragment_upstreams
1269 .get(&(fragment_id as _))
1270 .map(|upstreams| upstreams.values())
1271 .into_iter()
1272 .flatten()
1273 .cloned()
1274 .collect();
1275
1276 let upstream_dispatcher_mapping = match fragment.distribution_type() {
1277 FragmentDistributionType::Hash => {
1278 if !in_degree_types.contains(&DispatcherType::Hash) {
1279 None
1280 } else {
1281 Some(ActorMapping::from_bitmaps(
1283 &fragment_actor_bitmap[&fragment_id],
1284 ))
1285 }
1286 }
1287
1288 FragmentDistributionType::Single => {
1289 assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
1290 None
1291 }
1292 FragmentDistributionType::Unspecified => unreachable!(),
1293 };
1294
1295 let mut upstream_fragment_dispatcher_set = BTreeSet::new();
1296
1297 {
1298 if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
1299 for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
1300 match upstream_dispatcher_type {
1301 DispatcherType::NoShuffle => {}
1302 _ => {
1303 upstream_fragment_dispatcher_set.insert((
1304 *upstream_fragment_id as FragmentId,
1305 fragment.fragment_id as DispatcherId,
1306 ));
1307 }
1308 }
1309 }
1310 }
1311 }
1312
1313 let downstream_fragment_ids = if let Some(downstream_fragments) =
1314 ctx.fragment_dispatcher_map.get(&fragment_id)
1315 {
1316 downstream_fragments
1319 .iter()
1320 .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
1321 .map(|(fragment_id, _)| *fragment_id)
1322 .collect_vec()
1323 } else {
1324 vec![]
1325 };
1326
1327 let vnode_bitmap_updates = match fragment.distribution_type() {
1328 FragmentDistributionType::Hash => {
1329 let mut vnode_bitmap_updates =
1330 fragment_actor_bitmap.remove(&fragment_id).unwrap();
1331
1332 for actor_id in actors_after_reschedule.keys() {
1335 assert!(vnode_bitmap_updates.contains_key(actor_id));
1336
1337 if let Some(actor) = ctx.actor_map.get(actor_id) {
1339 let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();
1340
1341 if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref() {
1342 if prev_bitmap.eq(bitmap) {
1343 vnode_bitmap_updates.remove(actor_id);
1344 }
1345 }
1346 }
1347 }
1348
1349 vnode_bitmap_updates
1350 }
1351 FragmentDistributionType::Single => HashMap::new(),
1352 FragmentDistributionType::Unspecified => unreachable!(),
1353 };
1354
1355 let upstream_fragment_dispatcher_ids =
1356 upstream_fragment_dispatcher_set.into_iter().collect_vec();
1357
1358 let actor_splits = fragment_actor_splits
1359 .get(&fragment_id)
1360 .cloned()
1361 .unwrap_or_default();
1362
1363 reschedule_fragment.insert(
1364 fragment_id,
1365 Reschedule {
1366 added_actors: actors_to_create,
1367 removed_actors: actors_to_remove,
1368 vnode_bitmap_updates,
1369 upstream_fragment_dispatcher_ids,
1370 upstream_dispatcher_mapping,
1371 downstream_fragment_ids,
1372 actor_splits,
1373 newly_created_actors: Default::default(),
1374 },
1375 );
1376 }
1377
1378 let mut fragment_created_actors = HashMap::new();
1379 for (fragment_id, actors_to_create) in &fragment_actors_to_create {
1380 let mut created_actors = HashMap::new();
1381 for (actor_id, worker_id) in actors_to_create {
1382 let actor = new_created_actors.get(actor_id).cloned().unwrap();
1383 created_actors.insert(*actor_id, (actor, *worker_id));
1384 }
1385
1386 fragment_created_actors.insert(*fragment_id, created_actors);
1387 }
1388
1389 for (fragment_id, to_create) in fragment_created_actors {
1390 let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
1391 reschedule.newly_created_actors = to_create;
1392 }
1393 tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);
1394
1395 Ok(reschedule_fragment)
1396 }
1397
1398 #[expect(clippy::type_complexity)]
1399 fn arrange_reschedules(
1400 &self,
1401 reschedule: &HashMap<FragmentId, WorkerReschedule>,
1402 ctx: &RescheduleContext,
1403 ) -> MetaResult<(
1404 HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1405 HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1406 )> {
1407 let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
1408 let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());
1409
1410 for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
1411 let fragment = ctx.fragment_map.get(fragment_id).unwrap();
1412
1413 let mut actors_to_remove = BTreeMap::new();
1415 let mut actors_to_create = BTreeMap::new();
1416
1417 let mut worker_to_actors = HashMap::new();
1419
1420 for actor in &fragment.actors {
1421 let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
1422 worker_to_actors
1423 .entry(worker_id)
1424 .or_insert(BTreeSet::new())
1425 .insert(actor.actor_id as ActorId);
1426 }
1427
1428 let decreased_actor_count = worker_actor_diff
1429 .iter()
1430 .filter(|(_, change)| change.is_negative())
1431 .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));
1432
1433 for (worker_id, n) in decreased_actor_count {
1434 if let Some(actor_ids) = worker_to_actors.get(worker_id) {
1435 if actor_ids.len() < n {
1436 bail!(
1437 "plan illegal, for fragment {}, worker {} only has {} actors, but needs to reduce {}",
1438 fragment_id,
1439 worker_id,
1440 actor_ids.len(),
1441 n
1442 );
1443 }
1444
1445 let removed_actors: Vec<_> = actor_ids
1446 .iter()
1447 .skip(actor_ids.len().saturating_sub(n))
1448 .cloned()
1449 .collect();
1450
1451 for actor in removed_actors {
1452 actors_to_remove.insert(actor, *worker_id);
1453 }
1454 }
1455 }
1456
1457 let increased_actor_count = worker_actor_diff
1458 .iter()
1459 .filter(|(_, change)| change.is_positive());
1460
1461 for (worker, n) in increased_actor_count {
1462 for _ in 0..*n {
1463 let id = self
1464 .env
1465 .id_gen_manager()
1466 .generate_interval::<{ IdCategory::Actor }>(1)
1467 as ActorId;
1468 actors_to_create.insert(id, *worker);
1469 }
1470 }
1471
1472 if !actors_to_remove.is_empty() {
1473 fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
1474 }
1475
1476 if !actors_to_create.is_empty() {
1477 fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
1478 }
1479 }
1480
1481 for actors_to_remove in fragment_actors_to_remove.values() {
1483 for actor_id in actors_to_remove.keys() {
1484 let actor = ctx.actor_map.get(actor_id).unwrap();
1485 for dispatcher in &actor.dispatcher {
1486 if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
1487 let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");
1488
1489 let _should_exists = fragment_actors_to_remove
1490 .get(&(dispatcher.dispatcher_id as FragmentId))
1491 .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
1492 .get(downstream_actor_id)
1493 .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
1494 }
1495 }
1496 }
1497 }
1498
1499 Ok((fragment_actors_to_remove, fragment_actors_to_create))
1500 }
1501
1502 fn modify_actor_upstream_and_downstream(
1505 ctx: &RescheduleContext,
1506 fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1507 fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1508 fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
1509 no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
1510 new_actor: &mut StreamActor,
1511 dispatchers: &mut Vec<PbDispatcher>,
1512 ) -> MetaResult<()> {
1513 for dispatcher in dispatchers {
1515 let downstream_fragment_id = dispatcher
1516 .downstream_actor_id
1517 .iter()
1518 .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
1519 .dedup()
1520 .exactly_one()
1521 .unwrap() as FragmentId;
1522
1523 let downstream_fragment_actors_to_remove =
1524 fragment_actors_to_remove.get(&downstream_fragment_id);
1525 let downstream_fragment_actors_to_create =
1526 fragment_actors_to_create.get(&downstream_fragment_id);
1527
1528 match dispatcher.r#type() {
1529 d @ (PbDispatcherType::Hash
1530 | PbDispatcherType::Simple
1531 | PbDispatcherType::Broadcast) => {
1532 if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
1533 {
1534 dispatcher
1535 .downstream_actor_id
1536 .retain(|id| !downstream_actors_to_remove.contains_key(id));
1537 }
1538
1539 if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
1540 {
1541 dispatcher
1542 .downstream_actor_id
1543 .extend(downstream_actors_to_create.keys().cloned())
1544 }
1545
1546 if d == PbDispatcherType::Simple {
1548 assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1549 }
1550 }
1551 PbDispatcherType::NoShuffle => {
1552 assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1553 let downstream_actor_id = no_shuffle_downstream_actors_map
1554 .get(&new_actor.actor_id)
1555 .and_then(|map| map.get(&downstream_fragment_id))
1556 .unwrap();
1557 dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
1558 }
1559 PbDispatcherType::Unspecified => unreachable!(),
1560 }
1561
1562 if let Some(mapping) = dispatcher.hash_mapping.as_mut() {
1563 if let Some(downstream_updated_bitmap) =
1564 fragment_actor_bitmap.get(&downstream_fragment_id)
1565 {
1566 *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
1568 }
1569 }
1570 }
1571
1572 Ok(())
1573 }
1574
1575 pub async fn post_apply_reschedule(
1576 &self,
1577 reschedules: &HashMap<FragmentId, Reschedule>,
1578 post_updates: &JobReschedulePostUpdates,
1579 ) -> MetaResult<()> {
1580 self.metadata_manager
1582 .post_apply_reschedules(reschedules.clone(), post_updates)
1583 .await?;
1584
1585 if !reschedules.is_empty() {
1587 let workers = self
1588 .metadata_manager
1589 .list_active_serving_compute_nodes()
1590 .await?;
1591 let streaming_parallelisms = self
1592 .metadata_manager
1593 .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))
1594 .await?;
1595 let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
1596 let (upserted, failed) =
1597 serving_worker_slot_mapping.upsert(streaming_parallelisms, &workers);
1598 if !upserted.is_empty() {
1599 tracing::debug!(
1600 "Update serving vnode mapping for fragments {:?}.",
1601 upserted.keys()
1602 );
1603 self.env
1604 .notification_manager()
1605 .notify_frontend_without_version(
1606 Operation::Update,
1607 Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1608 mappings: to_fragment_worker_slot_mapping(&upserted),
1609 }),
1610 );
1611 }
1612 if !failed.is_empty() {
1613 tracing::debug!(
1614 "Fail to update serving vnode mapping for fragments {:?}.",
1615 failed
1616 );
1617 self.env
1618 .notification_manager()
1619 .notify_frontend_without_version(
1620 Operation::Delete,
1621 Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1622 mappings: to_deleted_fragment_worker_slot_mapping(&failed),
1623 }),
1624 );
1625 }
1626 }
1627
1628 let mut stream_source_actor_splits = HashMap::new();
1629 let mut stream_source_dropped_actors = HashSet::new();
1630
1631 for (fragment_id, reschedule) in reschedules {
1633 if !reschedule.actor_splits.is_empty() {
1634 stream_source_actor_splits
1635 .insert(*fragment_id as FragmentId, reschedule.actor_splits.clone());
1636 stream_source_dropped_actors.extend(reschedule.removed_actors.clone());
1637 }
1638 }
1639
1640 if !stream_source_actor_splits.is_empty() {
1641 self.source_manager
1642 .apply_source_change(SourceChange::Reschedule {
1643 split_assignment: stream_source_actor_splits,
1644 dropped_actors: stream_source_dropped_actors,
1645 })
1646 .await;
1647 }
1648
1649 Ok(())
1650 }
1651
    /// Builds a [`JobReschedulePlan`] from a [`JobReschedulePolicy`].
    ///
    /// For every targeted job this resolves the desired parallelism and resource group, narrows
    /// the schedulable streaming workers down to that resource group, and then computes, per
    /// fragment, how many actors each worker has to gain or lose. Fragments whose layout is
    /// already as desired do not appear in the returned plan.
    ///
    /// # Errors
    ///
    /// Fails when a metadata lookup fails, when a job would be left with an empty worker set,
    /// when the selected workers offer no slots, or when `schedule_units_for_slots` cannot
    /// produce a placement.
    ///
    /// # Panics
    ///
    /// Panics when the indexes built from the working set lack an entry for a targeted job or
    /// one of its fragments, when a `Single` fragment is not placed on exactly one worker with
    /// exactly one actor, or on an `Unspecified` distribution type.
    pub async fn generate_job_reschedule_plan(
        &self,
        policy: JobReschedulePolicy,
    ) -> MetaResult<JobReschedulePlan> {
        type VnodeCount = usize;

        let JobReschedulePolicy { targets } = policy;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?;

        // Index the workers by id, keeping only those that accept streaming scheduling.
        let workers: HashMap<_, _> = workers
            .into_iter()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        // Resolved scheduling inputs of a single job.
        #[derive(Debug)]
        struct JobUpdate {
            filtered_worker_ids: BTreeSet<WorkerId>,
            parallelism: TableParallelism,
        }

        let mut job_parallelism_updates = HashMap::new();

        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
            parallelism_updates: Default::default(),
            resource_group_updates: Default::default(),
        };

        // Phase 1: resolve the target parallelism and worker set of every job.
        for (
            job_id,
            JobRescheduleTarget {
                parallelism: parallelism_update,
                resource_group: resource_group_update,
            },
        ) in &targets
        {
            let parallelism = match parallelism_update {
                JobParallelismTarget::Update(parallelism) => *parallelism,
                JobParallelismTarget::Refresh => {
                    // Re-use the parallelism currently recorded in the catalog for this job.
                    let parallelism = self
                        .metadata_manager
                        .catalog_controller
                        .get_job_streaming_parallelisms(*job_id as _)
                        .await?;

                    parallelism.into()
                }
            };

            job_reschedule_post_updates
                .parallelism_updates
                .insert(TableId::from(*job_id), parallelism);

            // Name of the resource group used to select workers for this job.
            let current_resource_group = match resource_group_update {
                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
                    job_reschedule_post_updates.resource_group_updates.insert(
                        *job_id as ObjectId,
                        Some(specific_resource_group.to_owned()),
                    );

                    specific_resource_group.to_owned()
                }
                JobResourceGroupTarget::Update(None) => {
                    // No job-specific group: schedule with the database's resource group and
                    // record `None` for the job.
                    let database_resource_group = self
                        .metadata_manager
                        .catalog_controller
                        .get_existing_job_database_resource_group(*job_id as _)
                        .await?;

                    job_reschedule_post_updates
                        .resource_group_updates
                        .insert(*job_id as ObjectId, None);
                    database_resource_group
                }
                JobResourceGroupTarget::Keep => {
                    self.metadata_manager
                        .catalog_controller
                        .get_existing_job_resource_group(*job_id as _)
                        .await?
                }
            };

            let filtered_worker_ids =
                filter_workers_by_resource_group(&workers, current_resource_group.as_str());

            if filtered_worker_ids.is_empty() {
                bail!("Cannot resize streaming_job {job_id} to empty worker set")
            }

            job_parallelism_updates.insert(
                *job_id,
                JobUpdate {
                    filtered_worker_ids,
                    parallelism,
                },
            );
        }

        // Phase 2: build lookup indexes over the fragments and actors of the targeted jobs.
        // Fragments that are the upstream end of a NO_SHUFFLE edge.
        let mut no_shuffle_source_fragment_ids = HashSet::new();
        // Fragments that are the downstream end of a NO_SHUFFLE edge.
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        // fragment id -> (distribution type, vnode count)
        let mut fragment_distribution_map = HashMap::new();
        // actor id -> worker id
        let mut actor_location = HashMap::new();
        // job id -> fragment ids
        let mut table_fragment_id_map = HashMap::new();
        // fragment id -> actor ids
        let mut fragment_actor_id_map = HashMap::new();

        // Fills the indexes above from the reschedule working set that the catalog resolves for
        // `table_ids`.
        async fn build_index(
            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
            fragment_distribution_map: &mut HashMap<
                FragmentId,
                (FragmentDistributionType, VnodeCount),
            >,
            actor_location: &mut HashMap<ActorId, WorkerId>,
            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
            mgr: &MetadataManager,
            table_ids: Vec<ObjectId>,
        ) -> Result<(), MetaError> {
            let RescheduleWorkingSet {
                fragments,
                actors,
                actor_dispatchers: _actor_dispatchers,
                fragment_downstreams,
                fragment_upstreams: _fragment_upstreams,
                related_jobs: _related_jobs,
                job_resource_groups: _job_resource_groups,
            } = mgr
                .catalog_controller
                .resolve_working_set_for_reschedule_tables(table_ids)
                .await?;

            // Record both ends of every NO_SHUFFLE edge.
            for (fragment_id, downstreams) in fragment_downstreams {
                for (downstream_fragment_id, dispatcher_type) in downstreams {
                    if let risingwave_meta_model::actor_dispatcher::DispatcherType::NoShuffle =
                        dispatcher_type
                    {
                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
                    }
                }
            }

            for (fragment_id, fragment) in fragments {
                fragment_distribution_map.insert(
                    fragment_id as FragmentId,
                    (
                        FragmentDistributionType::from(fragment.distribution_type),
                        fragment.vnode_count as _,
                    ),
                );

                table_fragment_id_map
                    .entry(fragment.job_id as u32)
                    .or_default()
                    .insert(fragment_id as FragmentId);
            }

            for (actor_id, actor) in actors {
                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
                fragment_actor_id_map
                    .entry(actor.fragment_id as FragmentId)
                    .or_default()
                    .insert(actor_id as ActorId);
            }

            Ok(())
        }

        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();

        build_index(
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
            &mut fragment_distribution_map,
            &mut actor_location,
            &mut table_fragment_id_map,
            &mut fragment_actor_id_map,
            &self.metadata_manager,
            table_ids,
        )
        .await?;
        tracing::debug!(
            ?job_reschedule_post_updates,
            ?job_parallelism_updates,
            ?no_shuffle_source_fragment_ids,
            ?no_shuffle_target_fragment_ids,
            ?fragment_distribution_map,
            ?actor_location,
            ?table_fragment_id_map,
            ?fragment_actor_id_map,
            "generate_table_resize_plan, after build_index"
        );

        let adaptive_parallelism_strategy = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy();

        // fragment id -> per-worker actor-count diff
        let mut target_plan = HashMap::new();

        // Phase 3: compute the per-worker diff of every fragment of every job.
        for (
            table_id,
            JobUpdate {
                filtered_worker_ids,
                parallelism,
            },
        ) in job_parallelism_updates
        {
            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();

            // worker id -> parallelism offered by that worker, limited to the job's worker set.
            let available_worker_slots = workers
                .iter()
                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
                .map(|(_, worker)| (worker.id as WorkerId, worker.compute_node_parallelism()))
                .collect::<BTreeMap<_, _>>();

            for fragment_id in fragment_map {
                // NO_SHUFFLE downstream fragments get no plan entry of their own here.
                // NOTE(review): presumably their placement is derived from the upstream
                // fragment later on -- confirm against the plan consumer.
                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
                    continue;
                }

                // Current layout of the fragment: worker id -> number of actors.
                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();

                for actor_id in &fragment_actor_id_map[&fragment_id] {
                    let worker_id = actor_location[actor_id];
                    *fragment_slots.entry(worker_id).or_default() += 1;
                }

                let available_slot_count: usize = available_worker_slots.values().cloned().sum();

                if available_slot_count == 0 {
                    bail!(
                        "No schedulable slots available for fragment {}",
                        fragment_id
                    );
                }

                // The vnode count is used as the upper bound on the fragment's parallelism.
                let (dist, vnode_count) = fragment_distribution_map[&fragment_id];
                let max_parallelism = vnode_count;

                match dist {
                    FragmentDistributionType::Unspecified => unreachable!(),
                    FragmentDistributionType::Single => {
                        // A singleton fragment has exactly one actor on exactly one worker.
                        let (single_worker_id, should_be_one) = fragment_slots
                            .iter()
                            .exactly_one()
                            .expect("single fragment should have only one worker slot");

                        assert_eq!(*should_be_one, 1);

                        let units = schedule_units_for_slots(&available_worker_slots, 1, table_id)?;

                        let (chosen_target_worker_id, should_be_one) =
                            units.iter().exactly_one().ok().with_context(|| {
                                format!(
                                    "Cannot find a single target worker for fragment {fragment_id}"
                                )
                            })?;

                        assert_eq!(*should_be_one, 1);

                        // Nothing to move when the actor already lives on the chosen worker.
                        if *chosen_target_worker_id == *single_worker_id {
                            tracing::debug!(
                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
                            );
                            continue;
                        }

                        // Move the actor: +1 on the chosen worker, -1 on the current one.
                        target_plan.insert(
                            fragment_id,
                            WorkerReschedule {
                                worker_actor_diff: BTreeMap::from_iter(vec![
                                    (*chosen_target_worker_id, 1),
                                    (*single_worker_id, -1),
                                ]),
                            },
                        );
                    }
                    FragmentDistributionType::Hash => match parallelism {
                        TableParallelism::Adaptive => {
                            let target_slot_count = adaptive_parallelism_strategy
                                .compute_target_parallelism(available_slot_count);

                            if target_slot_count > max_parallelism {
                                // Cap the parallelism at the fragment's vnode count.
                                tracing::warn!(
                                    "available parallelism for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );
                                let target_worker_slots = schedule_units_for_slots(
                                    &available_worker_slots,
                                    max_parallelism,
                                    table_id,
                                )?;

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else if available_slot_count != target_slot_count {
                                // The adaptive strategy asks for a count that differs from
                                // the available slot count; schedule exactly that many.
                                tracing::info!(
                                    "available parallelism for table {table_id} is limit by adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
                                );
                                let target_worker_slots = schedule_units_for_slots(
                                    &available_worker_slots,
                                    target_slot_count,
                                    table_id,
                                )?;

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else {
                                // Use every available slot as is.
                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &available_worker_slots,
                                    ),
                                );
                            }
                        }
                        TableParallelism::Fixed(mut n) => {
                            if n > max_parallelism {
                                tracing::warn!(
                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );
                                n = max_parallelism
                            }

                            let target_worker_slots =
                                schedule_units_for_slots(&available_worker_slots, n, table_id)?;

                            target_plan.insert(
                                fragment_id,
                                Self::diff_worker_slot_changes(
                                    &fragment_slots,
                                    &target_worker_slots,
                                ),
                            );
                        }
                        TableParallelism::Custom => {
                            // No diff is generated for custom parallelism.
                        }
                    },
                }
            }
        }

        // Drop fragments whose computed diff is empty, i.e. nothing has to move.
        target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
        tracing::debug!(
            ?target_plan,
            "generate_table_resize_plan finished target_plan"
        );

        Ok(JobReschedulePlan {
            reschedules: target_plan,
            post_updates: job_reschedule_post_updates,
        })
    }
2031
2032 fn diff_worker_slot_changes(
2033 fragment_worker_slots: &BTreeMap<WorkerId, usize>,
2034 target_worker_slots: &BTreeMap<WorkerId, usize>,
2035 ) -> WorkerReschedule {
2036 let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2037 let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2038
2039 for (&worker_id, &target_slots) in target_worker_slots {
2040 let ¤t_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);
2041
2042 if target_slots > current_slots {
2043 increased_actor_count.insert(worker_id, target_slots - current_slots);
2044 }
2045 }
2046
2047 for (&worker_id, ¤t_slots) in fragment_worker_slots {
2048 let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);
2049
2050 if current_slots > target_slots {
2051 decreased_actor_count.insert(worker_id, current_slots - target_slots);
2052 }
2053 }
2054
2055 let worker_ids: HashSet<_> = increased_actor_count
2056 .keys()
2057 .chain(decreased_actor_count.keys())
2058 .cloned()
2059 .collect();
2060
2061 let mut worker_actor_diff = BTreeMap::new();
2062
2063 for worker_id in worker_ids {
2064 let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2065 let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2066 let change = increased - decreased;
2067
2068 assert_ne!(change, 0);
2069
2070 worker_actor_diff.insert(worker_id, change);
2071 }
2072
2073 WorkerReschedule { worker_actor_diff }
2074 }
2075
2076 fn build_no_shuffle_relation_index(
2077 actor_map: &HashMap<ActorId, CustomActorInfo>,
2078 no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
2079 no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
2080 ) {
2081 let mut fragment_cache = HashSet::new();
2082 for actor in actor_map.values() {
2083 if fragment_cache.contains(&actor.fragment_id) {
2084 continue;
2085 }
2086
2087 for dispatcher in &actor.dispatcher {
2088 for downstream_actor_id in &dispatcher.downstream_actor_id {
2089 if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2090 if dispatcher.r#type() == PbDispatcherType::NoShuffle {
2092 no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId);
2093 no_shuffle_target_fragment_ids
2094 .insert(downstream_actor.fragment_id as FragmentId);
2095 }
2096 }
2097 }
2098 }
2099
2100 fragment_cache.insert(actor.fragment_id);
2101 }
2102 }
2103
2104 fn build_fragment_dispatcher_index(
2105 actor_map: &HashMap<ActorId, CustomActorInfo>,
2106 fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
2107 ) {
2108 for actor in actor_map.values() {
2109 for dispatcher in &actor.dispatcher {
2110 for downstream_actor_id in &dispatcher.downstream_actor_id {
2111 if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2112 fragment_dispatcher_map
2113 .entry(actor.fragment_id as FragmentId)
2114 .or_default()
2115 .insert(
2116 downstream_actor.fragment_id as FragmentId,
2117 dispatcher.r#type().into(),
2118 );
2119 }
2120 }
2121 }
2122 }
2123 }
2124
2125 pub fn resolve_no_shuffle_upstream_tables(
2126 fragment_ids: HashSet<FragmentId>,
2127 no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2128 no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2129 fragment_to_table: &HashMap<FragmentId, TableId>,
2130 fragment_upstreams: &HashMap<
2131 risingwave_meta_model::FragmentId,
2132 HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2133 >,
2134 table_parallelisms: &mut HashMap<TableId, TableParallelism>,
2135 ) -> MetaResult<()> {
2136 let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();
2137
2138 let mut fragment_ids = fragment_ids;
2139
2140 while let Some(fragment_id) = queue.pop_front() {
2143 if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2144 continue;
2145 }
2146
2147 for upstream_fragment_id in fragment_upstreams
2149 .get(&(fragment_id as _))
2150 .map(|upstreams| upstreams.keys())
2151 .into_iter()
2152 .flatten()
2153 {
2154 let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2155 let upstream_fragment_id = &upstream_fragment_id;
2156 if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2157 continue;
2158 }
2159
2160 let table_id = &fragment_to_table[&fragment_id];
2161 let upstream_table_id = &fragment_to_table[upstream_fragment_id];
2162
2163 if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
2165 if let Some(upstream_table_parallelism) =
2166 table_parallelisms.get(upstream_table_id)
2167 {
2168 if upstream_table_parallelism != &TableParallelism::Custom {
2169 bail!(
2170 "Cannot change upstream table {} from {:?} to {:?}",
2171 upstream_table_id,
2172 upstream_table_parallelism,
2173 TableParallelism::Custom
2174 )
2175 }
2176 } else {
2177 table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
2178 }
2179 }
2180
2181 fragment_ids.insert(*upstream_fragment_id);
2182 queue.push_back(*upstream_fragment_id);
2183 }
2184 }
2185
2186 let downstream_fragment_ids = fragment_ids
2187 .iter()
2188 .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));
2189
2190 let downstream_table_ids = downstream_fragment_ids
2191 .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
2192 .collect::<HashSet<_>>();
2193
2194 table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));
2195
2196 Ok(())
2197 }
2198
2199 pub fn resolve_no_shuffle_upstream_fragments<T>(
2200 reschedule: &mut HashMap<FragmentId, T>,
2201 no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2202 no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2203 fragment_upstreams: &HashMap<
2204 risingwave_meta_model::FragmentId,
2205 HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2206 >,
2207 ) -> MetaResult<()>
2208 where
2209 T: Clone + Eq,
2210 {
2211 let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();
2212
2213 while let Some(fragment_id) = queue.pop_front() {
2216 if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2217 continue;
2218 }
2219
2220 for upstream_fragment_id in fragment_upstreams
2222 .get(&(fragment_id as _))
2223 .map(|upstreams| upstreams.keys())
2224 .into_iter()
2225 .flatten()
2226 {
2227 let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2228 let upstream_fragment_id = &upstream_fragment_id;
2229 if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2230 continue;
2231 }
2232
2233 let reschedule_plan = &reschedule[&fragment_id];
2234
2235 if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
2236 if upstream_reschedule_plan != reschedule_plan {
2237 bail!(
2238 "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
2239 fragment_id,
2240 upstream_fragment_id
2241 );
2242 }
2243
2244 continue;
2245 }
2246
2247 reschedule.insert(*upstream_fragment_id, reschedule_plan.clone());
2248
2249 queue.push_back(*upstream_fragment_id);
2250 }
2251 }
2252
2253 reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));
2254
2255 Ok(())
2256 }
2257
2258 pub async fn resolve_related_no_shuffle_jobs(
2259 &self,
2260 jobs: &[TableId],
2261 ) -> MetaResult<HashSet<TableId>> {
2262 let RescheduleWorkingSet { related_jobs, .. } = self
2263 .metadata_manager
2264 .catalog_controller
2265 .resolve_working_set_for_reschedule_tables(
2266 jobs.iter().map(|id| id.table_id as _).collect(),
2267 )
2268 .await?;
2269
2270 Ok(related_jobs
2271 .keys()
2272 .map(|id| TableId::new(*id as _))
2273 .collect())
2274 }
2275}
2276
/// How the parallelism of a job is chosen when a reschedule plan is generated.
#[derive(Debug, Clone)]
pub enum JobParallelismTarget {
    /// Use the given parallelism.
    Update(TableParallelism),
    /// Re-read the job's streaming parallelism from the catalog and use that.
    Refresh,
}
2282
/// How the resource group of a job is chosen when a reschedule plan is generated.
#[derive(Debug, Clone)]
pub enum JobResourceGroupTarget {
    /// Record a new resource group for the job. With `Some(name)` that group is also used to
    /// select workers; with `None`, workers are selected from the resource group of the job's
    /// database.
    Update(Option<String>),
    /// Leave the job's resource group untouched and select workers from it.
    Keep,
}
2288
/// Desired parallelism and resource group of a single job.
#[derive(Debug, Clone)]
pub struct JobRescheduleTarget {
    /// Where the job's parallelism comes from.
    pub parallelism: JobParallelismTarget,
    /// Where the job's resource group comes from.
    pub resource_group: JobResourceGroupTarget,
}
2294
/// Input of `generate_job_reschedule_plan`: the reschedule target of every job to process.
#[derive(Debug)]
pub struct JobReschedulePolicy {
    /// Reschedule target per job, keyed by job id.
    pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
}
2299
/// Job-level metadata changes that accompany a reschedule plan.
#[derive(Debug, Clone)]
pub struct JobReschedulePostUpdates {
    /// Parallelism resolved for each job covered by the plan.
    pub parallelism_updates: HashMap<TableId, TableParallelism>,
    /// Resource group change per job. Only jobs whose target was
    /// `JobResourceGroupTarget::Update` have an entry; the value mirrors the requested group.
    pub resource_group_updates: HashMap<ObjectId, Option<String>>,
}
2306
/// Output of `generate_job_reschedule_plan`, consumed by `reschedule_actors`.
#[derive(Debug)]
pub struct JobReschedulePlan {
    /// Per-worker actor-count diff of every fragment whose layout has to change.
    pub reschedules: HashMap<FragmentId, WorkerReschedule>,
    /// Job-level metadata changes that go along with `reschedules`.
    pub post_updates: JobReschedulePostUpdates,
}
2312
2313impl GlobalStreamManager {
2314 pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
2315 self.scale_controller.reschedule_lock.read().await
2316 }
2317
2318 pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
2319 self.scale_controller.reschedule_lock.write().await
2320 }
2321
2322 pub async fn reschedule_actors(
2330 &self,
2331 database_id: DatabaseId,
2332 plan: JobReschedulePlan,
2333 options: RescheduleOptions,
2334 ) -> MetaResult<()> {
2335 let JobReschedulePlan {
2336 reschedules,
2337 mut post_updates,
2338 } = plan;
2339
2340 let reschedule_fragment = self
2341 .scale_controller
2342 .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
2343 .await?;
2344
2345 tracing::debug!("reschedule plan: {:?}", reschedule_fragment);
2346
2347 let up_down_stream_fragment: HashSet<_> = reschedule_fragment
2348 .iter()
2349 .flat_map(|(_, reschedule)| {
2350 reschedule
2351 .upstream_fragment_dispatcher_ids
2352 .iter()
2353 .map(|(fragment_id, _)| *fragment_id)
2354 .chain(reschedule.downstream_fragment_ids.iter().cloned())
2355 })
2356 .collect();
2357
2358 let fragment_actors =
2359 try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async {
2360 let actor_ids = self
2361 .metadata_manager
2362 .get_running_actors_of_fragment(*fragment_id)
2363 .await?;
2364 Result::<_, MetaError>::Ok((*fragment_id, actor_ids))
2365 }))
2366 .await?
2367 .into_iter()
2368 .collect();
2369
2370 let command = Command::RescheduleFragment {
2371 reschedules: reschedule_fragment,
2372 fragment_actors,
2373 post_updates,
2374 };
2375
2376 let _guard = self.source_manager.pause_tick().await;
2377
2378 self.barrier_scheduler
2379 .run_command(database_id, command)
2380 .await?;
2381
2382 tracing::info!("reschedule done");
2383
2384 Ok(())
2385 }
2386
2387 async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
2396 tracing::info!("trigger parallelism control");
2397
2398 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
2399
2400 let background_streaming_jobs = self
2401 .metadata_manager
2402 .list_background_creating_jobs()
2403 .await?;
2404
2405 let skipped_jobs = if !background_streaming_jobs.is_empty() {
2406 let jobs = self
2407 .scale_controller
2408 .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
2409 .await?;
2410
2411 tracing::info!(
2412 "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
2413 background_streaming_jobs,
2414 jobs
2415 );
2416
2417 jobs
2418 } else {
2419 HashSet::new()
2420 };
2421
2422 let job_ids: HashSet<_> = {
2423 let streaming_parallelisms = self
2424 .metadata_manager
2425 .catalog_controller
2426 .get_all_streaming_parallelisms()
2427 .await?;
2428
2429 streaming_parallelisms
2430 .into_iter()
2431 .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
2432 .map(|(table_id, _)| table_id)
2433 .collect()
2434 };
2435
2436 let workers = self
2437 .metadata_manager
2438 .cluster_controller
2439 .list_active_streaming_workers()
2440 .await?;
2441
2442 let schedulable_worker_ids: BTreeSet<_> = workers
2443 .iter()
2444 .filter(|worker| {
2445 !worker
2446 .property
2447 .as_ref()
2448 .map(|p| p.is_unschedulable)
2449 .unwrap_or(false)
2450 })
2451 .map(|worker| worker.id as WorkerId)
2452 .collect();
2453
2454 if job_ids.is_empty() {
2455 tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
2456 return Ok(false);
2457 }
2458
2459 let batch_size = match self.env.opts.parallelism_control_batch_size {
2460 0 => job_ids.len(),
2461 n => n,
2462 };
2463
2464 tracing::info!(
2465 "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
2466 job_ids.len(),
2467 batch_size,
2468 schedulable_worker_ids
2469 );
2470
2471 let batches: Vec<_> = job_ids
2472 .into_iter()
2473 .chunks(batch_size)
2474 .into_iter()
2475 .map(|chunk| chunk.collect_vec())
2476 .collect();
2477
2478 let mut reschedules = None;
2479
2480 for batch in batches {
2481 let targets: HashMap<_, _> = batch
2482 .into_iter()
2483 .map(|job_id| {
2484 (
2485 job_id as u32,
2486 JobRescheduleTarget {
2487 parallelism: JobParallelismTarget::Refresh,
2488 resource_group: JobResourceGroupTarget::Keep,
2489 },
2490 )
2491 })
2492 .collect();
2493
2494 let plan = self
2495 .scale_controller
2496 .generate_job_reschedule_plan(JobReschedulePolicy { targets })
2497 .await?;
2498
2499 if !plan.reschedules.is_empty() {
2500 tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
2501 reschedules = Some(plan);
2502 break;
2503 }
2504 }
2505
2506 let Some(plan) = reschedules else {
2507 tracing::info!("no reschedule plan generated");
2508 return Ok(false);
2509 };
2510
2511 for (database_id, reschedules) in self
2513 .metadata_manager
2514 .split_fragment_map_by_database(plan.reschedules)
2515 .await?
2516 {
2517 self.reschedule_actors(
2518 database_id,
2519 JobReschedulePlan {
2520 reschedules,
2521 post_updates: plan.post_updates.clone(),
2522 },
2523 RescheduleOptions {
2524 resolve_no_shuffle_upstream: false,
2525 skip_create_new_actors: false,
2526 },
2527 )
2528 .await?;
2529 }
2530
2531 Ok(true)
2532 }
2533
2534 async fn run(&self, mut shutdown_rx: Receiver<()>) {
2536 tracing::info!("starting automatic parallelism control monitor");
2537
2538 let check_period =
2539 Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);
2540
2541 let mut ticker = tokio::time::interval_at(
2542 Instant::now()
2543 + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
2544 check_period,
2545 );
2546 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2547
2548 ticker.tick().await;
2550
2551 let (local_notification_tx, mut local_notification_rx) =
2552 tokio::sync::mpsc::unbounded_channel();
2553
2554 self.env
2555 .notification_manager()
2556 .insert_local_sender(local_notification_tx)
2557 .await;
2558
2559 let worker_nodes = self
2560 .metadata_manager
2561 .list_active_streaming_compute_nodes()
2562 .await
2563 .expect("list active streaming compute nodes");
2564
2565 let mut worker_cache: BTreeMap<_, _> = worker_nodes
2566 .into_iter()
2567 .map(|worker| (worker.id, worker))
2568 .collect();
2569
2570 let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();
2571
2572 let mut should_trigger = false;
2573
2574 loop {
2575 tokio::select! {
2576 biased;
2577
2578 _ = &mut shutdown_rx => {
2579 tracing::info!("Stream manager is stopped");
2580 break;
2581 }
2582
2583 _ = ticker.tick(), if should_trigger => {
2584 let include_workers = worker_cache.keys().copied().collect_vec();
2585
2586 if include_workers.is_empty() {
2587 tracing::debug!("no available worker nodes");
2588 should_trigger = false;
2589 continue;
2590 }
2591
2592 match self.trigger_parallelism_control().await {
2593 Ok(cont) => {
2594 should_trigger = cont;
2595 }
2596 Err(e) => {
2597 tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
2598 ticker.reset();
2599 }
2600 }
2601 }
2602
2603 notification = local_notification_rx.recv() => {
2604 let notification = notification.expect("local notification channel closed in loop of stream manager");
2605
2606 let worker_is_streaming_compute = |worker: &WorkerNode| {
2608 worker.get_type() == Ok(WorkerType::ComputeNode)
2609 && worker.property.as_ref().unwrap().is_streaming
2610 };
2611
2612 match notification {
2613 LocalNotification::SystemParamsChange(reader) => {
2614 let new_strategy = reader.adaptive_parallelism_strategy();
2615 if new_strategy != previous_adaptive_parallelism_strategy {
2616 tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
2617 should_trigger = true;
2618 previous_adaptive_parallelism_strategy = new_strategy;
2619 }
2620 }
2621 LocalNotification::WorkerNodeActivated(worker) => {
2622 if !worker_is_streaming_compute(&worker) {
2623 continue;
2624 }
2625
2626 tracing::info!(worker = worker.id, "worker activated notification received");
2627
2628 let prev_worker = worker_cache.insert(worker.id, worker.clone());
2629
2630 match prev_worker {
2631 Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
2632 tracing::info!(worker = worker.id, "worker parallelism changed");
2633 should_trigger = true;
2634 }
2635 Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
2636 tracing::info!(worker = worker.id, "worker label changed");
2637 should_trigger = true;
2638 }
2639 None => {
2640 tracing::info!(worker = worker.id, "new worker joined");
2641 should_trigger = true;
2642 }
2643 _ => {}
2644 }
2645 }
2646
2647 LocalNotification::WorkerNodeDeleted(worker) => {
2650 if !worker_is_streaming_compute(&worker) {
2651 continue;
2652 }
2653
2654 match worker_cache.remove(&worker.id) {
2655 Some(prev_worker) => {
2656 tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
2657 }
2658 None => {
2659 tracing::warn!(worker = worker.id, "worker not found in stream manager cache, but it was removed");
2660 }
2661 }
2662 }
2663
2664 _ => {}
2665 }
2666 }
2667 }
2668 }
2669 }
2670
2671 pub fn start_auto_parallelism_monitor(
2672 self: Arc<Self>,
2673 ) -> (JoinHandle<()>, oneshot::Sender<()>) {
2674 tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
2675 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
2676 let join_handle = tokio::spawn(async move {
2677 self.run(shutdown_rx).await;
2678 });
2679
2680 (join_handle, shutdown_tx)
2681 }
2682}
2683
2684pub fn schedule_units_for_slots(
2685 slots: &BTreeMap<WorkerId, usize>,
2686 total_unit_size: usize,
2687 salt: u32,
2688) -> MetaResult<BTreeMap<WorkerId, usize>> {
2689 let mut ch = ConsistentHashRing::new(salt);
2690
2691 for (worker_id, parallelism) in slots {
2692 ch.add_worker(*worker_id as _, *parallelism as u32);
2693 }
2694
2695 let target_distribution = ch.distribute_tasks(total_unit_size as u32)?;
2696
2697 Ok(target_distribution
2698 .into_iter()
2699 .map(|(worker_id, task_count)| (worker_id as WorkerId, task_count as usize))
2700 .collect())
2701}
2702
2703pub struct ConsistentHashRing {
2704 ring: BTreeMap<u64, u32>,
2705 weights: BTreeMap<u32, u32>,
2706 virtual_nodes: u32,
2707 salt: u32,
2708}
2709
2710impl ConsistentHashRing {
2711 fn new(salt: u32) -> Self {
2712 ConsistentHashRing {
2713 ring: BTreeMap::new(),
2714 weights: BTreeMap::new(),
2715 virtual_nodes: 1024,
2716 salt,
2717 }
2718 }
2719
2720 fn hash<T: Hash, S: Hash>(key: T, salt: S) -> u64 {
2721 let mut hasher = DefaultHasher::new();
2722 salt.hash(&mut hasher);
2723 key.hash(&mut hasher);
2724 hasher.finish()
2725 }
2726
2727 fn add_worker(&mut self, id: u32, weight: u32) {
2728 let virtual_nodes_count = self.virtual_nodes;
2729
2730 for i in 0..virtual_nodes_count {
2731 let virtual_node_key = (id, i);
2732 let hash = Self::hash(virtual_node_key, self.salt);
2733 self.ring.insert(hash, id);
2734 }
2735
2736 self.weights.insert(id, weight);
2737 }
2738
2739 fn distribute_tasks(&self, total_tasks: u32) -> MetaResult<BTreeMap<u32, u32>> {
2740 let total_weight = self.weights.values().sum::<u32>();
2741
2742 let mut soft_limits = HashMap::new();
2743 for (worker_id, worker_capacity) in &self.weights {
2744 soft_limits.insert(
2745 *worker_id,
2746 (total_tasks as f64 * (*worker_capacity as f64 / total_weight as f64)).ceil()
2747 as u32,
2748 );
2749 }
2750
2751 let mut task_distribution: BTreeMap<u32, u32> = BTreeMap::new();
2752 let mut task_hashes = (0..total_tasks)
2753 .map(|task_idx| Self::hash(task_idx, self.salt))
2754 .collect_vec();
2755
2756 task_hashes.sort();
2758
2759 for task_hash in task_hashes {
2760 let mut assigned = false;
2761
2762 let ring_range = self.ring.range(task_hash..).chain(self.ring.iter());
2764
2765 for (_, &worker_id) in ring_range {
2766 let task_limit = soft_limits[&worker_id];
2767
2768 let worker_task_count = task_distribution.entry(worker_id).or_insert(0);
2769
2770 if *worker_task_count < task_limit {
2771 *worker_task_count += 1;
2772 assigned = true;
2773 break;
2774 }
2775 }
2776
2777 if !assigned {
2778 bail!("Could not distribute tasks due to capacity constraints.");
2779 }
2780 }
2781
2782 Ok(task_distribution)
2783 }
2784}
2785
#[cfg(test)]
mod tests {
    use super::*;

    const DEFAULT_SALT: u32 = 42;

    /// Builds a ring seeded with `DEFAULT_SALT` from `(worker id, weight)` pairs.
    fn ring_of(workers: &[(u32, u32)]) -> ConsistentHashRing {
        let mut ring = ConsistentHashRing::new(DEFAULT_SALT);
        for &(id, weight) in workers {
            ring.add_worker(id, weight);
        }
        ring
    }

    /// Tasks assigned to `worker`, or zero when the worker has no entry.
    fn assigned(distribution: &BTreeMap<u32, u32>, worker: u32) -> u32 {
        distribution.get(&worker).cloned().unwrap_or(0)
    }

    #[test]
    fn test_single_worker_capacity() {
        let ring = ring_of(&[(1, 10)]);

        let task_distribution = ring.distribute_tasks(5).unwrap();

        assert_eq!(assigned(&task_distribution, 1), 5);
    }

    #[test]
    fn test_multiple_workers_even_distribution() {
        let ring = ring_of(&[(1, 1), (2, 1), (3, 1)]);

        let task_distribution = ring.distribute_tasks(3).unwrap();

        for id in 1..=3 {
            assert_eq!(assigned(&task_distribution, id), 1);
        }
    }

    #[test]
    fn test_weighted_distribution() {
        let ring = ring_of(&[(1, 2), (2, 3), (3, 5)]);

        let task_distribution = ring.distribute_tasks(10).unwrap();

        assert_eq!(assigned(&task_distribution, 1), 2);
        assert_eq!(assigned(&task_distribution, 2), 3);
        assert_eq!(assigned(&task_distribution, 3), 5);
    }

    #[test]
    fn test_over_capacity() {
        let ring = ring_of(&[(1, 1), (2, 2), (3, 3)]);

        // Ten tasks against a total weight of six still succeed.
        let task_distribution = ring.distribute_tasks(10);

        assert!(task_distribution.is_ok());
    }

    #[test]
    fn test_balance_distribution() {
        for initial_capacity in 1..10 {
            // A halved capacity carries over to the remaining worker counts of
            // the same outer iteration.
            let mut worker_capacity = initial_capacity;

            for workers in 3..10 {
                let mut ring = ConsistentHashRing::new(DEFAULT_SALT);
                (0..workers).for_each(|worker_id| ring.add_worker(worker_id, worker_capacity));

                // With an even capacity, only half of it is requested per worker.
                if worker_capacity % 2 == 0 {
                    worker_capacity /= 2;
                }

                let total_tasks = worker_capacity * workers;
                let task_distribution = ring.distribute_tasks(total_tasks).unwrap();

                for assigned_tasks in task_distribution.into_values() {
                    assert_eq!(assigned_tasks, worker_capacity);
                }
            }
        }
    }
}