1use std::cmp::{Ordering, min};
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
17use std::fmt::Debug;
18use std::num::NonZeroUsize;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use itertools::Itertools;
24use num_integer::Integer;
25use num_traits::abs;
26use risingwave_common::bail;
27use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
28use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
29use risingwave_common::hash::ActorMapping;
30use risingwave_common::util::iter_util::ZipEqDebug;
31use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
32use risingwave_pb::common::{WorkerNode, WorkerType};
33use risingwave_pb::meta::FragmentWorkerSlotMappings;
34use risingwave_pb::meta::subscribe_response::{Info, Operation};
35use risingwave_pb::meta::table_fragments::fragment::{
36 FragmentDistributionType, PbFragmentDistributionType,
37};
38use risingwave_pb::meta::table_fragments::{self, State};
39use risingwave_pb::stream_plan::{Dispatcher, PbDispatcher, PbDispatcherType, StreamNode};
40use thiserror_ext::AsReport;
41use tokio::sync::oneshot::Receiver;
42use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
43use tokio::task::JoinHandle;
44use tokio::time::{Instant, MissedTickBehavior};
45
46use crate::barrier::{Command, Reschedule};
47use crate::controller::scale::RescheduleWorkingSet;
48use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
49use crate::model::{
50 ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
51};
52use crate::serving::{
53 ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
54};
55use crate::stream::{AssignerBuilder, GlobalStreamManager, SourceManagerRef};
56use crate::{MetaError, MetaResult};
57
/// A reschedule request for a single fragment, expressed as per-worker actor-count deltas.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    /// Signed change in the number of actors on each worker: a positive value adds that many
    /// actors to the worker, a negative value removes that many actors from it.
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}
62
/// In-memory snapshot of a fragment used while planning a reschedule.
pub struct CustomFragmentInfo {
    /// Id of the streaming job the fragment belongs to.
    pub job_id: u32,
    /// Id of the fragment itself.
    pub fragment_id: u32,
    /// Fragment type flags (checked for e.g. `Source` and `SourceScan` during planning).
    pub fragment_type_mask: FragmentTypeMask,
    /// Distribution of the fragment (`Hash` or `Single`).
    pub distribution_type: PbFragmentDistributionType,
    /// Ids of the state tables recorded on the fragment.
    pub state_table_ids: Vec<u32>,
    /// The fragment's stream plan node.
    pub node: StreamNode,
    /// An actor (with its dispatchers) that is cloned as the starting point for every newly
    /// created actor of this fragment.
    pub actor_template: StreamActorWithDispatchers,
    /// The actors that currently make up the fragment.
    pub actors: Vec<CustomActorInfo>,
}
73
/// In-memory snapshot of an actor used while planning a reschedule.
#[derive(Default, Clone)]
pub struct CustomActorInfo {
    /// Id of the actor.
    pub actor_id: u32,
    /// Id of the fragment the actor belongs to.
    pub fragment_id: u32,
    /// Dispatchers attached to the actor's output.
    pub dispatcher: Vec<Dispatcher>,
    /// Virtual nodes owned by the actor. Must be `Some` for actors passed to
    /// `rebalance_actor_vnode`; presumably `None` for singleton fragments — TODO confirm.
    pub vnode_bitmap: Option<Bitmap>,
}
82
83use educe::Educe;
84use futures::future::try_join_all;
85use risingwave_common::system_param::AdaptiveParallelismStrategy;
86use risingwave_common::system_param::reader::SystemParamsRead;
87use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
88use risingwave_meta_model::DispatcherType;
89use risingwave_pb::stream_plan::stream_node::NodeBody;
90
91use super::SourceChange;
92use crate::controller::id::IdCategory;
93use crate::controller::utils::filter_workers_by_resource_group;
94use crate::stream::cdc::assign_cdc_table_snapshot_splits_impl;
95
/// Indexes over the fragments and actors involved in a reschedule, built once by
/// `ScaleController::build_reschedule_context` and consulted while the plan is analyzed.
#[derive(Educe)]
#[educe(Debug)]
pub struct RescheduleContext {
    /// Information of every actor in the working set, keyed by actor id.
    #[educe(Debug(ignore))]
    actor_map: HashMap<ActorId, CustomActorInfo>,
    /// The worker each actor is currently placed on.
    actor_status: BTreeMap<ActorId, WorkerId>,
    /// Information of every fragment in the working set, keyed by fragment id.
    #[educe(Debug(ignore))]
    fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
    /// Rescheduled fragments that carry the `Source` flag and contain a stream source node.
    stream_source_fragment_ids: HashSet<FragmentId>,
    /// Source-backfill fragments (flagged `SourceScan`) mapped to their upstream source fragment.
    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
    /// Fragments that are the downstream side of a `NoShuffle` relation.
    no_shuffle_target_fragment_ids: HashSet<FragmentId>,
    /// Fragments that are the upstream side of a `NoShuffle` relation.
    no_shuffle_source_fragment_ids: HashSet<FragmentId>,
    /// For each fragment, its downstream fragments and the dispatcher type used to reach them.
    fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each fragment, its upstream fragments and the dispatcher type they use.
    fragment_upstreams: HashMap<
        risingwave_meta_model::FragmentId,
        HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
    >,
}
123
124impl RescheduleContext {
125 fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
126 self.actor_status
127 .get(actor_id)
128 .cloned()
129 .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
130 }
131}
132
/// Redistributes the virtual nodes of one fragment across its actors after some actors are
/// removed and/or created.
///
/// The procedure is:
///
/// 1. Compute the target actor count, then divide the vnode count by it to get the per-actor
///    quota (`expected`) and the remainder (`remain`).
/// 2. Split the existing actors into those to remove and those to keep, and sort each group by
///    the number of vnodes held, largest first (ties broken by ascending actor id).
/// 3. Give every actor a balance: removed actors get the number of vnodes they hold, kept
///    actors get `held - expected`, created actors get `-expected`.
/// 4. Distribute the remainder by lowering balances by one (i.e. letting that actor hold one
///    extra vnode): first among the last `prev_remain` created actors and the kept actors,
///    then among the created actors in order.
/// 5. Put removed, kept and created actors into one deque and repeatedly move vnodes from the
///    head (source) to the tail (destination) until every balance reaches zero.
///
/// Returns the resulting bitmap of each actor whose balance settled at zero, excluding actors
/// listed in `actors_to_remove` when they are drained as a source.
///
/// # Panics
///
/// Panics if `actors` is empty, if any actor has no `vnode_bitmap`, if `actors_to_remove` is not
/// a subset of `actors`, if `actors_to_create` overlaps `actors`, if the target actor count is
/// zero, or if one of the internal consistency assertions fails.
pub fn rebalance_actor_vnode(
    actors: &[CustomActorInfo],
    actors_to_remove: &BTreeSet<ActorId>,
    actors_to_create: &BTreeSet<ActorId>,
) -> HashMap<ActorId, Bitmap> {
    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();

    // Removed actors must all exist; created actors must all be new.
    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);

    assert!(actors.len() >= actors_to_remove.len());

    let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
    assert!(target_actor_count > 0);

    // The bitmap length of the first actor is taken as the fragment's vnode count.
    let vnode_count = actors[0]
        .vnode_bitmap
        .as_ref()
        .expect("vnode bitmap unset")
        .len();

    // Bookkeeping for one actor: a positive balance means vnodes to give away, a negative
    // balance means vnodes still to receive.
    #[derive(Debug)]
    struct Balance {
        actor_id: ActorId,
        balance: i32,
        builder: BitmapBuilder,
    }
    let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);

    tracing::debug!(
        "expected {}, remain {}, prev actors {}, target actors {}",
        expected,
        remain,
        actors.len(),
        target_actor_count,
    );

    // Split existing actors into (to be removed, to be kept), each paired with its bitmap.
    let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
        .iter()
        .map(|actor| {
            (
                actor.actor_id as ActorId,
                actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
            )
        })
        .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));

    // Most vnodes first; equal counts are ordered by ascending actor id for determinism.
    let order_by_bitmap_desc =
        |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
            bitmap_a
                .count_ones()
                .cmp(&bitmap_b.count_ones())
                .reverse()
                .then(id_a.cmp(id_b))
        };

    let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
        let mut builder = BitmapBuilder::default();
        builder.append_bitmap(bitmap);
        builder
    };

    // Per-actor quota under the previous actor count.
    let (prev_expected, _) = vnode_count.div_rem(&actors.len());

    // Number of vnodes the removed actors hold beyond the previous quota.
    let prev_remain = removed
        .iter()
        .map(|(_, bitmap)| {
            assert!(bitmap.count_ones() >= prev_expected);
            bitmap.count_ones() - prev_expected
        })
        .sum::<usize>();

    removed.sort_by(order_by_bitmap_desc);
    rest.sort_by(order_by_bitmap_desc);

    // Removed actors must give away everything they hold.
    let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
        actor_id,
        balance: bitmap.count_ones() as i32,
        builder: builder_from_bitmap(&bitmap),
    });

    // Kept actors give away (or receive) the difference to the new quota.
    let mut rest_balances = rest
        .into_iter()
        .map(|(actor_id, bitmap)| Balance {
            actor_id,
            balance: bitmap.count_ones() as i32 - expected as i32,
            builder: builder_from_bitmap(&bitmap),
        })
        .collect_vec();

    // Created actors start empty and must receive a full quota.
    let mut created_balances = actors_to_create
        .iter()
        .map(|actor_id| Balance {
            actor_id: *actor_id,
            balance: -(expected as i32),
            builder: BitmapBuilder::zeroed(vnode_count),
        })
        .collect_vec();

    // Hand out the remainder: up to `prev_remain` created actors (taken from the back) come
    // first, followed by the kept actors.
    for balance in created_balances
        .iter_mut()
        .rev()
        .take(prev_remain)
        .chain(rest_balances.iter_mut())
    {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    // Whatever is still left of the remainder goes to the created actors, in order.
    for balance in &mut created_balances {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    assert_eq!(remain, 0);

    // Queue layout: removed actors at the head, kept in the middle, created at the tail.
    let mut v: VecDeque<_> = removed_balances
        .chain(rest_balances)
        .chain(created_balances)
        .collect();

    let mut result = HashMap::with_capacity(target_actor_count);

    for balance in &v {
        tracing::debug!(
            "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
            balance.actor_id,
            balance.balance,
            actors_to_remove.contains(&balance.actor_id),
            actors_to_create.contains(&balance.actor_id)
        );
    }

    while !v.is_empty() {
        // A lone remaining actor must already be balanced.
        if v.len() == 1 {
            let single = v.pop_front().unwrap();
            assert_eq!(single.balance, 0);
            if !actors_to_remove.contains(&single.actor_id) {
                result.insert(single.actor_id, single.builder.finish());
            }

            continue;
        }

        // Head of the queue gives vnodes, tail of the queue receives them.
        let mut src = v.pop_front().unwrap();
        let mut dst = v.pop_back().unwrap();

        let n = min(abs(src.balance), abs(dst.balance));

        // Move up to `n` vnodes, scanning from the highest vnode index downwards.
        let mut moved = 0;
        for idx in (0..vnode_count).rev() {
            if moved >= n {
                break;
            }

            if src.builder.is_set(idx) {
                src.builder.set(idx, false);
                assert!(!dst.builder.is_set(idx));
                dst.builder.set(idx, true);
                moved += 1;
            }
        }

        src.balance -= n;
        dst.balance += n;

        // Unsettled actors go back to the end they came from; settled ones are emitted,
        // except that a drained source which is being removed is dropped.
        if src.balance != 0 {
            v.push_front(src);
        } else if !actors_to_remove.contains(&src.actor_id) {
            result.insert(src.actor_id, src.builder.finish());
        }

        if dst.balance != 0 {
            v.push_back(dst);
        } else {
            result.insert(dst.actor_id, dst.builder.finish());
        }
    }

    result
}
349
/// Switches that alter how a reschedule request is resolved.
#[derive(Debug, Clone, Copy)]
pub struct RescheduleOptions {
    /// When set, `build_reschedule_context` resolves the request against the upstream side of
    /// `NoShuffle` relations before validating it.
    pub resolve_no_shuffle_upstream: bool,

    /// Presumably suppresses the creation of scaled-out actors; it is not read in the visible
    /// part of this file — TODO confirm against the consumer.
    pub skip_create_new_actors: bool,
}
358
/// Shared, reference-counted handle to a [`ScaleController`].
pub type ScaleControllerRef = Arc<ScaleController>;
360
/// Plans rescheduling (scaling and migration) of streaming fragments.
pub struct ScaleController {
    /// Access to cluster and catalog metadata.
    pub metadata_manager: MetadataManager,

    /// Used to reassign source splits when source actors change.
    pub source_manager: SourceManagerRef,

    /// Meta service environment.
    pub env: MetaSrvEnv,

    /// Lock guarding rescheduling; presumably held by other operations to keep them from
    /// running concurrently with a reschedule — TODO confirm against the lock's users.
    pub reschedule_lock: RwLock<()>,
}
372
373impl ScaleController {
374 pub fn new(
375 metadata_manager: &MetadataManager,
376 source_manager: SourceManagerRef,
377 env: MetaSrvEnv,
378 ) -> Self {
379 Self {
380 metadata_manager: metadata_manager.clone(),
381 source_manager,
382 env,
383 reschedule_lock: RwLock::new(()),
384 }
385 }
386
387 pub async fn integrity_check(&self) -> MetaResult<()> {
388 self.metadata_manager
389 .catalog_controller
390 .integrity_check()
391 .await
392 }
393
394 async fn build_reschedule_context(
396 &self,
397 reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
398 options: RescheduleOptions,
399 table_parallelisms: &mut HashMap<TableId, TableParallelism>,
400 ) -> MetaResult<RescheduleContext> {
401 let worker_nodes: HashMap<WorkerId, WorkerNode> = self
402 .metadata_manager
403 .list_active_streaming_compute_nodes()
404 .await?
405 .into_iter()
406 .map(|worker_node| (worker_node.id as _, worker_node))
407 .collect();
408
409 if worker_nodes.is_empty() {
410 bail!("no available compute node in the cluster");
411 }
412
413 let unschedulable_worker_ids: HashSet<_> = worker_nodes
415 .values()
416 .filter(|w| {
417 w.property
418 .as_ref()
419 .map(|property| property.is_unschedulable)
420 .unwrap_or(false)
421 })
422 .map(|worker| worker.id as WorkerId)
423 .collect();
424
425 for (fragment_id, reschedule) in &*reschedule {
426 for (worker_id, change) in &reschedule.worker_actor_diff {
427 if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
428 bail!(
429 "unable to move fragment {} to unschedulable worker {}",
430 fragment_id,
431 worker_id
432 );
433 }
434 }
435 }
436
437 let mut actor_map = HashMap::new();
440 let mut fragment_map = HashMap::new();
442 let mut actor_status = BTreeMap::new();
444 let mut fragment_state = HashMap::new();
445 let mut fragment_to_table = HashMap::new();
446
447 fn fulfill_index_by_fragment_ids(
448 actor_map: &mut HashMap<u32, CustomActorInfo>,
449 fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
450 actor_status: &mut BTreeMap<ActorId, WorkerId>,
451 fragment_state: &mut HashMap<FragmentId, State>,
452 fragment_to_table: &mut HashMap<FragmentId, TableId>,
453 fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
454 actors: HashMap<ActorId, actor::Model>,
455 mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
456 related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
457 ) {
458 let mut fragment_actors: HashMap<
459 risingwave_meta_model::FragmentId,
460 Vec<CustomActorInfo>,
461 > = HashMap::new();
462
463 let mut expr_contexts = HashMap::new();
464 for (
465 _,
466 actor::Model {
467 actor_id,
468 fragment_id,
469 status: _,
470 splits: _,
471 worker_id,
472 vnode_bitmap,
473 expr_context,
474 ..
475 },
476 ) in actors
477 {
478 let dispatchers = actor_dispatchers
479 .remove(&(actor_id as _))
480 .unwrap_or_default();
481
482 let actor_info = CustomActorInfo {
483 actor_id: actor_id as _,
484 fragment_id: fragment_id as _,
485 dispatcher: dispatchers,
486 vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
487 };
488
489 actor_map.insert(actor_id as _, actor_info.clone());
490
491 fragment_actors
492 .entry(fragment_id as _)
493 .or_default()
494 .push(actor_info);
495
496 actor_status.insert(actor_id as _, worker_id as WorkerId);
497
498 expr_contexts.insert(actor_id as u32, expr_context);
499 }
500
501 for (
502 _,
503 fragment::Model {
504 fragment_id,
505 job_id,
506 fragment_type_mask,
507 distribution_type,
508 stream_node,
509 state_table_ids,
510 ..
511 },
512 ) in fragments
513 {
514 let actors = fragment_actors
515 .remove(&(fragment_id as _))
516 .unwrap_or_default();
517
518 let CustomActorInfo {
519 actor_id,
520 fragment_id,
521 dispatcher,
522 vnode_bitmap,
523 } = actors.first().unwrap().clone();
524
525 let (related_job, job_definition) =
526 related_jobs.get(&job_id).expect("job not found");
527
528 let fragment = CustomFragmentInfo {
529 job_id: job_id as _,
530 fragment_id: fragment_id as _,
531 fragment_type_mask: fragment_type_mask.into(),
532 distribution_type: distribution_type.into(),
533 state_table_ids: state_table_ids.into_u32_array(),
534 node: stream_node.to_protobuf(),
535 actor_template: (
536 StreamActor {
537 actor_id,
538 fragment_id: fragment_id as _,
539 vnode_bitmap,
540 mview_definition: job_definition.to_owned(),
541 expr_context: expr_contexts
542 .get(&actor_id)
543 .cloned()
544 .map(|expr_context| expr_context.to_protobuf()),
545 },
546 dispatcher,
547 ),
548 actors,
549 };
550
551 fragment_map.insert(fragment_id as _, fragment);
552
553 fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));
554
555 fragment_state.insert(
556 fragment_id,
557 table_fragments::PbState::from(related_job.job_status),
558 );
559 }
560 }
561 let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
562 let working_set = self
563 .metadata_manager
564 .catalog_controller
565 .resolve_working_set_for_reschedule_fragments(fragment_ids)
566 .await?;
567
568 fulfill_index_by_fragment_ids(
569 &mut actor_map,
570 &mut fragment_map,
571 &mut actor_status,
572 &mut fragment_state,
573 &mut fragment_to_table,
574 working_set.fragments,
575 working_set.actors,
576 working_set.actor_dispatchers,
577 working_set.related_jobs,
578 );
579
580 let mut no_shuffle_source_fragment_ids = HashSet::new();
582 let mut no_shuffle_target_fragment_ids = HashSet::new();
583
584 Self::build_no_shuffle_relation_index(
585 &actor_map,
586 &mut no_shuffle_source_fragment_ids,
587 &mut no_shuffle_target_fragment_ids,
588 );
589
590 if options.resolve_no_shuffle_upstream {
591 let original_reschedule_keys = reschedule.keys().cloned().collect();
592
593 Self::resolve_no_shuffle_upstream_fragments(
594 reschedule,
595 &no_shuffle_source_fragment_ids,
596 &no_shuffle_target_fragment_ids,
597 &working_set.fragment_upstreams,
598 )?;
599
600 if !table_parallelisms.is_empty() {
601 Self::resolve_no_shuffle_upstream_tables(
603 original_reschedule_keys,
604 &no_shuffle_source_fragment_ids,
605 &no_shuffle_target_fragment_ids,
606 &fragment_to_table,
607 &working_set.fragment_upstreams,
608 table_parallelisms,
609 )?;
610 }
611 }
612
613 let mut fragment_dispatcher_map = HashMap::new();
614 Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);
615
616 let mut stream_source_fragment_ids = HashSet::new();
617 let mut stream_source_backfill_fragment_ids = HashMap::new();
618 let mut no_shuffle_reschedule = HashMap::new();
619 for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
620 let fragment = fragment_map
621 .get(fragment_id)
622 .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;
623
624 match fragment_state[fragment_id] {
626 table_fragments::State::Unspecified => unreachable!(),
627 state @ table_fragments::State::Initial => {
628 bail!(
629 "the materialized view of fragment {fragment_id} is in state {}",
630 state.as_str_name()
631 )
632 }
633 state @ table_fragments::State::Creating => {
634 let stream_node = &fragment.node;
635
636 let mut is_reschedulable = true;
637 visit_stream_node_cont(stream_node, |body| {
638 if let Some(NodeBody::StreamScan(node)) = &body.node_body {
639 if !node.stream_scan_type().is_reschedulable() {
640 is_reschedulable = false;
641
642 return false;
644 }
645
646 return true;
648 }
649
650 true
652 });
653
654 if !is_reschedulable {
655 bail!(
656 "the materialized view of fragment {fragment_id} is in state {}",
657 state.as_str_name()
658 )
659 }
660 }
661 table_fragments::State::Created => {}
662 }
663
664 if no_shuffle_target_fragment_ids.contains(fragment_id) {
665 bail!(
666 "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
667 );
668 }
669
670 if no_shuffle_source_fragment_ids.contains(fragment_id) {
675 let mut queue: VecDeque<_> = fragment_dispatcher_map
677 .get(fragment_id)
678 .unwrap()
679 .keys()
680 .cloned()
681 .collect();
682
683 while let Some(downstream_id) = queue.pop_front() {
684 if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
685 continue;
686 }
687
688 if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
689 {
690 let no_shuffle_downstreams = downstream_fragments
691 .iter()
692 .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
693 .map(|(fragment_id, _)| fragment_id);
694
695 queue.extend(no_shuffle_downstreams.copied());
696 }
697
698 no_shuffle_reschedule.insert(
699 downstream_id,
700 WorkerReschedule {
701 worker_actor_diff: worker_actor_diff.clone(),
702 },
703 );
704 }
705 }
706
707 if fragment
708 .fragment_type_mask
709 .contains(FragmentTypeFlag::Source)
710 && fragment.node.find_stream_source().is_some()
711 {
712 stream_source_fragment_ids.insert(*fragment_id);
713 }
714
715 let current_worker_ids = fragment
717 .actors
718 .iter()
719 .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
720 .collect::<HashSet<_>>();
721
722 for (removed, change) in worker_actor_diff {
723 if !current_worker_ids.contains(removed) && change.is_negative() {
724 bail!(
725 "no actor on the worker {} of fragment {}",
726 removed,
727 fragment_id
728 );
729 }
730 }
731
732 let added_actor_count: usize = worker_actor_diff
733 .values()
734 .filter(|change| change.is_positive())
735 .cloned()
736 .map(|change| change as usize)
737 .sum();
738
739 let removed_actor_count: usize = worker_actor_diff
740 .values()
741 .filter(|change| change.is_positive())
742 .cloned()
743 .map(|v| v.unsigned_abs())
744 .sum();
745
746 match fragment.distribution_type {
747 FragmentDistributionType::Hash => {
748 if fragment.actors.len() + added_actor_count <= removed_actor_count {
749 bail!("can't remove all actors from fragment {}", fragment_id);
750 }
751 }
752 FragmentDistributionType::Single => {
753 if added_actor_count != removed_actor_count {
754 bail!("single distribution fragment only support migration");
755 }
756 }
757 FragmentDistributionType::Unspecified => unreachable!(),
758 }
759 }
760
761 if !no_shuffle_reschedule.is_empty() {
762 tracing::info!(
763 "reschedule plan rewritten with NoShuffle reschedule {:?}",
764 no_shuffle_reschedule
765 );
766
767 for noshuffle_downstream in no_shuffle_reschedule.keys() {
768 let fragment = fragment_map.get(noshuffle_downstream).unwrap();
769 if fragment
771 .fragment_type_mask
772 .contains(FragmentTypeFlag::SourceScan)
773 {
774 let stream_node = &fragment.node;
775 if let Some((_source_id, upstream_source_fragment_id)) =
776 stream_node.find_source_backfill()
777 {
778 stream_source_backfill_fragment_ids
779 .insert(fragment.fragment_id, upstream_source_fragment_id);
780 }
781 }
782 }
783 }
784
785 reschedule.extend(no_shuffle_reschedule.into_iter());
787
788 Ok(RescheduleContext {
789 actor_map,
790 actor_status,
791 fragment_map,
792 stream_source_fragment_ids,
793 stream_source_backfill_fragment_ids,
794 no_shuffle_target_fragment_ids,
795 no_shuffle_source_fragment_ids,
796 fragment_dispatcher_map,
797 fragment_upstreams: working_set.fragment_upstreams,
798 })
799 }
800
801 pub(crate) async fn analyze_reschedule_plan(
813 &self,
814 mut reschedules: HashMap<FragmentId, WorkerReschedule>,
815 options: RescheduleOptions,
816 table_parallelisms: &mut HashMap<TableId, TableParallelism>,
817 ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
818 tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
819 let ctx = self
820 .build_reschedule_context(&mut reschedules, options, table_parallelisms)
821 .await?;
822 tracing::debug!("reschedule context: {:#?}", ctx);
823 let reschedules = reschedules;
824
825 let (fragment_actors_to_remove, fragment_actors_to_create) =
830 self.arrange_reschedules(&reschedules, &ctx)?;
831
832 let mut fragment_actor_bitmap = HashMap::new();
833 for fragment_id in reschedules.keys() {
834 if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
835 continue;
838 }
839
840 let actors_to_create = fragment_actors_to_create
841 .get(fragment_id)
842 .map(|map| map.keys().copied().collect())
843 .unwrap_or_default();
844
845 let actors_to_remove = fragment_actors_to_remove
846 .get(fragment_id)
847 .map(|map| map.keys().copied().collect())
848 .unwrap_or_default();
849
850 let fragment = ctx.fragment_map.get(fragment_id).unwrap();
851
852 match fragment.distribution_type {
853 FragmentDistributionType::Single => {
854 fragment_actor_bitmap
856 .insert(fragment.fragment_id as FragmentId, Default::default());
857 }
858 FragmentDistributionType::Hash => {
859 let actor_vnode = rebalance_actor_vnode(
860 &fragment.actors,
861 &actors_to_remove,
862 &actors_to_create,
863 );
864
865 fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
866 }
867
868 FragmentDistributionType::Unspecified => unreachable!(),
869 }
870 }
871
872 let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
876 for fragment_id in reschedules.keys() {
877 let fragment = ctx.fragment_map.get(fragment_id).unwrap();
878 let mut new_actor_ids = BTreeMap::new();
879 for actor in &fragment.actors {
880 if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id)
881 && actors_to_remove.contains_key(&actor.actor_id)
882 {
883 continue;
884 }
885 let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
886 new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
887 }
888
889 if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
890 for (actor_id, worker_id) in actors_to_create {
891 new_actor_ids.insert(*actor_id, *worker_id);
892 }
893 }
894
895 assert!(
896 !new_actor_ids.is_empty(),
897 "should be at least one actor in fragment {} after rescheduling",
898 fragment_id
899 );
900
901 fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
902 }
903
904 let fragment_actors_after_reschedule = fragment_actors_after_reschedule;
905
906 fn arrange_no_shuffle_relation(
912 ctx: &RescheduleContext,
913 fragment_id: &FragmentId,
914 upstream_fragment_id: &FragmentId,
915 fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
916 actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
917 fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
918 no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
919 no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
920 ) {
921 if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
922 return;
923 }
924
925 let fragment = &ctx.fragment_map[fragment_id];
926
927 let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];
928
929 for upstream_actor in &upstream_fragment.actors {
931 for dispatcher in &upstream_actor.dispatcher {
932 if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
933 let downstream_actor_id =
934 *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();
935
936 if !ctx
938 .no_shuffle_target_fragment_ids
939 .contains(upstream_fragment_id)
940 {
941 actor_group_map.insert(
942 upstream_actor.actor_id,
943 (upstream_fragment.fragment_id, upstream_actor.actor_id),
944 );
945 actor_group_map.insert(
946 downstream_actor_id,
947 (upstream_fragment.fragment_id, upstream_actor.actor_id),
948 );
949 } else {
950 let root_actor_id = actor_group_map[&upstream_actor.actor_id];
951
952 actor_group_map.insert(downstream_actor_id, root_actor_id);
953 }
954 }
955 }
956 }
957
958 let upstream_fragment_bitmap = fragment_updated_bitmap
960 .get(upstream_fragment_id)
961 .cloned()
962 .unwrap_or_default();
963
964 if upstream_fragment.distribution_type == FragmentDistributionType::Single {
966 assert!(
967 upstream_fragment_bitmap.is_empty(),
968 "single fragment should have no bitmap updates"
969 );
970 }
971
972 let upstream_fragment_actor_map = fragment_actors_after_reschedule
973 .get(upstream_fragment_id)
974 .cloned()
975 .unwrap();
976
977 let fragment_actor_map = fragment_actors_after_reschedule
978 .get(fragment_id)
979 .cloned()
980 .unwrap();
981
982 let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();
983
984 let mut fragment_bitmap = HashMap::new();
986
987 for (actor_id, worker_id) in &fragment_actor_map {
988 if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
989 let root_bitmap = fragment_updated_bitmap
990 .get(root_fragment)
991 .expect("root fragment bitmap not found")
992 .get(root_actor_id)
993 .cloned()
994 .expect("root actor bitmap not found");
995
996 fragment_bitmap.insert(*actor_id, root_bitmap);
998
999 no_shuffle_upstream_actor_map
1000 .entry(*actor_id as ActorId)
1001 .or_default()
1002 .insert(*upstream_fragment_id, *root_actor_id);
1003 no_shuffle_downstream_actors_map
1004 .entry(*root_actor_id)
1005 .or_default()
1006 .insert(*fragment_id, *actor_id);
1007 } else {
1008 worker_reverse_index
1009 .entry(*worker_id)
1010 .or_default()
1011 .insert(*actor_id);
1012 }
1013 }
1014
1015 let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();
1016
1017 for (actor_id, worker_id) in &upstream_fragment_actor_map {
1018 if !actor_group_map.contains_key(actor_id) {
1019 upstream_worker_reverse_index
1020 .entry(*worker_id)
1021 .or_default()
1022 .insert(*actor_id);
1023 }
1024 }
1025
1026 for (worker_id, actor_ids) in worker_reverse_index {
1028 let upstream_actor_ids = upstream_worker_reverse_index
1029 .get(&worker_id)
1030 .unwrap()
1031 .clone();
1032
1033 assert_eq!(actor_ids.len(), upstream_actor_ids.len());
1034
1035 for (actor_id, upstream_actor_id) in actor_ids
1036 .into_iter()
1037 .zip_eq_debug(upstream_actor_ids.into_iter())
1038 {
1039 match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
1040 None => {
1041 assert_eq!(
1043 upstream_fragment.distribution_type,
1044 FragmentDistributionType::Single
1045 );
1046 }
1047 Some(bitmap) => {
1048 fragment_bitmap.insert(actor_id, bitmap);
1050 }
1051 }
1052
1053 no_shuffle_upstream_actor_map
1054 .entry(actor_id as ActorId)
1055 .or_default()
1056 .insert(*upstream_fragment_id, upstream_actor_id);
1057 no_shuffle_downstream_actors_map
1058 .entry(upstream_actor_id)
1059 .or_default()
1060 .insert(*fragment_id, actor_id);
1061 }
1062 }
1063
1064 match fragment.distribution_type {
1065 FragmentDistributionType::Hash => {}
1066 FragmentDistributionType::Single => {
1067 assert!(fragment_bitmap.is_empty());
1069 }
1070 FragmentDistributionType::Unspecified => unreachable!(),
1071 }
1072
1073 if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
1074 assert_eq!(
1075 e.entry.get(),
1076 &e.value,
1077 "bitmaps derived from different no-shuffle upstreams mismatch"
1078 );
1079 }
1080
1081 if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
1083 let no_shuffle_downstreams = downstream_fragments
1084 .iter()
1085 .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
1086 .map(|(fragment_id, _)| fragment_id);
1087
1088 for downstream_fragment_id in no_shuffle_downstreams {
1089 arrange_no_shuffle_relation(
1090 ctx,
1091 downstream_fragment_id,
1092 fragment_id,
1093 fragment_actors_after_reschedule,
1094 actor_group_map,
1095 fragment_updated_bitmap,
1096 no_shuffle_upstream_actor_map,
1097 no_shuffle_downstream_actors_map,
1098 );
1099 }
1100 }
1101 }
1102
1103 let mut no_shuffle_upstream_actor_map = HashMap::new();
1104 let mut no_shuffle_downstream_actors_map = HashMap::new();
1105 let mut actor_group_map = HashMap::new();
1106 for fragment_id in reschedules.keys() {
1109 if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
1110 && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
1111 && let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id)
1112 {
1113 for downstream_fragment_id in downstream_fragments.keys() {
1114 arrange_no_shuffle_relation(
1115 &ctx,
1116 downstream_fragment_id,
1117 fragment_id,
1118 &fragment_actors_after_reschedule,
1119 &mut actor_group_map,
1120 &mut fragment_actor_bitmap,
1121 &mut no_shuffle_upstream_actor_map,
1122 &mut no_shuffle_downstream_actors_map,
1123 );
1124 }
1125 }
1126 }
1127
1128 tracing::debug!("actor group map {:?}", actor_group_map);
1129
1130 let mut new_created_actors = HashMap::new();
1131 for fragment_id in reschedules.keys() {
1132 let actors_to_create = fragment_actors_to_create
1133 .get(fragment_id)
1134 .cloned()
1135 .unwrap_or_default();
1136
1137 let fragment = &ctx.fragment_map[fragment_id];
1138
1139 assert!(!fragment.actors.is_empty());
1140
1141 for actor_to_create in &actors_to_create {
1142 let new_actor_id = actor_to_create.0;
1143 let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();
1144
1145 new_actor.actor_id = *new_actor_id;
1149
1150 Self::modify_actor_upstream_and_downstream(
1151 &ctx,
1152 &fragment_actors_to_remove,
1153 &fragment_actors_to_create,
1154 &fragment_actor_bitmap,
1155 &no_shuffle_downstream_actors_map,
1156 &mut new_actor,
1157 &mut dispatchers,
1158 )?;
1159
1160 if let Some(bitmap) = fragment_actor_bitmap
1161 .get(fragment_id)
1162 .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
1163 {
1164 new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
1165 }
1166
1167 new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
1168 }
1169 }
1170
1171 let mut fragment_actor_splits = HashMap::new();
1174 for fragment_id in reschedules.keys() {
1175 let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1176
1177 if ctx.stream_source_fragment_ids.contains(fragment_id) {
1178 let fragment = &ctx.fragment_map[fragment_id];
1179
1180 let prev_actor_ids = fragment
1181 .actors
1182 .iter()
1183 .map(|actor| actor.actor_id)
1184 .collect_vec();
1185
1186 let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1187
1188 let actor_splits = self
1189 .source_manager
1190 .migrate_splits_for_source_actors(
1191 *fragment_id,
1192 &prev_actor_ids,
1193 &curr_actor_ids,
1194 )
1195 .await?;
1196
1197 tracing::debug!(
1198 "source actor splits: {:?}, fragment_id: {}",
1199 actor_splits,
1200 fragment_id
1201 );
1202 fragment_actor_splits.insert(*fragment_id, actor_splits);
1203 }
1204 }
1205 if !ctx.stream_source_backfill_fragment_ids.is_empty() {
1207 for fragment_id in reschedules.keys() {
1208 let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1209
1210 if let Some(upstream_source_fragment_id) =
1211 ctx.stream_source_backfill_fragment_ids.get(fragment_id)
1212 {
1213 let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1214
1215 let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
1216 *fragment_id,
1217 *upstream_source_fragment_id,
1218 &curr_actor_ids,
1219 &fragment_actor_splits,
1220 &no_shuffle_upstream_actor_map,
1221 )?;
1222 tracing::debug!(
1223 "source backfill actor splits: {:?}, fragment_id: {}",
1224 actor_splits,
1225 fragment_id
1226 );
1227 fragment_actor_splits.insert(*fragment_id, actor_splits);
1228 }
1229 }
1230 }
1231
1232 let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
1234 HashMap::with_capacity(reschedules.len());
1235
1236 for (fragment_id, _) in reschedules {
1237 let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();
1238
1239 if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
1240 for (actor_id, worker_id) in actor_worker_maps {
1241 actors_to_create
1242 .entry(worker_id)
1243 .or_default()
1244 .push(actor_id);
1245 }
1246 }
1247
1248 let actors_to_remove = fragment_actors_to_remove
1249 .get(&fragment_id)
1250 .cloned()
1251 .unwrap_or_default()
1252 .into_keys()
1253 .collect();
1254
1255 let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];
1256
1257 assert!(!actors_after_reschedule.is_empty());
1258
1259 let fragment = &ctx.fragment_map[&fragment_id];
1260
1261 let in_degree_types: HashSet<_> = ctx
1262 .fragment_upstreams
1263 .get(&(fragment_id as _))
1264 .map(|upstreams| upstreams.values())
1265 .into_iter()
1266 .flatten()
1267 .cloned()
1268 .collect();
1269
1270 let upstream_dispatcher_mapping = match fragment.distribution_type {
1271 FragmentDistributionType::Hash => {
1272 if !in_degree_types.contains(&DispatcherType::Hash) {
1273 None
1274 } else {
1275 Some(ActorMapping::from_bitmaps(
1277 &fragment_actor_bitmap[&fragment_id],
1278 ))
1279 }
1280 }
1281
1282 FragmentDistributionType::Single => {
1283 assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
1284 None
1285 }
1286 FragmentDistributionType::Unspecified => unreachable!(),
1287 };
1288
1289 let mut upstream_fragment_dispatcher_set = BTreeSet::new();
1290
1291 {
1292 if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
1293 for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
1294 match upstream_dispatcher_type {
1295 DispatcherType::NoShuffle => {}
1296 _ => {
1297 upstream_fragment_dispatcher_set.insert((
1298 *upstream_fragment_id as FragmentId,
1299 fragment.fragment_id as DispatcherId,
1300 ));
1301 }
1302 }
1303 }
1304 }
1305 }
1306
1307 let downstream_fragment_ids = if let Some(downstream_fragments) =
1308 ctx.fragment_dispatcher_map.get(&fragment_id)
1309 {
1310 downstream_fragments
1313 .iter()
1314 .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
1315 .map(|(fragment_id, _)| *fragment_id)
1316 .collect_vec()
1317 } else {
1318 vec![]
1319 };
1320
1321 let vnode_bitmap_updates = match fragment.distribution_type {
1322 FragmentDistributionType::Hash => {
1323 let mut vnode_bitmap_updates =
1324 fragment_actor_bitmap.remove(&fragment_id).unwrap();
1325
1326 for actor_id in actors_after_reschedule.keys() {
1329 assert!(vnode_bitmap_updates.contains_key(actor_id));
1330
1331 if let Some(actor) = ctx.actor_map.get(actor_id) {
1333 let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();
1334
1335 if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref()
1336 && prev_bitmap.eq(bitmap)
1337 {
1338 vnode_bitmap_updates.remove(actor_id);
1339 }
1340 }
1341 }
1342
1343 vnode_bitmap_updates
1344 }
1345 FragmentDistributionType::Single => HashMap::new(),
1346 FragmentDistributionType::Unspecified => unreachable!(),
1347 };
1348
1349 let upstream_fragment_dispatcher_ids =
1350 upstream_fragment_dispatcher_set.into_iter().collect_vec();
1351
1352 let actor_splits = fragment_actor_splits
1353 .get(&fragment_id)
1354 .cloned()
1355 .unwrap_or_default();
1356
1357 let cdc_table_snapshot_split_assignment = if fragment
1358 .fragment_type_mask
1359 .contains(FragmentTypeFlag::StreamCdcScan)
1360 {
1361 assign_cdc_table_snapshot_splits_impl(
1362 fragment.job_id,
1363 fragment_actors_after_reschedule
1364 .get(&fragment_id)
1365 .unwrap()
1366 .keys()
1367 .copied()
1368 .collect(),
1369 self.env.meta_store_ref(),
1370 )
1371 .await?
1372 } else {
1373 HashMap::default()
1374 };
1375
1376 reschedule_fragment.insert(
1377 fragment_id,
1378 Reschedule {
1379 added_actors: actors_to_create,
1380 removed_actors: actors_to_remove,
1381 vnode_bitmap_updates,
1382 upstream_fragment_dispatcher_ids,
1383 upstream_dispatcher_mapping,
1384 downstream_fragment_ids,
1385 actor_splits,
1386 newly_created_actors: Default::default(),
1387 cdc_table_snapshot_split_assignment,
1388 },
1389 );
1390 }
1391
1392 let mut fragment_created_actors = HashMap::new();
1393 for (fragment_id, actors_to_create) in &fragment_actors_to_create {
1394 let mut created_actors = HashMap::new();
1395 for (actor_id, worker_id) in actors_to_create {
1396 let actor = new_created_actors.get(actor_id).cloned().unwrap();
1397 created_actors.insert(*actor_id, (actor, *worker_id));
1398 }
1399
1400 fragment_created_actors.insert(*fragment_id, created_actors);
1401 }
1402
1403 for (fragment_id, to_create) in fragment_created_actors {
1404 let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
1405 reschedule.newly_created_actors = to_create;
1406 }
1407 tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);
1408
1409 Ok(reschedule_fragment)
1410 }
1411
1412 #[expect(clippy::type_complexity)]
1413 fn arrange_reschedules(
1414 &self,
1415 reschedule: &HashMap<FragmentId, WorkerReschedule>,
1416 ctx: &RescheduleContext,
1417 ) -> MetaResult<(
1418 HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1419 HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1420 )> {
1421 let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
1422 let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());
1423
1424 for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
1425 let fragment = ctx.fragment_map.get(fragment_id).unwrap();
1426
1427 let mut actors_to_remove = BTreeMap::new();
1429 let mut actors_to_create = BTreeMap::new();
1430
1431 let mut worker_to_actors = HashMap::new();
1433
1434 for actor in &fragment.actors {
1435 let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
1436 worker_to_actors
1437 .entry(worker_id)
1438 .or_insert(BTreeSet::new())
1439 .insert(actor.actor_id as ActorId);
1440 }
1441
1442 let decreased_actor_count = worker_actor_diff
1443 .iter()
1444 .filter(|(_, change)| change.is_negative())
1445 .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));
1446
1447 for (worker_id, n) in decreased_actor_count {
1448 if let Some(actor_ids) = worker_to_actors.get(worker_id) {
1449 if actor_ids.len() < n {
1450 bail!(
1451 "plan illegal, for fragment {}, worker {} only has {} actors, but needs to reduce {}",
1452 fragment_id,
1453 worker_id,
1454 actor_ids.len(),
1455 n
1456 );
1457 }
1458
1459 let removed_actors: Vec<_> = actor_ids
1460 .iter()
1461 .skip(actor_ids.len().saturating_sub(n))
1462 .cloned()
1463 .collect();
1464
1465 for actor in removed_actors {
1466 actors_to_remove.insert(actor, *worker_id);
1467 }
1468 }
1469 }
1470
1471 let increased_actor_count = worker_actor_diff
1472 .iter()
1473 .filter(|(_, change)| change.is_positive());
1474
1475 for (worker, n) in increased_actor_count {
1476 for _ in 0..*n {
1477 let id = self
1478 .env
1479 .id_gen_manager()
1480 .generate_interval::<{ IdCategory::Actor }>(1)
1481 as ActorId;
1482 actors_to_create.insert(id, *worker);
1483 }
1484 }
1485
1486 if !actors_to_remove.is_empty() {
1487 fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
1488 }
1489
1490 if !actors_to_create.is_empty() {
1491 fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
1492 }
1493 }
1494
1495 for actors_to_remove in fragment_actors_to_remove.values() {
1497 for actor_id in actors_to_remove.keys() {
1498 let actor = ctx.actor_map.get(actor_id).unwrap();
1499 for dispatcher in &actor.dispatcher {
1500 if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
1501 let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");
1502
1503 let _should_exists = fragment_actors_to_remove
1504 .get(&(dispatcher.dispatcher_id as FragmentId))
1505 .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
1506 .get(downstream_actor_id)
1507 .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
1508 }
1509 }
1510 }
1511 }
1512
1513 Ok((fragment_actors_to_remove, fragment_actors_to_create))
1514 }
1515
1516 fn modify_actor_upstream_and_downstream(
1519 ctx: &RescheduleContext,
1520 fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1521 fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1522 fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
1523 no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
1524 new_actor: &mut StreamActor,
1525 dispatchers: &mut Vec<PbDispatcher>,
1526 ) -> MetaResult<()> {
1527 for dispatcher in dispatchers {
1529 let downstream_fragment_id = dispatcher
1530 .downstream_actor_id
1531 .iter()
1532 .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
1533 .dedup()
1534 .exactly_one()
1535 .unwrap() as FragmentId;
1536
1537 let downstream_fragment_actors_to_remove =
1538 fragment_actors_to_remove.get(&downstream_fragment_id);
1539 let downstream_fragment_actors_to_create =
1540 fragment_actors_to_create.get(&downstream_fragment_id);
1541
1542 match dispatcher.r#type() {
1543 d @ (PbDispatcherType::Hash
1544 | PbDispatcherType::Simple
1545 | PbDispatcherType::Broadcast) => {
1546 if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
1547 {
1548 dispatcher
1549 .downstream_actor_id
1550 .retain(|id| !downstream_actors_to_remove.contains_key(id));
1551 }
1552
1553 if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
1554 {
1555 dispatcher
1556 .downstream_actor_id
1557 .extend(downstream_actors_to_create.keys().cloned())
1558 }
1559
1560 if d == PbDispatcherType::Simple {
1562 assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1563 }
1564 }
1565 PbDispatcherType::NoShuffle => {
1566 assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1567 let downstream_actor_id = no_shuffle_downstream_actors_map
1568 .get(&new_actor.actor_id)
1569 .and_then(|map| map.get(&downstream_fragment_id))
1570 .unwrap();
1571 dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
1572 }
1573 PbDispatcherType::Unspecified => unreachable!(),
1574 }
1575
1576 if let Some(mapping) = dispatcher.hash_mapping.as_mut()
1577 && let Some(downstream_updated_bitmap) =
1578 fragment_actor_bitmap.get(&downstream_fragment_id)
1579 {
1580 *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
1582 }
1583 }
1584
1585 Ok(())
1586 }
1587
1588 #[await_tree::instrument]
1589 pub async fn post_apply_reschedule(
1590 &self,
1591 reschedules: &HashMap<FragmentId, Reschedule>,
1592 post_updates: &JobReschedulePostUpdates,
1593 ) -> MetaResult<()> {
1594 self.metadata_manager
1596 .post_apply_reschedules(reschedules.clone(), post_updates)
1597 .await?;
1598
1599 if !reschedules.is_empty() {
1601 let workers = self
1602 .metadata_manager
1603 .list_active_serving_compute_nodes()
1604 .await?;
1605 let streaming_parallelisms = self
1606 .metadata_manager
1607 .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))
1608 .await?;
1609 let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
1610 let max_serving_parallelism = self
1611 .env
1612 .session_params_manager_impl_ref()
1613 .get_params()
1614 .await
1615 .batch_parallelism()
1616 .map(|p| p.get());
1617 let (upserted, failed) = serving_worker_slot_mapping.upsert(
1618 streaming_parallelisms,
1619 &workers,
1620 max_serving_parallelism,
1621 );
1622 if !upserted.is_empty() {
1623 tracing::debug!(
1624 "Update serving vnode mapping for fragments {:?}.",
1625 upserted.keys()
1626 );
1627 self.env
1628 .notification_manager()
1629 .notify_frontend_without_version(
1630 Operation::Update,
1631 Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1632 mappings: to_fragment_worker_slot_mapping(&upserted),
1633 }),
1634 );
1635 }
1636 if !failed.is_empty() {
1637 tracing::debug!(
1638 "Fail to update serving vnode mapping for fragments {:?}.",
1639 failed
1640 );
1641 self.env
1642 .notification_manager()
1643 .notify_frontend_without_version(
1644 Operation::Delete,
1645 Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1646 mappings: to_deleted_fragment_worker_slot_mapping(&failed),
1647 }),
1648 );
1649 }
1650 }
1651
1652 let mut stream_source_actor_splits = HashMap::new();
1653 let mut stream_source_dropped_actors = HashSet::new();
1654
1655 for (fragment_id, reschedule) in reschedules {
1657 if !reschedule.actor_splits.is_empty() {
1658 stream_source_actor_splits
1659 .insert(*fragment_id as FragmentId, reschedule.actor_splits.clone());
1660 stream_source_dropped_actors.extend(reschedule.removed_actors.clone());
1661 }
1662 }
1663
1664 if !stream_source_actor_splits.is_empty() {
1665 self.source_manager
1666 .apply_source_change(SourceChange::Reschedule {
1667 split_assignment: stream_source_actor_splits,
1668 dropped_actors: stream_source_dropped_actors,
1669 })
1670 .await;
1671 }
1672
1673 Ok(())
1674 }
1675
    /// Builds a [`JobReschedulePlan`] for the jobs listed in `policy.targets`.
    ///
    /// For every target job this resolves the parallelism to apply and the resource group to
    /// schedule in, restricts the schedulable streaming workers to that resource group, and then
    /// computes a per-fragment [`WorkerReschedule`] describing how many actors each worker gains
    /// or loses. Fragments that are the downstream side of a `NoShuffle` edge are skipped here,
    /// and fragments whose resulting diff is empty are dropped from the plan.
    ///
    /// When `generate_plan_for_cdc_table_backfill` is `true`, only fragments flagged
    /// `StreamCdcScan` are considered, the parallelism must be `Fixed` and within
    /// `1..=vnode_count`, and the returned `post_updates` are emptied.
    ///
    /// # Errors
    ///
    /// Fails when a job's resource group has no schedulable worker, when no single target worker
    /// can be chosen for a `Single` fragment, when the CDC-backfill parallelism is invalid, or
    /// when one of the underlying catalog / metadata calls fails.
    ///
    /// # Panics
    ///
    /// Panics when a target job has no indexed fragments, when a worker reports a compute
    /// parallelism of zero, or when a `Single` fragment is not placed on exactly one worker.
    pub async fn generate_job_reschedule_plan(
        &self,
        policy: JobReschedulePolicy,
        generate_plan_for_cdc_table_backfill: bool,
    ) -> MetaResult<JobReschedulePlan> {
        type VnodeCount = usize;

        let JobReschedulePolicy { targets } = policy;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?;

        // Index the schedulable workers by id; unschedulable ones never receive actors.
        let workers: HashMap<_, _> = workers
            .into_iter()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        // Resolved scheduling inputs of one job: the workers it may use and its parallelism.
        #[derive(Debug)]
        struct JobUpdate {
            filtered_worker_ids: BTreeSet<WorkerId>,
            parallelism: TableParallelism,
        }

        let mut job_parallelism_updates = HashMap::new();

        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
            parallelism_updates: Default::default(),
            resource_group_updates: Default::default(),
        };

        for (
            job_id,
            JobRescheduleTarget {
                parallelism: parallelism_update,
                resource_group: resource_group_update,
            },
        ) in &targets
        {
            // `Update` carries the new parallelism; `Refresh` re-reads the one stored in the
            // catalog.
            let parallelism = match parallelism_update {
                JobParallelismTarget::Update(parallelism) => *parallelism,
                JobParallelismTarget::Refresh => {
                    let parallelism = self
                        .metadata_manager
                        .catalog_controller
                        .get_job_streaming_parallelisms(*job_id as _)
                        .await?;

                    parallelism.into()
                }
            };

            job_reschedule_post_updates
                .parallelism_updates
                .insert(TableId::from(*job_id), parallelism);

            // Resolve the resource group used for scheduling. Only the `Update` variants record
            // a resource group change in the post updates.
            let current_resource_group = match resource_group_update {
                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
                    job_reschedule_post_updates.resource_group_updates.insert(
                        *job_id as ObjectId,
                        Some(specific_resource_group.to_owned()),
                    );

                    specific_resource_group.to_owned()
                }
                JobResourceGroupTarget::Update(None) => {
                    // No specific group requested: schedule with the database's resource group
                    // and record `None` for the job.
                    let database_resource_group = self
                        .metadata_manager
                        .catalog_controller
                        .get_existing_job_database_resource_group(*job_id as _)
                        .await?;

                    job_reschedule_post_updates
                        .resource_group_updates
                        .insert(*job_id as ObjectId, None);
                    database_resource_group
                }
                JobResourceGroupTarget::Keep => {
                    self.metadata_manager
                        .catalog_controller
                        .get_existing_job_resource_group(*job_id as _)
                        .await?
                }
            };

            let filtered_worker_ids =
                filter_workers_by_resource_group(&workers, current_resource_group.as_str());

            if filtered_worker_ids.is_empty() {
                bail!("Cannot resize streaming_job {job_id} to empty worker set")
            }

            job_parallelism_updates.insert(
                *job_id,
                JobUpdate {
                    filtered_worker_ids,
                    parallelism,
                },
            );
        }

        // Fragments that appear as the upstream / downstream end of a NoShuffle edge.
        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        // fragment id -> (distribution type, vnode count, is CDC scan fragment)
        let mut fragment_distribution_map = HashMap::new();
        // actor id -> worker id
        let mut actor_location = HashMap::new();
        // job id -> fragment ids
        let mut table_fragment_id_map = HashMap::new();
        // fragment id -> actor ids
        let mut fragment_actor_id_map = HashMap::new();

        // Fills the indexes above from the reschedule working set resolved for `table_ids`.
        // With `generate_plan_only_for_cdc_table_backfill` set, fragments without the
        // `StreamCdcScan` flag are left out of the distribution and job -> fragment indexes.
        async fn build_index(
            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
            fragment_distribution_map: &mut HashMap<
                FragmentId,
                (FragmentDistributionType, VnodeCount, bool),
            >,
            actor_location: &mut HashMap<ActorId, WorkerId>,
            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
            mgr: &MetadataManager,
            table_ids: Vec<ObjectId>,
            generate_plan_only_for_cdc_table_backfill: bool,
        ) -> Result<(), MetaError> {
            let RescheduleWorkingSet {
                fragments,
                actors,
                actor_dispatchers: _actor_dispatchers,
                fragment_downstreams,
                fragment_upstreams: _fragment_upstreams,
                related_jobs: _related_jobs,
                job_resource_groups: _job_resource_groups,
            } = mgr
                .catalog_controller
                .resolve_working_set_for_reschedule_tables(table_ids)
                .await?;

            for (fragment_id, downstreams) in fragment_downstreams {
                for (downstream_fragment_id, dispatcher_type) in downstreams {
                    if let risingwave_meta_model::DispatcherType::NoShuffle = dispatcher_type {
                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
                    }
                }
            }

            for (fragment_id, fragment) in fragments {
                let is_cdc_backfill_v2_fragment =
                    FragmentTypeMask::from(fragment.fragment_type_mask)
                        .contains(FragmentTypeFlag::StreamCdcScan);
                if generate_plan_only_for_cdc_table_backfill && !is_cdc_backfill_v2_fragment {
                    continue;
                }
                fragment_distribution_map.insert(
                    fragment_id as FragmentId,
                    (
                        FragmentDistributionType::from(fragment.distribution_type),
                        fragment.vnode_count as _,
                        is_cdc_backfill_v2_fragment,
                    ),
                );

                table_fragment_id_map
                    .entry(fragment.job_id as u32)
                    .or_default()
                    .insert(fragment_id as FragmentId);
            }

            for (actor_id, actor) in actors {
                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
                fragment_actor_id_map
                    .entry(actor.fragment_id as FragmentId)
                    .or_default()
                    .insert(actor_id as ActorId);
            }

            Ok(())
        }

        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();

        build_index(
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
            &mut fragment_distribution_map,
            &mut actor_location,
            &mut table_fragment_id_map,
            &mut fragment_actor_id_map,
            &self.metadata_manager,
            table_ids,
            generate_plan_for_cdc_table_backfill,
        )
        .await?;
        tracing::debug!(
            ?job_reschedule_post_updates,
            ?job_parallelism_updates,
            ?no_shuffle_source_fragment_ids,
            ?no_shuffle_target_fragment_ids,
            ?fragment_distribution_map,
            ?actor_location,
            ?table_fragment_id_map,
            ?fragment_actor_id_map,
            "generate_table_resize_plan, after build_index"
        );

        let adaptive_parallelism_strategy = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy();

        let mut target_plan = HashMap::new();

        for (
            table_id,
            JobUpdate {
                filtered_worker_ids,
                parallelism,
            },
        ) in job_parallelism_updates
        {
            let assigner = AssignerBuilder::new(table_id).build();

            // Panics if the job has no entry in the index built above.
            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();

            // worker id -> compute parallelism, restricted to the job's resource group.
            let available_worker_slots = workers
                .iter()
                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
                .map(|(_, worker)| {
                    (
                        worker.id as WorkerId,
                        NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                    )
                })
                .collect::<BTreeMap<_, _>>();

            for fragment_id in fragment_map {
                // The downstream side of a NoShuffle edge gets no plan of its own here.
                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
                    continue;
                }

                // Current placement: worker id -> number of this fragment's actors on it.
                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();

                for actor_id in &fragment_actor_id_map[&fragment_id] {
                    let worker_id = actor_location[actor_id];
                    *fragment_slots.entry(worker_id).or_default() += 1;
                }

                let available_slot_count: usize = available_worker_slots
                    .values()
                    .cloned()
                    .map(NonZeroUsize::get)
                    .sum();

                if available_slot_count == 0 {
                    bail!(
                        "No schedulable slots available for fragment {}",
                        fragment_id
                    );
                }

                let (dist, vnode_count, is_cdc_backfill_v2_fragment) =
                    fragment_distribution_map[&fragment_id];
                // A fragment cannot run with more actors than it has vnodes.
                let max_parallelism = vnode_count;
                // Parallelism to apply to this fragment:
                // - CDC-backfill plan: the requested fixed parallelism, validated against the
                //   vnode count;
                // - CDC scan fragment in a regular plan: pinned to its current actor count;
                // - anything else: the job's parallelism.
                let fragment_parallelism_strategy = if generate_plan_for_cdc_table_backfill {
                    assert!(is_cdc_backfill_v2_fragment);
                    let TableParallelism::Fixed(new_parallelism) = parallelism else {
                        return Err(anyhow::anyhow!(
                            "invalid new parallelism {:?}, expect fixed parallelism",
                            parallelism
                        )
                        .into());
                    };
                    if new_parallelism > max_parallelism || new_parallelism == 0 {
                        return Err(anyhow::anyhow!(
                            "invalid new parallelism {}, max parallelism {}",
                            new_parallelism,
                            max_parallelism
                        )
                        .into());
                    }
                    TableParallelism::Fixed(new_parallelism)
                } else if is_cdc_backfill_v2_fragment {
                    TableParallelism::Fixed(fragment_actor_id_map[&fragment_id].len())
                } else {
                    parallelism
                };
                match dist {
                    FragmentDistributionType::Unspecified => unreachable!(),
                    FragmentDistributionType::Single => {
                        let (single_worker_id, should_be_one) = fragment_slots
                            .iter()
                            .exactly_one()
                            .expect("single fragment should have only one worker slot");

                        assert_eq!(*should_be_one, 1);

                        // Ask the assigner to place exactly one actor among the available workers.
                        let assignment =
                            assigner.count_actors_per_worker(&available_worker_slots, 1);

                        let (chosen_target_worker_id, should_be_one) =
                            assignment.iter().exactly_one().ok().with_context(|| {
                                format!(
                                    "Cannot find a single target worker for fragment {fragment_id}"
                                )
                            })?;

                        assert_eq!(*should_be_one, 1);

                        if *chosen_target_worker_id == *single_worker_id {
                            tracing::debug!(
                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
                            );
                            continue;
                        }

                        // Move the only actor: +1 on the chosen worker, -1 on the current one.
                        target_plan.insert(
                            fragment_id,
                            WorkerReschedule {
                                worker_actor_diff: BTreeMap::from_iter(vec![
                                    (*chosen_target_worker_id, 1),
                                    (*single_worker_id, -1),
                                ]),
                            },
                        );
                    }
                    FragmentDistributionType::Hash => match fragment_parallelism_strategy {
                        TableParallelism::Adaptive => {
                            let target_slot_count = adaptive_parallelism_strategy
                                .compute_target_parallelism(available_slot_count);

                            if target_slot_count > max_parallelism {
                                // Strategy asks for more than the vnode count: cap it.
                                tracing::warn!(
                                    "available parallelism for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );

                                let target_worker_slots = assigner.count_actors_per_worker(
                                    &available_worker_slots,
                                    max_parallelism,
                                );

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else if available_slot_count != target_slot_count {
                                // Strategy yields a count different from all available slots.
                                tracing::info!(
                                    "available parallelism for table {table_id} is limit by adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
                                );

                                let target_worker_slots = assigner.count_actors_per_worker(
                                    &available_worker_slots,
                                    target_slot_count,
                                );

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else {
                                // Use every available slot: the target is the workers' own
                                // parallelism, so the assigner is not consulted.
                                let available_worker_slots = available_worker_slots
                                    .iter()
                                    .map(|(worker_id, v)| (*worker_id, v.get()))
                                    .collect();

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &available_worker_slots,
                                    ),
                                );
                            }
                        }
                        TableParallelism::Fixed(mut n) => {
                            if n > max_parallelism {
                                tracing::warn!(
                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );
                                n = max_parallelism
                            }

                            let target_worker_slots =
                                assigner.count_actors_per_worker(&available_worker_slots, n);

                            target_plan.insert(
                                fragment_id,
                                Self::diff_worker_slot_changes(
                                    &fragment_slots,
                                    &target_worker_slots,
                                ),
                            );
                        }
                        TableParallelism::Custom => {
                            // No plan is generated for custom parallelism.
                        }
                    },
                }
            }
        }

        // A fragment whose placement already matches its target needs no reschedule.
        target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
        tracing::debug!(
            ?target_plan,
            "generate_table_resize_plan finished target_plan"
        );
        // CDC-backfill plans carry no parallelism / resource group post updates.
        if generate_plan_for_cdc_table_backfill {
            job_reschedule_post_updates.resource_group_updates = HashMap::default();
            job_reschedule_post_updates.parallelism_updates = HashMap::default();
        }
        Ok(JobReschedulePlan {
            reschedules: target_plan,
            post_updates: job_reschedule_post_updates,
        })
    }
2105
2106 fn diff_worker_slot_changes(
2107 fragment_worker_slots: &BTreeMap<WorkerId, usize>,
2108 target_worker_slots: &BTreeMap<WorkerId, usize>,
2109 ) -> WorkerReschedule {
2110 let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2111 let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2112
2113 for (&worker_id, &target_slots) in target_worker_slots {
2114 let ¤t_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);
2115
2116 if target_slots > current_slots {
2117 increased_actor_count.insert(worker_id, target_slots - current_slots);
2118 }
2119 }
2120
2121 for (&worker_id, ¤t_slots) in fragment_worker_slots {
2122 let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);
2123
2124 if current_slots > target_slots {
2125 decreased_actor_count.insert(worker_id, current_slots - target_slots);
2126 }
2127 }
2128
2129 let worker_ids: HashSet<_> = increased_actor_count
2130 .keys()
2131 .chain(decreased_actor_count.keys())
2132 .cloned()
2133 .collect();
2134
2135 let mut worker_actor_diff = BTreeMap::new();
2136
2137 for worker_id in worker_ids {
2138 let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2139 let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2140 let change = increased - decreased;
2141
2142 assert_ne!(change, 0);
2143
2144 worker_actor_diff.insert(worker_id, change);
2145 }
2146
2147 WorkerReschedule { worker_actor_diff }
2148 }
2149
2150 fn build_no_shuffle_relation_index(
2151 actor_map: &HashMap<ActorId, CustomActorInfo>,
2152 no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
2153 no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
2154 ) {
2155 let mut fragment_cache = HashSet::new();
2156 for actor in actor_map.values() {
2157 if fragment_cache.contains(&actor.fragment_id) {
2158 continue;
2159 }
2160
2161 for dispatcher in &actor.dispatcher {
2162 for downstream_actor_id in &dispatcher.downstream_actor_id {
2163 if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2164 if dispatcher.r#type() == PbDispatcherType::NoShuffle {
2166 no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId);
2167 no_shuffle_target_fragment_ids
2168 .insert(downstream_actor.fragment_id as FragmentId);
2169 }
2170 }
2171 }
2172 }
2173
2174 fragment_cache.insert(actor.fragment_id);
2175 }
2176 }
2177
2178 fn build_fragment_dispatcher_index(
2179 actor_map: &HashMap<ActorId, CustomActorInfo>,
2180 fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
2181 ) {
2182 for actor in actor_map.values() {
2183 for dispatcher in &actor.dispatcher {
2184 for downstream_actor_id in &dispatcher.downstream_actor_id {
2185 if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2186 fragment_dispatcher_map
2187 .entry(actor.fragment_id as FragmentId)
2188 .or_default()
2189 .insert(
2190 downstream_actor.fragment_id as FragmentId,
2191 dispatcher.r#type().into(),
2192 );
2193 }
2194 }
2195 }
2196 }
2197 }
2198
2199 pub fn resolve_no_shuffle_upstream_tables(
2200 fragment_ids: HashSet<FragmentId>,
2201 no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2202 no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2203 fragment_to_table: &HashMap<FragmentId, TableId>,
2204 fragment_upstreams: &HashMap<
2205 risingwave_meta_model::FragmentId,
2206 HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2207 >,
2208 table_parallelisms: &mut HashMap<TableId, TableParallelism>,
2209 ) -> MetaResult<()> {
2210 let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();
2211
2212 let mut fragment_ids = fragment_ids;
2213
2214 while let Some(fragment_id) = queue.pop_front() {
2217 if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2218 continue;
2219 }
2220
2221 for upstream_fragment_id in fragment_upstreams
2223 .get(&(fragment_id as _))
2224 .map(|upstreams| upstreams.keys())
2225 .into_iter()
2226 .flatten()
2227 {
2228 let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2229 let upstream_fragment_id = &upstream_fragment_id;
2230 if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2231 continue;
2232 }
2233
2234 let table_id = &fragment_to_table[&fragment_id];
2235 let upstream_table_id = &fragment_to_table[upstream_fragment_id];
2236
2237 if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
2239 if let Some(upstream_table_parallelism) =
2240 table_parallelisms.get(upstream_table_id)
2241 {
2242 if upstream_table_parallelism != &TableParallelism::Custom {
2243 bail!(
2244 "Cannot change upstream table {} from {:?} to {:?}",
2245 upstream_table_id,
2246 upstream_table_parallelism,
2247 TableParallelism::Custom
2248 )
2249 }
2250 } else {
2251 table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
2252 }
2253 }
2254
2255 fragment_ids.insert(*upstream_fragment_id);
2256 queue.push_back(*upstream_fragment_id);
2257 }
2258 }
2259
2260 let downstream_fragment_ids = fragment_ids
2261 .iter()
2262 .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));
2263
2264 let downstream_table_ids = downstream_fragment_ids
2265 .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
2266 .collect::<HashSet<_>>();
2267
2268 table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));
2269
2270 Ok(())
2271 }
2272
2273 pub fn resolve_no_shuffle_upstream_fragments<T>(
2274 reschedule: &mut HashMap<FragmentId, T>,
2275 no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2276 no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2277 fragment_upstreams: &HashMap<
2278 risingwave_meta_model::FragmentId,
2279 HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2280 >,
2281 ) -> MetaResult<()>
2282 where
2283 T: Clone + Eq,
2284 {
2285 let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();
2286
2287 while let Some(fragment_id) = queue.pop_front() {
2290 if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2291 continue;
2292 }
2293
2294 for upstream_fragment_id in fragment_upstreams
2296 .get(&(fragment_id as _))
2297 .map(|upstreams| upstreams.keys())
2298 .into_iter()
2299 .flatten()
2300 {
2301 let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2302 let upstream_fragment_id = &upstream_fragment_id;
2303 if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2304 continue;
2305 }
2306
2307 let reschedule_plan = &reschedule[&fragment_id];
2308
2309 if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
2310 if upstream_reschedule_plan != reschedule_plan {
2311 bail!(
2312 "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
2313 fragment_id,
2314 upstream_fragment_id
2315 );
2316 }
2317
2318 continue;
2319 }
2320
2321 reschedule.insert(*upstream_fragment_id, reschedule_plan.clone());
2322
2323 queue.push_back(*upstream_fragment_id);
2324 }
2325 }
2326
2327 reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));
2328
2329 Ok(())
2330 }
2331
2332 pub async fn resolve_related_no_shuffle_jobs(
2333 &self,
2334 jobs: &[TableId],
2335 ) -> MetaResult<HashSet<TableId>> {
2336 let RescheduleWorkingSet { related_jobs, .. } = self
2337 .metadata_manager
2338 .catalog_controller
2339 .resolve_working_set_for_reschedule_tables(
2340 jobs.iter().map(|id| id.table_id as _).collect(),
2341 )
2342 .await?;
2343
2344 Ok(related_jobs
2345 .keys()
2346 .map(|id| TableId::new(*id as _))
2347 .collect())
2348 }
2349}
2350
/// What should happen to a job's parallelism as part of a reschedule request.
#[derive(Debug, Clone)]
pub enum JobParallelismTarget {
    /// Switch the job to the given parallelism.
    Update(TableParallelism),
    /// Do not supply a new parallelism. This is the variant used by the automatic parallelism
    /// control loop. NOTE(review): presumably the job's existing setting is re-evaluated
    /// against the current cluster; confirm in `generate_job_reschedule_plan`.
    Refresh,
}
2356
/// What should happen to a job's resource group as part of a reschedule request.
#[derive(Debug, Clone)]
pub enum JobResourceGroupTarget {
    /// Replace the job's resource group with the given value.
    /// NOTE(review): the meaning of `None` is not visible here; confirm against the consumer.
    Update(Option<String>),
    /// Leave the job's resource group as it is.
    Keep,
}
2362
/// The requested outcome of a reschedule for a single job: one parallelism target and one
/// resource group target.
#[derive(Debug, Clone)]
pub struct JobRescheduleTarget {
    /// Requested handling of the job's parallelism.
    pub parallelism: JobParallelismTarget,
    /// Requested handling of the job's resource group.
    pub resource_group: JobResourceGroupTarget,
}
2368
/// A set of per-job reschedule targets, used as the input for generating a reschedule plan.
#[derive(Debug)]
pub struct JobReschedulePolicy {
    /// Reschedule target per job, keyed by the job id as `u32`.
    pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
}
2373
/// Per-job metadata updates that travel together with a reschedule plan.
/// NOTE(review): presumably applied once the reschedule command has completed; confirm
/// where `Command::RescheduleFragment` is handled.
#[derive(Debug, Clone)]
pub struct JobReschedulePostUpdates {
    /// New parallelism per table (job) id.
    pub parallelism_updates: HashMap<TableId, TableParallelism>,
    /// New resource group value per object id.
    pub resource_group_updates: HashMap<ObjectId, Option<String>>,
}
2380
/// A reschedule plan: the per-fragment worker reschedules plus the accompanying per-job
/// metadata updates.
#[derive(Debug)]
pub struct JobReschedulePlan {
    /// Worker-level reschedule for each affected fragment.
    pub reschedules: HashMap<FragmentId, WorkerReschedule>,
    /// Metadata updates that accompany the reschedules.
    pub post_updates: JobReschedulePostUpdates,
}
2386
2387impl GlobalStreamManager {
2388 #[await_tree::instrument("acquire_reschedule_read_guard")]
2389 pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
2390 self.scale_controller.reschedule_lock.read().await
2391 }
2392
2393 #[await_tree::instrument("acquire_reschedule_write_guard")]
2394 pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
2395 self.scale_controller.reschedule_lock.write().await
2396 }
2397
2398 pub async fn reschedule_actors(
2406 &self,
2407 database_id: DatabaseId,
2408 plan: JobReschedulePlan,
2409 options: RescheduleOptions,
2410 ) -> MetaResult<()> {
2411 let JobReschedulePlan {
2412 reschedules,
2413 mut post_updates,
2414 } = plan;
2415
2416 let reschedule_fragment = self
2417 .scale_controller
2418 .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
2419 .await?;
2420
2421 tracing::debug!("reschedule plan: {:?}", reschedule_fragment);
2422
2423 let up_down_stream_fragment: HashSet<_> = reschedule_fragment
2424 .iter()
2425 .flat_map(|(_, reschedule)| {
2426 reschedule
2427 .upstream_fragment_dispatcher_ids
2428 .iter()
2429 .map(|(fragment_id, _)| *fragment_id)
2430 .chain(reschedule.downstream_fragment_ids.iter().cloned())
2431 })
2432 .collect();
2433
2434 let fragment_actors =
2435 try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async {
2436 let actor_ids = self
2437 .metadata_manager
2438 .get_running_actors_of_fragment(*fragment_id)
2439 .await?;
2440 Result::<_, MetaError>::Ok((*fragment_id, actor_ids))
2441 }))
2442 .await?
2443 .into_iter()
2444 .collect();
2445
2446 let command = Command::RescheduleFragment {
2447 reschedules: reschedule_fragment,
2448 fragment_actors,
2449 post_updates,
2450 };
2451
2452 let _guard = self.source_manager.pause_tick().await;
2453
2454 self.barrier_scheduler
2455 .run_command(database_id, command)
2456 .await?;
2457
2458 tracing::info!("reschedule done");
2459
2460 Ok(())
2461 }
2462
2463 async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
2472 tracing::info!("trigger parallelism control");
2473
2474 let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
2475
2476 let background_streaming_jobs = self
2477 .metadata_manager
2478 .list_background_creating_jobs()
2479 .await?;
2480
2481 let skipped_jobs = if !background_streaming_jobs.is_empty() {
2482 let jobs = self
2483 .scale_controller
2484 .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
2485 .await?;
2486
2487 tracing::info!(
2488 "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
2489 background_streaming_jobs,
2490 jobs
2491 );
2492
2493 jobs
2494 } else {
2495 HashSet::new()
2496 };
2497
2498 let job_ids: HashSet<_> = {
2499 let streaming_parallelisms = self
2500 .metadata_manager
2501 .catalog_controller
2502 .get_all_streaming_parallelisms()
2503 .await?;
2504
2505 streaming_parallelisms
2506 .into_iter()
2507 .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
2508 .map(|(table_id, _)| table_id)
2509 .collect()
2510 };
2511
2512 let workers = self
2513 .metadata_manager
2514 .cluster_controller
2515 .list_active_streaming_workers()
2516 .await?;
2517
2518 let schedulable_worker_ids: BTreeSet<_> = workers
2519 .iter()
2520 .filter(|worker| {
2521 !worker
2522 .property
2523 .as_ref()
2524 .map(|p| p.is_unschedulable)
2525 .unwrap_or(false)
2526 })
2527 .map(|worker| worker.id as WorkerId)
2528 .collect();
2529
2530 if job_ids.is_empty() {
2531 tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
2532 return Ok(false);
2533 }
2534
2535 let batch_size = match self.env.opts.parallelism_control_batch_size {
2536 0 => job_ids.len(),
2537 n => n,
2538 };
2539
2540 tracing::info!(
2541 "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
2542 job_ids.len(),
2543 batch_size,
2544 schedulable_worker_ids
2545 );
2546
2547 let batches: Vec<_> = job_ids
2548 .into_iter()
2549 .chunks(batch_size)
2550 .into_iter()
2551 .map(|chunk| chunk.collect_vec())
2552 .collect();
2553
2554 let mut reschedules = None;
2555
2556 for batch in batches {
2557 let targets: HashMap<_, _> = batch
2558 .into_iter()
2559 .map(|job_id| {
2560 (
2561 job_id as u32,
2562 JobRescheduleTarget {
2563 parallelism: JobParallelismTarget::Refresh,
2564 resource_group: JobResourceGroupTarget::Keep,
2565 },
2566 )
2567 })
2568 .collect();
2569
2570 let plan = self
2571 .scale_controller
2572 .generate_job_reschedule_plan(JobReschedulePolicy { targets }, false)
2573 .await?;
2574
2575 if !plan.reschedules.is_empty() {
2576 tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
2577 reschedules = Some(plan);
2578 break;
2579 }
2580 }
2581
2582 let Some(plan) = reschedules else {
2583 tracing::info!("no reschedule plan generated");
2584 return Ok(false);
2585 };
2586
2587 for (database_id, reschedules) in self
2589 .metadata_manager
2590 .split_fragment_map_by_database(plan.reschedules)
2591 .await?
2592 {
2593 self.reschedule_actors(
2594 database_id,
2595 JobReschedulePlan {
2596 reschedules,
2597 post_updates: plan.post_updates.clone(),
2598 },
2599 RescheduleOptions {
2600 resolve_no_shuffle_upstream: false,
2601 skip_create_new_actors: false,
2602 },
2603 )
2604 .await?;
2605 }
2606
2607 Ok(true)
2608 }
2609
2610 async fn run(&self, mut shutdown_rx: Receiver<()>) {
2612 tracing::info!("starting automatic parallelism control monitor");
2613
2614 let check_period =
2615 Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);
2616
2617 let mut ticker = tokio::time::interval_at(
2618 Instant::now()
2619 + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
2620 check_period,
2621 );
2622 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2623
2624 ticker.tick().await;
2626
2627 let (local_notification_tx, mut local_notification_rx) =
2628 tokio::sync::mpsc::unbounded_channel();
2629
2630 self.env
2631 .notification_manager()
2632 .insert_local_sender(local_notification_tx);
2633
2634 let worker_nodes = self
2635 .metadata_manager
2636 .list_active_streaming_compute_nodes()
2637 .await
2638 .expect("list active streaming compute nodes");
2639
2640 let mut worker_cache: BTreeMap<_, _> = worker_nodes
2641 .into_iter()
2642 .map(|worker| (worker.id, worker))
2643 .collect();
2644
2645 let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();
2646
2647 let mut should_trigger = false;
2648
2649 loop {
2650 tokio::select! {
2651 biased;
2652
2653 _ = &mut shutdown_rx => {
2654 tracing::info!("Stream manager is stopped");
2655 break;
2656 }
2657
2658 _ = ticker.tick(), if should_trigger => {
2659 let include_workers = worker_cache.keys().copied().collect_vec();
2660
2661 if include_workers.is_empty() {
2662 tracing::debug!("no available worker nodes");
2663 should_trigger = false;
2664 continue;
2665 }
2666
2667 match self.trigger_parallelism_control().await {
2668 Ok(cont) => {
2669 should_trigger = cont;
2670 }
2671 Err(e) => {
2672 tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
2673 ticker.reset();
2674 }
2675 }
2676 }
2677
2678 notification = local_notification_rx.recv() => {
2679 let notification = notification.expect("local notification channel closed in loop of stream manager");
2680
2681 let worker_is_streaming_compute = |worker: &WorkerNode| {
2683 worker.get_type() == Ok(WorkerType::ComputeNode)
2684 && worker.property.as_ref().unwrap().is_streaming
2685 };
2686
2687 match notification {
2688 LocalNotification::SystemParamsChange(reader) => {
2689 let new_strategy = reader.adaptive_parallelism_strategy();
2690 if new_strategy != previous_adaptive_parallelism_strategy {
2691 tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
2692 should_trigger = true;
2693 previous_adaptive_parallelism_strategy = new_strategy;
2694 }
2695 }
2696 LocalNotification::WorkerNodeActivated(worker) => {
2697 if !worker_is_streaming_compute(&worker) {
2698 continue;
2699 }
2700
2701 tracing::info!(worker = worker.id, "worker activated notification received");
2702
2703 let prev_worker = worker_cache.insert(worker.id, worker.clone());
2704
2705 match prev_worker {
2706 Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
2707 tracing::info!(worker = worker.id, "worker parallelism changed");
2708 should_trigger = true;
2709 }
2710 Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
2711 tracing::info!(worker = worker.id, "worker label changed");
2712 should_trigger = true;
2713 }
2714 None => {
2715 tracing::info!(worker = worker.id, "new worker joined");
2716 should_trigger = true;
2717 }
2718 _ => {}
2719 }
2720 }
2721
2722 LocalNotification::WorkerNodeDeleted(worker) => {
2725 if !worker_is_streaming_compute(&worker) {
2726 continue;
2727 }
2728
2729 match worker_cache.remove(&worker.id) {
2730 Some(prev_worker) => {
2731 tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
2732 }
2733 None => {
2734 tracing::warn!(worker = worker.id, "worker not found in stream manager cache, but it was removed");
2735 }
2736 }
2737 }
2738
2739 _ => {}
2740 }
2741 }
2742 }
2743 }
2744 }
2745
2746 pub fn start_auto_parallelism_monitor(
2747 self: Arc<Self>,
2748 ) -> (JoinHandle<()>, oneshot::Sender<()>) {
2749 tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
2750 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
2751 let join_handle = tokio::spawn(async move {
2752 self.run(shutdown_rx).await;
2753 });
2754
2755 (join_handle, shutdown_tx)
2756 }
2757}