risingwave_meta/stream/
scale.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, min};
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
17use std::fmt::Debug;
18use std::num::NonZeroUsize;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use itertools::Itertools;
24use num_integer::Integer;
25use num_traits::abs;
26use risingwave_common::bail;
27use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
28use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
29use risingwave_common::hash::ActorMapping;
30use risingwave_common::util::iter_util::ZipEqDebug;
31use risingwave_meta_model::{ObjectId, WorkerId, actor, fragment, streaming_job};
32use risingwave_pb::common::{WorkerNode, WorkerType};
33use risingwave_pb::meta::FragmentWorkerSlotMappings;
34use risingwave_pb::meta::subscribe_response::{Info, Operation};
35use risingwave_pb::meta::table_fragments::fragment::{
36    FragmentDistributionType, PbFragmentDistributionType,
37};
38use risingwave_pb::meta::table_fragments::{self, State};
39use risingwave_pb::stream_plan::{Dispatcher, PbDispatcher, PbDispatcherType, StreamNode};
40use thiserror_ext::AsReport;
41use tokio::sync::oneshot::Receiver;
42use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, oneshot};
43use tokio::task::JoinHandle;
44use tokio::time::{Instant, MissedTickBehavior};
45
46use crate::barrier::{Command, Reschedule};
47use crate::controller::scale::RescheduleWorkingSet;
48use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager};
49use crate::model::{
50    ActorId, DispatcherId, FragmentId, StreamActor, StreamActorWithDispatchers, TableParallelism,
51};
52use crate::serving::{
53    ServingVnodeMapping, to_deleted_fragment_worker_slot_mapping, to_fragment_worker_slot_mapping,
54};
55use crate::stream::{AssignerBuilder, GlobalStreamManager, SourceManagerRef};
56use crate::{MetaError, MetaResult};
57
58#[derive(Debug, Clone, Eq, PartialEq)]
59pub struct WorkerReschedule {
60    pub worker_actor_diff: BTreeMap<WorkerId, isize>,
61}
62
63pub struct CustomFragmentInfo {
64    pub job_id: u32,
65    pub fragment_id: u32,
66    pub fragment_type_mask: FragmentTypeMask,
67    pub distribution_type: PbFragmentDistributionType,
68    pub state_table_ids: Vec<u32>,
69    pub node: StreamNode,
70    pub actor_template: StreamActorWithDispatchers,
71    pub actors: Vec<CustomActorInfo>,
72}
73
74#[derive(Default, Clone)]
75pub struct CustomActorInfo {
76    pub actor_id: u32,
77    pub fragment_id: u32,
78    pub dispatcher: Vec<Dispatcher>,
79    /// `None` if singleton.
80    pub vnode_bitmap: Option<Bitmap>,
81}
82
83use educe::Educe;
84use futures::future::try_join_all;
85use risingwave_common::system_param::AdaptiveParallelismStrategy;
86use risingwave_common::system_param::reader::SystemParamsRead;
87use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
88use risingwave_meta_model::DispatcherType;
89use risingwave_pb::stream_plan::stream_node::NodeBody;
90
91use super::SourceChange;
92use crate::controller::id::IdCategory;
93use crate::controller::utils::filter_workers_by_resource_group;
94use crate::stream::cdc::assign_cdc_table_snapshot_splits_impl;
95
96// The debug implementation is arbitrary. Just used in debug logs.
97#[derive(Educe)]
98#[educe(Debug)]
99pub struct RescheduleContext {
100    /// Meta information for all Actors
101    #[educe(Debug(ignore))]
102    actor_map: HashMap<ActorId, CustomActorInfo>,
103    /// Status of all Actors, used to find the location of the `Actor`
104    actor_status: BTreeMap<ActorId, WorkerId>,
105    /// Meta information of all `Fragment`, used to find the `Fragment`'s `Actor`
106    #[educe(Debug(ignore))]
107    fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
108    /// Fragments with `StreamSource`
109    stream_source_fragment_ids: HashSet<FragmentId>,
110    /// Fragments with `StreamSourceBackfill` and the corresponding upstream source fragment
111    stream_source_backfill_fragment_ids: HashMap<FragmentId, FragmentId>,
112    /// Target fragments in `NoShuffle` relation
113    no_shuffle_target_fragment_ids: HashSet<FragmentId>,
114    /// Source fragments in `NoShuffle` relation
115    no_shuffle_source_fragment_ids: HashSet<FragmentId>,
116    // index for dispatcher type from upstream fragment to downstream fragment
117    fragment_dispatcher_map: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
118    fragment_upstreams: HashMap<
119        risingwave_meta_model::FragmentId,
120        HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
121    >,
122}
123
124impl RescheduleContext {
125    fn actor_id_to_worker_id(&self, actor_id: &ActorId) -> MetaResult<WorkerId> {
126        self.actor_status
127            .get(actor_id)
128            .cloned()
129            .ok_or_else(|| anyhow!("could not find worker for actor {}", actor_id).into())
130    }
131}
132
133/// This function provides an simple balancing method
134/// The specific process is as follows
135///
136/// 1. Calculate the number of target actors, and calculate the average value and the remainder, and
137///    use the average value as expected.
138///
139/// 2. Filter out the actor to be removed and the actor to be retained, and sort them from largest
140///    to smallest (according to the number of virtual nodes held).
141///
142/// 3. Calculate their balance, 1) For the actors to be removed, the number of virtual nodes per
143///    actor is the balance. 2) For retained actors, the number of virtual nodes - expected is the
144///    balance. 3) For newly created actors, -expected is the balance (always negative).
145///
146/// 4. Allocate the remainder, high priority to newly created nodes.
147///
148/// 5. After that, merge removed, retained and created into a queue, with the head of the queue
149///    being the source, and move the virtual nodes to the destination at the end of the queue.
150///
151/// This can handle scale in, scale out, migration, and simultaneous scaling with as much affinity
152/// as possible.
153///
154/// Note that this function can only rebalance actors whose `vnode_bitmap` is not `None`, in other
155/// words, for `Fragment` of `FragmentDistributionType::Single`, using this function will cause
156/// assert to fail and should be skipped from the upper level.
157///
158/// The return value is the bitmap distribution after scaling, which covers all virtual node indexes
159pub fn rebalance_actor_vnode(
160    actors: &[CustomActorInfo],
161    actors_to_remove: &BTreeSet<ActorId>,
162    actors_to_create: &BTreeSet<ActorId>,
163) -> HashMap<ActorId, Bitmap> {
164    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();
165
166    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
167    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);
168
169    assert!(actors.len() >= actors_to_remove.len());
170
171    let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
172    assert!(target_actor_count > 0);
173
174    // `vnode_bitmap` must be set on distributed fragments.
175    let vnode_count = actors[0]
176        .vnode_bitmap
177        .as_ref()
178        .expect("vnode bitmap unset")
179        .len();
180
181    // represents the balance of each actor, used to sort later
182    #[derive(Debug)]
183    struct Balance {
184        actor_id: ActorId,
185        balance: i32,
186        builder: BitmapBuilder,
187    }
188    let (expected, mut remain) = vnode_count.div_rem(&target_actor_count);
189
190    tracing::debug!(
191        "expected {}, remain {}, prev actors {}, target actors {}",
192        expected,
193        remain,
194        actors.len(),
195        target_actor_count,
196    );
197
198    let (mut removed, mut rest): (Vec<_>, Vec<_>) = actors
199        .iter()
200        .map(|actor| {
201            (
202                actor.actor_id as ActorId,
203                actor.vnode_bitmap.clone().expect("vnode bitmap unset"),
204            )
205        })
206        .partition(|(actor_id, _)| actors_to_remove.contains(actor_id));
207
208    let order_by_bitmap_desc =
209        |(id_a, bitmap_a): &(ActorId, Bitmap), (id_b, bitmap_b): &(ActorId, Bitmap)| -> Ordering {
210            bitmap_a
211                .count_ones()
212                .cmp(&bitmap_b.count_ones())
213                .reverse()
214                .then(id_a.cmp(id_b))
215        };
216
217    let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder {
218        let mut builder = BitmapBuilder::default();
219        builder.append_bitmap(bitmap);
220        builder
221    };
222
223    let (prev_expected, _) = vnode_count.div_rem(&actors.len());
224
225    let prev_remain = removed
226        .iter()
227        .map(|(_, bitmap)| {
228            assert!(bitmap.count_ones() >= prev_expected);
229            bitmap.count_ones() - prev_expected
230        })
231        .sum::<usize>();
232
233    removed.sort_by(order_by_bitmap_desc);
234    rest.sort_by(order_by_bitmap_desc);
235
236    let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance {
237        actor_id,
238        balance: bitmap.count_ones() as i32,
239        builder: builder_from_bitmap(&bitmap),
240    });
241
242    let mut rest_balances = rest
243        .into_iter()
244        .map(|(actor_id, bitmap)| Balance {
245            actor_id,
246            balance: bitmap.count_ones() as i32 - expected as i32,
247            builder: builder_from_bitmap(&bitmap),
248        })
249        .collect_vec();
250
251    let mut created_balances = actors_to_create
252        .iter()
253        .map(|actor_id| Balance {
254            actor_id: *actor_id,
255            balance: -(expected as i32),
256            builder: BitmapBuilder::zeroed(vnode_count),
257        })
258        .collect_vec();
259
260    for balance in created_balances
261        .iter_mut()
262        .rev()
263        .take(prev_remain)
264        .chain(rest_balances.iter_mut())
265    {
266        if remain > 0 {
267            balance.balance -= 1;
268            remain -= 1;
269        }
270    }
271
272    // consume the rest `remain`
273    for balance in &mut created_balances {
274        if remain > 0 {
275            balance.balance -= 1;
276            remain -= 1;
277        }
278    }
279
280    assert_eq!(remain, 0);
281
282    let mut v: VecDeque<_> = removed_balances
283        .chain(rest_balances)
284        .chain(created_balances)
285        .collect();
286
287    // We will return the full bitmap here after rebalancing,
288    // if we want to return only the changed actors, filter balance = 0 here
289    let mut result = HashMap::with_capacity(target_actor_count);
290
291    for balance in &v {
292        tracing::debug!(
293            "actor {:5}\tbalance {:5}\tR[{:5}]\tC[{:5}]",
294            balance.actor_id,
295            balance.balance,
296            actors_to_remove.contains(&balance.actor_id),
297            actors_to_create.contains(&balance.actor_id)
298        );
299    }
300
301    while !v.is_empty() {
302        if v.len() == 1 {
303            let single = v.pop_front().unwrap();
304            assert_eq!(single.balance, 0);
305            if !actors_to_remove.contains(&single.actor_id) {
306                result.insert(single.actor_id, single.builder.finish());
307            }
308
309            continue;
310        }
311
312        let mut src = v.pop_front().unwrap();
313        let mut dst = v.pop_back().unwrap();
314
315        let n = min(abs(src.balance), abs(dst.balance));
316
317        let mut moved = 0;
318        for idx in (0..vnode_count).rev() {
319            if moved >= n {
320                break;
321            }
322
323            if src.builder.is_set(idx) {
324                src.builder.set(idx, false);
325                assert!(!dst.builder.is_set(idx));
326                dst.builder.set(idx, true);
327                moved += 1;
328            }
329        }
330
331        src.balance -= n;
332        dst.balance += n;
333
334        if src.balance != 0 {
335            v.push_front(src);
336        } else if !actors_to_remove.contains(&src.actor_id) {
337            result.insert(src.actor_id, src.builder.finish());
338        }
339
340        if dst.balance != 0 {
341            v.push_back(dst);
342        } else {
343            result.insert(dst.actor_id, dst.builder.finish());
344        }
345    }
346
347    result
348}
349
/// Switches that tune how a reschedule request is resolved.
#[derive(Clone, Copy, Debug)]
pub struct RescheduleOptions {
    /// Whether to resolve the upstream of `NoShuffle` when scaling. When enabled, the
    /// reschedules inside a no-shuffle dependency tree are checked for consistency and
    /// rewritten onto the root of that tree.
    pub resolve_no_shuffle_upstream: bool,

    /// Whether to skip creating new actors. When `true`, the actors that would be added by a
    /// scale-out are not created.
    pub skip_create_new_actors: bool,
}
358
359pub type ScaleControllerRef = Arc<ScaleController>;
360
361pub struct ScaleController {
362    pub metadata_manager: MetadataManager,
363
364    pub source_manager: SourceManagerRef,
365
366    pub env: MetaSrvEnv,
367
368    /// We will acquire lock during DDL to prevent scaling operations on jobs that are in the creating state.
369    /// e.g., a MV cannot be rescheduled during foreground backfill.
370    pub reschedule_lock: RwLock<()>,
371}
372
373impl ScaleController {
374    pub fn new(
375        metadata_manager: &MetadataManager,
376        source_manager: SourceManagerRef,
377        env: MetaSrvEnv,
378    ) -> Self {
379        Self {
380            metadata_manager: metadata_manager.clone(),
381            source_manager,
382            env,
383            reschedule_lock: RwLock::new(()),
384        }
385    }
386
387    pub async fn integrity_check(&self) -> MetaResult<()> {
388        self.metadata_manager
389            .catalog_controller
390            .integrity_check()
391            .await
392    }
393
394    /// Build the context for rescheduling and do some validation for the request.
395    async fn build_reschedule_context(
396        &self,
397        reschedule: &mut HashMap<FragmentId, WorkerReschedule>,
398        options: RescheduleOptions,
399        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
400    ) -> MetaResult<RescheduleContext> {
401        let worker_nodes: HashMap<WorkerId, WorkerNode> = self
402            .metadata_manager
403            .list_active_streaming_compute_nodes()
404            .await?
405            .into_iter()
406            .map(|worker_node| (worker_node.id as _, worker_node))
407            .collect();
408
409        if worker_nodes.is_empty() {
410            bail!("no available compute node in the cluster");
411        }
412
413        // Check if we are trying to move a fragment to a node marked as unschedulable
414        let unschedulable_worker_ids: HashSet<_> = worker_nodes
415            .values()
416            .filter(|w| {
417                w.property
418                    .as_ref()
419                    .map(|property| property.is_unschedulable)
420                    .unwrap_or(false)
421            })
422            .map(|worker| worker.id as WorkerId)
423            .collect();
424
425        for (fragment_id, reschedule) in &*reschedule {
426            for (worker_id, change) in &reschedule.worker_actor_diff {
427                if unschedulable_worker_ids.contains(worker_id) && change.is_positive() {
428                    bail!(
429                        "unable to move fragment {} to unschedulable worker {}",
430                        fragment_id,
431                        worker_id
432                    );
433                }
434            }
435        }
436
437        // FIXME: the same as anther place calling `list_table_fragments` in scaling.
438        // Index for StreamActor
439        let mut actor_map = HashMap::new();
440        // Index for Fragment
441        let mut fragment_map = HashMap::new();
442        // Index for actor status, including actor's worker id
443        let mut actor_status = BTreeMap::new();
444        let mut fragment_state = HashMap::new();
445        let mut fragment_to_table = HashMap::new();
446
447        fn fulfill_index_by_fragment_ids(
448            actor_map: &mut HashMap<u32, CustomActorInfo>,
449            fragment_map: &mut HashMap<FragmentId, CustomFragmentInfo>,
450            actor_status: &mut BTreeMap<ActorId, WorkerId>,
451            fragment_state: &mut HashMap<FragmentId, State>,
452            fragment_to_table: &mut HashMap<FragmentId, TableId>,
453            fragments: HashMap<risingwave_meta_model::FragmentId, fragment::Model>,
454            actors: HashMap<ActorId, actor::Model>,
455            mut actor_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
456            related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
457        ) {
458            let mut fragment_actors: HashMap<
459                risingwave_meta_model::FragmentId,
460                Vec<CustomActorInfo>,
461            > = HashMap::new();
462
463            let mut expr_contexts = HashMap::new();
464            for (
465                _,
466                actor::Model {
467                    actor_id,
468                    fragment_id,
469                    status: _,
470                    splits: _,
471                    worker_id,
472                    vnode_bitmap,
473                    expr_context,
474                    ..
475                },
476            ) in actors
477            {
478                let dispatchers = actor_dispatchers
479                    .remove(&(actor_id as _))
480                    .unwrap_or_default();
481
482                let actor_info = CustomActorInfo {
483                    actor_id: actor_id as _,
484                    fragment_id: fragment_id as _,
485                    dispatcher: dispatchers,
486                    vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
487                };
488
489                actor_map.insert(actor_id as _, actor_info.clone());
490
491                fragment_actors
492                    .entry(fragment_id as _)
493                    .or_default()
494                    .push(actor_info);
495
496                actor_status.insert(actor_id as _, worker_id as WorkerId);
497
498                expr_contexts.insert(actor_id as u32, expr_context);
499            }
500
501            for (
502                _,
503                fragment::Model {
504                    fragment_id,
505                    job_id,
506                    fragment_type_mask,
507                    distribution_type,
508                    stream_node,
509                    state_table_ids,
510                    ..
511                },
512            ) in fragments
513            {
514                let actors = fragment_actors
515                    .remove(&(fragment_id as _))
516                    .unwrap_or_default();
517
518                let CustomActorInfo {
519                    actor_id,
520                    fragment_id,
521                    dispatcher,
522                    vnode_bitmap,
523                } = actors.first().unwrap().clone();
524
525                let (related_job, job_definition) =
526                    related_jobs.get(&job_id).expect("job not found");
527
528                let fragment = CustomFragmentInfo {
529                    job_id: job_id as _,
530                    fragment_id: fragment_id as _,
531                    fragment_type_mask: fragment_type_mask.into(),
532                    distribution_type: distribution_type.into(),
533                    state_table_ids: state_table_ids.into_u32_array(),
534                    node: stream_node.to_protobuf(),
535                    actor_template: (
536                        StreamActor {
537                            actor_id,
538                            fragment_id: fragment_id as _,
539                            vnode_bitmap,
540                            mview_definition: job_definition.to_owned(),
541                            expr_context: expr_contexts
542                                .get(&actor_id)
543                                .cloned()
544                                .map(|expr_context| expr_context.to_protobuf()),
545                        },
546                        dispatcher,
547                    ),
548                    actors,
549                };
550
551                fragment_map.insert(fragment_id as _, fragment);
552
553                fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));
554
555                fragment_state.insert(
556                    fragment_id,
557                    table_fragments::PbState::from(related_job.job_status),
558                );
559            }
560        }
561        let fragment_ids = reschedule.keys().map(|id| *id as _).collect();
562        let working_set = self
563            .metadata_manager
564            .catalog_controller
565            .resolve_working_set_for_reschedule_fragments(fragment_ids)
566            .await?;
567
568        fulfill_index_by_fragment_ids(
569            &mut actor_map,
570            &mut fragment_map,
571            &mut actor_status,
572            &mut fragment_state,
573            &mut fragment_to_table,
574            working_set.fragments,
575            working_set.actors,
576            working_set.actor_dispatchers,
577            working_set.related_jobs,
578        );
579
580        // NoShuffle relation index
581        let mut no_shuffle_source_fragment_ids = HashSet::new();
582        let mut no_shuffle_target_fragment_ids = HashSet::new();
583
584        Self::build_no_shuffle_relation_index(
585            &actor_map,
586            &mut no_shuffle_source_fragment_ids,
587            &mut no_shuffle_target_fragment_ids,
588        );
589
590        if options.resolve_no_shuffle_upstream {
591            let original_reschedule_keys = reschedule.keys().cloned().collect();
592
593            Self::resolve_no_shuffle_upstream_fragments(
594                reschedule,
595                &no_shuffle_source_fragment_ids,
596                &no_shuffle_target_fragment_ids,
597                &working_set.fragment_upstreams,
598            )?;
599
600            if !table_parallelisms.is_empty() {
601                // We need to reiterate through the NO_SHUFFLE dependencies in order to ascertain which downstream table the custom modifications of the table have been propagated from.
602                Self::resolve_no_shuffle_upstream_tables(
603                    original_reschedule_keys,
604                    &no_shuffle_source_fragment_ids,
605                    &no_shuffle_target_fragment_ids,
606                    &fragment_to_table,
607                    &working_set.fragment_upstreams,
608                    table_parallelisms,
609                )?;
610            }
611        }
612
613        let mut fragment_dispatcher_map = HashMap::new();
614        Self::build_fragment_dispatcher_index(&actor_map, &mut fragment_dispatcher_map);
615
616        let mut stream_source_fragment_ids = HashSet::new();
617        let mut stream_source_backfill_fragment_ids = HashMap::new();
618        let mut no_shuffle_reschedule = HashMap::new();
619        for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
620            let fragment = fragment_map
621                .get(fragment_id)
622                .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;
623
624            // Check if the rescheduling is supported.
625            match fragment_state[fragment_id] {
626                table_fragments::State::Unspecified => unreachable!(),
627                state @ table_fragments::State::Initial => {
628                    bail!(
629                        "the materialized view of fragment {fragment_id} is in state {}",
630                        state.as_str_name()
631                    )
632                }
633                state @ table_fragments::State::Creating => {
634                    let stream_node = &fragment.node;
635
636                    let mut is_reschedulable = true;
637                    visit_stream_node_cont(stream_node, |body| {
638                        if let Some(NodeBody::StreamScan(node)) = &body.node_body {
639                            if !node.stream_scan_type().is_reschedulable() {
640                                is_reschedulable = false;
641
642                                // fail fast
643                                return false;
644                            }
645
646                            // continue visiting
647                            return true;
648                        }
649
650                        // continue visiting
651                        true
652                    });
653
654                    if !is_reschedulable {
655                        bail!(
656                            "the materialized view of fragment {fragment_id} is in state {}",
657                            state.as_str_name()
658                        )
659                    }
660                }
661                table_fragments::State::Created => {}
662            }
663
664            if no_shuffle_target_fragment_ids.contains(fragment_id) {
665                bail!(
666                    "rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"
667                );
668            }
669
670            // For the relation of NoShuffle (e.g. Materialize and Chain), we need a special
671            // treatment because the upstream and downstream of NoShuffle are always 1-1
672            // correspondence, so we need to clone the reschedule plan to the downstream of all
673            // cascading relations.
674            if no_shuffle_source_fragment_ids.contains(fragment_id) {
675                // This fragment is a NoShuffle's upstream.
676                let mut queue: VecDeque<_> = fragment_dispatcher_map
677                    .get(fragment_id)
678                    .unwrap()
679                    .keys()
680                    .cloned()
681                    .collect();
682
683                while let Some(downstream_id) = queue.pop_front() {
684                    if !no_shuffle_target_fragment_ids.contains(&downstream_id) {
685                        continue;
686                    }
687
688                    if let Some(downstream_fragments) = fragment_dispatcher_map.get(&downstream_id)
689                    {
690                        let no_shuffle_downstreams = downstream_fragments
691                            .iter()
692                            .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
693                            .map(|(fragment_id, _)| fragment_id);
694
695                        queue.extend(no_shuffle_downstreams.copied());
696                    }
697
698                    no_shuffle_reschedule.insert(
699                        downstream_id,
700                        WorkerReschedule {
701                            worker_actor_diff: worker_actor_diff.clone(),
702                        },
703                    );
704                }
705            }
706
707            if fragment
708                .fragment_type_mask
709                .contains(FragmentTypeFlag::Source)
710                && fragment.node.find_stream_source().is_some()
711            {
712                stream_source_fragment_ids.insert(*fragment_id);
713            }
714
715            // Check if the reschedule plan is valid.
716            let current_worker_ids = fragment
717                .actors
718                .iter()
719                .map(|a| actor_status.get(&a.actor_id).cloned().unwrap())
720                .collect::<HashSet<_>>();
721
722            for (removed, change) in worker_actor_diff {
723                if !current_worker_ids.contains(removed) && change.is_negative() {
724                    bail!(
725                        "no actor on the worker {} of fragment {}",
726                        removed,
727                        fragment_id
728                    );
729                }
730            }
731
732            let added_actor_count: usize = worker_actor_diff
733                .values()
734                .filter(|change| change.is_positive())
735                .cloned()
736                .map(|change| change as usize)
737                .sum();
738
739            let removed_actor_count: usize = worker_actor_diff
740                .values()
741                .filter(|change| change.is_positive())
742                .cloned()
743                .map(|v| v.unsigned_abs())
744                .sum();
745
746            match fragment.distribution_type {
747                FragmentDistributionType::Hash => {
748                    if fragment.actors.len() + added_actor_count <= removed_actor_count {
749                        bail!("can't remove all actors from fragment {}", fragment_id);
750                    }
751                }
752                FragmentDistributionType::Single => {
753                    if added_actor_count != removed_actor_count {
754                        bail!("single distribution fragment only support migration");
755                    }
756                }
757                FragmentDistributionType::Unspecified => unreachable!(),
758            }
759        }
760
761        if !no_shuffle_reschedule.is_empty() {
762            tracing::info!(
763                "reschedule plan rewritten with NoShuffle reschedule {:?}",
764                no_shuffle_reschedule
765            );
766
767            for noshuffle_downstream in no_shuffle_reschedule.keys() {
768                let fragment = fragment_map.get(noshuffle_downstream).unwrap();
769                // SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source.
770                if fragment
771                    .fragment_type_mask
772                    .contains(FragmentTypeFlag::SourceScan)
773                {
774                    let stream_node = &fragment.node;
775                    if let Some((_source_id, upstream_source_fragment_id)) =
776                        stream_node.find_source_backfill()
777                    {
778                        stream_source_backfill_fragment_ids
779                            .insert(fragment.fragment_id, upstream_source_fragment_id);
780                    }
781                }
782            }
783        }
784
785        // Modifications for NoShuffle downstream.
786        reschedule.extend(no_shuffle_reschedule.into_iter());
787
788        Ok(RescheduleContext {
789            actor_map,
790            actor_status,
791            fragment_map,
792            stream_source_fragment_ids,
793            stream_source_backfill_fragment_ids,
794            no_shuffle_target_fragment_ids,
795            no_shuffle_source_fragment_ids,
796            fragment_dispatcher_map,
797            fragment_upstreams: working_set.fragment_upstreams,
798        })
799    }
800
801    /// From the high-level [`WorkerReschedule`] to the low-level reschedule plan [`Reschedule`].
802    ///
803    /// Returns `(reschedule_fragment, applied_reschedules)`
804    /// - `reschedule_fragment`: the generated reschedule plan
805    /// - `applied_reschedules`: the changes that need to be updated to the meta store (`pre_apply_reschedules`, only for V1).
806    ///
807    /// In [normal process of scaling](`GlobalStreamManager::reschedule_actors`), we use the returned values to
808    /// build a [`Command::RescheduleFragment`], which will then flows through the barrier mechanism to perform scaling.
809    /// Meta store is updated after the barrier is collected.
810    ///
811    /// During recovery, we don't need the barrier mechanism, and can directly use the returned values to update meta.
812    pub(crate) async fn analyze_reschedule_plan(
813        &self,
814        mut reschedules: HashMap<FragmentId, WorkerReschedule>,
815        options: RescheduleOptions,
816        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
817    ) -> MetaResult<HashMap<FragmentId, Reschedule>> {
818        tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
819        let ctx = self
820            .build_reschedule_context(&mut reschedules, options, table_parallelisms)
821            .await?;
822        tracing::debug!("reschedule context: {:#?}", ctx);
823        let reschedules = reschedules;
824
825        // Here, the plan for both upstream and downstream of the NO_SHUFFLE Fragment should already have been populated.
826
827        // Index of actors to create/remove
828        // Fragment Id => ( Actor Id => Worker Id )
829        let (fragment_actors_to_remove, fragment_actors_to_create) =
830            self.arrange_reschedules(&reschedules, &ctx)?;
831
832        let mut fragment_actor_bitmap = HashMap::new();
833        for fragment_id in reschedules.keys() {
834            if ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
835                // skipping chain fragment, we need to clone the upstream materialize fragment's
836                // mapping later
837                continue;
838            }
839
840            let actors_to_create = fragment_actors_to_create
841                .get(fragment_id)
842                .map(|map| map.keys().copied().collect())
843                .unwrap_or_default();
844
845            let actors_to_remove = fragment_actors_to_remove
846                .get(fragment_id)
847                .map(|map| map.keys().copied().collect())
848                .unwrap_or_default();
849
850            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
851
852            match fragment.distribution_type {
853                FragmentDistributionType::Single => {
854                    // Skip re-balancing action for single distribution (always None)
855                    fragment_actor_bitmap
856                        .insert(fragment.fragment_id as FragmentId, Default::default());
857                }
858                FragmentDistributionType::Hash => {
859                    let actor_vnode = rebalance_actor_vnode(
860                        &fragment.actors,
861                        &actors_to_remove,
862                        &actors_to_create,
863                    );
864
865                    fragment_actor_bitmap.insert(fragment.fragment_id as FragmentId, actor_vnode);
866                }
867
868                FragmentDistributionType::Unspecified => unreachable!(),
869            }
870        }
871
872        // Index for fragment -> { actor -> worker_id } after reschedule.
873        // Since we need to organize the upstream and downstream relationships of NoShuffle,
874        // we need to organize the actor distribution after a scaling.
875        let mut fragment_actors_after_reschedule = HashMap::with_capacity(reschedules.len());
876        for fragment_id in reschedules.keys() {
877            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
878            let mut new_actor_ids = BTreeMap::new();
879            for actor in &fragment.actors {
880                if let Some(actors_to_remove) = fragment_actors_to_remove.get(fragment_id)
881                    && actors_to_remove.contains_key(&actor.actor_id)
882                {
883                    continue;
884                }
885                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id)?;
886                new_actor_ids.insert(actor.actor_id as ActorId, worker_id);
887            }
888
889            if let Some(actors_to_create) = fragment_actors_to_create.get(fragment_id) {
890                for (actor_id, worker_id) in actors_to_create {
891                    new_actor_ids.insert(*actor_id, *worker_id);
892                }
893            }
894
895            assert!(
896                !new_actor_ids.is_empty(),
897                "should be at least one actor in fragment {} after rescheduling",
898                fragment_id
899            );
900
901            fragment_actors_after_reschedule.insert(*fragment_id, new_actor_ids);
902        }
903
904        let fragment_actors_after_reschedule = fragment_actors_after_reschedule;
905
906        // In order to maintain consistency with the original structure, the upstream and downstream
907        // actors of NoShuffle need to be in the same worker slot and hold the same virtual nodes,
908        // so for the actors after the upstream re-balancing, since we have sorted the actors of the same fragment by id on all workers,
909        // we can identify the corresponding upstream actor with NO_SHUFFLE.
910        // NOTE: There should be more asserts here to ensure correctness.
911        fn arrange_no_shuffle_relation(
912            ctx: &RescheduleContext,
913            fragment_id: &FragmentId,
914            upstream_fragment_id: &FragmentId,
915            fragment_actors_after_reschedule: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
916            actor_group_map: &mut HashMap<ActorId, (FragmentId, ActorId)>,
917            fragment_updated_bitmap: &mut HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
918            no_shuffle_upstream_actor_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
919            no_shuffle_downstream_actors_map: &mut HashMap<ActorId, HashMap<FragmentId, ActorId>>,
920        ) {
921            if !ctx.no_shuffle_target_fragment_ids.contains(fragment_id) {
922                return;
923            }
924
925            let fragment = &ctx.fragment_map[fragment_id];
926
927            let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];
928
929            // build actor group map
930            for upstream_actor in &upstream_fragment.actors {
931                for dispatcher in &upstream_actor.dispatcher {
932                    if let PbDispatcherType::NoShuffle = dispatcher.get_type().unwrap() {
933                        let downstream_actor_id =
934                            *dispatcher.downstream_actor_id.iter().exactly_one().unwrap();
935
936                        // upstream is root
937                        if !ctx
938                            .no_shuffle_target_fragment_ids
939                            .contains(upstream_fragment_id)
940                        {
941                            actor_group_map.insert(
942                                upstream_actor.actor_id,
943                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
944                            );
945                            actor_group_map.insert(
946                                downstream_actor_id,
947                                (upstream_fragment.fragment_id, upstream_actor.actor_id),
948                            );
949                        } else {
950                            let root_actor_id = actor_group_map[&upstream_actor.actor_id];
951
952                            actor_group_map.insert(downstream_actor_id, root_actor_id);
953                        }
954                    }
955                }
956            }
957
958            // If the upstream is a Singleton Fragment, there will be no Bitmap changes
959            let upstream_fragment_bitmap = fragment_updated_bitmap
960                .get(upstream_fragment_id)
961                .cloned()
962                .unwrap_or_default();
963
964            // Question: Is it possible to have Hash Distribution Fragment but the Actor's bitmap remains unchanged?
965            if upstream_fragment.distribution_type == FragmentDistributionType::Single {
966                assert!(
967                    upstream_fragment_bitmap.is_empty(),
968                    "single fragment should have no bitmap updates"
969                );
970            }
971
972            let upstream_fragment_actor_map = fragment_actors_after_reschedule
973                .get(upstream_fragment_id)
974                .cloned()
975                .unwrap();
976
977            let fragment_actor_map = fragment_actors_after_reschedule
978                .get(fragment_id)
979                .cloned()
980                .unwrap();
981
982            let mut worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();
983
984            // first, find existing actor bitmap, copy them
985            let mut fragment_bitmap = HashMap::new();
986
987            for (actor_id, worker_id) in &fragment_actor_map {
988                if let Some((root_fragment, root_actor_id)) = actor_group_map.get(actor_id) {
989                    let root_bitmap = fragment_updated_bitmap
990                        .get(root_fragment)
991                        .expect("root fragment bitmap not found")
992                        .get(root_actor_id)
993                        .cloned()
994                        .expect("root actor bitmap not found");
995
996                    // Copy the bitmap
997                    fragment_bitmap.insert(*actor_id, root_bitmap);
998
999                    no_shuffle_upstream_actor_map
1000                        .entry(*actor_id as ActorId)
1001                        .or_default()
1002                        .insert(*upstream_fragment_id, *root_actor_id);
1003                    no_shuffle_downstream_actors_map
1004                        .entry(*root_actor_id)
1005                        .or_default()
1006                        .insert(*fragment_id, *actor_id);
1007                } else {
1008                    worker_reverse_index
1009                        .entry(*worker_id)
1010                        .or_default()
1011                        .insert(*actor_id);
1012                }
1013            }
1014
1015            let mut upstream_worker_reverse_index: HashMap<WorkerId, BTreeSet<_>> = HashMap::new();
1016
1017            for (actor_id, worker_id) in &upstream_fragment_actor_map {
1018                if !actor_group_map.contains_key(actor_id) {
1019                    upstream_worker_reverse_index
1020                        .entry(*worker_id)
1021                        .or_default()
1022                        .insert(*actor_id);
1023                }
1024            }
1025
1026            // then, find the rest of the actors and copy the bitmap
1027            for (worker_id, actor_ids) in worker_reverse_index {
1028                let upstream_actor_ids = upstream_worker_reverse_index
1029                    .get(&worker_id)
1030                    .unwrap()
1031                    .clone();
1032
1033                assert_eq!(actor_ids.len(), upstream_actor_ids.len());
1034
1035                for (actor_id, upstream_actor_id) in actor_ids
1036                    .into_iter()
1037                    .zip_eq_debug(upstream_actor_ids.into_iter())
1038                {
1039                    match upstream_fragment_bitmap.get(&upstream_actor_id).cloned() {
1040                        None => {
1041                            // single fragment should have no bitmap updates (same as upstream)
1042                            assert_eq!(
1043                                upstream_fragment.distribution_type,
1044                                FragmentDistributionType::Single
1045                            );
1046                        }
1047                        Some(bitmap) => {
1048                            // Copy the bitmap
1049                            fragment_bitmap.insert(actor_id, bitmap);
1050                        }
1051                    }
1052
1053                    no_shuffle_upstream_actor_map
1054                        .entry(actor_id as ActorId)
1055                        .or_default()
1056                        .insert(*upstream_fragment_id, upstream_actor_id);
1057                    no_shuffle_downstream_actors_map
1058                        .entry(upstream_actor_id)
1059                        .or_default()
1060                        .insert(*fragment_id, actor_id);
1061                }
1062            }
1063
1064            match fragment.distribution_type {
1065                FragmentDistributionType::Hash => {}
1066                FragmentDistributionType::Single => {
1067                    // single distribution should update nothing
1068                    assert!(fragment_bitmap.is_empty());
1069                }
1070                FragmentDistributionType::Unspecified => unreachable!(),
1071            }
1072
1073            if let Err(e) = fragment_updated_bitmap.try_insert(*fragment_id, fragment_bitmap) {
1074                assert_eq!(
1075                    e.entry.get(),
1076                    &e.value,
1077                    "bitmaps derived from different no-shuffle upstreams mismatch"
1078                );
1079            }
1080
1081            // Visit downstream fragments recursively.
1082            if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
1083                let no_shuffle_downstreams = downstream_fragments
1084                    .iter()
1085                    .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
1086                    .map(|(fragment_id, _)| fragment_id);
1087
1088                for downstream_fragment_id in no_shuffle_downstreams {
1089                    arrange_no_shuffle_relation(
1090                        ctx,
1091                        downstream_fragment_id,
1092                        fragment_id,
1093                        fragment_actors_after_reschedule,
1094                        actor_group_map,
1095                        fragment_updated_bitmap,
1096                        no_shuffle_upstream_actor_map,
1097                        no_shuffle_downstream_actors_map,
1098                    );
1099                }
1100            }
1101        }
1102
1103        let mut no_shuffle_upstream_actor_map = HashMap::new();
1104        let mut no_shuffle_downstream_actors_map = HashMap::new();
1105        let mut actor_group_map = HashMap::new();
1106        // For all roots in the upstream and downstream dependency trees of NoShuffle, recursively
1107        // find all correspondences
1108        for fragment_id in reschedules.keys() {
1109            if ctx.no_shuffle_source_fragment_ids.contains(fragment_id)
1110                && !ctx.no_shuffle_target_fragment_ids.contains(fragment_id)
1111                && let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id)
1112            {
1113                for downstream_fragment_id in downstream_fragments.keys() {
1114                    arrange_no_shuffle_relation(
1115                        &ctx,
1116                        downstream_fragment_id,
1117                        fragment_id,
1118                        &fragment_actors_after_reschedule,
1119                        &mut actor_group_map,
1120                        &mut fragment_actor_bitmap,
1121                        &mut no_shuffle_upstream_actor_map,
1122                        &mut no_shuffle_downstream_actors_map,
1123                    );
1124                }
1125            }
1126        }
1127
1128        tracing::debug!("actor group map {:?}", actor_group_map);
1129
1130        let mut new_created_actors = HashMap::new();
1131        for fragment_id in reschedules.keys() {
1132            let actors_to_create = fragment_actors_to_create
1133                .get(fragment_id)
1134                .cloned()
1135                .unwrap_or_default();
1136
1137            let fragment = &ctx.fragment_map[fragment_id];
1138
1139            assert!(!fragment.actors.is_empty());
1140
1141            for actor_to_create in &actors_to_create {
1142                let new_actor_id = actor_to_create.0;
1143                let (mut new_actor, mut dispatchers) = fragment.actor_template.clone();
1144
1145                // This should be assigned before the `modify_actor_upstream_and_downstream` call,
1146                // because we need to use the new actor id to find the upstream and
1147                // downstream in the NoShuffle relationship
1148                new_actor.actor_id = *new_actor_id;
1149
1150                Self::modify_actor_upstream_and_downstream(
1151                    &ctx,
1152                    &fragment_actors_to_remove,
1153                    &fragment_actors_to_create,
1154                    &fragment_actor_bitmap,
1155                    &no_shuffle_downstream_actors_map,
1156                    &mut new_actor,
1157                    &mut dispatchers,
1158                )?;
1159
1160                if let Some(bitmap) = fragment_actor_bitmap
1161                    .get(fragment_id)
1162                    .and_then(|actor_bitmaps| actor_bitmaps.get(new_actor_id))
1163                {
1164                    new_actor.vnode_bitmap = Some(bitmap.to_protobuf().into());
1165                }
1166
1167                new_created_actors.insert(*new_actor_id, (new_actor, dispatchers));
1168            }
1169        }
1170
1171        // For stream source & source backfill fragments, we need to reallocate the splits.
1172        // Because we are in the Pause state, so it's no problem to reallocate
1173        let mut fragment_actor_splits = HashMap::new();
1174        for fragment_id in reschedules.keys() {
1175            let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1176
1177            if ctx.stream_source_fragment_ids.contains(fragment_id) {
1178                let fragment = &ctx.fragment_map[fragment_id];
1179
1180                let prev_actor_ids = fragment
1181                    .actors
1182                    .iter()
1183                    .map(|actor| actor.actor_id)
1184                    .collect_vec();
1185
1186                let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1187
1188                let actor_splits = self
1189                    .source_manager
1190                    .migrate_splits_for_source_actors(
1191                        *fragment_id,
1192                        &prev_actor_ids,
1193                        &curr_actor_ids,
1194                    )
1195                    .await?;
1196
1197                tracing::debug!(
1198                    "source actor splits: {:?}, fragment_id: {}",
1199                    actor_splits,
1200                    fragment_id
1201                );
1202                fragment_actor_splits.insert(*fragment_id, actor_splits);
1203            }
1204        }
1205        // We use 2 iterations to make sure source actors are migrated first, and then align backfill actors
1206        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
1207            for fragment_id in reschedules.keys() {
1208                let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];
1209
1210                if let Some(upstream_source_fragment_id) =
1211                    ctx.stream_source_backfill_fragment_ids.get(fragment_id)
1212                {
1213                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
1214
1215                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
1216                        *fragment_id,
1217                        *upstream_source_fragment_id,
1218                        &curr_actor_ids,
1219                        &fragment_actor_splits,
1220                        &no_shuffle_upstream_actor_map,
1221                    )?;
1222                    tracing::debug!(
1223                        "source backfill actor splits: {:?}, fragment_id: {}",
1224                        actor_splits,
1225                        fragment_id
1226                    );
1227                    fragment_actor_splits.insert(*fragment_id, actor_splits);
1228                }
1229            }
1230        }
1231
1232        // Generate fragment reschedule plan
1233        let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
1234            HashMap::with_capacity(reschedules.len());
1235
1236        for (fragment_id, _) in reschedules {
1237            let mut actors_to_create: HashMap<_, Vec<_>> = HashMap::new();
1238
1239            if let Some(actor_worker_maps) = fragment_actors_to_create.get(&fragment_id).cloned() {
1240                for (actor_id, worker_id) in actor_worker_maps {
1241                    actors_to_create
1242                        .entry(worker_id)
1243                        .or_default()
1244                        .push(actor_id);
1245                }
1246            }
1247
1248            let actors_to_remove = fragment_actors_to_remove
1249                .get(&fragment_id)
1250                .cloned()
1251                .unwrap_or_default()
1252                .into_keys()
1253                .collect();
1254
1255            let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];
1256
1257            assert!(!actors_after_reschedule.is_empty());
1258
1259            let fragment = &ctx.fragment_map[&fragment_id];
1260
1261            let in_degree_types: HashSet<_> = ctx
1262                .fragment_upstreams
1263                .get(&(fragment_id as _))
1264                .map(|upstreams| upstreams.values())
1265                .into_iter()
1266                .flatten()
1267                .cloned()
1268                .collect();
1269
1270            let upstream_dispatcher_mapping = match fragment.distribution_type {
1271                FragmentDistributionType::Hash => {
1272                    if !in_degree_types.contains(&DispatcherType::Hash) {
1273                        None
1274                    } else {
1275                        // Changes of the bitmap must occur in the case of HashDistribution
1276                        Some(ActorMapping::from_bitmaps(
1277                            &fragment_actor_bitmap[&fragment_id],
1278                        ))
1279                    }
1280                }
1281
1282                FragmentDistributionType::Single => {
1283                    assert!(fragment_actor_bitmap.get(&fragment_id).unwrap().is_empty());
1284                    None
1285                }
1286                FragmentDistributionType::Unspecified => unreachable!(),
1287            };
1288
1289            let mut upstream_fragment_dispatcher_set = BTreeSet::new();
1290
1291            {
1292                if let Some(upstreams) = ctx.fragment_upstreams.get(&(fragment.fragment_id as _)) {
1293                    for (upstream_fragment_id, upstream_dispatcher_type) in upstreams {
1294                        match upstream_dispatcher_type {
1295                            DispatcherType::NoShuffle => {}
1296                            _ => {
1297                                upstream_fragment_dispatcher_set.insert((
1298                                    *upstream_fragment_id as FragmentId,
1299                                    fragment.fragment_id as DispatcherId,
1300                                ));
1301                            }
1302                        }
1303                    }
1304                }
1305            }
1306
1307            let downstream_fragment_ids = if let Some(downstream_fragments) =
1308                ctx.fragment_dispatcher_map.get(&fragment_id)
1309            {
1310                // Skip fragments' no-shuffle downstream, as there's no need to update the merger
1311                // (receiver) of a no-shuffle downstream
1312                downstream_fragments
1313                    .iter()
1314                    .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
1315                    .map(|(fragment_id, _)| *fragment_id)
1316                    .collect_vec()
1317            } else {
1318                vec![]
1319            };
1320
1321            let vnode_bitmap_updates = match fragment.distribution_type {
1322                FragmentDistributionType::Hash => {
1323                    let mut vnode_bitmap_updates =
1324                        fragment_actor_bitmap.remove(&fragment_id).unwrap();
1325
1326                    // We need to keep the bitmaps from changed actors only,
1327                    // otherwise the barrier will become very large with many actors
1328                    for actor_id in actors_after_reschedule.keys() {
1329                        assert!(vnode_bitmap_updates.contains_key(actor_id));
1330
1331                        // retain actor
1332                        if let Some(actor) = ctx.actor_map.get(actor_id) {
1333                            let bitmap = vnode_bitmap_updates.get(actor_id).unwrap();
1334
1335                            if let Some(prev_bitmap) = actor.vnode_bitmap.as_ref()
1336                                && prev_bitmap.eq(bitmap)
1337                            {
1338                                vnode_bitmap_updates.remove(actor_id);
1339                            }
1340                        }
1341                    }
1342
1343                    vnode_bitmap_updates
1344                }
1345                FragmentDistributionType::Single => HashMap::new(),
1346                FragmentDistributionType::Unspecified => unreachable!(),
1347            };
1348
1349            let upstream_fragment_dispatcher_ids =
1350                upstream_fragment_dispatcher_set.into_iter().collect_vec();
1351
1352            let actor_splits = fragment_actor_splits
1353                .get(&fragment_id)
1354                .cloned()
1355                .unwrap_or_default();
1356
1357            let mut cdc_table_id = None;
1358            let cdc_table_snapshot_split_assignment = if fragment
1359                .fragment_type_mask
1360                .contains(FragmentTypeFlag::StreamCdcScan)
1361            {
1362                cdc_table_id = Some(fragment.job_id);
1363                assign_cdc_table_snapshot_splits_impl(
1364                    fragment.job_id,
1365                    fragment_actors_after_reschedule
1366                        .get(&fragment_id)
1367                        .unwrap()
1368                        .keys()
1369                        .copied()
1370                        .collect(),
1371                    self.env.meta_store_ref(),
1372                    None,
1373                )
1374                .await?
1375            } else {
1376                HashMap::default()
1377            };
1378
1379            reschedule_fragment.insert(
1380                fragment_id,
1381                Reschedule {
1382                    added_actors: actors_to_create,
1383                    removed_actors: actors_to_remove,
1384                    vnode_bitmap_updates,
1385                    upstream_fragment_dispatcher_ids,
1386                    upstream_dispatcher_mapping,
1387                    downstream_fragment_ids,
1388                    actor_splits,
1389                    newly_created_actors: Default::default(),
1390                    cdc_table_snapshot_split_assignment,
1391                    cdc_table_id,
1392                },
1393            );
1394        }
1395
1396        let mut fragment_created_actors = HashMap::new();
1397        for (fragment_id, actors_to_create) in &fragment_actors_to_create {
1398            let mut created_actors = HashMap::new();
1399            for (actor_id, worker_id) in actors_to_create {
1400                let actor = new_created_actors.get(actor_id).cloned().unwrap();
1401                created_actors.insert(*actor_id, (actor, *worker_id));
1402            }
1403
1404            fragment_created_actors.insert(*fragment_id, created_actors);
1405        }
1406
1407        for (fragment_id, to_create) in fragment_created_actors {
1408            let reschedule = reschedule_fragment.get_mut(&fragment_id).unwrap();
1409            reschedule.newly_created_actors = to_create;
1410        }
1411        tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);
1412
1413        Ok(reschedule_fragment)
1414    }
1415
1416    #[expect(clippy::type_complexity)]
1417    fn arrange_reschedules(
1418        &self,
1419        reschedule: &HashMap<FragmentId, WorkerReschedule>,
1420        ctx: &RescheduleContext,
1421    ) -> MetaResult<(
1422        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1423        HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1424    )> {
1425        let mut fragment_actors_to_remove = HashMap::with_capacity(reschedule.len());
1426        let mut fragment_actors_to_create = HashMap::with_capacity(reschedule.len());
1427
1428        for (fragment_id, WorkerReschedule { worker_actor_diff }) in reschedule {
1429            let fragment = ctx.fragment_map.get(fragment_id).unwrap();
1430
1431            // Actor Id => Worker Id
1432            let mut actors_to_remove = BTreeMap::new();
1433            let mut actors_to_create = BTreeMap::new();
1434
1435            // NOTE(important): The value needs to be a BTreeSet to ensure that the actors on the worker are sorted in ascending order.
1436            let mut worker_to_actors = HashMap::new();
1437
1438            for actor in &fragment.actors {
1439                let worker_id = ctx.actor_id_to_worker_id(&actor.actor_id).unwrap();
1440                worker_to_actors
1441                    .entry(worker_id)
1442                    .or_insert(BTreeSet::new())
1443                    .insert(actor.actor_id as ActorId);
1444            }
1445
1446            let decreased_actor_count = worker_actor_diff
1447                .iter()
1448                .filter(|(_, change)| change.is_negative())
1449                .map(|(worker_id, change)| (worker_id, change.unsigned_abs()));
1450
1451            for (worker_id, n) in decreased_actor_count {
1452                if let Some(actor_ids) = worker_to_actors.get(worker_id) {
1453                    if actor_ids.len() < n {
1454                        bail!(
1455                            "plan illegal, for fragment {}, worker {} only has {} actors, but needs to reduce {}",
1456                            fragment_id,
1457                            worker_id,
1458                            actor_ids.len(),
1459                            n
1460                        );
1461                    }
1462
1463                    let removed_actors: Vec<_> = actor_ids
1464                        .iter()
1465                        .skip(actor_ids.len().saturating_sub(n))
1466                        .cloned()
1467                        .collect();
1468
1469                    for actor in removed_actors {
1470                        actors_to_remove.insert(actor, *worker_id);
1471                    }
1472                }
1473            }
1474
1475            let increased_actor_count = worker_actor_diff
1476                .iter()
1477                .filter(|(_, change)| change.is_positive());
1478
1479            for (worker, n) in increased_actor_count {
1480                for _ in 0..*n {
1481                    let id = self
1482                        .env
1483                        .id_gen_manager()
1484                        .generate_interval::<{ IdCategory::Actor }>(1)
1485                        as ActorId;
1486                    actors_to_create.insert(id, *worker);
1487                }
1488            }
1489
1490            if !actors_to_remove.is_empty() {
1491                fragment_actors_to_remove.insert(*fragment_id as FragmentId, actors_to_remove);
1492            }
1493
1494            if !actors_to_create.is_empty() {
1495                fragment_actors_to_create.insert(*fragment_id as FragmentId, actors_to_create);
1496            }
1497        }
1498
1499        // sanity checking
1500        for actors_to_remove in fragment_actors_to_remove.values() {
1501            for actor_id in actors_to_remove.keys() {
1502                let actor = ctx.actor_map.get(actor_id).unwrap();
1503                for dispatcher in &actor.dispatcher {
1504                    if PbDispatcherType::NoShuffle == dispatcher.get_type().unwrap() {
1505                        let downstream_actor_id = dispatcher.downstream_actor_id.iter().exactly_one().expect("there should be only one downstream actor id in NO_SHUFFLE dispatcher");
1506
1507                        let _should_exists = fragment_actors_to_remove
1508                            .get(&(dispatcher.dispatcher_id as FragmentId))
1509                            .expect("downstream fragment of NO_SHUFFLE relation should be in the removing map")
1510                            .get(downstream_actor_id)
1511                            .expect("downstream actor of NO_SHUFFLE relation should be in the removing map");
1512                    }
1513                }
1514            }
1515        }
1516
1517        Ok((fragment_actors_to_remove, fragment_actors_to_create))
1518    }
1519
1520    /// Modifies the upstream and downstream actors of the new created actor according to the
1521    /// overall changes, and is used to handle cascading updates
1522    fn modify_actor_upstream_and_downstream(
1523        ctx: &RescheduleContext,
1524        fragment_actors_to_remove: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1525        fragment_actors_to_create: &HashMap<FragmentId, BTreeMap<ActorId, WorkerId>>,
1526        fragment_actor_bitmap: &HashMap<FragmentId, HashMap<ActorId, Bitmap>>,
1527        no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
1528        new_actor: &mut StreamActor,
1529        dispatchers: &mut Vec<PbDispatcher>,
1530    ) -> MetaResult<()> {
1531        // Update downstream actor ids
1532        for dispatcher in dispatchers {
1533            let downstream_fragment_id = dispatcher
1534                .downstream_actor_id
1535                .iter()
1536                .filter_map(|actor_id| ctx.actor_map.get(actor_id).map(|actor| actor.fragment_id))
1537                .dedup()
1538                .exactly_one()
1539                .unwrap() as FragmentId;
1540
1541            let downstream_fragment_actors_to_remove =
1542                fragment_actors_to_remove.get(&downstream_fragment_id);
1543            let downstream_fragment_actors_to_create =
1544                fragment_actors_to_create.get(&downstream_fragment_id);
1545
1546            match dispatcher.r#type() {
1547                d @ (PbDispatcherType::Hash
1548                | PbDispatcherType::Simple
1549                | PbDispatcherType::Broadcast) => {
1550                    if let Some(downstream_actors_to_remove) = downstream_fragment_actors_to_remove
1551                    {
1552                        dispatcher
1553                            .downstream_actor_id
1554                            .retain(|id| !downstream_actors_to_remove.contains_key(id));
1555                    }
1556
1557                    if let Some(downstream_actors_to_create) = downstream_fragment_actors_to_create
1558                    {
1559                        dispatcher
1560                            .downstream_actor_id
1561                            .extend(downstream_actors_to_create.keys().cloned())
1562                    }
1563
1564                    // There should be still exactly one downstream actor
1565                    if d == PbDispatcherType::Simple {
1566                        assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1567                    }
1568                }
1569                PbDispatcherType::NoShuffle => {
1570                    assert_eq!(dispatcher.downstream_actor_id.len(), 1);
1571                    let downstream_actor_id = no_shuffle_downstream_actors_map
1572                        .get(&new_actor.actor_id)
1573                        .and_then(|map| map.get(&downstream_fragment_id))
1574                        .unwrap();
1575                    dispatcher.downstream_actor_id = vec![*downstream_actor_id as ActorId];
1576                }
1577                PbDispatcherType::Unspecified => unreachable!(),
1578            }
1579
1580            if let Some(mapping) = dispatcher.hash_mapping.as_mut()
1581                && let Some(downstream_updated_bitmap) =
1582                    fragment_actor_bitmap.get(&downstream_fragment_id)
1583            {
1584                // If downstream scale in/out
1585                *mapping = ActorMapping::from_bitmaps(downstream_updated_bitmap).to_protobuf();
1586            }
1587        }
1588
1589        Ok(())
1590    }
1591
1592    #[await_tree::instrument]
1593    pub async fn post_apply_reschedule(
1594        &self,
1595        reschedules: &HashMap<FragmentId, Reschedule>,
1596        post_updates: &JobReschedulePostUpdates,
1597    ) -> MetaResult<()> {
1598        // Update fragment info after rescheduling in meta store.
1599        self.metadata_manager
1600            .post_apply_reschedules(reschedules.clone(), post_updates)
1601            .await?;
1602
1603        // Update serving fragment info after rescheduling in meta store.
1604        if !reschedules.is_empty() {
1605            let workers = self
1606                .metadata_manager
1607                .list_active_serving_compute_nodes()
1608                .await?;
1609            let streaming_parallelisms = self
1610                .metadata_manager
1611                .running_fragment_parallelisms(Some(reschedules.keys().cloned().collect()))
1612                .await?;
1613            let serving_worker_slot_mapping = Arc::new(ServingVnodeMapping::default());
1614            let max_serving_parallelism = self
1615                .env
1616                .session_params_manager_impl_ref()
1617                .get_params()
1618                .await
1619                .batch_parallelism()
1620                .map(|p| p.get());
1621            let (upserted, failed) = serving_worker_slot_mapping.upsert(
1622                streaming_parallelisms,
1623                &workers,
1624                max_serving_parallelism,
1625            );
1626            if !upserted.is_empty() {
1627                tracing::debug!(
1628                    "Update serving vnode mapping for fragments {:?}.",
1629                    upserted.keys()
1630                );
1631                self.env
1632                    .notification_manager()
1633                    .notify_frontend_without_version(
1634                        Operation::Update,
1635                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1636                            mappings: to_fragment_worker_slot_mapping(&upserted),
1637                        }),
1638                    );
1639            }
1640            if !failed.is_empty() {
1641                tracing::debug!(
1642                    "Fail to update serving vnode mapping for fragments {:?}.",
1643                    failed
1644                );
1645                self.env
1646                    .notification_manager()
1647                    .notify_frontend_without_version(
1648                        Operation::Delete,
1649                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
1650                            mappings: to_deleted_fragment_worker_slot_mapping(&failed),
1651                        }),
1652                    );
1653            }
1654        }
1655
1656        let mut stream_source_actor_splits = HashMap::new();
1657        let mut stream_source_dropped_actors = HashSet::new();
1658
1659        // todo: handle adaptive splits
1660        for (fragment_id, reschedule) in reschedules {
1661            if !reschedule.actor_splits.is_empty() {
1662                stream_source_actor_splits
1663                    .insert(*fragment_id as FragmentId, reschedule.actor_splits.clone());
1664                stream_source_dropped_actors.extend(reschedule.removed_actors.clone());
1665            }
1666        }
1667
1668        if !stream_source_actor_splits.is_empty() {
1669            self.source_manager
1670                .apply_source_change(SourceChange::Reschedule {
1671                    split_assignment: stream_source_actor_splits,
1672                    dropped_actors: stream_source_dropped_actors,
1673                })
1674                .await;
1675        }
1676
1677        Ok(())
1678    }
1679
1680    pub async fn generate_job_reschedule_plan(
1681        &self,
1682        policy: JobReschedulePolicy,
1683        generate_plan_for_cdc_table_backfill: bool,
1684    ) -> MetaResult<JobReschedulePlan> {
1685        type VnodeCount = usize;
1686
1687        let JobReschedulePolicy { targets } = policy;
1688
1689        let workers = self
1690            .metadata_manager
1691            .list_active_streaming_compute_nodes()
1692            .await?;
1693
1694        // The `schedulable` field should eventually be replaced by resource groups like `unschedulable`
1695        let workers: HashMap<_, _> = workers
1696            .into_iter()
1697            .filter(|worker| worker.is_streaming_schedulable())
1698            .map(|worker| (worker.id, worker))
1699            .collect();
1700
1701        #[derive(Debug)]
1702        struct JobUpdate {
1703            filtered_worker_ids: BTreeSet<WorkerId>,
1704            parallelism: TableParallelism,
1705        }
1706
1707        let mut job_parallelism_updates = HashMap::new();
1708
1709        let mut job_reschedule_post_updates = JobReschedulePostUpdates {
1710            parallelism_updates: Default::default(),
1711            resource_group_updates: Default::default(),
1712        };
1713
1714        for (
1715            job_id,
1716            JobRescheduleTarget {
1717                parallelism: parallelism_update,
1718                resource_group: resource_group_update,
1719            },
1720        ) in &targets
1721        {
1722            let parallelism = match parallelism_update {
1723                JobParallelismTarget::Update(parallelism) => *parallelism,
1724                JobParallelismTarget::Refresh => {
1725                    let parallelism = self
1726                        .metadata_manager
1727                        .catalog_controller
1728                        .get_job_streaming_parallelisms(*job_id as _)
1729                        .await?;
1730
1731                    parallelism.into()
1732                }
1733            };
1734
1735            job_reschedule_post_updates
1736                .parallelism_updates
1737                .insert(TableId::from(*job_id), parallelism);
1738
1739            let current_resource_group = match resource_group_update {
1740                JobResourceGroupTarget::Update(Some(specific_resource_group)) => {
1741                    job_reschedule_post_updates.resource_group_updates.insert(
1742                        *job_id as ObjectId,
1743                        Some(specific_resource_group.to_owned()),
1744                    );
1745
1746                    specific_resource_group.to_owned()
1747                }
1748                JobResourceGroupTarget::Update(None) => {
1749                    let database_resource_group = self
1750                        .metadata_manager
1751                        .catalog_controller
1752                        .get_existing_job_database_resource_group(*job_id as _)
1753                        .await?;
1754
1755                    job_reschedule_post_updates
1756                        .resource_group_updates
1757                        .insert(*job_id as ObjectId, None);
1758                    database_resource_group
1759                }
1760                JobResourceGroupTarget::Keep => {
1761                    self.metadata_manager
1762                        .catalog_controller
1763                        .get_existing_job_resource_group(*job_id as _)
1764                        .await?
1765                }
1766            };
1767
1768            let filtered_worker_ids =
1769                filter_workers_by_resource_group(&workers, current_resource_group.as_str());
1770
1771            if filtered_worker_ids.is_empty() {
1772                bail!("Cannot resize streaming_job {job_id} to empty worker set")
1773            }
1774
1775            job_parallelism_updates.insert(
1776                *job_id,
1777                JobUpdate {
1778                    filtered_worker_ids,
1779                    parallelism,
1780                },
1781            );
1782        }
1783
1784        // index for no shuffle relation
1785        let mut no_shuffle_source_fragment_ids = HashSet::new();
1786        let mut no_shuffle_target_fragment_ids = HashSet::new();
1787
1788        // index for fragment_id -> (distribution_type, vnode_count)
1789        let mut fragment_distribution_map = HashMap::new();
1790        // index for actor -> worker id
1791        let mut actor_location = HashMap::new();
1792        // index for table_id -> [fragment_id]
1793        let mut table_fragment_id_map = HashMap::new();
1794        // index for fragment_id -> [actor_id]
1795        let mut fragment_actor_id_map = HashMap::new();
1796
1797        async fn build_index(
1798            no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
1799            no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
1800            fragment_distribution_map: &mut HashMap<
1801                FragmentId,
1802                (FragmentDistributionType, VnodeCount, bool),
1803            >,
1804            actor_location: &mut HashMap<ActorId, WorkerId>,
1805            table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
1806            fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
1807            mgr: &MetadataManager,
1808            table_ids: Vec<ObjectId>,
1809            generate_plan_only_for_cdc_table_backfill: bool,
1810        ) -> Result<(), MetaError> {
1811            let RescheduleWorkingSet {
1812                fragments,
1813                actors,
1814                actor_dispatchers: _actor_dispatchers,
1815                fragment_downstreams,
1816                fragment_upstreams: _fragment_upstreams,
1817                related_jobs: _related_jobs,
1818                job_resource_groups: _job_resource_groups,
1819            } = mgr
1820                .catalog_controller
1821                .resolve_working_set_for_reschedule_tables(table_ids)
1822                .await?;
1823
1824            for (fragment_id, downstreams) in fragment_downstreams {
1825                for (downstream_fragment_id, dispatcher_type) in downstreams {
1826                    if let risingwave_meta_model::DispatcherType::NoShuffle = dispatcher_type {
1827                        no_shuffle_source_fragment_ids.insert(fragment_id as FragmentId);
1828                        no_shuffle_target_fragment_ids.insert(downstream_fragment_id as FragmentId);
1829                    }
1830                }
1831            }
1832
1833            for (fragment_id, fragment) in fragments {
1834                let is_cdc_backfill_v2_fragment =
1835                    FragmentTypeMask::from(fragment.fragment_type_mask)
1836                        .contains(FragmentTypeFlag::StreamCdcScan);
1837                if generate_plan_only_for_cdc_table_backfill && !is_cdc_backfill_v2_fragment {
1838                    continue;
1839                }
1840                fragment_distribution_map.insert(
1841                    fragment_id as FragmentId,
1842                    (
1843                        FragmentDistributionType::from(fragment.distribution_type),
1844                        fragment.vnode_count as _,
1845                        is_cdc_backfill_v2_fragment,
1846                    ),
1847                );
1848
1849                table_fragment_id_map
1850                    .entry(fragment.job_id as u32)
1851                    .or_default()
1852                    .insert(fragment_id as FragmentId);
1853            }
1854
1855            for (actor_id, actor) in actors {
1856                actor_location.insert(actor_id as ActorId, actor.worker_id as WorkerId);
1857                fragment_actor_id_map
1858                    .entry(actor.fragment_id as FragmentId)
1859                    .or_default()
1860                    .insert(actor_id as ActorId);
1861            }
1862
1863            Ok(())
1864        }
1865
1866        let table_ids = targets.keys().map(|id| *id as ObjectId).collect();
1867
1868        build_index(
1869            &mut no_shuffle_source_fragment_ids,
1870            &mut no_shuffle_target_fragment_ids,
1871            &mut fragment_distribution_map,
1872            &mut actor_location,
1873            &mut table_fragment_id_map,
1874            &mut fragment_actor_id_map,
1875            &self.metadata_manager,
1876            table_ids,
1877            generate_plan_for_cdc_table_backfill,
1878        )
1879        .await?;
1880        tracing::debug!(
1881            ?job_reschedule_post_updates,
1882            ?job_parallelism_updates,
1883            ?no_shuffle_source_fragment_ids,
1884            ?no_shuffle_target_fragment_ids,
1885            ?fragment_distribution_map,
1886            ?actor_location,
1887            ?table_fragment_id_map,
1888            ?fragment_actor_id_map,
1889            "generate_table_resize_plan, after build_index"
1890        );
1891
1892        let adaptive_parallelism_strategy = self
1893            .env
1894            .system_params_reader()
1895            .await
1896            .adaptive_parallelism_strategy();
1897
1898        let mut target_plan = HashMap::new();
1899
1900        for (
1901            table_id,
1902            JobUpdate {
1903                filtered_worker_ids,
1904                parallelism,
1905            },
1906        ) in job_parallelism_updates
1907        {
1908            let assigner = AssignerBuilder::new(table_id).build();
1909
1910            let fragment_map = table_fragment_id_map.remove(&table_id).unwrap();
1911
1912            let available_worker_slots = workers
1913                .iter()
1914                .filter(|(id, _)| filtered_worker_ids.contains(&(**id as WorkerId)))
1915                .map(|(_, worker)| {
1916                    (
1917                        worker.id as WorkerId,
1918                        NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
1919                    )
1920                })
1921                .collect::<BTreeMap<_, _>>();
1922
1923            for fragment_id in fragment_map {
1924                // Currently, all of our NO_SHUFFLE relation propagations are only transmitted from upstream to downstream.
1925                if no_shuffle_target_fragment_ids.contains(&fragment_id) {
1926                    continue;
1927                }
1928
1929                let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();
1930
1931                for actor_id in &fragment_actor_id_map[&fragment_id] {
1932                    let worker_id = actor_location[actor_id];
1933                    *fragment_slots.entry(worker_id).or_default() += 1;
1934                }
1935
1936                let available_slot_count: usize = available_worker_slots
1937                    .values()
1938                    .cloned()
1939                    .map(NonZeroUsize::get)
1940                    .sum();
1941
1942                if available_slot_count == 0 {
1943                    bail!(
1944                        "No schedulable slots available for fragment {}",
1945                        fragment_id
1946                    );
1947                }
1948
1949                let (dist, vnode_count, is_cdc_backfill_v2_fragment) =
1950                    fragment_distribution_map[&fragment_id];
1951                let max_parallelism = vnode_count;
1952                let fragment_parallelism_strategy = if generate_plan_for_cdc_table_backfill {
1953                    assert!(is_cdc_backfill_v2_fragment);
1954                    let TableParallelism::Fixed(new_parallelism) = parallelism else {
1955                        return Err(anyhow::anyhow!(
1956                            "invalid new parallelism {:?}, expect fixed parallelism",
1957                            parallelism
1958                        )
1959                        .into());
1960                    };
1961                    if new_parallelism > max_parallelism || new_parallelism == 0 {
1962                        return Err(anyhow::anyhow!(
1963                            "invalid new parallelism {}, max parallelism {}",
1964                            new_parallelism,
1965                            max_parallelism
1966                        )
1967                        .into());
1968                    }
1969                    TableParallelism::Fixed(new_parallelism)
1970                } else if is_cdc_backfill_v2_fragment {
1971                    TableParallelism::Fixed(fragment_actor_id_map[&fragment_id].len())
1972                } else {
1973                    parallelism
1974                };
1975                match dist {
1976                    FragmentDistributionType::Unspecified => unreachable!(),
1977                    FragmentDistributionType::Single => {
1978                        let (single_worker_id, should_be_one) = fragment_slots
1979                            .iter()
1980                            .exactly_one()
1981                            .expect("single fragment should have only one worker slot");
1982
1983                        assert_eq!(*should_be_one, 1);
1984
1985                        let assignment =
1986                            assigner.count_actors_per_worker(&available_worker_slots, 1);
1987
1988                        let (chosen_target_worker_id, should_be_one) =
1989                            assignment.iter().exactly_one().ok().with_context(|| {
1990                                format!(
1991                                    "Cannot find a single target worker for fragment {fragment_id}"
1992                                )
1993                            })?;
1994
1995                        assert_eq!(*should_be_one, 1);
1996
1997                        if *chosen_target_worker_id == *single_worker_id {
1998                            tracing::debug!(
1999                                "single fragment {fragment_id} already on target worker {chosen_target_worker_id}"
2000                            );
2001                            continue;
2002                        }
2003
2004                        target_plan.insert(
2005                            fragment_id,
2006                            WorkerReschedule {
2007                                worker_actor_diff: BTreeMap::from_iter(vec![
2008                                    (*chosen_target_worker_id, 1),
2009                                    (*single_worker_id, -1),
2010                                ]),
2011                            },
2012                        );
2013                    }
2014                    FragmentDistributionType::Hash => match fragment_parallelism_strategy {
2015                        TableParallelism::Adaptive => {
2016                            let target_slot_count = adaptive_parallelism_strategy
2017                                .compute_target_parallelism(available_slot_count);
2018
2019                            if target_slot_count > max_parallelism {
2020                                tracing::warn!(
2021                                    "available parallelism for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
2022                                );
2023
2024                                let target_worker_slots = assigner.count_actors_per_worker(
2025                                    &available_worker_slots,
2026                                    max_parallelism,
2027                                );
2028
2029                                target_plan.insert(
2030                                    fragment_id,
2031                                    Self::diff_worker_slot_changes(
2032                                        &fragment_slots,
2033                                        &target_worker_slots,
2034                                    ),
2035                                );
2036                            } else if available_slot_count != target_slot_count {
2037                                tracing::info!(
2038                                    "available parallelism for table {table_id} is limit by adaptive strategy {adaptive_parallelism_strategy}, resetting to {target_slot_count}"
2039                                );
2040
2041                                let target_worker_slots = assigner.count_actors_per_worker(
2042                                    &available_worker_slots,
2043                                    target_slot_count,
2044                                );
2045
2046                                target_plan.insert(
2047                                    fragment_id,
2048                                    Self::diff_worker_slot_changes(
2049                                        &fragment_slots,
2050                                        &target_worker_slots,
2051                                    ),
2052                                );
2053                            } else {
2054                                let available_worker_slots = available_worker_slots
2055                                    .iter()
2056                                    .map(|(worker_id, v)| (*worker_id, v.get()))
2057                                    .collect();
2058
2059                                target_plan.insert(
2060                                    fragment_id,
2061                                    Self::diff_worker_slot_changes(
2062                                        &fragment_slots,
2063                                        &available_worker_slots,
2064                                    ),
2065                                );
2066                            }
2067                        }
2068                        TableParallelism::Fixed(mut n) => {
2069                            if n > max_parallelism {
2070                                tracing::warn!(
2071                                    "specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"
2072                                );
2073                                n = max_parallelism
2074                            }
2075
2076                            let target_worker_slots =
2077                                assigner.count_actors_per_worker(&available_worker_slots, n);
2078
2079                            target_plan.insert(
2080                                fragment_id,
2081                                Self::diff_worker_slot_changes(
2082                                    &fragment_slots,
2083                                    &target_worker_slots,
2084                                ),
2085                            );
2086                        }
2087                        TableParallelism::Custom => {
2088                            // skipping for custom
2089                        }
2090                    },
2091                }
2092            }
2093        }
2094
2095        target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
2096        tracing::debug!(
2097            ?target_plan,
2098            "generate_table_resize_plan finished target_plan"
2099        );
2100        if generate_plan_for_cdc_table_backfill {
2101            job_reschedule_post_updates.resource_group_updates = HashMap::default();
2102            job_reschedule_post_updates.parallelism_updates = HashMap::default();
2103        }
2104        Ok(JobReschedulePlan {
2105            reschedules: target_plan,
2106            post_updates: job_reschedule_post_updates,
2107        })
2108    }
2109
2110    fn diff_worker_slot_changes(
2111        fragment_worker_slots: &BTreeMap<WorkerId, usize>,
2112        target_worker_slots: &BTreeMap<WorkerId, usize>,
2113    ) -> WorkerReschedule {
2114        let mut increased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2115        let mut decreased_actor_count: BTreeMap<WorkerId, usize> = BTreeMap::new();
2116
2117        for (&worker_id, &target_slots) in target_worker_slots {
2118            let &current_slots = fragment_worker_slots.get(&worker_id).unwrap_or(&0);
2119
2120            if target_slots > current_slots {
2121                increased_actor_count.insert(worker_id, target_slots - current_slots);
2122            }
2123        }
2124
2125        for (&worker_id, &current_slots) in fragment_worker_slots {
2126            let &target_slots = target_worker_slots.get(&worker_id).unwrap_or(&0);
2127
2128            if current_slots > target_slots {
2129                decreased_actor_count.insert(worker_id, current_slots - target_slots);
2130            }
2131        }
2132
2133        let worker_ids: HashSet<_> = increased_actor_count
2134            .keys()
2135            .chain(decreased_actor_count.keys())
2136            .cloned()
2137            .collect();
2138
2139        let mut worker_actor_diff = BTreeMap::new();
2140
2141        for worker_id in worker_ids {
2142            let increased = increased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2143            let decreased = decreased_actor_count.remove(&worker_id).unwrap_or(0) as isize;
2144            let change = increased - decreased;
2145
2146            assert_ne!(change, 0);
2147
2148            worker_actor_diff.insert(worker_id, change);
2149        }
2150
2151        WorkerReschedule { worker_actor_diff }
2152    }
2153
2154    fn build_no_shuffle_relation_index(
2155        actor_map: &HashMap<ActorId, CustomActorInfo>,
2156        no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
2157        no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
2158    ) {
2159        let mut fragment_cache = HashSet::new();
2160        for actor in actor_map.values() {
2161            if fragment_cache.contains(&actor.fragment_id) {
2162                continue;
2163            }
2164
2165            for dispatcher in &actor.dispatcher {
2166                for downstream_actor_id in &dispatcher.downstream_actor_id {
2167                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2168                        // Checking for no shuffle dispatchers
2169                        if dispatcher.r#type() == PbDispatcherType::NoShuffle {
2170                            no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId);
2171                            no_shuffle_target_fragment_ids
2172                                .insert(downstream_actor.fragment_id as FragmentId);
2173                        }
2174                    }
2175                }
2176            }
2177
2178            fragment_cache.insert(actor.fragment_id);
2179        }
2180    }
2181
2182    fn build_fragment_dispatcher_index(
2183        actor_map: &HashMap<ActorId, CustomActorInfo>,
2184        fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
2185    ) {
2186        for actor in actor_map.values() {
2187            for dispatcher in &actor.dispatcher {
2188                for downstream_actor_id in &dispatcher.downstream_actor_id {
2189                    if let Some(downstream_actor) = actor_map.get(downstream_actor_id) {
2190                        fragment_dispatcher_map
2191                            .entry(actor.fragment_id as FragmentId)
2192                            .or_default()
2193                            .insert(
2194                                downstream_actor.fragment_id as FragmentId,
2195                                dispatcher.r#type().into(),
2196                            );
2197                    }
2198                }
2199            }
2200        }
2201    }
2202
2203    pub fn resolve_no_shuffle_upstream_tables(
2204        fragment_ids: HashSet<FragmentId>,
2205        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2206        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2207        fragment_to_table: &HashMap<FragmentId, TableId>,
2208        fragment_upstreams: &HashMap<
2209            risingwave_meta_model::FragmentId,
2210            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2211        >,
2212        table_parallelisms: &mut HashMap<TableId, TableParallelism>,
2213    ) -> MetaResult<()> {
2214        let mut queue: VecDeque<FragmentId> = fragment_ids.iter().cloned().collect();
2215
2216        let mut fragment_ids = fragment_ids;
2217
2218        // We trace the upstreams of each downstream under the hierarchy until we reach the top
2219        // for every no_shuffle relation.
2220        while let Some(fragment_id) = queue.pop_front() {
2221            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2222                continue;
2223            }
2224
2225            // for upstream
2226            for upstream_fragment_id in fragment_upstreams
2227                .get(&(fragment_id as _))
2228                .map(|upstreams| upstreams.keys())
2229                .into_iter()
2230                .flatten()
2231            {
2232                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2233                let upstream_fragment_id = &upstream_fragment_id;
2234                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2235                    continue;
2236                }
2237
2238                let table_id = &fragment_to_table[&fragment_id];
2239                let upstream_table_id = &fragment_to_table[upstream_fragment_id];
2240
2241                // Only custom parallelism will be propagated to the no shuffle upstream.
2242                if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
2243                    if let Some(upstream_table_parallelism) =
2244                        table_parallelisms.get(upstream_table_id)
2245                    {
2246                        if upstream_table_parallelism != &TableParallelism::Custom {
2247                            bail!(
2248                                "Cannot change upstream table {} from {:?} to {:?}",
2249                                upstream_table_id,
2250                                upstream_table_parallelism,
2251                                TableParallelism::Custom
2252                            )
2253                        }
2254                    } else {
2255                        table_parallelisms.insert(*upstream_table_id, TableParallelism::Custom);
2256                    }
2257                }
2258
2259                fragment_ids.insert(*upstream_fragment_id);
2260                queue.push_back(*upstream_fragment_id);
2261            }
2262        }
2263
2264        let downstream_fragment_ids = fragment_ids
2265            .iter()
2266            .filter(|fragment_id| no_shuffle_target_fragment_ids.contains(fragment_id));
2267
2268        let downstream_table_ids = downstream_fragment_ids
2269            .map(|fragment_id| fragment_to_table.get(fragment_id).unwrap())
2270            .collect::<HashSet<_>>();
2271
2272        table_parallelisms.retain(|table_id, _| !downstream_table_ids.contains(table_id));
2273
2274        Ok(())
2275    }
2276
2277    pub fn resolve_no_shuffle_upstream_fragments<T>(
2278        reschedule: &mut HashMap<FragmentId, T>,
2279        no_shuffle_source_fragment_ids: &HashSet<FragmentId>,
2280        no_shuffle_target_fragment_ids: &HashSet<FragmentId>,
2281        fragment_upstreams: &HashMap<
2282            risingwave_meta_model::FragmentId,
2283            HashMap<risingwave_meta_model::FragmentId, DispatcherType>,
2284        >,
2285    ) -> MetaResult<()>
2286    where
2287        T: Clone + Eq,
2288    {
2289        let mut queue: VecDeque<FragmentId> = reschedule.keys().cloned().collect();
2290
2291        // We trace the upstreams of each downstream under the hierarchy until we reach the top
2292        // for every no_shuffle relation.
2293        while let Some(fragment_id) = queue.pop_front() {
2294            if !no_shuffle_target_fragment_ids.contains(&fragment_id) {
2295                continue;
2296            }
2297
2298            // for upstream
2299            for upstream_fragment_id in fragment_upstreams
2300                .get(&(fragment_id as _))
2301                .map(|upstreams| upstreams.keys())
2302                .into_iter()
2303                .flatten()
2304            {
2305                let upstream_fragment_id = *upstream_fragment_id as FragmentId;
2306                let upstream_fragment_id = &upstream_fragment_id;
2307                if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
2308                    continue;
2309                }
2310
2311                let reschedule_plan = &reschedule[&fragment_id];
2312
2313                if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
2314                    if upstream_reschedule_plan != reschedule_plan {
2315                        bail!(
2316                            "Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}",
2317                            fragment_id,
2318                            upstream_fragment_id
2319                        );
2320                    }
2321
2322                    continue;
2323                }
2324
2325                reschedule.insert(*upstream_fragment_id, reschedule_plan.clone());
2326
2327                queue.push_back(*upstream_fragment_id);
2328            }
2329        }
2330
2331        reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id));
2332
2333        Ok(())
2334    }
2335
2336    pub async fn resolve_related_no_shuffle_jobs(
2337        &self,
2338        jobs: &[TableId],
2339    ) -> MetaResult<HashSet<TableId>> {
2340        let RescheduleWorkingSet { related_jobs, .. } = self
2341            .metadata_manager
2342            .catalog_controller
2343            .resolve_working_set_for_reschedule_tables(
2344                jobs.iter().map(|id| id.table_id as _).collect(),
2345            )
2346            .await?;
2347
2348        Ok(related_jobs
2349            .keys()
2350            .map(|id| TableId::new(*id as _))
2351            .collect())
2352    }
2353}
2354
2355#[derive(Debug, Clone)]
2356pub enum JobParallelismTarget {
2357    Update(TableParallelism),
2358    Refresh,
2359}
2360
/// The resource-group part of a [`JobRescheduleTarget`].
#[derive(Debug, Clone)]
pub enum JobResourceGroupTarget {
    /// Carries the requested resource group; the payload is optional.
    Update(Option<String>),
    /// Carries no resource group. This is the variant used by the automatic parallelism
    /// control in this file.
    Keep,
}
2366
2367#[derive(Debug, Clone)]
2368pub struct JobRescheduleTarget {
2369    pub parallelism: JobParallelismTarget,
2370    pub resource_group: JobResourceGroupTarget,
2371}
2372
2373#[derive(Debug)]
2374pub struct JobReschedulePolicy {
2375    pub(crate) targets: HashMap<u32, JobRescheduleTarget>,
2376}
2377
2378// final updates for `post_collect`
2379#[derive(Debug, Clone)]
2380pub struct JobReschedulePostUpdates {
2381    pub parallelism_updates: HashMap<TableId, TableParallelism>,
2382    pub resource_group_updates: HashMap<ObjectId, Option<String>>,
2383}
2384
2385#[derive(Debug)]
2386pub struct JobReschedulePlan {
2387    pub reschedules: HashMap<FragmentId, WorkerReschedule>,
2388    pub post_updates: JobReschedulePostUpdates,
2389}
2390
2391impl GlobalStreamManager {
2392    #[await_tree::instrument("acquire_reschedule_read_guard")]
2393    pub async fn reschedule_lock_read_guard(&self) -> RwLockReadGuard<'_, ()> {
2394        self.scale_controller.reschedule_lock.read().await
2395    }
2396
2397    #[await_tree::instrument("acquire_reschedule_write_guard")]
2398    pub async fn reschedule_lock_write_guard(&self) -> RwLockWriteGuard<'_, ()> {
2399        self.scale_controller.reschedule_lock.write().await
2400    }
2401
2402    /// The entrypoint of rescheduling actors.
2403    ///
2404    /// Used by:
2405    /// - The directly exposed low-level API `risingwave_meta_service::scale_service::ScaleService` (`risectl meta reschedule`)
2406    /// - High-level parallelism control API
2407    ///     * manual `ALTER [TABLE | INDEX | MATERIALIZED VIEW | SINK] SET PARALLELISM`
2408    ///     * automatic parallelism control for [`TableParallelism::Adaptive`] when worker nodes changed
2409    pub async fn reschedule_actors(
2410        &self,
2411        database_id: DatabaseId,
2412        plan: JobReschedulePlan,
2413        options: RescheduleOptions,
2414    ) -> MetaResult<()> {
2415        let JobReschedulePlan {
2416            reschedules,
2417            mut post_updates,
2418        } = plan;
2419
2420        let reschedule_fragment = self
2421            .scale_controller
2422            .analyze_reschedule_plan(reschedules, options, &mut post_updates.parallelism_updates)
2423            .await?;
2424
2425        tracing::debug!("reschedule plan: {:?}", reschedule_fragment);
2426
2427        let up_down_stream_fragment: HashSet<_> = reschedule_fragment
2428            .iter()
2429            .flat_map(|(_, reschedule)| {
2430                reschedule
2431                    .upstream_fragment_dispatcher_ids
2432                    .iter()
2433                    .map(|(fragment_id, _)| *fragment_id)
2434                    .chain(reschedule.downstream_fragment_ids.iter().cloned())
2435            })
2436            .collect();
2437
2438        let fragment_actors =
2439            try_join_all(up_down_stream_fragment.iter().map(|fragment_id| async {
2440                let actor_ids = self
2441                    .metadata_manager
2442                    .get_running_actors_of_fragment(*fragment_id)
2443                    .await?;
2444                Result::<_, MetaError>::Ok((*fragment_id, actor_ids))
2445            }))
2446            .await?
2447            .into_iter()
2448            .collect();
2449
2450        let command = Command::RescheduleFragment {
2451            reschedules: reschedule_fragment,
2452            fragment_actors,
2453            post_updates,
2454        };
2455
2456        let _guard = self.source_manager.pause_tick().await;
2457
2458        self.barrier_scheduler
2459            .run_command(database_id, command)
2460            .await?;
2461
2462        tracing::info!("reschedule done");
2463
2464        Ok(())
2465    }
2466
2467    /// When new worker nodes joined, or the parallelism of existing worker nodes changed,
2468    /// examines if there are any jobs can be scaled, and scales them if found.
2469    ///
2470    /// This method will iterate over all `CREATED` jobs, and can be repeatedly called.
2471    ///
2472    /// Returns
2473    /// - `Ok(false)` if no jobs can be scaled;
2474    /// - `Ok(true)` if some jobs are scaled, and it is possible that there are more jobs can be scaled.
2475    async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
2476        tracing::info!("trigger parallelism control");
2477
2478        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
2479
2480        let background_streaming_jobs = self
2481            .metadata_manager
2482            .list_background_creating_jobs()
2483            .await?;
2484
2485        let skipped_jobs = if !background_streaming_jobs.is_empty() {
2486            let jobs = self
2487                .scale_controller
2488                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
2489                .await?;
2490
2491            tracing::info!(
2492                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
2493                background_streaming_jobs,
2494                jobs
2495            );
2496
2497            jobs
2498        } else {
2499            HashSet::new()
2500        };
2501
2502        let job_ids: HashSet<_> = {
2503            let streaming_parallelisms = self
2504                .metadata_manager
2505                .catalog_controller
2506                .get_all_streaming_parallelisms()
2507                .await?;
2508
2509            streaming_parallelisms
2510                .into_iter()
2511                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
2512                .map(|(table_id, _)| table_id)
2513                .collect()
2514        };
2515
2516        let workers = self
2517            .metadata_manager
2518            .cluster_controller
2519            .list_active_streaming_workers()
2520            .await?;
2521
2522        let schedulable_worker_ids: BTreeSet<_> = workers
2523            .iter()
2524            .filter(|worker| {
2525                !worker
2526                    .property
2527                    .as_ref()
2528                    .map(|p| p.is_unschedulable)
2529                    .unwrap_or(false)
2530            })
2531            .map(|worker| worker.id as WorkerId)
2532            .collect();
2533
2534        if job_ids.is_empty() {
2535            tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
2536            return Ok(false);
2537        }
2538
2539        let batch_size = match self.env.opts.parallelism_control_batch_size {
2540            0 => job_ids.len(),
2541            n => n,
2542        };
2543
2544        tracing::info!(
2545            "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
2546            job_ids.len(),
2547            batch_size,
2548            schedulable_worker_ids
2549        );
2550
2551        let batches: Vec<_> = job_ids
2552            .into_iter()
2553            .chunks(batch_size)
2554            .into_iter()
2555            .map(|chunk| chunk.collect_vec())
2556            .collect();
2557
2558        let mut reschedules = None;
2559
2560        for batch in batches {
2561            let targets: HashMap<_, _> = batch
2562                .into_iter()
2563                .map(|job_id| {
2564                    (
2565                        job_id as u32,
2566                        JobRescheduleTarget {
2567                            parallelism: JobParallelismTarget::Refresh,
2568                            resource_group: JobResourceGroupTarget::Keep,
2569                        },
2570                    )
2571                })
2572                .collect();
2573
2574            let plan = self
2575                .scale_controller
2576                .generate_job_reschedule_plan(JobReschedulePolicy { targets }, false)
2577                .await?;
2578
2579            if !plan.reschedules.is_empty() {
2580                tracing::info!("reschedule plan generated for streaming jobs {:?}", plan);
2581                reschedules = Some(plan);
2582                break;
2583            }
2584        }
2585
2586        let Some(plan) = reschedules else {
2587            tracing::info!("no reschedule plan generated");
2588            return Ok(false);
2589        };
2590
2591        // todo
2592        for (database_id, reschedules) in self
2593            .metadata_manager
2594            .split_fragment_map_by_database(plan.reschedules)
2595            .await?
2596        {
2597            self.reschedule_actors(
2598                database_id,
2599                JobReschedulePlan {
2600                    reschedules,
2601                    post_updates: plan.post_updates.clone(),
2602                },
2603                RescheduleOptions {
2604                    resolve_no_shuffle_upstream: false,
2605                    skip_create_new_actors: false,
2606                },
2607            )
2608            .await?;
2609        }
2610
2611        Ok(true)
2612    }
2613
2614    /// Handles notification of worker node activation and deletion, and triggers parallelism control.
2615    async fn run(&self, mut shutdown_rx: Receiver<()>) {
2616        tracing::info!("starting automatic parallelism control monitor");
2617
2618        let check_period =
2619            Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);
2620
2621        let mut ticker = tokio::time::interval_at(
2622            Instant::now()
2623                + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
2624            check_period,
2625        );
2626        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
2627
2628        // waiting for the first tick
2629        ticker.tick().await;
2630
2631        let (local_notification_tx, mut local_notification_rx) =
2632            tokio::sync::mpsc::unbounded_channel();
2633
2634        self.env
2635            .notification_manager()
2636            .insert_local_sender(local_notification_tx);
2637
2638        let worker_nodes = self
2639            .metadata_manager
2640            .list_active_streaming_compute_nodes()
2641            .await
2642            .expect("list active streaming compute nodes");
2643
2644        let mut worker_cache: BTreeMap<_, _> = worker_nodes
2645            .into_iter()
2646            .map(|worker| (worker.id, worker))
2647            .collect();
2648
2649        let mut previous_adaptive_parallelism_strategy = AdaptiveParallelismStrategy::default();
2650
2651        let mut should_trigger = false;
2652
2653        loop {
2654            tokio::select! {
2655                biased;
2656
2657                _ = &mut shutdown_rx => {
2658                    tracing::info!("Stream manager is stopped");
2659                    break;
2660                }
2661
2662                _ = ticker.tick(), if should_trigger => {
2663                    let include_workers = worker_cache.keys().copied().collect_vec();
2664
2665                    if include_workers.is_empty() {
2666                        tracing::debug!("no available worker nodes");
2667                        should_trigger = false;
2668                        continue;
2669                    }
2670
2671                    match self.trigger_parallelism_control().await {
2672                        Ok(cont) => {
2673                            should_trigger = cont;
2674                        }
2675                        Err(e) => {
2676                            tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
2677                            ticker.reset();
2678                        }
2679                    }
2680                }
2681
2682                notification = local_notification_rx.recv() => {
2683                    let notification = notification.expect("local notification channel closed in loop of stream manager");
2684
2685                    // Only maintain the cache for streaming compute nodes.
2686                    let worker_is_streaming_compute = |worker: &WorkerNode| {
2687                        worker.get_type() == Ok(WorkerType::ComputeNode)
2688                            && worker.property.as_ref().unwrap().is_streaming
2689                    };
2690
2691                    match notification {
2692                        LocalNotification::SystemParamsChange(reader) => {
2693                            let new_strategy = reader.adaptive_parallelism_strategy();
2694                            if new_strategy != previous_adaptive_parallelism_strategy {
2695                                tracing::info!("adaptive parallelism strategy changed from {:?} to {:?}", previous_adaptive_parallelism_strategy, new_strategy);
2696                                should_trigger = true;
2697                                previous_adaptive_parallelism_strategy = new_strategy;
2698                            }
2699                        }
2700                        LocalNotification::WorkerNodeActivated(worker) => {
2701                            if !worker_is_streaming_compute(&worker) {
2702                                continue;
2703                            }
2704
2705                            tracing::info!(worker = worker.id, "worker activated notification received");
2706
2707                            let prev_worker = worker_cache.insert(worker.id, worker.clone());
2708
2709                            match prev_worker {
2710                                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism()  => {
2711                                    tracing::info!(worker = worker.id, "worker parallelism changed");
2712                                    should_trigger = true;
2713                                }
2714                                Some(prev_worker) if  prev_worker.resource_group() != worker.resource_group()  => {
2715                                    tracing::info!(worker = worker.id, "worker label changed");
2716                                    should_trigger = true;
2717                                }
2718                                None => {
2719                                    tracing::info!(worker = worker.id, "new worker joined");
2720                                    should_trigger = true;
2721                                }
2722                                _ => {}
2723                            }
2724                        }
2725
2726                        // Since our logic for handling passive scale-in is within the barrier manager,
2727                        // there’s not much we can do here. All we can do is proactively remove the entries from our cache.
2728                        LocalNotification::WorkerNodeDeleted(worker) => {
2729                            if !worker_is_streaming_compute(&worker) {
2730                                continue;
2731                            }
2732
2733                            match worker_cache.remove(&worker.id) {
2734                                Some(prev_worker) => {
2735                                    tracing::info!(worker = prev_worker.id, "worker removed from stream manager cache");
2736                                }
2737                                None => {
2738                                    tracing::warn!(worker = worker.id, "worker not found in stream manager cache, but it was removed");
2739                                }
2740                            }
2741                        }
2742
2743                        _ => {}
2744                    }
2745                }
2746            }
2747        }
2748    }
2749
2750    pub fn start_auto_parallelism_monitor(
2751        self: Arc<Self>,
2752    ) -> (JoinHandle<()>, oneshot::Sender<()>) {
2753        tracing::info!("Automatic parallelism scale-out is enabled for streaming jobs");
2754        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
2755        let join_handle = tokio::spawn(async move {
2756            self.run(shutdown_rx).await;
2757        });
2758
2759        (join_handle, shutdown_tx)
2760    }
2761}