risingwave_meta/barrier/command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
37};
38use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
39use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
40use risingwave_pb::stream_plan::barrier_mutation::Mutation;
41use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
42use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
43use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
44use risingwave_pb::stream_plan::update_mutation::*;
45use risingwave_pb::stream_plan::{
46    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
47    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
48    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
49    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
50};
51use risingwave_pb::stream_service::BarrierCompleteResponse;
52use tracing::warn;
53
54use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
55use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
56use crate::barrier::cdc_progress::CdcTableBackfillTracker;
57use crate::barrier::edge_builder::FragmentEdgeBuildResult;
58use crate::barrier::info::BarrierInfo;
59use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
60use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
61use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
62use crate::controller::scale::LoadedFragmentContext;
63use crate::controller::utils::StreamingJobExtraInfo;
64use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
65use crate::manager::{StreamingJob, StreamingJobType};
66use crate::model::{
67    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
68    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
69    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
70};
71use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
72use crate::stream::{
73    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
74    SplitAssignment, SplitState, UpstreamSinkInfo, build_actor_connector_splits,
75};
76use crate::{MetaError, MetaResult};
77
78/// [`Reschedule`] describes per-fragment changes in a resolved reschedule plan,
79/// used for actor scaling or migration.
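///
/// # Example
///
/// An illustrative sketch with hypothetical ids (not taken from a real plan): a reschedule that
/// drops actor `1` and creates actor `3` on worker `2` would roughly look like this when
/// inspected.
///
/// ```ignore
/// // `reschedule: &Reschedule` is assumed to come from a resolved plan.
/// assert!(reschedule.removed_actors.contains(&1));
/// assert_eq!(reschedule.added_actors.get(&2), Some(&vec![3]));
/// // Newly created actors also carry their build info and target worker.
/// assert!(reschedule.newly_created_actors.contains_key(&3));
/// ```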
80#[derive(Debug, Clone)]
81pub struct Reschedule {
82    /// Added actors in this fragment.
83    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
84
85    /// Removed actors in this fragment.
86    pub removed_actors: HashSet<ActorId>,
87
88    /// Vnode bitmap updates for some actors in this fragment.
89    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
90
91    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
92    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
93    /// New hash mapping of the upstream dispatcher to be updated.
94    ///
95    /// This field exists only when there are upstream fragments and the current fragment is
96    /// hash-sharded.
97    pub upstream_dispatcher_mapping: Option<ActorMapping>,
98
99    /// The downstream fragments of this fragment.
100    pub downstream_fragment_ids: Vec<FragmentId>,
101
102    /// Reassigned splits for source actors.
103    /// It becomes the `actor_splits` in [`UpdateMutation`].
104    /// `Source` and `SourceBackfill` are handled together here.
105    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
106
107    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
108}
109
110#[derive(Debug, Clone)]
111pub struct ReschedulePlan {
112    pub reschedules: HashMap<FragmentId, Reschedule>,
113    /// Should contain the actor ids in upstream and downstream fragments referenced by
114    /// `reschedules`.
115    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
116}
117
118/// Preloaded context for rescheduling, built outside the barrier worker.
119#[derive(Debug, Clone)]
120pub struct RescheduleContext {
121    pub loaded: LoadedFragmentContext,
122    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
123    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
124    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
125    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
126}
127
128impl RescheduleContext {
129    pub fn empty() -> Self {
130        Self {
131            loaded: LoadedFragmentContext::default(),
132            job_extra_info: HashMap::new(),
133            upstream_fragments: HashMap::new(),
134            downstream_fragments: HashMap::new(),
135            downstream_relations: HashMap::new(),
136        }
137    }
138
139    pub fn is_empty(&self) -> bool {
140        self.loaded.is_empty()
141    }
142
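    /// Returns a copy of this context restricted to the jobs, fragments, and relations of
    /// `database_id`, or `None` if nothing is loaded for that database.
    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `ctx` is a populated [`RescheduleContext`] and `db_id` is a
    /// hypothetical [`DatabaseId`]:
    ///
    /// ```ignore
    /// if let Some(db_ctx) = ctx.for_database(db_id) {
    ///     // Every side map is pruned by the same fragment set of this database.
    ///     assert!(db_ctx.loaded.job_map.len() <= ctx.loaded.job_map.len());
    /// }
    /// ```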
143    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
144        let loaded = self.loaded.for_database(database_id)?;
145        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
146        // Use the filtered loaded context as the source of truth so every side map is pruned by
147        // the same fragment set.
148        let fragment_ids: HashSet<FragmentId> = loaded
149            .job_fragments
150            .values()
151            .flat_map(|fragments| fragments.keys().copied())
152            .collect();
153
154        let job_extra_info = self
155            .job_extra_info
156            .iter()
157            .filter(|(job_id, _)| job_ids.contains(*job_id))
158            .map(|(job_id, info)| (*job_id, info.clone()))
159            .collect();
160
161        let upstream_fragments = self
162            .upstream_fragments
163            .iter()
164            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
165            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
166            .collect();
167
168        let downstream_fragments = self
169            .downstream_fragments
170            .iter()
171            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
172            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
173            .collect();
174
175        let downstream_relations = self
176            .downstream_relations
177            .iter()
178            // Ownership of this map is source-fragment based. We keep all downstream edges for
179            // selected sources because the target side can still be referenced during dispatcher
180            // reconstruction even if that target fragment is not being rescheduled.
181            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
182            .map(|(key, relation)| (*key, relation.clone()))
183            .collect();
184
185        Some(Self {
186            loaded,
187            job_extra_info,
188            upstream_fragments,
189            downstream_fragments,
190            downstream_relations,
191        })
192    }
193
194    /// Split this context into per-database contexts without cloning the large loaded graph
195    /// payloads.
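    ///
    /// # Example
    ///
    /// An illustrative sketch (hypothetical `ctx` value); each returned context only carries the
    /// jobs, fragments, and relations of its own database.
    ///
    /// ```ignore
    /// for (database_id, db_ctx) in ctx.into_database_contexts() {
    ///     tracing::debug!(?database_id, jobs = db_ctx.loaded.job_map.len(), "split context");
    /// }
    /// ```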
196    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
197        let Self {
198            loaded,
199            job_extra_info,
200            upstream_fragments,
201            downstream_fragments,
202            downstream_relations,
203        } = self;
204
205        let mut contexts: HashMap<_, _> = loaded
206            .into_database_contexts()
207            .into_iter()
208            .map(|(database_id, loaded)| {
209                (
210                    database_id,
211                    Self {
212                        loaded,
213                        job_extra_info: HashMap::new(),
214                        upstream_fragments: HashMap::new(),
215                        downstream_fragments: HashMap::new(),
216                        downstream_relations: HashMap::new(),
217                    },
218                )
219            })
220            .collect();
221
222        if contexts.is_empty() {
223            return contexts;
224        }
225
226        let mut job_databases = HashMap::new();
227        let mut fragment_databases = HashMap::new();
228        for (&database_id, context) in &contexts {
229            for job_id in context.loaded.job_map.keys().copied() {
230                job_databases.insert(job_id, database_id);
231            }
232            for fragment_id in context
233                .loaded
234                .job_fragments
235                .values()
236                .flat_map(|fragments| fragments.keys().copied())
237            {
238                fragment_databases.insert(fragment_id, database_id);
239            }
240        }
241
242        for (job_id, info) in job_extra_info {
243            if let Some(database_id) = job_databases.get(&job_id).copied() {
244                contexts
245                    .get_mut(&database_id)
246                    .expect("database context should exist for job")
247                    .job_extra_info
248                    .insert(job_id, info);
249            }
250        }
251
252        for (fragment_id, upstreams) in upstream_fragments {
253            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
254                contexts
255                    .get_mut(&database_id)
256                    .expect("database context should exist for fragment")
257                    .upstream_fragments
258                    .insert(fragment_id, upstreams);
259            }
260        }
261
262        for (fragment_id, downstreams) in downstream_fragments {
263            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
264                contexts
265                    .get_mut(&database_id)
266                    .expect("database context should exist for fragment")
267                    .downstream_fragments
268                    .insert(fragment_id, downstreams);
269            }
270        }
271
272        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
273            // Route by source-fragment ownership. A target may be outside of the current reschedule
274            // set, but this edge still belongs to the source-side command.
275            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
276                contexts
277                    .get_mut(&database_id)
278                    .expect("database context should exist for relation source")
279                    .downstream_relations
280                    .insert((source_fragment_id, target_fragment_id), relation);
281            }
282        }
283
284        contexts
285    }
286}
287
288/// Replacing an old job with a new one. All actors in the job will be rebuilt.
289///
290/// Current use cases:
291/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
292/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
293///   will replace a table job's plan.
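///
/// # Example
///
/// A hedged sketch with hypothetical fragment ids: the old job's fragment `2` is rebuilt as new
/// fragment `5`, and downstream fragment `10` rewires its `Merge` node accordingly.
///
/// ```ignore
/// // `plan: &ReplaceStreamJobPlan`
/// assert_eq!(plan.replace_upstream.get(&10).and_then(|m| m.get(&2)), Some(&5));
/// ```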
294#[derive(Debug, Clone)]
295pub struct ReplaceStreamJobPlan {
296    pub old_fragments: StreamJobFragments,
297    pub new_fragments: StreamJobFragmentsToCreate,
298    /// Downstream jobs of the replaced job need to update their `Merge` node to
299    /// connect to the new fragment.
300    pub replace_upstream: FragmentReplaceUpstream,
301    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
302    /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
303    /// We need to reassign splits for it.
304    ///
305    /// Note that there's no `SourceBackfillExecutor` involved for a table with a connector, so we don't need to worry about
306    /// `backfill_splits`.
307    pub init_split_assignment: SplitAssignment,
308    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
309    pub streaming_job: StreamingJob,
310    /// The temporary dummy id of the new job fragments.
311    pub tmp_id: JobId,
312    /// The state table ids to be dropped.
313    pub to_drop_state_table_ids: Vec<TableId>,
314    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
315}
316
317impl ReplaceStreamJobPlan {
318    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
319        let mut fragment_changes = HashMap::new();
320        for (fragment_id, new_fragment) in self
321            .new_fragments
322            .new_fragment_info(&self.init_split_assignment)
323        {
324            let fragment_change = CommandFragmentChanges::NewFragment {
325                job_id: self.streaming_job.id(),
326                info: new_fragment,
327            };
328            fragment_changes
329                .try_insert(fragment_id, fragment_change)
330                .expect("non-duplicate");
331        }
332        for fragment in self.old_fragments.fragments.values() {
333            fragment_changes
334                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
335                .expect("non-duplicate");
336        }
337        for (fragment_id, replace_map) in &self.replace_upstream {
338            fragment_changes
339                .try_insert(
340                    *fragment_id,
341                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
342                )
343                .expect("non-duplicate");
344        }
345        if let Some(sinks) = &self.auto_refresh_schema_sinks {
346            for sink in sinks {
347                let fragment_change = CommandFragmentChanges::NewFragment {
348                    job_id: sink.original_sink.id.as_job_id(),
349                    info: sink.new_fragment_info(),
350                };
351                fragment_changes
352                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
353                    .expect("non-duplicate");
354                fragment_changes
355                    .try_insert(
356                        sink.original_fragment.fragment_id,
357                        CommandFragmentChanges::RemoveFragment,
358                    )
359                    .expect("non-duplicate");
360            }
361        }
362        fragment_changes
363    }
364
365    /// `old_fragment_id` -> `new_fragment_id`
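    ///
    /// Illustrative sketch (hypothetical ids): the per-downstream maps in `replace_upstream` are
    /// flattened into a single map, asserting that no old fragment is replaced by two different
    /// new fragments.
    ///
    /// ```ignore
    /// let replacements = plan.fragment_replacements();
    /// assert_eq!(replacements.get(&2), Some(&5));
    /// ```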
366    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
367        let mut fragment_replacements = HashMap::new();
368        for (upstream_fragment_id, new_upstream_fragment_id) in
369            self.replace_upstream.values().flatten()
370        {
371            {
372                let r =
373                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
374                if let Some(r) = r {
375                    assert_eq!(
376                        *new_upstream_fragment_id, r,
377                        "one fragment is replaced by multiple fragments"
378                    );
379                }
380            }
381        }
382        fragment_replacements
383    }
384}
385
386#[derive(educe::Educe, Clone)]
387#[educe(Debug)]
388pub struct CreateStreamingJobCommandInfo {
389    #[educe(Debug(ignore))]
390    pub stream_job_fragments: StreamJobFragmentsToCreate,
391    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
392    pub init_split_assignment: SplitAssignment,
393    pub definition: String,
394    pub job_type: StreamingJobType,
395    pub create_type: CreateType,
396    pub streaming_job: StreamingJob,
397    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
398    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
399    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
400    pub is_serverless: bool,
401}
402
403impl StreamJobFragments {
404    pub(super) fn new_fragment_info<'a>(
405        &'a self,
406        assignment: &'a SplitAssignment,
407    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
408        self.fragments.values().map(|fragment| {
409            let mut fragment_splits = assignment
410                .get(&fragment.fragment_id)
411                .cloned()
412                .unwrap_or_default();
413
414            (
415                fragment.fragment_id,
416                InflightFragmentInfo {
417                    fragment_id: fragment.fragment_id,
418                    distribution_type: fragment.distribution_type.into(),
419                    fragment_type_mask: fragment.fragment_type_mask,
420                    vnode_count: fragment.vnode_count(),
421                    nodes: fragment.nodes.clone(),
422                    actors: fragment
423                        .actors
424                        .iter()
425                        .map(|actor| {
426                            (
427                                actor.actor_id,
428                                InflightActorInfo {
429                                    worker_id: self
430                                        .actor_status
431                                        .get(&actor.actor_id)
432                                        .expect("should exist")
433                                        .worker_id(),
434                                    vnode_bitmap: actor.vnode_bitmap.clone(),
435                                    splits: fragment_splits
436                                        .remove(&actor.actor_id)
437                                        .unwrap_or_default(),
438                                },
439                            )
440                        })
441                        .collect(),
442                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
443                },
444            )
445        })
446    }
447}
448
449#[derive(Debug, Clone)]
450pub struct SnapshotBackfillInfo {
451    /// `table_id` -> `Some(snapshot_backfill_epoch)`
452    /// The `snapshot_backfill_epoch` should be `None` at the beginning, and is filled in
453    /// by the global barrier worker when handling the command.
454    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
455}
456
457#[derive(Debug, Clone)]
458pub enum CreateStreamingJobType {
459    Normal,
460    SinkIntoTable(UpstreamSinkInfo),
461    SnapshotBackfill(SnapshotBackfillInfo),
462}
463
464/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
465/// it will [build different barriers to send](Self::to_mutation),
466/// and may [do different work after the barrier is collected](PostCollectCommand::post_collect).
467// FIXME: this enum is significantly large on the stack; consider boxing it
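///
/// # Example
///
/// A small illustrative sketch: commands implement [`Display`] for logging and observability.
///
/// ```ignore
/// assert_eq!(format!("{}", Command::Flush), "Flush");
/// assert_eq!(format!("{}", Command::Pause), "Pause");
/// ```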
468#[derive(Debug)]
469pub enum Command {
470    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
471    /// all messages before the checkpoint barrier should have been committed.
472    Flush,
473
474    /// `Pause` command generates a `Pause` barrier **only if**
475    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
476    Pause,
477
478    /// `Resume` command generates a `Resume` barrier **only if**
479    /// the cluster is currently paused. Otherwise, a barrier with no mutation
480    /// will be generated.
481    Resume,
482
483    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
484    /// [`Vec<ActorId>`]. The catalog has already ensured, via reference counting, that these
485    /// streaming jobs are safe to drop.
486    ///
487    /// Barriers from the actors to be dropped will STILL be collected.
488    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
489    /// drop actors, and then delete the job fragments info from meta store.
490    DropStreamingJobs {
491        streaming_job_ids: HashSet<JobId>,
492        actors: Vec<ActorId>,
493        unregistered_state_table_ids: HashSet<TableId>,
494        unregistered_fragment_ids: HashSet<FragmentId>,
495        // target_fragment -> [sink_fragments]
496        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
497    },
498
499    /// `CreateStreamingJob` command generates an `Add` barrier with the given info.
500    ///
501    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
502    /// be collected since the barrier should be passthrough.
503    ///
504    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
505    /// the job fragments info will be added to the meta store. However, the creation progress will
506    /// **last for a while** until the `finish` channel is signaled; only then is the state of
507    /// `TableFragments` set to `Created`.
508    CreateStreamingJob {
509        info: CreateStreamingJobCommandInfo,
510        job_type: CreateStreamingJobType,
511        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
512    },
513
514    /// Reschedule context. It must be resolved inside the barrier worker before injection.
515    RescheduleIntent {
516        context: RescheduleContext,
517        /// Filled by the barrier worker after resolving `context` against current worker topology.
518        ///
519        /// We keep unresolved `context` outside of checkpoint state and only materialize this
520        /// execution plan right before injection, then drop `context` to release memory earlier.
521        reschedule_plan: Option<ReschedulePlan>,
522    },
523
524    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
525    /// essentially switching the downstream of the old job fragments to the new ones, and
526    /// dropping the old job fragments. Used for schema change.
527    ///
528    /// This can be treated as a special case of reschedule, except that the upstream fragments
529    /// of the Merge executors are additionally changed.
530    ReplaceStreamJob(ReplaceStreamJobPlan),
531
532    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
533    /// changed splits.
534    SourceChangeSplit(SplitState),
535
536    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
537    /// the `rate_limit` of executors in the specified fragments.
538    Throttle {
539        jobs: HashSet<JobId>,
540        config: HashMap<FragmentId, ThrottleConfig>,
541    },
542
543    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify the
544    /// materialize executor to start storing old values for the subscription.
545    CreateSubscription {
546        subscription_id: SubscriptionId,
547        upstream_mv_table_id: TableId,
548        retention_second: u64,
549    },
550
551    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify the
552    /// materialize executor to stop storing old values when there is no
553    /// subscription depending on it.
554    DropSubscription {
555        subscription_id: SubscriptionId,
556        upstream_mv_table_id: TableId,
557    },
558
559    ConnectorPropsChange(ConnectorPropsChange),
560
561    /// `Refresh` command generates a barrier to refresh a table by truncating state
562    /// and reloading data from the source.
563    Refresh {
564        table_id: TableId,
565        associated_source_id: SourceId,
566    },
567    ListFinish {
568        table_id: TableId,
569        associated_source_id: SourceId,
570    },
571    LoadFinish {
572        table_id: TableId,
573        associated_source_id: SourceId,
574    },
575
576    /// `ResetSource` command generates a barrier to reset the CDC source offset to the latest position.
577    /// Used when the upstream binlog/oplog has expired.
578    ResetSource {
579        source_id: SourceId,
580    },
581
582    /// `ResumeBackfill` command generates a `StartFragmentBackfill` barrier to force backfill
583    /// to resume for troubleshooting.
584    ResumeBackfill {
585        target: ResumeBackfillTarget,
586    },
587
588    /// `InjectSourceOffsets` command generates a barrier to inject specific offsets
589    /// into source splits (UNSAFE - admin only).
590    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
591    InjectSourceOffsets {
592        source_id: SourceId,
593        /// Split ID -> offset (JSON-encoded based on connector type)
594        split_offsets: HashMap<String, String>,
595    },
596}
597
598#[derive(Debug, Clone, Copy)]
599pub enum ResumeBackfillTarget {
600    Job(JobId),
601    Fragment(FragmentId),
602}
603
604// For debugging and observability purposes. Can add more details later if needed.
605impl std::fmt::Display for Command {
606    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
607        match self {
608            Command::Flush => write!(f, "Flush"),
609            Command::Pause => write!(f, "Pause"),
610            Command::Resume => write!(f, "Resume"),
611            Command::DropStreamingJobs {
612                streaming_job_ids, ..
613            } => {
614                write!(
615                    f,
616                    "DropStreamingJobs: {}",
617                    streaming_job_ids.iter().sorted().join(", ")
618                )
619            }
620            Command::CreateStreamingJob { info, .. } => {
621                write!(f, "CreateStreamingJob: {}", info.streaming_job)
622            }
623            Command::RescheduleIntent {
624                reschedule_plan, ..
625            } => {
626                if reschedule_plan.is_some() {
627                    write!(f, "RescheduleIntent(planned)")
628                } else {
629                    write!(f, "RescheduleIntent")
630                }
631            }
632            Command::ReplaceStreamJob(plan) => {
633                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
634            }
635            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
636            Command::Throttle { .. } => write!(f, "Throttle"),
637            Command::CreateSubscription {
638                subscription_id, ..
639            } => write!(f, "CreateSubscription: {subscription_id}"),
640            Command::DropSubscription {
641                subscription_id, ..
642            } => write!(f, "DropSubscription: {subscription_id}"),
643            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
644            Command::Refresh {
645                table_id,
646                associated_source_id,
647            } => write!(
648                f,
649                "Refresh: {} (source: {})",
650                table_id, associated_source_id
651            ),
652            Command::ListFinish {
653                table_id,
654                associated_source_id,
655            } => write!(
656                f,
657                "ListFinish: {} (source: {})",
658                table_id, associated_source_id
659            ),
660            Command::LoadFinish {
661                table_id,
662                associated_source_id,
663            } => write!(
664                f,
665                "LoadFinish: {} (source: {})",
666                table_id, associated_source_id
667            ),
668            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
669            Command::ResumeBackfill { target } => match target {
670                ResumeBackfillTarget::Job(job_id) => {
671                    write!(f, "ResumeBackfill: job={job_id}")
672                }
673                ResumeBackfillTarget::Fragment(fragment_id) => {
674                    write!(f, "ResumeBackfill: fragment={fragment_id}")
675                }
676            },
677            Command::InjectSourceOffsets {
678                source_id,
679                split_offsets,
680            } => write!(
681                f,
682                "InjectSourceOffsets: {} ({} splits)",
683                source_id,
684                split_offsets.len()
685            ),
686        }
687    }
688}
689
690impl Command {
691    pub fn pause() -> Self {
692        Self::Pause
693    }
694
695    pub fn resume() -> Self {
696        Self::Resume
697    }
698
699    #[expect(clippy::type_complexity)]
700    pub(super) fn fragment_changes(
701        &self,
702    ) -> Option<(
703        Option<(JobId, Option<CdcTableBackfillTracker>)>,
704        HashMap<FragmentId, CommandFragmentChanges>,
705    )> {
706        match self {
707            Command::Flush => None,
708            Command::Pause => None,
709            Command::Resume => None,
710            Command::DropStreamingJobs {
711                unregistered_fragment_ids,
712                dropped_sink_fragment_by_targets,
713                ..
714            } => {
715                let changes = unregistered_fragment_ids
716                    .iter()
717                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
718                    .chain(dropped_sink_fragment_by_targets.iter().map(
719                        |(target_fragment, sink_fragments)| {
720                            (
721                                *target_fragment,
722                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
723                            )
724                        },
725                    ))
726                    .collect();
727
728                Some((None, changes))
729            }
730            Command::CreateStreamingJob { info, job_type, .. } => {
731                assert!(
732                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
733                    "should handle fragment changes separately for snapshot backfill"
734                );
735                let mut changes: HashMap<_, _> = info
736                    .stream_job_fragments
737                    .new_fragment_info(&info.init_split_assignment)
738                    .map(|(fragment_id, fragment_infos)| {
739                        (
740                            fragment_id,
741                            CommandFragmentChanges::NewFragment {
742                                job_id: info.streaming_job.id(),
743                                info: fragment_infos,
744                            },
745                        )
746                    })
747                    .collect();
748
749                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
750                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
751                    changes.insert(
752                        downstream_fragment_id,
753                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
754                            upstream_fragment_id: ctx.sink_fragment_id,
755                            sink_output_schema: ctx.sink_output_fields.clone(),
756                            project_exprs: ctx.project_exprs.clone(),
757                        }),
758                    );
759                }
760
761                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
762                    let (fragment, _) =
763                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
764                            .expect("should have parallel cdc fragment");
765                    Some(CdcTableBackfillTracker::new(
766                        fragment.fragment_id,
767                        splits.clone(),
768                    ))
769                } else {
770                    None
771                };
772
773                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
774            }
775            Command::RescheduleIntent {
776                reschedule_plan, ..
777            } => {
778                let ReschedulePlan { reschedules, .. } = reschedule_plan
779                    .as_ref()
780                    .expect("reschedule intent should be resolved in global barrier worker");
781                Some((
782                    None,
783                    reschedules
784                        .iter()
785                        .map(|(fragment_id, reschedule)| {
786                            (
787                                *fragment_id,
788                                CommandFragmentChanges::Reschedule {
789                                    new_actors: reschedule
790                                        .added_actors
791                                        .iter()
792                                        .flat_map(|(node_id, actors)| {
793                                            actors.iter().map(|actor_id| {
794                                                (
795                                                    *actor_id,
796                                                    InflightActorInfo {
797                                                        worker_id: *node_id,
798                                                        vnode_bitmap: reschedule
799                                                            .newly_created_actors
800                                                            .get(actor_id)
801                                                            .expect("should exist")
802                                                            .0
803                                                            .0
804                                                            .vnode_bitmap
805                                                            .clone(),
806                                                        splits: reschedule
807                                                            .actor_splits
808                                                            .get(actor_id)
809                                                            .cloned()
810                                                            .unwrap_or_default(),
811                                                    },
812                                                )
813                                            })
814                                        })
815                                        .collect(),
816                                    actor_update_vnode_bitmap: reschedule
817                                        .vnode_bitmap_updates
818                                        .iter()
819                                        .filter(|(actor_id, _)| {
820                                            // only keep the existing actors
821                                            !reschedule.newly_created_actors.contains_key(*actor_id)
822                                        })
823                                        .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
824                                        .collect(),
825                                    to_remove: reschedule.removed_actors.iter().cloned().collect(),
826                                    actor_splits: reschedule.actor_splits.clone(),
827                                },
828                            )
829                        })
830                        .collect(),
831                ))
832            }
833            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
834            Command::SourceChangeSplit(SplitState {
835                split_assignment, ..
836            }) => Some((
837                None,
838                split_assignment
839                    .iter()
840                    .map(|(&fragment_id, splits)| {
841                        (
842                            fragment_id,
843                            CommandFragmentChanges::SplitAssignment {
844                                actor_splits: splits.clone(),
845                            },
846                        )
847                    })
848                    .collect(),
849            )),
850            Command::Throttle { .. } => None,
851            Command::CreateSubscription { .. } => None,
852            Command::DropSubscription { .. } => None,
853            Command::ConnectorPropsChange(_) => None,
854            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
855            Command::ListFinish { .. } => None, // ListFinish doesn't change fragment structure
856            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
857            Command::ResetSource { .. } => None, // ResetSource doesn't change fragment structure
858            Command::ResumeBackfill { .. } => None, /* ResumeBackfill doesn't change fragment structure */
859            Command::InjectSourceOffsets { .. } => None, /* InjectSourceOffsets doesn't change fragment structure */
860        }
861    }
862
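    /// Whether the barrier carrying this command must be a checkpoint barrier.
    ///
    /// A hedged doc example; per the current implementation, `Resume` is the only exception:
    ///
    /// ```ignore
    /// assert!(Command::Flush.need_checkpoint());
    /// assert!(!Command::Resume.need_checkpoint());
    /// ```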
863    pub fn need_checkpoint(&self) -> bool {
864        // TODO: review the flow of different commands to reduce the number of checkpoints
865        !matches!(self, Command::Resume)
866    }
867}
868
869#[derive(Debug)]
870pub enum PostCollectCommand {
871    Command(String),
872    DropStreamingJobs {
873        streaming_job_ids: HashSet<JobId>,
874        unregistered_state_table_ids: HashSet<TableId>,
875    },
876    CreateStreamingJob {
877        info: CreateStreamingJobCommandInfo,
878        job_type: CreateStreamingJobType,
879        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
880    },
881    Reschedule {
882        reschedules: HashMap<FragmentId, Reschedule>,
883    },
884    ReplaceStreamJob(ReplaceStreamJobPlan),
885    SourceChangeSplit {
886        split_assignment: SplitAssignment,
887    },
888    CreateSubscription {
889        subscription_id: SubscriptionId,
890    },
891    ConnectorPropsChange(ConnectorPropsChange),
892    ResumeBackfill {
893        target: ResumeBackfillTarget,
894    },
895}
896
897impl PostCollectCommand {
898    pub fn barrier() -> Self {
899        PostCollectCommand::Command("barrier".to_owned())
900    }
901
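    /// A hedged doc example for the name reported in logs and metrics:
    ///
    /// ```ignore
    /// assert_eq!(PostCollectCommand::barrier().command_name(), "barrier");
    /// ```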
902    pub fn command_name(&self) -> &str {
903        match self {
904            PostCollectCommand::Command(name) => name.as_str(),
905            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
906            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
907            PostCollectCommand::Reschedule { .. } => "Reschedule",
908            PostCollectCommand::ReplaceStreamJob(_) => "ReplaceStreamJob",
909            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
910            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
911            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
912            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
913        }
914    }
915}
916
917impl Display for PostCollectCommand {
918    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
919        f.write_str(self.command_name())
920    }
921}
922
923impl Command {
924    pub(super) fn into_post_collect(self) -> PostCollectCommand {
925        match self {
926            Command::DropStreamingJobs {
927                streaming_job_ids,
928                unregistered_state_table_ids,
929                ..
930            } => PostCollectCommand::DropStreamingJobs {
931                streaming_job_ids,
932                unregistered_state_table_ids,
933            },
934            Command::CreateStreamingJob {
935                info,
936                job_type,
937                cross_db_snapshot_backfill_info,
938            } => match job_type {
939                CreateStreamingJobType::SnapshotBackfill(_) => PostCollectCommand::barrier(),
940                job_type => PostCollectCommand::CreateStreamingJob {
941                    info,
942                    job_type,
943                    cross_db_snapshot_backfill_info,
944                },
945            },
946            Command::RescheduleIntent {
947                reschedule_plan, ..
948            } => {
949                let ReschedulePlan { reschedules, .. } = reschedule_plan
950                    .expect("reschedule intent should be resolved in global barrier worker");
951                PostCollectCommand::Reschedule { reschedules }
952            }
953            Command::ReplaceStreamJob(plan) => PostCollectCommand::ReplaceStreamJob(plan),
954            Command::SourceChangeSplit(SplitState { split_assignment }) => {
955                PostCollectCommand::SourceChangeSplit { split_assignment }
956            }
957            Command::CreateSubscription {
958                subscription_id, ..
959            } => PostCollectCommand::CreateSubscription { subscription_id },
960            Command::ConnectorPropsChange(connector_props_change) => {
961                PostCollectCommand::ConnectorPropsChange(connector_props_change)
962            }
963            Command::Flush => PostCollectCommand::Command("Flush".to_owned()),
964            Command::Pause => PostCollectCommand::Command("Pause".to_owned()),
965            Command::Resume => PostCollectCommand::Command("Resume".to_owned()),
966            Command::Throttle { .. } => PostCollectCommand::Command("Throttle".to_owned()),
967            Command::DropSubscription { .. } => {
968                PostCollectCommand::Command("DropSubscription".to_owned())
969            }
970            Command::Refresh { .. } => PostCollectCommand::Command("Refresh".to_owned()),
971            Command::ListFinish { .. } => PostCollectCommand::Command("ListFinish".to_owned()),
972            Command::LoadFinish { .. } => PostCollectCommand::Command("LoadFinish".to_owned()),
973            Command::ResetSource { .. } => PostCollectCommand::Command("ResetSource".to_owned()),
974            Command::ResumeBackfill { target } => PostCollectCommand::ResumeBackfill { target },
975            Command::InjectSourceOffsets { .. } => {
976                PostCollectCommand::Command("InjectSourceOffsets".to_owned())
977            }
978        }
979    }
980}
981
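/// The kind of a barrier, as tracked by the meta barrier manager.
///
/// A hedged doc example of the checkpoint predicate and protobuf conversion:
///
/// ```ignore
/// let kind = BarrierKind::Checkpoint(vec![1, 2, 3]);
/// assert!(kind.is_checkpoint());
/// assert_eq!(kind.as_str_name(), "Checkpoint");
/// assert_eq!(BarrierKind::Barrier.to_protobuf(), PbBarrierKind::Barrier);
/// ```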
982#[derive(Debug, Clone)]
983pub enum BarrierKind {
984    Initial,
985    Barrier,
986    /// Holds the prev-epochs of the previous non-checkpoint barriers plus the current prev-epoch.
987    Checkpoint(Vec<u64>),
988}
989
990impl BarrierKind {
991    pub fn to_protobuf(&self) -> PbBarrierKind {
992        match self {
993            BarrierKind::Initial => PbBarrierKind::Initial,
994            BarrierKind::Barrier => PbBarrierKind::Barrier,
995            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
996        }
997    }
998
999    pub fn is_checkpoint(&self) -> bool {
1000        matches!(self, BarrierKind::Checkpoint(_))
1001    }
1002
1003    pub fn is_initial(&self) -> bool {
1004        matches!(self, BarrierKind::Initial)
1005    }
1006
1007    pub fn as_str_name(&self) -> &'static str {
1008        match self {
1009            BarrierKind::Initial => "Initial",
1010            BarrierKind::Barrier => "Barrier",
1011            BarrierKind::Checkpoint(_) => "Checkpoint",
1012        }
1013    }
1014}
1015
1016/// [`CommandContext`] is used for generating barriers and doing post-collect work according to the
1017/// given [`Command`].
1018pub(super) struct CommandContext {
1019    mv_subscription_max_retention: HashMap<TableId, u64>,
1020
1021    pub(super) barrier_info: BarrierInfo,
1022
1023    pub(super) table_ids_to_commit: HashSet<TableId>,
1024
1025    pub(super) command: PostCollectCommand,
1026
1027    /// The tracing span of this command.
1028    ///
1029    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
1030    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
1031    /// stream graph on compute nodes, and finishing its `post_collect` work.
1032    _span: tracing::Span,
1033}
1034
1035impl std::fmt::Debug for CommandContext {
1036    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1037        f.debug_struct("CommandContext")
1038            .field("barrier_info", &self.barrier_info)
1039            .field("command", &self.command.command_name())
1040            .finish()
1041    }
1042}
1043
1044impl CommandContext {
1045    pub(super) fn new(
1046        barrier_info: BarrierInfo,
1047        mv_subscription_max_retention: HashMap<TableId, u64>,
1048        table_ids_to_commit: HashSet<TableId>,
1049        command: PostCollectCommand,
1050        span: tracing::Span,
1051    ) -> Self {
1052        Self {
1053            mv_subscription_max_retention,
1054            barrier_info,
1055            table_ids_to_commit,
1056            command,
1057            _span: span,
1058        }
1059    }
1060
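    /// Computes the log-store truncate epoch for a subscription by subtracting `retention_second`
    /// from the wall-clock time of the prev epoch.
    ///
    /// A worked sketch with hypothetical numbers: if the prev epoch corresponds to Unix time
    /// `1_700_000_600` seconds and the retention is `600` seconds, the result is the epoch at
    /// Unix time `1_700_000_000` seconds. If the subtraction yields an invalid timestamp, the
    /// prev epoch itself is returned.
    ///
    /// ```ignore
    /// // Hypothetical: prev epoch at Unix time 1_700_000_600 s, retention of 600 s.
    /// let truncate = ctx.get_truncate_epoch(600);
    /// assert_eq!(truncate, Epoch::from_unix_millis(1_700_000_000_000));
    /// ```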
1061    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
1062        let Some(truncate_timestamptz) = Timestamptz::from_secs(
1063            self.barrier_info
1064                .prev_epoch
1065                .value()
1066                .as_timestamptz()
1067                .timestamp()
1068                - retention_second as i64,
1069        ) else {
1070            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
1071            return self.barrier_info.prev_epoch.value();
1072        };
1073        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
1074    }
1075
1076    pub(super) fn collect_commit_epoch_info(
1077        &self,
1078        info: &mut CommitEpochInfo,
1079        resps: Vec<BarrierCompleteResponse>,
1080        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
1081    ) {
1082        let (
1083            sst_to_context,
1084            synced_ssts,
1085            new_table_watermarks,
1086            old_value_ssts,
1087            vector_index_adds,
1088            truncate_tables,
1089        ) = collect_resp_info(resps);
1090
1091        let new_table_fragment_infos =
1092            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
1093                assert!(!matches!(
1094                    job_type,
1095                    CreateStreamingJobType::SnapshotBackfill(_)
1096                ));
1097                let table_fragments = &info.stream_job_fragments;
1098                let mut table_ids: HashSet<_> =
1099                    table_fragments.internal_table_ids().into_iter().collect();
1100                if let Some(mv_table_id) = table_fragments.mv_table_id() {
1101                    table_ids.insert(mv_table_id);
1102                }
1103
1104                vec![NewTableFragmentInfo { table_ids }]
1105            } else {
1106                vec![]
1107            };
1108
1109        let mut mv_log_store_truncate_epoch = HashMap::new();
1110        // TODO: may collect cross db snapshot backfill
1111        let mut update_truncate_epoch =
1112            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
1113                Entry::Occupied(mut entry) => {
1114                    let prev_truncate_epoch = entry.get_mut();
1115                    if truncate_epoch < *prev_truncate_epoch {
1116                        *prev_truncate_epoch = truncate_epoch;
1117                    }
1118                }
1119                Entry::Vacant(entry) => {
1120                    entry.insert(truncate_epoch);
1121                }
1122            };
1123        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
1124            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
1125            update_truncate_epoch(*mv_table_id, truncate_epoch);
1126        }
1127        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
1128            for mv_table_id in upstream_mv_table_ids {
1129                update_truncate_epoch(mv_table_id, backfill_epoch);
1130            }
1131        }
1132
1133        let table_new_change_log = build_table_change_log_delta(
1134            old_value_ssts.into_iter(),
1135            synced_ssts.iter().map(|sst| &sst.sst_info),
1136            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
1137            mv_log_store_truncate_epoch.into_iter(),
1138        );
1139
1140        let epoch = self.barrier_info.prev_epoch();
1141        for table_id in &self.table_ids_to_commit {
1142            info.tables_to_commit
1143                .try_insert(*table_id, epoch)
1144                .expect("non duplicate");
1145        }
1146
1147        info.sstables.extend(synced_ssts);
1148        info.new_table_watermarks.extend(new_table_watermarks);
1149        info.sst_to_context.extend(sst_to_context);
1150        info.new_table_fragment_infos
1151            .extend(new_table_fragment_infos);
1152        info.change_log_delta.extend(table_new_change_log);
1153        for (table_id, vector_index_adds) in vector_index_adds {
1154            info.vector_index_delta
1155                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
1156                .expect("non-duplicate");
1157        }
1158        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
1159            && let Some(index_table) = collect_new_vector_index_info(job_info)
1160        {
1161            info.vector_index_delta
1162                .try_insert(
1163                    index_table.id,
1164                    VectorIndexDelta::Init(PbVectorIndexInit {
1165                        info: Some(index_table.vector_index_info.unwrap()),
1166                    }),
1167                )
1168                .expect("non-duplicate");
1169        }
1170        info.truncate_tables.extend(truncate_tables);
1171    }
1172}
1173
1174impl Command {
1175    /// Generate a mutation for the given command.
1176    ///
1177    /// `edges` contains the `dispatcher` info for `DispatchExecutor`s and the `actor_upstreams` info for `MergeNode`s.
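    ///
    /// A hedged sketch of the simplest cases (hypothetical `mgr` and `db_info` arguments; the
    /// real caller lives in the barrier worker):
    ///
    /// ```ignore
    /// // `Flush` never carries a mutation.
    /// assert!(Command::Flush.to_mutation(false, &mut None, &mgr, &mut db_info)?.is_none());
    /// // `Pause` emits a `PauseMutation` only when the database is not already paused.
    /// assert!(matches!(
    ///     Command::Pause.to_mutation(false, &mut None, &mgr, &mut db_info)?,
    ///     Some(Mutation::Pause(_))
    /// ));
    /// ```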
1178    pub(super) fn to_mutation(
1179        &self,
1180        is_currently_paused: bool,
1181        edges: &mut Option<FragmentEdgeBuildResult>,
1182        control_stream_manager: &ControlStreamManager,
1183        database_info: &mut InflightDatabaseInfo,
1184    ) -> MetaResult<Option<Mutation>> {
1185        let database_id = database_info.database_id;
1186        let mutation = match self {
1187            Command::Flush => None,
1188
1189            Command::Pause => {
1190                // Only pause when the cluster is not already paused.
1191                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
1192                if !is_currently_paused {
1193                    Some(Mutation::Pause(PauseMutation {}))
1194                } else {
1195                    None
1196                }
1197            }
1198
1199            Command::Resume => {
1200                // Only resume when the cluster is currently paused.
1201                if is_currently_paused {
1202                    Some(Mutation::Resume(ResumeMutation {}))
1203                } else {
1204                    None
1205                }
1206            }
1207
1208            Command::SourceChangeSplit(SplitState {
1209                split_assignment, ..
1210            }) => {
1211                let mut diff = HashMap::new();
1212
1213                for actor_splits in split_assignment.values() {
1214                    diff.extend(actor_splits.clone());
1215                }
1216
1217                Some(Mutation::Splits(SourceChangeSplitMutation {
1218                    actor_splits: build_actor_connector_splits(&diff),
1219                }))
1220            }
1221
1222            Command::Throttle { config, .. } => {
1223                let config = config.clone();
1224                Some(Mutation::Throttle(ThrottleMutation {
1225                    fragment_throttle: config,
1226                }))
1227            }
1228
1229            Command::DropStreamingJobs {
1230                actors,
1231                dropped_sink_fragment_by_targets,
1232                ..
1233            } => Some(Mutation::Stop(StopMutation {
1234                actors: actors.clone(),
1235                dropped_sink_fragments: dropped_sink_fragment_by_targets
1236                    .values()
1237                    .flatten()
1238                    .cloned()
1239                    .collect(),
1240            })),
1241
1242            Command::CreateStreamingJob {
1243                info:
1244                    CreateStreamingJobCommandInfo {
1245                        stream_job_fragments,
1246                        init_split_assignment: split_assignment,
1247                        upstream_fragment_downstreams,
1248                        fragment_backfill_ordering,
1249                        ..
1250                    },
1251                job_type,
1252                ..
1253            } => {
1254                let edges = edges.as_mut().expect("should exist");
1255                let added_actors = stream_job_fragments.actor_ids().collect();
1256                let actor_splits = split_assignment
1257                    .values()
1258                    .flat_map(build_actor_connector_splits)
1259                    .collect();
1260                let subscriptions_to_add =
1261                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
1262                        job_type
1263                    {
1264                        snapshot_backfill_info
1265                            .upstream_mv_table_id_to_backfill_epoch
1266                            .keys()
1267                            .map(|table_id| SubscriptionUpstreamInfo {
1268                                subscriber_id: stream_job_fragments
1269                                    .stream_job_id()
1270                                    .as_subscriber_id(),
1271                                upstream_mv_table_id: *table_id,
1272                            })
1273                            .collect()
1274                    } else {
1275                        Default::default()
1276                    };
1277                let backfill_nodes_to_pause: Vec<_> =
1278                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
1279                        .into_iter()
1280                        .collect();
1281
1282                let new_upstream_sinks =
1283                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
1284                        sink_fragment_id,
1285                        sink_output_fields,
1286                        project_exprs,
1287                        new_sink_downstream,
1288                        ..
1289                    }) = job_type
1290                    {
1291                        let new_sink_actors = stream_job_fragments
1292                            .actors_to_create()
1293                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
1294                            .exactly_one()
1295                            .map(|(_, _, actors)| {
1296                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
1297                                    actor_id: actor.actor_id,
1298                                    host: Some(control_stream_manager.host_addr(worker_id)),
1299                                    partial_graph_id: to_partial_graph_id(database_id, None),
1300                                })
1301                            })
1302                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
1303                        let new_upstream_sink = PbNewUpstreamSink {
1304                            info: Some(PbUpstreamSinkInfo {
1305                                upstream_fragment_id: *sink_fragment_id,
1306                                sink_output_schema: sink_output_fields.clone(),
1307                                project_exprs: project_exprs.clone(),
1308                            }),
1309                            upstream_actors: new_sink_actors.collect(),
1310                        };
1311                        HashMap::from([(
1312                            new_sink_downstream.downstream_fragment_id,
1313                            new_upstream_sink,
1314                        )])
1315                    } else {
1316                        HashMap::new()
1317                    };
1318
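                    // CDC table snapshot splits are assigned up front for every job type except
                    // snapshot backfill.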
1319                let actor_cdc_table_snapshot_splits =
1320                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
1321                        database_info
1322                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
1323                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
1324                    } else {
1325                        None
1326                    };
1327
1328                let add_mutation = AddMutation {
1329                    actor_dispatchers: edges
1330                        .dispatchers
1331                        .extract_if(|fragment_id, _| {
1332                            upstream_fragment_downstreams.contains_key(fragment_id)
1333                        })
1334                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1335                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1336                        .collect(),
1337                    added_actors,
1338                    actor_splits,
1339                    // If the cluster is already paused, the new actors should be paused too.
1340                    pause: is_currently_paused,
1341                    subscriptions_to_add,
1342                    backfill_nodes_to_pause,
1343                    actor_cdc_table_snapshot_splits,
1344                    new_upstream_sinks,
1345                };
1346
1347                Some(Mutation::Add(add_mutation))
1348            }
1349
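                // Replacing a stream job becomes an `Update` mutation: merge executors of the
                // affected downstream fragments are re-pointed to the new fragments, new
                // dispatchers are installed upstream, and the old actors (plus any
                // auto-refreshed sink actors) are dropped.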
1350            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1351                old_fragments,
1352                replace_upstream,
1353                upstream_fragment_downstreams,
1354                init_split_assignment,
1355                auto_refresh_schema_sinks,
1356                ..
1357            }) => {
1358                let edges = edges.as_mut().expect("should exist");
1359                let merge_updates = edges
1360                    .merge_updates
1361                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1362                    .collect();
1363                let dispatchers = edges
1364                    .dispatchers
1365                    .extract_if(|fragment_id, _| {
1366                        upstream_fragment_downstreams.contains_key(fragment_id)
1367                    })
1368                    .collect();
1369                let actor_cdc_table_snapshot_splits = database_info
1370                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1371                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1372                Self::generate_update_mutation_for_replace_table(
1373                    old_fragments.actor_ids().chain(
1374                        auto_refresh_schema_sinks
1375                            .as_ref()
1376                            .into_iter()
1377                            .flat_map(|sinks| {
1378                                sinks.iter().flat_map(|sink| {
1379                                    sink.original_fragment
1380                                        .actors
1381                                        .iter()
1382                                        .map(|actor| actor.actor_id)
1383                                })
1384                            }),
1385                    ),
1386                    merge_updates,
1387                    dispatchers,
1388                    init_split_assignment,
1389                    actor_cdc_table_snapshot_splits,
1390                    auto_refresh_schema_sinks.as_ref(),
1391                )
1392            }
1393
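                // Rescheduling is also expressed as an `Update` mutation, assembled from the
                // resolved plan: dispatcher updates on upstream fragments, merge updates on
                // downstream fragments, vnode bitmap updates, dropped actors, and refreshed
                // split assignments.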
1394            Command::RescheduleIntent {
1395                reschedule_plan, ..
1396            } => {
1397                let ReschedulePlan {
1398                    reschedules,
1399                    fragment_actors,
1400                } = reschedule_plan
1401                    .as_ref()
1402                    .expect("reschedule intent should be resolved in global barrier worker");
1403                let mut dispatcher_update = HashMap::new();
1404                for reschedule in reschedules.values() {
1405                    for &(upstream_fragment_id, dispatcher_id) in
1406                        &reschedule.upstream_fragment_dispatcher_ids
1407                    {
1408                        // Find the actors of the upstream fragment.
1409                        let upstream_actor_ids = fragment_actors
1410                            .get(&upstream_fragment_id)
1411                            .expect("should contain");
1412
1413                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1414
1415                        // Record updates for all actors.
1416                        for &actor_id in upstream_actor_ids {
1417                            let added_downstream_actor_id = if upstream_reschedule
1418                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1419                                .unwrap_or(true)
1420                            {
1421                                reschedule
1422                                    .added_actors
1423                                    .values()
1424                                    .flatten()
1425                                    .cloned()
1426                                    .collect()
1427                            } else {
1428                                Default::default()
1429                            };
1430                            // Index with the dispatcher id to check duplicates.
1431                            dispatcher_update
1432                                .try_insert(
1433                                    (actor_id, dispatcher_id),
1434                                    DispatcherUpdate {
1435                                        actor_id,
1436                                        dispatcher_id,
1437                                        hash_mapping: reschedule
1438                                            .upstream_dispatcher_mapping
1439                                            .as_ref()
1440                                            .map(|m| m.to_protobuf()),
1441                                        added_downstream_actor_id,
1442                                        removed_downstream_actor_id: reschedule
1443                                            .removed_actors
1444                                            .iter()
1445                                            .cloned()
1446                                            .collect(),
1447                                    },
1448                                )
1449                                .unwrap();
1450                        }
1451                    }
1452                }
1453                let dispatcher_update = dispatcher_update.into_values().collect();
1454
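                // For each downstream fragment of a rescheduled fragment, update the merge
                // executors of its surviving actors with the added and removed upstream actors.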
1455                let mut merge_update = HashMap::new();
1456                for (&fragment_id, reschedule) in reschedules {
1457                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1458                        // Find the actors of the downstream fragment.
1459                        let downstream_actor_ids = fragment_actors
1460                            .get(&downstream_fragment_id)
1461                            .expect("should contain");
1462
1463                        // Actors removed from the downstream fragment should be skipped:
1464                        // newly created actors of the current fragment will not dispatch
1465                        // Update barriers to them.
1466                        let downstream_removed_actors: HashSet<_> = reschedules
1467                            .get(&downstream_fragment_id)
1468                            .map(|downstream_reschedule| {
1469                                downstream_reschedule
1470                                    .removed_actors
1471                                    .iter()
1472                                    .copied()
1473                                    .collect()
1474                            })
1475                            .unwrap_or_default();
1476
1477                        // Record updates for all actors.
1478                        for &actor_id in downstream_actor_ids {
1479                            if downstream_removed_actors.contains(&actor_id) {
1480                                continue;
1481                            }
1482
1483                            // Index with the fragment id to check duplicates.
1484                            merge_update
1485                                .try_insert(
1486                                    (actor_id, fragment_id),
1487                                    MergeUpdate {
1488                                        actor_id,
1489                                        upstream_fragment_id: fragment_id,
1490                                        new_upstream_fragment_id: None,
1491                                        added_upstream_actors: reschedule
1492                                            .added_actors
1493                                            .iter()
1494                                            .flat_map(|(worker_id, actors)| {
1495                                                let host =
1496                                                    control_stream_manager.host_addr(*worker_id);
1497                                                actors.iter().map(move |&actor_id| PbActorInfo {
1498                                                    actor_id,
1499                                                    host: Some(host.clone()),
1500                                                    // We assume scaling only happens within the database-level partial graph.
1501                                                    partial_graph_id: to_partial_graph_id(
1502                                                        database_id,
1503                                                        None,
1504                                                    ),
1505                                                })
1506                                            })
1507                                            .collect(),
1508                                        removed_upstream_actor_id: reschedule
1509                                            .removed_actors
1510                                            .iter()
1511                                            .cloned()
1512                                            .collect(),
1513                                    },
1514                                )
1515                                .unwrap();
1516                        }
1517                    }
1518                }
1519                let merge_update = merge_update.into_values().collect();
1520
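                // Forward the vnode bitmap changes computed by the reschedule and collect the
                // removed actors of all fragments as dropped.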
1521                let mut actor_vnode_bitmap_update = HashMap::new();
1522                for reschedule in reschedules.values() {
1523                    // Record updates for all actors in this fragment.
1524                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1525                        let bitmap = bitmap.to_protobuf();
1526                        actor_vnode_bitmap_update
1527                            .try_insert(actor_id, bitmap)
1528                            .unwrap();
1529                    }
1530                }
1531                let dropped_actors = reschedules
1532                    .values()
1533                    .flat_map(|r| r.removed_actors.iter().copied())
1534                    .collect();
1535                let mut actor_splits = HashMap::new();
1536                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1537                for (fragment_id, reschedule) in reschedules {
1538                    for (actor_id, splits) in &reschedule.actor_splits {
1539                        actor_splits.insert(
1540                            *actor_id,
1541                            ConnectorSplits {
1542                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1543                            },
1544                        );
1545                    }
1546
1547                    if let Some(assignment) =
1548                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1549                    {
1550                        actor_cdc_table_snapshot_splits.extend(assignment)
1551                    }
1552                }
1553
1554                // We don't create new dispatchers in the reschedule scenario.
1555                let actor_new_dispatchers = HashMap::new();
1556                let mutation = Mutation::Update(UpdateMutation {
1557                    dispatcher_update,
1558                    merge_update,
1559                    actor_vnode_bitmap_update,
1560                    dropped_actors,
1561                    actor_splits,
1562                    actor_new_dispatchers,
1563                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1564                        splits: actor_cdc_table_snapshot_splits,
1565                    }),
1566                    sink_schema_change: Default::default(),
1567                    subscriptions_to_drop: vec![],
1568                });
1569                tracing::debug!("update mutation: {mutation:?}");
1570                Some(mutation)
1571            }
1572
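                // Creating a subscription adds no actors; it only registers the subscriber on
                // the upstream MV table via an otherwise-empty `Add` mutation.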
1573            Command::CreateSubscription {
1574                upstream_mv_table_id,
1575                subscription_id,
1576                ..
1577            } => Some(Mutation::Add(AddMutation {
1578                actor_dispatchers: Default::default(),
1579                added_actors: vec![],
1580                actor_splits: Default::default(),
1581                pause: false,
1582                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1583                    upstream_mv_table_id: *upstream_mv_table_id,
1584                    subscriber_id: subscription_id.as_subscriber_id(),
1585                }],
1586                backfill_nodes_to_pause: vec![],
1587                actor_cdc_table_snapshot_splits: None,
1588                new_upstream_sinks: Default::default(),
1589            })),
1590            Command::DropSubscription {
1591                upstream_mv_table_id,
1592                subscription_id,
1593            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1594                info: vec![SubscriptionUpstreamInfo {
1595                    subscriber_id: subscription_id.as_subscriber_id(),
1596                    upstream_mv_table_id: *upstream_mv_table_id,
1597                }],
1598            })),
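                // Fan out the updated connector properties, keyed by the raw id of the object
                // whose properties changed.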
1599            Command::ConnectorPropsChange(config) => {
1600                let mut connector_props_infos = HashMap::default();
1601                for (k, v) in config {
1602                    connector_props_infos.insert(
1603                        k.as_raw_id(),
1604                        ConnectorPropsInfo {
1605                            connector_props_info: v.clone(),
1606                        },
1607                    );
1608                }
1609                Some(Mutation::ConnectorPropsChange(
1610                    ConnectorPropsChangeMutation {
1611                        connector_props_infos,
1612                    },
1613                ))
1614            }
1615            Command::Refresh {
1616                table_id,
1617                associated_source_id,
1618            } => Some(Mutation::RefreshStart(
1619                risingwave_pb::stream_plan::RefreshStartMutation {
1620                    table_id: *table_id,
1621                    associated_source_id: *associated_source_id,
1622                },
1623            )),
1624            Command::ListFinish {
1625                table_id: _,
1626                associated_source_id,
1627            } => Some(Mutation::ListFinish(ListFinishMutation {
1628                associated_source_id: *associated_source_id,
1629            })),
1630            Command::LoadFinish {
1631                table_id: _,
1632                associated_source_id,
1633            } => Some(Mutation::LoadFinish(LoadFinishMutation {
1634                associated_source_id: *associated_source_id,
1635            })),
1636            Command::ResetSource { source_id } => Some(Mutation::ResetSource(
1637                risingwave_pb::stream_plan::ResetSourceMutation {
1638                    source_id: source_id.as_raw_id(),
1639                },
1640            )),
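                // Resolve the resume target to a set of backfill fragment ids; a non-backfill
                // fragment is rejected, and if nothing matches the command is ignored with a
                // warning.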
1641            Command::ResumeBackfill { target } => {
1642                let fragment_ids: HashSet<_> = match target {
1643                    ResumeBackfillTarget::Job(job_id) => {
1644                        database_info.backfill_fragment_ids_for_job(*job_id)?
1645                    }
1646                    ResumeBackfillTarget::Fragment(fragment_id) => {
1647                        if !database_info.is_backfill_fragment(*fragment_id)? {
1648                            return Err(MetaError::invalid_parameter(format!(
1649                                "fragment {} is not a backfill node",
1650                                fragment_id
1651                            )));
1652                        }
1653                        HashSet::from([*fragment_id])
1654                    }
1655                };
1656                if fragment_ids.is_empty() {
1657                    warn!(
1658                        ?target,
1659                        "resume backfill command ignored because no backfill fragments found"
1660                    );
1661                    None
1662                } else {
1663                    Some(Mutation::StartFragmentBackfill(
1664                        StartFragmentBackfillMutation {
1665                            fragment_ids: fragment_ids.into_iter().collect(),
1666                        },
1667                    ))
1668                }
1669            }
1670            Command::InjectSourceOffsets {
1671                source_id,
1672                split_offsets,
1673            } => Some(Mutation::InjectSourceOffsets(
1674                risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1675                    source_id: source_id.as_raw_id(),
1676                    split_offsets: split_offsets.clone(),
1677                },
1678            )),
1679        };
1680        Ok(mutation)
1681    }
1682
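        /// Collects the actors that this command requires to be created, grouped by worker id
        /// and then by fragment id. Returns `None` for commands that create no actors, and for
        /// snapshot-backfill jobs, whose actors are handled separately.
        ///
        /// A rough sketch of the shape of the returned map (names are illustrative only):
        ///
        /// ```text
        /// worker_id -> fragment_id -> (fragment node, [(actor, upstreams, dispatchers)], subscribers)
        /// ```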
1683    pub(super) fn actors_to_create(
1684        &self,
1685        database_info: &InflightDatabaseInfo,
1686        edges: &mut Option<FragmentEdgeBuildResult>,
1687        control_stream_manager: &ControlStreamManager,
1688    ) -> Option<StreamJobActorsToCreate> {
1689        match self {
1690            Command::CreateStreamingJob { info, job_type, .. } => {
1691                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1692                    // For snapshot backfill, the actors to create are handled separately.
1693                    return None;
1694                }
1695                let actors_to_create = info.stream_job_fragments.actors_to_create();
1696                let edges = edges.as_mut().expect("should exist");
1697                Some(edges.collect_actors_to_create(actors_to_create.map(
1698                    |(fragment_id, node, actors)| {
1699                        (
1700                            fragment_id,
1701                            node,
1702                            actors,
1703                            [], // no subscribers for the newly created job
1704                        )
1705                    },
1706                )))
1707            }
1708            Command::RescheduleIntent {
1709                reschedule_plan, ..
1710            } => {
1711                let ReschedulePlan {
1712                    reschedules,
1713                    fragment_actors,
1714                } = reschedule_plan
1715                    .as_ref()
1716                    .expect("reschedule intent should be resolved in global barrier worker");
1717                // We assume that scaling only affects actors in the database-level partial graph.
1718                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1719                    reschedules.iter().map(|(fragment_id, reschedule)| {
1720                        (
1721                            *fragment_id,
1722                            reschedule.newly_created_actors.values().map(
1723                                |((actor, dispatchers), _)| {
1724                                    (actor.actor_id, dispatchers.as_slice())
1725                                },
1726                            ),
1727                        )
1728                    }),
1729                    Some((reschedules, fragment_actors)),
1730                    database_info,
1731                    control_stream_manager,
1732                );
1733                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1734                for (fragment_id, (actor, dispatchers), worker_id) in
1735                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1736                        reschedule
1737                            .newly_created_actors
1738                            .values()
1739                            .map(|(actors, status)| (*fragment_id, actors, status))
1740                    })
1741                {
1742                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1743                    map.entry(*worker_id)
1744                        .or_default()
1745                        .entry(fragment_id)
1746                        .or_insert_with(|| {
1747                            let node = database_info.fragment(fragment_id).nodes.clone();
1748                            let subscribers =
1749                                database_info.fragment_subscribers(fragment_id).collect();
1750                            (node, vec![], subscribers)
1751                        })
1752                        .1
1753                        .push((actor.clone(), upstreams, dispatchers.clone()));
1754                }
1755                Some(map)
1756            }
1757            Command::ReplaceStreamJob(replace_table) => {
1758                let edges = edges.as_mut().expect("should exist");
1759                let mut actors = edges.collect_actors_to_create(
1760                    replace_table.new_fragments.actors_to_create().map(
1761                        |(fragment_id, node, actors)| {
1762                            (
1763                                fragment_id,
1764                                node,
1765                                actors,
1766                                database_info
1767                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1768                            )
1769                        },
1770                    ),
1771                );
1772                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1773                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1774                        (
1775                            sink.new_fragment.fragment_id,
1776                            &sink.new_fragment.nodes,
1777                            sink.new_fragment.actors.iter().map(|actor| {
1778                                (
1779                                    actor,
1780                                    sink.actor_status[&actor.actor_id]
1781                                        .location
1782                                        .as_ref()
1783                                        .unwrap()
1784                                        .worker_node_id,
1785                                )
1786                            }),
1787                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1788                        )
1789                    }));
1790                    for (worker_id, fragment_actors) in sink_actors {
1791                        actors.entry(worker_id).or_default().extend(fragment_actors);
1792                    }
1793                }
1794                Some(actors)
1795            }
1796            _ => None,
1797        }
1798    }
1799
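        /// Builds the `Update` mutation for replacing a stream job (including sinks whose
        /// schema is auto-refreshed): new dispatchers are registered, merge executors are
        /// rewired, the old actors are dropped, the initial split assignment and optional CDC
        /// snapshot splits are attached, and each refreshed sink gets a `SinkSchemaChange`
        /// listing its original schema and the newly added columns.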
1800    fn generate_update_mutation_for_replace_table(
1801        dropped_actors: impl IntoIterator<Item = ActorId>,
1802        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1803        dispatchers: FragmentActorDispatchers,
1804        init_split_assignment: &SplitAssignment,
1805        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1806        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1807    ) -> Option<Mutation> {
1808        let dropped_actors = dropped_actors.into_iter().collect();
1809
1810        let actor_new_dispatchers = dispatchers
1811            .into_values()
1812            .flatten()
1813            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1814            .collect();
1815
1816        let actor_splits = init_split_assignment
1817            .values()
1818            .flat_map(build_actor_connector_splits)
1819            .collect();
1820        Some(Mutation::Update(UpdateMutation {
1821            actor_new_dispatchers,
1822            merge_update: merge_updates.into_values().flatten().collect(),
1823            dropped_actors,
1824            actor_splits,
1825            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1826            sink_schema_change: auto_refresh_schema_sinks
1827                .as_ref()
1828                .into_iter()
1829                .flat_map(|sinks| {
1830                    sinks.iter().map(|sink| {
1831                        (
1832                            sink.original_sink.id.as_raw_id(),
1833                            PbSinkSchemaChange {
1834                                original_schema: sink
1835                                    .original_sink
1836                                    .columns
1837                                    .iter()
1838                                    .map(|col| PbField {
1839                                        data_type: Some(
1840                                            col.column_desc
1841                                                .as_ref()
1842                                                .unwrap()
1843                                                .column_type
1844                                                .as_ref()
1845                                                .unwrap()
1846                                                .clone(),
1847                                        ),
1848                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1849                                    })
1850                                    .collect(),
1851                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1852                                    fields: sink
1853                                        .newly_add_fields
1854                                        .iter()
1855                                        .map(|field| field.to_prost())
1856                                        .collect(),
1857                                })),
1858                            },
1859                        )
1860                    })
1861                })
1862                .collect(),
1863            ..Default::default()
1864        }))
1865    }
1866}
1867
1868impl Command {
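        /// Builds, per downstream actor, the upstream actor infos (grouped by upstream
        /// fragment) needed to connect its merge executors: actor id, host address, and the
        /// database-level partial graph id. When reschedule information is supplied, upstream
        /// actors that are being removed are skipped and the newly added downstream actors are
        /// also covered.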
1869    #[expect(clippy::type_complexity)]
1870    pub(super) fn collect_database_partial_graph_actor_upstreams(
1871        actor_dispatchers: impl Iterator<
1872            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1873        >,
1874        reschedule_dispatcher_update: Option<(
1875            &HashMap<FragmentId, Reschedule>,
1876            &HashMap<FragmentId, HashSet<ActorId>>,
1877        )>,
1878        database_info: &InflightDatabaseInfo,
1879        control_stream_manager: &ControlStreamManager,
1880    ) -> HashMap<ActorId, ActorUpstreams> {
1881        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1882        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1883            let upstream_fragment = database_info.fragment(upstream_fragment_id);
1884            for (upstream_actor_id, dispatchers) in upstream_actors {
1885                let upstream_actor_location =
1886                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1887                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1888                for downstream_actor_id in dispatchers
1889                    .iter()
1890                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1891                {
1892                    actor_upstreams
1893                        .entry(*downstream_actor_id)
1894                        .or_default()
1895                        .entry(upstream_fragment_id)
1896                        .or_default()
1897                        .insert(
1898                            upstream_actor_id,
1899                            PbActorInfo {
1900                                actor_id: upstream_actor_id,
1901                                host: Some(upstream_actor_host.clone()),
1902                                partial_graph_id: to_partial_graph_id(
1903                                    database_info.database_id,
1904                                    None,
1905                                ),
1906                            },
1907                        );
1908                }
1909            }
1910        }
1911        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1912            for reschedule in reschedules.values() {
1913                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1914                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1915                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1916                    for upstream_actor_id in fragment_actors
1917                        .get(upstream_fragment_id)
1918                        .expect("should exist")
1919                    {
1920                        let upstream_actor_location =
1921                            upstream_fragment.actors[upstream_actor_id].worker_id;
1922                        let upstream_actor_host =
1923                            control_stream_manager.host_addr(upstream_actor_location);
1924                        if let Some(upstream_reschedule) = upstream_reschedule
1925                            && upstream_reschedule
1926                                .removed_actors
1927                                .contains(upstream_actor_id)
1928                        {
1929                            continue;
1930                        }
1931                        for (_, downstream_actor_id) in
1932                            reschedule
1933                                .added_actors
1934                                .iter()
1935                                .flat_map(|(worker_id, actors)| {
1936                                    actors.iter().map(|actor| (*worker_id, *actor))
1937                                })
1938                        {
1939                            actor_upstreams
1940                                .entry(downstream_actor_id)
1941                                .or_default()
1942                                .entry(*upstream_fragment_id)
1943                                .or_default()
1944                                .insert(
1945                                    *upstream_actor_id,
1946                                    PbActorInfo {
1947                                        actor_id: *upstream_actor_id,
1948                                        host: Some(upstream_actor_host.clone()),
1949                                        partial_graph_id: to_partial_graph_id(
1950                                            database_info.database_id,
1951                                            None,
1952                                        ),
1953                                    },
1954                                );
1955                        }
1956                    }
1957                }
1958            }
1959        }
1960        actor_upstreams
1961    }
1962}