use std::cmp::{Ordering, min};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use num_integer::Integer;
use num_traits::abs;
use risingwave_common::bail;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::ActorMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::FragmentWorkerSlotMappings;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{self, State};
use risingwave_pb::stream_plan::{Dispatcher, PbDispatcher, PbDispatcherType, StreamNode};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule};
use crate::controller::scale::RescheduleWorkingSet;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
use crate::model::{
    ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
};
use crate::serving::{
    ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
};
use crate::stream::{AssignerBuilder, GlobalStreamManager, SourceManagerRef};
use crate::{MetaError, MetaResult};

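/// A planned reschedule for a single fragment, expressed as a per-worker diff of
/// actor counts: positive values add that many actors on a worker, negative values
/// remove them. A usage sketch (editor-added, not from the original source):
///
/// ```ignore
/// // Move one actor from worker 1 to worker 2 (net change zero).
/// let diff: BTreeMap<WorkerId, isize> = BTreeMap::from([(1, -1), (2, 1)]);
/// let reschedule = WorkerReschedule { worker_actor_diff: diff };
/// ```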
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}

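/// A self-contained snapshot of a fragment's metadata used during rescheduling,
/// including an actor template from which new actors are instantiated.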
pub struct CustomFragmentInfo {
    pub fragment_id: u32,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub state_table_ids: Vec<u32>,
    pub node: StreamNode,
    pub actor_template: StreamActorWithDispatchers,
    pub actors: Vec<CustomActorInfo>,
}

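/// Per-actor metadata relevant to rescheduling: its fragment, outgoing dispatchers,
/// and the vnode bitmap it currently owns (`None` for singleton distributions).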
#[derive(Default, Clone)]
pub struct CustomActorInfo {
    pub actor_id: u32,
    pub fragment_id: u32,
    pub dispatcher: Vec<Dispatcher>,
    pub vnode_bitmap: Option<Bitmap>,
}

use educe::Educe;
use futures::future::try_join_all;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_meta_model::DispatcherType;
use risingwave_pb::stream_plan::stream_node::NodeBody;

use super::SourceChange;
use crate::controller::id::IdCategory;
use crate::controller::utils::filter_workers_by_resource_group;

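/// Indexes built once per reschedule request: actor and fragment lookups, actor
/// locations, `NoShuffle` relationships, and the fragment-level dispatcher graph.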
#[derive(Educe)]
#[educe(Debug)]
pub struct RescheduleContext {
    #[educe(Debug(ignore))]
    actor_map: HashMap<ActorId, CustomActorInfo>,
    actor_status: BTreeMap<ActorId, WorkerId>,
    #[educe(Debug(ignore))]
    fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
    stream_source_fragment_ids: HashSet<FragmentId>,
    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
    no_shuffle_target_fragment_ids: HashSet<FragmentId>,
    no_shuffle_source_fragment_ids: HashSet<FragmentId>,
    fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    fragment_upstreams: HashMap<
        risingwave_meta_model::FragmentId,
        HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
    >,
}

impl RescheduleContext {
    fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
        self.actor_status
            .get(actor_id)
            .cloned()
            .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
    }
}

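/// Rebalances vnode ownership across a fragment's actors when some actors are
/// removed and/or created, returning the new vnode bitmap for every surviving actor.
///
/// Each actor carries a signed `balance` = (vnodes currently held) - (target quota),
/// where the quota is `vnode_count / target_actor_count` plus at most one extra vnode
/// for `vnode_count % target_actor_count` of the actors. For example, 256 vnodes over
/// 3 actors yields quotas of 86, 85, and 85. Vnodes are then moved pairwise from the
/// most-overloaded actor to the most-underloaded one until every balance reaches
/// zero. A usage sketch (editor-added, not from the original source):
///
/// ```ignore
/// // Shrink a 3-actor fragment to 2 actors; actor 3 gives up all its vnodes.
/// let bitmaps = rebalance_actor_vnode(&actors, &BTreeSet::from([3]), &BTreeSet::new());
/// assert_eq!(bitmaps.len(), 2);
/// ```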
pub fn rebalance_actor_vnode(
    actors: &[CustomActorInfo],
    actors_to_remove: &BTreeSet<ActorId>,
    actors_to_create: &BTreeSet<ActorId>,
) -> HashMap<ActorId, Bitmap> {
    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();

    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);

    assert!(actors.len() >= actors_to_remove.len());

    let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
    assert!(target_actor_count > 0);

    let vnode_count = actors[0]
        .vnode_bitmap
        .as_ref()
        .expect("vnode bitmap unset")
        .len();

    #[derive(Debug)]
    struct Balance {
        actor_id: ActorId,
        balance: i32,
        builder: BitmapBuilder,
    }
    let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);

    tracing::debug!(
        "expected {}, remain {}, prev actors {}, target actors {}",
        expected,
        remain,
        actors.len(),
        target_actor_count,
    );

    let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
        .iter()
        .map(|actor| {
            (
                actor.actor_id as ActorId,
                actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
            )
        })
        .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));

    let order_by_bitmap_desc =
        |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
            bitmap_a
                .count_ones()
                .cmp(&bitmap_b.count_ones())
                .reverse()
                .then(id_a.cmp(id_b))
        };

    let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
        let mut builder = BitmapBuilder::default();
        builder.append_bitmap(bitmap);
        builder
    };

    let (prev_expected, _) = vnode_count.div_rem(&actors.len());

    let prev_remain = removed
        .iter()
        .map(|(_, bitmap)| {
            assert!(bitmap.count_ones() >= prev_expected);
            bitmap.count_ones() - prev_expected
        })
        .sum::<usize>();

    removed.sort_by(order_by_bitmap_desc);
    rest.sort_by(order_by_bitmap_desc);

    let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
        actor_id,
        balance: bitmap.count_ones() as i32,
        builder: builder_from_bitmap(&bitmap),
    });

    let mut rest_balances = rest
        .into_iter()
        .map(|(actor_id, bitmap)| Balance {
            actor_id,
            balance: bitmap.count_ones() as i32 - expected as i32,
            builder: builder_from_bitmap(&bitmap),
        })
        .collect_vec();

    let mut created_balances = actors_to_create
        .iter()
        .map(|actor_id| Balance {
            actor_id: *actor_id,
            balance: -(expected as i32),
            builder: BitmapBuilder::zeroed(vnode_count),
        })
        .collect_vec();

    for balance in created_balances
        .iter_mut()
        .rev()
        .take(prev_remain)
        .chain(rest_balances.iter_mut())
    {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    for balance in &mut created_balances {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    assert_eq!(remain, 0);

    let mut v: VecDeque<_> = removed_balances
        .chain(rest_balances)
        .chain(created_balances)
        .collect();

    let mut result = HashMap::with_capacity(target_actor_count);

    for balance in &v {
        tracing::debug!(
            "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
            balance.actor_id,
            balance.balance,
            actors_to_remove.contains(&balance.actor_id),
            actors_to_create.contains(&balance.actor_id)
        );
    }

    while !v.is_empty() {
        if v.len() == 1 {
            let single = v.pop_front().unwrap();
            assert_eq!(single.balance, 0);
            if !actors_to_remove.contains(&single.actor_id) {
                result.insert(single.actor_id, single.builder.finish());
            }

            continue;
        }

        let mut src = v.pop_front().unwrap();
        let mut dst = v.pop_back().unwrap();

        let n = min(abs(src.balance), abs(dst.balance));

        let mut moved = 0;
        for idx in (0..vnode_count).rev() {
            if moved >= n {
                break;
            }

            if src.builder.is_set(idx) {
                src.builder.set(idx, false);
                assert!(!dst.builder.is_set(idx));
                dst.builder.set(idx, true);
                moved += 1;
            }
        }

        src.balance -= n;
        dst.balance += n;

        if src.balance != 0 {
            v.push_front(src);
        } else if !actors_to_remove.contains(&src.actor_id) {
            result.insert(src.actor_id, src.builder.finish());
        }

        if dst.balance != 0 {
            v.push_back(dst);
        } else {
            result.insert(dst.actor_id, dst.builder.finish());
        }
    }

    result
}

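/// Options controlling how a reschedule request is interpreted.
///
/// `resolve_no_shuffle_upstream`: rewrite reschedules targeting `NoShuffle`
/// downstream fragments onto the root of their `NoShuffle` dependency tree.
/// `skip_create_new_actors`: plan the reschedule without actually building the
/// new (scaling-out) actors.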
#[derive(Debug, Clone, Copy)]
pub struct RescheduleOptions {
    pub resolve_no_shuffle_upstream: bool,

    pub skip_create_new_actors: bool,
}

pub type ScaleControllerRef = Arc<ScaleController>;

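/// Coordinates rescheduling: validates requests, builds reschedule plans, and applies
/// them through the metadata and source managers. `reschedule_lock` serializes
/// concurrent reschedule operations.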
pub struct ScaleController {
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,

    pub reschedule_lock: RwLock<()>,
}

impl ScaleController {
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

    pub async fn integrity_check(&self) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .integrity_check()
            .await
    }

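    /// Builds the `RescheduleContext` for a reschedule request and validates it:
    /// resolves worker nodes, indexes actors and fragments, expands `NoShuffle`
    /// dependencies, and rejects plans that target unschedulable workers, remove
    /// actors that do not exist, or empty out a fragment.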
    async fn build_reschedule_context(
        &self,
        reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<RescheduleContext> {
        let worker_nodes: HashMap<WorkerId, WorkerNode> = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .map(|worker_node| (worker_node.id as _, worker_node))
            .collect();

        if worker_nodes.is_empty() {
            bail!("no available compute node in the cluster");
        }

        let unschedulable_worker_ids: HashSet<_> = worker_nodes
            .values()
            .filter(|w| {
                w.property
                    .as_ref()
                    .map(|property| property.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|worker| worker.id as WorkerId)
            .collect();

        for (fragment_id, reschedule) in &*reschedule {
            for (worker_id, change) in &reschedule.worker_actor_diff {
                if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
                    bail!(
                        "unable to move fragment {} to unschedulable worker {}",
                        fragment_id,
                        worker_id
                    );
                }
            }
        }

        let mut actor_map = HashMap::new();
        let mut fragment_map = HashMap::new();
        let mut actor_status = BTreeMap::new();
        let mut fragment_state = HashMap::new();
        let mut fragment_to_table = HashMap::new();

        fn fulfill_index_by_fragment_ids(
            actor_map: &mut HashMap<u32, CustomActorInfo>,
            fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
            actor_status: &mut BTreeMap<ActorId, WorkerId>,
            fragment_state: &mut HashMap<FragmentId, State>,
            fragment_to_table: &mut HashMap<FragmentId, TableId>,
            fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
            actors: HashMap<ActorId, actor::Model>,
            mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
            related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
        ) {
            let mut fragment_actors: HashMap<
                risingwave_meta_model::FragmentId,
                Vec<CustomActorInfo>,
            > = HashMap::new();

            let mut expr_contexts = HashMap::new();
            for (
                _,
                actor::Model {
                    actor_id,
                    fragment_id,
                    status: _,
                    splits: _,
                    worker_id,
                    vnode_bitmap,
                    expr_context,
                    ..
                },
            ) in actors
            {
                let dispatchers = actor_dispatchers
                    .remove(&(actor_id as _))
                    .unwrap_or_default();

                let actor_info = CustomActorInfo {
                    actor_id: actor_id as _,
                    fragment_id: fragment_id as _,
                    dispatcher: dispatchers,
                    vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
                };

                actor_map.insert(actor_id as _, actor_info.clone());

                fragment_actors
                    .entry(fragment_id as _)
                    .or_default()
                    .push(actor_info);

                actor_status.insert(actor_id as _, worker_id as WorkerId);

                expr_contexts.insert(actor_id as u32, expr_context);
            }

            for (
                _,
                fragment::Model {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    stream_node,
                    state_table_ids,
                    ..
                },
            ) in fragments
            {
                let actors = fragment_actors
                    .remove(&(fragment_id as _))
                    .unwrap_or_default();

                let CustomActorInfo {
                    actor_id,
                    fragment_id,
                    dispatcher,
                    vnode_bitmap,
                } = actors.first().unwrap().clone();

                let (related_job, job_definition) =
                    related_jobs.get(&job_id).expect("job not found");

                let fragment = CustomFragmentInfo {
                    fragment_id: fragment_id as _,
                    fragment_type_mask: fragment_type_mask.into(),
                    distribution_type: distribution_type.into(),
                    state_table_ids: state_table_ids.into_u32_array(),
                    node: stream_node.to_protobuf(),
                    actor_template: (
                        StreamActor {
                            actor_id,
                            fragment_id: fragment_id as _,
                            vnode_bitmap,
                            mview_definition: job_definition.to_owned(),
                            expr_context: expr_contexts
                                .get(&actor_id)
                                .cloned()
                                .map(|expr_context| expr_context.to_protobuf()),
                        },
                        dispatcher,
                    ),
                    actors,
                };

                fragment_map.insert(fragment_id as _, fragment);

                fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));

                fragment_state.insert(
                    fragment_id,
                    table_fragments::PbState::from(related_job.job_status),
                );
            }
        }
        let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
        let working_set = self
            .metadata_manager
            .catalog_controller
            .resolve_working_set_for_reschedule_fragments(fragment_ids)
            .await?;

        fulfill_index_by_fragment_ids(
            &mut actor_map,
            &mut fragment_map,
            &mut actor_status,
            &mut fragment_state,
            &mut fragment_to_table,
            working_set.fragments,
            working_set.actors,
            working_set.actor_dispatchers,
            working_set.related_jobs,
        );

        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        Self::build_no_shuffle_relation_index(
            &actor_map,
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
        );

        if options.resolve_no_shuffle_upstream {
            let original_reschedule_keys = reschedule.keys().cloned().collect();

            Self::resolve_no_shuffle_upstream_fragments(
                reschedule,
                &no_shuffle_source_fragment_ids,
                &no_shuffle_target_fragment_ids,
                &working_set.fragment_upstreams,
            )?;

            if !table_parallelisms.is_empty() {
                Self::resolve_no_shuffle_upstream_tables(
                    original_reschedule_keys,
                    &no_shuffle_source_fragment_ids,
                    &no_shuffle_target_fragment_ids,
                    &fragment_to_table,
                    &working_set.fragment_upstreams,
                    table_parallelisms,
                )?;
            }
        }

        let mut fragment_dispatcher_map = HashMap::new();
        Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);

        let mut stream_source_fragment_ids = HashSet::new();
        let mut stream_source_backfill_fragment_ids = HashMap::new();
        let mut no_shuffle_reschedule = HashMap::new();
        for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
            let fragment = fragment_map
                .get(fragment_id)
                .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;

            match fragment_state[fragment_id] {
                table_fragments::State::Unspecified => unreachable!(),
                state @ table_fragments::State::Initial => {
                    bail!(
                        "the materialized view of fragment {fragment_id} is in state {}",
                        state.as_str_name()
                    )
                }
                state @ table_fragments::State::Creating => {
                    let stream_node = &fragment.node;

                    let mut is_reschedulable = true;
                    visit_stream_node_cont(stream_node, |body| {
                        if let Some(NodeBody::StreamScan(node)) = &body.node_body {
                            if !node.stream_scan_type().is_reschedulable() {
                                is_reschedulable = false;

                                return false;
                            }

                            return true;
                        }

                        true
                    });

                    if !is_reschedulable {
                        bail!(
                            "the materialized view of fragment {fragment_id} is in state {}",
                            state.as_str_name()
                        )
                    }
                }
                table_fragments::State::Created => {}
            }

            if no_shuffle_target_fragment_ids.contains(fragment_id) {
                bail!(
                    "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
                );
            }

            if no_shuffle_source_fragment_ids.contains(fragment_id) {
                let mut queue: VecDeque<_> = fragment_dispatcher_map
                    .get(fragment_id)
                    .unwrap()
                    .keys()
                    .cloned()
                    .collect();

                while let Some(downstream_id) = queue.pop_front() {
                    if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
                        continue;
                    }

                    if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
                    {
                        let no_shuffle_downstreams = downstream_fragments
                            .iter()
                            .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                            .map(|(fragment_id, _)| fragment_id);

                        queue.extend(no_shuffle_downstreams.copied());
                    }

                    no_shuffle_reschedule.insert(
                        downstream_id,
                        WorkerReschedule {
                            worker_actor_diff: worker_actor_diff.clone(),
                        },
                    );
                }
            }

            if fragment
                .fragment_type_mask
                .contains(FragmentTypeFlag::Source)
                && fragment.node.find_stream_source().is_some()
            {
                stream_source_fragment_ids.insert(*fragment_id);
            }

            let current_worker_ids = fragment
                .actors
                .iter()
                .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
                .collect::<HashSet<_>>();

            for (removed, change) in worker_actor_diff {
                if !current_worker_ids.contains(removed) && change.is_negative() {
                    bail!(
                        "no actor on the worker {} of fragment {}",
                        removed,
                        fragment_id
                    );
                }
            }

            let added_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_positive())
                .cloned()
                .map(|change| change as usize)
                .sum();

            let removed_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_negative())
                .cloned()
                .map(|v| v.unsigned_abs())
                .sum();

            match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    if fragment.actors.len() + added_actor_count <= removed_actor_count {
                        bail!("can't remove all actors from fragment {}", fragment_id);
                    }
                }
                FragmentDistributionType::Single => {
                    if added_actor_count != removed_actor_count {
                        bail!("single distribution fragment only support migration");
                    }
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        if !no_shuffle_reschedule.is_empty() {
            tracing::info!(
                "reschedule plan rewritten with NoShuffle reschedule {:?}",
                no_shuffle_reschedule
            );

            for noshuffle_downstream in no_shuffle_reschedule.keys() {
                let fragment = fragment_map.get(noshuffle_downstream).unwrap();
                if fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SourceScan)
                {
                    let stream_node = &fragment.node;
                    if let Some((_source_id, upstream_source_fragment_id)) =
                        stream_node.find_source_backfill()
                    {
                        stream_source_backfill_fragment_ids
                            .insert(fragment.fragment_id, upstream_source_fragment_id);
                    }
                }
            }
        }

        reschedule.extend(no_shuffle_reschedule.into_iter());

        Ok(RescheduleContext {
            actor_map,
            actor_status,
            fragment_map,
            stream_source_fragment_ids,
            stream_source_backfill_fragment_ids,
            no_shuffle_target_fragment_ids,
            no_shuffle_source_fragment_ids,
            fragment_dispatcher_map,
            fragment_upstreams: working_set.fragment_upstreams,
        })
    }

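    /// Converts a validated `WorkerReschedule` plan into per-fragment [`Reschedule`]s:
    /// decides which actors to add and remove, rebalances vnode bitmaps, rewires
    /// `NoShuffle` chains, migrates source splits, and builds the new actors.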
    pub(crate) async fn analyze_reschedule_plan(
        &self,
        mut reschedules: HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
        tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
        let ctx = self
            .build_reschedule_context(&mut reschedules, options, table_parallelisms)
            .await?;
        tracing::debug!("reschedule context: {:#?}", ctx);
        let reschedules = reschedules;

        let (fragment_actors_to_remove, fragment_actors_to_create) =
            self.arrange_reschedules(&reschedules, &ctx)?;

        let mut fragment_actor_bitmap = HashMap::new();
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                continue;
            }

            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .map(|map| map.keys().copied().collect())
                .unwrap_or_default();

            let actors_to_remove = fragment_actors_to_remove
                .get(fragment_id)
                .map(|map| map.keys().copied().collect())
                .unwrap_or_default();

            let fragment = ctx.fragment_map.get(fragment_id).unwrap();

            match fragment.distribution_type {
                FragmentDistributionType::Single => {
                    fragment_actor_bitmap
                        .insert(fragment.fragment_id as FragmentId, Default::default());
                }
                FragmentDistributionType::Hash => {
                    let actor_vnode = rebalance_actor_vnode(
                        &fragment.actors,
                        &actors_to_remove,
                        &actors_to_create,
                    );

                    fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
                }

                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
        for fragment_id in reschedules.keys() {
            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
            let mut new_actor_ids = BTreeMap::new();
            for actor in &fragment.actors {
                if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id)
                    && actors_to_remove.contains_key(&actor.actor_id)
                {
                    continue;
                }
                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
                new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
            }

            if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
                for (actor_id, worker_id) in actors_to_create {
                    new_actor_ids.insert(*actor_id, *worker_id);
                }
            }

            assert!(
                !new_actor_ids.is_empty(),
                "should be at least one actor in fragment {} after rescheduling",
                fragment_id
            );

            fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
        }

        let fragment_actors_after_reschedule = fragment_actors_after_reschedule;

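        // Recursively walks a `NoShuffle` chain from an upstream fragment to a
        // downstream fragment, pinning each downstream actor to the same worker and
        // vnode bitmap as its (transitive) upstream root actor.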
        fn arrange_no_shuffle_relation(
            ctx: &RescheduleContext,
            fragment_id: &FragmentId,
            upstream_fragment_id: &FragmentId,
            fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
            actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
            fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
            no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
            no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
        ) {
            if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                return;
            }

            let fragment = &ctx.fragment_map[fragment_id];

            let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];

            for upstream_actor in &upstream_fragment.actors {
                for dispatcher in &upstream_actor.dispatcher {
                    if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
                        let downstream_actor_id =
                            *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();

                        if !ctx
                            .no_shuffle_target_fragment_ids
                            .contains(upstream_fragment_id)
                        {
                            actor_group_map.insert(
                                upstream_actor.actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                            actor_group_map.insert(
                                downstream_actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                        } else {
                            let root_actor_id = actor_group_map[&upstream_actor.actor_id];

                            actor_group_map.insert(downstream_actor_id, root_actor_id);
                        }
                    }
                }
            }

            let upstream_fragment_bitmap = fragment_updated_bitmap
                .get(upstream_fragment_id)
                .cloned()
                .unwrap_or_default();

            if upstream_fragment.distribution_type == FragmentDistributionType::Single {
                assert!(
                    upstream_fragment_bitmap.is_empty(),
                    "single fragment should have no bitmap updates"
                );
            }

            let upstream_fragment_actor_map = fragment_actors_after_reschedule
                .get(upstream_fragment_id)
                .cloned()
                .unwrap();

            let fragment_actor_map = fragment_actors_after_reschedule
                .get(fragment_id)
                .cloned()
                .unwrap();

            let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            let mut fragment_bitmap = HashMap::new();

            for (actor_id, worker_id) in &fragment_actor_map {
                if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
                    let root_bitmap = fragment_updated_bitmap
                        .get(root_fragment)
                        .expect("root fragment bitmap not found")
                        .get(root_actor_id)
                        .cloned()
                        .expect("root actor bitmap not found");

                    fragment_bitmap.insert(*actor_id, root_bitmap);

                    no_shuffle_upstream_actor_map
                        .entry(*actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, *root_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(*root_actor_id)
                        .or_default()
                        .insert(*fragment_id, *actor_id);
                } else {
                    worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            for (actor_id, worker_id) in &upstream_fragment_actor_map {
                if !actor_group_map.contains_key(actor_id) {
                    upstream_worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            for (worker_id, actor_ids) in worker_reverse_index {
                let upstream_actor_ids = upstream_worker_reverse_index
                    .get(&worker_id)
                    .unwrap()
                    .clone();

                assert_eq!(actor_ids.len(), upstream_actor_ids.len());

                for (actor_id, upstream_actor_id) in actor_ids
                    .into_iter()
                    .zip_eq_debug(upstream_actor_ids.into_iter())
                {
                    match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
                        None => {
                            assert_eq!(
                                upstream_fragment.distribution_type,
                                FragmentDistributionType::Single
                            );
                        }
                        Some(bitmap) => {
                            fragment_bitmap.insert(actor_id, bitmap);
                        }
                    }

                    no_shuffle_upstream_actor_map
                        .entry(actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, upstream_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(upstream_actor_id)
                        .or_default()
                        .insert(*fragment_id, actor_id);
                }
            }

            match fragment.distribution_type {
                FragmentDistributionType::Hash => {}
                FragmentDistributionType::Single => {
                    assert!(fragment_bitmap.is_empty());
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }

            if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
                assert_eq!(
                    e.entry.get(),
                    &e.value,
                    "bitmaps derived from different no-shuffle upstreams mismatch"
                );
            }

            if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
                let no_shuffle_downstreams = downstream_fragments
                    .iter()
                    .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| fragment_id);

                for downstream_fragment_id in no_shuffle_downstreams {
                    arrange_no_shuffle_relation(
                        ctx,
                        downstream_fragment_id,
                        fragment_id,
                        fragment_actors_after_reschedule,
                        actor_group_map,
                        fragment_updated_bitmap,
                        no_shuffle_upstream_actor_map,
                        no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

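        // Drive the propagation from every `NoShuffle` root (a source that is not
        // itself a target) down through its dispatcher tree.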
        let mut no_shuffle_upstream_actor_map = HashMap::new();
        let mut no_shuffle_downstream_actors_map = HashMap::new();
        let mut actor_group_map = HashMap::new();
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
                && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
                && let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id)
            {
                for downstream_fragment_id in downstream_fragments.keys() {
                    arrange_no_shuffle_relation(
                        &ctx,
                        downstream_fragment_id,
                        fragment_id,
                        &fragment_actors_after_reschedule,
                        &mut actor_group_map,
                        &mut fragment_actor_bitmap,
                        &mut no_shuffle_upstream_actor_map,
                        &mut no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

        tracing::debug!("actor group map {:?}", actor_group_map);

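        // Instantiate each new actor from its fragment's actor template, then patch
        // its dispatchers and vnode bitmap for the post-reschedule topology.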
        let mut new_created_actors = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .cloned()
                .unwrap_or_default();

            let fragment = &ctx.fragment_map[fragment_id];

            assert!(!fragment.actors.is_empty());

            for actor_to_create in &actors_to_create {
                let new_actor_id = actor_to_create.0;
                let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();

                new_actor.actor_id = *new_actor_id;

                Self::modify_actor_upstream_and_downstream(
                    &ctx,
                    &fragment_actors_to_remove,
                    &fragment_actors_to_create,
                    &fragment_actor_bitmap,
                    &no_shuffle_downstream_actors_map,
                    &mut new_actor,
                    &mut dispatchers,
                )?;

                if let Some(bitmap) = fragment_actor_bitmap
                    .get(fragment_id)
                    .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
                {
                    new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
                }

                new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
            }
        }

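        // Reassign source splits: first for fragments with a `StreamSource`, then for
        // the `SourceBackfill` fragments that follow them via `NoShuffle`.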
        let mut fragment_actor_splits = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

            if ctx.stream_source_fragment_ids.contains(fragment_id) {
                let fragment = &ctx.fragment_map[fragment_id];

                let prev_actor_ids = fragment
                    .actors
                    .iter()
                    .map(|actor| actor.actor_id)
                    .collect_vec();

                let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                let actor_splits = self
                    .source_manager
                    .migrate_splits_for_source_actors(
                        *fragment_id,
                        &prev_actor_ids,
                        &curr_actor_ids,
                    )
                    .await?;

                tracing::debug!(
                    "source actor splits: {:?}, fragment_id: {}",
                    actor_splits,
                    fragment_id
                );
                fragment_actor_splits.insert(*fragment_id, actor_splits);
            }
        }
        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
            for fragment_id in reschedules.keys() {
                let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

                if let Some(upstream_source_fragment_id) =
                    ctx.stream_source_backfill_fragment_ids.get(fragment_id)
                {
                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
                        *fragment_id,
                        *upstream_source_fragment_id,
                        &curr_actor_ids,
                        &fragment_actor_splits,
                        &no_shuffle_upstream_actor_map,
                    )?;
                    tracing::debug!(
                        "source backfill actor splits: {:?}, fragment_id: {}",
                        actor_splits,
                        fragment_id
                    );
                    fragment_actor_splits.insert(*fragment_id, actor_splits);
                }
            }
        }

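        // Assemble the final per-fragment `Reschedule`: added/removed actors, vnode
        // bitmap updates, upstream dispatcher updates, and migrated splits.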
        let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
            HashMap::with_capacity(reschedules.len());

        for (fragment_id, _) in reschedules {
            let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();

            if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
                for (actor_id, worker_id) in actor_worker_maps {
                    actors_to_create
                        .entry(worker_id)
                        .or_default()
                        .push(actor_id);
                }
            }

            let actors_to_remove = fragment_actors_to_remove
                .get(&fragment_id)
                .cloned()
                .unwrap_or_default()
                .into_keys()
                .collect();

            let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];

            assert!(!actors_after_reschedule.is_empty());

            let fragment = &ctx.fragment_map[&fragment_id];

            let in_degree_types: HashSet<_> = ctx
                .fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.values())
                .into_iter()
                .flatten()
                .cloned()
                .collect();

            let upstream_dispatcher_mapping = match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    if !in_degree_types.contains(&DispatcherType::Hash) {
                        None
                    } else {
                        Some(ActorMapping::from_bitmaps(
                            &fragment_actor_bitmap[&fragment_id],
                        ))
                    }
                }

                FragmentDistributionType::Single => {
                    assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
                    None
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            };

            let mut upstream_fragment_dispatcher_set = BTreeSet::new();

            {
                if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
                    for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
                        match upstream_dispatcher_type {
                            DispatcherType::NoShuffle => {}
                            _ => {
                                upstream_fragment_dispatcher_set.insert((
                                    *upstream_fragment_id as FragmentId,
                                    fragment.fragment_id as DispatcherId,
                                ));
                            }
                        }
                    }
                }
            }

            let downstream_fragment_ids = if let Some(downstream_fragments) =
                ctx.fragment_dispatcher_map.get(&fragment_id)
            {
                downstream_fragments
                    .iter()
                    .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| *fragment_id)
                    .collect_vec()
            } else {
                vec![]
            };

            let vnode_bitmap_updates = match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    let mut vnode_bitmap_updates =
                        fragment_actor_bitmap.remove(&fragment_id).unwrap();

                    for actor_id in actors_after_reschedule.keys() {
                        assert!(vnode_bitmap_updates.contains_key(actor_id));

                        if let Some(actor) = ctx.actor_map.get(actor_id) {
                            let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();

                            if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref()
                                && prev_bitmap.eq(bitmap)
                            {
                                vnode_bitmap_updates.remove(actor_id);
                            }
                        }
                    }

                    vnode_bitmap_updates
                }
                FragmentDistributionType::Single => HashMap::new(),
                FragmentDistributionType::Unspecified => unreachable!(),
            };

            let upstream_fragment_dispatcher_ids =
                upstream_fragment_dispatcher_set.into_iter().collect_vec();

            let actor_splits = fragment_actor_splits
                .get(&fragment_id)
                .cloned()
                .unwrap_or_default();

            reschedule_fragment.insert(
                fragment_id,
                Reschedule {
                    added_actors: actors_to_create,
                    removed_actors: actors_to_remove,
                    vnode_bitmap_updates,
                    upstream_fragment_dispatcher_ids,
                    upstream_dispatcher_mapping,
                    downstream_fragment_ids,
                    actor_splits,
                    newly_created_actors: Default::default(),
                },
            );
        }

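        // Attach the fully-built new actors (with their target workers) to the
        // corresponding `Reschedule` entries.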
        let mut fragment_created_actors = HashMap::new();
        for (fragment_id, actors_to_create) in &fragment_actors_to_create {
            let mut created_actors = HashMap::new();
            for (actor_id, worker_id) in actors_to_create {
                let actor = new_created_actors.get(actor_id).cloned().unwrap();
                created_actors.insert(*actor_id, (actor, *worker_id));
            }

            fragment_created_actors.insert(*fragment_id, created_actors);
        }

        for (fragment_id, to_create) in fragment_created_actors {
            let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
            reschedule.newly_created_actors = to_create;
        }
        tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);

        Ok(reschedule_fragment)
    }

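    /// Translates each fragment's per-worker actor diff into concrete actor ids:
    /// picks which existing actors to drop on shrinking workers and allocates fresh
    /// actor ids for growing workers, validating the plan along the way.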
    #[expect(clippy::type_complexity)]
    fn arrange_reschedules(
        &self,
        reschedule: &HashMap<FragmentId, WorkerReschedule>,
        ctx: &RescheduleContext,
    ) -> MetaResult<(
        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
    )> {
        let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
        let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());

        for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
            let fragment = ctx.fragment_map.get(fragment_id).unwrap();

            let mut actors_to_remove = BTreeMap::new();
            let mut actors_to_create = BTreeMap::new();

            let mut worker_to_actors = HashMap::new();

            for actor in &fragment.actors {
                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
                worker_to_actors
                    .entry(worker_id)
                    .or_insert(BTreeSet::new())
                    .insert(actor.actor_id as ActorId);
            }

            let decreased_actor_count = worker_actor_diff
                .iter()
                .filter(|(_, change)| change.is_negative())
                .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));

            for (worker_id, n) in decreased_actor_count {
                if let Some(actor_ids) = worker_to_actors.get(worker_id) {
                    if actor_ids.len() < n {
                        bail!(
                            "plan illegal, for fragment {}, worker {} only has {} actors, but needs to reduce {}",
                            fragment_id,
                            worker_id,
                            actor_ids.len(),
                            n
                        );
                    }

                    let removed_actors: Vec<_> = actor_ids
                        .iter()
                        .skip(actor_ids.len().saturating_sub(n))
                        .cloned()
                        .collect();

                    for actor in removed_actors {
                        actors_to_remove.insert(actor, *worker_id);
                    }
                }
            }

            let increased_actor_count = worker_actor_diff
                .iter()
                .filter(|(_, change)| change.is_positive());

            for (worker, n) in increased_actor_count {
                for _ in 0..*n {
                    let id = self
                        .env
                        .id_gen_manager()
                        .generate_interval::<{ IdCategory::Actor }>(1)
                        as ActorId;
                    actors_to_create.insert(id, *worker);
                }
            }

            if !actors_to_remove.is_empty() {
                fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
            }

            if !actors_to_create.is_empty() {
                fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
            }
        }

        for actors_to_remove in fragment_actors_to_remove.values() {
            for actor_id in actors_to_remove.keys() {
                let actor = ctx.actor_map.get(actor_id).unwrap();
                for dispatcher in &actor.dispatcher {
                    if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
                        let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");

                        let _should_exists = fragment_actors_to_remove
                            .get(&(dispatcher.dispatcher_id as FragmentId))
                            .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
                            .get(downstream_actor_id)
                            .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
                    }
                }
            }
        }

        Ok((fragment_actors_to_remove, fragment_actors_to_create))
    }

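    /// Rewrites a newly created actor's dispatchers for the post-reschedule topology:
    /// drops removed downstream actors, adds created ones, re-targets `NoShuffle`
    /// dispatchers, and refreshes hash mappings from the updated vnode bitmaps.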
    fn modify_actor_upstream_and_downstream(
        ctx: &RescheduleContext,
        fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
        no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
        new_actor: &mut StreamActor,
        dispatchers: &mut Vec<PbDispatcher>,
    ) -> MetaResult<()> {
        for dispatcher in dispatchers {
            let downstream_fragment_id = dispatcher
                .downstream_actor_id
                .iter()
                .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
                .dedup()
                .exactly_one()
                .unwrap() as FragmentId;

            let downstream_fragment_actors_to_remove =
                fragment_actors_to_remove.get(&downstream_fragment_id);
            let downstream_fragment_actors_to_create =
                fragment_actors_to_create.get(&downstream_fragment_id);

            match dispatcher.r#type() {
                d @ (PbDispatcherType::Hash
                | PbDispatcherType::Simple
                | PbDispatcherType::Broadcast) => {
                    if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
                    {
                        dispatcher
                            .downstream_actor_id
                            .retain(|id| !downstream_actors_to_remove.contains_key(id));
                    }

                    if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
                    {
                        dispatcher
                            .downstream_actor_id
                            .extend(downstream_actors_to_create.keys().cloned())
                    }

                    if d == PbDispatcherType::Simple {
                        assert_eq!(dispatcher.downstream_actor_id.len(), 1);
                    }
                }
                PbDispatcherType::NoShuffle => {
                    assert_eq!(dispatcher.downstream_actor_id.len(), 1);
                    let downstream_actor_id = no_shuffle_downstream_actors_map
                        .get(&new_actor.actor_id)
                        .and_then(|map| map.get(&downstream_fragment_id))
                        .unwrap();
                    dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
                }
                PbDispatcherType::Unspecified => unreachable!(),
            }

            if let Some(mapping) = dispatcher.hash_mapping.as_mut()
                && let Some(downstream_updated_bitmap) =
                    fragment_actor_bitmap.get(&downstream_fragment_id)
            {
                *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
            }
        }

        Ok(())
    }

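    /// Applies the side effects of a committed reschedule: persists it via the
    /// metadata manager, refreshes serving vnode mappings (notifying frontends), and
    /// pushes updated split assignments to the source manager.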
    #[await_tree::instrument]
    pub async fn post_apply_reschedule(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        self.metadata_manager
            .post_apply_reschedules(reschedules.clone(), post_updates)
            .await?;

        if !reschedules.is_empty() {
            let workers = self
                .metadata_manager
                .list_active_serving_compute_nodes()
                .await?;
            let streaming_parallelisms = self
                .metadata_manager
                .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))
                .await?;
            let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
            let max_serving_parallelism = self
                .env
                .session_params_manager_impl_ref()
                .get_params()
                .await
                .batch_parallelism()
                .map(|p| p.get());
            let (upserted, failed) = serving_worker_slot_mapping.upsert(
                streaming_parallelisms,
                &workers,
                max_serving_parallelism,
            );
            if !upserted.is_empty() {
                tracing::debug!(
                    "Update serving vnode mapping for fragments {:?}.",
                    upserted.keys()
                );
                self.env
                    .notification_manager()
                    .notify_frontend_without_version(
                        Operation::Update,
                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                            mappings: to_fragment_worker_slot_mapping(&upserted),
                        }),
                    );
            }
            if !failed.is_empty() {
                tracing::debug!(
                    "Failed to update serving vnode mapping for fragments {:?}.",
                    failed
                );
                self.env
                    .notification_manager()
                    .notify_frontend_without_version(
                        Operation::Delete,
                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                            mappings: to_deleted_fragment_worker_slot_mapping(&failed),
                        }),
                    );
            }
        }

        let mut stream_source_actor_splits = HashMap::new();
        let mut stream_source_dropped_actors = HashSet::new();

        for (fragment_id, reschedule) in reschedules {
            if !reschedule.actor_splits.is_empty() {
                stream_source_actor_splits
                    .insert(*fragment_id as FragmentId, reschedule.actor_splits.clone());
                stream_source_dropped_actors.extend(reschedule.removed_actors.clone());
            }
        }

        if !stream_source_actor_splits.is_empty() {
            self.source_manager
                .apply_source_change(SourceChange::Reschedule {
                    split_assignment: stream_source_actor_splits,
                    dropped_actors: stream_source_dropped_actors,
                })
                .await;
        }

        Ok(())
    }

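    /// Generates a reschedule plan for whole streaming jobs: resolves each job's
    /// target parallelism and resource group, indexes its fragments and actors, and
    /// diffs current versus desired per-worker actor counts into `WorkerReschedule`s.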
    pub async fn generate_job_reschedule_plan(
        &self,
        policy: JobReschedulePolicy,
    ) -> MetaResult<JobReschedulePlan> {
        type VnodeCount = usize;

        let JobReschedulePolicy { targets } = policy;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?;

        let workers: HashMap<_, _> = workers
            .into_iter()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        #[derive(Debug)]
        struct JobUpdate {
            filtered_worker_ids: BTreeSet<WorkerId>,
            parallelism: TableParallelism,
        }

        let mut job_parallelism_updates = HashMap::new();

        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
            parallelism_updates: Default::default(),
            resource_group_updates: Default::default(),
        };

        for (
            job_id,
            JobRescheduleTarget {
                parallelism: parallelism_update,
                resource_group: resource_group_update,
            },
        ) in &targets
        {
            let parallelism = match parallelism_update {
                JobParallelismTarget::Update(parallelism) => *parallelism,
                JobParallelismTarget::Refresh => {
                    let parallelism = self
                        .metadata_manager
                        .catalog_controller
                        .get_job_streaming_parallelisms(*job_id as _)
                        .await?;

                    parallelism.into()
                }
            };

            job_reschedule_post_updates
                .parallelism_updates
                .insert(TableId::from(*job_id), parallelism);

            let current_resource_group = match resource_group_update {
                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
                    job_reschedule_post_updates.resource_group_updates.insert(
                        *job_id as ObjectId,
                        Some(specific_resource_group.to_owned()),
                    );

                    specific_resource_group.to_owned()
                }
                JobResourceGroupTarget::Update(None) => {
                    let database_resource_group = self
                        .metadata_manager
                        .catalog_controller
                        .get_existing_job_database_resource_group(*job_id as _)
                        .await?;

                    job_reschedule_post_updates
                        .resource_group_updates
                        .insert(*job_id as ObjectId, None);
                    database_resource_group
                }
                JobResourceGroupTarget::Keep => {
                    self.metadata_manager
                        .catalog_controller
                        .get_existing_job_resource_group(*job_id as _)
                        .await?
                }
            };

            let filtered_worker_ids =
                filter_workers_by_resource_group(&workers, current_resource_group.as_str());

            if filtered_worker_ids.is_empty() {
                bail!("Cannot resize streaming_job {job_id} to empty worker set")
            }

            job_parallelism_updates.insert(
                *job_id,
                JobUpdate {
                    filtered_worker_ids,
                    parallelism,
                },
            );
        }

        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        let mut fragment_distribution_map = HashMap::new();
        let mut actor_location = HashMap::new();
        let mut table_fragment_id_map = HashMap::new();
        let mut fragment_actor_id_map = HashMap::new();

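        // Loads the reschedule working set for the target jobs and builds the lookup
        // indexes used below: `NoShuffle` relations, fragment distributions and vnode
        // counts, actor locations, and job-to-fragment / fragment-to-actor maps.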
        async fn build_index(
            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
            fragment_distribution_map: &mut HashMap<
                FragmentId,
                (FragmentDistributionType, VnodeCount),
            >,
            actor_location: &mut HashMap<ActorId, WorkerId>,
            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
            mgr: &MetadataManager,
            table_ids: Vec<ObjectId>,
        ) -> Result<(), MetaError> {
            let RescheduleWorkingSet {
                fragments,
                actors,
                actor_dispatchers: _actor_dispatchers,
                fragment_downstreams,
                fragment_upstreams: _fragment_upstreams,
                related_jobs: _related_jobs,
                job_resource_groups: _job_resource_groups,
            } = mgr
                .catalog_controller
                .resolve_working_set_for_reschedule_tables(table_ids)
                .await?;

            for (fragment_id, downstreams) in fragment_downstreams {
                for (downstream_fragment_id, dispatcher_type) in downstreams {
                    if let risingwave_meta_model::DispatcherType::NoShuffle = dispatcher_type {
                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
                    }
                }
            }

            for (fragment_id, fragment) in fragments {
                fragment_distribution_map.insert(
                    fragment_id as FragmentId,
                    (
                        FragmentDistributionType::from(fragment.distribution_type),
                        fragment.vnode_count as _,
                    ),
                );

                table_fragment_id_map
                    .entry(fragment.job_id as u32)
                    .or_default()
                    .insert(fragment_id as FragmentId);
            }

            for (actor_id, actor) in actors {
                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
                fragment_actor_id_map
                    .entry(actor.fragment_id as FragmentId)
                    .or_default()
                    .insert(actor_id as ActorId);
            }

            Ok(())
        }

        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();

        build_index(
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
            &mut fragment_distribution_map,
            &mut actor_location,
            &mut table_fragment_id_map,
            &mut fragment_actor_id_map,
            &self.metadata_manager,
            table_ids,
        )
        .await?;
        tracing::debug!(
            ?job_reschedule_post_updates,
            ?job_parallelism_updates,
            ?no_shuffle_source_fragment_ids,
            ?no_shuffle_target_fragment_ids,
            ?fragment_distribution_map,
            ?actor_location,
            ?table_fragment_id_map,
            ?fragment_actor_id_map,
            "generate_table_resize_plan, after build_index"
        );

        let adaptive_parallelism_strategy = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy();

        let mut target_plan = HashMap::new();

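        // For every job, compute the desired per-worker actor distribution of each of
        // its fragments and diff it against the current placement. `NoShuffle` target
        // fragments are skipped here; they follow their upstream root.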
        for (
            table_id,
            JobUpdate {
                filtered_worker_ids,
                parallelism,
            },
        ) in job_parallelism_updates
        {
            let assigner = AssignerBuilder::new(table_id).build();

            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();

            let available_worker_slots = workers
                .iter()
                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
                .map(|(_, worker)| {
                    (
                        worker.id as WorkerId,
                        NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                    )
                })
                .collect::<BTreeMap<_, _>>();

            for fragment_id in fragment_map {
                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
                    continue;
                }

                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();

                for actor_id in &fragment_actor_id_map[&fragment_id] {
                    let worker_id = actor_location[actor_id];
                    *fragment_slots.entry(worker_id).or_default() += 1;
                }

                let available_slot_count: usize = available_worker_slots
                    .values()
                    .cloned()
                    .map(NonZeroUsize::get)
                    .sum();

                if available_slot_count == 0 {
                    bail!(
                        "No schedulable slots available for fragment {}",
                        fragment_id
                    );
                }

                let (dist, vnode_count) = fragment_distribution_map[&fragment_id];
                let max_parallelism = vnode_count;

                match dist {
                    FragmentDistributionType::Unspecified => unreachable!(),
                    FragmentDistributionType::Single => {
                        let (single_worker_id, should_be_one) = fragment_slots
                            .iter()
                            .exactly_one()
                            .expect("single fragment should have only one worker slot");

                        assert_eq!(*should_be_one, 1);

                        let assignment =
                            assigner.count_actors_per_worker(&available_worker_slots, 1);

                        let (chosen_target_worker_id, should_be_one) =
                            assignment.iter().exactly_one().ok().with_context(|| {
                                format!(
                                    "Cannot find a single target worker for fragment {fragment_id}"
                                )
                            })?;

                        assert_eq!(*should_be_one, 1);

                        if *chosen_target_worker_id == *single_worker_id {
                            tracing::debug!(
                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
                            );
                            continue;
                        }

                        target_plan.insert(
                            fragment_id,
                            WorkerReschedule {
                                worker_actor_diff: BTreeMap::from_iter(vec![
                                    (*chosen_target_worker_id, 1),
                                    (*single_worker_id, -1),
                                ]),
                            },
                        );
                    }
                    FragmentDistributionType::Hash => match parallelism {
                        TableParallelism::Adaptive => {
                            let target_slot_count = adaptive_parallelism_strategy
                                .compute_target_parallelism(available_slot_count);

                            if target_slot_count > max_parallelism {
                                tracing::warn!(
                                    "available parallelism for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );

                                let target_worker_slots = assigner.count_actors_per_worker(
                                    &available_worker_slots,
                                    max_parallelism,
                                );

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else if available_slot_count != target_slot_count {
                                tracing::info!(
                                    "available parallelism for table {table_id} is limited by adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
                                );

                                let target_worker_slots = assigner.count_actors_per_worker(
                                    &available_worker_slots,
                                    target_slot_count,
                                );

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else {
                                let available_worker_slots = available_worker_slots
                                    .iter()
                                    .map(|(worker_id, v)| (*worker_id, v.get()))
                                    .collect();

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &available_worker_slots,
                                    ),
                                );
                            }
                        }
                        TableParallelism::Fixed(mut n) => {
                            if n > max_parallelism {
                                tracing::warn!(
                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );
                                n = max_parallelism
                            }

                            let target_worker_slots =
                                assigner.count_actors_per_worker(&available_worker_slots, n);

                            target_plan.insert(
                                fragment_id,
                                Self::diff_worker_slot_changes(
                                    &fragment_slots,
                                    &target_worker_slots,
                                ),
                            );
                        }
                        TableParallelism::Custom => {}
                    },
                }
            }
        }

        target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
        tracing::debug!(
            ?target_plan,
            "generate_table_resize_plan finished target_plan"
        );

        Ok(JobReschedulePlan {
            reschedules: target_plan,
            post_updates: job_reschedule_post_updates,
        })
    }

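    /// Diffs the current per-worker actor counts against the target counts and
    /// returns the signed per-worker changes. For example, current `{w1: 3, w2: 1}`
    /// versus target `{w1: 2, w2: 2}` yields `{w1: -1, w2: +1}`.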
    fn diff_worker_slot_changes(
        fragment_worker_slots: &BTreeMap<WorkerId, usize>,
        target_worker_slots: &BTreeMap<WorkerId, usize>,
    ) -> WorkerReschedule {
        let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
        let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();

        for (&worker_id, &target_slots) in target_worker_slots {
            let &current_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);

            if target_slots > current_slots {
                increased_actor_count.insert(worker_id, target_slots - current_slots);
            }
        }

        for (&worker_id, &current_slots) in fragment_worker_slots {
            let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);

            if current_slots > target_slots {
                decreased_actor_count.insert(worker_id, current_slots - target_slots);
            }
        }

        let worker_ids: HashSet<_> = increased_actor_count
            .keys()
            .chain(decreased_actor_count.keys())
            .cloned()
            .collect();

        let mut worker_actor_diff = BTreeMap::new();

        for worker_id in worker_ids {
            let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
            let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
            let change = increased - decreased;

            assert_ne!(change, 0);

            worker_actor_diff.insert(worker_id, change);
        }

        WorkerReschedule { worker_actor_diff }
    }

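    /// Scans every actor's dispatchers and records the fragments on both ends of each
    /// NO_SHUFFLE edge: upstream fragments go into `no_shuffle_source_fragment_ids` and
    /// downstream fragments into `no_shuffle_target_fragment_ids`. Since all actors of a
    /// fragment share the same dispatcher topology, only the first actor seen for each
    /// fragment is inspected (tracked via `fragment_cache`).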
    fn build_no_shuffle_relation_index(
        actor_map: &HashMap<ActorId, CustomActorInfo>,
        no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
    ) {
        let mut fragment_cache = HashSet::new();
        for actor in actor_map.values() {
            if fragment_cache.contains(&actor.fragment_id) {
                continue;
            }

            for dispatcher in &actor.dispatcher {
                for downstream_actor_id in &dispatcher.downstream_actor_id {
                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
                        if dispatcher.r#type() == PbDispatcherType::NoShuffle {
                            no_shuffle_source_fragment_ids
                                .insert(actor.fragment_id as FragmentId);
                            no_shuffle_target_fragment_ids
                                .insert(downstream_actor.fragment_id as FragmentId);
                        }
                    }
                }
            }

            fragment_cache.insert(actor.fragment_id);
        }
    }

    fn build_fragment_dispatcher_index(
        actor_map: &HashMap<ActorId, CustomActorInfo>,
        fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    ) {
        for actor in actor_map.values() {
            for dispatcher in &actor.dispatcher {
                for downstream_actor_id in &dispatcher.downstream_actor_id {
                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
                        fragment_dispatcher_map
                            .entry(actor.fragment_id as FragmentId)
                            .or_default()
                            .insert(
                                downstream_actor.fragment_id as FragmentId,
                                dispatcher.r#type().into(),
                            );
                    }
                }
            }
        }
    }

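    /// Walks upward from every NO_SHUFFLE *target* fragment in `fragment_ids` to its
    /// NO_SHUFFLE *source* (upstream) fragments, pulling the owning tables into the
    /// working set. A table with `Custom` parallelism forces its upstream tables to be
    /// `Custom` as well; a conflicting explicit setting upstream is rejected. Finally,
    /// tables whose fragments are NO_SHUFFLE targets are dropped from
    /// `table_parallelisms`, since their parallelism is dictated by their upstream.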
    pub fn resolve_no_shuffle_upstream_tables(
        fragment_ids: HashSet<FragmentId>,
        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
        fragment_to_table: &HashMap<FragmentId, TableId>,
        fragment_upstreams: &HashMap<
            risingwave_meta_model::FragmentId,
            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
        >,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<()> {
        let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();

        let mut fragment_ids = fragment_ids;

        while let Some(fragment_id) = queue.pop_front() {
            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
                continue;
            }

            for upstream_fragment_id in fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.keys())
                .into_iter()
                .flatten()
            {
                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
                let upstream_fragment_id = &upstream_fragment_id;
                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
                    continue;
                }

                let table_id = &fragment_to_table[&fragment_id];
                let upstream_table_id = &fragment_to_table[upstream_fragment_id];

                if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
                    if let Some(upstream_table_parallelism) =
                        table_parallelisms.get(upstream_table_id)
                    {
                        if upstream_table_parallelism != &TableParallelism::Custom {
                            bail!(
                                "Cannot change upstream table {} from {:?} to {:?}",
                                upstream_table_id,
                                upstream_table_parallelism,
                                TableParallelism::Custom
                            )
                        }
                    } else {
                        table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
                    }
                }

                fragment_ids.insert(*upstream_fragment_id);
                queue.push_back(*upstream_fragment_id);
            }
        }

        let downstream_fragment_ids = fragment_ids
            .iter()
            .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));

        let downstream_table_ids = downstream_fragment_ids
            .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
            .collect::<HashSet<_>>();

        table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));

        Ok(())
    }

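    /// Propagates the reschedule plan of each NO_SHUFFLE *target* fragment to its
    /// NO_SHUFFLE *source* (upstream) fragments, walking upward transitively.
    /// Conflicting plans along a NO_SHUFFLE chain are rejected. Afterwards the target
    /// fragments themselves are dropped from `reschedule`, since their placement is
    /// dictated by their upstream.
    ///
    /// A sketch of the propagation (fragment ids are illustrative):
    ///
    /// ```text
    /// 1 --NO_SHUFFLE--> 2 --NO_SHUFFLE--> 3
    /// input:  {3: plan}
    /// output: {1: plan}   // plan copied to 2, then 1; targets 2 and 3 removed
    /// ```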
    pub fn resolve_no_shuffle_upstream_fragments<T>(
        reschedule: &mut HashMap<FragmentId, T>,
        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
        fragment_upstreams: &HashMap<
            risingwave_meta_model::FragmentId,
            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
        >,
    ) -> MetaResult<()>
    where
        T: Clone + Eq,
    {
        let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();

        while let Some(fragment_id) = queue.pop_front() {
            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
                continue;
            }

            for upstream_fragment_id in fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.keys())
                .into_iter()
                .flatten()
            {
                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
                let upstream_fragment_id = &upstream_fragment_id;
                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
                    continue;
                }

                let reschedule_plan = &reschedule[&fragment_id];

                if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
                    if upstream_reschedule_plan != reschedule_plan {
                        bail!(
                            "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
                            fragment_id,
                            upstream_fragment_id
                        );
                    }

                    continue;
                }

                reschedule.insert(*upstream_fragment_id, reschedule_plan.clone());

                queue.push_back(*upstream_fragment_id);
            }
        }

        reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));

        Ok(())
    }

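    /// Expands the given jobs to the full set of jobs connected to them through
    /// NO_SHUFFLE relations, as resolved by the catalog controller's reschedule
    /// working set.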
    pub async fn resolve_related_no_shuffle_jobs(
        &self,
        jobs: &[TableId],
    ) -> MetaResult<HashSet<TableId>> {
        let RescheduleWorkingSet { related_jobs, .. } = self
            .metadata_manager
            .catalog_controller
            .resolve_working_set_for_reschedule_tables(
                jobs.iter().map(|id| id.table_id as _).collect(),
            )
            .await?;

        Ok(related_jobs
            .keys()
            .map(|id| TableId::new(*id as _))
            .collect())
    }
}

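/// How a job's parallelism should be adjusted during a reschedule: `Update` sets an
/// explicit [`TableParallelism`], while `Refresh` re-evaluates the job under its
/// current setting.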
2292#[derive(Debug, Clone)]
2293pub enum JobParallelismTarget {
2294 Update(TableParallelism),
2295 Refresh,
2296}
2297
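/// How a job's resource group should be adjusted during a reschedule: `Update` replaces
/// it with the given (optional) group, while `Keep` leaves it untouched.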
2298#[derive(Debug, Clone)]
2299pub enum JobResourceGroupTarget {
2300 Update(Option<String>),
2301 Keep,
2302}
2303
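/// The per-job reschedule target: the desired parallelism and resource group.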
2304#[derive(Debug, Clone)]
2305pub struct JobRescheduleTarget {
2306 pub parallelism: JobParallelismTarget,
2307 pub resource_group: JobResourceGroupTarget,
2308}
2309
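/// Reschedule targets keyed by streaming job id.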
2310#[derive(Debug)]
2311pub struct JobReschedulePolicy {
2312 pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
2313}
2314
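/// Catalog updates to apply once a reschedule has been executed: the new per-table
/// parallelisms and per-object resource groups.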
2315#[derive(Debug, Clone)]
2317pub struct JobReschedulePostUpdates {
2318 pub parallelism_updates: HashMap<TableId, TableParallelism>,
2319 pub resource_group_updates: HashMap<ObjectId, Option<String>>,
2320}
2321
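/// A complete reschedule plan: the per-fragment worker changes to apply, plus the
/// metadata updates to persist afterwards.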
2322#[derive(Debug)]
2323pub struct JobReschedulePlan {
2324 pub reschedules: HashMap<FragmentId, WorkerReschedule>,
2325 pub post_updates: JobReschedulePostUpdates,
2326}
2327
impl GlobalStreamManager {
    #[await_tree::instrument("acquire_reschedule_read_guard")]
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    #[await_tree::instrument("acquire_reschedule_write_guard")]
    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }

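    /// Executes a reschedule plan within a single database: resolves the per-fragment
    /// worker changes into concrete `Reschedule`s, collects the running actors of every
    /// affected upstream/downstream fragment, pauses source ticks, and submits a
    /// [`Command::RescheduleFragment`] through the barrier scheduler.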
    pub async fn reschedule_actors(
        &self,
        database_id: DatabaseId,
        plan: JobReschedulePlan,
        options: RescheduleOptions,
    ) -> MetaResult<()> {
        let JobReschedulePlan {
            reschedules,
            mut post_updates,
        } = plan;

        let reschedule_fragment = self
            .scale_controller
            .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
            .await?;

        tracing::debug!("reschedule plan: {:?}", reschedule_fragment);

        let up_down_stream_fragment: HashSet<_> = reschedule_fragment
            .iter()
            .flat_map(|(_, reschedule)| {
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id)
                    .chain(reschedule.downstream_fragment_ids.iter().cloned())
            })
            .collect();

        let fragment_actors =
            try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async {
                let actor_ids = self
                    .metadata_manager
                    .get_running_actors_of_fragment(*fragment_id)
                    .await?;
                Result::<_, MetaError>::Ok((*fragment_id, actor_ids))
            }))
            .await?
            .into_iter()
            .collect();

        let command = Command::RescheduleFragment {
            reschedules: reschedule_fragment,
            fragment_actors,
            post_updates,
        };

        let _guard = self.source_manager.pause_tick().await;

        self.barrier_scheduler
            .run_command(database_id, command)
            .await?;

        tracing::info!("reschedule done");

        Ok(())
    }

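    /// Runs a single round of parallelism control: jobs still being created in the
    /// background (and jobs related to them via NO_SHUFFLE) are skipped, the remaining
    /// streaming jobs are scanned in batches, and the first non-empty reschedule plan
    /// found is applied. Returns `Ok(true)` if a reschedule was applied, meaning another
    /// round should follow, and `Ok(false)` if there was nothing left to do.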
    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
        tracing::info!("trigger parallelism control");

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_streaming_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        let skipped_jobs = if !background_streaming_jobs.is_empty() {
            let jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
                .await?;

            tracing::info!(
                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
                background_streaming_jobs,
                jobs
            );

            jobs
        } else {
            HashSet::new()
        };

        let job_ids: HashSet<_> = {
            let streaming_parallelisms = self
                .metadata_manager
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            streaming_parallelisms
                .into_iter()
                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
                .map(|(table_id, _)| table_id)
                .collect()
        };

        let workers = self
            .metadata_manager
            .cluster_controller
            .list_active_streaming_workers()
            .await?;

        let schedulable_worker_ids: BTreeSet<_> = workers
            .iter()
            .filter(|worker| {
                !worker
                    .property
                    .as_ref()
                    .map(|p| p.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|worker| worker.id as WorkerId)
            .collect();

        if job_ids.is_empty() {
            tracing::info!("no streaming jobs to scale, maybe an empty cluster");
            return Ok(false);
        }

        let batch_size = match self.env.opts.parallelism_control_batch_size {
            0 => job_ids.len(),
            n => n,
        };

        tracing::info!(
            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
            job_ids.len(),
            batch_size,
            schedulable_worker_ids
        );

        let batches: Vec<_> = job_ids
            .into_iter()
            .chunks(batch_size)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect();

        let mut reschedules = None;

        for batch in batches {
            let targets: HashMap<_, _> = batch
                .into_iter()
                .map(|job_id| {
                    (
                        job_id as u32,
                        JobRescheduleTarget {
                            parallelism: JobParallelismTarget::Refresh,
                            resource_group: JobResourceGroupTarget::Keep,
                        },
                    )
                })
                .collect();

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy { targets })
                .await?;

            if !plan.reschedules.is_empty() {
                tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
                reschedules = Some(plan);
                break;
            }
        }

        let Some(plan) = reschedules else {
            tracing::info!("no reschedule plan generated");
            return Ok(false);
        };

        for (database_id, reschedules) in self
            .metadata_manager
            .split_fragment_map_by_database(plan.reschedules)
            .await?
        {
            self.reschedule_actors(
                database_id,
                JobReschedulePlan {
                    reschedules,
                    post_updates: plan.post_updates.clone(),
                },
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(true)
    }

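    /// The main loop of the automatic parallelism monitor. It ticks periodically
    /// (skipping missed ticks) and maintains a cache of active streaming compute nodes;
    /// newly joined workers, changes to a worker's parallelism or resource group, and
    /// adaptive-strategy changes arm the ticker to trigger parallelism control on the
    /// next tick.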
    async fn run(&self, mut shutdown_rx: Receiver<()>) {
        tracing::info!("starting automatic parallelism control monitor");

        let check_period =
            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);

        let mut ticker = tokio::time::interval_at(
            Instant::now()
                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
            check_period,
        );
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        ticker.tick().await;

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .expect("list active streaming compute nodes");

        let mut worker_cache: BTreeMap<_, _> = worker_nodes
            .into_iter()
            .map(|worker| (worker.id, worker))
            .collect();

        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();

        let mut should_trigger = false;

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Stream manager is stopped");
                    break;
                }

                _ = ticker.tick(), if should_trigger => {
                    let include_workers = worker_cache.keys().copied().collect_vec();

                    if include_workers.is_empty() {
                        tracing::debug!("no available worker nodes");
                        should_trigger = false;
                        continue;
                    }

                    match self.trigger_parallelism_control().await {
                        Ok(cont) => {
                            should_trigger = cont;
                        }
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
                            ticker.reset();
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.expect("local notification channel closed in loop of stream manager");

                    let worker_is_streaming_compute = |worker: &WorkerNode| {
                        worker.get_type() == Ok(WorkerType::ComputeNode)
                            && worker.property.as_ref().unwrap().is_streaming
                    };

                    match notification {
                        LocalNotification::SystemParamsChange(reader) => {
                            let new_strategy = reader.adaptive_parallelism_strategy();
                            if new_strategy != previous_adaptive_parallelism_strategy {
                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
                                should_trigger = true;
                                previous_adaptive_parallelism_strategy = new_strategy;
                            }
                        }
                        LocalNotification::WorkerNodeActivated(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            tracing::info!(worker = worker.id, "worker activated notification received");

                            let prev_worker = worker_cache.insert(worker.id, worker.clone());

                            match prev_worker {
                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                                    tracing::info!(worker = worker.id, "worker parallelism changed");
                                    should_trigger = true;
                                }
                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
                                    tracing::info!(worker = worker.id, "worker resource group changed");
                                    should_trigger = true;
                                }
                                None => {
                                    tracing::info!(worker = worker.id, "new worker joined");
                                    should_trigger = true;
                                }
                                _ => {}
                            }
                        }

                        LocalNotification::WorkerNodeDeleted(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            match worker_cache.remove(&worker.id) {
                                Some(prev_worker) => {
                                    tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
                                }
                                None => {
                                    tracing::warn!(worker = worker.id, "received removal notification for a worker missing from the stream manager cache");
                                }
                            }
                        }

                        _ => {}
                    }
                }
            }
        }
    }

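    /// Spawns the automatic parallelism monitor as a background task, returning its join
    /// handle and a shutdown sender.
    ///
    /// A hypothetical usage sketch (assuming an `Arc<GlobalStreamManager>` named `mgr`
    /// and an async context):
    ///
    /// ```ignore
    /// let (handle, shutdown) = mgr.start_auto_parallelism_monitor();
    /// // ... later, during shutdown:
    /// let _ = shutdown.send(());
    /// handle.await.unwrap();
    /// ```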
    pub fn start_auto_parallelism_monitor(
        self: Arc<Self>,
    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}