use std::cmp::{Ordering, min};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use num_integer::Integer;
use num_traits::abs;
use risingwave_common::bail;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::ActorMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::FragmentWorkerSlotMappings;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{self, State};
use risingwave_pb::stream_plan::{Dispatcher, PbDispatcher, PbDispatcherType, StreamNode};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule};
use crate::controller::scale::RescheduleWorkingSet;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
use crate::model::{
    ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
};
use crate::serving::{
    ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
};
use crate::stream::{AssignerBuilder, GlobalStreamManager, SourceManagerRef};
use crate::{MetaError, MetaResult};
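
/// A per-fragment reschedule plan expressed as actor-count deltas per worker: a positive value
/// means "create this many actors on this worker", a negative value means "remove this many".
///
/// A purely illustrative sketch (the worker ids are made up): moving one actor of a fragment
/// from worker `2001` to worker `2002` would be written as
///
/// ```ignore
/// WorkerReschedule {
///     worker_actor_diff: BTreeMap::from_iter([(2001, -1), (2002, 1)]),
/// }
/// ```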
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}
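
/// A light-weight, denormalized snapshot of a fragment (its actors plus a template actor used
/// when spawning new ones) that the reschedule planner works on instead of the catalog models.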
pub struct CustomFragmentInfo {
    pub job_id: u32,
    pub fragment_id: u32,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub state_table_ids: Vec<u32>,
    pub node: StreamNode,
    pub actor_template: StreamActorWithDispatchers,
    pub actors: Vec<CustomActorInfo>,
}

#[derive(Default, Clone)]
pub struct CustomActorInfo {
    pub actor_id: u32,
    pub fragment_id: u32,
    pub dispatcher: Vec<Dispatcher>,
    pub vnode_bitmap: Option<Bitmap>,
}

use educe::Educe;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_meta_model::DispatcherType;
use risingwave_pb::stream_plan::stream_node::NodeBody;

use crate::controller::id::IdCategory;
use crate::controller::utils::filter_workers_by_resource_group;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_impl;
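
/// Index structures built once by [`ScaleController::build_reschedule_context`] and consulted
/// throughout reschedule planning: actor and fragment lookup maps, actor locations, NO_SHUFFLE
/// relationships, and the fragment-level dispatcher topology.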
#[derive(Educe)]
#[educe(Debug)]
pub struct RescheduleContext {
    #[educe(Debug(ignore))]
    actor_map: HashMap<ActorId, CustomActorInfo>,
    actor_status: BTreeMap<ActorId, WorkerId>,
    #[educe(Debug(ignore))]
    fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
    stream_source_fragment_ids: HashSet<FragmentId>,
    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
    no_shuffle_target_fragment_ids: HashSet<FragmentId>,
    no_shuffle_source_fragment_ids: HashSet<FragmentId>,
    fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    fragment_upstreams: HashMap<
        risingwave_meta_model::FragmentId,
        HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
    >,
}

impl RescheduleContext {
    fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
        self.actor_status
            .get(actor_id)
            .cloned()
            .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
    }
}
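
/// Redistributes the vnodes of a hash-distributed fragment across the actors that remain after
/// a reschedule (the surviving actors plus `actors_to_create`, minus `actors_to_remove`),
/// returning the new vnode bitmap for every retained or newly created actor.
///
/// Every actor ends up with either `vnode_count / target_actor_count` vnodes or one more, and
/// vnodes are moved pairwise from the most overloaded actor to the most underloaded one, so the
/// number of migrated vnodes stays close to the minimum.
///
/// A worked example (illustrative numbers): with 16 vnodes, scaling from 4 actors (4 vnodes
/// each) to 5 gives `16 / 5 = 3` with remainder 1, so one actor keeps 4 vnodes, the others get
/// 3 each, and only the 3 vnodes handed to the new actor actually move.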
pub fn rebalance_actor_vnode(
    actors: &[CustomActorInfo],
    actors_to_remove: &BTreeSet<ActorId>,
    actors_to_create: &BTreeSet<ActorId>,
) -> HashMap<ActorId, Bitmap> {
    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();

    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);

    assert!(actors.len() >= actors_to_remove.len());

    let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
    assert!(target_actor_count > 0);

    let vnode_count = actors[0]
        .vnode_bitmap
        .as_ref()
        .expect("vnode bitmap unset")
        .len();

    #[derive(Debug)]
    struct Balance {
        actor_id: ActorId,
        balance: i32,
        builder: BitmapBuilder,
    }
    let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);

    tracing::debug!(
        "expected {}, remain {}, prev actors {}, target actors {}",
        expected,
        remain,
        actors.len(),
        target_actor_count,
    );

    let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
        .iter()
        .map(|actor| {
            (
                actor.actor_id as ActorId,
                actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
            )
        })
        .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));

    let order_by_bitmap_desc =
        |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
            bitmap_a
                .count_ones()
                .cmp(&bitmap_b.count_ones())
                .reverse()
                .then(id_a.cmp(id_b))
        };

    let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
        let mut builder = BitmapBuilder::default();
        builder.append_bitmap(bitmap);
        builder
    };

    let (prev_expected, _) = vnode_count.div_rem(&actors.len());

    let prev_remain = removed
        .iter()
        .map(|(_, bitmap)| {
            assert!(bitmap.count_ones() >= prev_expected);
            bitmap.count_ones() - prev_expected
        })
        .sum::<usize>();

    removed.sort_by(order_by_bitmap_desc);
    rest.sort_by(order_by_bitmap_desc);

    let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
        actor_id,
        balance: bitmap.count_ones() as i32,
        builder: builder_from_bitmap(&bitmap),
    });

    let mut rest_balances = rest
        .into_iter()
        .map(|(actor_id, bitmap)| Balance {
            actor_id,
            balance: bitmap.count_ones() as i32 - expected as i32,
            builder: builder_from_bitmap(&bitmap),
        })
        .collect_vec();

    let mut created_balances = actors_to_create
        .iter()
        .map(|actor_id| Balance {
            actor_id: *actor_id,
            balance: -(expected as i32),
            builder: BitmapBuilder::zeroed(vnode_count),
        })
        .collect_vec();

    for balance in created_balances
        .iter_mut()
        .rev()
        .take(prev_remain)
        .chain(rest_balances.iter_mut())
    {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    for balance in &mut created_balances {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    assert_eq!(remain, 0);

    let mut v: VecDeque<_> = removed_balances
        .chain(rest_balances)
        .chain(created_balances)
        .collect();

    let mut result = HashMap::with_capacity(target_actor_count);

    for balance in &v {
        tracing::debug!(
            "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
            balance.actor_id,
            balance.balance,
            actors_to_remove.contains(&balance.actor_id),
            actors_to_create.contains(&balance.actor_id)
        );
    }

    while !v.is_empty() {
        if v.len() == 1 {
            let single = v.pop_front().unwrap();
            assert_eq!(single.balance, 0);
            if !actors_to_remove.contains(&single.actor_id) {
                result.insert(single.actor_id, single.builder.finish());
            }

            continue;
        }

        let mut src = v.pop_front().unwrap();
        let mut dst = v.pop_back().unwrap();

        let n = min(abs(src.balance), abs(dst.balance));

        let mut moved = 0;
        for idx in (0..vnode_count).rev() {
            if moved >= n {
                break;
            }

            if src.builder.is_set(idx) {
                src.builder.set(idx, false);
                assert!(!dst.builder.is_set(idx));
                dst.builder.set(idx, true);
                moved += 1;
            }
        }

        src.balance -= n;
        dst.balance += n;

        if src.balance != 0 {
            v.push_front(src);
        } else if !actors_to_remove.contains(&src.actor_id) {
            result.insert(src.actor_id, src.builder.finish());
        }

        if dst.balance != 0 {
            v.push_back(dst);
        } else {
            result.insert(dst.actor_id, dst.builder.finish());
        }
    }

    result
}

#[derive(Debug, Clone, Copy)]
pub struct RescheduleOptions {
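    /// Whether reschedules of NO_SHUFFLE downstream fragments should be rewritten onto the root
    /// of their NO_SHUFFLE dependency tree, i.e. the upstream fragment is scaled instead.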
    pub resolve_no_shuffle_upstream: bool,
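
    /// Whether to skip actually creating the scaled-out actors.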
    pub skip_create_new_actors: bool,
}

pub type ScaleControllerRef = Arc<ScaleController>;

pub struct ScaleController {
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,
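
    /// Held while a reschedule request is being processed, so that concurrent reschedules are
    /// serialized.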
    pub reschedule_lock: RwLock<()>,
}

impl ScaleController {
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

    pub async fn integrity_check(&self) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .integrity_check()
            .await
    }
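
    /// Validates a reschedule plan against the current cluster state and builds the
    /// [`RescheduleContext`] for it: resolves the working set from the catalog, indexes actors
    /// and fragments, expands the plan to NO_SHUFFLE downstream fragments, and rejects plans
    /// that target unschedulable workers, would drain a fragment completely, or touch fragments
    /// that are not in a reschedulable state.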
    async fn build_reschedule_context(
        &self,
        reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<RescheduleContext> {
        let worker_nodes: HashMap<WorkerId, WorkerNode> = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .map(|worker_node| (worker_node.id as _, worker_node))
            .collect();

        if worker_nodes.is_empty() {
            bail!("no available compute node in the cluster");
        }

        let unschedulable_worker_ids: HashSet<_> = worker_nodes
            .values()
            .filter(|w| {
                w.property
                    .as_ref()
                    .map(|property| property.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|worker| worker.id as WorkerId)
            .collect();

        for (fragment_id, reschedule) in &*reschedule {
            for (worker_id, change) in &reschedule.worker_actor_diff {
                if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
                    bail!(
                        "unable to move fragment {} to unschedulable worker {}",
                        fragment_id,
                        worker_id
                    );
                }
            }
        }

        let mut actor_map = HashMap::new();
        let mut fragment_map = HashMap::new();
        let mut actor_status = BTreeMap::new();
        let mut fragment_state = HashMap::new();
        let mut fragment_to_table = HashMap::new();

        fn fulfill_index_by_fragment_ids(
            actor_map: &mut HashMap<u32, CustomActorInfo>,
            fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
            actor_status: &mut BTreeMap<ActorId, WorkerId>,
            fragment_state: &mut HashMap<FragmentId, State>,
            fragment_to_table: &mut HashMap<FragmentId, TableId>,
            fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
            actors: HashMap<ActorId, actor::Model>,
            mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
            related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
        ) {
            let mut fragment_actors: HashMap<
                risingwave_meta_model::FragmentId,
                Vec<CustomActorInfo>,
            > = HashMap::new();

            let mut expr_contexts = HashMap::new();
            for (
                _,
                actor::Model {
                    actor_id,
                    fragment_id,
                    status: _,
                    worker_id,
                    vnode_bitmap,
                    expr_context,
                    ..
                },
            ) in actors
            {
                let dispatchers = actor_dispatchers
                    .remove(&(actor_id as _))
                    .unwrap_or_default();

                let actor_info = CustomActorInfo {
                    actor_id: actor_id as _,
                    fragment_id: fragment_id as _,
                    dispatcher: dispatchers,
                    vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
                };

                actor_map.insert(actor_id as _, actor_info.clone());

                fragment_actors
                    .entry(fragment_id as _)
                    .or_default()
                    .push(actor_info);

                actor_status.insert(actor_id as _, worker_id as WorkerId);

                expr_contexts.insert(actor_id as u32, expr_context);
            }

            for (
                _,
                fragment::Model {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    stream_node,
                    state_table_ids,
                    ..
                },
            ) in fragments
            {
                let actors = fragment_actors
                    .remove(&(fragment_id as _))
                    .unwrap_or_default();

                let CustomActorInfo {
                    actor_id,
                    fragment_id,
                    dispatcher,
                    vnode_bitmap,
                } = actors.first().unwrap().clone();

                let (related_job, job_definition) =
                    related_jobs.get(&job_id).expect("job not found");

                let fragment = CustomFragmentInfo {
                    job_id: job_id as _,
                    fragment_id: fragment_id as _,
                    fragment_type_mask: fragment_type_mask.into(),
                    distribution_type: distribution_type.into(),
                    state_table_ids: state_table_ids.into_u32_array(),
                    node: stream_node.to_protobuf(),
                    actor_template: (
                        StreamActor {
                            actor_id,
                            fragment_id: fragment_id as _,
                            vnode_bitmap,
                            mview_definition: job_definition.to_owned(),
                            expr_context: expr_contexts
                                .get(&actor_id)
                                .cloned()
                                .map(|expr_context| expr_context.to_protobuf()),
                        },
                        dispatcher,
                    ),
                    actors,
                };

                fragment_map.insert(fragment_id as _, fragment);

                fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));

                fragment_state.insert(
                    fragment_id,
                    table_fragments::PbState::from(related_job.job_status),
                );
            }
        }
        let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
        let working_set = self
            .metadata_manager
            .catalog_controller
            .resolve_working_set_for_reschedule_fragments(fragment_ids)
            .await?;

        fulfill_index_by_fragment_ids(
            &mut actor_map,
            &mut fragment_map,
            &mut actor_status,
            &mut fragment_state,
            &mut fragment_to_table,
            working_set.fragments,
            working_set.actors,
            working_set.actor_dispatchers,
            working_set.related_jobs,
        );

        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        Self::build_no_shuffle_relation_index(
            &actor_map,
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
        );

        if options.resolve_no_shuffle_upstream {
            let original_reschedule_keys = reschedule.keys().cloned().collect();

            Self::resolve_no_shuffle_upstream_fragments(
                reschedule,
                &no_shuffle_source_fragment_ids,
                &no_shuffle_target_fragment_ids,
                &working_set.fragment_upstreams,
            )?;

            if !table_parallelisms.is_empty() {
                Self::resolve_no_shuffle_upstream_tables(
                    original_reschedule_keys,
                    &no_shuffle_source_fragment_ids,
                    &no_shuffle_target_fragment_ids,
                    &fragment_to_table,
                    &working_set.fragment_upstreams,
                    table_parallelisms,
                )?;
            }
        }

        let mut fragment_dispatcher_map = HashMap::new();
        Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);

        let mut stream_source_fragment_ids = HashSet::new();
        let mut stream_source_backfill_fragment_ids = HashMap::new();
        let mut no_shuffle_reschedule = HashMap::new();
        for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
            let fragment = fragment_map
                .get(fragment_id)
                .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;

            match fragment_state[fragment_id] {
                table_fragments::State::Unspecified => unreachable!(),
                state @ table_fragments::State::Initial => {
                    bail!(
                        "the materialized view of fragment {fragment_id} is in state {}",
                        state.as_str_name()
                    )
                }
                state @ table_fragments::State::Creating => {
                    let stream_node = &fragment.node;

                    let mut is_reschedulable = true;
                    visit_stream_node_cont(stream_node, |body| {
                        if let Some(NodeBody::StreamScan(node)) = &body.node_body {
                            if !node.stream_scan_type().is_reschedulable() {
                                is_reschedulable = false;

                                return false;
                            }

                            return true;
                        }

                        true
                    });

                    if !is_reschedulable {
                        bail!(
                            "the materialized view of fragment {fragment_id} is in state {}",
                            state.as_str_name()
                        )
                    }
                }
                table_fragments::State::Created => {}
            }

            if no_shuffle_target_fragment_ids.contains(fragment_id) {
                bail!(
                    "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
                );
            }

            if no_shuffle_source_fragment_ids.contains(fragment_id) {
                let mut queue: VecDeque<_> = fragment_dispatcher_map
                    .get(fragment_id)
                    .unwrap()
                    .keys()
                    .cloned()
                    .collect();

                while let Some(downstream_id) = queue.pop_front() {
                    if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
                        continue;
                    }

                    if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
                    {
                        let no_shuffle_downstreams = downstream_fragments
                            .iter()
                            .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                            .map(|(fragment_id, _)| fragment_id);

                        queue.extend(no_shuffle_downstreams.copied());
                    }

                    no_shuffle_reschedule.insert(
                        downstream_id,
                        WorkerReschedule {
                            worker_actor_diff: worker_actor_diff.clone(),
                        },
                    );
                }
            }

            if fragment
                .fragment_type_mask
                .contains(FragmentTypeFlag::Source)
                && fragment.node.find_stream_source().is_some()
            {
                stream_source_fragment_ids.insert(*fragment_id);
            }

            let current_worker_ids = fragment
                .actors
                .iter()
                .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
                .collect::<HashSet<_>>();

            for (removed, change) in worker_actor_diff {
                if !current_worker_ids.contains(removed) && change.is_negative() {
                    bail!(
                        "no actor on the worker {} of fragment {}",
                        removed,
                        fragment_id
                    );
                }
            }

            let added_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_positive())
                .cloned()
                .map(|change| change as usize)
                .sum();

            let removed_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_negative())
                .cloned()
                .map(|v| v.unsigned_abs())
                .sum();

            match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    if fragment.actors.len() + added_actor_count <= removed_actor_count {
                        bail!("can't remove all actors from fragment {}", fragment_id);
                    }
                }
                FragmentDistributionType::Single => {
                    if added_actor_count != removed_actor_count {
                        bail!("single distribution fragment only supports migration");
                    }
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        if !no_shuffle_reschedule.is_empty() {
            tracing::info!(
                "reschedule plan rewritten with NoShuffle reschedule {:?}",
                no_shuffle_reschedule
            );

            for noshuffle_downstream in no_shuffle_reschedule.keys() {
                let fragment = fragment_map.get(noshuffle_downstream).unwrap();
                if fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SourceScan)
                {
                    let stream_node = &fragment.node;
                    if let Some((_source_id, upstream_source_fragment_id)) =
                        stream_node.find_source_backfill()
                    {
                        stream_source_backfill_fragment_ids
                            .insert(fragment.fragment_id, upstream_source_fragment_id);
                    }
                }
            }
        }

        reschedule.extend(no_shuffle_reschedule.into_iter());

        Ok(RescheduleContext {
            actor_map,
            actor_status,
            fragment_map,
            stream_source_fragment_ids,
            stream_source_backfill_fragment_ids,
            no_shuffle_target_fragment_ids,
            no_shuffle_source_fragment_ids,
            fragment_dispatcher_map,
            fragment_upstreams: working_set.fragment_upstreams,
        })
    }
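
    /// Turns validated [`WorkerReschedule`]s into concrete per-fragment [`Reschedule`]s:
    /// decides which actor ids to create or remove on which workers, rebalances vnode bitmaps,
    /// propagates bitmaps along NO_SHUFFLE chains, migrates source and backfill splits, and
    /// fills in the dispatcher updates for upstream and downstream fragments.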
    pub(crate) async fn analyze_reschedule_plan(
        &self,
        mut reschedules: HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
        tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
        let ctx = self
            .build_reschedule_context(&mut reschedules, options, table_parallelisms)
            .await?;
        tracing::debug!("reschedule context: {:#?}", ctx);
        let reschedules = reschedules;

        let (fragment_actors_to_remove, fragment_actors_to_create) =
            self.arrange_reschedules(&reschedules, &ctx)?;

        let mut fragment_actor_bitmap = HashMap::new();
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                continue;
            }

            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .map(|map| map.keys().copied().collect())
                .unwrap_or_default();

            let actors_to_remove = fragment_actors_to_remove
                .get(fragment_id)
                .map(|map| map.keys().copied().collect())
                .unwrap_or_default();

            let fragment = ctx.fragment_map.get(fragment_id).unwrap();

            match fragment.distribution_type {
                FragmentDistributionType::Single => {
                    fragment_actor_bitmap
                        .insert(fragment.fragment_id as FragmentId, Default::default());
                }
                FragmentDistributionType::Hash => {
                    let actor_vnode = rebalance_actor_vnode(
                        &fragment.actors,
                        &actors_to_remove,
                        &actors_to_create,
                    );

                    fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
                }

                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
        for fragment_id in reschedules.keys() {
            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
            let mut new_actor_ids = BTreeMap::new();
            for actor in &fragment.actors {
                if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id)
                    && actors_to_remove.contains_key(&actor.actor_id)
                {
                    continue;
                }
                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
                new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
            }

            if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
                for (actor_id, worker_id) in actors_to_create {
                    new_actor_ids.insert(*actor_id, *worker_id);
                }
            }

            assert!(
                !new_actor_ids.is_empty(),
                "should be at least one actor in fragment {} after rescheduling",
                fragment_id
            );

            fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
        }

        let fragment_actors_after_reschedule = fragment_actors_after_reschedule;

        fn arrange_no_shuffle_relation(
            ctx: &RescheduleContext,
            fragment_id: &FragmentId,
            upstream_fragment_id: &FragmentId,
            fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
            actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
            fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
            no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
            no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
        ) {
            if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                return;
            }

            let fragment = &ctx.fragment_map[fragment_id];

            let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];

            for upstream_actor in &upstream_fragment.actors {
                for dispatcher in &upstream_actor.dispatcher {
                    if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
                        let downstream_actor_id =
                            *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();

                        if !ctx
                            .no_shuffle_target_fragment_ids
                            .contains(upstream_fragment_id)
                        {
                            actor_group_map.insert(
                                upstream_actor.actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                            actor_group_map.insert(
                                downstream_actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                        } else {
                            let root_actor_id = actor_group_map[&upstream_actor.actor_id];

                            actor_group_map.insert(downstream_actor_id, root_actor_id);
                        }
                    }
                }
            }

            let upstream_fragment_bitmap = fragment_updated_bitmap
                .get(upstream_fragment_id)
                .cloned()
                .unwrap_or_default();

            if upstream_fragment.distribution_type == FragmentDistributionType::Single {
                assert!(
                    upstream_fragment_bitmap.is_empty(),
                    "single fragment should have no bitmap updates"
                );
            }

            let upstream_fragment_actor_map = fragment_actors_after_reschedule
                .get(upstream_fragment_id)
                .cloned()
                .unwrap();

            let fragment_actor_map = fragment_actors_after_reschedule
                .get(fragment_id)
                .cloned()
                .unwrap();

            let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            let mut fragment_bitmap = HashMap::new();

            for (actor_id, worker_id) in &fragment_actor_map {
                if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
                    let root_bitmap = fragment_updated_bitmap
                        .get(root_fragment)
                        .expect("root fragment bitmap not found")
                        .get(root_actor_id)
                        .cloned()
                        .expect("root actor bitmap not found");

                    fragment_bitmap.insert(*actor_id, root_bitmap);

                    no_shuffle_upstream_actor_map
                        .entry(*actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, *root_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(*root_actor_id)
                        .or_default()
                        .insert(*fragment_id, *actor_id);
                } else {
                    worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            for (actor_id, worker_id) in &upstream_fragment_actor_map {
                if !actor_group_map.contains_key(actor_id) {
                    upstream_worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            for (worker_id, actor_ids) in worker_reverse_index {
                let upstream_actor_ids = upstream_worker_reverse_index
                    .get(&worker_id)
                    .unwrap()
                    .clone();

                assert_eq!(actor_ids.len(), upstream_actor_ids.len());

                for (actor_id, upstream_actor_id) in actor_ids
                    .into_iter()
                    .zip_eq_debug(upstream_actor_ids.into_iter())
                {
                    match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
                        None => {
                            assert_eq!(
                                upstream_fragment.distribution_type,
                                FragmentDistributionType::Single
                            );
                        }
                        Some(bitmap) => {
                            fragment_bitmap.insert(actor_id, bitmap);
                        }
                    }

                    no_shuffle_upstream_actor_map
                        .entry(actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, upstream_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(upstream_actor_id)
                        .or_default()
                        .insert(*fragment_id, actor_id);
                }
            }

            match fragment.distribution_type {
                FragmentDistributionType::Hash => {}
                FragmentDistributionType::Single => {
                    assert!(fragment_bitmap.is_empty());
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }

            if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
                assert_eq!(
                    e.entry.get(),
                    &e.value,
                    "bitmaps derived from different no-shuffle upstreams mismatch"
                );
            }

            if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
                let no_shuffle_downstreams = downstream_fragments
                    .iter()
                    .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| fragment_id);

                for downstream_fragment_id in no_shuffle_downstreams {
                    arrange_no_shuffle_relation(
                        ctx,
                        downstream_fragment_id,
                        fragment_id,
                        fragment_actors_after_reschedule,
                        actor_group_map,
                        fragment_updated_bitmap,
                        no_shuffle_upstream_actor_map,
                        no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

        let mut no_shuffle_upstream_actor_map = HashMap::new();
        let mut no_shuffle_downstream_actors_map = HashMap::new();
        let mut actor_group_map = HashMap::new();
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
                && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
                && let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id)
            {
                for downstream_fragment_id in downstream_fragments.keys() {
                    arrange_no_shuffle_relation(
                        &ctx,
                        downstream_fragment_id,
                        fragment_id,
                        &fragment_actors_after_reschedule,
                        &mut actor_group_map,
                        &mut fragment_actor_bitmap,
                        &mut no_shuffle_upstream_actor_map,
                        &mut no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

        tracing::debug!("actor group map {:?}", actor_group_map);

        let mut new_created_actors = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .cloned()
                .unwrap_or_default();

            let fragment = &ctx.fragment_map[fragment_id];

            assert!(!fragment.actors.is_empty());

            for actor_to_create in &actors_to_create {
                let new_actor_id = actor_to_create.0;
                let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();

                new_actor.actor_id = *new_actor_id;

                Self::modify_actor_upstream_and_downstream(
                    &ctx,
                    &fragment_actors_to_remove,
                    &fragment_actors_to_create,
                    &fragment_actor_bitmap,
                    &no_shuffle_downstream_actors_map,
                    &mut new_actor,
                    &mut dispatchers,
                )?;

                if let Some(bitmap) = fragment_actor_bitmap
                    .get(fragment_id)
                    .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
                {
                    new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
                }

                new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
            }
        }

        let mut fragment_actor_splits = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

            if ctx.stream_source_fragment_ids.contains(fragment_id) {
                let fragment = &ctx.fragment_map[fragment_id];

                let prev_actor_ids = fragment
                    .actors
                    .iter()
                    .map(|actor| actor.actor_id)
                    .collect_vec();

                let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                let actor_splits = self
                    .env
                    .shared_actor_infos()
                    .migrate_splits_for_source_actors(
                        *fragment_id,
                        &prev_actor_ids,
                        &curr_actor_ids,
                    )?;

                tracing::debug!(
                    "source actor splits: {:?}, fragment_id: {}",
                    actor_splits,
                    fragment_id
                );
                fragment_actor_splits.insert(*fragment_id, actor_splits);
            }
        }
        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
            for fragment_id in reschedules.keys() {
                let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

                if let Some(upstream_source_fragment_id) =
                    ctx.stream_source_backfill_fragment_ids.get(fragment_id)
                {
                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
                        *fragment_id,
                        *upstream_source_fragment_id,
                        &curr_actor_ids,
                        &fragment_actor_splits,
                        &no_shuffle_upstream_actor_map,
                    )?;
                    tracing::debug!(
                        "source backfill actor splits: {:?}, fragment_id: {}",
                        actor_splits,
                        fragment_id
                    );
                    fragment_actor_splits.insert(*fragment_id, actor_splits);
                }
            }
        }

        let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
            HashMap::with_capacity(reschedules.len());

        for (fragment_id, _) in reschedules {
            let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();

            if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
                for (actor_id, worker_id) in actor_worker_maps {
                    actors_to_create
                        .entry(worker_id)
                        .or_default()
                        .push(actor_id);
                }
            }

            let actors_to_remove = fragment_actors_to_remove
                .get(&fragment_id)
                .cloned()
                .unwrap_or_default()
                .into_keys()
                .collect();

            let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];

            assert!(!actors_after_reschedule.is_empty());

            let fragment = &ctx.fragment_map[&fragment_id];

            let in_degree_types: HashSet<_> = ctx
                .fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.values())
                .into_iter()
                .flatten()
                .cloned()
                .collect();

            let upstream_dispatcher_mapping = match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    if !in_degree_types.contains(&DispatcherType::Hash) {
                        None
                    } else {
                        Some(ActorMapping::from_bitmaps(
                            &fragment_actor_bitmap[&fragment_id],
                        ))
                    }
                }

                FragmentDistributionType::Single => {
                    assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
                    None
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            };

            let mut upstream_fragment_dispatcher_set = BTreeSet::new();

            {
                if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
                    for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
                        match upstream_dispatcher_type {
                            DispatcherType::NoShuffle => {}
                            _ => {
                                upstream_fragment_dispatcher_set.insert((
                                    *upstream_fragment_id as FragmentId,
                                    fragment.fragment_id as DispatcherId,
                                ));
                            }
                        }
                    }
                }
            }

            let downstream_fragment_ids = if let Some(downstream_fragments) =
                ctx.fragment_dispatcher_map.get(&fragment_id)
            {
                downstream_fragments
                    .iter()
                    .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| *fragment_id)
                    .collect_vec()
            } else {
                vec![]
            };

            let vnode_bitmap_updates = match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    let mut vnode_bitmap_updates =
                        fragment_actor_bitmap.remove(&fragment_id).unwrap();

                    for actor_id in actors_after_reschedule.keys() {
                        assert!(vnode_bitmap_updates.contains_key(actor_id));

                        if let Some(actor) = ctx.actor_map.get(actor_id) {
                            let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();

                            if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref()
                                && prev_bitmap.eq(bitmap)
                            {
                                vnode_bitmap_updates.remove(actor_id);
                            }
                        }
                    }

                    vnode_bitmap_updates
                }
                FragmentDistributionType::Single => HashMap::new(),
                FragmentDistributionType::Unspecified => unreachable!(),
            };

            let upstream_fragment_dispatcher_ids =
                upstream_fragment_dispatcher_set.into_iter().collect_vec();

            let actor_splits = fragment_actor_splits
                .get(&fragment_id)
                .cloned()
                .unwrap_or_default();

            let mut cdc_table_id = None;
            let cdc_table_snapshot_split_assignment = if fragment
                .fragment_type_mask
                .contains(FragmentTypeFlag::StreamCdcScan)
            {
                cdc_table_id = Some(fragment.job_id);
                assign_cdc_table_snapshot_splits_impl(
                    fragment.job_id,
                    fragment_actors_after_reschedule
                        .get(&fragment_id)
                        .unwrap()
                        .keys()
                        .copied()
                        .collect(),
                    self.env.meta_store_ref(),
                    None,
                )
                .await?
            } else {
                HashMap::default()
            };

            reschedule_fragment.insert(
                fragment_id,
                Reschedule {
                    added_actors: actors_to_create,
                    removed_actors: actors_to_remove,
                    vnode_bitmap_updates,
                    upstream_fragment_dispatcher_ids,
                    upstream_dispatcher_mapping,
                    downstream_fragment_ids,
                    actor_splits,
                    newly_created_actors: Default::default(),
                    cdc_table_snapshot_split_assignment,
                    cdc_table_id,
                },
            );
        }

        let mut fragment_created_actors = HashMap::new();
        for (fragment_id, actors_to_create) in &fragment_actors_to_create {
            let mut created_actors = HashMap::new();
            for (actor_id, worker_id) in actors_to_create {
                let actor = new_created_actors.get(actor_id).cloned().unwrap();
                created_actors.insert(*actor_id, (actor, *worker_id));
            }

            fragment_created_actors.insert(*fragment_id, created_actors);
        }

        for (fragment_id, to_create) in fragment_created_actors {
            let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
            reschedule.newly_created_actors = to_create;
        }
        tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);

        Ok(reschedule_fragment)
    }
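
    /// For every fragment, picks the concrete actors to remove on workers that lose actors and
    /// allocates fresh actor ids on workers that gain actors, asserting that the NO_SHUFFLE
    /// downstream actors of every removed actor are removed as well.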
    #[expect(clippy::type_complexity)]
    fn arrange_reschedules(
        &self,
        reschedule: &HashMap<FragmentId, WorkerReschedule>,
        ctx: &RescheduleContext,
    ) -> MetaResult<(
        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
    )> {
        let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
        let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());

        for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
            let fragment = ctx.fragment_map.get(fragment_id).unwrap();

            let mut actors_to_remove = BTreeMap::new();
            let mut actors_to_create = BTreeMap::new();

            let mut worker_to_actors = HashMap::new();

            for actor in &fragment.actors {
                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
                worker_to_actors
                    .entry(worker_id)
                    .or_insert(BTreeSet::new())
                    .insert(actor.actor_id as ActorId);
            }

            let decreased_actor_count = worker_actor_diff
                .iter()
                .filter(|(_, change)| change.is_negative())
                .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));

            for (worker_id, n) in decreased_actor_count {
                if let Some(actor_ids) = worker_to_actors.get(worker_id) {
                    if actor_ids.len() < n {
                        bail!(
                            "plan illegal, for fragment {}, worker {} only has {} actors, but needs to reduce {}",
                            fragment_id,
                            worker_id,
                            actor_ids.len(),
                            n
                        );
                    }

                    let removed_actors: Vec<_> = actor_ids
                        .iter()
                        .skip(actor_ids.len().saturating_sub(n))
                        .cloned()
                        .collect();

                    for actor in removed_actors {
                        actors_to_remove.insert(actor, *worker_id);
                    }
                }
            }

            let increased_actor_count = worker_actor_diff
                .iter()
                .filter(|(_, change)| change.is_positive());

            for (worker, n) in increased_actor_count {
                for _ in 0..*n {
                    let id = self
                        .env
                        .id_gen_manager()
                        .generate_interval::<{ IdCategory::Actor }>(1)
                        as ActorId;
                    actors_to_create.insert(id, *worker);
                }
            }

            if !actors_to_remove.is_empty() {
                fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
            }

            if !actors_to_create.is_empty() {
                fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
            }
        }

        for actors_to_remove in fragment_actors_to_remove.values() {
            for actor_id in actors_to_remove.keys() {
                let actor = ctx.actor_map.get(actor_id).unwrap();
                for dispatcher in &actor.dispatcher {
                    if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
                        let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");

                        let _should_exists = fragment_actors_to_remove
                            .get(&(dispatcher.dispatcher_id as FragmentId))
                            .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
                            .get(downstream_actor_id)
                            .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
                    }
                }
            }
        }

        Ok((fragment_actors_to_remove, fragment_actors_to_create))
    }
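
    /// Rewrites the dispatchers of a newly created actor so that they point at the
    /// post-reschedule downstream actor set (with refreshed hash mappings), using the
    /// NO_SHUFFLE actor pairing where applicable.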
    fn modify_actor_upstream_and_downstream(
        ctx: &RescheduleContext,
        fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
        no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
        new_actor: &mut StreamActor,
        dispatchers: &mut Vec<PbDispatcher>,
    ) -> MetaResult<()> {
        for dispatcher in dispatchers {
            let downstream_fragment_id = dispatcher
                .downstream_actor_id
                .iter()
                .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
                .dedup()
                .exactly_one()
                .unwrap() as FragmentId;

            let downstream_fragment_actors_to_remove =
                fragment_actors_to_remove.get(&downstream_fragment_id);
            let downstream_fragment_actors_to_create =
                fragment_actors_to_create.get(&downstream_fragment_id);

            match dispatcher.r#type() {
                d @ (PbDispatcherType::Hash
                | PbDispatcherType::Simple
                | PbDispatcherType::Broadcast) => {
                    if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
                    {
                        dispatcher
                            .downstream_actor_id
                            .retain(|id| !downstream_actors_to_remove.contains_key(id));
                    }

                    if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
                    {
                        dispatcher
                            .downstream_actor_id
                            .extend(downstream_actors_to_create.keys().cloned())
                    }

                    if d == PbDispatcherType::Simple {
                        assert_eq!(dispatcher.downstream_actor_id.len(), 1);
                    }
                }
                PbDispatcherType::NoShuffle => {
                    assert_eq!(dispatcher.downstream_actor_id.len(), 1);
                    let downstream_actor_id = no_shuffle_downstream_actors_map
                        .get(&new_actor.actor_id)
                        .and_then(|map| map.get(&downstream_fragment_id))
                        .unwrap();
                    dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
                }
                PbDispatcherType::Unspecified => unreachable!(),
            }

            if let Some(mapping) = dispatcher.hash_mapping.as_mut()
                && let Some(downstream_updated_bitmap) =
                    fragment_actor_bitmap.get(&downstream_fragment_id)
            {
                *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
            }
        }

        Ok(())
    }

    #[await_tree::instrument]
    pub async fn post_apply_reschedule(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        self.metadata_manager
            .post_apply_reschedules(reschedules.clone(), post_updates)
            .await?;

        if !reschedules.is_empty() {
            let workers = self
                .metadata_manager
                .list_active_serving_compute_nodes()
                .await?;
            let streaming_parallelisms = self
                .metadata_manager
                .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))?;
            let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
            let max_serving_parallelism = self
                .env
                .session_params_manager_impl_ref()
                .get_params()
                .await
                .batch_parallelism()
                .map(|p| p.get());
            let (upserted, failed) = serving_worker_slot_mapping.upsert(
                streaming_parallelisms,
                &workers,
                max_serving_parallelism,
            );
            if !upserted.is_empty() {
                tracing::debug!(
                    "Update serving vnode mapping for fragments {:?}.",
                    upserted.keys()
                );
                self.env
                    .notification_manager()
                    .notify_frontend_without_version(
                        Operation::Update,
                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                            mappings: to_fragment_worker_slot_mapping(&upserted),
                        }),
                    );
            }
            if !failed.is_empty() {
                tracing::debug!(
                    "Fail to update serving vnode mapping for fragments {:?}.",
                    failed
                );
                self.env
                    .notification_manager()
                    .notify_frontend_without_version(
                        Operation::Delete,
                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                            mappings: to_deleted_fragment_worker_slot_mapping(&failed),
                        }),
                    );
            }
        }

        Ok(())
    }
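
    /// Computes a [`JobReschedulePlan`] (per-fragment worker diffs plus parallelism and
    /// resource-group post-updates) for the given jobs, honoring the adaptive parallelism
    /// strategy, per-fragment vnode-count limits, and resource-group worker filtering.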
    pub async fn generate_job_reschedule_plan(
        &self,
        policy: JobReschedulePolicy,
        generate_plan_for_cdc_table_backfill: bool,
    ) -> MetaResult<JobReschedulePlan> {
        type VnodeCount = usize;

        let JobReschedulePolicy { targets } = policy;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?;

        let workers: HashMap<_, _> = workers
            .into_iter()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        #[derive(Debug)]
        struct JobUpdate {
            filtered_worker_ids: BTreeSet<WorkerId>,
            parallelism: TableParallelism,
        }

        let mut job_parallelism_updates = HashMap::new();

        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
            parallelism_updates: Default::default(),
            resource_group_updates: Default::default(),
        };

        for (
            job_id,
            JobRescheduleTarget {
                parallelism: parallelism_update,
                resource_group: resource_group_update,
            },
        ) in &targets
        {
            let parallelism = match parallelism_update {
                JobParallelismTarget::Update(parallelism) => *parallelism,
                JobParallelismTarget::Refresh => {
                    let parallelism = self
                        .metadata_manager
                        .catalog_controller
                        .get_job_streaming_parallelisms(*job_id as _)
                        .await?;

                    parallelism.into()
                }
            };

            job_reschedule_post_updates
                .parallelism_updates
                .insert(TableId::from(*job_id), parallelism);

            let current_resource_group = match resource_group_update {
                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
                    job_reschedule_post_updates.resource_group_updates.insert(
                        *job_id as ObjectId,
                        Some(specific_resource_group.to_owned()),
                    );

                    specific_resource_group.to_owned()
                }
                JobResourceGroupTarget::Update(None) => {
                    let database_resource_group = self
                        .metadata_manager
                        .catalog_controller
                        .get_existing_job_database_resource_group(*job_id as _)
                        .await?;

                    job_reschedule_post_updates
                        .resource_group_updates
                        .insert(*job_id as ObjectId, None);
                    database_resource_group
                }
                JobResourceGroupTarget::Keep => {
                    self.metadata_manager
                        .catalog_controller
                        .get_existing_job_resource_group(*job_id as _)
                        .await?
                }
            };

            let filtered_worker_ids =
                filter_workers_by_resource_group(&workers, current_resource_group.as_str());

            if filtered_worker_ids.is_empty() {
                bail!("Cannot resize streaming_job {job_id} to empty worker set")
            }

            job_parallelism_updates.insert(
                *job_id,
                JobUpdate {
                    filtered_worker_ids,
                    parallelism,
                },
            );
        }

        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        let mut fragment_distribution_map = HashMap::new();
        let mut actor_location = HashMap::new();
        let mut table_fragment_id_map = HashMap::new();
        let mut fragment_actor_id_map = HashMap::new();

        async fn build_index(
            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
            fragment_distribution_map: &mut HashMap<
                FragmentId,
                (FragmentDistributionType, VnodeCount, bool),
            >,
            actor_location: &mut HashMap<ActorId, WorkerId>,
            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
            mgr: &MetadataManager,
            table_ids: Vec<ObjectId>,
            generate_plan_only_for_cdc_table_backfill: bool,
        ) -> Result<(), MetaError> {
            let RescheduleWorkingSet {
                fragments,
                actors,
                actor_dispatchers: _actor_dispatchers,
                fragment_downstreams,
                fragment_upstreams: _fragment_upstreams,
                related_jobs: _related_jobs,
                job_resource_groups: _job_resource_groups,
            } = mgr
                .catalog_controller
                .resolve_working_set_for_reschedule_tables(table_ids)
                .await?;

            for (fragment_id, downstreams) in fragment_downstreams {
                for (downstream_fragment_id, dispatcher_type) in downstreams {
                    if let risingwave_meta_model::DispatcherType::NoShuffle = dispatcher_type {
                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
                    }
                }
            }

            for (fragment_id, fragment) in fragments {
                let is_cdc_backfill_v2_fragment =
                    FragmentTypeMask::from(fragment.fragment_type_mask)
                        .contains(FragmentTypeFlag::StreamCdcScan);
                if generate_plan_only_for_cdc_table_backfill && !is_cdc_backfill_v2_fragment {
                    continue;
                }
                fragment_distribution_map.insert(
                    fragment_id as FragmentId,
                    (
                        FragmentDistributionType::from(fragment.distribution_type),
                        fragment.vnode_count as _,
                        is_cdc_backfill_v2_fragment,
                    ),
                );

                table_fragment_id_map
                    .entry(fragment.job_id as u32)
                    .or_default()
                    .insert(fragment_id as FragmentId);
            }

            for (actor_id, actor) in actors {
                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
                fragment_actor_id_map
                    .entry(actor.fragment_id as FragmentId)
                    .or_default()
                    .insert(actor_id as ActorId);
            }

            Ok(())
        }

        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();

        build_index(
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
            &mut fragment_distribution_map,
            &mut actor_location,
            &mut table_fragment_id_map,
            &mut fragment_actor_id_map,
            &self.metadata_manager,
            table_ids,
            generate_plan_for_cdc_table_backfill,
        )
        .await?;
        tracing::debug!(
            ?job_reschedule_post_updates,
            ?job_parallelism_updates,
            ?no_shuffle_source_fragment_ids,
            ?no_shuffle_target_fragment_ids,
            ?fragment_distribution_map,
            ?actor_location,
            ?table_fragment_id_map,
            ?fragment_actor_id_map,
            "generate_job_reschedule_plan, after build_index"
        );

        let adaptive_parallelism_strategy = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy();

        let mut target_plan = HashMap::new();

        for (
            table_id,
            JobUpdate {
                filtered_worker_ids,
                parallelism,
            },
        ) in job_parallelism_updates
        {
            let assigner = AssignerBuilder::new(table_id).build();

            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();

            let available_worker_slots = workers
                .iter()
                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
                .map(|(_, worker)| {
                    (
                        worker.id as WorkerId,
                        NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                    )
                })
                .collect::<BTreeMap<_, _>>();

            for fragment_id in fragment_map {
                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
                    continue;
                }

                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();

                for actor_id in &fragment_actor_id_map[&fragment_id] {
                    let worker_id = actor_location[actor_id];
                    *fragment_slots.entry(worker_id).or_default() += 1;
                }

                let available_slot_count: usize = available_worker_slots
                    .values()
                    .cloned()
                    .map(NonZeroUsize::get)
                    .sum();

                if available_slot_count == 0 {
                    bail!(
                        "No schedulable slots available for fragment {}",
                        fragment_id
                    );
                }

                let (dist, vnode_count, is_cdc_backfill_v2_fragment) =
                    fragment_distribution_map[&fragment_id];
                let max_parallelism = vnode_count;
                let fragment_parallelism_strategy = if generate_plan_for_cdc_table_backfill {
                    assert!(is_cdc_backfill_v2_fragment);
                    let TableParallelism::Fixed(new_parallelism) = parallelism else {
                        return Err(anyhow::anyhow!(
                            "invalid new parallelism {:?}, expect fixed parallelism",
                            parallelism
                        )
                        .into());
                    };
                    if new_parallelism > max_parallelism || new_parallelism == 0 {
                        return Err(anyhow::anyhow!(
                            "invalid new parallelism {}, max parallelism {}",
                            new_parallelism,
                            max_parallelism
                        )
                        .into());
                    }
                    TableParallelism::Fixed(new_parallelism)
                } else if is_cdc_backfill_v2_fragment {
                    TableParallelism::Fixed(fragment_actor_id_map[&fragment_id].len())
                } else {
                    parallelism
                };
                match dist {
                    FragmentDistributionType::Unspecified => unreachable!(),
                    FragmentDistributionType::Single => {
                        let (single_worker_id, should_be_one) = fragment_slots
                            .iter()
                            .exactly_one()
                            .expect("single fragment should have only one worker slot");

                        assert_eq!(*should_be_one, 1);

                        let assignment =
                            assigner.count_actors_per_worker(&available_worker_slots, 1);

                        let (chosen_target_worker_id, should_be_one) =
                            assignment.iter().exactly_one().ok().with_context(|| {
                                format!(
                                    "Cannot find a single target worker for fragment {fragment_id}"
                                )
                            })?;

                        assert_eq!(*should_be_one, 1);

                        if *chosen_target_worker_id == *single_worker_id {
                            tracing::debug!(
                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
                            );
                            continue;
                        }

                        target_plan.insert(
                            fragment_id,
                            WorkerReschedule {
                                worker_actor_diff: BTreeMap::from_iter(vec![
                                    (*chosen_target_worker_id, 1),
                                    (*single_worker_id, -1),
                                ]),
                            },
                        );
                    }
                    FragmentDistributionType::Hash => match fragment_parallelism_strategy {
                        TableParallelism::Adaptive => {
                            let target_slot_count = adaptive_parallelism_strategy
                                .compute_target_parallelism(available_slot_count);

                            if target_slot_count > max_parallelism {
                                tracing::warn!(
                                    "available parallelism for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );

                                let target_worker_slots = assigner.count_actors_per_worker(
                                    &available_worker_slots,
                                    max_parallelism,
                                );

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else if available_slot_count != target_slot_count {
                                tracing::info!(
                                    "available parallelism for table {table_id} is limited by adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
                                );

                                let target_worker_slots = assigner.count_actors_per_worker(
                                    &available_worker_slots,
                                    target_slot_count,
                                );

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else {
                                let available_worker_slots = available_worker_slots
                                    .iter()
                                    .map(|(worker_id, v)| (*worker_id, v.get()))
                                    .collect();

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &available_worker_slots,
                                    ),
                                );
                            }
                        }
                        TableParallelism::Fixed(mut n) => {
                            if n > max_parallelism {
                                tracing::warn!(
                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );
                                n = max_parallelism
                            }

                            let target_worker_slots =
                                assigner.count_actors_per_worker(&available_worker_slots, n);

                            target_plan.insert(
                                fragment_id,
                                Self::diff_worker_slot_changes(
                                    &fragment_slots,
                                    &target_worker_slots,
                                ),
                            );
                        }
                        TableParallelism::Custom => {}
                    },
                }
            }
        }

2070 target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
2071 tracing::debug!(
2072 ?target_plan,
2073 "generate_table_resize_plan finished target_plan"
2074 );
2075 if generate_plan_for_cdc_table_backfill {
2076 job_reschedule_post_updates.resource_group_updates = HashMap::default();
2077 job_reschedule_post_updates.parallelism_updates = HashMap::default();
2078 }
2079 Ok(JobReschedulePlan {
2080 reschedules: target_plan,
2081 post_updates: job_reschedule_post_updates,
2082 })
2083 }
2084
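    /// Computes the per-worker actor-count delta needed to move a fragment from its
    /// current worker-slot distribution to a target distribution. Workers that gain
    /// actors get a positive diff, workers that lose actors a negative one; unchanged
    /// workers are omitted.
    ///
    /// Illustrative sketch (hypothetical worker ids and counts, not from this codebase):
    ///
    /// ```ignore
    /// // current: worker 1 -> 3 actors, worker 2 -> 1 actor
    /// // target:  worker 1 -> 2 actors, worker 3 -> 2 actors
    /// let current = BTreeMap::from([(1, 3), (2, 1)]);
    /// let target = BTreeMap::from([(1, 2), (3, 2)]);
    /// let reschedule = Self::diff_worker_slot_changes(&current, &target);
    /// // reschedule.worker_actor_diff == {1: -1, 2: -1, 3: +2}
    /// ```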
    fn diff_worker_slot_changes(
        fragment_worker_slots: &BTreeMap<WorkerId, usize>,
        target_worker_slots: &BTreeMap<WorkerId, usize>,
    ) -> WorkerReschedule {
        let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
        let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();

        for (&worker_id, &target_slots) in target_worker_slots {
            let &current_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);

            if target_slots > current_slots {
                increased_actor_count.insert(worker_id, target_slots - current_slots);
            }
        }

        for (&worker_id, &current_slots) in fragment_worker_slots {
            let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);

            if current_slots > target_slots {
                decreased_actor_count.insert(worker_id, current_slots - target_slots);
            }
        }

        let worker_ids: HashSet<_> = increased_actor_count
            .keys()
            .chain(decreased_actor_count.keys())
            .cloned()
            .collect();

        let mut worker_actor_diff = BTreeMap::new();

        for worker_id in worker_ids {
            let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
            let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
            let change = increased - decreased;

            assert_ne!(change, 0);

            worker_actor_diff.insert(worker_id, change);
        }

        WorkerReschedule { worker_actor_diff }
    }

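    /// Scans all actors and records which fragments are sources and which are targets
    /// of NO_SHUFFLE dispatcher edges. A fragment may appear in both sets when it sits
    /// in the middle of a NO_SHUFFLE chain.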
    fn build_no_shuffle_relation_index(
        actor_map: &HashMap<ActorId, CustomActorInfo>,
        no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
    ) {
        let mut fragment_cache = HashSet::new();
        for actor in actor_map.values() {
            // Dispatcher edges are the same for all actors of a fragment, so each
            // fragment only needs to be inspected once.
            if fragment_cache.contains(&actor.fragment_id) {
                continue;
            }

            for dispatcher in &actor.dispatcher {
                for downstream_actor_id in &dispatcher.downstream_actor_id {
                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
                        if dispatcher.r#type() == PbDispatcherType::NoShuffle {
                            no_shuffle_source_fragment_ids
                                .insert(actor.fragment_id as FragmentId);
                            no_shuffle_target_fragment_ids
                                .insert(downstream_actor.fragment_id as FragmentId);
                        }
                    }
                }
            }

            fragment_cache.insert(actor.fragment_id);
        }
    }

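    /// Builds an index from each fragment to its downstream fragments, together with
    /// the dispatcher type on each edge, derived from the per-actor dispatcher lists.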
    fn build_fragment_dispatcher_index(
        actor_map: &HashMap<ActorId, CustomActorInfo>,
        fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    ) {
        for actor in actor_map.values() {
            for dispatcher in &actor.dispatcher {
                for downstream_actor_id in &dispatcher.downstream_actor_id {
                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
                        fragment_dispatcher_map
                            .entry(actor.fragment_id as FragmentId)
                            .or_default()
                            .insert(
                                downstream_actor.fragment_id as FragmentId,
                                dispatcher.r#type().into(),
                            );
                    }
                }
            }
        }
    }

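    /// Expands a set of fragments to cover their NO_SHUFFLE upstream chains and fixes
    /// up `table_parallelisms` accordingly: `Custom` parallelism is propagated to
    /// upstream tables (erroring when an upstream already has a conflicting value),
    /// and tables whose fragments are NO_SHUFFLE targets are dropped from the map,
    /// since they must follow their upstream's schedule.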
    pub fn resolve_no_shuffle_upstream_tables(
        fragment_ids: HashSet<FragmentId>,
        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
        fragment_to_table: &HashMap<FragmentId, TableId>,
        fragment_upstreams: &HashMap<
            risingwave_meta_model::FragmentId,
            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
        >,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<()> {
        let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();

        let mut fragment_ids = fragment_ids;

        // Walk upward along NO_SHUFFLE edges so the whole chain is handled consistently.
        while let Some(fragment_id) = queue.pop_front() {
            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
                continue;
            }

            for upstream_fragment_id in fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.keys())
                .into_iter()
                .flatten()
            {
                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
                let upstream_fragment_id = &upstream_fragment_id;
                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
                    continue;
                }

                let table_id = &fragment_to_table[&fragment_id];
                let upstream_table_id = &fragment_to_table[upstream_fragment_id];

                // A `Custom` downstream requires its NO_SHUFFLE upstream to be `Custom` too.
                if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
                    if let Some(upstream_table_parallelism) =
                        table_parallelisms.get(upstream_table_id)
                    {
                        if upstream_table_parallelism != &TableParallelism::Custom {
                            bail!(
                                "Cannot change upstream table {} from {:?} to {:?}",
                                upstream_table_id,
                                upstream_table_parallelism,
                                TableParallelism::Custom
                            )
                        }
                    } else {
                        table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
                    }
                }

                fragment_ids.insert(*upstream_fragment_id);
                queue.push_back(*upstream_fragment_id);
            }
        }

        let downstream_fragment_ids = fragment_ids
            .iter()
            .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));

        // NO_SHUFFLE targets follow their sources, so their tables keep no standalone
        // parallelism entries.
        let downstream_table_ids = downstream_fragment_ids
            .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
            .collect::<HashSet<_>>();

        table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));

        Ok(())
    }

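    /// Propagates reschedule plans across NO_SHUFFLE chains: a plan attached to a
    /// NO_SHUFFLE *target* fragment is copied to its NO_SHUFFLE *source* (erroring on
    /// conflicting plans), and target fragments are then removed from the map, since
    /// they can only be rescheduled through their sources.
    ///
    /// Illustrative sketch (hypothetical fragment ids; `plan` is any `Clone + Eq` value):
    ///
    /// ```ignore
    /// // Fragment 2 --NO_SHUFFLE--> fragment 1, so 2 is the source and 1 the target.
    /// let mut reschedule = HashMap::from([(1, plan.clone())]);
    /// Self::resolve_no_shuffle_upstream_fragments(
    ///     &mut reschedule,
    ///     &HashSet::from([2]), // NO_SHUFFLE sources
    ///     &HashSet::from([1]), // NO_SHUFFLE targets
    ///     &HashMap::from([(1, HashMap::from([(2, DispatcherType::NoShuffle)]))]),
    /// )?;
    /// // The plan now lives on fragment 2; fragment 1 has been removed.
    /// ```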
    pub fn resolve_no_shuffle_upstream_fragments<T>(
        reschedule: &mut HashMap<FragmentId, T>,
        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
        fragment_upstreams: &HashMap<
            risingwave_meta_model::FragmentId,
            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
        >,
    ) -> MetaResult<()>
    where
        T: Clone + Eq,
    {
        let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();

        while let Some(fragment_id) = queue.pop_front() {
            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
                continue;
            }

            for upstream_fragment_id in fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.keys())
                .into_iter()
                .flatten()
            {
                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
                let upstream_fragment_id = &upstream_fragment_id;
                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
                    continue;
                }

                let reschedule_plan = &reschedule[&fragment_id];

                if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
                    if upstream_reschedule_plan != reschedule_plan {
                        bail!(
                            "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
                            fragment_id,
                            upstream_fragment_id
                        );
                    }

                    continue;
                }

                reschedule.insert(*upstream_fragment_id, reschedule_plan.clone());

                queue.push_back(*upstream_fragment_id);
            }
        }

        reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));

        Ok(())
    }

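    /// Resolves the full set of jobs that must be rescheduled together with the given
    /// jobs, by querying the catalog for the related-job working set.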
    pub async fn resolve_related_no_shuffle_jobs(
        &self,
        jobs: &[TableId],
    ) -> MetaResult<HashSet<TableId>> {
        let RescheduleWorkingSet { related_jobs, .. } = self
            .metadata_manager
            .catalog_controller
            .resolve_working_set_for_reschedule_tables(
                jobs.iter().map(|id| id.table_id as _).collect(),
            )
            .await?;

        Ok(related_jobs
            .keys()
            .map(|id| TableId::new(*id as _))
            .collect())
    }
}

/// How a job's parallelism should change during a reschedule: set an explicit value,
/// or refresh it against the current cluster state.
#[derive(Debug, Clone)]
pub enum JobParallelismTarget {
    Update(TableParallelism),
    Refresh,
}

/// How a job's resource group should change: update it (`None` clears the explicit
/// group), or keep it unchanged.
#[derive(Debug, Clone)]
pub enum JobResourceGroupTarget {
    Update(Option<String>),
    Keep,
}

#[derive(Debug, Clone)]
pub struct JobRescheduleTarget {
    pub parallelism: JobParallelismTarget,
    pub resource_group: JobResourceGroupTarget,
}

#[derive(Debug)]
pub struct JobReschedulePolicy {
    // Keyed by streaming job id.
    pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
}

/// Catalog updates to apply after a reschedule succeeds.
#[derive(Debug, Clone)]
pub struct JobReschedulePostUpdates {
    pub parallelism_updates: HashMap<TableId, TableParallelism>,
    pub resource_group_updates: HashMap<ObjectId, Option<String>>,
}

#[derive(Debug)]
pub struct JobReschedulePlan {
    pub reschedules: HashMap<FragmentId, WorkerReschedule>,
    pub post_updates: JobReschedulePostUpdates,
}

impl GlobalStreamManager {
    #[await_tree::instrument("acquire_reschedule_read_guard")]
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    #[await_tree::instrument("acquire_reschedule_write_guard")]
    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }

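    /// Applies a [`JobReschedulePlan`] to one database: analyzes the worker-level
    /// diffs into concrete fragment reschedules, collects the running actors of all
    /// affected upstream/downstream fragments, and executes everything through a
    /// single `RescheduleFragment` barrier command, with the source manager's
    /// periodic tick paused for the duration.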
    pub async fn reschedule_actors(
        &self,
        database_id: DatabaseId,
        plan: JobReschedulePlan,
        options: RescheduleOptions,
    ) -> MetaResult<()> {
        let JobReschedulePlan {
            reschedules,
            mut post_updates,
        } = plan;

        let reschedule_fragment = self
            .scale_controller
            .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
            .await?;

        tracing::debug!("reschedule plan: {:?}", reschedule_fragment);

        let up_down_stream_fragment: HashSet<_> = reschedule_fragment
            .iter()
            .flat_map(|(_, reschedule)| {
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id)
                    .chain(reschedule.downstream_fragment_ids.iter().cloned())
            })
            .collect();

        let fragment_actors = up_down_stream_fragment
            .iter()
            .map(|fragment_id| {
                self.metadata_manager
                    .get_running_actors_of_fragment(*fragment_id)
                    .map(|actor_ids| (*fragment_id, actor_ids))
            })
            .collect::<Result<_, _>>()?;

        let command = Command::RescheduleFragment {
            reschedules: reschedule_fragment,
            fragment_actors,
            post_updates,
        };

        // Keep the source manager's periodic tick paused until the command completes.
        let _guard = self.source_manager.pause_tick().await;

        self.barrier_scheduler
            .run_command(database_id, command)
            .await?;

        tracing::info!("reschedule done");

        Ok(())
    }

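    /// One pass of the automatic parallelism controller: skips jobs that are still
    /// being created in the background (plus their related jobs), walks the remaining
    /// jobs in batches, and applies the first non-empty reschedule plan it finds.
    /// Returns `Ok(true)` if a plan was applied and another pass is needed, or
    /// `Ok(false)` if the cluster has converged.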
    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
        tracing::info!("trigger parallelism control");

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_streaming_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        let skipped_jobs = if !background_streaming_jobs.is_empty() {
            let jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
                .await?;

            tracing::info!(
                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
                background_streaming_jobs,
                jobs
            );

            jobs
        } else {
            HashSet::new()
        };

        let job_ids: HashSet<_> = {
            let streaming_parallelisms = self
                .metadata_manager
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            streaming_parallelisms
                .into_iter()
                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
                .map(|(table_id, _)| table_id)
                .collect()
        };

        let workers = self
            .metadata_manager
            .cluster_controller
            .list_active_streaming_workers()
            .await?;

        let schedulable_worker_ids: BTreeSet<_> = workers
            .iter()
            .filter(|worker| {
                !worker
                    .property
                    .as_ref()
                    .map(|p| p.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|worker| worker.id as WorkerId)
            .collect();

        if job_ids.is_empty() {
            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
            return Ok(false);
        }

        // A batch size of 0 disables batching: process all jobs in a single pass.
        let batch_size = match self.env.opts.parallelism_control_batch_size {
            0 => job_ids.len(),
            n => n,
        };

        tracing::info!(
            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
            job_ids.len(),
            batch_size,
            schedulable_worker_ids
        );

        let batches: Vec<_> = job_ids
            .into_iter()
            .chunks(batch_size)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect();

        let mut reschedules = None;

        // Stop at the first batch that yields a non-empty plan; the remaining batches
        // are picked up by subsequent passes.
        for batch in batches {
            let targets: HashMap<_, _> = batch
                .into_iter()
                .map(|job_id| {
                    (
                        job_id as u32,
                        JobRescheduleTarget {
                            parallelism: JobParallelismTarget::Refresh,
                            resource_group: JobResourceGroupTarget::Keep,
                        },
                    )
                })
                .collect();

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy { targets }, false)
                .await?;

            if !plan.reschedules.is_empty() {
                tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
                reschedules = Some(plan);
                break;
            }
        }

        let Some(plan) = reschedules else {
            tracing::info!("no reschedule plan generated");
            return Ok(false);
        };

        // Apply the plan per database, since each barrier command is scoped to one database.
        for (database_id, reschedules) in self
            .metadata_manager
            .split_fragment_map_by_database(plan.reschedules)
            .await?
        {
            self.reschedule_actors(
                database_id,
                JobReschedulePlan {
                    reschedules,
                    post_updates: plan.post_updates.clone(),
                },
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(true)
    }

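    /// Event loop of the automatic parallelism monitor. Reacts to worker
    /// activation/deletion and system-parameter changes by setting a trigger flag;
    /// on each tick with the flag set, it runs [`Self::trigger_parallelism_control`]
    /// until that reports the cluster has converged.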
    async fn run(&self, mut shutdown_rx: Receiver<()>) {
        tracing::info!("starting automatic parallelism control monitor");

        let check_period =
            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);

        let mut ticker = tokio::time::interval_at(
            Instant::now()
                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
            check_period,
        );
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        // Consume the initial tick, which completes after the configured first delay.
        ticker.tick().await;

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .expect("list active streaming compute nodes");

        let mut worker_cache: BTreeMap<_, _> = worker_nodes
            .into_iter()
            .map(|worker| (worker.id, worker))
            .collect();

        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();

        let mut should_trigger = false;

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Stream manager is stopped");
                    break;
                }

                _ = ticker.tick(), if should_trigger => {
                    let include_workers = worker_cache.keys().copied().collect_vec();

                    if include_workers.is_empty() {
                        tracing::debug!("no available worker nodes");
                        should_trigger = false;
                        continue;
                    }

                    match self.trigger_parallelism_control().await {
                        Ok(cont) => {
                            should_trigger = cont;
                        }
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
                            ticker.reset();
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.expect("local notification channel closed in loop of stream manager");

                    // Only streaming compute nodes are relevant for scaling decisions.
                    let worker_is_streaming_compute = |worker: &WorkerNode| {
                        worker.get_type() == Ok(WorkerType::ComputeNode)
                            && worker.property.as_ref().unwrap().is_streaming
                    };

                    match notification {
                        LocalNotification::SystemParamsChange(reader) => {
                            let new_strategy = reader.adaptive_parallelism_strategy();
                            if new_strategy != previous_adaptive_parallelism_strategy {
                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
                                should_trigger = true;
                                previous_adaptive_parallelism_strategy = new_strategy;
                            }
                        }
                        LocalNotification::WorkerNodeActivated(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            tracing::info!(worker = worker.id, "worker activated notification received");

                            let prev_worker = worker_cache.insert(worker.id, worker.clone());

                            match prev_worker {
                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                                    tracing::info!(worker = worker.id, "worker parallelism changed");
                                    should_trigger = true;
                                }
                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
                                    tracing::info!(worker = worker.id, "worker resource group changed");
                                    should_trigger = true;
                                }
                                None => {
                                    tracing::info!(worker = worker.id, "new worker joined");
                                    should_trigger = true;
                                }
                                _ => {}
                            }
                        }

                        // Deletion only evicts the cache entry here; passive scale-in
                        // itself is handled during barrier recovery.
                        LocalNotification::WorkerNodeDeleted(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            match worker_cache.remove(&worker.id) {
                                Some(prev_worker) => {
                                    tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
                                }
                                None => {
                                    tracing::warn!(worker = worker.id, "worker not found in stream manager cache, but deletion was reported");
                                }
                            }
                        }

                        _ => {}
                    }
                }
            }
        }
    }

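    /// Spawns the parallelism monitor as a background task and returns its join
    /// handle together with a shutdown sender.
    ///
    /// Usage sketch (the `stream_manager` binding is hypothetical):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) = stream_manager.clone().start_auto_parallelism_monitor();
    /// // ... later, during meta node shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```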
    pub fn start_auto_parallelism_monitor(
        self: Arc<Self>,
    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}