use std::cmp::{Ordering, min};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use num_integer::Integer;
use num_traits::abs;
use risingwave_common::bail;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::ActorMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::FragmentWorkerSlotMappings;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{self, State};
use risingwave_pb::stream_plan::{Dispatcher, PbDispatcher, PbDispatcherType, StreamNode};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule};
use crate::controller::scale::RescheduleWorkingSet;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
use crate::model::{
    ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
};
use crate::serving::{
    ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
};
use crate::stream::{AssignerBuilder, GlobalStreamManager, SourceManagerRef};
use crate::{MetaError, MetaResult};

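/// A per-fragment reschedule request expressed as a diff of actor counts per
/// worker: positive values add actors on that worker, negative values remove
/// them.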
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}

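/// A reschedule-oriented view of a fragment, carrying just the metadata and
/// the actor template needed to build new actors for it.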
pub struct CustomFragmentInfo {
    pub job_id: u32,
    pub fragment_id: u32,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub state_table_ids: Vec<u32>,
    pub node: StreamNode,
    pub actor_template: StreamActorWithDispatchers,
    pub actors: Vec<CustomActorInfo>,
}

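/// A minimal view of an actor: its id, owning fragment, outgoing dispatchers,
/// and current vnode bitmap (if the fragment is hash-distributed).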
#[derive(Default, Clone)]
pub struct CustomActorInfo {
    pub actor_id: u32,
    pub fragment_id: u32,
    pub dispatcher: Vec<Dispatcher>,
    pub vnode_bitmap: Option<Bitmap>,
}

use educe::Educe;
use futures::future::try_join_all;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_meta_model::DispatcherType;
use risingwave_pb::stream_plan::stream_node::NodeBody;

use crate::controller::id::IdCategory;
use crate::controller::utils::filter_workers_by_resource_group;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_impl;

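/// Everything `analyze_reschedule_plan` needs to know about the current
/// cluster state: actor/fragment indexes, actor locations, `NoShuffle`
/// relations, and the dispatcher topology between fragments.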
#[derive(Educe)]
#[educe(Debug)]
pub struct RescheduleContext {
    /// Meta information for all actors.
    #[educe(Debug(ignore))]
    actor_map: HashMap<ActorId, CustomActorInfo>,
    /// Status of all actors, used to find an actor's worker.
    actor_status: BTreeMap<ActorId, WorkerId>,
    /// Meta information for all fragments.
    #[educe(Debug(ignore))]
    fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
    /// Fragments with a `StreamSource` node.
    stream_source_fragment_ids: HashSet<FragmentId>,
    /// Fragments with a `SourceBackfill` node, mapped to their upstream source fragment.
    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
    /// Target (downstream) fragments of `NoShuffle` relations.
    no_shuffle_target_fragment_ids: HashSet<FragmentId>,
    /// Source (upstream) fragments of `NoShuffle` relations.
    no_shuffle_source_fragment_ids: HashSet<FragmentId>,
    /// Downstream fragments of each fragment, with the dispatcher type in between.
    fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Upstream fragments of each fragment, with the dispatcher type in between.
    fragment_upstreams: HashMap<
        risingwave_meta_model::FragmentId,
        HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
    >,
}

impl RescheduleContext {
    fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
        self.actor_status
            .get(actor_id)
            .cloned()
            .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
    }
}

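/// Rebalances vnode ownership across the actors of a hash-distributed
/// fragment after some actors are removed and/or created.
///
/// The approach: compute the expected vnode count per surviving actor
/// (`vnode_count / target_actor_count`, with the remainder spread one vnode
/// at a time), assign each actor a signed `balance` (owned minus expected),
/// then repeatedly move vnodes from the most overloaded actor at the front of
/// a deque to the most underloaded one at the back until every balance
/// reaches zero. Removed actors start with their full bitmap, so their vnodes
/// drain into the survivors.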
pub fn rebalance_actor_vnode(
    actors: &[CustomActorInfo],
    actors_to_remove: &BTreeSet<ActorId>,
    actors_to_create: &BTreeSet<ActorId>,
) -> HashMap<ActorId, Bitmap> {
    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();

    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);

    assert!(actors.len() >= actors_to_remove.len());

    let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
    assert!(target_actor_count > 0);

    let vnode_count = actors[0]
        .vnode_bitmap
        .as_ref()
        .expect("vnode bitmap unset")
        .len();

    #[derive(Debug)]
    struct Balance {
        actor_id: ActorId,
        balance: i32,
        builder: BitmapBuilder,
    }
    let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);

    tracing::debug!(
        "expected {}, remain {}, prev actors {}, target actors {}",
        expected,
        remain,
        actors.len(),
        target_actor_count,
    );

    let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
        .iter()
        .map(|actor| {
            (
                actor.actor_id as ActorId,
                actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
            )
        })
        .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));

    let order_by_bitmap_desc =
        |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
            bitmap_a
                .count_ones()
                .cmp(&bitmap_b.count_ones())
                .reverse()
                .then(id_a.cmp(id_b))
        };

    let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
        let mut builder = BitmapBuilder::default();
        builder.append_bitmap(bitmap);
        builder
    };

    let (prev_expected, _) = vnode_count.div_rem(&actors.len());

    let prev_remain = removed
        .iter()
        .map(|(_, bitmap)| {
            assert!(bitmap.count_ones() >= prev_expected);
            bitmap.count_ones() - prev_expected
        })
        .sum::<usize>();

    removed.sort_by(order_by_bitmap_desc);
    rest.sort_by(order_by_bitmap_desc);

    let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
        actor_id,
        balance: bitmap.count_ones() as i32,
        builder: builder_from_bitmap(&bitmap),
    });

    let mut rest_balances = rest
        .into_iter()
        .map(|(actor_id, bitmap)| Balance {
            actor_id,
            balance: bitmap.count_ones() as i32 - expected as i32,
            builder: builder_from_bitmap(&bitmap),
        })
        .collect_vec();

    let mut created_balances = actors_to_create
        .iter()
        .map(|actor_id| Balance {
            actor_id: *actor_id,
            balance: -(expected as i32),
            builder: BitmapBuilder::zeroed(vnode_count),
        })
        .collect_vec();

    for balance in created_balances
        .iter_mut()
        .rev()
        .take(prev_remain)
        .chain(rest_balances.iter_mut())
    {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    // Consume any remainder that is still left over.
    for balance in &mut created_balances {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    assert_eq!(remain, 0);

    let mut v: VecDeque<_> = removed_balances
        .chain(rest_balances)
        .chain(created_balances)
        .collect();

    let mut result = HashMap::with_capacity(target_actor_count);

    for balance in &v {
        tracing::debug!(
            "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
            balance.actor_id,
            balance.balance,
            actors_to_remove.contains(&balance.actor_id),
            actors_to_create.contains(&balance.actor_id)
        );
    }

    while !v.is_empty() {
        if v.len() == 1 {
            let single = v.pop_front().unwrap();
            assert_eq!(single.balance, 0);
            if !actors_to_remove.contains(&single.actor_id) {
                result.insert(single.actor_id, single.builder.finish());
            }

            continue;
        }

        let mut src = v.pop_front().unwrap();
        let mut dst = v.pop_back().unwrap();

        let n = min(abs(src.balance), abs(dst.balance));

        let mut moved = 0;
        for idx in (0..vnode_count).rev() {
            if moved >= n {
                break;
            }

            if src.builder.is_set(idx) {
                src.builder.set(idx, false);
                assert!(!dst.builder.is_set(idx));
                dst.builder.set(idx, true);
                moved += 1;
            }
        }

        src.balance -= n;
        dst.balance += n;

        if src.balance != 0 {
            v.push_front(src);
        } else if !actors_to_remove.contains(&src.actor_id) {
            result.insert(src.actor_id, src.builder.finish());
        }

        if dst.balance != 0 {
            v.push_back(dst);
        } else {
            result.insert(dst.actor_id, dst.builder.finish());
        }
    }

    result
}

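#[cfg(test)]
mod rebalance_actor_vnode_tests {
    use super::*;

    // A minimal sketch (not part of the original file) of how
    // `rebalance_actor_vnode` behaves when one actor is added to a fragment
    // of four actors that evenly own 16 vnodes: afterwards every actor
    // should own either `expected` (3) or `expected + 1` (4) vnodes.
    #[test]
    fn scale_out_keeps_vnode_counts_balanced() {
        let vnode_count = 16;
        let actors: Vec<CustomActorInfo> = (0..4u32)
            .map(|i| {
                // Each existing actor owns a contiguous quarter of the vnodes.
                let mut builder = BitmapBuilder::zeroed(vnode_count);
                for idx in (i as usize * 4)..((i as usize + 1) * 4) {
                    builder.set(idx, true);
                }
                CustomActorInfo {
                    actor_id: i,
                    fragment_id: 1,
                    dispatcher: vec![],
                    vnode_bitmap: Some(builder.finish()),
                }
            })
            .collect();

        let actors_to_remove = BTreeSet::new();
        let actors_to_create: BTreeSet<ActorId> = [4].into_iter().collect();

        let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_create);

        // All 5 actors (4 survivors + 1 new) receive a bitmap.
        assert_eq!(result.len(), 5);
        for bitmap in result.values() {
            assert!((3..=4).contains(&bitmap.count_ones()));
        }
        // Every vnode is owned by exactly one actor.
        let total: usize = result.values().map(Bitmap::count_ones).sum();
        assert_eq!(total, vnode_count);
    }
}
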
#[derive(Debug, Clone, Copy)]
pub struct RescheduleOptions {
    /// Whether to resolve the upstream of `NoShuffle` relations when scaling:
    /// reschedules on `NoShuffle` downstreams are rewritten to the root of
    /// their `NoShuffle` dependency tree.
    pub resolve_no_shuffle_upstream: bool,

    /// Whether to skip creating new actors; if true, scaled-out actors will
    /// not be created.
    pub skip_create_new_actors: bool,
}

pub type ScaleControllerRef = Arc<ScaleController>;

pub struct ScaleController {
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,

    /// Lock used to serialize reschedule operations.
    pub reschedule_lock: RwLock<()>,
}

impl ScaleController {
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

    pub async fn integrity_check(&self) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .integrity_check()
            .await
    }

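    /// Builds the [`RescheduleContext`] for a reschedule plan and validates it:
    /// checks worker availability and fragment states, rejects reschedules on
    /// `NoShuffle` downstream fragments, and (when requested) rewrites the plan
    /// so that whole `NoShuffle` dependency trees move together.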
    async fn build_reschedule_context(
        &self,
        reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<RescheduleContext> {
        let worker_nodes: HashMap<WorkerId, WorkerNode> = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .map(|worker_node| (worker_node.id as _, worker_node))
            .collect();

        if worker_nodes.is_empty() {
            bail!("no available compute node in the cluster");
        }

        // Check whether the plan tries to move actors onto unschedulable workers.
        let unschedulable_worker_ids: HashSet<_> = worker_nodes
            .values()
            .filter(|w| {
                w.property
                    .as_ref()
                    .map(|property| property.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|worker| worker.id as WorkerId)
            .collect();

        for (fragment_id, reschedule) in &*reschedule {
            for (worker_id, change) in &reschedule.worker_actor_diff {
                if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
                    bail!(
                        "unable to move fragment {} to unschedulable worker {}",
                        fragment_id,
                        worker_id
                    );
                }
            }
        }

        // Indexes over actors, fragments, actor locations and job states.
        let mut actor_map = HashMap::new();
        let mut fragment_map = HashMap::new();
        let mut actor_status = BTreeMap::new();
        let mut fragment_state = HashMap::new();
        let mut fragment_to_table = HashMap::new();

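        // Fills the five indexes above from a resolved `RescheduleWorkingSet`,
        // rebuilding per-fragment actor lists and an actor template for each
        // fragment along the way.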
        fn fulfill_index_by_fragment_ids(
            actor_map: &mut HashMap<u32, CustomActorInfo>,
            fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
            actor_status: &mut BTreeMap<ActorId, WorkerId>,
            fragment_state: &mut HashMap<FragmentId, State>,
            fragment_to_table: &mut HashMap<FragmentId, TableId>,
            fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
            actors: HashMap<ActorId, actor::Model>,
            mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
            related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
        ) {
            let mut fragment_actors: HashMap<
                risingwave_meta_model::FragmentId,
                Vec<CustomActorInfo>,
            > = HashMap::new();

            let mut expr_contexts = HashMap::new();
            for (
                _,
                actor::Model {
                    actor_id,
                    fragment_id,
                    status: _,
                    worker_id,
                    vnode_bitmap,
                    expr_context,
                    ..
                },
            ) in actors
            {
                let dispatchers = actor_dispatchers
                    .remove(&(actor_id as _))
                    .unwrap_or_default();

                let actor_info = CustomActorInfo {
                    actor_id: actor_id as _,
                    fragment_id: fragment_id as _,
                    dispatcher: dispatchers,
                    vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
                };

                actor_map.insert(actor_id as _, actor_info.clone());

                fragment_actors
                    .entry(fragment_id as _)
                    .or_default()
                    .push(actor_info);

                actor_status.insert(actor_id as _, worker_id as WorkerId);

                expr_contexts.insert(actor_id as u32, expr_context);
            }

            for (
                _,
                fragment::Model {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    stream_node,
                    state_table_ids,
                    ..
                },
            ) in fragments
            {
                let actors = fragment_actors
                    .remove(&(fragment_id as _))
                    .unwrap_or_default();

                let CustomActorInfo {
                    actor_id,
                    fragment_id,
                    dispatcher,
                    vnode_bitmap,
                } = actors.first().unwrap().clone();

                let (related_job, job_definition) =
                    related_jobs.get(&job_id).expect("job not found");

                let fragment = CustomFragmentInfo {
                    job_id: job_id as _,
                    fragment_id: fragment_id as _,
                    fragment_type_mask: fragment_type_mask.into(),
                    distribution_type: distribution_type.into(),
                    state_table_ids: state_table_ids.into_u32_array(),
                    node: stream_node.to_protobuf(),
                    actor_template: (
                        StreamActor {
                            actor_id,
                            fragment_id: fragment_id as _,
                            vnode_bitmap,
                            mview_definition: job_definition.to_owned(),
                            expr_context: expr_contexts
                                .get(&actor_id)
                                .cloned()
                                .map(|expr_context| expr_context.to_protobuf()),
                        },
                        dispatcher,
                    ),
                    actors,
                };

                fragment_map.insert(fragment_id as _, fragment);

                fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));

                fragment_state.insert(
                    fragment_id,
                    table_fragments::PbState::from(related_job.job_status),
                );
            }
        }
        let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
        let working_set = self
            .metadata_manager
            .catalog_controller
            .resolve_working_set_for_reschedule_fragments(fragment_ids)
            .await?;

        fulfill_index_by_fragment_ids(
            &mut actor_map,
            &mut fragment_map,
            &mut actor_status,
            &mut fragment_state,
            &mut fragment_to_table,
            working_set.fragments,
            working_set.actors,
            working_set.actor_dispatchers,
            working_set.related_jobs,
        );

        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        Self::build_no_shuffle_relation_index(
            &actor_map,
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
        );

        if options.resolve_no_shuffle_upstream {
            let original_reschedule_keys = reschedule.keys().cloned().collect();

            Self::resolve_no_shuffle_upstream_fragments(
                reschedule,
                &no_shuffle_source_fragment_ids,
                &no_shuffle_target_fragment_ids,
                &working_set.fragment_upstreams,
            )?;

            if !table_parallelisms.is_empty() {
                Self::resolve_no_shuffle_upstream_tables(
                    original_reschedule_keys,
                    &no_shuffle_source_fragment_ids,
                    &no_shuffle_target_fragment_ids,
                    &fragment_to_table,
                    &working_set.fragment_upstreams,
                    table_parallelisms,
                )?;
            }
        }

        let mut fragment_dispatcher_map = HashMap::new();
        Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);

        let mut stream_source_fragment_ids = HashSet::new();
        let mut stream_source_backfill_fragment_ids = HashMap::new();
        let mut no_shuffle_reschedule = HashMap::new();
        for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
            let fragment = fragment_map
                .get(fragment_id)
                .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;

            // Check the state of the fragment's streaming job.
            match fragment_state[fragment_id] {
                table_fragments::State::Unspecified => unreachable!(),
                state @ table_fragments::State::Initial => {
                    bail!(
                        "the materialized view of fragment {fragment_id} is in state {}",
                        state.as_str_name()
                    )
                }
                state @ table_fragments::State::Creating => {
                    let stream_node = &fragment.node;

                    let mut is_reschedulable = true;
                    visit_stream_node_cont(stream_node, |body| {
                        if let Some(NodeBody::StreamScan(node)) = &body.node_body {
                            if !node.stream_scan_type().is_reschedulable() {
                                is_reschedulable = false;

                                // Stop visiting the rest of the nodes.
                                return false;
                            }

                            // Continue visiting the remaining nodes.
                            return true;
                        }

                        true
                    });

                    if !is_reschedulable {
                        bail!(
                            "the materialized view of fragment {fragment_id} is in state {}",
                            state.as_str_name()
                        )
                    }
                }
                table_fragments::State::Created => {}
            }

            if no_shuffle_target_fragment_ids.contains(fragment_id) {
                bail!(
                    "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
                );
            }

            // For a NoShuffle source fragment, propagate its reschedule to the
            // whole NoShuffle downstream subtree.
            if no_shuffle_source_fragment_ids.contains(fragment_id) {
                let mut queue: VecDeque<_> = fragment_dispatcher_map
                    .get(fragment_id)
                    .unwrap()
                    .keys()
                    .cloned()
                    .collect();

                while let Some(downstream_id) = queue.pop_front() {
                    if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
                        continue;
                    }

                    if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
                    {
                        let no_shuffle_downstreams = downstream_fragments
                            .iter()
                            .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                            .map(|(fragment_id, _)| fragment_id);

                        queue.extend(no_shuffle_downstreams.copied());
                    }

                    no_shuffle_reschedule.insert(
                        downstream_id,
                        WorkerReschedule {
                            worker_actor_diff: worker_actor_diff.clone(),
                        },
                    );
                }
            }

            if fragment
                .fragment_type_mask
                .contains(FragmentTypeFlag::Source)
                && fragment.node.find_stream_source().is_some()
            {
                stream_source_fragment_ids.insert(*fragment_id);
            }

            // Check if the reschedule plan is valid.
            let current_worker_ids = fragment
                .actors
                .iter()
                .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
                .collect::<HashSet<_>>();

            for (removed, change) in worker_actor_diff {
                if !current_worker_ids.contains(removed) && change.is_negative() {
                    bail!(
                        "no actor on the worker {} of fragment {}",
                        removed,
                        fragment_id
                    );
                }
            }

            let added_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_positive())
                .cloned()
                .map(|change| change as usize)
                .sum();

            let removed_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_negative())
                .cloned()
                .map(|v| v.unsigned_abs())
                .sum();

            match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    if fragment.actors.len() + added_actor_count <= removed_actor_count {
                        bail!("can't remove all actors from fragment {}", fragment_id);
                    }
                }
                FragmentDistributionType::Single => {
                    if added_actor_count != removed_actor_count {
                        bail!("single distribution fragment only supports migration");
                    }
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        if !no_shuffle_reschedule.is_empty() {
            tracing::info!(
                "reschedule plan rewritten with NoShuffle reschedule {:?}",
                no_shuffle_reschedule
            );

            for noshuffle_downstream in no_shuffle_reschedule.keys() {
                let fragment = fragment_map.get(noshuffle_downstream).unwrap();
                // SourceScan fragments are a special kind of NoShuffle downstream:
                // record their upstream source fragment for split migration.
                if fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SourceScan)
                {
                    let stream_node = &fragment.node;
                    if let Some((_source_id, upstream_source_fragment_id)) =
                        stream_node.find_source_backfill()
                    {
                        stream_source_backfill_fragment_ids
                            .insert(fragment.fragment_id, upstream_source_fragment_id);
                    }
                }
            }
        }

        // Modifications for NoShuffle downstream fragments are derived from
        // their upstreams, so extend the plan to cover them.
        reschedule.extend(no_shuffle_reschedule.into_iter());

        Ok(RescheduleContext {
            actor_map,
            actor_status,
            fragment_map,
            stream_source_fragment_ids,
            stream_source_backfill_fragment_ids,
            no_shuffle_target_fragment_ids,
            no_shuffle_source_fragment_ids,
            fragment_dispatcher_map,
            fragment_upstreams: working_set.fragment_upstreams,
        })
    }

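    /// Converts the high-level per-worker diffs ([`WorkerReschedule`]) into the
    /// low-level [`Reschedule`] plan consumed by the barrier manager: it decides
    /// which actor ids to create and remove, rebalances vnode bitmaps, rewrites
    /// dispatchers and `NoShuffle` pairings, and migrates source splits.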
    pub(crate) async fn analyze_reschedule_plan(
        &self,
        mut reschedules: HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
        tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
        let ctx = self
            .build_reschedule_context(&mut reschedules, options, table_parallelisms)
            .await?;
        tracing::debug!("reschedule context: {:#?}", ctx);
        let reschedules = reschedules;

        // At this point, the plan covers both the upstream and the downstream
        // of every NoShuffle relation involved.
        let (fragment_actors_to_remove, fragment_actors_to_create) =
            self.arrange_reschedules(&reschedules, &ctx)?;

        let mut fragment_actor_bitmap = HashMap::new();
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                // The vnode bitmaps of NoShuffle downstreams are derived from
                // their upstreams later, so skip them here.
                continue;
            }

            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .map(|map| map.keys().copied().collect())
                .unwrap_or_default();

            let actors_to_remove = fragment_actors_to_remove
                .get(fragment_id)
                .map(|map| map.keys().copied().collect())
                .unwrap_or_default();

            let fragment = ctx.fragment_map.get(fragment_id).unwrap();

            match fragment.distribution_type {
                FragmentDistributionType::Single => {
                    // Single-distribution fragments have no vnode bitmap.
                    fragment_actor_bitmap
                        .insert(fragment.fragment_id as FragmentId, Default::default());
                }
                FragmentDistributionType::Hash => {
                    let actor_vnode = rebalance_actor_vnode(
                        &fragment.actors,
                        &actors_to_remove,
                        &actors_to_create,
                    );

                    fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
                }

                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        // The actors remaining on each fragment after the reschedule, both
        // surviving and newly created, keyed by actor id -> worker id.
        let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
        for fragment_id in reschedules.keys() {
            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
            let mut new_actor_ids = BTreeMap::new();
            for actor in &fragment.actors {
                if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id)
                    && actors_to_remove.contains_key(&actor.actor_id)
                {
                    continue;
                }
                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
                new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
            }

            if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
                for (actor_id, worker_id) in actors_to_create {
                    new_actor_ids.insert(*actor_id, *worker_id);
                }
            }

            assert!(
                !new_actor_ids.is_empty(),
                "there should be at least one actor in fragment {} after rescheduling",
                fragment_id
            );

            fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
        }

        let fragment_actors_after_reschedule = fragment_actors_after_reschedule;

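        // Recursively derives the vnode bitmaps and actor pairings of a
        // NoShuffle downstream fragment from its upstream fragment, walking
        // further down the NoShuffle subtree. Actors already pinned to a root
        // upstream actor (via `actor_group_map`) inherit its bitmap directly;
        // the rest are matched to upstream actors on the same worker.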
        fn arrange_no_shuffle_relation(
            ctx: &RescheduleContext,
            fragment_id: &FragmentId,
            upstream_fragment_id: &FragmentId,
            fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
            actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
            fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
            no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
            no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
        ) {
            if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                return;
            }

            let fragment = &ctx.fragment_map[fragment_id];

            let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];

            // Pin each downstream actor to the root actor of its NoShuffle chain.
            for upstream_actor in &upstream_fragment.actors {
                for dispatcher in &upstream_actor.dispatcher {
                    if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
                        let downstream_actor_id =
                            *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();

                        // If the upstream is itself the root of the chain, start a
                        // new group; otherwise inherit the upstream's root.
                        if !ctx
                            .no_shuffle_target_fragment_ids
                            .contains(upstream_fragment_id)
                        {
                            actor_group_map.insert(
                                upstream_actor.actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                            actor_group_map.insert(
                                downstream_actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                        } else {
                            let root_actor_id = actor_group_map[&upstream_actor.actor_id];

                            actor_group_map.insert(downstream_actor_id, root_actor_id);
                        }
                    }
                }
            }

            // If the upstream fragment was rebalanced, its new bitmaps are here.
            let upstream_fragment_bitmap = fragment_updated_bitmap
                .get(upstream_fragment_id)
                .cloned()
                .unwrap_or_default();

            if upstream_fragment.distribution_type == FragmentDistributionType::Single {
                assert!(
                    upstream_fragment_bitmap.is_empty(),
                    "single fragment should have no bitmap updates"
                );
            }

            let upstream_fragment_actor_map = fragment_actors_after_reschedule
                .get(upstream_fragment_id)
                .cloned()
                .unwrap();

            let fragment_actor_map = fragment_actors_after_reschedule
                .get(fragment_id)
                .cloned()
                .unwrap();

            let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            // The new vnode bitmaps for this (downstream) fragment.
            let mut fragment_bitmap = HashMap::new();

            for (actor_id, worker_id) in &fragment_actor_map {
                if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
                    let root_bitmap = fragment_updated_bitmap
                        .get(root_fragment)
                        .expect("root fragment bitmap not found")
                        .get(root_actor_id)
                        .cloned()
                        .expect("root actor bitmap not found");

                    // Copy the bitmap from the root actor of the NoShuffle chain.
                    fragment_bitmap.insert(*actor_id, root_bitmap);

                    no_shuffle_upstream_actor_map
                        .entry(*actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, *root_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(*root_actor_id)
                        .or_default()
                        .insert(*fragment_id, *actor_id);
                } else {
                    worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            for (actor_id, worker_id) in &upstream_fragment_actor_map {
                if !actor_group_map.contains_key(actor_id) {
                    upstream_worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            // Match the remaining actors to upstream actors on the same worker.
            for (worker_id, actor_ids) in worker_reverse_index {
                let upstream_actor_ids = upstream_worker_reverse_index
                    .get(&worker_id)
                    .unwrap()
                    .clone();

                assert_eq!(actor_ids.len(), upstream_actor_ids.len());

                for (actor_id, upstream_actor_id) in actor_ids
                    .into_iter()
                    .zip_eq_debug(upstream_actor_ids.into_iter())
                {
                    match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
                        None => {
                            // Single-distribution upstreams have no bitmap.
                            assert_eq!(
                                upstream_fragment.distribution_type,
                                FragmentDistributionType::Single
                            );
                        }
                        Some(bitmap) => {
                            // Copy the bitmap from the paired upstream actor.
                            fragment_bitmap.insert(actor_id, bitmap);
                        }
                    }

                    no_shuffle_upstream_actor_map
                        .entry(actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, upstream_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(upstream_actor_id)
                        .or_default()
                        .insert(*fragment_id, actor_id);
                }
            }

            match fragment.distribution_type {
                FragmentDistributionType::Hash => {}
                FragmentDistributionType::Single => {
                    // Single-distribution fragments should have no bitmap updates.
                    assert!(fragment_bitmap.is_empty());
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }

            if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
                assert_eq!(
                    e.entry.get(),
                    &e.value,
                    "bitmaps derived from different no-shuffle upstreams mismatch"
                );
            }

            // Recurse into the NoShuffle downstreams of this fragment.
            if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
                let no_shuffle_downstreams = downstream_fragments
                    .iter()
                    .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| fragment_id);

                for downstream_fragment_id in no_shuffle_downstreams {
                    arrange_no_shuffle_relation(
                        ctx,
                        downstream_fragment_id,
                        fragment_id,
                        fragment_actors_after_reschedule,
                        actor_group_map,
                        fragment_updated_bitmap,
                        no_shuffle_upstream_actor_map,
                        no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

        let mut no_shuffle_upstream_actor_map = HashMap::new();
        let mut no_shuffle_downstream_actors_map = HashMap::new();
        let mut actor_group_map = HashMap::new();
        // For every NoShuffle root in the plan, propagate bitmaps and pairings
        // down its whole NoShuffle subtree.
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
                && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
                && let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id)
            {
                for downstream_fragment_id in downstream_fragments.keys() {
                    arrange_no_shuffle_relation(
                        &ctx,
                        downstream_fragment_id,
                        fragment_id,
                        &fragment_actors_after_reschedule,
                        &mut actor_group_map,
                        &mut fragment_actor_bitmap,
                        &mut no_shuffle_upstream_actor_map,
                        &mut no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

        tracing::debug!("actor group map {:?}", actor_group_map);

        let mut new_created_actors = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .cloned()
                .unwrap_or_default();

            let fragment = &ctx.fragment_map[fragment_id];

            assert!(!fragment.actors.is_empty());

            for actor_to_create in &actors_to_create {
                let new_actor_id = actor_to_create.0;
                let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();

                // Assign the new actor id before rewriting dispatchers, since the
                // NoShuffle pairing below is looked up by the new actor id.
                new_actor.actor_id = *new_actor_id;

                Self::modify_actor_upstream_and_downstream(
                    &ctx,
                    &fragment_actors_to_remove,
                    &fragment_actors_to_create,
                    &fragment_actor_bitmap,
                    &no_shuffle_downstream_actors_map,
                    &mut new_actor,
                    &mut dispatchers,
                )?;

                if let Some(bitmap) = fragment_actor_bitmap
                    .get(fragment_id)
                    .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
                {
                    new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
                }

                new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
            }
        }

        // For stream source fragments, migrate the splits from the previous
        // actors to the actors after the reschedule.
        let mut fragment_actor_splits = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

            if ctx.stream_source_fragment_ids.contains(fragment_id) {
                let fragment = &ctx.fragment_map[fragment_id];

                let prev_actor_ids = fragment
                    .actors
                    .iter()
                    .map(|actor| actor.actor_id)
                    .collect_vec();

                let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                let actor_splits = self
                    .env
                    .shared_actor_infos()
                    .migrate_splits_for_source_actors(
                        *fragment_id,
                        &prev_actor_ids,
                        &curr_actor_ids,
                    )?;

                tracing::debug!(
                    "source actor splits: {:?}, fragment_id: {}",
                    actor_splits,
                    fragment_id
                );
                fragment_actor_splits.insert(*fragment_id, actor_splits);
            }
        }
        // A second pass ensures source actors are migrated first, so backfill
        // actors can then be aligned with their upstream source actors.
        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
            for fragment_id in reschedules.keys() {
                let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

                if let Some(upstream_source_fragment_id) =
                    ctx.stream_source_backfill_fragment_ids.get(fragment_id)
                {
                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
                        *fragment_id,
                        *upstream_source_fragment_id,
                        &curr_actor_ids,
                        &fragment_actor_splits,
                        &no_shuffle_upstream_actor_map,
                    )?;
                    tracing::debug!(
                        "source backfill actor splits: {:?}, fragment_id: {}",
                        actor_splits,
                        fragment_id
                    );
                    fragment_actor_splits.insert(*fragment_id, actor_splits);
                }
            }
        }

        // Generate the fragment reschedule plan.
        let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
            HashMap::with_capacity(reschedules.len());

        for (fragment_id, _) in reschedules {
            let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();

            if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
                for (actor_id, worker_id) in actor_worker_maps {
                    actors_to_create
                        .entry(worker_id)
                        .or_default()
                        .push(actor_id);
                }
            }

            let actors_to_remove = fragment_actors_to_remove
                .get(&fragment_id)
                .cloned()
                .unwrap_or_default()
                .into_keys()
                .collect();

            let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];

            assert!(!actors_after_reschedule.is_empty());

            let fragment = &ctx.fragment_map[&fragment_id];

            let in_degree_types: HashSet<_> = ctx
                .fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.values())
                .into_iter()
                .flatten()
                .cloned()
                .collect();

            let upstream_dispatcher_mapping = match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    if !in_degree_types.contains(&DispatcherType::Hash) {
                        None
                    } else {
                        // Bitmap changes can only occur with hash distribution.
                        Some(ActorMapping::from_bitmaps(
                            &fragment_actor_bitmap[&fragment_id],
                        ))
                    }
                }

                FragmentDistributionType::Single => {
                    assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
                    None
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            };

            let mut upstream_fragment_dispatcher_set = BTreeSet::new();

            {
                if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
                    for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
                        match upstream_dispatcher_type {
                            DispatcherType::NoShuffle => {}
                            _ => {
                                upstream_fragment_dispatcher_set.insert((
                                    *upstream_fragment_id as FragmentId,
                                    fragment.fragment_id as DispatcherId,
                                ));
                            }
                        }
                    }
                }
            }

            let downstream_fragment_ids = if let Some(downstream_fragments) =
                ctx.fragment_dispatcher_map.get(&fragment_id)
            {
                // Skip NoShuffle downstreams: their actor pairings are fixed
                // and handled through the NoShuffle maps instead.
                downstream_fragments
                    .iter()
                    .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| *fragment_id)
                    .collect_vec()
            } else {
                vec![]
            };

            let vnode_bitmap_updates = match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    let mut vnode_bitmap_updates =
                        fragment_actor_bitmap.remove(&fragment_id).unwrap();

                    // Every retained actor must have a bitmap; drop entries whose
                    // bitmap did not change to keep the update minimal.
                    for actor_id in actors_after_reschedule.keys() {
                        assert!(vnode_bitmap_updates.contains_key(actor_id));

                        if let Some(actor) = ctx.actor_map.get(actor_id) {
                            let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();

                            if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref()
                                && prev_bitmap.eq(bitmap)
                            {
                                vnode_bitmap_updates.remove(actor_id);
                            }
                        }
                    }

                    vnode_bitmap_updates
                }
                FragmentDistributionType::Single => HashMap::new(),
                FragmentDistributionType::Unspecified => unreachable!(),
            };

            let upstream_fragment_dispatcher_ids =
                upstream_fragment_dispatcher_set.into_iter().collect_vec();

            let actor_splits = fragment_actor_splits
                .get(&fragment_id)
                .cloned()
                .unwrap_or_default();

            let mut cdc_table_id = None;
            let cdc_table_snapshot_split_assignment = if fragment
                .fragment_type_mask
                .contains(FragmentTypeFlag::StreamCdcScan)
            {
                cdc_table_id = Some(fragment.job_id);
                assign_cdc_table_snapshot_splits_impl(
                    fragment.job_id,
                    fragment_actors_after_reschedule
                        .get(&fragment_id)
                        .unwrap()
                        .keys()
                        .copied()
                        .collect(),
                    self.env.meta_store_ref(),
                    None,
                )
                .await?
            } else {
                HashMap::default()
            };

            reschedule_fragment.insert(
                fragment_id,
                Reschedule {
                    added_actors: actors_to_create,
                    removed_actors: actors_to_remove,
                    vnode_bitmap_updates,
                    upstream_fragment_dispatcher_ids,
                    upstream_dispatcher_mapping,
                    downstream_fragment_ids,
                    actor_splits,
                    newly_created_actors: Default::default(),
                    cdc_table_snapshot_split_assignment,
                    cdc_table_id,
                },
            );
        }

        let mut fragment_created_actors = HashMap::new();
        for (fragment_id, actors_to_create) in &fragment_actors_to_create {
            let mut created_actors = HashMap::new();
            for (actor_id, worker_id) in actors_to_create {
                let actor = new_created_actors.get(actor_id).cloned().unwrap();
                created_actors.insert(*actor_id, (actor, *worker_id));
            }

            fragment_created_actors.insert(*fragment_id, created_actors);
        }

        for (fragment_id, to_create) in fragment_created_actors {
            let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
            reschedule.newly_created_actors = to_create;
        }
        tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);

        Ok(reschedule_fragment)
    }

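    /// Decides, for every fragment in the plan, exactly which actor ids to
    /// remove (picked from the tail of each worker's ordered actor set) and
    /// which new actor ids to create (freshly generated), returning both maps.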
    #[expect(clippy::type_complexity)]
    fn arrange_reschedules(
        &self,
        reschedule: &HashMap<FragmentId, WorkerReschedule>,
        ctx: &RescheduleContext,
    ) -> MetaResult<(
        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
    )> {
        let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
        let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());

        for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
            let fragment = ctx.fragment_map.get(fragment_id).unwrap();

            // Actor id -> worker id.
            let mut actors_to_remove = BTreeMap::new();
            let mut actors_to_create = BTreeMap::new();

            // `BTreeSet` keeps each worker's actors ordered, so removal below
            // is deterministic.
            let mut worker_to_actors = HashMap::new();

            for actor in &fragment.actors {
                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
                worker_to_actors
                    .entry(worker_id)
                    .or_insert(BTreeSet::new())
                    .insert(actor.actor_id as ActorId);
            }

            let decreased_actor_count = worker_actor_diff
                .iter()
                .filter(|(_, change)| change.is_negative())
                .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));

            for (worker_id, n) in decreased_actor_count {
                if let Some(actor_ids) = worker_to_actors.get(worker_id) {
                    if actor_ids.len() < n {
                        bail!(
                            "illegal plan: for fragment {}, worker {} has only {} actors, but needs to remove {}",
                            fragment_id,
                            worker_id,
                            actor_ids.len(),
                            n
                        );
                    }

                    // Remove the actors with the largest ids first.
                    let removed_actors: Vec<_> = actor_ids
                        .iter()
                        .skip(actor_ids.len().saturating_sub(n))
                        .cloned()
                        .collect();

                    for actor in removed_actors {
                        actors_to_remove.insert(actor, *worker_id);
                    }
                }
            }

            let increased_actor_count = worker_actor_diff
                .iter()
                .filter(|(_, change)| change.is_positive());

            for (worker, n) in increased_actor_count {
                for _ in 0..*n {
                    let id = self
                        .env
                        .id_gen_manager()
                        .generate_interval::<{ IdCategory::Actor }>(1)
                        as ActorId;
                    actors_to_create.insert(id, *worker);
                }
            }

            if !actors_to_remove.is_empty() {
                fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
            }

            if !actors_to_create.is_empty() {
                fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
            }
        }

        // Sanity check: every removed NoShuffle upstream actor must have its
        // paired downstream actor removed as well.
        for actors_to_remove in fragment_actors_to_remove.values() {
            for actor_id in actors_to_remove.keys() {
                let actor = ctx.actor_map.get(actor_id).unwrap();
                for dispatcher in &actor.dispatcher {
                    if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
                        let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");

                        let _should_exists = fragment_actors_to_remove
                            .get(&(dispatcher.dispatcher_id as FragmentId))
                            .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
                            .get(downstream_actor_id)
                            .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
                    }
                }
            }
        }

        Ok((fragment_actors_to_remove, fragment_actors_to_create))
    }

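    /// Rewrites the dispatchers of a newly created actor: drops downstream
    /// actor ids that are being removed, adds the ones being created, repins
    /// `NoShuffle` dispatchers to their single paired downstream actor, and
    /// refreshes hash mappings from the updated vnode bitmaps.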
    fn modify_actor_upstream_and_downstream(
        ctx: &RescheduleContext,
        fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
        fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
        no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
        new_actor: &mut StreamActor,
        dispatchers: &mut Vec<PbDispatcher>,
    ) -> MetaResult<()> {
        // Update the downstream actor ids of every dispatcher.
        for dispatcher in dispatchers {
            let downstream_fragment_id = dispatcher
                .downstream_actor_id
                .iter()
                .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
                .dedup()
                .exactly_one()
                .unwrap() as FragmentId;

            let downstream_fragment_actors_to_remove =
                fragment_actors_to_remove.get(&downstream_fragment_id);
            let downstream_fragment_actors_to_create =
                fragment_actors_to_create.get(&downstream_fragment_id);

            match dispatcher.r#type() {
                d @ (PbDispatcherType::Hash
                | PbDispatcherType::Simple
                | PbDispatcherType::Broadcast) => {
                    if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
                    {
                        dispatcher
                            .downstream_actor_id
                            .retain(|id| !downstream_actors_to_remove.contains_key(id));
                    }

                    if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
                    {
                        dispatcher
                            .downstream_actor_id
                            .extend(downstream_actors_to_create.keys().cloned())
                    }

                    // Simple dispatchers must have exactly one downstream actor.
                    if d == PbDispatcherType::Simple {
                        assert_eq!(dispatcher.downstream_actor_id.len(), 1);
                    }
                }
                PbDispatcherType::NoShuffle => {
                    assert_eq!(dispatcher.downstream_actor_id.len(), 1);
                    let downstream_actor_id = no_shuffle_downstream_actors_map
                        .get(&new_actor.actor_id)
                        .and_then(|map| map.get(&downstream_fragment_id))
                        .unwrap();
                    dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
                }
                PbDispatcherType::Unspecified => unreachable!(),
            }

            if let Some(mapping) = dispatcher.hash_mapping.as_mut()
                && let Some(downstream_updated_bitmap) =
                    fragment_actor_bitmap.get(&downstream_fragment_id)
            {
                // If the downstream bitmaps changed, refresh the hash mapping too.
                *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
            }
        }

        Ok(())
    }

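    /// Persists the reschedule results to the meta store and refreshes the
    /// serving vnode mappings, notifying frontends of updated or deleted
    /// fragment-to-worker-slot mappings.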
    #[await_tree::instrument]
    pub async fn post_apply_reschedule(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        // Update the fragment info in the meta store after rescheduling.
        self.metadata_manager
            .post_apply_reschedules(reschedules.clone(), post_updates)
            .await?;

        // Update the serving fragment mappings accordingly.
        if !reschedules.is_empty() {
            let workers = self
                .metadata_manager
                .list_active_serving_compute_nodes()
                .await?;
            let streaming_parallelisms = self
                .metadata_manager
                .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))
                .await?;
            let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
            let max_serving_parallelism = self
                .env
                .session_params_manager_impl_ref()
                .get_params()
                .await
                .batch_parallelism()
                .map(|p| p.get());
            let (upserted, failed) = serving_worker_slot_mapping.upsert(
                streaming_parallelisms,
                &workers,
                max_serving_parallelism,
            );
            if !upserted.is_empty() {
                tracing::debug!(
                    "Update serving vnode mapping for fragments {:?}.",
                    upserted.keys()
                );
                self.env
                    .notification_manager()
                    .notify_frontend_without_version(
                        Operation::Update,
                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                            mappings: to_fragment_worker_slot_mapping(&upserted),
                        }),
                    );
            }
            if !failed.is_empty() {
                tracing::debug!(
                    "Fail to update serving vnode mapping for fragments {:?}.",
                    failed
                );
                self.env
                    .notification_manager()
                    .notify_frontend_without_version(
                        Operation::Delete,
                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                            mappings: to_deleted_fragment_worker_slot_mapping(&failed),
                        }),
                    );
            }
        }

        Ok(())
    }

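    /// Generates a [`JobReschedulePlan`] from a per-job [`JobReschedulePolicy`]:
    /// resolves each job's target parallelism and resource group, filters the
    /// schedulable workers accordingly, and diffs the current actor placement of
    /// every fragment against the target placement computed by the assigner.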
    pub async fn generate_job_reschedule_plan(
        &self,
        policy: JobReschedulePolicy,
        generate_plan_for_cdc_table_backfill: bool,
    ) -> MetaResult<JobReschedulePlan> {
        type VnodeCount = usize;

        let JobReschedulePolicy { targets } = policy;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?;

        // Index the workers, keeping only the schedulable ones.
        let workers: HashMap<_, _> = workers
            .into_iter()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        #[derive(Debug)]
        struct JobUpdate {
            filtered_worker_ids: BTreeSet<WorkerId>,
            parallelism: TableParallelism,
        }

        let mut job_parallelism_updates = HashMap::new();

        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
            parallelism_updates: Default::default(),
            resource_group_updates: Default::default(),
        };

        for (
            job_id,
            JobRescheduleTarget {
                parallelism: parallelism_update,
                resource_group: resource_group_update,
            },
        ) in &targets
        {
            let parallelism = match parallelism_update {
                JobParallelismTarget::Update(parallelism) => *parallelism,
                JobParallelismTarget::Refresh => {
                    let parallelism = self
                        .metadata_manager
                        .catalog_controller
                        .get_job_streaming_parallelisms(*job_id as _)
                        .await?;

                    parallelism.into()
                }
            };

            job_reschedule_post_updates
                .parallelism_updates
                .insert(TableId::from(*job_id), parallelism);

            let current_resource_group = match resource_group_update {
                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
                    job_reschedule_post_updates.resource_group_updates.insert(
                        *job_id as ObjectId,
                        Some(specific_resource_group.to_owned()),
                    );

                    specific_resource_group.to_owned()
                }
                JobResourceGroupTarget::Update(None) => {
                    let database_resource_group = self
                        .metadata_manager
                        .catalog_controller
                        .get_existing_job_database_resource_group(*job_id as _)
                        .await?;

                    job_reschedule_post_updates
                        .resource_group_updates
                        .insert(*job_id as ObjectId, None);
                    database_resource_group
                }
                JobResourceGroupTarget::Keep => {
                    self.metadata_manager
                        .catalog_controller
                        .get_existing_job_resource_group(*job_id as _)
                        .await?
                }
            };

            let filtered_worker_ids =
                filter_workers_by_resource_group(&workers, current_resource_group.as_str());

            if filtered_worker_ids.is_empty() {
                bail!("Cannot resize streaming_job {job_id} to empty worker set")
            }

            job_parallelism_updates.insert(
                *job_id,
                JobUpdate {
                    filtered_worker_ids,
                    parallelism,
                },
            );
        }

        // Index NoShuffle relations and fragment/actor metadata for the jobs.
        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        // `fragment_id` -> (distribution type, vnode count, is CDC backfill v2).
        let mut fragment_distribution_map = HashMap::new();
        // `actor_id` -> worker id.
        let mut actor_location = HashMap::new();
        // `job_id` -> fragment ids.
        let mut table_fragment_id_map = HashMap::new();
        // `fragment_id` -> actor ids.
        let mut fragment_actor_id_map = HashMap::new();

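        // Resolves the reschedule working set for the given jobs and fills the
        // indexes above; when planning only for CDC table backfill, non-CDC
        // fragments are skipped.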
        async fn build_index(
            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
            fragment_distribution_map: &mut HashMap<
                FragmentId,
                (FragmentDistributionType, VnodeCount, bool),
            >,
            actor_location: &mut HashMap<ActorId, WorkerId>,
            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
            mgr: &MetadataManager,
            table_ids: Vec<ObjectId>,
            generate_plan_only_for_cdc_table_backfill: bool,
        ) -> Result<(), MetaError> {
            let RescheduleWorkingSet {
                fragments,
                actors,
                actor_dispatchers: _actor_dispatchers,
                fragment_downstreams,
                fragment_upstreams: _fragment_upstreams,
                related_jobs: _related_jobs,
                job_resource_groups: _job_resource_groups,
            } = mgr
                .catalog_controller
                .resolve_working_set_for_reschedule_tables(table_ids)
                .await?;

            for (fragment_id, downstreams) in fragment_downstreams {
                for (downstream_fragment_id, dispatcher_type) in downstreams {
                    if let risingwave_meta_model::DispatcherType::NoShuffle = dispatcher_type {
                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
                    }
                }
            }

            for (fragment_id, fragment) in fragments {
                let is_cdc_backfill_v2_fragment =
                    FragmentTypeMask::from(fragment.fragment_type_mask)
                        .contains(FragmentTypeFlag::StreamCdcScan);
                if generate_plan_only_for_cdc_table_backfill && !is_cdc_backfill_v2_fragment {
                    continue;
                }
                fragment_distribution_map.insert(
                    fragment_id as FragmentId,
                    (
                        FragmentDistributionType::from(fragment.distribution_type),
                        fragment.vnode_count as _,
                        is_cdc_backfill_v2_fragment,
                    ),
                );

                table_fragment_id_map
                    .entry(fragment.job_id as u32)
                    .or_default()
                    .insert(fragment_id as FragmentId);
            }

            for (actor_id, actor) in actors {
                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
                fragment_actor_id_map
                    .entry(actor.fragment_id as FragmentId)
                    .or_default()
                    .insert(actor_id as ActorId);
            }

            Ok(())
        }

        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();

        build_index(
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
            &mut fragment_distribution_map,
            &mut actor_location,
            &mut table_fragment_id_map,
            &mut fragment_actor_id_map,
            &self.metadata_manager,
            table_ids,
            generate_plan_for_cdc_table_backfill,
        )
        .await?;
        tracing::debug!(
            ?job_reschedule_post_updates,
            ?job_parallelism_updates,
            ?no_shuffle_source_fragment_ids,
            ?no_shuffle_target_fragment_ids,
            ?fragment_distribution_map,
            ?actor_location,
            ?table_fragment_id_map,
            ?fragment_actor_id_map,
            "generate_table_resize_plan, after build_index"
        );

        let adaptive_parallelism_strategy = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy();

        let mut target_plan = HashMap::new();

        for (
            table_id,
            JobUpdate {
                filtered_worker_ids,
                parallelism,
            },
        ) in job_parallelism_updates
        {
            let assigner = AssignerBuilder::new(table_id).build();

            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();

            let available_worker_slots = workers
                .iter()
                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
                .map(|(_, worker)| {
                    (
                        worker.id as WorkerId,
                        NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                    )
                })
                .collect::<BTreeMap<_, _>>();

            for fragment_id in fragment_map {
                // NoShuffle downstream fragments always follow their upstreams,
                // so they never get their own plan entry.
                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
                    continue;
                }

                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();

                for actor_id in &fragment_actor_id_map[&fragment_id] {
                    let worker_id = actor_location[actor_id];
                    *fragment_slots.entry(worker_id).or_default() += 1;
                }

                let available_slot_count: usize = available_worker_slots
                    .values()
                    .cloned()
                    .map(NonZeroUsize::get)
                    .sum();

                if available_slot_count == 0 {
                    bail!(
                        "No schedulable slots available for fragment {}",
                        fragment_id
                    );
                }

                let (dist, vnode_count, is_cdc_backfill_v2_fragment) =
                    fragment_distribution_map[&fragment_id];
                let max_parallelism = vnode_count;
                let fragment_parallelism_strategy = if generate_plan_for_cdc_table_backfill {
                    assert!(is_cdc_backfill_v2_fragment);
                    let TableParallelism::Fixed(new_parallelism) = parallelism else {
                        return Err(anyhow::anyhow!(
                            "invalid new parallelism {:?}, expected fixed parallelism",
                            parallelism
                        )
                        .into());
                    };
                    if new_parallelism > max_parallelism || new_parallelism == 0 {
                        return Err(anyhow::anyhow!(
                            "invalid new parallelism {}, max parallelism {}",
                            new_parallelism,
                            max_parallelism
                        )
                        .into());
                    }
                    TableParallelism::Fixed(new_parallelism)
                } else if is_cdc_backfill_v2_fragment {
                    // Keep the current parallelism of CDC backfill fragments.
                    TableParallelism::Fixed(fragment_actor_id_map[&fragment_id].len())
                } else {
                    parallelism
                };
                match dist {
                    FragmentDistributionType::Unspecified => unreachable!(),
                    FragmentDistributionType::Single => {
                        let (single_worker_id, should_be_one) = fragment_slots
                            .iter()
                            .exactly_one()
                            .expect("single fragment should have only one worker slot");

                        assert_eq!(*should_be_one, 1);

                        let assignment =
                            assigner.count_actors_per_worker(&available_worker_slots, 1);

                        let (chosen_target_worker_id, should_be_one) =
                            assignment.iter().exactly_one().ok().with_context(|| {
                                format!(
                                    "Cannot find a single target worker for fragment {fragment_id}"
                                )
                            })?;

                        assert_eq!(*should_be_one, 1);

                        if *chosen_target_worker_id == *single_worker_id {
                            tracing::debug!(
                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
                            );
                            continue;
                        }

                        target_plan.insert(
                            fragment_id,
                            WorkerReschedule {
                                worker_actor_diff: BTreeMap::from_iter(vec![
                                    (*chosen_target_worker_id, 1),
                                    (*single_worker_id, -1),
                                ]),
                            },
                        );
                    }
                    FragmentDistributionType::Hash => match fragment_parallelism_strategy {
                        TableParallelism::Adaptive => {
                            let target_slot_count = adaptive_parallelism_strategy
                                .compute_target_parallelism(available_slot_count);

                            if target_slot_count > max_parallelism {
                                tracing::warn!(
                                    "available parallelism for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );

                                let target_worker_slots = assigner.count_actors_per_worker(
                                    &available_worker_slots,
                                    max_parallelism,
                                );

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else if available_slot_count != target_slot_count {
                                tracing::info!(
                                    "available parallelism for table {table_id} is limited by adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
                                );

                                let target_worker_slots = assigner.count_actors_per_worker(
                                    &available_worker_slots,
                                    target_slot_count,
                                );

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else {
                                let available_worker_slots = available_worker_slots
                                    .iter()
                                    .map(|(worker_id, v)| (*worker_id, v.get()))
                                    .collect();

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &available_worker_slots,
                                    ),
                                );
                            }
                        }
                        TableParallelism::Fixed(mut n) => {
                            if n > max_parallelism {
                                tracing::warn!(
                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );
                                n = max_parallelism
                            }

                            let target_worker_slots =
                                assigner.count_actors_per_worker(&available_worker_slots, n);

                            target_plan.insert(
                                fragment_id,
                                Self::diff_worker_slot_changes(
                                    &fragment_slots,
                                    &target_worker_slots,
                                ),
                            );
                        }
                        TableParallelism::Custom => {
                            // Custom parallelism: leave the current placement untouched.
                        }
                    },
                }
            }
        }

        target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
        tracing::debug!(
            ?target_plan,
            "generate_job_reschedule_plan finished"
        );
        if generate_plan_for_cdc_table_backfill {
            job_reschedule_post_updates.resource_group_updates = HashMap::default();
            job_reschedule_post_updates.parallelism_updates = HashMap::default();
        }
        Ok(JobReschedulePlan {
            reschedules: target_plan,
            post_updates: job_reschedule_post_updates,
        })
    }

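    /// Computes the per-worker actor-count delta needed to turn the current
    /// worker-slot distribution of a fragment into the target distribution.
    /// Workers whose counts are unchanged are omitted from the result.
    ///
    /// A minimal illustrative sketch (the worker ids and counts below are made up
    /// for the example, not taken from any real cluster):
    ///
    /// ```ignore
    /// // current: {1: 3, 2: 1}        target: {1: 2, 3: 2}
    /// // diff:    {1: -1, 2: -1, 3: +2}
    /// ```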
    fn diff_worker_slot_changes(
        fragment_worker_slots: &BTreeMap<WorkerId, usize>,
        target_worker_slots: &BTreeMap<WorkerId, usize>,
    ) -> WorkerReschedule {
        let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
        let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();

        for (&worker_id, &target_slots) in target_worker_slots {
            let &current_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);

            if target_slots > current_slots {
                increased_actor_count.insert(worker_id, target_slots - current_slots);
            }
        }

        for (&worker_id, &current_slots) in fragment_worker_slots {
            let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);

            if current_slots > target_slots {
                decreased_actor_count.insert(worker_id, current_slots - target_slots);
            }
        }

        let worker_ids: HashSet<_> = increased_actor_count
            .keys()
            .chain(decreased_actor_count.keys())
            .cloned()
            .collect();

        let mut worker_actor_diff = BTreeMap::new();

        for worker_id in worker_ids {
            let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
            let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
            let change = increased - decreased;

            assert_ne!(change, 0);

            worker_actor_diff.insert(worker_id, change);
        }

        WorkerReschedule { worker_actor_diff }
    }

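    /// Indexes the NO_SHUFFLE edges of the actor graph: for every NO_SHUFFLE
    /// dispatcher, the upstream fragment is recorded as a source and the downstream
    /// fragment as a target. Only one actor per fragment is inspected, on the
    /// assumption that all actors of a fragment share the same dispatch edges.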
    fn build_no_shuffle_relation_index(
        actor_map: &HashMap<ActorId, CustomActorInfo>,
        no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
    ) {
        let mut fragment_cache = HashSet::new();
        for actor in actor_map.values() {
            if fragment_cache.contains(&actor.fragment_id) {
                continue;
            }

            for dispatcher in &actor.dispatcher {
                for downstream_actor_id in &dispatcher.downstream_actor_id {
                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
                        if dispatcher.r#type() == PbDispatcherType::NoShuffle {
                            no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId);
                            no_shuffle_target_fragment_ids
                                .insert(downstream_actor.fragment_id as FragmentId);
                        }
                    }
                }
            }

            fragment_cache.insert(actor.fragment_id);
        }
    }

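    /// Builds, from actor-level dispatchers, an index mapping each upstream fragment
    /// to its downstream fragments and the dispatcher type used on each edge.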
    fn build_fragment_dispatcher_index(
        actor_map: &HashMap<ActorId, CustomActorInfo>,
        fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    ) {
        for actor in actor_map.values() {
            for dispatcher in &actor.dispatcher {
                for downstream_actor_id in &dispatcher.downstream_actor_id {
                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
                        fragment_dispatcher_map
                            .entry(actor.fragment_id as FragmentId)
                            .or_default()
                            .insert(
                                downstream_actor.fragment_id as FragmentId,
                                dispatcher.r#type().into(),
                            );
                    }
                }
            }
        }
    }

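    /// Propagates a parallelism change through NO_SHUFFLE chains at the table level.
    ///
    /// Starting from `fragment_ids`, this walks upstream along NO_SHUFFLE edges and
    /// ensures the tables of all reachable upstream fragments carry a consistent
    /// parallelism (only `TableParallelism::Custom` is propagated here). Afterwards,
    /// tables whose fragments are NO_SHUFFLE *targets* are dropped from
    /// `table_parallelisms`, since their parallelism is dictated by their upstream.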
    pub fn resolve_no_shuffle_upstream_tables(
        fragment_ids: HashSet<FragmentId>,
        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
        fragment_to_table: &HashMap<FragmentId, TableId>,
        fragment_upstreams: &HashMap<
            risingwave_meta_model::FragmentId,
            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
        >,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<()> {
        let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();

        let mut fragment_ids = fragment_ids;

        while let Some(fragment_id) = queue.pop_front() {
            // Only NO_SHUFFLE targets need to pull their upstream fragments in.
            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
                continue;
            }

            for upstream_fragment_id in fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.keys())
                .into_iter()
                .flatten()
            {
                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
                if !no_shuffle_source_fragment_ids.contains(&upstream_fragment_id) {
                    continue;
                }

                let table_id = &fragment_to_table[&fragment_id];
                let upstream_table_id = &fragment_to_table[&upstream_fragment_id];

                // A Custom parallelism must stay consistent along the whole
                // NO_SHUFFLE chain.
                if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
                    if let Some(upstream_table_parallelism) =
                        table_parallelisms.get(upstream_table_id)
                    {
                        if upstream_table_parallelism != &TableParallelism::Custom {
                            bail!(
                                "Cannot change upstream table {} from {:?} to {:?}",
                                upstream_table_id,
                                upstream_table_parallelism,
                                TableParallelism::Custom
                            )
                        }
                    } else {
                        table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
                    }
                }

                fragment_ids.insert(upstream_fragment_id);
                queue.push_back(upstream_fragment_id);
            }
        }

        let downstream_fragment_ids = fragment_ids
            .iter()
            .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));

        let downstream_table_ids = downstream_fragment_ids
            .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
            .collect::<HashSet<_>>();

        // Parallelism of NO_SHUFFLE downstreams is derived from their upstreams.
        table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));

        Ok(())
    }

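    /// Extends a fragment-level reschedule plan so that every NO_SHUFFLE chain moves
    /// as one unit: each NO_SHUFFLE source fragment inherits the plan of its target,
    /// conflicting plans along a chain are rejected, and target fragments are
    /// finally removed from the map, leaving the plan keyed by chain roots only.
    ///
    /// A minimal sketch of the intended effect (the fragment ids are hypothetical):
    ///
    /// ```ignore
    /// // Fragment 2 --NO_SHUFFLE--> fragment 1; the caller only planned fragment 1.
    /// // Before: {1: plan_a}
    /// // After:  {2: plan_a}   // plan copied to the upstream, target 1 dropped
    /// ```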
    pub fn resolve_no_shuffle_upstream_fragments<T>(
        reschedule: &mut HashMap<FragmentId, T>,
        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
        fragment_upstreams: &HashMap<
            risingwave_meta_model::FragmentId,
            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
        >,
    ) -> MetaResult<()>
    where
        T: Clone + Eq,
    {
        let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();

        while let Some(fragment_id) = queue.pop_front() {
            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
                continue;
            }

            for upstream_fragment_id in fragment_upstreams
                .get(&(fragment_id as _))
                .map(|upstreams| upstreams.keys())
                .into_iter()
                .flatten()
            {
                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
                if !no_shuffle_source_fragment_ids.contains(&upstream_fragment_id) {
                    continue;
                }

                let reschedule_plan = &reschedule[&fragment_id];

                if let Some(upstream_reschedule_plan) = reschedule.get(&upstream_fragment_id) {
                    if upstream_reschedule_plan != reschedule_plan {
                        bail!(
                            "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
                            fragment_id,
                            upstream_fragment_id
                        );
                    }

                    continue;
                }

                reschedule.insert(upstream_fragment_id, reschedule_plan.clone());

                queue.push_back(upstream_fragment_id);
            }
        }

        reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));

        Ok(())
    }

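    /// Expands `jobs` to the full set of jobs connected to them through NO_SHUFFLE
    /// dependencies, so that related jobs can be rescheduled (or skipped) together.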
    pub async fn resolve_related_no_shuffle_jobs(
        &self,
        jobs: &[TableId],
    ) -> MetaResult<HashSet<TableId>> {
        let RescheduleWorkingSet { related_jobs, .. } = self
            .metadata_manager
            .catalog_controller
            .resolve_working_set_for_reschedule_tables(
                jobs.iter().map(|id| id.table_id as _).collect(),
            )
            .await?;

        Ok(related_jobs
            .keys()
            .map(|id| TableId::new(*id as _))
            .collect())
    }
}

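/// How a reschedule should treat the parallelism of a job: `Update` sets a new
/// [`TableParallelism`], while `Refresh` keeps the declared parallelism and merely
/// re-evaluates actor placement against the current cluster (this is what the
/// periodic parallelism control below uses).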
#[derive(Debug, Clone)]
pub enum JobParallelismTarget {
    Update(TableParallelism),
    Refresh,
}

/// How a reschedule should treat the resource group of a job.
#[derive(Debug, Clone)]
pub enum JobResourceGroupTarget {
    Update(Option<String>),
    Keep,
}

/// Per-job reschedule target: the desired parallelism and resource group.
#[derive(Debug, Clone)]
pub struct JobRescheduleTarget {
    pub parallelism: JobParallelismTarget,
    pub resource_group: JobResourceGroupTarget,
}

/// Reschedule targets keyed by job id.
#[derive(Debug)]
pub struct JobReschedulePolicy {
    pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
}

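/// Catalog updates to apply after a reschedule takes effect: per-table parallelism
/// and per-object resource-group changes.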
#[derive(Debug, Clone)]
pub struct JobReschedulePostUpdates {
    pub parallelism_updates: HashMap<TableId, TableParallelism>,
    pub resource_group_updates: HashMap<ObjectId, Option<String>>,
}

/// A complete reschedule plan: the worker-level diffs per fragment plus the catalog
/// updates to apply once the reschedule succeeds.
#[derive(Debug)]
pub struct JobReschedulePlan {
    pub reschedules: HashMap<FragmentId, WorkerReschedule>,
    pub post_updates: JobReschedulePostUpdates,
}

impl GlobalStreamManager {
    #[await_tree::instrument("acquire_reschedule_read_guard")]
    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
        self.scale_controller.reschedule_lock.read().await
    }

    #[await_tree::instrument("acquire_reschedule_write_guard")]
    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
        self.scale_controller.reschedule_lock.write().await
    }

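    /// Applies a [`JobReschedulePlan`] within one database: analyzes the
    /// worker-level plan into concrete fragment reschedules, pauses the source
    /// manager tick while the change is in flight, and drives it through a
    /// `RescheduleFragment` barrier command.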
    pub async fn reschedule_actors(
        &self,
        database_id: DatabaseId,
        plan: JobReschedulePlan,
        options: RescheduleOptions,
    ) -> MetaResult<()> {
        let JobReschedulePlan {
            reschedules,
            mut post_updates,
        } = plan;

        let reschedule_fragment = self
            .scale_controller
            .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
            .await?;

        tracing::debug!("reschedule plan: {:?}", reschedule_fragment);

        let up_down_stream_fragment: HashSet<_> = reschedule_fragment
            .iter()
            .flat_map(|(_, reschedule)| {
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id)
                    .chain(reschedule.downstream_fragment_ids.iter().cloned())
            })
            .collect();

        let fragment_actors =
            try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async {
                let actor_ids = self
                    .metadata_manager
                    .get_running_actors_of_fragment(*fragment_id)
                    .await?;
                Result::<_, MetaError>::Ok((*fragment_id, actor_ids))
            }))
            .await?
            .into_iter()
            .collect();

        let command = Command::RescheduleFragment {
            reschedules: reschedule_fragment,
            fragment_actors,
            post_updates,
        };

        let _guard = self.source_manager.pause_tick().await;

        self.barrier_scheduler
            .run_command(database_id, command)
            .await?;

        tracing::info!("reschedule done");

        Ok(())
    }

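    /// Runs one round of automatic parallelism control: takes the reschedule write
    /// lock, skips background-creating jobs (and their NO_SHUFFLE relatives), then
    /// generates and applies a refresh plan for one batch of jobs at a time.
    /// Returns `Ok(true)` if a plan was applied and another round may be needed,
    /// `Ok(false)` if the cluster is already settled.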
    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
        tracing::info!("trigger parallelism control");

        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

        let background_streaming_jobs = self
            .metadata_manager
            .list_background_creating_jobs()
            .await?;

        let skipped_jobs = if !background_streaming_jobs.is_empty() {
            let jobs = self
                .scale_controller
                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
                .await?;

            tracing::info!(
                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
                background_streaming_jobs,
                jobs
            );

            jobs
        } else {
            HashSet::new()
        };

        let job_ids: HashSet<_> = {
            let streaming_parallelisms = self
                .metadata_manager
                .catalog_controller
                .get_all_streaming_parallelisms()
                .await?;

            streaming_parallelisms
                .into_iter()
                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
                .map(|(table_id, _)| table_id)
                .collect()
        };

        let workers = self
            .metadata_manager
            .cluster_controller
            .list_active_streaming_workers()
            .await?;

        let schedulable_worker_ids: BTreeSet<_> = workers
            .iter()
            .filter(|worker| {
                !worker
                    .property
                    .as_ref()
                    .map(|p| p.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|worker| worker.id as WorkerId)
            .collect();

        if job_ids.is_empty() {
            tracing::info!("no streaming jobs to scale, maybe an empty cluster");
            return Ok(false);
        }

        let batch_size = match self.env.opts.parallelism_control_batch_size {
            0 => job_ids.len(),
            n => n,
        };

        tracing::info!(
            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
            job_ids.len(),
            batch_size,
            schedulable_worker_ids
        );

        let batches: Vec<_> = job_ids
            .into_iter()
            .chunks(batch_size)
            .into_iter()
            .map(|chunk| chunk.collect_vec())
            .collect();

        let mut reschedules = None;

        for batch in batches {
            let targets: HashMap<_, _> = batch
                .into_iter()
                .map(|job_id| {
                    (
                        job_id as u32,
                        JobRescheduleTarget {
                            parallelism: JobParallelismTarget::Refresh,
                            resource_group: JobResourceGroupTarget::Keep,
                        },
                    )
                })
                .collect();

            let plan = self
                .scale_controller
                .generate_job_reschedule_plan(JobReschedulePolicy { targets }, false)
                .await?;

            if !plan.reschedules.is_empty() {
                tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
                reschedules = Some(plan);
                break;
            }
        }

        let Some(plan) = reschedules else {
            tracing::info!("no reschedule plan generated");
            return Ok(false);
        };

        // Apply the plan per database, as barrier commands are issued per database.
        for (database_id, reschedules) in self
            .metadata_manager
            .split_fragment_map_by_database(plan.reschedules)
            .await?
        {
            self.reschedule_actors(
                database_id,
                JobReschedulePlan {
                    reschedules,
                    post_updates: plan.post_updates.clone(),
                },
                RescheduleOptions {
                    resolve_no_shuffle_upstream: false,
                    skip_create_new_actors: false,
                },
            )
            .await?;
        }

        Ok(true)
    }

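    /// Event loop of the automatic parallelism monitor: watches worker
    /// activation/deletion and adaptive-strategy changes, and triggers parallelism
    /// control on a periodic tick while there is pending work.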
    async fn run(&self, mut shutdown_rx: Receiver<()>) {
        tracing::info!("starting automatic parallelism control monitor");

        let check_period =
            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);

        let mut ticker = tokio::time::interval_at(
            Instant::now()
                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
            check_period,
        );
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        // Consume the initial tick so the loop starts after the configured delay.
        ticker.tick().await;

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();

        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        let worker_nodes = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await
            .expect("list active streaming compute nodes");

        let mut worker_cache: BTreeMap<_, _> = worker_nodes
            .into_iter()
            .map(|worker| (worker.id, worker))
            .collect();

        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();

        let mut should_trigger = false;

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Stream manager is stopped");
                    break;
                }

                _ = ticker.tick(), if should_trigger => {
                    let include_workers = worker_cache.keys().copied().collect_vec();

                    if include_workers.is_empty() {
                        tracing::debug!("no available worker nodes");
                        should_trigger = false;
                        continue;
                    }

                    match self.trigger_parallelism_control().await {
                        Ok(cont) => {
                            should_trigger = cont;
                        }
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
                            ticker.reset();
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.expect("local notification channel closed in loop of stream manager");

                    let worker_is_streaming_compute = |worker: &WorkerNode| {
                        worker.get_type() == Ok(WorkerType::ComputeNode)
                            && worker.property.as_ref().unwrap().is_streaming
                    };

                    match notification {
                        LocalNotification::SystemParamsChange(reader) => {
                            let new_strategy = reader.adaptive_parallelism_strategy();
                            if new_strategy != previous_adaptive_parallelism_strategy {
                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
                                should_trigger = true;
                                previous_adaptive_parallelism_strategy = new_strategy;
                            }
                        }
                        LocalNotification::WorkerNodeActivated(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            tracing::info!(worker = worker.id, "worker activated notification received");

                            let prev_worker = worker_cache.insert(worker.id, worker.clone());

                            match prev_worker {
                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                                    tracing::info!(worker = worker.id, "worker parallelism changed");
                                    should_trigger = true;
                                }
                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
                                    tracing::info!(worker = worker.id, "worker resource group changed");
                                    should_trigger = true;
                                }
                                None => {
                                    tracing::info!(worker = worker.id, "new worker joined");
                                    should_trigger = true;
                                }
                                _ => {}
                            }
                        }

                        LocalNotification::WorkerNodeDeleted(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            match worker_cache.remove(&worker.id) {
                                Some(prev_worker) => {
                                    tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
                                }
                                None => {
                                    tracing::warn!(worker = worker.id, "deleted worker not found in stream manager cache");
                                }
                            }
                        }

                        _ => {}
                    }
                }
            }
        }
    }

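    /// Spawns the automatic parallelism monitor on a dedicated task and returns its
    /// join handle together with a shutdown sender.
    ///
    /// A minimal usage sketch (the surrounding setup is assumed, not shown here):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) = stream_manager.clone().start_auto_parallelism_monitor();
    /// // ... on shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```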
    pub fn start_auto_parallelism_monitor(
        self: Arc<Self>,
    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}
2738}