risingwave_meta/stream/scale.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, min};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use num_integer::Integer;
use num_traits::abs;
use risingwave_common::bail;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::ActorMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::FragmentWorkerSlotMappings;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{self, State};
use risingwave_pb::stream_plan::{Dispatcher, PbDispatcher, PbDispatcherType, StreamNode};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule};
use crate::controller::scale::RescheduleWorkingSet;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
use crate::model::{
    ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
};
use crate::serving::{
    ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
};
use crate::stream::{AssignerBuilder, GlobalStreamManager, SourceManagerRef};
use crate::{MetaError, MetaResult};

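/// A per-fragment reschedule plan expressed as an actor-count diff per worker:
/// a positive entry adds that many actors on the worker, a negative entry
/// removes that many (e.g., `{1: -1, 2: +1}` migrates one actor from worker 1
/// to worker 2).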
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkerReschedule {
    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
}

pub struct CustomFragmentInfo {
    pub job_id: u32,
    pub fragment_id: u32,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub state_table_ids: Vec<u32>,
    pub node: StreamNode,
    pub actor_template: StreamActorWithDispatchers,
    pub actors: Vec<CustomActorInfo>,
}

#[derive(Default, Clone)]
pub struct CustomActorInfo {
    pub actor_id: u32,
    pub fragment_id: u32,
    pub dispatcher: Vec<Dispatcher>,
    /// `None` if singleton.
    pub vnode_bitmap: Option<Bitmap>,
}

use educe::Educe;
use futures::future::try_join_all;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_meta_model::DispatcherType;
use risingwave_pb::stream_plan::stream_node::NodeBody;

use crate::controller::id::IdCategory;
use crate::controller::utils::filter_workers_by_resource_group;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_impl;

// The debug implementation is arbitrary. Just used in debug logs.
#[derive(Educe)]
#[educe(Debug)]
pub struct RescheduleContext {
    /// Meta information for all Actors
    #[educe(Debug(ignore))]
    actor_map: HashMap<ActorId, CustomActorInfo>,
    /// Status of all Actors, used to find the location of the `Actor`
    actor_status: BTreeMap<ActorId, WorkerId>,
    /// Meta information of all `Fragment`, used to find the `Fragment`'s `Actor`
    #[educe(Debug(ignore))]
    fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
    /// Fragments with `StreamSource`
    stream_source_fragment_ids: HashSet<FragmentId>,
    /// Fragments with `StreamSourceBackfill` and the corresponding upstream source fragment
    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
    /// Target fragments in `NoShuffle` relation
    no_shuffle_target_fragment_ids: HashSet<FragmentId>,
    /// Source fragments in `NoShuffle` relation
    no_shuffle_source_fragment_ids: HashSet<FragmentId>,
    // index for dispatcher type from upstream fragment to downstream fragment
    fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    fragment_upstreams: HashMap<
        risingwave_meta_model::FragmentId,
        HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
    >,
}

impl RescheduleContext {
    fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
        self.actor_status
            .get(actor_id)
            .cloned()
            .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
    }
}

/// This function provides a simple balancing method. The specific process is as follows:
///
/// 1. Calculate the target number of actors, then divide the virtual node count by it to get the
///    average (used as the expected count per actor) and the remainder.
///
/// 2. Partition the actors into those to be removed and those to be retained, and sort each group
///    from largest to smallest by the number of virtual nodes held.
///
/// 3. Calculate each actor's balance: 1) for actors to be removed, the balance is the number of
///    virtual nodes held; 2) for retained actors, the balance is the number of virtual nodes held
///    minus the expected count; 3) for newly created actors, the balance is `-expected` (always
///    negative).
///
/// 4. Allocate the remainder, giving priority to newly created actors.
///
/// 5. Merge the removed, retained, and created actors into a queue, with the head of the queue as
///    the source, and move virtual nodes to the destination at the tail of the queue.
///
/// This can handle scale-in, scale-out, migration, and simultaneous scaling, while preserving as
/// much affinity as possible.
///
/// Note that this function can only rebalance actors whose `vnode_bitmap` is not `None`; in other
/// words, calling it on a `Fragment` of `FragmentDistributionType::Single` will trigger an
/// assertion failure, so such fragments should be skipped by the caller.
///
/// The return value is the bitmap distribution after scaling, covering all virtual node indexes.
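///
/// # Example
///
/// A minimal sketch (hypothetical ids and vnode count): with 16 vnodes, removing actor 1 and
/// creating actor 3 from actors `{1, 2}` moves actor 1's 8 vnodes to the new actor 3:
///
/// ```ignore
/// let actors: Vec<CustomActorInfo> = /* actors 1 and 2, each holding 8 of 16 vnodes */;
/// let to_remove = BTreeSet::from([1]);
/// let to_create = BTreeSet::from([3]);
/// let bitmaps = rebalance_actor_vnode(&actors, &to_remove, &to_create);
/// assert_eq!(bitmaps.len(), 2); // full bitmaps for the surviving actors 2 and 3
/// ```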
pub fn rebalance_actor_vnode(
    actors: &[CustomActorInfo],
    actors_to_remove: &BTreeSet<ActorId>,
    actors_to_create: &BTreeSet<ActorId>,
) -> HashMap<ActorId, Bitmap> {
    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();

    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);

    assert!(actors.len() >= actors_to_remove.len());

    let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
    assert!(target_actor_count > 0);

    // `vnode_bitmap` must be set on distributed fragments.
    let vnode_count = actors[0]
        .vnode_bitmap
        .as_ref()
        .expect("vnode bitmap unset")
        .len();

    // represents the balance of each actor, used to sort later
    #[derive(Debug)]
    struct Balance {
        actor_id: ActorId,
        balance: i32,
        builder: BitmapBuilder,
    }
    let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);
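    // e.g., with 256 vnodes and 3 target actors: expected = 85, remain = 1, so one
    // actor will end up holding 86 vnodes after the remainder is distributed below.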

    tracing::debug!(
        "expected {}, remain {}, prev actors {}, target actors {}",
        expected,
        remain,
        actors.len(),
        target_actor_count,
    );

    let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
        .iter()
        .map(|actor| {
            (
                actor.actor_id as ActorId,
                actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
            )
        })
        .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));

    let order_by_bitmap_desc =
        |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
            bitmap_a
                .count_ones()
                .cmp(&bitmap_b.count_ones())
                .reverse()
                .then(id_a.cmp(id_b))
        };

    let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
        let mut builder = BitmapBuilder::default();
        builder.append_bitmap(bitmap);
        builder
    };

    let (prev_expected, _) = vnode_count.div_rem(&actors.len());

    let prev_remain = removed
        .iter()
        .map(|(_, bitmap)| {
            assert!(bitmap.count_ones() >= prev_expected);
            bitmap.count_ones() - prev_expected
        })
        .sum::<usize>();

    removed.sort_by(order_by_bitmap_desc);
    rest.sort_by(order_by_bitmap_desc);

    let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
        actor_id,
        balance: bitmap.count_ones() as i32,
        builder: builder_from_bitmap(&bitmap),
    });

    let mut rest_balances = rest
        .into_iter()
        .map(|(actor_id, bitmap)| Balance {
            actor_id,
            balance: bitmap.count_ones() as i32 - expected as i32,
            builder: builder_from_bitmap(&bitmap),
        })
        .collect_vec();

    let mut created_balances = actors_to_create
        .iter()
        .map(|actor_id| Balance {
            actor_id: *actor_id,
            balance: -(expected as i32),
            builder: BitmapBuilder::zeroed(vnode_count),
        })
        .collect_vec();

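    // Distribute the remainder: one extra vnode each, first to (up to `prev_remain`)
    // newly created actors in reverse id order, then to the retained actors.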
    for balance in created_balances
        .iter_mut()
        .rev()
        .take(prev_remain)
        .chain(rest_balances.iter_mut())
    {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    // consume the rest of `remain` on the newly created actors
    for balance in &mut created_balances {
        if remain > 0 {
            balance.balance -= 1;
            remain -= 1;
        }
    }

    assert_eq!(remain, 0);

    let mut v: VecDeque<_> = removed_balances
        .chain(rest_balances)
        .chain(created_balances)
        .collect();

    // We return the full bitmaps after rebalancing; to return only the changed
    // actors, filter out those with balance == 0 here.
    let mut result = HashMap::with_capacity(target_actor_count);

    for balance in &v {
        tracing::debug!(
            "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
            balance.actor_id,
            balance.balance,
            actors_to_remove.contains(&balance.actor_id),
            actors_to_create.contains(&balance.actor_id)
        );
    }

    while !v.is_empty() {
        if v.len() == 1 {
            let single = v.pop_front().unwrap();
            assert_eq!(single.balance, 0);
            if !actors_to_remove.contains(&single.actor_id) {
                result.insert(single.actor_id, single.builder.finish());
            }

            continue;
        }

        let mut src = v.pop_front().unwrap();
        let mut dst = v.pop_back().unwrap();

        let n = min(abs(src.balance), abs(dst.balance));

        let mut moved = 0;
        for idx in (0..vnode_count).rev() {
            if moved >= n {
                break;
            }

            if src.builder.is_set(idx) {
                src.builder.set(idx, false);
                assert!(!dst.builder.is_set(idx));
                dst.builder.set(idx, true);
                moved += 1;
            }
        }

        src.balance -= n;
        dst.balance += n;

        if src.balance != 0 {
            v.push_front(src);
        } else if !actors_to_remove.contains(&src.actor_id) {
            result.insert(src.actor_id, src.builder.finish());
        }

        if dst.balance != 0 {
            v.push_back(dst);
        } else {
            result.insert(dst.actor_id, dst.builder.finish());
        }
    }

    result
}

#[derive(Debug, Clone, Copy)]
pub struct RescheduleOptions {
    /// Whether to resolve the upstream of `NoShuffle` when scaling. If set, this checks that all
    /// reschedules within a `NoShuffle` dependency tree are consistent, and rewrites them to the
    /// root of that tree.
    pub resolve_no_shuffle_upstream: bool,

    /// Whether to skip creating new actors. If true, the scaling-out actors will not be created.
    pub skip_create_new_actors: bool,
}
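
// A hedged usage sketch (hypothetical ids): migrate one actor of fragment 1 from
// worker 1 to worker 2, resolving `NoShuffle` upstreams so the plan is rewritten
// to the root of the dependency tree:
//
// let plan = HashMap::from([(1, WorkerReschedule {
//     worker_actor_diff: BTreeMap::from([(1, -1), (2, 1)]),
// })]);
// let opts = RescheduleOptions {
//     resolve_no_shuffle_upstream: true,
//     skip_create_new_actors: false,
// };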

pub type ScaleControllerRef = Arc<ScaleController>;

pub struct ScaleController {
    pub metadata_manager: MetadataManager,

    pub source_manager: SourceManagerRef,

    pub env: MetaSrvEnv,

    /// We acquire this lock during DDL to prevent scaling operations on jobs that are in the
    /// creating state, e.g., a MV cannot be rescheduled during foreground backfill.
    pub reschedule_lock: RwLock<()>,
}

impl ScaleController {
    pub fn new(
        metadata_manager: &MetadataManager,
        source_manager: SourceManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager: metadata_manager.clone(),
            source_manager,
            env,
            reschedule_lock: RwLock::new(()),
        }
    }

    pub async fn integrity_check(&self) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .integrity_check()
            .await
    }

    /// Build the context for rescheduling and do some validation for the request.
    async fn build_reschedule_context(
        &self,
        reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<RescheduleContext> {
        let worker_nodes: HashMap<WorkerId, WorkerNode> = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?
            .into_iter()
            .map(|worker_node| (worker_node.id as _, worker_node))
            .collect();

        if worker_nodes.is_empty() {
            bail!("no available compute node in the cluster");
        }

        // Check if we are trying to move a fragment to a worker marked as unschedulable.
        let unschedulable_worker_ids: HashSet<_> = worker_nodes
            .values()
            .filter(|w| {
                w.property
                    .as_ref()
                    .map(|property| property.is_unschedulable)
                    .unwrap_or(false)
            })
            .map(|worker| worker.id as WorkerId)
            .collect();

        for (fragment_id, reschedule) in &*reschedule {
            for (worker_id, change) in &reschedule.worker_actor_diff {
                if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
                    bail!(
                        "unable to move fragment {} to unschedulable worker {}",
                        fragment_id,
                        worker_id
                    );
                }
            }
        }

        // FIXME: the same as another place calling `list_table_fragments` in scaling.
        // Index for StreamActor
        let mut actor_map = HashMap::new();
        // Index for Fragment
        let mut fragment_map = HashMap::new();
        // Index for actor status, including the actor's worker id
        let mut actor_status = BTreeMap::new();
        let mut fragment_state = HashMap::new();
        let mut fragment_to_table = HashMap::new();

        fn fulfill_index_by_fragment_ids(
            actor_map: &mut HashMap<u32, CustomActorInfo>,
            fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
            actor_status: &mut BTreeMap<ActorId, WorkerId>,
            fragment_state: &mut HashMap<FragmentId, State>,
            fragment_to_table: &mut HashMap<FragmentId, TableId>,
            fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
            actors: HashMap<ActorId, actor::Model>,
            mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
            related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
        ) {
            let mut fragment_actors: HashMap<
                risingwave_meta_model::FragmentId,
                Vec<CustomActorInfo>,
            > = HashMap::new();

            let mut expr_contexts = HashMap::new();
            for (
                _,
                actor::Model {
                    actor_id,
                    fragment_id,
                    status: _,
                    worker_id,
                    vnode_bitmap,
                    expr_context,
                    ..
                },
            ) in actors
            {
                let dispatchers = actor_dispatchers
                    .remove(&(actor_id as _))
                    .unwrap_or_default();

                let actor_info = CustomActorInfo {
                    actor_id: actor_id as _,
                    fragment_id: fragment_id as _,
                    dispatcher: dispatchers,
                    vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
                };

                actor_map.insert(actor_id as _, actor_info.clone());

                fragment_actors
                    .entry(fragment_id as _)
                    .or_default()
                    .push(actor_info);

                actor_status.insert(actor_id as _, worker_id as WorkerId);

                expr_contexts.insert(actor_id as u32, expr_context);
            }

            for (
                _,
                fragment::Model {
                    fragment_id,
                    job_id,
                    fragment_type_mask,
                    distribution_type,
                    stream_node,
                    state_table_ids,
                    ..
                },
            ) in fragments
            {
                let actors = fragment_actors
                    .remove(&(fragment_id as _))
                    .unwrap_or_default();

                let CustomActorInfo {
                    actor_id,
                    fragment_id,
                    dispatcher,
                    vnode_bitmap,
                } = actors.first().unwrap().clone();

                let (related_job, job_definition) =
                    related_jobs.get(&job_id).expect("job not found");

                let fragment = CustomFragmentInfo {
                    job_id: job_id as _,
                    fragment_id: fragment_id as _,
                    fragment_type_mask: fragment_type_mask.into(),
                    distribution_type: distribution_type.into(),
                    state_table_ids: state_table_ids.into_u32_array(),
                    node: stream_node.to_protobuf(),
                    actor_template: (
                        StreamActor {
                            actor_id,
                            fragment_id: fragment_id as _,
                            vnode_bitmap,
                            mview_definition: job_definition.to_owned(),
                            expr_context: expr_contexts
                                .get(&actor_id)
                                .cloned()
                                .map(|expr_context| expr_context.to_protobuf()),
                        },
                        dispatcher,
                    ),
                    actors,
                };

                fragment_map.insert(fragment_id as _, fragment);

                fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));

                fragment_state.insert(
                    fragment_id,
                    table_fragments::PbState::from(related_job.job_status),
                );
            }
        }
        let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
        let working_set = self
            .metadata_manager
            .catalog_controller
            .resolve_working_set_for_reschedule_fragments(fragment_ids)
            .await?;

        fulfill_index_by_fragment_ids(
            &mut actor_map,
            &mut fragment_map,
            &mut actor_status,
            &mut fragment_state,
            &mut fragment_to_table,
            working_set.fragments,
            working_set.actors,
            working_set.actor_dispatchers,
            working_set.related_jobs,
        );

        // NoShuffle relation index
        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        Self::build_no_shuffle_relation_index(
            &actor_map,
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
        );

        if options.resolve_no_shuffle_upstream {
            let original_reschedule_keys = reschedule.keys().cloned().collect();

            Self::resolve_no_shuffle_upstream_fragments(
                reschedule,
                &no_shuffle_source_fragment_ids,
                &no_shuffle_target_fragment_ids,
                &working_set.fragment_upstreams,
            )?;

            if !table_parallelisms.is_empty() {
                // We need to re-iterate through the NO_SHUFFLE dependencies to ascertain which
                // downstream table the custom modifications of the table have been propagated from.
                Self::resolve_no_shuffle_upstream_tables(
                    original_reschedule_keys,
                    &no_shuffle_source_fragment_ids,
                    &no_shuffle_target_fragment_ids,
                    &fragment_to_table,
                    &working_set.fragment_upstreams,
                    table_parallelisms,
                )?;
            }
        }

        let mut fragment_dispatcher_map = HashMap::new();
        Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);

        let mut stream_source_fragment_ids = HashSet::new();
        let mut stream_source_backfill_fragment_ids = HashMap::new();
        let mut no_shuffle_reschedule = HashMap::new();
        for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
            let fragment = fragment_map
                .get(fragment_id)
                .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;

            // Check if the rescheduling is supported.
            match fragment_state[fragment_id] {
                table_fragments::State::Unspecified => unreachable!(),
                state @ table_fragments::State::Initial => {
                    bail!(
                        "the materialized view of fragment {fragment_id} is in state {}",
                        state.as_str_name()
                    )
                }
                state @ table_fragments::State::Creating => {
                    let stream_node = &fragment.node;

                    let mut is_reschedulable = true;
                    visit_stream_node_cont(stream_node, |body| {
                        if let Some(NodeBody::StreamScan(node)) = &body.node_body {
                            if !node.stream_scan_type().is_reschedulable() {
                                is_reschedulable = false;

                                // fail fast
                                return false;
                            }

                            // continue visiting
                            return true;
                        }

                        // continue visiting
                        true
                    });

                    if !is_reschedulable {
                        bail!(
                            "the materialized view of fragment {fragment_id} is in state {}",
                            state.as_str_name()
                        )
                    }
                }
                table_fragments::State::Created => {}
            }

            if no_shuffle_target_fragment_ids.contains(fragment_id) {
                bail!(
                    "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
                );
            }

            // For the relation of NoShuffle (e.g. Materialize and Chain), we need a special
            // treatment because the upstream and downstream of NoShuffle are always in 1-1
            // correspondence, so we need to clone the reschedule plan to the downstream of all
            // cascading relations.
            if no_shuffle_source_fragment_ids.contains(fragment_id) {
                // This fragment is a NoShuffle's upstream.
                let mut queue: VecDeque<_> = fragment_dispatcher_map
                    .get(fragment_id)
                    .unwrap()
                    .keys()
                    .cloned()
                    .collect();

                while let Some(downstream_id) = queue.pop_front() {
                    if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
                        continue;
                    }

                    if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
                    {
                        let no_shuffle_downstreams = downstream_fragments
                            .iter()
                            .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                            .map(|(fragment_id, _)| fragment_id);

                        queue.extend(no_shuffle_downstreams.copied());
                    }

                    no_shuffle_reschedule.insert(
                        downstream_id,
                        WorkerReschedule {
                            worker_actor_diff: worker_actor_diff.clone(),
                        },
                    );
                }
            }

            if fragment
                .fragment_type_mask
                .contains(FragmentTypeFlag::Source)
                && fragment.node.find_stream_source().is_some()
            {
                stream_source_fragment_ids.insert(*fragment_id);
            }

            // Check if the reschedule plan is valid.
            let current_worker_ids = fragment
                .actors
                .iter()
                .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
                .collect::<HashSet<_>>();

            for (removed, change) in worker_actor_diff {
                if !current_worker_ids.contains(removed) && change.is_negative() {
                    bail!(
                        "no actor on the worker {} of fragment {}",
                        removed,
                        fragment_id
                    );
                }
            }

            let added_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_positive())
                .cloned()
                .map(|change| change as usize)
                .sum();

            let removed_actor_count: usize = worker_actor_diff
                .values()
                .filter(|change| change.is_negative())
                .cloned()
                .map(|v| v.unsigned_abs())
                .sum();

            match fragment.distribution_type {
                FragmentDistributionType::Hash => {
                    if fragment.actors.len() + added_actor_count <= removed_actor_count {
                        bail!("can't remove all actors from fragment {}", fragment_id);
                    }
                }
                FragmentDistributionType::Single => {
                    if added_actor_count != removed_actor_count {
                        bail!("single distribution fragment only supports migration");
                    }
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        if !no_shuffle_reschedule.is_empty() {
            tracing::info!(
                "reschedule plan rewritten with NoShuffle reschedule {:?}",
                no_shuffle_reschedule
            );

            for noshuffle_downstream in no_shuffle_reschedule.keys() {
                let fragment = fragment_map.get(noshuffle_downstream).unwrap();
                // SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source.
                if fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SourceScan)
                {
                    let stream_node = &fragment.node;
                    if let Some((_source_id, upstream_source_fragment_id)) =
                        stream_node.find_source_backfill()
                    {
                        stream_source_backfill_fragment_ids
                            .insert(fragment.fragment_id, upstream_source_fragment_id);
                    }
                }
            }
        }

        // Modifications for NoShuffle downstream.
        reschedule.extend(no_shuffle_reschedule.into_iter());

        Ok(RescheduleContext {
            actor_map,
            actor_status,
            fragment_map,
            stream_source_fragment_ids,
            stream_source_backfill_fragment_ids,
            no_shuffle_target_fragment_ids,
            no_shuffle_source_fragment_ids,
            fragment_dispatcher_map,
            fragment_upstreams: working_set.fragment_upstreams,
        })
    }

    /// From the high-level [`WorkerReschedule`] to the low-level reschedule plan [`Reschedule`].
    ///
    /// Returns the generated reschedule plan: a map from each fragment to its [`Reschedule`].
    ///
    /// In the [normal process of scaling](`GlobalStreamManager::reschedule_actors`), we use the
    /// returned plan to build a [`Command::RescheduleFragment`], which then flows through the
    /// barrier mechanism to perform the scaling. The meta store is updated after the barrier is
    /// collected.
    ///
    /// During recovery, we don't need the barrier mechanism, and can directly use the returned
    /// values to update the meta store.
    pub(crate) async fn analyze_reschedule_plan(
        &self,
        mut reschedules: HashMap<FragmentId, WorkerReschedule>,
        options: RescheduleOptions,
        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
    ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
        tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
        let ctx = self
            .build_reschedule_context(&mut reschedules, options, table_parallelisms)
            .await?;
        tracing::debug!("reschedule context: {:#?}", ctx);
        let reschedules = reschedules;

        // Here, the plan for both upstream and downstream of the NO_SHUFFLE fragments should
        // already have been populated.

        // Index of actors to create/remove
        // Fragment Id => ( Actor Id => Worker Id )
        let (fragment_actors_to_remove, fragment_actors_to_create) =
            self.arrange_reschedules(&reschedules, &ctx)?;

        let mut fragment_actor_bitmap = HashMap::new();
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                // Skip chain fragments; we will clone the upstream materialize fragment's
                // mapping later.
                continue;
            }

            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .map(|map| map.keys().copied().collect())
                .unwrap_or_default();

            let actors_to_remove = fragment_actors_to_remove
                .get(fragment_id)
                .map(|map| map.keys().copied().collect())
                .unwrap_or_default();

            let fragment = ctx.fragment_map.get(fragment_id).unwrap();

            match fragment.distribution_type {
                FragmentDistributionType::Single => {
                    // Skip the re-balancing action for single distribution (always None).
                    fragment_actor_bitmap
                        .insert(fragment.fragment_id as FragmentId, Default::default());
                }
                FragmentDistributionType::Hash => {
                    let actor_vnode = rebalance_actor_vnode(
                        &fragment.actors,
                        &actors_to_remove,
                        &actors_to_create,
                    );

                    fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
                }

                FragmentDistributionType::Unspecified => unreachable!(),
            }
        }

        // Index for fragment -> { actor -> worker_id } after reschedule.
        // Since we need to maintain the upstream and downstream relationships of NoShuffle,
        // we need to know the actor distribution after scaling.
        let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
        for fragment_id in reschedules.keys() {
            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
            let mut new_actor_ids = BTreeMap::new();
            for actor in &fragment.actors {
                if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id)
                    && actors_to_remove.contains_key(&actor.actor_id)
                {
                    continue;
                }
                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
                new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
            }

            if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
                for (actor_id, worker_id) in actors_to_create {
                    new_actor_ids.insert(*actor_id, *worker_id);
                }
            }

            assert!(
                !new_actor_ids.is_empty(),
                "should be at least one actor in fragment {} after rescheduling",
                fragment_id
            );

            fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
        }

        let fragment_actors_after_reschedule = fragment_actors_after_reschedule;

        // To maintain consistency with the original structure, the upstream and downstream actors
        // of NoShuffle need to be on the same worker slot and hold the same virtual nodes.
        // Since the actors of the same fragment are sorted by id on every worker, we can identify
        // the corresponding upstream actor of a NO_SHUFFLE pair after the upstream re-balancing.
        // NOTE: There should be more asserts here to ensure correctness.
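        //
        // Illustration (hypothetical ids): if upstream fragment A keeps actors {1, 2} on
        // workers {w1, w2} and its NoShuffle downstream B keeps actors {11, 12} on the same
        // workers, the per-worker sorted sets pair 1 <-> 11 and 2 <-> 12, and each downstream
        // actor inherits its upstream counterpart's bitmap verbatim.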
        fn arrange_no_shuffle_relation(
            ctx: &RescheduleContext,
            fragment_id: &FragmentId,
            upstream_fragment_id: &FragmentId,
            fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
            actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
            fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
            no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
            no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
        ) {
            if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
                return;
            }

            let fragment = &ctx.fragment_map[fragment_id];

            let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];

            // build the actor group map
            for upstream_actor in &upstream_fragment.actors {
                for dispatcher in &upstream_actor.dispatcher {
                    if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
                        let downstream_actor_id =
                            *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();

                        // upstream is the root
                        if !ctx
                            .no_shuffle_target_fragment_ids
                            .contains(upstream_fragment_id)
                        {
                            actor_group_map.insert(
                                upstream_actor.actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                            actor_group_map.insert(
                                downstream_actor_id,
                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
                            );
                        } else {
                            let root_actor_id = actor_group_map[&upstream_actor.actor_id];

                            actor_group_map.insert(downstream_actor_id, root_actor_id);
                        }
                    }
                }
            }

            // If the upstream is a singleton fragment, there will be no bitmap changes.
            let upstream_fragment_bitmap = fragment_updated_bitmap
                .get(upstream_fragment_id)
                .cloned()
                .unwrap_or_default();

            // Question: is it possible for a hash-distributed fragment's actor bitmaps to remain unchanged?
            if upstream_fragment.distribution_type == FragmentDistributionType::Single {
                assert!(
                    upstream_fragment_bitmap.is_empty(),
                    "single fragment should have no bitmap updates"
                );
            }

            let upstream_fragment_actor_map = fragment_actors_after_reschedule
                .get(upstream_fragment_id)
                .cloned()
                .unwrap();

            let fragment_actor_map = fragment_actors_after_reschedule
                .get(fragment_id)
                .cloned()
                .unwrap();

            let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            // First, find the existing actor bitmaps and copy them.
            let mut fragment_bitmap = HashMap::new();

            for (actor_id, worker_id) in &fragment_actor_map {
                if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
                    let root_bitmap = fragment_updated_bitmap
                        .get(root_fragment)
                        .expect("root fragment bitmap not found")
                        .get(root_actor_id)
                        .cloned()
                        .expect("root actor bitmap not found");

                    // Copy the bitmap
                    fragment_bitmap.insert(*actor_id, root_bitmap);

                    no_shuffle_upstream_actor_map
                        .entry(*actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, *root_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(*root_actor_id)
                        .or_default()
                        .insert(*fragment_id, *actor_id);
                } else {
                    worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();

            for (actor_id, worker_id) in &upstream_fragment_actor_map {
                if !actor_group_map.contains_key(actor_id) {
                    upstream_worker_reverse_index
                        .entry(*worker_id)
                        .or_default()
                        .insert(*actor_id);
                }
            }

            // Then, find the rest of the actors and copy their bitmaps.
            for (worker_id, actor_ids) in worker_reverse_index {
                let upstream_actor_ids = upstream_worker_reverse_index
                    .get(&worker_id)
                    .unwrap()
                    .clone();

                assert_eq!(actor_ids.len(), upstream_actor_ids.len());

                for (actor_id, upstream_actor_id) in actor_ids
                    .into_iter()
                    .zip_eq_debug(upstream_actor_ids.into_iter())
                {
                    match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
                        None => {
                            // A single fragment should have no bitmap updates (same as its upstream).
                            assert_eq!(
                                upstream_fragment.distribution_type,
                                FragmentDistributionType::Single
                            );
                        }
                        Some(bitmap) => {
                            // Copy the bitmap
                            fragment_bitmap.insert(actor_id, bitmap);
                        }
                    }

                    no_shuffle_upstream_actor_map
                        .entry(actor_id as ActorId)
                        .or_default()
                        .insert(*upstream_fragment_id, upstream_actor_id);
                    no_shuffle_downstream_actors_map
                        .entry(upstream_actor_id)
                        .or_default()
                        .insert(*fragment_id, actor_id);
                }
            }

            match fragment.distribution_type {
                FragmentDistributionType::Hash => {}
                FragmentDistributionType::Single => {
                    // single distribution should update nothing
                    assert!(fragment_bitmap.is_empty());
                }
                FragmentDistributionType::Unspecified => unreachable!(),
            }

            if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
                assert_eq!(
                    e.entry.get(),
                    &e.value,
                    "bitmaps derived from different no-shuffle upstreams mismatch"
                );
            }

            // Visit downstream fragments recursively.
            if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
                let no_shuffle_downstreams = downstream_fragments
                    .iter()
                    .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
                    .map(|(fragment_id, _)| fragment_id);

                for downstream_fragment_id in no_shuffle_downstreams {
                    arrange_no_shuffle_relation(
                        ctx,
                        downstream_fragment_id,
                        fragment_id,
                        fragment_actors_after_reschedule,
                        actor_group_map,
                        fragment_updated_bitmap,
                        no_shuffle_upstream_actor_map,
                        no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

        let mut no_shuffle_upstream_actor_map = HashMap::new();
        let mut no_shuffle_downstream_actors_map = HashMap::new();
        let mut actor_group_map = HashMap::new();
        // For all roots in the upstream and downstream dependency trees of NoShuffle, recursively
        // find all correspondences
        for fragment_id in reschedules.keys() {
            if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
                && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
                && let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id)
            {
                for downstream_fragment_id in downstream_fragments.keys() {
                    arrange_no_shuffle_relation(
                        &ctx,
                        downstream_fragment_id,
                        fragment_id,
                        &fragment_actors_after_reschedule,
                        &mut actor_group_map,
                        &mut fragment_actor_bitmap,
                        &mut no_shuffle_upstream_actor_map,
                        &mut no_shuffle_downstream_actors_map,
                    );
                }
            }
        }

        tracing::debug!("actor group map {:?}", actor_group_map);

        let mut new_created_actors = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_to_create = fragment_actors_to_create
                .get(fragment_id)
                .cloned()
                .unwrap_or_default();

            let fragment = &ctx.fragment_map[fragment_id];

            assert!(!fragment.actors.is_empty());

            for actor_to_create in &actors_to_create {
                let new_actor_id = actor_to_create.0;
                let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();

                // This should be assigned before the `modify_actor_upstream_and_downstream` call,
                // because we need to use the new actor id to find the upstream and
                // downstream in the NoShuffle relationship
                new_actor.actor_id = *new_actor_id;

                Self::modify_actor_upstream_and_downstream(
                    &ctx,
                    &fragment_actors_to_remove,
                    &fragment_actors_to_create,
                    &fragment_actor_bitmap,
                    &no_shuffle_downstream_actors_map,
                    &mut new_actor,
                    &mut dispatchers,
                )?;

                if let Some(bitmap) = fragment_actor_bitmap
                    .get(fragment_id)
                    .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
                {
                    new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
                }

                new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
            }
        }

        // For stream source & source backfill fragments, we need to reallocate the splits.
        // Because we are in the paused state, it's safe to reallocate them here.
        let mut fragment_actor_splits = HashMap::new();
        for fragment_id in reschedules.keys() {
            let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

            if ctx.stream_source_fragment_ids.contains(fragment_id) {
                let fragment = &ctx.fragment_map[fragment_id];

                let prev_actor_ids = fragment
                    .actors
                    .iter()
                    .map(|actor| actor.actor_id)
                    .collect_vec();

                let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                let actor_splits = self
                    .env
                    .shared_actor_infos()
                    .migrate_splits_for_source_actors(
                        *fragment_id,
                        &prev_actor_ids,
                        &curr_actor_ids,
                    )?;

                tracing::debug!(
                    "source actor splits: {:?}, fragment_id: {}",
                    actor_splits,
                    fragment_id
                );
                fragment_actor_splits.insert(*fragment_id, actor_splits);
            }
        }
        // We use two passes to make sure source actors are migrated first, and backfill actors
        // are then aligned to them.
        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
            for fragment_id in reschedules.keys() {
                let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

                if let Some(upstream_source_fragment_id) =
                    ctx.stream_source_backfill_fragment_ids.get(fragment_id)
                {
                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
                        *fragment_id,
                        *upstream_source_fragment_id,
                        &curr_actor_ids,
                        &fragment_actor_splits,
                        &no_shuffle_upstream_actor_map,
                    )?;
                    tracing::debug!(
                        "source backfill actor splits: {:?}, fragment_id: {}",
                        actor_splits,
                        fragment_id
                    );
                    fragment_actor_splits.insert(*fragment_id, actor_splits);
                }
            }
        }
1229
1230        // Generate fragment reschedule plan
1231        let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
1232            HashMap::with_capacity(reschedules.len());
1233
1234        for (fragment_id, _) in reschedules {
1235            let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();
1236
1237            if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
1238                for (actor_id, worker_id) in actor_worker_maps {
1239                    actors_to_create
1240                        .entry(worker_id)
1241                        .or_default()
1242                        .push(actor_id);
1243                }
1244            }
1245
1246            let actors_to_remove = fragment_actors_to_remove
1247                .get(&fragment_id)
1248                .cloned()
1249                .unwrap_or_default()
1250                .into_keys()
1251                .collect();
1252
1253            let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];
1254
1255            assert!(!actors_after_reschedule.is_empty());
1256
1257            let fragment = &ctx.fragment_map[&fragment_id];
1258
1259            let in_degree_types: HashSet<_> = ctx
1260                .fragment_upstreams
1261                .get(&(fragment_id as _))
1262                .map(|upstreams| upstreams.values())
1263                .into_iter()
1264                .flatten()
1265                .cloned()
1266                .collect();
1267
1268            let upstream_dispatcher_mapping = match fragment.distribution_type {
1269                FragmentDistributionType::Hash => {
1270                    if !in_degree_types.contains(&DispatcherType::Hash) {
1271                        None
1272                    } else {
1273                        // Changes of the bitmap must occur in the case of HashDistribution
1274                        Some(ActorMapping::from_bitmaps(
1275                            &fragment_actor_bitmap[&fragment_id],
1276                        ))
1277                    }
1278                }
1279
1280                FragmentDistributionType::Single => {
1281                    assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
1282                    None
1283                }
1284                FragmentDistributionType::Unspecified => unreachable!(),
1285            };
1286
1287            let mut upstream_fragment_dispatcher_set = BTreeSet::new();
1288
1289            {
1290                if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
1291                    for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
1292                        match upstream_dispatcher_type {
1293                            DispatcherType::NoShuffle => {}
1294                            _ => {
1295                                upstream_fragment_dispatcher_set.insert((
1296                                    *upstream_fragment_id as FragmentId,
1297                                    fragment.fragment_id as DispatcherId,
1298                                ));
1299                            }
1300                        }
1301                    }
1302                }
1303            }
1304
1305            let downstream_fragment_ids = if let Some(downstream_fragments) =
1306                ctx.fragment_dispatcher_map.get(&fragment_id)
1307            {
1308                // Skip fragments' no-shuffle downstream, as there's no need to update the merger
1309                // (receiver) of a no-shuffle downstream
1310                downstream_fragments
1311                    .iter()
1312                    .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
1313                    .map(|(fragment_id, _)| *fragment_id)
1314                    .collect_vec()
1315            } else {
1316                vec![]
1317            };
1318
1319            let vnode_bitmap_updates = match fragment.distribution_type {
1320                FragmentDistributionType::Hash => {
1321                    let mut vnode_bitmap_updates =
1322                        fragment_actor_bitmap.remove(&fragment_id).unwrap();
1323
1324                    // Keep only the bitmaps of actors whose vnode distribution actually changed;
1325                    // otherwise the barrier would become very large when a fragment has many actors.
1326                    for actor_id in actors_after_reschedule.keys() {
1327                        assert!(vnode_bitmap_updates.contains_key(actor_id));
1328
1329                        // This actor is retained: drop the update if its bitmap is unchanged.
1330                        if let Some(actor) = ctx.actor_map.get(actor_id) {
1331                            let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();
1332
1333                            if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref()
1334                                && prev_bitmap.eq(bitmap)
1335                            {
1336                                vnode_bitmap_updates.remove(actor_id);
1337                            }
1338                        }
1339                    }
1340
1341                    vnode_bitmap_updates
1342                }
1343                FragmentDistributionType::Single => HashMap::new(),
1344                FragmentDistributionType::Unspecified => unreachable!(),
1345            };
1346
1347            let upstream_fragment_dispatcher_ids =
1348                upstream_fragment_dispatcher_set.into_iter().collect_vec();
1349
1350            let actor_splits = fragment_actor_splits
1351                .get(&fragment_id)
1352                .cloned()
1353                .unwrap_or_default();
1354
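                // For CDC backfill (`StreamCdcScan`) fragments, reassign the table's snapshot
                // splits across the actors that remain after rescheduling.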
1355            let mut cdc_table_id = None;
1356            let cdc_table_snapshot_split_assignment = if fragment
1357                .fragment_type_mask
1358                .contains(FragmentTypeFlag::StreamCdcScan)
1359            {
1360                cdc_table_id = Some(fragment.job_id);
1361                assign_cdc_table_snapshot_splits_impl(
1362                    fragment.job_id,
1363                    fragment_actors_after_reschedule
1364                        .get(&fragment_id)
1365                        .unwrap()
1366                        .keys()
1367                        .copied()
1368                        .collect(),
1369                    self.env.meta_store_ref(),
1370                    None,
1371                )
1372                .await?
1373            } else {
1374                HashMap::default()
1375            };
1376
1377            reschedule_fragment.insert(
1378                fragment_id,
1379                Reschedule {
1380                    added_actors: actors_to_create,
1381                    removed_actors: actors_to_remove,
1382                    vnode_bitmap_updates,
1383                    upstream_fragment_dispatcher_ids,
1384                    upstream_dispatcher_mapping,
1385                    downstream_fragment_ids,
1386                    actor_splits,
1387                    newly_created_actors: Default::default(),
1388                    cdc_table_snapshot_split_assignment,
1389                    cdc_table_id,
1390                },
1391            );
1392        }
1393
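            // Fill in the full payloads of newly created actors, which were left as
            // `Default::default()` when the `Reschedule`s were built above.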
1394        let mut fragment_created_actors = HashMap::new();
1395        for (fragment_id, actors_to_create) in &fragment_actors_to_create {
1396            let mut created_actors = HashMap::new();
1397            for (actor_id, worker_id) in actors_to_create {
1398                let actor = new_created_actors.get(actor_id).cloned().unwrap();
1399                created_actors.insert(*actor_id, (actor, *worker_id));
1400            }
1401
1402            fragment_created_actors.insert(*fragment_id, created_actors);
1403        }
1404
1405        for (fragment_id, to_create) in fragment_created_actors {
1406            let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
1407            reschedule.newly_created_actors = to_create;
1408        }
1409        tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);
1410
1411        Ok(reschedule_fragment)
1412    }
1413
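        /// Translates per-worker actor-count diffs into concrete actor sets per fragment:
        /// actors to remove are taken from the tail of each worker's ascending actor list,
        /// and fresh ids are generated for actors to create. Returns the
        /// (actors-to-remove, actors-to-create) maps, both keyed by fragment id.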
1414    #[expect(clippy::type_complexity)]
1415    fn arrange_reschedules(
1416        &self,
1417        reschedule: &HashMap<FragmentId, WorkerReschedule>,
1418        ctx: &RescheduleContext,
1419    ) -> MetaResult<(
1420        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1421        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1422    )> {
1423        let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
1424        let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());
1425
1426        for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
1427            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
1428
1429            // Actor Id => Worker Id
1430            let mut actors_to_remove = BTreeMap::new();
1431            let mut actors_to_create = BTreeMap::new();
1432
1433            // NOTE(important): the value must be a `BTreeSet` so that each worker's actors are iterated in ascending order; removal below takes the highest actor ids first.
1434            let mut worker_to_actors: HashMap<WorkerId, BTreeSet<ActorId>> = HashMap::new();
1435
1436            for actor in &fragment.actors {
1437                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
1438                worker_to_actors
1439                    .entry(worker_id)
1440                    .or_default()
1441                    .insert(actor.actor_id as ActorId);
1442            }
1443
1444            let decreased_actor_count = worker_actor_diff
1445                .iter()
1446                .filter(|(_, change)| change.is_negative())
1447                .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));
1448
1449            for (worker_id, n) in decreased_actor_count {
1450                if let Some(actor_ids) = worker_to_actors.get(worker_id) {
1451                    if actor_ids.len() < n {
1452                        bail!(
1453                            "illegal plan: fragment {} has only {} actors on worker {}, but {} are to be removed",
1454                            fragment_id,
1455                            actor_ids.len(),
1456                            worker_id,
1457                            n
1458                        );
1459                    }
1460
1461                    let removed_actors: Vec<_> = actor_ids
1462                        .iter()
1463                        .skip(actor_ids.len().saturating_sub(n))
1464                        .cloned()
1465                        .collect();
1466
1467                    for actor in removed_actors {
1468                        actors_to_remove.insert(actor, *worker_id);
1469                    }
1470                }
1471            }
1472
1473            let increased_actor_count = worker_actor_diff
1474                .iter()
1475                .filter(|(_, change)| change.is_positive());
1476
1477            for (worker, n) in increased_actor_count {
1478                for _ in 0..*n {
1479                    let id = self
1480                        .env
1481                        .id_gen_manager()
1482                        .generate_interval::<{ IdCategory::Actor }>(1)
1483                        as ActorId;
1484                    actors_to_create.insert(id, *worker);
1485                }
1486            }
1487
1488            if !actors_to_remove.is_empty() {
1489                fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
1490            }
1491
1492            if !actors_to_create.is_empty() {
1493                fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
1494            }
1495        }
1496
1497        // Sanity check: when an upstream actor of a NO_SHUFFLE edge is removed, its single downstream actor must be scheduled for removal as well.
1498        for actors_to_remove in fragment_actors_to_remove.values() {
1499            for actor_id in actors_to_remove.keys() {
1500                let actor = ctx.actor_map.get(actor_id).unwrap();
1501                for dispatcher in &actor.dispatcher {
1502                    if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
1503                        let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");
1504
1505                        let _should_exist = fragment_actors_to_remove
1506                            .get(&(dispatcher.dispatcher_id as FragmentId))
1507                            .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
1508                            .get(downstream_actor_id)
1509                            .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
1510                    }
1511                }
1512            }
1513        }
1514
1515        Ok((fragment_actors_to_remove, fragment_actors_to_create))
1516    }
1517
1518    /// Modifies the upstream and downstream actors of a newly created actor according to the
1519    /// overall changes; used to handle cascading updates.
1520    fn modify_actor_upstream_and_downstream(
1521        ctx: &RescheduleContext,
1522        fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1523        fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1524        fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
1525        no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
1526        new_actor: &mut StreamActor,
1527        dispatchers: &mut Vec<PbDispatcher>,
1528    ) -> MetaResult<()> {
1529        // Update downstream actor ids
1530        for dispatcher in dispatchers {
1531            let downstream_fragment_id = dispatcher
1532                .downstream_actor_id
1533                .iter()
1534                .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
1535                .dedup()
1536                .exactly_one()
1537                .unwrap() as FragmentId;
1538
1539            let downstream_fragment_actors_to_remove =
1540                fragment_actors_to_remove.get(&downstream_fragment_id);
1541            let downstream_fragment_actors_to_create =
1542                fragment_actors_to_create.get(&downstream_fragment_id);
1543
1544            match dispatcher.r#type() {
1545                d @ (PbDispatcherType::Hash
1546                | PbDispatcherType::Simple
1547                | PbDispatcherType::Broadcast) => {
1548                    if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
1549                    {
1550                        dispatcher
1551                            .downstream_actor_id
1552                            .retain(|id| !downstream_actors_to_remove.contains_key(id));
1553                    }
1554
1555                    if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
1556                    {
1557                        dispatcher
1558                            .downstream_actor_id
1559                            .extend(downstream_actors_to_create.keys().cloned())
1560                    }
1561
1562                    // After the update, a Simple dispatcher must still have exactly one downstream actor.
1563                    if d == PbDispatcherType::Simple {
1564                        assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1565                    }
1566                }
1567                PbDispatcherType::NoShuffle => {
1568                    assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1569                    let downstream_actor_id = no_shuffle_downstream_actors_map
1570                        .get(&new_actor.actor_id)
1571                        .and_then(|map| map.get(&downstream_fragment_id))
1572                        .unwrap();
1573                    dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
1574                }
1575                PbDispatcherType::Unspecified => unreachable!(),
1576            }
1577
1578            if let Some(mapping) = dispatcher.hash_mapping.as_mut()
1579                && let Some(downstream_updated_bitmap) =
1580                    fragment_actor_bitmap.get(&downstream_fragment_id)
1581            {
1582                // The downstream fragment scaled in or out, so rebuild the hash mapping from its updated bitmaps.
1583                *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
1584            }
1585        }
1586
1587        Ok(())
1588    }
1589
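        /// Persists the fragment changes of a collected reschedule to the meta store, then
        /// recomputes serving vnode mappings for the affected fragments and notifies
        /// frontends of both the updated and the deleted mappings.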
1590    #[await_tree::instrument]
1591    pub async fn post_apply_reschedule(
1592        &self,
1593        reschedules: &HashMap<FragmentId, Reschedule>,
1594        post_updates: &JobReschedulePostUpdates,
1595    ) -> MetaResult<()> {
1596        // Update fragment info in the meta store after rescheduling.
1597        self.metadata_manager
1598            .post_apply_reschedules(reschedules.clone(), post_updates)
1599            .await?;
1600
1601        // Update serving fragment info and notify frontends after rescheduling.
1602        if !reschedules.is_empty() {
1603            let workers = self
1604                .metadata_manager
1605                .list_active_serving_compute_nodes()
1606                .await?;
1607            let streaming_parallelisms = self
1608                .metadata_manager
1609                .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))
1610                .await?;
1611            let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
1612            let max_serving_parallelism = self
1613                .env
1614                .session_params_manager_impl_ref()
1615                .get_params()
1616                .await
1617                .batch_parallelism()
1618                .map(|p| p.get());
1619            let (upserted, failed) = serving_worker_slot_mapping.upsert(
1620                streaming_parallelisms,
1621                &workers,
1622                max_serving_parallelism,
1623            );
1624            if !upserted.is_empty() {
1625                tracing::debug!(
1626                    "Update serving vnode mapping for fragments {:?}.",
1627                    upserted.keys()
1628                );
1629                self.env
1630                    .notification_manager()
1631                    .notify_frontend_without_version(
1632                        Operation::Update,
1633                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1634                            mappings: to_fragment_worker_slot_mapping(&upserted),
1635                        }),
1636                    );
1637            }
1638            if !failed.is_empty() {
1639                tracing::debug!(
1640                    "Failed to update serving vnode mapping for fragments {:?}.",
1641                    failed
1642                );
1643                self.env
1644                    .notification_manager()
1645                    .notify_frontend_without_version(
1646                        Operation::Delete,
1647                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1648                            mappings: to_deleted_fragment_worker_slot_mapping(&failed),
1649                        }),
1650                    );
1651            }
1652        }
1653
1654        Ok(())
1655    }
1656
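        /// Generates a `JobReschedulePlan` for the given policy by diffing each fragment's
        /// current per-worker actor distribution against a freshly computed target.
        /// When `generate_plan_for_cdc_table_backfill` is set, only CDC backfill
        /// (`StreamCdcScan`) fragments are planned and the parallelism must be `Fixed`.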
1657    pub async fn generate_job_reschedule_plan(
1658        &self,
1659        policy: JobReschedulePolicy,
1660        generate_plan_for_cdc_table_backfill: bool,
1661    ) -> MetaResult<JobReschedulePlan> {
1662        type VnodeCount = usize;
1663
1664        let JobReschedulePolicy { targets } = policy;
1665
1666        let workers = self
1667            .metadata_manager
1668            .list_active_streaming_compute_nodes()
1669            .await?;
1670
1671        // The `schedulable` filter should eventually be replaced by resource groups, similar to `unschedulable`.
1672        let workers: HashMap<_, _> = workers
1673            .into_iter()
1674            .filter(|worker| worker.is_streaming_schedulable())
1675            .map(|worker| (worker.id, worker))
1676            .collect();
1677
1678        #[derive(Debug)]
1679        struct JobUpdate {
1680            filtered_worker_ids: BTreeSet<WorkerId>,
1681            parallelism: TableParallelism,
1682        }
1683
1684        let mut job_parallelism_updates = HashMap::new();
1685
1686        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
1687            parallelism_updates: Default::default(),
1688            resource_group_updates: Default::default(),
1689        };
1690
1691        for (
1692            job_id,
1693            JobRescheduleTarget {
1694                parallelism: parallelism_update,
1695                resource_group: resource_group_update,
1696            },
1697        ) in &targets
1698        {
1699            let parallelism = match parallelism_update {
1700                JobParallelismTarget::Update(parallelism) => *parallelism,
1701                JobParallelismTarget::Refresh => {
1702                    let parallelism = self
1703                        .metadata_manager
1704                        .catalog_controller
1705                        .get_job_streaming_parallelisms(*job_id as _)
1706                        .await?;
1707
1708                    parallelism.into()
1709                }
1710            };
1711
1712            job_reschedule_post_updates
1713                .parallelism_updates
1714                .insert(TableId::from(*job_id), parallelism);
1715
1716            let current_resource_group = match resource_group_update {
1717                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
1718                    job_reschedule_post_updates.resource_group_updates.insert(
1719                        *job_id as ObjectId,
1720                        Some(specific_resource_group.to_owned()),
1721                    );
1722
1723                    specific_resource_group.to_owned()
1724                }
1725                JobResourceGroupTarget::Update(None) => {
1726                    let database_resource_group = self
1727                        .metadata_manager
1728                        .catalog_controller
1729                        .get_existing_job_database_resource_group(*job_id as _)
1730                        .await?;
1731
1732                    job_reschedule_post_updates
1733                        .resource_group_updates
1734                        .insert(*job_id as ObjectId, None);
1735                    database_resource_group
1736                }
1737                JobResourceGroupTarget::Keep => {
1738                    self.metadata_manager
1739                        .catalog_controller
1740                        .get_existing_job_resource_group(*job_id as _)
1741                        .await?
1742                }
1743            };
1744
1745            let filtered_worker_ids =
1746                filter_workers_by_resource_group(&workers, current_resource_group.as_str());
1747
1748            if filtered_worker_ids.is_empty() {
1749                bail!("Cannot resize streaming_job {job_id} to empty worker set")
1750            }
1751
1752            job_parallelism_updates.insert(
1753                *job_id,
1754                JobUpdate {
1755                    filtered_worker_ids,
1756                    parallelism,
1757                },
1758            );
1759        }
1760
1761        // index for no shuffle relation
1762        let mut no_shuffle_source_fragment_ids = HashSet::new();
1763        let mut no_shuffle_target_fragment_ids = HashSet::new();
1764
1765        // index for fragment_id -> (distribution_type, vnode_count)
1766        let mut fragment_distribution_map = HashMap::new();
1767        // index for actor -> worker id
1768        let mut actor_location = HashMap::new();
1769        // index for table_id -> [fragment_id]
1770        let mut table_fragment_id_map = HashMap::new();
1771        // index for fragment_id -> [actor_id]
1772        let mut fragment_actor_id_map = HashMap::new();
1773
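            // Populates all the indexes above in a single pass over the reschedule
            // working set resolved from the catalog.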
1774        async fn build_index(
1775            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
1776            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
1777            fragment_distribution_map: &mut HashMap<
1778                FragmentId,
1779                (FragmentDistributionType, VnodeCount, bool),
1780            >,
1781            actor_location: &mut HashMap<ActorId, WorkerId>,
1782            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
1783            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
1784            mgr: &MetadataManager,
1785            table_ids: Vec<ObjectId>,
1786            generate_plan_only_for_cdc_table_backfill: bool,
1787        ) -> Result<(), MetaError> {
1788            let RescheduleWorkingSet {
1789                fragments,
1790                actors,
1791                actor_dispatchers: _actor_dispatchers,
1792                fragment_downstreams,
1793                fragment_upstreams: _fragment_upstreams,
1794                related_jobs: _related_jobs,
1795                job_resource_groups: _job_resource_groups,
1796            } = mgr
1797                .catalog_controller
1798                .resolve_working_set_for_reschedule_tables(table_ids)
1799                .await?;
1800
1801            for (fragment_id, downstreams) in fragment_downstreams {
1802                for (downstream_fragment_id, dispatcher_type) in downstreams {
1803                    if let risingwave_meta_model::DispatcherType::NoShuffle = dispatcher_type {
1804                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
1805                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
1806                    }
1807                }
1808            }
1809
1810            for (fragment_id, fragment) in fragments {
1811                let is_cdc_backfill_v2_fragment =
1812                    FragmentTypeMask::from(fragment.fragment_type_mask)
1813                        .contains(FragmentTypeFlag::StreamCdcScan);
1814                if generate_plan_only_for_cdc_table_backfill && !is_cdc_backfill_v2_fragment {
1815                    continue;
1816                }
1817                fragment_distribution_map.insert(
1818                    fragment_id as FragmentId,
1819                    (
1820                        FragmentDistributionType::from(fragment.distribution_type),
1821                        fragment.vnode_count as _,
1822                        is_cdc_backfill_v2_fragment,
1823                    ),
1824                );
1825
1826                table_fragment_id_map
1827                    .entry(fragment.job_id as u32)
1828                    .or_default()
1829                    .insert(fragment_id as FragmentId);
1830            }
1831
1832            for (actor_id, actor) in actors {
1833                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
1834                fragment_actor_id_map
1835                    .entry(actor.fragment_id as FragmentId)
1836                    .or_default()
1837                    .insert(actor_id as ActorId);
1838            }
1839
1840            Ok(())
1841        }
1842
1843        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();
1844
1845        build_index(
1846            &mut no_shuffle_source_fragment_ids,
1847            &mut no_shuffle_target_fragment_ids,
1848            &mut fragment_distribution_map,
1849            &mut actor_location,
1850            &mut table_fragment_id_map,
1851            &mut fragment_actor_id_map,
1852            &self.metadata_manager,
1853            table_ids,
1854            generate_plan_for_cdc_table_backfill,
1855        )
1856        .await?;
1857        tracing::debug!(
1858            ?job_reschedule_post_updates,
1859            ?job_parallelism_updates,
1860            ?no_shuffle_source_fragment_ids,
1861            ?no_shuffle_target_fragment_ids,
1862            ?fragment_distribution_map,
1863            ?actor_location,
1864            ?table_fragment_id_map,
1865            ?fragment_actor_id_map,
1866            "generate_job_reschedule_plan: after build_index"
1867        );
1868
1869        let adaptive_parallelism_strategy = self
1870            .env
1871            .system_params_reader()
1872            .await
1873            .adaptive_parallelism_strategy();
1874
1875        let mut target_plan = HashMap::new();
1876
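            // For each job, compute a fresh target distribution for every fragment and
            // diff it against the current one.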
1877        for (
1878            table_id,
1879            JobUpdate {
1880                filtered_worker_ids,
1881                parallelism,
1882            },
1883        ) in job_parallelism_updates
1884        {
1885            let assigner = AssignerBuilder::new(table_id).build();
1886
1887            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();
1888
1889            let available_worker_slots = workers
1890                .iter()
1891                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
1892                .map(|(_, worker)| {
1893                    (
1894                        worker.id as WorkerId,
1895                        NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
1896                    )
1897                })
1898                .collect::<BTreeMap<_, _>>();
1899
1900            for fragment_id in fragment_map {
1901                // Currently, NO_SHUFFLE relations are only propagated from upstream to downstream; no-shuffle targets inherit their upstream's plan and are skipped here.
1902                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
1903                    continue;
1904                }
1905
1906                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();
1907
1908                for actor_id in &fragment_actor_id_map[&fragment_id] {
1909                    let worker_id = actor_location[actor_id];
1910                    *fragment_slots.entry(worker_id).or_default() += 1;
1911                }
1912
1913                let available_slot_count: usize = available_worker_slots
1914                    .values()
1915                    .cloned()
1916                    .map(NonZeroUsize::get)
1917                    .sum();
1918
1919                if available_slot_count == 0 {
1920                    bail!(
1921                        "No schedulable slots available for fragment {}",
1922                        fragment_id
1923                    );
1924                }
1925
1926                let (dist, vnode_count, is_cdc_backfill_v2_fragment) =
1927                    fragment_distribution_map[&fragment_id];
1928                let max_parallelism = vnode_count;
1929                let fragment_parallelism_strategy = if generate_plan_for_cdc_table_backfill {
1930                    assert!(is_cdc_backfill_v2_fragment);
1931                    let TableParallelism::Fixed(new_parallelism) = parallelism else {
1932                        return Err(anyhow::anyhow!(
1933                            "invalid new parallelism {:?}, expected fixed parallelism",
1934                            parallelism
1935                        )
1936                        .into());
1937                    };
1938                    if new_parallelism > max_parallelism || new_parallelism == 0 {
1939                        return Err(anyhow::anyhow!(
1940                            "invalid new parallelism {}, must be between 1 and the max parallelism {}",
1941                            new_parallelism,
1942                            max_parallelism
1943                        )
1944                        .into());
1945                    }
1946                    TableParallelism::Fixed(new_parallelism)
1947                } else if is_cdc_backfill_v2_fragment {
1948                    TableParallelism::Fixed(fragment_actor_id_map[&fragment_id].len())
1949                } else {
1950                    parallelism
1951                };
1952                match dist {
1953                    FragmentDistributionType::Unspecified => unreachable!(),
1954                    FragmentDistributionType::Single => {
1955                        let (single_worker_id, should_be_one) = fragment_slots
1956                            .iter()
1957                            .exactly_one()
1958                            .expect("single fragment should have only one worker slot");
1959
1960                        assert_eq!(*should_be_one, 1);
1961
1962                        let assignment =
1963                            assigner.count_actors_per_worker(&available_worker_slots, 1);
1964
1965                        let (chosen_target_worker_id, should_be_one) =
1966                            assignment.iter().exactly_one().ok().with_context(|| {
1967                                format!(
1968                                    "Cannot find a single target worker for fragment {fragment_id}"
1969                                )
1970                            })?;
1971
1972                        assert_eq!(*should_be_one, 1);
1973
1974                        if *chosen_target_worker_id == *single_worker_id {
1975                            tracing::debug!(
1976                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
1977                            );
1978                            continue;
1979                        }
1980
1981                        target_plan.insert(
1982                            fragment_id,
1983                            WorkerReschedule {
1984                                worker_actor_diff: BTreeMap::from_iter(vec![
1985                                    (*chosen_target_worker_id, 1),
1986                                    (*single_worker_id, -1),
1987                                ]),
1988                            },
1989                        );
1990                    }
1991                    FragmentDistributionType::Hash => match fragment_parallelism_strategy {
1992                        TableParallelism::Adaptive => {
1993                            let target_slot_count = adaptive_parallelism_strategy
1994                                .compute_target_parallelism(available_slot_count);
1995
1996                            if target_slot_count > max_parallelism {
1997                                tracing::warn!(
1998                                    "available parallelism for table {table_id} is larger than max parallelism, forcing limit to {max_parallelism}"
1999                                );
2000
2001                                let target_worker_slots = assigner.count_actors_per_worker(
2002                                    &available_worker_slots,
2003                                    max_parallelism,
2004                                );
2005
2006                                target_plan.insert(
2007                                    fragment_id,
2008                                    Self::diff_worker_slot_changes(
2009                                        &fragment_slots,
2010                                        &target_worker_slots,
2011                                    ),
2012                                );
2013                            } else if available_slot_count != target_slot_count {
2014                                tracing::info!(
2015                                    "available parallelism for table {table_id} is limited by the adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
2016                                );
2017
2018                                let target_worker_slots = assigner.count_actors_per_worker(
2019                                    &available_worker_slots,
2020                                    target_slot_count,
2021                                );
2022
2023                                target_plan.insert(
2024                                    fragment_id,
2025                                    Self::diff_worker_slot_changes(
2026                                        &fragment_slots,
2027                                        &target_worker_slots,
2028                                    ),
2029                                );
2030                            } else {
2031                                let available_worker_slots = available_worker_slots
2032                                    .iter()
2033                                    .map(|(worker_id, v)| (*worker_id, v.get()))
2034                                    .collect();
2035
2036                                target_plan.insert(
2037                                    fragment_id,
2038                                    Self::diff_worker_slot_changes(
2039                                        &fragment_slots,
2040                                        &available_worker_slots,
2041                                    ),
2042                                );
2043                            }
2044                        }
2045                        TableParallelism::Fixed(mut n) => {
2046                            if n > max_parallelism {
2047                                tracing::warn!(
2048                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, forcing limit to {max_parallelism}"
2049                                );
2050                                n = max_parallelism
2051                            }
2052
2053                            let target_worker_slots =
2054                                assigner.count_actors_per_worker(&available_worker_slots, n);
2055
2056                            target_plan.insert(
2057                                fragment_id,
2058                                Self::diff_worker_slot_changes(
2059                                    &fragment_slots,
2060                                    &target_worker_slots,
2061                                ),
2062                            );
2063                        }
2064                        TableParallelism::Custom => {
2065                            // Custom parallelism is managed manually; skip plan generation for it.
2066                        }
2067                    },
2068                }
2069            }
2070        }
2071
2072        target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
2073        tracing::debug!(
2074            ?target_plan,
2075            "generate_job_reschedule_plan: finished, target_plan generated"
2076        );
2077        if generate_plan_for_cdc_table_backfill {
2078            job_reschedule_post_updates.resource_group_updates = HashMap::default();
2079            job_reschedule_post_updates.parallelism_updates = HashMap::default();
2080        }
2081        Ok(JobReschedulePlan {
2082            reschedules: target_plan,
2083            post_updates: job_reschedule_post_updates,
2084        })
2085    }
2086
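        /// Computes the per-worker actor-count delta from the current distribution to the
        /// target one, omitting workers with zero net change. For example, going from
        /// `{w1: 2, w2: 1}` to `{w1: 1, w3: 2}` yields `{w1: -1, w2: -1, w3: +2}`.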
2087    fn diff_worker_slot_changes(
2088        fragment_worker_slots: &BTreeMap<WorkerId, usize>,
2089        target_worker_slots: &BTreeMap<WorkerId, usize>,
2090    ) -> WorkerReschedule {
2091        let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2092        let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2093
2094        for (&worker_id, &target_slots) in target_worker_slots {
2095            let &current_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);
2096
2097            if target_slots > current_slots {
2098                increased_actor_count.insert(worker_id, target_slots - current_slots);
2099            }
2100        }
2101
2102        for (&worker_id, &current_slots) in fragment_worker_slots {
2103            let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);
2104
2105            if current_slots > target_slots {
2106                decreased_actor_count.insert(worker_id, current_slots - target_slots);
2107            }
2108        }
2109
2110        let worker_ids: HashSet<_> = increased_actor_count
2111            .keys()
2112            .chain(decreased_actor_count.keys())
2113            .cloned()
2114            .collect();
2115
2116        let mut worker_actor_diff = BTreeMap::new();
2117
2118        for worker_id in worker_ids {
2119            let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2120            let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2121            let change = increased - decreased;
2122
2123            assert_ne!(change, 0);
2124
2125            worker_actor_diff.insert(worker_id, change);
2126        }
2127
2128        WorkerReschedule { worker_actor_diff }
2129    }
2130
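        /// Records the source and target fragment of every NO_SHUFFLE edge. Each fragment
        /// is scanned once, since actors of the same fragment are assumed to share the
        /// same dispatcher set.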
2131    fn build_no_shuffle_relation_index(
2132        actor_map: &HashMap<ActorId, CustomActorInfo>,
2133        no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
2134        no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
2135    ) {
2136        let mut fragment_cache = HashSet::new();
2137        for actor in actor_map.values() {
2138            if fragment_cache.contains(&actor.fragment_id) {
2139                continue;
2140            }
2141
2142            for dispatcher in &actor.dispatcher {
2143                for downstream_actor_id in &dispatcher.downstream_actor_id {
2144                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2145                        // Record the source and target fragments of NO_SHUFFLE dispatchers.
2146                        if dispatcher.r#type() == PbDispatcherType::NoShuffle {
2147                            no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId);
2148                            no_shuffle_target_fragment_ids
2149                                .insert(downstream_actor.fragment_id as FragmentId);
2150                        }
2151                    }
2152                }
2153            }
2154
2155            fragment_cache.insert(actor.fragment_id);
2156        }
2157    }
2158
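        /// Builds a fragment-level dispatcher index from the per-actor dispatchers:
        /// fragment id -> (downstream fragment id -> dispatcher type).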
2159    fn build_fragment_dispatcher_index(
2160        actor_map: &HashMap<ActorId, CustomActorInfo>,
2161        fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
2162    ) {
2163        for actor in actor_map.values() {
2164            for dispatcher in &actor.dispatcher {
2165                for downstream_actor_id in &dispatcher.downstream_actor_id {
2166                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2167                        fragment_dispatcher_map
2168                            .entry(actor.fragment_id as FragmentId)
2169                            .or_default()
2170                            .insert(
2171                                downstream_actor.fragment_id as FragmentId,
2172                                dispatcher.r#type().into(),
2173                            );
2174                    }
2175                }
2176            }
2177        }
2178    }
2179
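        /// Expands `fragment_ids` upward along NO_SHUFFLE edges, propagating `Custom`
        /// parallelism to upstream tables where necessary, and finally drops no-shuffle
        /// target tables from `table_parallelisms`, since their parallelism always
        /// follows their upstream.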
2180    pub fn resolve_no_shuffle_upstream_tables(
2181        fragment_ids: HashSet<FragmentId>,
2182        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2183        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2184        fragment_to_table: &HashMap<FragmentId, TableId>,
2185        fragment_upstreams: &HashMap<
2186            risingwave_meta_model::FragmentId,
2187            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2188        >,
2189        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
2190    ) -> MetaResult<()> {
2191        let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();
2192
2193        let mut fragment_ids = fragment_ids;
2194
2195        // For every no-shuffle relation, trace upstream from each downstream fragment
2196        // until reaching the top of the hierarchy.
2197        while let Some(fragment_id) = queue.pop_front() {
2198            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2199                continue;
2200            }
2201
2202            // Visit each upstream fragment of this no-shuffle target.
2203            for upstream_fragment_id in fragment_upstreams
2204                .get(&(fragment_id as _))
2205                .map(|upstreams| upstreams.keys())
2206                .into_iter()
2207                .flatten()
2208            {
2209                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2210                let upstream_fragment_id = &upstream_fragment_id;
2211                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2212                    continue;
2213                }
2214
2215                let table_id = &fragment_to_table[&fragment_id];
2216                let upstream_table_id = &fragment_to_table[upstream_fragment_id];
2217
2218                // Only custom parallelism will be propagated to the no shuffle upstream.
2219                if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
2220                    if let Some(upstream_table_parallelism) =
2221                        table_parallelisms.get(upstream_table_id)
2222                    {
2223                        if upstream_table_parallelism != &TableParallelism::Custom {
2224                            bail!(
2225                                "Cannot change upstream table {} from {:?} to {:?}",
2226                                upstream_table_id,
2227                                upstream_table_parallelism,
2228                                TableParallelism::Custom
2229                            )
2230                        }
2231                    } else {
2232                        table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
2233                    }
2234                }
2235
2236                fragment_ids.insert(*upstream_fragment_id);
2237                queue.push_back(*upstream_fragment_id);
2238            }
2239        }
2240
2241        let downstream_fragment_ids = fragment_ids
2242            .iter()
2243            .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));
2244
2245        let downstream_table_ids = downstream_fragment_ids
2246            .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
2247            .collect::<HashSet<_>>();
2248
2249        table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));
2250
2251        Ok(())
2252    }
2253
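        /// Copies each reschedule plan from a NO_SHUFFLE downstream fragment to its
        /// upstream fragments, bailing out on conflicting plans, and then removes the
        /// downstream entries: a no-shuffle pair is always rescheduled via its upstream.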
2254    pub fn resolve_no_shuffle_upstream_fragments<T>(
2255        reschedule: &mut HashMap<FragmentId, T>,
2256        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2257        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2258        fragment_upstreams: &HashMap<
2259            risingwave_meta_model::FragmentId,
2260            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2261        >,
2262    ) -> MetaResult<()>
2263    where
2264        T: Clone + Eq,
2265    {
2266        let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();
2267
2268        // For every no-shuffle relation, trace upstream from each downstream fragment
2269        // until reaching the top of the hierarchy.
2270        while let Some(fragment_id) = queue.pop_front() {
2271            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2272                continue;
2273            }
2274
2275            // Visit each upstream fragment of this no-shuffle target.
2276            for upstream_fragment_id in fragment_upstreams
2277                .get(&(fragment_id as _))
2278                .map(|upstreams| upstreams.keys())
2279                .into_iter()
2280                .flatten()
2281            {
2282                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2283                let upstream_fragment_id = &upstream_fragment_id;
2284                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2285                    continue;
2286                }
2287
2288                let reschedule_plan = &reschedule[&fragment_id];
2289
2290                if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
2291                    if upstream_reschedule_plan != reschedule_plan {
2292                        bail!(
2293                            "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
2294                            fragment_id,
2295                            upstream_fragment_id
2296                        );
2297                    }
2298
2299                    continue;
2300                }
2301
2302                reschedule.insert(*upstream_fragment_id, reschedule_plan.clone());
2303
2304                queue.push_back(*upstream_fragment_id);
2305            }
2306        }
2307
2308        reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));
2309
2310        Ok(())
2311    }
2312
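        /// Returns all jobs connected to the given jobs through NO_SHUFFLE relations,
        /// as resolved from the catalog's reschedule working set.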
2313    pub async fn resolve_related_no_shuffle_jobs(
2314        &self,
2315        jobs: &[TableId],
2316    ) -> MetaResult<HashSet<TableId>> {
2317        let RescheduleWorkingSet { related_jobs, .. } = self
2318            .metadata_manager
2319            .catalog_controller
2320            .resolve_working_set_for_reschedule_tables(
2321                jobs.iter().map(|id| id.table_id as _).collect(),
2322            )
2323            .await?;
2324
2325        Ok(related_jobs
2326            .keys()
2327            .map(|id| TableId::new(*id as _))
2328            .collect())
2329    }
2330}
2331
2332#[derive(Debug, Clone)]
2333pub enum JobParallelismTarget {
2334    Update(TableParallelism),
2335    Refresh,
2336}
2337
2338#[derive(Debug, Clone)]
2339pub enum JobResourceGroupTarget {
2340    Update(Option<String>),
2341    Keep,
2342}
2343
2344#[derive(Debug, Clone)]
2345pub struct JobRescheduleTarget {
2346    pub parallelism: JobParallelismTarget,
2347    pub resource_group: JobResourceGroupTarget,
2348}
2349
2350#[derive(Debug)]
2351pub struct JobReschedulePolicy {
2352    pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
2353}
2354
2355// final updates for `post_collect`
2356#[derive(Debug, Clone)]
2357pub struct JobReschedulePostUpdates {
2358    pub parallelism_updates: HashMap<TableId, TableParallelism>,
2359    pub resource_group_updates: HashMap<ObjectId, Option<String>>,
2360}
2361
2362#[derive(Debug)]
2363pub struct JobReschedulePlan {
2364    pub reschedules: HashMap<FragmentId, WorkerReschedule>,
2365    pub post_updates: JobReschedulePostUpdates,
2366}
2367
2368impl GlobalStreamManager {
2369    #[await_tree::instrument("acquire_reschedule_read_guard")]
2370    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
2371        self.scale_controller.reschedule_lock.read().await
2372    }
2373
2374    #[await_tree::instrument("acquire_reschedule_write_guard")]
2375    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
2376        self.scale_controller.reschedule_lock.write().await
2377    }
2378
2379    /// The entrypoint of rescheduling actors.
2380    ///
2381    /// Used by:
2382    /// - The directly exposed low-level API `risingwave_meta_service::scale_service::ScaleService` (`risectl meta reschedule`)
2383    /// - High-level parallelism control API
2384    ///     * manual `ALTER [TABLE | INDEX | MATERIALIZED VIEW | SINK] SET PARALLELISM`
2385    ///     * automatic parallelism control for [`TableParallelism::Adaptive`] when worker nodes changed
2386    pub async fn reschedule_actors(
2387        &self,
2388        database_id: DatabaseId,
2389        plan: JobReschedulePlan,
2390        options: RescheduleOptions,
2391    ) -> MetaResult<()> {
2392        let JobReschedulePlan {
2393            reschedules,
2394            mut post_updates,
2395        } = plan;
2396
2397        let reschedule_fragment = self
2398            .scale_controller
2399            .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
2400            .await?;
2401
2402        tracing::debug!("reschedule plan: {:?}", reschedule_fragment);
2403
2404        let up_down_stream_fragment: HashSet<_> = reschedule_fragment
2405            .iter()
2406            .flat_map(|(_, reschedule)| {
2407                reschedule
2408                    .upstream_fragment_dispatcher_ids
2409                    .iter()
2410                    .map(|(fragment_id, _)| *fragment_id)
2411                    .chain(reschedule.downstream_fragment_ids.iter().cloned())
2412            })
2413            .collect();
2414
2415        let fragment_actors =
2416            try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async {
2417                let actor_ids = self
2418                    .metadata_manager
2419                    .get_running_actors_of_fragment(*fragment_id)
2420                    .await?;
2421                Result::<_, MetaError>::Ok((*fragment_id, actor_ids))
2422            }))
2423            .await?
2424            .into_iter()
2425            .collect();
2426
2427        let command = Command::RescheduleFragment {
2428            reschedules: reschedule_fragment,
2429            fragment_actors,
2430            post_updates,
2431        };
2432
2433        let _guard = self.source_manager.pause_tick().await;
2434
2435        self.barrier_scheduler
2436            .run_command(database_id, command)
2437            .await?;
2438
2439        tracing::info!("reschedule done");
2440
2441        Ok(())
2442    }
2443
2444    /// When new worker nodes join, or the parallelism of existing worker nodes changes,
2445    /// examines whether any jobs can be scaled, and scales them if found.
2446    ///
2447    /// This method iterates over all `CREATED` jobs and can be called repeatedly.
2448    ///
2449    /// Returns
2450    /// - `Ok(false)` if no jobs can be scaled;
2451    /// - `Ok(true)` if some jobs were scaled, and there may be more jobs that can be scaled.
2452    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
2453        tracing::info!("trigger parallelism control");
2454
2455        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
2456
2457        let background_streaming_jobs = self
2458            .metadata_manager
2459            .list_background_creating_jobs()
2460            .await?;
2461
2462        let skipped_jobs = if !background_streaming_jobs.is_empty() {
2463            let jobs = self
2464                .scale_controller
2465                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
2466                .await?;
2467
2468            tracing::info!(
2469                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
2470                background_streaming_jobs,
2471                jobs
2472            );
2473
2474            jobs
2475        } else {
2476            HashSet::new()
2477        };
2478
2479        let job_ids: HashSet<_> = {
2480            let streaming_parallelisms = self
2481                .metadata_manager
2482                .catalog_controller
2483                .get_all_streaming_parallelisms()
2484                .await?;
2485
2486            streaming_parallelisms
2487                .into_iter()
2488                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
2489                .map(|(table_id, _)| table_id)
2490                .collect()
2491        };
2492
2493        let workers = self
2494            .metadata_manager
2495            .cluster_controller
2496            .list_active_streaming_workers()
2497            .await?;
2498
2499        let schedulable_worker_ids: BTreeSet<_> = workers
2500            .iter()
2501            .filter(|worker| {
2502                !worker
2503                    .property
2504                    .as_ref()
2505                    .map(|p| p.is_unschedulable)
2506                    .unwrap_or(false)
2507            })
2508            .map(|worker| worker.id as WorkerId)
2509            .collect();
2510
2511        if job_ids.is_empty() {
2512            tracing::info!("no streaming jobs to scale, the cluster may be empty");
2513            return Ok(false);
2514        }
2515
2516        let batch_size = match self.env.opts.parallelism_control_batch_size {
2517            0 => job_ids.len(),
2518            n => n,
2519        };
2520
2521        tracing::info!(
2522            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
2523            job_ids.len(),
2524            batch_size,
2525            schedulable_worker_ids
2526        );
2527
2528        let batches: Vec<_> = job_ids
2529            .into_iter()
2530            .chunks(batch_size)
2531            .into_iter()
2532            .map(|chunk| chunk.collect_vec())
2533            .collect();
2534
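            // Probe the batches in order and stop at the first one that yields a
            // non-empty plan; the remaining batches are handled when the caller
            // re-triggers after `Ok(true)`.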
2535        let mut reschedules = None;
2536
2537        for batch in batches {
2538            let targets: HashMap<_, _> = batch
2539                .into_iter()
2540                .map(|job_id| {
2541                    (
2542                        job_id as u32,
2543                        JobRescheduleTarget {
2544                            parallelism: JobParallelismTarget::Refresh,
2545                            resource_group: JobResourceGroupTarget::Keep,
2546                        },
2547                    )
2548                })
2549                .collect();
2550
2551            let plan = self
2552                .scale_controller
2553                .generate_job_reschedule_plan(JobReschedulePolicy { targets }, false)
2554                .await?;
2555
2556            if !plan.reschedules.is_empty() {
2557                tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
2558                reschedules = Some(plan);
2559                break;
2560            }
2561        }
2562
2563        let Some(plan) = reschedules else {
2564            tracing::info!("no reschedule plan generated");
2565            return Ok(false);
2566        };
2567
2568        // todo
2569        for (database_id, reschedules) in self
2570            .metadata_manager
2571            .split_fragment_map_by_database(plan.reschedules)
2572            .await?
2573        {
2574            self.reschedule_actors(
2575                database_id,
2576                JobReschedulePlan {
2577                    reschedules,
2578                    post_updates: plan.post_updates.clone(),
2579                },
2580                RescheduleOptions {
2581                    resolve_no_shuffle_upstream: false,
2582                    skip_create_new_actors: false,
2583                },
2584            )
2585            .await?;
2586        }
2587
2588        Ok(true)
2589    }
2590
2591    /// Handles notification of worker node activation and deletion, and triggers parallelism control.
2592    async fn run(&self, mut shutdown_rx: Receiver<()>) {
2593        tracing::info!("starting automatic parallelism control monitor");
2594
2595        let check_period =
2596            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);
2597
2598        let mut ticker = tokio::time::interval_at(
2599            Instant::now()
2600                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
2601            check_period,
2602        );
2603        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2604
2605        // waiting for the first tick
2606        ticker.tick().await;
2607
2608        let (local_notification_tx, mut local_notification_rx) =
2609            tokio::sync::mpsc::unbounded_channel();
2610
2611        self.env
2612            .notification_manager()
2613            .insert_local_sender(local_notification_tx);
2614
2615        let worker_nodes = self
2616            .metadata_manager
2617            .list_active_streaming_compute_nodes()
2618            .await
2619            .expect("list active streaming compute nodes");
2620
2621        let mut worker_cache: BTreeMap<_, _> = worker_nodes
2622            .into_iter()
2623            .map(|worker| (worker.id, worker))
2624            .collect();
2625
2626        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();
2627
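            // Arms the ticker branch below: set when workers or the adaptive parallelism
            // strategy change, and cleared once a control round reports no more work.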
2628        let mut should_trigger = false;
2629
2630        loop {
2631            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Stream manager is stopped");
                    break;
                }

                _ = ticker.tick(), if should_trigger => {
                    let include_workers = worker_cache.keys().copied().collect_vec();

                    if include_workers.is_empty() {
                        tracing::debug!("no available worker nodes");
                        should_trigger = false;
                        continue;
                    }

                    match self.trigger_parallelism_control().await {
                        Ok(cont) => {
                            should_trigger = cont;
                        }
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "failed to trigger parallelism control, will retry on the next tick in {}s", ticker.period().as_secs());
                            ticker.reset();
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.expect("local notification channel closed in loop of stream manager");

                    // Only maintain the cache for streaming compute nodes.
                    let worker_is_streaming_compute = |worker: &WorkerNode| {
                        worker.get_type() == Ok(WorkerType::ComputeNode)
                            && worker
                                .property
                                .as_ref()
                                .is_some_and(|property| property.is_streaming)
                    };

                    match notification {
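                        // A strategy change may alter the target parallelism of
                        // adaptive jobs, so schedule a control pass.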
                        LocalNotification::SystemParamsChange(reader) => {
                            let new_strategy = reader.adaptive_parallelism_strategy();
                            if new_strategy != previous_adaptive_parallelism_strategy {
                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
                                should_trigger = true;
                                previous_adaptive_parallelism_strategy = new_strategy;
                            }
                        }
                        LocalNotification::WorkerNodeActivated(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            tracing::info!(worker = worker.id, "worker activated notification received");

                            let prev_worker = worker_cache.insert(worker.id, worker.clone());

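                            // Schedule a control pass only when the activation changes
                            // scheduling inputs: worker parallelism, resource group, or
                            // a previously unseen worker.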
                            match prev_worker {
                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                                    tracing::info!(worker = worker.id, "worker parallelism changed");
                                    should_trigger = true;
                                }
                                Some(prev_worker) if prev_worker.resource_group() != worker.resource_group() => {
                                    tracing::info!(worker = worker.id, "worker label changed");
                                    should_trigger = true;
                                }
                                None => {
                                    tracing::info!(worker = worker.id, "new worker joined");
                                    should_trigger = true;
                                }
                                _ => {}
                            }
                        }

                        // Since our logic for handling passive scale-in is within the barrier manager,
                        // there's not much we can do here. All we can do is proactively remove the entries from our cache.
                        LocalNotification::WorkerNodeDeleted(worker) => {
                            if !worker_is_streaming_compute(&worker) {
                                continue;
                            }

                            match worker_cache.remove(&worker.id) {
                                Some(prev_worker) => {
                                    tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
                                }
                                None => {
                                    tracing::warn!(worker = worker.id, "deleted worker not found in stream manager cache");
                                }
                            }
                        }

                        _ => {}
                    }
                }
            }
        }
    }

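    /// Spawns the automatic parallelism monitor onto the current Tokio runtime
    /// and returns its join handle together with a oneshot sender that stops it.
    ///
    /// A minimal usage sketch; the `stream_manager` binding is illustrative and
    /// not defined in this module:
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) = stream_manager.clone().start_auto_parallelism_monitor();
    /// // ... later, during shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```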
    pub fn start_auto_parallelism_monitor(
        self: Arc<Self>,
    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}