risingwave_meta/stream/
scale.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, min};
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
17use std::fmt::Debug;
18use std::num::NonZeroUsize;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use itertools::Itertools;
24use num_integer::Integer;
25use num_traits::abs;
26use risingwave_common::bail;
27use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
28use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
29use risingwave_common::hash::ActorMapping;
30use risingwave_common::util::iter_util::ZipEqDebug;
31use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
32use risingwave_pb::common::{WorkerNode, WorkerType};
33use risingwave_pb::meta::FragmentWorkerSlotMappings;
34use risingwave_pb::meta::subscribe_response::{Info, Operation};
35use risingwave_pb::meta::table_fragments::fragment::{
36    FragmentDistributionType, PbFragmentDistributionType,
37};
38use risingwave_pb::meta::table_fragments::{self, State};
39use risingwave_pb::stream_plan::{Dispatcher, PbDispatcher, PbDispatcherType, StreamNode};
40use thiserror_ext::AsReport;
41use tokio::sync::oneshot::Receiver;
42use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
43use tokio::task::JoinHandle;
44use tokio::time::{Instant, MissedTickBehavior};
45
46use crate::barrier::{Command, Reschedule};
47use crate::controller::scale::RescheduleWorkingSet;
48use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
49use crate::model::{
50    ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
51};
52use crate::serving::{
53    ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
54};
55use crate::stream::{AssignerBuilder, GlobalStreamManager, SourceManagerRef};
56use crate::{MetaError, MetaResult};
57
58#[derive(Debug, Clone, Eq, PartialEq)]
59pub struct WorkerReschedule {
60    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
61}
62
63pub struct CustomFragmentInfo {
64    pub fragment_id: u32,
65    pub fragment_type_mask: FragmentTypeMask,
66    pub distribution_type: PbFragmentDistributionType,
67    pub state_table_ids: Vec<u32>,
68    pub node: StreamNode,
69    pub actor_template: StreamActorWithDispatchers,
70    pub actors: Vec<CustomActorInfo>,
71}
72
73#[derive(Default, Clone)]
74pub struct CustomActorInfo {
75    pub actor_id: u32,
76    pub fragment_id: u32,
77    pub dispatcher: Vec<Dispatcher>,
78    /// `None` if singleton.
79    pub vnode_bitmap: Option<Bitmap>,
80}
81
82use educe::Educe;
83use futures::future::try_join_all;
84use risingwave_common::system_param::AdaptiveParallelismStrategy;
85use risingwave_common::system_param::reader::SystemParamsRead;
86use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
87use risingwave_meta_model::DispatcherType;
88use risingwave_pb::stream_plan::stream_node::NodeBody;
89
90use super::SourceChange;
91use crate::controller::id::IdCategory;
92use crate::controller::utils::filter_workers_by_resource_group;
93
94// The debug implementation is arbitrary. Just used in debug logs.
95#[derive(Educe)]
96#[educe(Debug)]
97pub struct RescheduleContext {
98    /// Meta information for all Actors
99    #[educe(Debug(ignore))]
100    actor_map: HashMap<ActorId, CustomActorInfo>,
101    /// Status of all Actors, used to find the location of the `Actor`
102    actor_status: BTreeMap<ActorId, WorkerId>,
103    /// Meta information of all `Fragment`, used to find the `Fragment`'s `Actor`
104    #[educe(Debug(ignore))]
105    fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
106    /// Fragments with `StreamSource`
107    stream_source_fragment_ids: HashSet<FragmentId>,
108    /// Fragments with `StreamSourceBackfill` and the corresponding upstream source fragment
109    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
110    /// Target fragments in `NoShuffle` relation
111    no_shuffle_target_fragment_ids: HashSet<FragmentId>,
112    /// Source fragments in `NoShuffle` relation
113    no_shuffle_source_fragment_ids: HashSet<FragmentId>,
114    // index for dispatcher type from upstream fragment to downstream fragment
115    fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
116    fragment_upstreams: HashMap<
117        risingwave_meta_model::FragmentId,
118        HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
119    >,
120}
121
122impl RescheduleContext {
123    fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
124        self.actor_status
125            .get(actor_id)
126            .cloned()
127            .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
128    }
129}
130
131/// This function provides an simple balancing method
132/// The specific process is as follows
133///
134/// 1. Calculate the number of target actors, and calculate the average value and the remainder, and
135///    use the average value as expected.
136///
137/// 2. Filter out the actor to be removed and the actor to be retained, and sort them from largest
138///    to smallest (according to the number of virtual nodes held).
139///
140/// 3. Calculate their balance, 1) For the actors to be removed, the number of virtual nodes per
141///    actor is the balance. 2) For retained actors, the number of virtual nodes - expected is the
142///    balance. 3) For newly created actors, -expected is the balance (always negative).
143///
144/// 4. Allocate the remainder, high priority to newly created nodes.
145///
146/// 5. After that, merge removed, retained and created into a queue, with the head of the queue
147///    being the source, and move the virtual nodes to the destination at the end of the queue.
148///
149/// This can handle scale in, scale out, migration, and simultaneous scaling with as much affinity
150/// as possible.
151///
152/// Note that this function can only rebalance actors whose `vnode_bitmap` is not `None`, in other
153/// words, for `Fragment` of `FragmentDistributionType::Single`, using this function will cause
154/// assert to fail and should be skipped from the upper level.
155///
156/// The return value is the bitmap distribution after scaling, which covers all virtual node indexes
157pub fn rebalance_actor_vnode(
158    actors: &[CustomActorInfo],
159    actors_to_remove: &BTreeSet<ActorId>,
160    actors_to_create: &BTreeSet<ActorId>,
161) -> HashMap<ActorId, Bitmap> {
162    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();
163
164    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
165    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);
166
167    assert!(actors.len() >= actors_to_remove.len());
168
169    let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
170    assert!(target_actor_count > 0);
171
172    // `vnode_bitmap` must be set on distributed fragments.
173    let vnode_count = actors[0]
174        .vnode_bitmap
175        .as_ref()
176        .expect("vnode bitmap unset")
177        .len();
178
179    // represents the balance of each actor, used to sort later
180    #[derive(Debug)]
181    struct Balance {
182        actor_id: ActorId,
183        balance: i32,
184        builder: BitmapBuilder,
185    }
186    let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);
187
188    tracing::debug!(
189        "expected {}, remain {}, prev actors {}, target actors {}",
190        expected,
191        remain,
192        actors.len(),
193        target_actor_count,
194    );
195
196    let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
197        .iter()
198        .map(|actor| {
199            (
200                actor.actor_id as ActorId,
201                actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
202            )
203        })
204        .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));
205
206    let order_by_bitmap_desc =
207        |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
208            bitmap_a
209                .count_ones()
210                .cmp(&bitmap_b.count_ones())
211                .reverse()
212                .then(id_a.cmp(id_b))
213        };
214
215    let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
216        let mut builder = BitmapBuilder::default();
217        builder.append_bitmap(bitmap);
218        builder
219    };
220
221    let (prev_expected, _) = vnode_count.div_rem(&actors.len());
222
223    let prev_remain = removed
224        .iter()
225        .map(|(_, bitmap)| {
226            assert!(bitmap.count_ones() >= prev_expected);
227            bitmap.count_ones() - prev_expected
228        })
229        .sum::<usize>();
230
231    removed.sort_by(order_by_bitmap_desc);
232    rest.sort_by(order_by_bitmap_desc);
233
234    let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
235        actor_id,
236        balance: bitmap.count_ones() as i32,
237        builder: builder_from_bitmap(&bitmap),
238    });
239
240    let mut rest_balances = rest
241        .into_iter()
242        .map(|(actor_id, bitmap)| Balance {
243            actor_id,
244            balance: bitmap.count_ones() as i32 - expected as i32,
245            builder: builder_from_bitmap(&bitmap),
246        })
247        .collect_vec();
248
249    let mut created_balances = actors_to_create
250        .iter()
251        .map(|actor_id| Balance {
252            actor_id: *actor_id,
253            balance: -(expected as i32),
254            builder: BitmapBuilder::zeroed(vnode_count),
255        })
256        .collect_vec();
257
258    for balance in created_balances
259        .iter_mut()
260        .rev()
261        .take(prev_remain)
262        .chain(rest_balances.iter_mut())
263    {
264        if remain > 0 {
265            balance.balance -= 1;
266            remain -= 1;
267        }
268    }
269
270    // consume the rest `remain`
271    for balance in &mut created_balances {
272        if remain > 0 {
273            balance.balance -= 1;
274            remain -= 1;
275        }
276    }
277
278    assert_eq!(remain, 0);
279
280    let mut v: VecDeque<_> = removed_balances
281        .chain(rest_balances)
282        .chain(created_balances)
283        .collect();
284
285    // We will return the full bitmap here after rebalancing,
286    // if we want to return only the changed actors, filter balance = 0 here
287    let mut result = HashMap::with_capacity(target_actor_count);
288
289    for balance in &v {
290        tracing::debug!(
291            "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
292            balance.actor_id,
293            balance.balance,
294            actors_to_remove.contains(&balance.actor_id),
295            actors_to_create.contains(&balance.actor_id)
296        );
297    }
298
299    while !v.is_empty() {
300        if v.len() == 1 {
301            let single = v.pop_front().unwrap();
302            assert_eq!(single.balance, 0);
303            if !actors_to_remove.contains(&single.actor_id) {
304                result.insert(single.actor_id, single.builder.finish());
305            }
306
307            continue;
308        }
309
310        let mut src = v.pop_front().unwrap();
311        let mut dst = v.pop_back().unwrap();
312
313        let n = min(abs(src.balance), abs(dst.balance));
314
315        let mut moved = 0;
316        for idx in (0..vnode_count).rev() {
317            if moved >= n {
318                break;
319            }
320
321            if src.builder.is_set(idx) {
322                src.builder.set(idx, false);
323                assert!(!dst.builder.is_set(idx));
324                dst.builder.set(idx, true);
325                moved += 1;
326            }
327        }
328
329        src.balance -= n;
330        dst.balance += n;
331
332        if src.balance != 0 {
333            v.push_front(src);
334        } else if !actors_to_remove.contains(&src.actor_id) {
335            result.insert(src.actor_id, src.builder.finish());
336        }
337
338        if dst.balance != 0 {
339            v.push_back(dst);
340        } else {
341            result.insert(dst.actor_id, dst.builder.finish());
342        }
343    }
344
345    result
346}
347
/// Switches that tune how a reschedule request is resolved.
#[derive(Debug, Clone, Copy)]
pub struct RescheduleOptions {
    /// Whether to resolve the upstream of `NoShuffle` when scaling. When set, all reschedules
    /// in a no shuffle dependency tree are checked for being corresponding, and are rewritten
    /// to the root of that no shuffle dependency tree.
    pub resolve_no_shuffle_upstream: bool,

    /// Whether to skip creating new actors. When set, the scaling-out actors will not be
    /// created.
    pub skip_create_new_actors: bool,
}
356
357pub type ScaleControllerRef = Arc<ScaleController>;
358
359pub struct ScaleController {
360    pub metadata_manager: MetadataManager,
361
362    pub source_manager: SourceManagerRef,
363
364    pub env: MetaSrvEnv,
365
366    /// We will acquire lock during DDL to prevent scaling operations on jobs that are in the creating state.
367    /// e.g., a MV cannot be rescheduled during foreground backfill.
368    pub reschedule_lock: RwLock<()>,
369}
370
371impl ScaleController {
372    pub fn new(
373        metadata_manager: &MetadataManager,
374        source_manager: SourceManagerRef,
375        env: MetaSrvEnv,
376    ) -> Self {
377        Self {
378            metadata_manager: metadata_manager.clone(),
379            source_manager,
380            env,
381            reschedule_lock: RwLock::new(()),
382        }
383    }
384
385    pub async fn integrity_check(&self) -> MetaResult<()> {
386        self.metadata_manager
387            .catalog_controller
388            .integrity_check()
389            .await
390    }
391
392    /// Build the context for rescheduling and do some validation for the request.
393    async fn build_reschedule_context(
394        &self,
395        reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
396        options: RescheduleOptions,
397        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
398    ) -> MetaResult<RescheduleContext> {
399        let worker_nodes: HashMap<WorkerId, WorkerNode> = self
400            .metadata_manager
401            .list_active_streaming_compute_nodes()
402            .await?
403            .into_iter()
404            .map(|worker_node| (worker_node.id as _, worker_node))
405            .collect();
406
407        if worker_nodes.is_empty() {
408            bail!("no available compute node in the cluster");
409        }
410
411        // Check if we are trying to move a fragment to a node marked as unschedulable
412        let unschedulable_worker_ids: HashSet<_> = worker_nodes
413            .values()
414            .filter(|w| {
415                w.property
416                    .as_ref()
417                    .map(|property| property.is_unschedulable)
418                    .unwrap_or(false)
419            })
420            .map(|worker| worker.id as WorkerId)
421            .collect();
422
423        for (fragment_id, reschedule) in &*reschedule {
424            for (worker_id, change) in &reschedule.worker_actor_diff {
425                if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
426                    bail!(
427                        "unable to move fragment {} to unschedulable worker {}",
428                        fragment_id,
429                        worker_id
430                    );
431                }
432            }
433        }
434
435        // FIXME: the same as anther place calling `list_table_fragments` in scaling.
436        // Index for StreamActor
437        let mut actor_map = HashMap::new();
438        // Index for Fragment
439        let mut fragment_map = HashMap::new();
440        // Index for actor status, including actor's worker id
441        let mut actor_status = BTreeMap::new();
442        let mut fragment_state = HashMap::new();
443        let mut fragment_to_table = HashMap::new();
444
445        fn fulfill_index_by_fragment_ids(
446            actor_map: &mut HashMap<u32, CustomActorInfo>,
447            fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
448            actor_status: &mut BTreeMap<ActorId, WorkerId>,
449            fragment_state: &mut HashMap<FragmentId, State>,
450            fragment_to_table: &mut HashMap<FragmentId, TableId>,
451            fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
452            actors: HashMap<ActorId, actor::Model>,
453            mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
454            related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
455        ) {
456            let mut fragment_actors: HashMap<
457                risingwave_meta_model::FragmentId,
458                Vec<CustomActorInfo>,
459            > = HashMap::new();
460
461            let mut expr_contexts = HashMap::new();
462            for (
463                _,
464                actor::Model {
465                    actor_id,
466                    fragment_id,
467                    status: _,
468                    splits: _,
469                    worker_id,
470                    vnode_bitmap,
471                    expr_context,
472                    ..
473                },
474            ) in actors
475            {
476                let dispatchers = actor_dispatchers
477                    .remove(&(actor_id as _))
478                    .unwrap_or_default();
479
480                let actor_info = CustomActorInfo {
481                    actor_id: actor_id as _,
482                    fragment_id: fragment_id as _,
483                    dispatcher: dispatchers,
484                    vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
485                };
486
487                actor_map.insert(actor_id as _, actor_info.clone());
488
489                fragment_actors
490                    .entry(fragment_id as _)
491                    .or_default()
492                    .push(actor_info);
493
494                actor_status.insert(actor_id as _, worker_id as WorkerId);
495
496                expr_contexts.insert(actor_id as u32, expr_context);
497            }
498
499            for (
500                _,
501                fragment::Model {
502                    fragment_id,
503                    job_id,
504                    fragment_type_mask,
505                    distribution_type,
506                    stream_node,
507                    state_table_ids,
508                    ..
509                },
510            ) in fragments
511            {
512                let actors = fragment_actors
513                    .remove(&(fragment_id as _))
514                    .unwrap_or_default();
515
516                let CustomActorInfo {
517                    actor_id,
518                    fragment_id,
519                    dispatcher,
520                    vnode_bitmap,
521                } = actors.first().unwrap().clone();
522
523                let (related_job, job_definition) =
524                    related_jobs.get(&job_id).expect("job not found");
525
526                let fragment = CustomFragmentInfo {
527                    fragment_id: fragment_id as _,
528                    fragment_type_mask: fragment_type_mask.into(),
529                    distribution_type: distribution_type.into(),
530                    state_table_ids: state_table_ids.into_u32_array(),
531                    node: stream_node.to_protobuf(),
532                    actor_template: (
533                        StreamActor {
534                            actor_id,
535                            fragment_id: fragment_id as _,
536                            vnode_bitmap,
537                            mview_definition: job_definition.to_owned(),
538                            expr_context: expr_contexts
539                                .get(&actor_id)
540                                .cloned()
541                                .map(|expr_context| expr_context.to_protobuf()),
542                        },
543                        dispatcher,
544                    ),
545                    actors,
546                };
547
548                fragment_map.insert(fragment_id as _, fragment);
549
550                fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));
551
552                fragment_state.insert(
553                    fragment_id,
554                    table_fragments::PbState::from(related_job.job_status),
555                );
556            }
557        }
558        let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
559        let working_set = self
560            .metadata_manager
561            .catalog_controller
562            .resolve_working_set_for_reschedule_fragments(fragment_ids)
563            .await?;
564
565        fulfill_index_by_fragment_ids(
566            &mut actor_map,
567            &mut fragment_map,
568            &mut actor_status,
569            &mut fragment_state,
570            &mut fragment_to_table,
571            working_set.fragments,
572            working_set.actors,
573            working_set.actor_dispatchers,
574            working_set.related_jobs,
575        );
576
577        // NoShuffle relation index
578        let mut no_shuffle_source_fragment_ids = HashSet::new();
579        let mut no_shuffle_target_fragment_ids = HashSet::new();
580
581        Self::build_no_shuffle_relation_index(
582            &actor_map,
583            &mut no_shuffle_source_fragment_ids,
584            &mut no_shuffle_target_fragment_ids,
585        );
586
587        if options.resolve_no_shuffle_upstream {
588            let original_reschedule_keys = reschedule.keys().cloned().collect();
589
590            Self::resolve_no_shuffle_upstream_fragments(
591                reschedule,
592                &no_shuffle_source_fragment_ids,
593                &no_shuffle_target_fragment_ids,
594                &working_set.fragment_upstreams,
595            )?;
596
597            if !table_parallelisms.is_empty() {
598                // We need to reiterate through the NO_SHUFFLE dependencies in order to ascertain which downstream table the custom modifications of the table have been propagated from.
599                Self::resolve_no_shuffle_upstream_tables(
600                    original_reschedule_keys,
601                    &no_shuffle_source_fragment_ids,
602                    &no_shuffle_target_fragment_ids,
603                    &fragment_to_table,
604                    &working_set.fragment_upstreams,
605                    table_parallelisms,
606                )?;
607            }
608        }
609
610        let mut fragment_dispatcher_map = HashMap::new();
611        Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);
612
613        let mut stream_source_fragment_ids = HashSet::new();
614        let mut stream_source_backfill_fragment_ids = HashMap::new();
615        let mut no_shuffle_reschedule = HashMap::new();
616        for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
617            let fragment = fragment_map
618                .get(fragment_id)
619                .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;
620
621            // Check if the rescheduling is supported.
622            match fragment_state[fragment_id] {
623                table_fragments::State::Unspecified => unreachable!(),
624                state @ table_fragments::State::Initial => {
625                    bail!(
626                        "the materialized view of fragment {fragment_id} is in state {}",
627                        state.as_str_name()
628                    )
629                }
630                state @ table_fragments::State::Creating => {
631                    let stream_node = &fragment.node;
632
633                    let mut is_reschedulable = true;
634                    visit_stream_node_cont(stream_node, |body| {
635                        if let Some(NodeBody::StreamScan(node)) = &body.node_body {
636                            if !node.stream_scan_type().is_reschedulable() {
637                                is_reschedulable = false;
638
639                                // fail fast
640                                return false;
641                            }
642
643                            // continue visiting
644                            return true;
645                        }
646
647                        // continue visiting
648                        true
649                    });
650
651                    if !is_reschedulable {
652                        bail!(
653                            "the materialized view of fragment {fragment_id} is in state {}",
654                            state.as_str_name()
655                        )
656                    }
657                }
658                table_fragments::State::Created => {}
659            }
660
661            if no_shuffle_target_fragment_ids.contains(fragment_id) {
662                bail!(
663                    "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
664                );
665            }
666
667            // For the relation of NoShuffle (e.g. Materialize and Chain), we need a special
668            // treatment because the upstream and downstream of NoShuffle are always 1-1
669            // correspondence, so we need to clone the reschedule plan to the downstream of all
670            // cascading relations.
671            if no_shuffle_source_fragment_ids.contains(fragment_id) {
672                // This fragment is a NoShuffle's upstream.
673                let mut queue: VecDeque<_> = fragment_dispatcher_map
674                    .get(fragment_id)
675                    .unwrap()
676                    .keys()
677                    .cloned()
678                    .collect();
679
680                while let Some(downstream_id) = queue.pop_front() {
681                    if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
682                        continue;
683                    }
684
685                    if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
686                    {
687                        let no_shuffle_downstreams = downstream_fragments
688                            .iter()
689                            .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
690                            .map(|(fragment_id, _)| fragment_id);
691
692                        queue.extend(no_shuffle_downstreams.copied());
693                    }
694
695                    no_shuffle_reschedule.insert(
696                        downstream_id,
697                        WorkerReschedule {
698                            worker_actor_diff: worker_actor_diff.clone(),
699                        },
700                    );
701                }
702            }
703
704            if fragment
705                .fragment_type_mask
706                .contains(FragmentTypeFlag::Source)
707                && fragment.node.find_stream_source().is_some()
708            {
709                stream_source_fragment_ids.insert(*fragment_id);
710            }
711
712            // Check if the reschedule plan is valid.
713            let current_worker_ids = fragment
714                .actors
715                .iter()
716                .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
717                .collect::<HashSet<_>>();
718
719            for (removed, change) in worker_actor_diff {
720                if !current_worker_ids.contains(removed) && change.is_negative() {
721                    bail!(
722                        "no actor on the worker {} of fragment {}",
723                        removed,
724                        fragment_id
725                    );
726                }
727            }
728
729            let added_actor_count: usize = worker_actor_diff
730                .values()
731                .filter(|change| change.is_positive())
732                .cloned()
733                .map(|change| change as usize)
734                .sum();
735
736            let removed_actor_count: usize = worker_actor_diff
737                .values()
738                .filter(|change| change.is_positive())
739                .cloned()
740                .map(|v| v.unsigned_abs())
741                .sum();
742
743            match fragment.distribution_type {
744                FragmentDistributionType::Hash => {
745                    if fragment.actors.len() + added_actor_count <= removed_actor_count {
746                        bail!("can't remove all actors from fragment {}", fragment_id);
747                    }
748                }
749                FragmentDistributionType::Single => {
750                    if added_actor_count != removed_actor_count {
751                        bail!("single distribution fragment only support migration");
752                    }
753                }
754                FragmentDistributionType::Unspecified => unreachable!(),
755            }
756        }
757
758        if !no_shuffle_reschedule.is_empty() {
759            tracing::info!(
760                "reschedule plan rewritten with NoShuffle reschedule {:?}",
761                no_shuffle_reschedule
762            );
763
764            for noshuffle_downstream in no_shuffle_reschedule.keys() {
765                let fragment = fragment_map.get(noshuffle_downstream).unwrap();
766                // SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source.
767                if fragment
768                    .fragment_type_mask
769                    .contains(FragmentTypeFlag::SourceScan)
770                {
771                    let stream_node = &fragment.node;
772                    if let Some((_source_id, upstream_source_fragment_id)) =
773                        stream_node.find_source_backfill()
774                    {
775                        stream_source_backfill_fragment_ids
776                            .insert(fragment.fragment_id, upstream_source_fragment_id);
777                    }
778                }
779            }
780        }
781
782        // Modifications for NoShuffle downstream.
783        reschedule.extend(no_shuffle_reschedule.into_iter());
784
785        Ok(RescheduleContext {
786            actor_map,
787            actor_status,
788            fragment_map,
789            stream_source_fragment_ids,
790            stream_source_backfill_fragment_ids,
791            no_shuffle_target_fragment_ids,
792            no_shuffle_source_fragment_ids,
793            fragment_dispatcher_map,
794            fragment_upstreams: working_set.fragment_upstreams,
795        })
796    }
797
798    /// From the high-level [`WorkerReschedule`] to the low-level reschedule plan [`Reschedule`].
799    ///
800    /// Returns the generated reschedule plan: one [`Reschedule`] per fragment id that is present in
801    /// `reschedules` after `build_reschedule_context` has run. That call receives `&mut reschedules`,
802    /// so the set of fragments may differ from what the caller passed in.
803    ///
    /// `options` and `table_parallelisms` are not inspected here; they are only forwarded to
    /// `build_reschedule_context`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `build_reschedule_context`, `arrange_reschedules`,
    /// `actor_id_to_worker_id`, `modify_actor_upstream_and_downstream` and the source manager's
    /// split migration calls.
    ///
    /// # Panics
    ///
    /// Panics when an internal invariant is violated, e.g. a fragment id is missing from the
    /// context's fragment map, or a fragment would be left with no actors after rescheduling.
    ///
804    /// In [normal process of scaling](`GlobalStreamManager::reschedule_actors`), we use the returned values to
805    /// build a [`Command::RescheduleFragment`], which then flows through the barrier mechanism to perform scaling.
806    /// Meta store is updated after the barrier is collected.
807    ///
808    /// During recovery, we don't need the barrier mechanism, and can directly use the returned values to update meta.
809    pub(crate) async fn analyze_reschedule_plan(
810        &self,
811        mut reschedules: HashMap<FragmentId, WorkerReschedule>,
812        options: RescheduleOptions,
813        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
814    ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
815        tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
816        let ctx = self
817            .build_reschedule_context(&mut reschedules, options, table_parallelisms)
818            .await?;
819        tracing::debug!("reschedule context: {:#?}", ctx);
        // Rebind as immutable: the plan is read-only from here on.
820        let reschedules = reschedules;
821
822        // Here, the plan for both upstream and downstream of the NO_SHUFFLE Fragment should already have been populated.
823
824        // Index of actors to create/remove
825        // Fragment Id => ( Actor Id => Worker Id )
826        let (fragment_actors_to_remove, fragment_actors_to_create) =
827            self.arrange_reschedules(&reschedules, &ctx)?;
828
        // Fragment Id => ( Actor Id => vnode bitmap ) after the reschedule.
        // NoShuffle target fragments are skipped in this loop; their entries are written later by
        // `arrange_no_shuffle_relation`.
829        let mut fragment_actor_bitmap = HashMap::new();
830        for fragment_id in reschedules.keys() {
831            if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
832                // skipping chain fragment, we need to clone the upstream materialize fragment's
833                // mapping later
834                continue;
835            }
836
837            let actors_to_create = fragment_actors_to_create
838                .get(fragment_id)
839                .map(|map| map.keys().copied().collect())
840                .unwrap_or_default();
841
842            let actors_to_remove = fragment_actors_to_remove
843                .get(fragment_id)
844                .map(|map| map.keys().copied().collect())
845                .unwrap_or_default();
846
847            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
848
849            match fragment.distribution_type {
850                FragmentDistributionType::Single => {
851                    // Skip re-balancing action for single distribution (always None)
852                    fragment_actor_bitmap
853                        .insert(fragment.fragment_id as FragmentId, Default::default());
854                }
855                FragmentDistributionType::Hash => {
856                    let actor_vnode = rebalance_actor_vnode(
857                        &fragment.actors,
858                        &actors_to_remove,
859                        &actors_to_create,
860                    );
861
862                    fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
863                }
864
865                FragmentDistributionType::Unspecified => unreachable!(),
866            }
867        }
868
869        // Index for fragment -> { actor -> worker_id } after reschedule.
870        // Since we need to organize the upstream and downstream relationships of NoShuffle,
871        // we need to organize the actor distribution after a scaling.
872        let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
873        for fragment_id in reschedules.keys() {
874            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
875            let mut new_actor_ids = BTreeMap::new();
            // Keep every existing actor that is not scheduled for removal.
876            for actor in &fragment.actors {
877                if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id)
878                    && actors_to_remove.contains_key(&actor.actor_id)
879                {
880                    continue;
881                }
882                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
883                new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
884            }
885
            // Then add the actors that are about to be created.
886            if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
887                for (actor_id, worker_id) in actors_to_create {
888                    new_actor_ids.insert(*actor_id, *worker_id);
889                }
890            }
891
892            assert!(
893                !new_actor_ids.is_empty(),
894                "should be at least one actor in fragment {} after rescheduling",
895                fragment_id
896            );
897
898            fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
899        }
900
        // Rebind as immutable.
901        let fragment_actors_after_reschedule = fragment_actors_after_reschedule;
902
903        // In order to maintain consistency with the original structure, the upstream and downstream
904        // actors of NoShuffle need to be in the same worker slot and hold the same virtual nodes,
905        // so for the actors after the upstream re-balancing, since we have sorted the actors of the same fragment by id on all workers,
906        // we can identify the corresponding upstream actor with NO_SHUFFLE.
907        // NOTE: There should be more asserts here to ensure correctness.
        //
        // Returns immediately unless `fragment_id` is a NoShuffle target fragment.
        //
        // - `actor_group_map`: Actor Id => ( root Fragment Id, root Actor Id ), filled from the
        //   NoShuffle dispatchers of the existing upstream actors.
        // - `fragment_updated_bitmap`: Fragment Id => ( Actor Id => Bitmap ); the entry for
        //   `fragment_id` is written by this call.
        // - `no_shuffle_upstream_actor_map`: Actor Id => ( upstream Fragment Id => Actor Id ).
        // - `no_shuffle_downstream_actors_map`: Actor Id => ( downstream Fragment Id => Actor Id ).
908        fn arrange_no_shuffle_relation(
909            ctx: &RescheduleContext,
910            fragment_id: &FragmentId,
911            upstream_fragment_id: &FragmentId,
912            fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
913            actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
914            fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
915            no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
916            no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
917        ) {
918            if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
919                return;
920            }
921
922            let fragment = &ctx.fragment_map[fragment_id];
923
924            let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];
925
926            // build actor group map
927            for upstream_actor in &upstream_fragment.actors {
928                for dispatcher in &upstream_actor.dispatcher {
929                    if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
930                        let downstream_actor_id =
931                            *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();
932
933                        // upstream is root
934                        if !ctx
935                            .no_shuffle_target_fragment_ids
936                            .contains(upstream_fragment_id)
937                        {
938                            actor_group_map.insert(
939                                upstream_actor.actor_id,
940                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
941                            );
942                            actor_group_map.insert(
943                                downstream_actor_id,
944                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
945                            );
946                        } else {
                            // Despite its name, this is the `(root fragment id, root actor id)`
                            // pair recorded for the upstream actor; indexing panics if it is absent.
947                            let root_actor_id = actor_group_map[&upstream_actor.actor_id];
948
949                            actor_group_map.insert(downstream_actor_id, root_actor_id);
950                        }
951                    }
952                }
953            }
954
955            // If the upstream is a Singleton Fragment, there will be no Bitmap changes
956            let upstream_fragment_bitmap = fragment_updated_bitmap
957                .get(upstream_fragment_id)
958                .cloned()
959                .unwrap_or_default();
960
961            // Question: Is it possible to have Hash Distribution Fragment but the Actor's bitmap remains unchanged?
962            if upstream_fragment.distribution_type == FragmentDistributionType::Single {
963                assert!(
964                    upstream_fragment_bitmap.is_empty(),
965                    "single fragment should have no bitmap updates"
966                );
967            }
968
969            let upstream_fragment_actor_map = fragment_actors_after_reschedule
970                .get(upstream_fragment_id)
971                .cloned()
972                .unwrap();
973
974            let fragment_actor_map = fragment_actors_after_reschedule
975                .get(fragment_id)
976                .cloned()
977                .unwrap();
978
            // Worker Id => actors of this fragment that have no entry in `actor_group_map`.
979            let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();
980
981            // first, find existing actor bitmap, copy them
982            let mut fragment_bitmap = HashMap::new();
983
984            for (actor_id, worker_id) in &fragment_actor_map {
985                if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
986                    let root_bitmap = fragment_updated_bitmap
987                        .get(root_fragment)
988                        .expect("root fragment bitmap not found")
989                        .get(root_actor_id)
990                        .cloned()
991                        .expect("root actor bitmap not found");
992
993                    // Copy the bitmap
994                    fragment_bitmap.insert(*actor_id, root_bitmap);
995
                    // Note that the actor recorded under `upstream_fragment_id` here is the
                    // root actor taken from `actor_group_map`.
996                    no_shuffle_upstream_actor_map
997                        .entry(*actor_id as ActorId)
998                        .or_default()
999                        .insert(*upstream_fragment_id, *root_actor_id);
1000                    no_shuffle_downstream_actors_map
1001                        .entry(*root_actor_id)
1002                        .or_default()
1003                        .insert(*fragment_id, *actor_id);
1004                } else {
1005                    worker_reverse_index
1006                        .entry(*worker_id)
1007                        .or_default()
1008                        .insert(*actor_id);
1009                }
1010            }
1011
            // Worker Id => upstream actors that have no entry in `actor_group_map`.
1012            let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();
1013
1014            for (actor_id, worker_id) in &upstream_fragment_actor_map {
1015                if !actor_group_map.contains_key(actor_id) {
1016                    upstream_worker_reverse_index
1017                        .entry(*worker_id)
1018                        .or_default()
1019                        .insert(*actor_id);
1020                }
1021            }
1022
1023            // then, find the rest of the actors and copy the bitmap
            // Both sides are `BTreeSet`s, so on each worker the actors are paired with the
            // upstream actors in ascending id order. Panics if the upstream has no such actors on
            // that worker or if the counts differ.
1024            for (worker_id, actor_ids) in worker_reverse_index {
1025                let upstream_actor_ids = upstream_worker_reverse_index
1026                    .get(&worker_id)
1027                    .unwrap()
1028                    .clone();
1029
1030                assert_eq!(actor_ids.len(), upstream_actor_ids.len());
1031
1032                for (actor_id, upstream_actor_id) in actor_ids
1033                    .into_iter()
1034                    .zip_eq_debug(upstream_actor_ids.into_iter())
1035                {
1036                    match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
1037                        None => {
1038                            // single fragment should have no bitmap updates (same as upstream)
1039                            assert_eq!(
1040                                upstream_fragment.distribution_type,
1041                                FragmentDistributionType::Single
1042                            );
1043                        }
1044                        Some(bitmap) => {
1045                            // Copy the bitmap
1046                            fragment_bitmap.insert(actor_id, bitmap);
1047                        }
1048                    }
1049
1050                    no_shuffle_upstream_actor_map
1051                        .entry(actor_id as ActorId)
1052                        .or_default()
1053                        .insert(*upstream_fragment_id, upstream_actor_id);
1054                    no_shuffle_downstream_actors_map
1055                        .entry(upstream_actor_id)
1056                        .or_default()
1057                        .insert(*fragment_id, actor_id);
1058                }
1059            }
1060
1061            match fragment.distribution_type {
1062                FragmentDistributionType::Hash => {}
1063                FragmentDistributionType::Single => {
1064                    // single distribution should update nothing
1065                    assert!(fragment_bitmap.is_empty());
1066                }
1067                FragmentDistributionType::Unspecified => unreachable!(),
1068            }
1069
            // If this fragment already has an entry (it was reached before through another
            // upstream), nothing is overwritten; the existing entry must equal the new bitmaps.
1070            if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
1071                assert_eq!(
1072                    e.entry.get(),
1073                    &e.value,
1074                    "bitmaps derived from different no-shuffle upstreams mismatch"
1075                );
1076            }
1077
1078            // Visit downstream fragments recursively.
1079            if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
1080                let no_shuffle_downstreams = downstream_fragments
1081                    .iter()
1082                    .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
1083                    .map(|(fragment_id, _)| fragment_id);
1084
1085                for downstream_fragment_id in no_shuffle_downstreams {
1086                    arrange_no_shuffle_relation(
1087                        ctx,
1088                        downstream_fragment_id,
1089                        fragment_id,
1090                        fragment_actors_after_reschedule,
1091                        actor_group_map,
1092                        fragment_updated_bitmap,
1093                        no_shuffle_upstream_actor_map,
1094                        no_shuffle_downstream_actors_map,
1095                    );
1096                }
1097            }
1098        }
1099
1100        let mut no_shuffle_upstream_actor_map = HashMap::new();
1101        let mut no_shuffle_downstream_actors_map = HashMap::new();
1102        let mut actor_group_map = HashMap::new();
1103        // For all roots in the upstream and downstream dependency trees of NoShuffle, recursively
1104        // find all correspondences
        // A root is a fragment that is a NoShuffle source but not a NoShuffle target. All of its
        // downstream fragments are passed in (regardless of dispatcher type); the callee returns
        // early for those that are not NoShuffle targets.
1105        for fragment_id in reschedules.keys() {
1106            if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
1107                && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
1108                && let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id)
1109            {
1110                for downstream_fragment_id in downstream_fragments.keys() {
1111                    arrange_no_shuffle_relation(
1112                        &ctx,
1113                        downstream_fragment_id,
1114                        fragment_id,
1115                        &fragment_actors_after_reschedule,
1116                        &mut actor_group_map,
1117                        &mut fragment_actor_bitmap,
1118                        &mut no_shuffle_upstream_actor_map,
1119                        &mut no_shuffle_downstream_actors_map,
1120                    );
1121                }
1122            }
1123        }
1124
1125        tracing::debug!("actor group map {:?}", actor_group_map);
1126
        // Actor Id => ( new actor, its dispatchers ), built from each fragment's actor template.
1127        let mut new_created_actors = HashMap::new();
1128        for fragment_id in reschedules.keys() {
1129            let actors_to_create = fragment_actors_to_create
1130                .get(fragment_id)
1131                .cloned()
1132                .unwrap_or_default();
1133
1134            let fragment = &ctx.fragment_map[fragment_id];
1135
1136            assert!(!fragment.actors.is_empty());
1137
1138            for actor_to_create in &actors_to_create {
1139                let new_actor_id = actor_to_create.0;
1140                let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();
1141
1142                // This should be assigned before the `modify_actor_upstream_and_downstream` call,
1143                // because we need to use the new actor id to find the upstream and
1144                // downstream in the NoShuffle relationship
1145                new_actor.actor_id = *new_actor_id;
1146
1147                Self::modify_actor_upstream_and_downstream(
1148                    &ctx,
1149                    &fragment_actors_to_remove,
1150                    &fragment_actors_to_create,
1151                    &fragment_actor_bitmap,
1152                    &no_shuffle_downstream_actors_map,
1153                    &mut new_actor,
1154                    &mut dispatchers,
1155                )?;
1156
                // The template's bitmap is only overwritten when a bitmap was computed for this
                // actor.
1157                if let Some(bitmap) = fragment_actor_bitmap
1158                    .get(fragment_id)
1159                    .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
1160                {
1161                    new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
1162                }
1163
1164                new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
1165            }
1166        }
1167
1168        // For stream source & source backfill fragments, we need to reallocate the splits.
1169        // Because we are in the Pause state, so it's no problem to reallocate
1170        let mut fragment_actor_splits = HashMap::new();
1171        for fragment_id in reschedules.keys() {
1172            let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1173
1174            if ctx.stream_source_fragment_ids.contains(fragment_id) {
1175                let fragment = &ctx.fragment_map[fragment_id];
1176
1177                let prev_actor_ids = fragment
1178                    .actors
1179                    .iter()
1180                    .map(|actor| actor.actor_id)
1181                    .collect_vec();
1182
1183                let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1184
1185                let actor_splits = self
1186                    .source_manager
1187                    .migrate_splits_for_source_actors(
1188                        *fragment_id,
1189                        &prev_actor_ids,
1190                        &curr_actor_ids,
1191                    )
1192                    .await?;
1193
1194                tracing::debug!(
1195                    "source actor splits: {:?}, fragment_id: {}",
1196                    actor_splits,
1197                    fragment_id
1198                );
1199                fragment_actor_splits.insert(*fragment_id, actor_splits);
1200            }
1201        }
1202        // We use 2 iterations to make sure source actors are migrated first, and then align backfill actors
1203        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
1204            for fragment_id in reschedules.keys() {
1205                let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1206
1207                if let Some(upstream_source_fragment_id) =
1208                    ctx.stream_source_backfill_fragment_ids.get(fragment_id)
1209                {
1210                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1211
1212                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
1213                        *fragment_id,
1214                        *upstream_source_fragment_id,
1215                        &curr_actor_ids,
1216                        &fragment_actor_splits,
1217                        &no_shuffle_upstream_actor_map,
1218                    )?;
1219                    tracing::debug!(
1220                        "source backfill actor splits: {:?}, fragment_id: {}",
1221                        actor_splits,
1222                        fragment_id
1223                    );
1224                    fragment_actor_splits.insert(*fragment_id, actor_splits);
1225                }
1226            }
1227        }
1228
1229        // Generate fragment reschedule plan
1230        let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
1231            HashMap::with_capacity(reschedules.len());
1232
        // `reschedules` is consumed here; only its keys are needed.
1233        for (fragment_id, _) in reschedules {
            // Worker Id => actor ids to create on that worker.
1234            let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();
1235
1236            if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
1237                for (actor_id, worker_id) in actor_worker_maps {
1238                    actors_to_create
1239                        .entry(worker_id)
1240                        .or_default()
1241                        .push(actor_id);
1242                }
1243            }
1244
1245            let actors_to_remove = fragment_actors_to_remove
1246                .get(&fragment_id)
1247                .cloned()
1248                .unwrap_or_default()
1249                .into_keys()
1250                .collect();
1251
1252            let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];
1253
1254            assert!(!actors_after_reschedule.is_empty());
1255
1256            let fragment = &ctx.fragment_map[&fragment_id];
1257
            // Set of dispatcher types found on this fragment's upstream edges.
1258            let in_degree_types: HashSet<_> = ctx
1259                .fragment_upstreams
1260                .get(&(fragment_id as _))
1261                .map(|upstreams| upstreams.values())
1262                .into_iter()
1263                .flatten()
1264                .cloned()
1265                .collect();
1266
            // A new actor mapping is only produced for a hash-distributed fragment that has at
            // least one upstream edge of type `Hash`.
1267            let upstream_dispatcher_mapping = match fragment.distribution_type {
1268                FragmentDistributionType::Hash => {
1269                    if !in_degree_types.contains(&DispatcherType::Hash) {
1270                        None
1271                    } else {
1272                        // Changes of the bitmap must occur in the case of HashDistribution
1273                        Some(ActorMapping::from_bitmaps(
1274                            &fragment_actor_bitmap[&fragment_id],
1275                        ))
1276                    }
1277                }
1278
1279                FragmentDistributionType::Single => {
1280                    assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
1281                    None
1282                }
1283                FragmentDistributionType::Unspecified => unreachable!(),
1284            };
1285
            // ( upstream Fragment Id, Dispatcher Id ) for every upstream edge that is not
            // NoShuffle; the dispatcher id used is this fragment's own id.
1286            let mut upstream_fragment_dispatcher_set = BTreeSet::new();
1287
1288            {
1289                if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
1290                    for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
1291                        match upstream_dispatcher_type {
1292                            DispatcherType::NoShuffle => {}
1293                            _ => {
1294                                upstream_fragment_dispatcher_set.insert((
1295                                    *upstream_fragment_id as FragmentId,
1296                                    fragment.fragment_id as DispatcherId,
1297                                ));
1298                            }
1299                        }
1300                    }
1301                }
1302            }
1303
1304            let downstream_fragment_ids = if let Some(downstream_fragments) =
1305                ctx.fragment_dispatcher_map.get(&fragment_id)
1306            {
1307                // Skip fragments' no-shuffle downstream, as there's no need to update the merger
1308                // (receiver) of a no-shuffle downstream
1309                downstream_fragments
1310                    .iter()
1311                    .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
1312                    .map(|(fragment_id, _)| *fragment_id)
1313                    .collect_vec()
1314            } else {
1315                vec![]
1316            };
1317
1318            let vnode_bitmap_updates = match fragment.distribution_type {
1319                FragmentDistributionType::Hash => {
                    // The fragment's entry is taken out of `fragment_actor_bitmap` here, so it
                    // must not be looked up again for this fragment afterwards.
1320                    let mut vnode_bitmap_updates =
1321                        fragment_actor_bitmap.remove(&fragment_id).unwrap();
1322
1323                    // We need to keep the bitmaps from changed actors only,
1324                    // otherwise the barrier will become very large with many actors
1325                    for actor_id in actors_after_reschedule.keys() {
1326                        assert!(vnode_bitmap_updates.contains_key(actor_id));
1327
1328                        // retain actor
                        // (an actor found in `ctx.actor_map`, as opposed to a newly created one)
1329                        if let Some(actor) = ctx.actor_map.get(actor_id) {
1330                            let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();
1331
1332                            if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref()
1333                                && prev_bitmap.eq(bitmap)
1334                            {
1335                                vnode_bitmap_updates.remove(actor_id);
1336                            }
1337                        }
1338                    }
1339
1340                    vnode_bitmap_updates
1341                }
1342                FragmentDistributionType::Single => HashMap::new(),
1343                FragmentDistributionType::Unspecified => unreachable!(),
1344            };
1345
1346            let upstream_fragment_dispatcher_ids =
1347                upstream_fragment_dispatcher_set.into_iter().collect_vec();
1348
1349            let actor_splits = fragment_actor_splits
1350                .get(&fragment_id)
1351                .cloned()
1352                .unwrap_or_default();
1353
1354            reschedule_fragment.insert(
1355                fragment_id,
1356                Reschedule {
1357                    added_actors: actors_to_create,
1358                    removed_actors: actors_to_remove,
1359                    vnode_bitmap_updates,
1360                    upstream_fragment_dispatcher_ids,
1361                    upstream_dispatcher_mapping,
1362                    downstream_fragment_ids,
1363                    actor_splits,
                    // Left empty here; filled in by the loop over `fragment_created_actors` below.
1364                    newly_created_actors: Default::default(),
1365                },
1366            );
1367        }
1368
        // Fragment Id => ( Actor Id => ( ( new actor, dispatchers ), Worker Id ) ).
1369        let mut fragment_created_actors = HashMap::new();
1370        for (fragment_id, actors_to_create) in &fragment_actors_to_create {
1371            let mut created_actors = HashMap::new();
1372            for (actor_id, worker_id) in actors_to_create {
1373                let actor = new_created_actors.get(actor_id).cloned().unwrap();
1374                created_actors.insert(*actor_id, (actor, *worker_id));
1375            }
1376
1377            fragment_created_actors.insert(*fragment_id, created_actors);
1378        }
1379
1380        for (fragment_id, to_create) in fragment_created_actors {
1381            let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
1382            reschedule.newly_created_actors = to_create;
1383        }
1384        tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);
1385
1386        Ok(reschedule_fragment)
1387    }
1388
1389    #[expect(clippy::type_complexity)]
    /// Expands the per-worker actor count diffs in `reschedule` into concrete actors.
    ///
    /// Returns `(fragment_actors_to_remove, fragment_actors_to_create)`, both shaped as
    /// Fragment Id => ( Actor Id => Worker Id ). A fragment without any removal (or creation) has
    /// no entry in the respective map.
    ///
    /// For a negative diff, the actors with the largest ids on that worker are removed. For a
    /// positive diff, fresh actor ids are taken from the id generator, one per new actor.
    ///
    /// # Errors
    ///
    /// Fails if `ctx.actor_id_to_worker_id` returns an error for one of the fragment's actors, or
    /// if the plan asks to remove more actors from a worker than the fragment currently has there.
    ///
    /// # Panics
    ///
    /// Panics if a fragment id is missing from `ctx.fragment_map`, or if the final sanity check
    /// finds a removed actor whose NoShuffle downstream actor is not removed as well.
1390    fn arrange_reschedules(
1391        &self,
1392        reschedule: &HashMap<FragmentId, WorkerReschedule>,
1393        ctx: &RescheduleContext,
1394    ) -> MetaResult<(
1395        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1396        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1397    )> {
1398        let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
1399        let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());
1400
1401        for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
1402            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
1403
1404            // Actor Id => Worker Id
1405            let mut actors_to_remove = BTreeMap::new();
1406            let mut actors_to_create = BTreeMap::new();
1407
1408            // NOTE(important): The value needs to be a BTreeSet to ensure that the actors on the worker are sorted in ascending order.
            // The explicit type pins that requirement down.
1409            let mut worker_to_actors: HashMap<WorkerId, BTreeSet<ActorId>> = HashMap::new();
1410
1411            for actor in &fragment.actors {
                // Propagate the lookup failure instead of panicking, as `analyze_reschedule_plan`
                // does for the same call.
1412                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
1413                worker_to_actors
1414                    .entry(worker_id)
1415                    .or_default()
1416                    .insert(actor.actor_id as ActorId);
1417            }
1418
1419            let decreased_actor_count = worker_actor_diff
1420                .iter()
1421                .filter(|(_, change)| change.is_negative())
1422                .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));
1423
1424            for (worker_id, n) in decreased_actor_count {
                // NOTE(review): a negative diff for a worker that hosts no actor of this fragment
                // is silently ignored here — confirm that this is intended.
1425                if let Some(actor_ids) = worker_to_actors.get(worker_id) {
1426                    if actor_ids.len() < n {
1427                        bail!(
1428                            "plan illegal, for fragment {}, worker {} only has {} actors, but needs to reduce {}",
1429                            fragment_id,
1430                            worker_id,
1431                            actor_ids.len(),
1432                            n
1433                        );
1434                    }
1435
                    // Take the last `n` actors of the ascending set, i.e. the largest ids.
1436                    let removed_actors: Vec<_> = actor_ids
1437                        .iter()
1438                        .skip(actor_ids.len().saturating_sub(n))
1439                        .cloned()
1440                        .collect();
1441
1442                    for actor in removed_actors {
1443                        actors_to_remove.insert(actor, *worker_id);
1444                    }
1445                }
1446            }
1447
1448            let increased_actor_count = worker_actor_diff
1449                .iter()
1450                .filter(|(_, change)| change.is_positive());
1451
1452            for (worker, n) in increased_actor_count {
1453                for _ in 0..*n {
1454                    let id = self
1455                        .env
1456                        .id_gen_manager()
1457                        .generate_interval::<{ IdCategory::Actor }>(1)
1458                        as ActorId;
1459                    actors_to_create.insert(id, *worker);
1460                }
1461            }
1462
1463            if !actors_to_remove.is_empty() {
1464                fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
1465            }
1466
1467            if !actors_to_create.is_empty() {
1468                fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
1469            }
1470        }
1471
1472        // sanity checking
        // Every removed actor with a NoShuffle dispatcher must have its single downstream actor
        // removed as well; the downstream fragment is looked up by the dispatcher id.
1473        for actors_to_remove in fragment_actors_to_remove.values() {
1474            for actor_id in actors_to_remove.keys() {
1475                let actor = ctx.actor_map.get(actor_id).unwrap();
1476                for dispatcher in &actor.dispatcher {
1477                    if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
1478                        let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");
1479
1480                        let _should_exists = fragment_actors_to_remove
1481                            .get(&(dispatcher.dispatcher_id as FragmentId))
1482                            .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
1483                            .get(downstream_actor_id)
1484                            .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
1485                    }
1486                }
1487            }
1488        }
1489
1490        Ok((fragment_actors_to_remove, fragment_actors_to_create))
1491    }
1492
1493    /// Modifies the upstream and downstream actors of the new created actor according to the
1494    /// overall changes, and is used to handle cascading updates
1495    fn modify_actor_upstream_and_downstream(
1496        ctx: &RescheduleContext,
1497        fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1498        fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1499        fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
1500        no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
1501        new_actor: &mut StreamActor,
1502        dispatchers: &mut Vec<PbDispatcher>,
1503    ) -> MetaResult<()> {
1504        // Update downstream actor ids
1505        for dispatcher in dispatchers {
1506            let downstream_fragment_id = dispatcher
1507                .downstream_actor_id
1508                .iter()
1509                .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
1510                .dedup()
1511                .exactly_one()
1512                .unwrap() as FragmentId;
1513
1514            let downstream_fragment_actors_to_remove =
1515                fragment_actors_to_remove.get(&downstream_fragment_id);
1516            let downstream_fragment_actors_to_create =
1517                fragment_actors_to_create.get(&downstream_fragment_id);
1518
1519            match dispatcher.r#type() {
1520                d @ (PbDispatcherType::Hash
1521                | PbDispatcherType::Simple
1522                | PbDispatcherType::Broadcast) => {
1523                    if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
1524                    {
1525                        dispatcher
1526                            .downstream_actor_id
1527                            .retain(|id| !downstream_actors_to_remove.contains_key(id));
1528                    }
1529
1530                    if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
1531                    {
1532                        dispatcher
1533                            .downstream_actor_id
1534                            .extend(downstream_actors_to_create.keys().cloned())
1535                    }
1536
1537                    // There should be still exactly one downstream actor
1538                    if d == PbDispatcherType::Simple {
1539                        assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1540                    }
1541                }
1542                PbDispatcherType::NoShuffle => {
1543                    assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1544                    let downstream_actor_id = no_shuffle_downstream_actors_map
1545                        .get(&new_actor.actor_id)
1546                        .and_then(|map| map.get(&downstream_fragment_id))
1547                        .unwrap();
1548                    dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
1549                }
1550                PbDispatcherType::Unspecified => unreachable!(),
1551            }
1552
1553            if let Some(mapping) = dispatcher.hash_mapping.as_mut()
1554                && let Some(downstream_updated_bitmap) =
1555                    fragment_actor_bitmap.get(&downstream_fragment_id)
1556            {
1557                // If downstream scale in/out
1558                *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
1559            }
1560        }
1561
1562        Ok(())
1563    }
1564
1565    #[await_tree::instrument]
1566    pub async fn post_apply_reschedule(
1567        &self,
1568        reschedules: &HashMap<FragmentId, Reschedule>,
1569        post_updates: &JobReschedulePostUpdates,
1570    ) -> MetaResult<()> {
1571        // Update fragment info after rescheduling in meta store.
1572        self.metadata_manager
1573            .post_apply_reschedules(reschedules.clone(), post_updates)
1574            .await?;
1575
1576        // Update serving fragment info after rescheduling in meta store.
1577        if !reschedules.is_empty() {
1578            let workers = self
1579                .metadata_manager
1580                .list_active_serving_compute_nodes()
1581                .await?;
1582            let streaming_parallelisms = self
1583                .metadata_manager
1584                .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))
1585                .await?;
1586            let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
1587            let max_serving_parallelism = self
1588                .env
1589                .session_params_manager_impl_ref()
1590                .get_params()
1591                .await
1592                .batch_parallelism()
1593                .map(|p| p.get());
1594            let (upserted, failed) = serving_worker_slot_mapping.upsert(
1595                streaming_parallelisms,
1596                &workers,
1597                max_serving_parallelism,
1598            );
1599            if !upserted.is_empty() {
1600                tracing::debug!(
1601                    "Update serving vnode mapping for fragments {:?}.",
1602                    upserted.keys()
1603                );
1604                self.env
1605                    .notification_manager()
1606                    .notify_frontend_without_version(
1607                        Operation::Update,
1608                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1609                            mappings: to_fragment_worker_slot_mapping(&upserted),
1610                        }),
1611                    );
1612            }
1613            if !failed.is_empty() {
1614                tracing::debug!(
1615                    "Fail to update serving vnode mapping for fragments {:?}.",
1616                    failed
1617                );
1618                self.env
1619                    .notification_manager()
1620                    .notify_frontend_without_version(
1621                        Operation::Delete,
1622                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1623                            mappings: to_deleted_fragment_worker_slot_mapping(&failed),
1624                        }),
1625                    );
1626            }
1627        }
1628
1629        let mut stream_source_actor_splits = HashMap::new();
1630        let mut stream_source_dropped_actors = HashSet::new();
1631
1632        // todo: handle adaptive splits
1633        for (fragment_id, reschedule) in reschedules {
1634            if !reschedule.actor_splits.is_empty() {
1635                stream_source_actor_splits
1636                    .insert(*fragment_id as FragmentId, reschedule.actor_splits.clone());
1637                stream_source_dropped_actors.extend(reschedule.removed_actors.clone());
1638            }
1639        }
1640
1641        if !stream_source_actor_splits.is_empty() {
1642            self.source_manager
1643                .apply_source_change(SourceChange::Reschedule {
1644                    split_assignment: stream_source_actor_splits,
1645                    dropped_actors: stream_source_dropped_actors,
1646                })
1647                .await;
1648        }
1649
1650        Ok(())
1651    }
1652
    /// Builds a [`JobReschedulePlan`] for the jobs listed in `policy.targets`.
    ///
    /// The function works in three phases:
    ///
    /// 1. **Resolve targets.** For every job the target parallelism and the effective resource
    ///    group are determined, recorded in the plan's `post_updates`, and the schedulable
    ///    streaming workers are narrowed down to that resource group.
    /// 2. **Build indexes.** The reschedule working set of the target jobs is loaded from the
    ///    catalog and turned into lookup tables (NO_SHUFFLE endpoints, fragment distribution,
    ///    actor locations, job -> fragments, fragment -> actors).
    /// 3. **Diff per fragment.** For every fragment that is not the downstream side of a
    ///    NO_SHUFFLE edge, the current actors-per-worker layout is compared with the layout
    ///    produced by the assigner, and the difference is stored as a `WorkerReschedule`.
    ///    Fragments whose diff is empty are dropped from the plan.
    ///
    /// # Errors
    ///
    /// Returns an error if a job ends up with an empty worker set, if no schedulable slot is
    /// available for a fragment, if no single target worker can be chosen for a `Single`
    /// fragment, or if any of the metadata/catalog calls fails.
    ///
    /// # Panics
    ///
    /// Panics if a target job has no entry in the job -> fragments index, if a worker's
    /// `compute_node_parallelism()` is zero, if a fragment or actor is missing from the indexes,
    /// if a `Single` fragment does not have exactly one actor, or if a fragment's distribution
    /// type is `Unspecified`.
    pub async fn generate_job_reschedule_plan(
        &self,
        policy: JobReschedulePolicy,
    ) -> MetaResult<JobReschedulePlan> {
        type VnodeCount = usize;

        let JobReschedulePolicy { targets } = policy;

        let workers = self
            .metadata_manager
            .list_active_streaming_compute_nodes()
            .await?;

        // The `schedulable` field should eventually be replaced by resource groups like `unschedulable`
        // Keep only the workers that accept streaming jobs, keyed by worker id.
        let workers: HashMap<_, _> = workers
            .into_iter()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| (worker.id, worker))
            .collect();

        // Resolved per-job input for the planning phase below.
        #[derive(Debug)]
        struct JobUpdate {
            // Workers belonging to the job's effective resource group.
            filtered_worker_ids: BTreeSet<WorkerId>,
            // Parallelism the job should be rescheduled to.
            parallelism: TableParallelism,
        }

        let mut job_parallelism_updates = HashMap::new();

        // Catalog changes that accompany the plan; returned as `post_updates`.
        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
            parallelism_updates: Default::default(),
            resource_group_updates: Default::default(),
        };

        for (
            job_id,
            JobRescheduleTarget {
                parallelism: parallelism_update,
                resource_group: resource_group_update,
            },
        ) in &targets
        {
            // `Update` carries the new parallelism; `Refresh` re-reads the job's parallelism
            // from the catalog.
            let parallelism = match parallelism_update {
                JobParallelismTarget::Update(parallelism) => *parallelism,
                JobParallelismTarget::Refresh => {
                    let parallelism = self
                        .metadata_manager
                        .catalog_controller
                        .get_job_streaming_parallelisms(*job_id as _)
                        .await?;

                    parallelism.into()
                }
            };

            job_reschedule_post_updates
                .parallelism_updates
                .insert(TableId::from(*job_id), parallelism);

            // Determine the resource group the job will run in after the reschedule. Only the
            // two `Update` variants record a resource group change in the post updates.
            let current_resource_group = match resource_group_update {
                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
                    job_reschedule_post_updates.resource_group_updates.insert(
                        *job_id as ObjectId,
                        Some(specific_resource_group.to_owned()),
                    );

                    specific_resource_group.to_owned()
                }
                JobResourceGroupTarget::Update(None) => {
                    // No specific group requested: use the resource group of the job's database.
                    let database_resource_group = self
                        .metadata_manager
                        .catalog_controller
                        .get_existing_job_database_resource_group(*job_id as _)
                        .await?;

                    job_reschedule_post_updates
                        .resource_group_updates
                        .insert(*job_id as ObjectId, None);
                    database_resource_group
                }
                JobResourceGroupTarget::Keep => {
                    self.metadata_manager
                        .catalog_controller
                        .get_existing_job_resource_group(*job_id as _)
                        .await?
                }
            };

            let filtered_worker_ids =
                filter_workers_by_resource_group(&workers, current_resource_group.as_str());

            // A job cannot be rescheduled onto nothing.
            if filtered_worker_ids.is_empty() {
                bail!("Cannot resize streaming_job {job_id} to empty worker set")
            }

            job_parallelism_updates.insert(
                *job_id,
                JobUpdate {
                    filtered_worker_ids,
                    parallelism,
                },
            );
        }

        // index for no shuffle relation
        let mut no_shuffle_source_fragment_ids = HashSet::new();
        let mut no_shuffle_target_fragment_ids = HashSet::new();

        // index for fragment_id -> (distribution_type, vnode_count)
        let mut fragment_distribution_map = HashMap::new();
        // index for actor -> worker id
        let mut actor_location = HashMap::new();
        // index for table_id -> [fragment_id]
        let mut table_fragment_id_map = HashMap::new();
        // index for fragment_id -> [actor_id]
        let mut fragment_actor_id_map = HashMap::new();

        // Loads the reschedule working set for `table_ids` from the catalog and fills the
        // indexes declared above. Only `fragments`, `actors` and `fragment_downstreams` of the
        // working set are used; the remaining parts are ignored.
        async fn build_index(
            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
            fragment_distribution_map: &mut HashMap<
                FragmentId,
                (FragmentDistributionType, VnodeCount),
            >,
            actor_location: &mut HashMap<ActorId, WorkerId>,
            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
            mgr: &MetadataManager,
            table_ids: Vec<ObjectId>,
        ) -> Result<(), MetaError> {
            let RescheduleWorkingSet {
                fragments,
                actors,
                actor_dispatchers: _actor_dispatchers,
                fragment_downstreams,
                fragment_upstreams: _fragment_upstreams,
                related_jobs: _related_jobs,
                job_resource_groups: _job_resource_groups,
            } = mgr
                .catalog_controller
                .resolve_working_set_for_reschedule_tables(table_ids)
                .await?;

            // Record both endpoints of every NO_SHUFFLE edge.
            for (fragment_id, downstreams) in fragment_downstreams {
                for (downstream_fragment_id, dispatcher_type) in downstreams {
                    if let risingwave_meta_model::DispatcherType::NoShuffle = dispatcher_type {
                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
                    }
                }
            }

            // Distribution info per fragment, plus the grouping of fragments by owning job.
            for (fragment_id, fragment) in fragments {
                fragment_distribution_map.insert(
                    fragment_id as FragmentId,
                    (
                        FragmentDistributionType::from(fragment.distribution_type),
                        fragment.vnode_count as _,
                    ),
                );

                table_fragment_id_map
                    .entry(fragment.job_id as u32)
                    .or_default()
                    .insert(fragment_id as FragmentId);
            }

            // Where each actor currently runs, plus the grouping of actors by fragment.
            for (actor_id, actor) in actors {
                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
                fragment_actor_id_map
                    .entry(actor.fragment_id as FragmentId)
                    .or_default()
                    .insert(actor_id as ActorId);
            }

            Ok(())
        }

        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();

        build_index(
            &mut no_shuffle_source_fragment_ids,
            &mut no_shuffle_target_fragment_ids,
            &mut fragment_distribution_map,
            &mut actor_location,
            &mut table_fragment_id_map,
            &mut fragment_actor_id_map,
            &self.metadata_manager,
            table_ids,
        )
        .await?;
        tracing::debug!(
            ?job_reschedule_post_updates,
            ?job_parallelism_updates,
            ?no_shuffle_source_fragment_ids,
            ?no_shuffle_target_fragment_ids,
            ?fragment_distribution_map,
            ?actor_location,
            ?table_fragment_id_map,
            ?fragment_actor_id_map,
            "generate_table_resize_plan, after build_index"
        );

        let adaptive_parallelism_strategy = self
            .env
            .system_params_reader()
            .await
            .adaptive_parallelism_strategy();

        // fragment_id -> per-worker actor count changes
        let mut target_plan = HashMap::new();

        for (
            table_id,
            JobUpdate {
                filtered_worker_ids,
                parallelism,
            },
        ) in job_parallelism_updates
        {
            let assigner = AssignerBuilder::new(table_id).build();

            // Panics if the working set contains no fragment for this job.
            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();

            // Workers of the job's resource group with their parallelism as slot count.
            // The `unwrap` panics if a worker reports a parallelism of zero.
            let available_worker_slots = workers
                .iter()
                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
                .map(|(_, worker)| {
                    (
                        worker.id as WorkerId,
                        NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                    )
                })
                .collect::<BTreeMap<_, _>>();

            for fragment_id in fragment_map {
                // Currently, all of our NO_SHUFFLE relation propagations are only transmitted from upstream to downstream.
                // Downstream fragments of a NO_SHUFFLE edge therefore get no plan entry here.
                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
                    continue;
                }

                // Current layout of the fragment: worker id -> number of actors on it.
                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();

                for actor_id in &fragment_actor_id_map[&fragment_id] {
                    let worker_id = actor_location[actor_id];
                    *fragment_slots.entry(worker_id).or_default() += 1;
                }

                // Total number of slots across the workers available to this job.
                let available_slot_count: usize = available_worker_slots
                    .values()
                    .cloned()
                    .map(NonZeroUsize::get)
                    .sum();

                if available_slot_count == 0 {
                    bail!(
                        "No schedulable slots available for fragment {}",
                        fragment_id
                    );
                }

                // The vnode count of the fragment is used as the upper bound for its parallelism.
                let (dist, vnode_count) = fragment_distribution_map[&fragment_id];
                let max_parallelism = vnode_count;

                match dist {
                    FragmentDistributionType::Unspecified => unreachable!(),
                    FragmentDistributionType::Single => {
                        // A singleton fragment has exactly one actor on exactly one worker.
                        let (single_worker_id, should_be_one) = fragment_slots
                            .iter()
                            .exactly_one()
                            .expect("single fragment should have only one worker slot");

                        assert_eq!(*should_be_one, 1);

                        // Ask the assigner where a single actor should be placed.
                        let assignment =
                            assigner.count_actors_per_worker(&available_worker_slots, 1);

                        let (chosen_target_worker_id, should_be_one) =
                            assignment.iter().exactly_one().ok().with_context(|| {
                                format!(
                                    "Cannot find a single target worker for fragment {fragment_id}"
                                )
                            })?;

                        assert_eq!(*should_be_one, 1);

                        // Nothing to move if the actor already sits on the chosen worker.
                        if *chosen_target_worker_id == *single_worker_id {
                            tracing::debug!(
                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
                            );
                            continue;
                        }

                        // Move the single actor: +1 on the chosen worker, -1 on the current one.
                        target_plan.insert(
                            fragment_id,
                            WorkerReschedule {
                                worker_actor_diff: BTreeMap::from_iter(vec![
                                    (*chosen_target_worker_id, 1),
                                    (*single_worker_id, -1),
                                ]),
                            },
                        );
                    }
                    FragmentDistributionType::Hash => match parallelism {
                        TableParallelism::Adaptive => {
                            // The adaptive strategy derives the target from the available slots.
                            let target_slot_count = adaptive_parallelism_strategy
                                .compute_target_parallelism(available_slot_count);

                            if target_slot_count > max_parallelism {
                                // Case 1: the strategy asks for more than the fragment supports;
                                // cap at `max_parallelism`.
                                tracing::warn!(
                                    "available parallelism for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );

                                let target_worker_slots = assigner.count_actors_per_worker(
                                    &available_worker_slots,
                                    max_parallelism,
                                );

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else if available_slot_count != target_slot_count {
                                // Case 2: the strategy chose a value different from the number
                                // of available slots; assign that many actors.
                                tracing::info!(
                                    "available parallelism for table {table_id} is limit by adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
                                );

                                let target_worker_slots = assigner.count_actors_per_worker(
                                    &available_worker_slots,
                                    target_slot_count,
                                );

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &target_worker_slots,
                                    ),
                                );
                            } else {
                                // Case 3: the target equals the available slots; every worker
                                // gets one actor per slot, so the assigner is not consulted.
                                let available_worker_slots = available_worker_slots
                                    .iter()
                                    .map(|(worker_id, v)| (*worker_id, v.get()))
                                    .collect();

                                target_plan.insert(
                                    fragment_id,
                                    Self::diff_worker_slot_changes(
                                        &fragment_slots,
                                        &available_worker_slots,
                                    ),
                                );
                            }
                        }
                        TableParallelism::Fixed(mut n) => {
                            // A fixed parallelism is capped at `max_parallelism` as well.
                            if n > max_parallelism {
                                tracing::warn!(
                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
                                );
                                n = max_parallelism
                            }

                            let target_worker_slots =
                                assigner.count_actors_per_worker(&available_worker_slots, n);

                            target_plan.insert(
                                fragment_id,
                                Self::diff_worker_slot_changes(
                                    &fragment_slots,
                                    &target_worker_slots,
                                ),
                            );
                        }
                        TableParallelism::Custom => {
                            // skipping for custom
                        }
                    },
                }
            }
        }

        // Fragments whose layout already matches the target need no reschedule.
        target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
        tracing::debug!(
            ?target_plan,
            "generate_table_resize_plan finished target_plan"
        );

        Ok(JobReschedulePlan {
            reschedules: target_plan,
            post_updates: job_reschedule_post_updates,
        })
    }
2046
2047    fn diff_worker_slot_changes(
2048        fragment_worker_slots: &BTreeMap<WorkerId, usize>,
2049        target_worker_slots: &BTreeMap<WorkerId, usize>,
2050    ) -> WorkerReschedule {
2051        let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2052        let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2053
2054        for (&worker_id, &target_slots) in target_worker_slots {
2055            let &current_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);
2056
2057            if target_slots > current_slots {
2058                increased_actor_count.insert(worker_id, target_slots - current_slots);
2059            }
2060        }
2061
2062        for (&worker_id, &current_slots) in fragment_worker_slots {
2063            let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);
2064
2065            if current_slots > target_slots {
2066                decreased_actor_count.insert(worker_id, current_slots - target_slots);
2067            }
2068        }
2069
2070        let worker_ids: HashSet<_> = increased_actor_count
2071            .keys()
2072            .chain(decreased_actor_count.keys())
2073            .cloned()
2074            .collect();
2075
2076        let mut worker_actor_diff = BTreeMap::new();
2077
2078        for worker_id in worker_ids {
2079            let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2080            let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2081            let change = increased - decreased;
2082
2083            assert_ne!(change, 0);
2084
2085            worker_actor_diff.insert(worker_id, change);
2086        }
2087
2088        WorkerReschedule { worker_actor_diff }
2089    }
2090
2091    fn build_no_shuffle_relation_index(
2092        actor_map: &HashMap<ActorId, CustomActorInfo>,
2093        no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
2094        no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
2095    ) {
2096        let mut fragment_cache = HashSet::new();
2097        for actor in actor_map.values() {
2098            if fragment_cache.contains(&actor.fragment_id) {
2099                continue;
2100            }
2101
2102            for dispatcher in &actor.dispatcher {
2103                for downstream_actor_id in &dispatcher.downstream_actor_id {
2104                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2105                        // Checking for no shuffle dispatchers
2106                        if dispatcher.r#type() == PbDispatcherType::NoShuffle {
2107                            no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId);
2108                            no_shuffle_target_fragment_ids
2109                                .insert(downstream_actor.fragment_id as FragmentId);
2110                        }
2111                    }
2112                }
2113            }
2114
2115            fragment_cache.insert(actor.fragment_id);
2116        }
2117    }
2118
2119    fn build_fragment_dispatcher_index(
2120        actor_map: &HashMap<ActorId, CustomActorInfo>,
2121        fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
2122    ) {
2123        for actor in actor_map.values() {
2124            for dispatcher in &actor.dispatcher {
2125                for downstream_actor_id in &dispatcher.downstream_actor_id {
2126                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2127                        fragment_dispatcher_map
2128                            .entry(actor.fragment_id as FragmentId)
2129                            .or_default()
2130                            .insert(
2131                                downstream_actor.fragment_id as FragmentId,
2132                                dispatcher.r#type().into(),
2133                            );
2134                    }
2135                }
2136            }
2137        }
2138    }
2139
2140    pub fn resolve_no_shuffle_upstream_tables(
2141        fragment_ids: HashSet<FragmentId>,
2142        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2143        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2144        fragment_to_table: &HashMap<FragmentId, TableId>,
2145        fragment_upstreams: &HashMap<
2146            risingwave_meta_model::FragmentId,
2147            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2148        >,
2149        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
2150    ) -> MetaResult<()> {
2151        let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();
2152
2153        let mut fragment_ids = fragment_ids;
2154
2155        // We trace the upstreams of each downstream under the hierarchy until we reach the top
2156        // for every no_shuffle relation.
2157        while let Some(fragment_id) = queue.pop_front() {
2158            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2159                continue;
2160            }
2161
2162            // for upstream
2163            for upstream_fragment_id in fragment_upstreams
2164                .get(&(fragment_id as _))
2165                .map(|upstreams| upstreams.keys())
2166                .into_iter()
2167                .flatten()
2168            {
2169                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2170                let upstream_fragment_id = &upstream_fragment_id;
2171                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2172                    continue;
2173                }
2174
2175                let table_id = &fragment_to_table[&fragment_id];
2176                let upstream_table_id = &fragment_to_table[upstream_fragment_id];
2177
2178                // Only custom parallelism will be propagated to the no shuffle upstream.
2179                if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
2180                    if let Some(upstream_table_parallelism) =
2181                        table_parallelisms.get(upstream_table_id)
2182                    {
2183                        if upstream_table_parallelism != &TableParallelism::Custom {
2184                            bail!(
2185                                "Cannot change upstream table {} from {:?} to {:?}",
2186                                upstream_table_id,
2187                                upstream_table_parallelism,
2188                                TableParallelism::Custom
2189                            )
2190                        }
2191                    } else {
2192                        table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
2193                    }
2194                }
2195
2196                fragment_ids.insert(*upstream_fragment_id);
2197                queue.push_back(*upstream_fragment_id);
2198            }
2199        }
2200
2201        let downstream_fragment_ids = fragment_ids
2202            .iter()
2203            .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));
2204
2205        let downstream_table_ids = downstream_fragment_ids
2206            .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
2207            .collect::<HashSet<_>>();
2208
2209        table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));
2210
2211        Ok(())
2212    }
2213
2214    pub fn resolve_no_shuffle_upstream_fragments<T>(
2215        reschedule: &mut HashMap<FragmentId, T>,
2216        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2217        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2218        fragment_upstreams: &HashMap<
2219            risingwave_meta_model::FragmentId,
2220            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2221        >,
2222    ) -> MetaResult<()>
2223    where
2224        T: Clone + Eq,
2225    {
2226        let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();
2227
2228        // We trace the upstreams of each downstream under the hierarchy until we reach the top
2229        // for every no_shuffle relation.
2230        while let Some(fragment_id) = queue.pop_front() {
2231            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2232                continue;
2233            }
2234
2235            // for upstream
2236            for upstream_fragment_id in fragment_upstreams
2237                .get(&(fragment_id as _))
2238                .map(|upstreams| upstreams.keys())
2239                .into_iter()
2240                .flatten()
2241            {
2242                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2243                let upstream_fragment_id = &upstream_fragment_id;
2244                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2245                    continue;
2246                }
2247
2248                let reschedule_plan = &reschedule[&fragment_id];
2249
2250                if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
2251                    if upstream_reschedule_plan != reschedule_plan {
2252                        bail!(
2253                            "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
2254                            fragment_id,
2255                            upstream_fragment_id
2256                        );
2257                    }
2258
2259                    continue;
2260                }
2261
2262                reschedule.insert(*upstream_fragment_id, reschedule_plan.clone());
2263
2264                queue.push_back(*upstream_fragment_id);
2265            }
2266        }
2267
2268        reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));
2269
2270        Ok(())
2271    }
2272
2273    pub async fn resolve_related_no_shuffle_jobs(
2274        &self,
2275        jobs: &[TableId],
2276    ) -> MetaResult<HashSet<TableId>> {
2277        let RescheduleWorkingSet { related_jobs, .. } = self
2278            .metadata_manager
2279            .catalog_controller
2280            .resolve_working_set_for_reschedule_tables(
2281                jobs.iter().map(|id| id.table_id as _).collect(),
2282            )
2283            .await?;
2284
2285        Ok(related_jobs
2286            .keys()
2287            .map(|id| TableId::new(*id as _))
2288            .collect())
2289    }
2290}
2291
2292#[derive(Debug, Clone)]
2293pub enum JobParallelismTarget {
2294    Update(TableParallelism),
2295    Refresh,
2296}
2297
/// Resource-group part of a per-job reschedule request.
#[derive(Clone, Debug)]
pub enum JobResourceGroupTarget {
    /// Carries the resource group value requested for the job.
    // NOTE(review): `None` presumably means "no specific resource group"; confirm with the
    // code that consumes this value.
    Update(Option<String>),
    /// Carries no new value. This is the variant `trigger_parallelism_control` submits for
    /// every job it considers.
    Keep,
}
2303
2304#[derive(Debug, Clone)]
2305pub struct JobRescheduleTarget {
2306    pub parallelism: JobParallelismTarget,
2307    pub resource_group: JobResourceGroupTarget,
2308}
2309
2310#[derive(Debug)]
2311pub struct JobReschedulePolicy {
2312    pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
2313}
2314
2315// final updates for `post_collect`
2316#[derive(Debug, Clone)]
2317pub struct JobReschedulePostUpdates {
2318    pub parallelism_updates: HashMap<TableId, TableParallelism>,
2319    pub resource_group_updates: HashMap<ObjectId, Option<String>>,
2320}
2321
2322#[derive(Debug)]
2323pub struct JobReschedulePlan {
2324    pub reschedules: HashMap<FragmentId, WorkerReschedule>,
2325    pub post_updates: JobReschedulePostUpdates,
2326}
2327
2328impl GlobalStreamManager {
2329    #[await_tree::instrument("acquire_reschedule_read_guard")]
2330    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
2331        self.scale_controller.reschedule_lock.read().await
2332    }
2333
2334    #[await_tree::instrument("acquire_reschedule_write_guard")]
2335    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
2336        self.scale_controller.reschedule_lock.write().await
2337    }
2338
2339    /// The entrypoint of rescheduling actors.
2340    ///
2341    /// Used by:
2342    /// - The directly exposed low-level API `risingwave_meta_service::scale_service::ScaleService` (`risectl meta reschedule`)
2343    /// - High-level parallelism control API
2344    ///     * manual `ALTER [TABLE | INDEX | MATERIALIZED VIEW | SINK] SET PARALLELISM`
2345    ///     * automatic parallelism control for [`TableParallelism::Adaptive`] when worker nodes changed
2346    pub async fn reschedule_actors(
2347        &self,
2348        database_id: DatabaseId,
2349        plan: JobReschedulePlan,
2350        options: RescheduleOptions,
2351    ) -> MetaResult<()> {
2352        let JobReschedulePlan {
2353            reschedules,
2354            mut post_updates,
2355        } = plan;
2356
2357        let reschedule_fragment = self
2358            .scale_controller
2359            .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
2360            .await?;
2361
2362        tracing::debug!("reschedule plan: {:?}", reschedule_fragment);
2363
2364        let up_down_stream_fragment: HashSet<_> = reschedule_fragment
2365            .iter()
2366            .flat_map(|(_, reschedule)| {
2367                reschedule
2368                    .upstream_fragment_dispatcher_ids
2369                    .iter()
2370                    .map(|(fragment_id, _)| *fragment_id)
2371                    .chain(reschedule.downstream_fragment_ids.iter().cloned())
2372            })
2373            .collect();
2374
2375        let fragment_actors =
2376            try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async {
2377                let actor_ids = self
2378                    .metadata_manager
2379                    .get_running_actors_of_fragment(*fragment_id)
2380                    .await?;
2381                Result::<_, MetaError>::Ok((*fragment_id, actor_ids))
2382            }))
2383            .await?
2384            .into_iter()
2385            .collect();
2386
2387        let command = Command::RescheduleFragment {
2388            reschedules: reschedule_fragment,
2389            fragment_actors,
2390            post_updates,
2391        };
2392
2393        let _guard = self.source_manager.pause_tick().await;
2394
2395        self.barrier_scheduler
2396            .run_command(database_id, command)
2397            .await?;
2398
2399        tracing::info!("reschedule done");
2400
2401        Ok(())
2402    }
2403
2404    /// When new worker nodes joined, or the parallelism of existing worker nodes changed,
2405    /// examines if there are any jobs can be scaled, and scales them if found.
2406    ///
2407    /// This method will iterate over all `CREATED` jobs, and can be repeatedly called.
2408    ///
2409    /// Returns
2410    /// - `Ok(false)` if no jobs can be scaled;
2411    /// - `Ok(true)` if some jobs are scaled, and it is possible that there are more jobs can be scaled.
2412    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
2413        tracing::info!("trigger parallelism control");
2414
2415        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
2416
2417        let background_streaming_jobs = self
2418            .metadata_manager
2419            .list_background_creating_jobs()
2420            .await?;
2421
2422        let skipped_jobs = if !background_streaming_jobs.is_empty() {
2423            let jobs = self
2424                .scale_controller
2425                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
2426                .await?;
2427
2428            tracing::info!(
2429                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
2430                background_streaming_jobs,
2431                jobs
2432            );
2433
2434            jobs
2435        } else {
2436            HashSet::new()
2437        };
2438
2439        let job_ids: HashSet<_> = {
2440            let streaming_parallelisms = self
2441                .metadata_manager
2442                .catalog_controller
2443                .get_all_streaming_parallelisms()
2444                .await?;
2445
2446            streaming_parallelisms
2447                .into_iter()
2448                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
2449                .map(|(table_id, _)| table_id)
2450                .collect()
2451        };
2452
2453        let workers = self
2454            .metadata_manager
2455            .cluster_controller
2456            .list_active_streaming_workers()
2457            .await?;
2458
2459        let schedulable_worker_ids: BTreeSet<_> = workers
2460            .iter()
2461            .filter(|worker| {
2462                !worker
2463                    .property
2464                    .as_ref()
2465                    .map(|p| p.is_unschedulable)
2466                    .unwrap_or(false)
2467            })
2468            .map(|worker| worker.id as WorkerId)
2469            .collect();
2470
2471        if job_ids.is_empty() {
2472            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
2473            return Ok(false);
2474        }
2475
2476        let batch_size = match self.env.opts.parallelism_control_batch_size {
2477            0 => job_ids.len(),
2478            n => n,
2479        };
2480
2481        tracing::info!(
2482            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
2483            job_ids.len(),
2484            batch_size,
2485            schedulable_worker_ids
2486        );
2487
2488        let batches: Vec<_> = job_ids
2489            .into_iter()
2490            .chunks(batch_size)
2491            .into_iter()
2492            .map(|chunk| chunk.collect_vec())
2493            .collect();
2494
2495        let mut reschedules = None;
2496
2497        for batch in batches {
2498            let targets: HashMap<_, _> = batch
2499                .into_iter()
2500                .map(|job_id| {
2501                    (
2502                        job_id as u32,
2503                        JobRescheduleTarget {
2504                            parallelism: JobParallelismTarget::Refresh,
2505                            resource_group: JobResourceGroupTarget::Keep,
2506                        },
2507                    )
2508                })
2509                .collect();
2510
2511            let plan = self
2512                .scale_controller
2513                .generate_job_reschedule_plan(JobReschedulePolicy { targets })
2514                .await?;
2515
2516            if !plan.reschedules.is_empty() {
2517                tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
2518                reschedules = Some(plan);
2519                break;
2520            }
2521        }
2522
2523        let Some(plan) = reschedules else {
2524            tracing::info!("no reschedule plan generated");
2525            return Ok(false);
2526        };
2527
2528        // todo
2529        for (database_id, reschedules) in self
2530            .metadata_manager
2531            .split_fragment_map_by_database(plan.reschedules)
2532            .await?
2533        {
2534            self.reschedule_actors(
2535                database_id,
2536                JobReschedulePlan {
2537                    reschedules,
2538                    post_updates: plan.post_updates.clone(),
2539                },
2540                RescheduleOptions {
2541                    resolve_no_shuffle_upstream: false,
2542                    skip_create_new_actors: false,
2543                },
2544            )
2545            .await?;
2546        }
2547
2548        Ok(true)
2549    }
2550
2551    /// Handles notification of worker node activation and deletion, and triggers parallelism control.
2552    async fn run(&self, mut shutdown_rx: Receiver<()>) {
2553        tracing::info!("starting automatic parallelism control monitor");
2554
2555        let check_period =
2556            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);
2557
2558        let mut ticker = tokio::time::interval_at(
2559            Instant::now()
2560                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
2561            check_period,
2562        );
2563        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2564
2565        // waiting for the first tick
2566        ticker.tick().await;
2567
2568        let (local_notification_tx, mut local_notification_rx) =
2569            tokio::sync::mpsc::unbounded_channel();
2570
2571        self.env
2572            .notification_manager()
2573            .insert_local_sender(local_notification_tx)
2574            .await;
2575
2576        let worker_nodes = self
2577            .metadata_manager
2578            .list_active_streaming_compute_nodes()
2579            .await
2580            .expect("list active streaming compute nodes");
2581
2582        let mut worker_cache: BTreeMap<_, _> = worker_nodes
2583            .into_iter()
2584            .map(|worker| (worker.id, worker))
2585            .collect();
2586
2587        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();
2588
2589        let mut should_trigger = false;
2590
2591        loop {
2592            tokio::select! {
2593                biased;
2594
2595                _ = &mut shutdown_rx => {
2596                    tracing::info!("Stream manager is stopped");
2597                    break;
2598                }
2599
2600                _ = ticker.tick(), if should_trigger => {
2601                    let include_workers = worker_cache.keys().copied().collect_vec();
2602
2603                    if include_workers.is_empty() {
2604                        tracing::debug!("no available worker nodes");
2605                        should_trigger = false;
2606                        continue;
2607                    }
2608
2609                    match self.trigger_parallelism_control().await {
2610                        Ok(cont) => {
2611                            should_trigger = cont;
2612                        }
2613                        Err(e) => {
2614                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
2615                            ticker.reset();
2616                        }
2617                    }
2618                }
2619
2620                notification = local_notification_rx.recv() => {
2621                    let notification = notification.expect("local notification channel closed in loop of stream manager");
2622
2623                    // Only maintain the cache for streaming compute nodes.
2624                    let worker_is_streaming_compute = |worker: &WorkerNode| {
2625                        worker.get_type() == Ok(WorkerType::ComputeNode)
2626                            && worker.property.as_ref().unwrap().is_streaming
2627                    };
2628
2629                    match notification {
2630                        LocalNotification::SystemParamsChange(reader) => {
2631                            let new_strategy = reader.adaptive_parallelism_strategy();
2632                            if new_strategy != previous_adaptive_parallelism_strategy {
2633                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
2634                                should_trigger = true;
2635                                previous_adaptive_parallelism_strategy = new_strategy;
2636                            }
2637                        }
2638                        LocalNotification::WorkerNodeActivated(worker) => {
2639                            if !worker_is_streaming_compute(&worker) {
2640                                continue;
2641                            }
2642
2643                            tracing::info!(worker = worker.id, "worker activated notification received");
2644
2645                            let prev_worker = worker_cache.insert(worker.id, worker.clone());
2646
2647                            match prev_worker {
2648                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism()  => {
2649                                    tracing::info!(worker = worker.id, "worker parallelism changed");
2650                                    should_trigger = true;
2651                                }
2652                                Some(prev_worker) if  prev_worker.resource_group() != worker.resource_group()  => {
2653                                    tracing::info!(worker = worker.id, "worker label changed");
2654                                    should_trigger = true;
2655                                }
2656                                None => {
2657                                    tracing::info!(worker = worker.id, "new worker joined");
2658                                    should_trigger = true;
2659                                }
2660                                _ => {}
2661                            }
2662                        }
2663
2664                        // Since our logic for handling passive scale-in is within the barrier manager,
2665                        // there’s not much we can do here. All we can do is proactively remove the entries from our cache.
2666                        LocalNotification::WorkerNodeDeleted(worker) => {
2667                            if !worker_is_streaming_compute(&worker) {
2668                                continue;
2669                            }
2670
2671                            match worker_cache.remove(&worker.id) {
2672                                Some(prev_worker) => {
2673                                    tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
2674                                }
2675                                None => {
2676                                    tracing::warn!(worker = worker.id, "worker not found in stream manager cache, but it was removed");
2677                                }
2678                            }
2679                        }
2680
2681                        _ => {}
2682                    }
2683                }
2684            }
2685        }
2686    }
2687
2688    pub fn start_auto_parallelism_monitor(
2689        self: Arc<Self>,
2690    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
2691        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
2692        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
2693        let join_handle = tokio::spawn(async move {
2694            self.run(shutdown_rx).await;
2695        });
2696
2697        (join_handle, shutdown_tx)
2698    }
2699}