// risingwave_meta/barrier/command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37    PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
49    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
50    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
51};
52use risingwave_pb::stream_service::BarrierCompleteResponse;
53use tracing::warn;
54
55use super::info::InflightDatabaseInfo;
56use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
57use crate::barrier::edge_builder::FragmentEdgeBuildResult;
58use crate::barrier::info::BarrierInfo;
59use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
60use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
61use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
62use crate::controller::scale::LoadedFragmentContext;
63use crate::controller::utils::StreamingJobExtraInfo;
64use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
65use crate::manager::{StreamingJob, StreamingJobType};
66use crate::model::{
67    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
68    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
69    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
70};
71use crate::stream::{
72    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
73    SplitAssignment, SplitState, UpstreamSinkInfo, build_actor_connector_splits,
74};
75use crate::{MetaError, MetaResult};
76
/// [`Reschedule`] describes per-fragment changes in a resolved reschedule plan,
/// used for actor scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment, grouped by the worker each actor is placed on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there's upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors newly created by this reschedule, together with their dispatchers and the
    /// worker each one is scheduled on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
108
/// A fully resolved reschedule execution plan, ready to be injected as a barrier.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Per-fragment reschedule description, keyed by the fragment being rescheduled.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Should contain the actor ids in upstream and downstream fragments referenced by
    /// `reschedules`.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
116
/// Preloaded context for rescheduling, built outside the barrier worker.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Preloaded fragment graph payloads; the source of truth for which jobs/fragments
    /// this context covers (see [`RescheduleContext::for_database`]).
    pub loaded: LoadedFragmentContext,
    /// Extra per-job info, keyed by job id.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// For each fragment, its upstream fragments and the dispatcher type of each edge.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each fragment, its downstream fragments and the dispatcher type of each edge.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Relation rows keyed by `(source_fragment_id, target_fragment_id)`. Ownership is
    /// source-fragment based: edges are routed/pruned by the source side.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
126
127impl RescheduleContext {
128    pub fn empty() -> Self {
129        Self {
130            loaded: LoadedFragmentContext::default(),
131            job_extra_info: HashMap::new(),
132            upstream_fragments: HashMap::new(),
133            downstream_fragments: HashMap::new(),
134            downstream_relations: HashMap::new(),
135        }
136    }
137
138    pub fn is_empty(&self) -> bool {
139        self.loaded.is_empty()
140    }
141
142    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
143        let loaded = self.loaded.for_database(database_id)?;
144        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
145        // Use the filtered loaded context as the source of truth so every side map is pruned by
146        // the same fragment set.
147        let fragment_ids: HashSet<FragmentId> = loaded
148            .job_fragments
149            .values()
150            .flat_map(|fragments| fragments.keys().copied())
151            .collect();
152
153        let job_extra_info = self
154            .job_extra_info
155            .iter()
156            .filter(|(job_id, _)| job_ids.contains(*job_id))
157            .map(|(job_id, info)| (*job_id, info.clone()))
158            .collect();
159
160        let upstream_fragments = self
161            .upstream_fragments
162            .iter()
163            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
164            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
165            .collect();
166
167        let downstream_fragments = self
168            .downstream_fragments
169            .iter()
170            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
171            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
172            .collect();
173
174        let downstream_relations = self
175            .downstream_relations
176            .iter()
177            // Ownership of this map is source-fragment based. We keep all downstream edges for
178            // selected sources because the target side can still be referenced during dispatcher
179            // reconstruction even if that target fragment is not being rescheduled.
180            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
181            .map(|(key, relation)| (*key, relation.clone()))
182            .collect();
183
184        Some(Self {
185            loaded,
186            job_extra_info,
187            upstream_fragments,
188            downstream_fragments,
189            downstream_relations,
190        })
191    }
192
193    /// Split this context into per-database contexts without cloning the large loaded graph
194    /// payloads.
195    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
196        let Self {
197            loaded,
198            job_extra_info,
199            upstream_fragments,
200            downstream_fragments,
201            downstream_relations,
202        } = self;
203
204        let mut contexts: HashMap<_, _> = loaded
205            .into_database_contexts()
206            .into_iter()
207            .map(|(database_id, loaded)| {
208                (
209                    database_id,
210                    Self {
211                        loaded,
212                        job_extra_info: HashMap::new(),
213                        upstream_fragments: HashMap::new(),
214                        downstream_fragments: HashMap::new(),
215                        downstream_relations: HashMap::new(),
216                    },
217                )
218            })
219            .collect();
220
221        if contexts.is_empty() {
222            return contexts;
223        }
224
225        let mut job_databases = HashMap::new();
226        let mut fragment_databases = HashMap::new();
227        for (&database_id, context) in &contexts {
228            for job_id in context.loaded.job_map.keys().copied() {
229                job_databases.insert(job_id, database_id);
230            }
231            for fragment_id in context
232                .loaded
233                .job_fragments
234                .values()
235                .flat_map(|fragments| fragments.keys().copied())
236            {
237                fragment_databases.insert(fragment_id, database_id);
238            }
239        }
240
241        for (job_id, info) in job_extra_info {
242            if let Some(database_id) = job_databases.get(&job_id).copied() {
243                contexts
244                    .get_mut(&database_id)
245                    .expect("database context should exist for job")
246                    .job_extra_info
247                    .insert(job_id, info);
248            }
249        }
250
251        for (fragment_id, upstreams) in upstream_fragments {
252            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
253                contexts
254                    .get_mut(&database_id)
255                    .expect("database context should exist for fragment")
256                    .upstream_fragments
257                    .insert(fragment_id, upstreams);
258            }
259        }
260
261        for (fragment_id, downstreams) in downstream_fragments {
262            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
263                contexts
264                    .get_mut(&database_id)
265                    .expect("database context should exist for fragment")
266                    .downstream_fragments
267                    .insert(fragment_id, downstreams);
268            }
269        }
270
271        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
272            // Route by source fragment ownership. A target may be outside of current reschedule
273            // set, but this edge still belongs to the source-side command.
274            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
275                contexts
276                    .get_mut(&database_id)
277                    .expect("database context should exist for relation source")
278                    .downstream_relations
279                    .insert((source_fragment_id, target_fragment_id), relation);
280            }
281        }
282
283        contexts
284    }
285}
286
/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job being replaced (to be dropped after the switch-over).
    pub old_fragments: StreamJobFragments,
    /// Fragments of the replacement job, to be created.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    // NOTE(review): presumably the downstream relations of the new job's upstream
    // fragments — confirm against the builder of this plan.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
    /// We need to reassign splits for it.
    ///
    /// Note that there's no `SourceBackfillExecutor` involved for table with connector, so we don't need to worry about
    /// `backfill_splits`.
    pub init_split_assignment: SplitAssignment,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
    pub streaming_job: StreamingJob,
    /// The temporary dummy job fragments id of new table fragment
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Contexts for sinks whose schema is auto-refreshed together with this replacement,
    /// if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
315
316impl ReplaceStreamJobPlan {
317    /// `old_fragment_id` -> `new_fragment_id`
318    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
319        let mut fragment_replacements = HashMap::new();
320        for (upstream_fragment_id, new_upstream_fragment_id) in
321            self.replace_upstream.values().flatten()
322        {
323            {
324                let r =
325                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
326                if let Some(r) = r {
327                    assert_eq!(
328                        *new_upstream_fragment_id, r,
329                        "one fragment is replaced by multiple fragments"
330                    );
331                }
332            }
333        }
334        fragment_replacements
335    }
336}
337
/// All info carried by [`Command::CreateStreamingJob`] to build the `Add` barrier.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    // Excluded from `Debug` output due to its size.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    // Downstream relations of the new job's upstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    // Initial source split assignment for the new job's source fragments.
    pub init_split_assignment: SplitAssignment,
    // NOTE(review): presumably the SQL definition of the job — confirm with callers.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    // Backfill ordering dependencies between fragments of this job.
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    // Pre-computed snapshot splits for CDC table backfill, if applicable.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    // Per-fragment state tables involved in locality handling — TODO(review): confirm semantics.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
}
354
impl StreamJobFragments {
    /// Builds the in-flight info for every fragment of this job, pairing each actor with
    /// its worker assignment (from `actor_status`) and its share of the given split
    /// `assignment`.
    ///
    /// Splits are handed out by removing each actor's entry from a local copy of the
    /// fragment's split map; actors without an entry get an empty split list.
    pub(super) fn new_fragment_info<'a>(
        &'a self,
        assignment: &'a SplitAssignment,
    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
        self.fragments.values().map(|fragment| {
            // Clone this fragment's split map so entries can be moved out per actor below.
            let mut fragment_splits = assignment
                .get(&fragment.fragment_id)
                .cloned()
                .unwrap_or_default();

            (
                fragment.fragment_id,
                InflightFragmentInfo {
                    fragment_id: fragment.fragment_id,
                    distribution_type: fragment.distribution_type.into(),
                    fragment_type_mask: fragment.fragment_type_mask,
                    vnode_count: fragment.vnode_count(),
                    nodes: fragment.nodes.clone(),
                    actors: fragment
                        .actors
                        .iter()
                        .map(|actor| {
                            (
                                actor.actor_id,
                                InflightActorInfo {
                                    // Every actor of the job must have a recorded status.
                                    worker_id: self
                                        .actor_status
                                        .get(&actor.actor_id)
                                        .expect("should exist")
                                        .worker_id(),
                                    vnode_bitmap: actor.vnode_bitmap.clone(),
                                    splits: fragment_splits
                                        .remove(&actor.actor_id)
                                        .unwrap_or_default(),
                                },
                            )
                        })
                        .collect(),
                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
                },
            )
        })
    }
}
400
/// Info about the upstream materialized views a snapshot backfill reads from.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    /// The `snapshot_backfill_epoch` should be None at the beginning, and be filled
    /// by global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
408
/// How a streaming job is created.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// Plain creation.
    Normal,
    /// Creating a sink that writes into an existing table.
    SinkIntoTable(UpstreamSinkInfo),
    /// Creation backfilling from a snapshot of upstream materialized views.
    SnapshotBackfill(SnapshotBackfillInfo),
}
415
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send via corresponding `*_to_mutation` helpers,
/// and may [do different stuffs after the barrier is collected](PostCollectCommand::post_collect).
// FIXME: this enum is significantly large on stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has ensured that these streaming jobs are safe to be
    /// dropped by reference counts before.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then delete the job fragments info from meta store.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates a `Add` barrier by given info.
    ///
    /// Barriers from the actors to be created, which is marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passthrough.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`. And
    /// it adds the job fragments info to meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
    /// will be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedule context. It must be resolved inside the barrier worker before injection.
    RescheduleIntent {
        context: RescheduleContext,
        /// Filled by the barrier worker after resolving `context` against current worker topology.
        ///
        /// We keep unresolved `context` outside of checkpoint state and only materialize this
        /// execution plan right before injection, then drop `context` to release memory earlier.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// `ReplaceStreamJob` command generates a `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of reschedule, while the upstream fragment
    /// of the Merge executors are changed additionally.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of executors. `throttle_type` specifies which executor kinds should apply it.
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// materialize executor to start storing old value for subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// materialize executor to stop storing old value when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// `AlterSubscriptionRetention` command updates the subscription retention time.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Propagates connector property changes to the affected executors.
    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signals that the listing phase of a refresh has finished
    /// (generates a `ListFinishMutation`).
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signals that the data-loading phase of a refresh has finished
    /// (generates a `LoadFinishMutation`).
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// `ResetSource` command generates a barrier to reset CDC source offset to latest.
    /// Used when upstream binlog/oplog has expired.
    ResetSource {
        source_id: SourceId,
    },

    /// `ResumeBackfill` command generates a `StartFragmentBackfill` barrier to force backfill
    /// to resume for troubleshooting.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// `InjectSourceOffsets` command generates a barrier to inject specific offsets
    /// into source splits (UNSAFE - admin only).
    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Split ID -> offset (JSON-encoded based on connector type)
        split_offsets: HashMap<String, String>,
    },
}
556
/// Target of a [`Command::ResumeBackfill`]: either a whole job or a single fragment.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    Job(JobId),
    Fragment(FragmentId),
}
562
563// For debugging and observability purposes. Can add more details later if needed.
564impl std::fmt::Display for Command {
565    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
566        match self {
567            Command::Flush => write!(f, "Flush"),
568            Command::Pause => write!(f, "Pause"),
569            Command::Resume => write!(f, "Resume"),
570            Command::DropStreamingJobs {
571                streaming_job_ids, ..
572            } => {
573                write!(
574                    f,
575                    "DropStreamingJobs: {}",
576                    streaming_job_ids.iter().sorted().join(", ")
577                )
578            }
579            Command::CreateStreamingJob { info, .. } => {
580                write!(f, "CreateStreamingJob: {}", info.streaming_job)
581            }
582            Command::RescheduleIntent {
583                reschedule_plan, ..
584            } => {
585                if reschedule_plan.is_some() {
586                    write!(f, "RescheduleIntent(planned)")
587                } else {
588                    write!(f, "RescheduleIntent")
589                }
590            }
591            Command::ReplaceStreamJob(plan) => {
592                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
593            }
594            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
595            Command::Throttle { .. } => write!(f, "Throttle"),
596            Command::CreateSubscription {
597                subscription_id, ..
598            } => write!(f, "CreateSubscription: {subscription_id}"),
599            Command::DropSubscription {
600                subscription_id, ..
601            } => write!(f, "DropSubscription: {subscription_id}"),
602            Command::AlterSubscriptionRetention {
603                subscription_id,
604                retention_second,
605                ..
606            } => write!(
607                f,
608                "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
609            ),
610            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
611            Command::Refresh {
612                table_id,
613                associated_source_id,
614            } => write!(
615                f,
616                "Refresh: {} (source: {})",
617                table_id, associated_source_id
618            ),
619            Command::ListFinish {
620                table_id,
621                associated_source_id,
622            } => write!(
623                f,
624                "ListFinish: {} (source: {})",
625                table_id, associated_source_id
626            ),
627            Command::LoadFinish {
628                table_id,
629                associated_source_id,
630            } => write!(
631                f,
632                "LoadFinish: {} (source: {})",
633                table_id, associated_source_id
634            ),
635            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
636            Command::ResumeBackfill { target } => match target {
637                ResumeBackfillTarget::Job(job_id) => {
638                    write!(f, "ResumeBackfill: job={job_id}")
639                }
640                ResumeBackfillTarget::Fragment(fragment_id) => {
641                    write!(f, "ResumeBackfill: fragment={fragment_id}")
642                }
643            },
644            Command::InjectSourceOffsets {
645                source_id,
646                split_offsets,
647            } => write!(
648                f,
649                "InjectSourceOffsets: {} ({} splits)",
650                source_id,
651                split_offsets.len()
652            ),
653        }
654    }
655}
656
657impl Command {
658    pub fn pause() -> Self {
659        Self::Pause
660    }
661
662    pub fn resume() -> Self {
663        Self::Resume
664    }
665
666    pub fn need_checkpoint(&self) -> bool {
667        // todo! Reviewing the flow of different command to reduce the amount of checkpoint
668        !matches!(self, Command::Resume)
669    }
670}
671
/// The information retained from a [`Command`] for processing after its barrier has been
/// collected (held in [`CommandContext`]).
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A generic command identified only by a name (e.g. a plain "barrier").
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob(ReplaceStreamJobPlan),
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
699
700impl PostCollectCommand {
701    pub fn barrier() -> Self {
702        PostCollectCommand::Command("barrier".to_owned())
703    }
704
705    pub fn command_name(&self) -> &str {
706        match self {
707            PostCollectCommand::Command(name) => name.as_str(),
708            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
709            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
710            PostCollectCommand::Reschedule { .. } => "Reschedule",
711            PostCollectCommand::ReplaceStreamJob(_) => "ReplaceStreamJob",
712            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
713            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
714            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
715            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
716        }
717    }
718}
719
720impl Display for PostCollectCommand {
721    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
722        f.write_str(self.command_name())
723    }
724}
725
/// Kind of a barrier, mirroring [`PbBarrierKind`] but additionally carrying the epochs
/// covered by a checkpoint.
#[derive(Debug, Clone)]
pub enum BarrierKind {
    /// The initial barrier (see [`PbBarrierKind::Initial`]).
    Initial,
    /// A regular, non-checkpoint barrier.
    Barrier,
    /// Hold a list of previous non-checkpoint prev-epoch + current prev-epoch
    Checkpoint(Vec<u64>),
}
733
734impl BarrierKind {
735    pub fn to_protobuf(&self) -> PbBarrierKind {
736        match self {
737            BarrierKind::Initial => PbBarrierKind::Initial,
738            BarrierKind::Barrier => PbBarrierKind::Barrier,
739            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
740        }
741    }
742
743    pub fn is_checkpoint(&self) -> bool {
744        matches!(self, BarrierKind::Checkpoint(_))
745    }
746
747    pub fn is_initial(&self) -> bool {
748        matches!(self, BarrierKind::Initial)
749    }
750
751    pub fn as_str_name(&self) -> &'static str {
752        match self {
753            BarrierKind::Initial => "Initial",
754            BarrierKind::Barrier => "Barrier",
755            BarrierKind::Checkpoint(_) => "Checkpoint",
756        }
757    }
758}
759
/// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given
/// [`Command`].
pub(super) struct CommandContext {
    // Max retention (in seconds) per upstream MV table; used in
    // `collect_commit_epoch_info` to compute the log-store truncate epoch.
    mv_subscription_max_retention: HashMap<TableId, u64>,

    pub(super) barrier_info: BarrierInfo,

    // State tables whose `prev_epoch` is committed together with this barrier.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    pub(super) command: PostCollectCommand,

    /// The tracing span of this command.
    ///
    /// Differs from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` stuffs.
    _span: tracing::Span,
}
778
779impl std::fmt::Debug for CommandContext {
780    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
781        f.debug_struct("CommandContext")
782            .field("barrier_info", &self.barrier_info)
783            .field("command", &self.command.command_name())
784            .finish()
785    }
786}
787
impl CommandContext {
    /// Creates the context for one injected barrier and the command it carries.
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

    /// Computes the epoch corresponding to `prev_epoch`'s wall-clock time minus
    /// `retention_second`. Falls back to `prev_epoch` itself (truncating nothing
    /// older than this barrier) when the subtraction yields an invalid timestamp.
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    /// Assembles the commit payload (`info`) for this barrier from the compute
    /// nodes' `BarrierCompleteResponse`s: synced SSTs, table watermarks,
    /// change-log deltas, vector-index deltas, and the set of tables to commit.
    ///
    /// `backfill_pinned_log_epoch` maps each backfill job to the epoch it has
    /// pinned on its upstream MV tables, which holds back log-store truncation.
    ///
    /// Panics (via `must_match`) if this barrier is not a checkpoint barrier.
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // Register new table fragment info only for newly created jobs; snapshot
        // backfill jobs must not reach this path (hence the assertion).
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
        // Keep the *minimum* truncate epoch per table: the log store must retain
        // data for the most demanding (oldest-epoch) consumer.
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        // Subscriptions: truncate up to (prev_epoch time - max retention).
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        // Backfill jobs: never truncate past the epoch a backfill has pinned.
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // All tables of this barrier commit the same prev epoch, exactly once.
        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index additionally emits an `Init` delta.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
917
918impl Command {
919    /// Build the `Pause` mutation.
920    pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
921        {
922            {
923                // Only pause when the cluster is not already paused.
924                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
925                if !is_currently_paused {
926                    Some(Mutation::Pause(PauseMutation {}))
927                } else {
928                    None
929                }
930            }
931        }
932    }
933
934    /// Build the `Resume` mutation.
935    pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
936        {
937            {
938                // Only resume when the cluster is paused with the same reason.
939                if is_currently_paused {
940                    Some(Mutation::Resume(ResumeMutation {}))
941                } else {
942                    None
943                }
944            }
945        }
946    }
947
948    /// Build the `Splits` mutation for `SourceChangeSplit`.
949    pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
950        {
951            {
952                let mut diff = HashMap::new();
953
954                for actor_splits in split_assignment.values() {
955                    diff.extend(actor_splits.clone());
956                }
957
958                Mutation::Splits(SourceChangeSplitMutation {
959                    actor_splits: build_actor_connector_splits(&diff),
960                })
961            }
962        }
963    }
964
965    /// Build the `Throttle` mutation.
966    pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
967        {
968            {
969                let config = config.clone();
970                Mutation::Throttle(ThrottleMutation {
971                    fragment_throttle: config,
972                })
973            }
974        }
975    }
976
977    /// Build the `Stop` mutation for `DropStreamingJobs`.
978    pub(super) fn drop_streaming_jobs_to_mutation(
979        actors: &Vec<ActorId>,
980        dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
981    ) -> Mutation {
982        {
983            Mutation::Stop(StopMutation {
984                actors: actors.clone(),
985                dropped_sink_fragments: dropped_sink_fragment_by_targets
986                    .values()
987                    .flatten()
988                    .cloned()
989                    .collect(),
990            })
991        }
992    }
993
994    /// Build the `Add` mutation for `CreateStreamingJob`.
995    pub(super) fn create_streaming_job_to_mutation(
996        info: &CreateStreamingJobCommandInfo,
997        job_type: &CreateStreamingJobType,
998        is_currently_paused: bool,
999        edges: &mut FragmentEdgeBuildResult,
1000        control_stream_manager: &ControlStreamManager,
1001        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
1002    ) -> MetaResult<Mutation> {
1003        {
1004            {
1005                let CreateStreamingJobCommandInfo {
1006                    stream_job_fragments,
1007                    init_split_assignment: split_assignment,
1008                    upstream_fragment_downstreams,
1009                    fragment_backfill_ordering,
1010                    streaming_job,
1011                    ..
1012                } = info;
1013                let database_id = streaming_job.database_id();
1014                let added_actors = stream_job_fragments.actor_ids().collect();
1015                let actor_splits = split_assignment
1016                    .values()
1017                    .flat_map(build_actor_connector_splits)
1018                    .collect();
1019                let subscriptions_to_add =
1020                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
1021                        job_type
1022                    {
1023                        snapshot_backfill_info
1024                            .upstream_mv_table_id_to_backfill_epoch
1025                            .keys()
1026                            .map(|table_id| SubscriptionUpstreamInfo {
1027                                subscriber_id: stream_job_fragments
1028                                    .stream_job_id()
1029                                    .as_subscriber_id(),
1030                                upstream_mv_table_id: *table_id,
1031                            })
1032                            .collect()
1033                    } else {
1034                        Default::default()
1035                    };
1036                let backfill_nodes_to_pause: Vec<_> =
1037                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
1038                        .into_iter()
1039                        .collect();
1040
1041                let new_upstream_sinks =
1042                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
1043                        sink_fragment_id,
1044                        sink_output_fields,
1045                        project_exprs,
1046                        new_sink_downstream,
1047                        ..
1048                    }) = job_type
1049                    {
1050                        let new_sink_actors = stream_job_fragments
1051                            .actors_to_create()
1052                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
1053                            .exactly_one()
1054                            .map(|(_, _, actors)| {
1055                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
1056                                    actor_id: actor.actor_id,
1057                                    host: Some(control_stream_manager.host_addr(worker_id)),
1058                                    partial_graph_id: to_partial_graph_id(database_id, None),
1059                                })
1060                            })
1061                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
1062                        let new_upstream_sink = PbNewUpstreamSink {
1063                            info: Some(PbUpstreamSinkInfo {
1064                                upstream_fragment_id: *sink_fragment_id,
1065                                sink_output_schema: sink_output_fields.clone(),
1066                                project_exprs: project_exprs.clone(),
1067                            }),
1068                            upstream_actors: new_sink_actors.collect(),
1069                        };
1070                        HashMap::from([(
1071                            new_sink_downstream.downstream_fragment_id,
1072                            new_upstream_sink,
1073                        )])
1074                    } else {
1075                        HashMap::new()
1076                    };
1077
1078                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
1079                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1080
1081                let add_mutation = AddMutation {
1082                    actor_dispatchers: edges
1083                        .dispatchers
1084                        .extract_if(|fragment_id, _| {
1085                            upstream_fragment_downstreams.contains_key(fragment_id)
1086                        })
1087                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1088                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1089                        .collect(),
1090                    added_actors,
1091                    actor_splits,
1092                    // If the cluster is already paused, the new actors should be paused too.
1093                    pause: is_currently_paused,
1094                    subscriptions_to_add,
1095                    backfill_nodes_to_pause,
1096                    actor_cdc_table_snapshot_splits,
1097                    new_upstream_sinks,
1098                };
1099
1100                Ok(Mutation::Add(add_mutation))
1101            }
1102        }
1103    }
1104
1105    /// Build the `Update` mutation for `ReplaceStreamJob`.
1106    pub(super) fn replace_stream_job_to_mutation(
1107        ReplaceStreamJobPlan {
1108            old_fragments,
1109            replace_upstream,
1110            upstream_fragment_downstreams,
1111            init_split_assignment,
1112            auto_refresh_schema_sinks,
1113            ..
1114        }: &ReplaceStreamJobPlan,
1115        edges: &mut FragmentEdgeBuildResult,
1116        database_info: &mut InflightDatabaseInfo,
1117    ) -> MetaResult<Option<Mutation>> {
1118        {
1119            {
1120                let merge_updates = edges
1121                    .merge_updates
1122                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1123                    .collect();
1124                let dispatchers = edges
1125                    .dispatchers
1126                    .extract_if(|fragment_id, _| {
1127                        upstream_fragment_downstreams.contains_key(fragment_id)
1128                    })
1129                    .collect();
1130                let actor_cdc_table_snapshot_splits = database_info
1131                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1132                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1133                Ok(Self::generate_update_mutation_for_replace_table(
1134                    old_fragments.actor_ids().chain(
1135                        auto_refresh_schema_sinks
1136                            .as_ref()
1137                            .into_iter()
1138                            .flat_map(|sinks| {
1139                                sinks.iter().flat_map(|sink| {
1140                                    sink.original_fragment
1141                                        .actors
1142                                        .iter()
1143                                        .map(|actor| actor.actor_id)
1144                                })
1145                            }),
1146                    ),
1147                    merge_updates,
1148                    dispatchers,
1149                    init_split_assignment,
1150                    actor_cdc_table_snapshot_splits,
1151                    auto_refresh_schema_sinks.as_ref(),
1152                ))
1153            }
1154        }
1155    }
1156
1157    /// Build the `Update` mutation for `RescheduleIntent`.
1158    pub(super) fn reschedule_to_mutation(
1159        reschedules: &HashMap<FragmentId, Reschedule>,
1160        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1161        control_stream_manager: &ControlStreamManager,
1162        database_info: &mut InflightDatabaseInfo,
1163    ) -> MetaResult<Option<Mutation>> {
1164        {
1165            {
1166                let database_id = database_info.database_id;
1167                let mut dispatcher_update = HashMap::new();
1168                for reschedule in reschedules.values() {
1169                    for &(upstream_fragment_id, dispatcher_id) in
1170                        &reschedule.upstream_fragment_dispatcher_ids
1171                    {
1172                        // Find the actors of the upstream fragment.
1173                        let upstream_actor_ids = fragment_actors
1174                            .get(&upstream_fragment_id)
1175                            .expect("should contain");
1176
1177                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1178
1179                        // Record updates for all actors.
1180                        for &actor_id in upstream_actor_ids {
1181                            let added_downstream_actor_id = if upstream_reschedule
1182                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1183                                .unwrap_or(true)
1184                            {
1185                                reschedule
1186                                    .added_actors
1187                                    .values()
1188                                    .flatten()
1189                                    .cloned()
1190                                    .collect()
1191                            } else {
1192                                Default::default()
1193                            };
1194                            // Index with the dispatcher id to check duplicates.
1195                            dispatcher_update
1196                                .try_insert(
1197                                    (actor_id, dispatcher_id),
1198                                    DispatcherUpdate {
1199                                        actor_id,
1200                                        dispatcher_id,
1201                                        hash_mapping: reschedule
1202                                            .upstream_dispatcher_mapping
1203                                            .as_ref()
1204                                            .map(|m| m.to_protobuf()),
1205                                        added_downstream_actor_id,
1206                                        removed_downstream_actor_id: reschedule
1207                                            .removed_actors
1208                                            .iter()
1209                                            .cloned()
1210                                            .collect(),
1211                                    },
1212                                )
1213                                .unwrap();
1214                        }
1215                    }
1216                }
1217                let dispatcher_update = dispatcher_update.into_values().collect();
1218
1219                let mut merge_update = HashMap::new();
1220                for (&fragment_id, reschedule) in reschedules {
1221                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1222                        // Find the actors of the downstream fragment.
1223                        let downstream_actor_ids = fragment_actors
1224                            .get(&downstream_fragment_id)
1225                            .expect("should contain");
1226
1227                        // Downstream removed actors should be skipped
1228                        // Newly created actors of the current fragment will not dispatch Update
1229                        // barriers to them
1230                        let downstream_removed_actors: HashSet<_> = reschedules
1231                            .get(&downstream_fragment_id)
1232                            .map(|downstream_reschedule| {
1233                                downstream_reschedule
1234                                    .removed_actors
1235                                    .iter()
1236                                    .copied()
1237                                    .collect()
1238                            })
1239                            .unwrap_or_default();
1240
1241                        // Record updates for all actors.
1242                        for &actor_id in downstream_actor_ids {
1243                            if downstream_removed_actors.contains(&actor_id) {
1244                                continue;
1245                            }
1246
1247                            // Index with the fragment id to check duplicates.
1248                            merge_update
1249                                .try_insert(
1250                                    (actor_id, fragment_id),
1251                                    MergeUpdate {
1252                                        actor_id,
1253                                        upstream_fragment_id: fragment_id,
1254                                        new_upstream_fragment_id: None,
1255                                        added_upstream_actors: reschedule
1256                                            .added_actors
1257                                            .iter()
1258                                            .flat_map(|(worker_id, actors)| {
1259                                                let host =
1260                                                    control_stream_manager.host_addr(*worker_id);
1261                                                actors.iter().map(move |&actor_id| PbActorInfo {
1262                                                    actor_id,
1263                                                    host: Some(host.clone()),
1264                                                    // we assume that we only scale the partial graph of database
1265                                                    partial_graph_id: to_partial_graph_id(
1266                                                        database_id,
1267                                                        None,
1268                                                    ),
1269                                                })
1270                                            })
1271                                            .collect(),
1272                                        removed_upstream_actor_id: reschedule
1273                                            .removed_actors
1274                                            .iter()
1275                                            .cloned()
1276                                            .collect(),
1277                                    },
1278                                )
1279                                .unwrap();
1280                        }
1281                    }
1282                }
1283                let merge_update = merge_update.into_values().collect();
1284
1285                let mut actor_vnode_bitmap_update = HashMap::new();
1286                for reschedule in reschedules.values() {
1287                    // Record updates for all actors in this fragment.
1288                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1289                        let bitmap = bitmap.to_protobuf();
1290                        actor_vnode_bitmap_update
1291                            .try_insert(actor_id, bitmap)
1292                            .unwrap();
1293                    }
1294                }
1295                let dropped_actors = reschedules
1296                    .values()
1297                    .flat_map(|r| r.removed_actors.iter().copied())
1298                    .collect();
1299                let mut actor_splits = HashMap::new();
1300                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1301                for (fragment_id, reschedule) in reschedules {
1302                    for (actor_id, splits) in &reschedule.actor_splits {
1303                        actor_splits.insert(
1304                            *actor_id,
1305                            ConnectorSplits {
1306                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1307                            },
1308                        );
1309                    }
1310
1311                    if let Some(assignment) =
1312                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1313                    {
1314                        actor_cdc_table_snapshot_splits.extend(assignment)
1315                    }
1316                }
1317
1318                // we don't create dispatchers in reschedule scenario
1319                let actor_new_dispatchers = HashMap::new();
1320                let mutation = Mutation::Update(UpdateMutation {
1321                    dispatcher_update,
1322                    merge_update,
1323                    actor_vnode_bitmap_update,
1324                    dropped_actors,
1325                    actor_splits,
1326                    actor_new_dispatchers,
1327                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1328                        splits: actor_cdc_table_snapshot_splits,
1329                    }),
1330                    sink_schema_change: Default::default(),
1331                    subscriptions_to_drop: vec![],
1332                });
1333                tracing::debug!("update mutation: {mutation:?}");
1334                Ok(Some(mutation))
1335            }
1336        }
1337    }
1338
1339    /// Build the `Add` mutation for `CreateSubscription`.
1340    pub(super) fn create_subscription_to_mutation(
1341        upstream_mv_table_id: TableId,
1342        subscription_id: SubscriptionId,
1343    ) -> Mutation {
1344        {
1345            Mutation::Add(AddMutation {
1346                actor_dispatchers: Default::default(),
1347                added_actors: vec![],
1348                actor_splits: Default::default(),
1349                pause: false,
1350                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1351                    upstream_mv_table_id,
1352                    subscriber_id: subscription_id.as_subscriber_id(),
1353                }],
1354                backfill_nodes_to_pause: vec![],
1355                actor_cdc_table_snapshot_splits: None,
1356                new_upstream_sinks: Default::default(),
1357            })
1358        }
1359    }
1360
1361    /// Build the `DropSubscriptions` mutation for `DropSubscription`.
1362    pub(super) fn drop_subscription_to_mutation(
1363        upstream_mv_table_id: TableId,
1364        subscription_id: SubscriptionId,
1365    ) -> Mutation {
1366        {
1367            Mutation::DropSubscriptions(DropSubscriptionsMutation {
1368                info: vec![SubscriptionUpstreamInfo {
1369                    subscriber_id: subscription_id.as_subscriber_id(),
1370                    upstream_mv_table_id,
1371                }],
1372            })
1373        }
1374    }
1375
1376    /// Build the `ConnectorPropsChange` mutation.
1377    pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1378        {
1379            {
1380                let mut connector_props_infos = HashMap::default();
1381                for (k, v) in config {
1382                    connector_props_infos.insert(
1383                        k.as_raw_id(),
1384                        ConnectorPropsInfo {
1385                            connector_props_info: v.clone(),
1386                        },
1387                    );
1388                }
1389                Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1390                    connector_props_infos,
1391                })
1392            }
1393        }
1394    }
1395
1396    /// Build the `RefreshStart` mutation.
1397    pub(super) fn refresh_to_mutation(
1398        table_id: TableId,
1399        associated_source_id: SourceId,
1400    ) -> Mutation {
1401        Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1402            table_id,
1403            associated_source_id,
1404        })
1405    }
1406
1407    /// Build the `ListFinish` mutation.
1408    pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1409        Mutation::ListFinish(ListFinishMutation {
1410            associated_source_id,
1411        })
1412    }
1413
1414    /// Build the `LoadFinish` mutation.
1415    pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1416        Mutation::LoadFinish(LoadFinishMutation {
1417            associated_source_id,
1418        })
1419    }
1420
1421    /// Build the `ResetSource` mutation.
1422    pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1423        Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1424            source_id: source_id.as_raw_id(),
1425        })
1426    }
1427
1428    /// Build the `StartFragmentBackfill` mutation for `ResumeBackfill`.
1429    pub(super) fn resume_backfill_to_mutation(
1430        target: &ResumeBackfillTarget,
1431        database_info: &InflightDatabaseInfo,
1432    ) -> MetaResult<Option<Mutation>> {
1433        {
1434            {
1435                let fragment_ids: HashSet<_> = match target {
1436                    ResumeBackfillTarget::Job(job_id) => {
1437                        database_info.backfill_fragment_ids_for_job(*job_id)?
1438                    }
1439                    ResumeBackfillTarget::Fragment(fragment_id) => {
1440                        if !database_info.is_backfill_fragment(*fragment_id)? {
1441                            return Err(MetaError::invalid_parameter(format!(
1442                                "fragment {} is not a backfill node",
1443                                fragment_id
1444                            )));
1445                        }
1446                        HashSet::from([*fragment_id])
1447                    }
1448                };
1449                if fragment_ids.is_empty() {
1450                    warn!(
1451                        ?target,
1452                        "resume backfill command ignored because no backfill fragments found"
1453                    );
1454                    Ok(None)
1455                } else {
1456                    Ok(Some(Mutation::StartFragmentBackfill(
1457                        StartFragmentBackfillMutation {
1458                            fragment_ids: fragment_ids.into_iter().collect(),
1459                        },
1460                    )))
1461                }
1462            }
1463        }
1464    }
1465
1466    /// Build the `InjectSourceOffsets` mutation.
1467    pub(super) fn inject_source_offsets_to_mutation(
1468        source_id: SourceId,
1469        split_offsets: &HashMap<String, String>,
1470    ) -> Mutation {
1471        Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1472            source_id: source_id.as_raw_id(),
1473            split_offsets: split_offsets.clone(),
1474        })
1475    }
1476
1477    /// Collect actors to create for `CreateStreamingJob` (non-snapshot-backfill).
1478    pub(super) fn create_streaming_job_actors_to_create(
1479        info: &CreateStreamingJobCommandInfo,
1480        edges: &mut FragmentEdgeBuildResult,
1481    ) -> StreamJobActorsToCreate {
1482        {
1483            {
1484                let actors_to_create = info.stream_job_fragments.actors_to_create();
1485                edges.collect_actors_to_create(actors_to_create.map(
1486                    |(fragment_id, node, actors)| {
1487                        (
1488                            fragment_id,
1489                            node,
1490                            actors,
1491                            [], // no subscriber for new job to create
1492                        )
1493                    },
1494                ))
1495            }
1496        }
1497    }
1498
1499    /// Collect actors to create for `RescheduleIntent`.
1500    pub(super) fn reschedule_actors_to_create(
1501        reschedules: &HashMap<FragmentId, Reschedule>,
1502        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1503        database_info: &InflightDatabaseInfo,
1504        control_stream_manager: &ControlStreamManager,
1505    ) -> StreamJobActorsToCreate {
1506        {
1507            {
1508                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1509                    reschedules.iter().map(|(fragment_id, reschedule)| {
1510                        (
1511                            *fragment_id,
1512                            reschedule.newly_created_actors.values().map(
1513                                |((actor, dispatchers), _)| {
1514                                    (actor.actor_id, dispatchers.as_slice())
1515                                },
1516                            ),
1517                        )
1518                    }),
1519                    Some((reschedules, fragment_actors)),
1520                    database_info,
1521                    control_stream_manager,
1522                );
1523                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1524                for (fragment_id, (actor, dispatchers), worker_id) in
1525                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1526                        reschedule
1527                            .newly_created_actors
1528                            .values()
1529                            .map(|(actors, status)| (*fragment_id, actors, status))
1530                    })
1531                {
1532                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1533                    map.entry(*worker_id)
1534                        .or_default()
1535                        .entry(fragment_id)
1536                        .or_insert_with(|| {
1537                            let node = database_info.fragment(fragment_id).nodes.clone();
1538                            let subscribers =
1539                                database_info.fragment_subscribers(fragment_id).collect();
1540                            (node, vec![], subscribers)
1541                        })
1542                        .1
1543                        .push((actor.clone(), upstreams, dispatchers.clone()));
1544                }
1545                map
1546            }
1547        }
1548    }
1549
1550    /// Collect actors to create for `ReplaceStreamJob`.
1551    pub(super) fn replace_stream_job_actors_to_create(
1552        replace_table: &ReplaceStreamJobPlan,
1553        edges: &mut FragmentEdgeBuildResult,
1554        database_info: &InflightDatabaseInfo,
1555    ) -> StreamJobActorsToCreate {
1556        {
1557            {
1558                let mut actors = edges.collect_actors_to_create(
1559                    replace_table.new_fragments.actors_to_create().map(
1560                        |(fragment_id, node, actors)| {
1561                            (
1562                                fragment_id,
1563                                node,
1564                                actors,
1565                                database_info
1566                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1567                            )
1568                        },
1569                    ),
1570                );
1571                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1572                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1573                        (
1574                            sink.new_fragment.fragment_id,
1575                            &sink.new_fragment.nodes,
1576                            sink.new_fragment.actors.iter().map(|actor| {
1577                                (
1578                                    actor,
1579                                    sink.actor_status[&actor.actor_id]
1580                                        .location
1581                                        .as_ref()
1582                                        .unwrap()
1583                                        .worker_node_id,
1584                                )
1585                            }),
1586                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1587                        )
1588                    }));
1589                    for (worker_id, fragment_actors) in sink_actors {
1590                        actors.entry(worker_id).or_default().extend(fragment_actors);
1591                    }
1592                }
1593                actors
1594            }
1595        }
1596    }
1597
1598    fn generate_update_mutation_for_replace_table(
1599        dropped_actors: impl IntoIterator<Item = ActorId>,
1600        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1601        dispatchers: FragmentActorDispatchers,
1602        init_split_assignment: &SplitAssignment,
1603        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1604        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1605    ) -> Option<Mutation> {
1606        let dropped_actors = dropped_actors.into_iter().collect();
1607
1608        let actor_new_dispatchers = dispatchers
1609            .into_values()
1610            .flatten()
1611            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1612            .collect();
1613
1614        let actor_splits = init_split_assignment
1615            .values()
1616            .flat_map(build_actor_connector_splits)
1617            .collect();
1618        Some(Mutation::Update(UpdateMutation {
1619            actor_new_dispatchers,
1620            merge_update: merge_updates.into_values().flatten().collect(),
1621            dropped_actors,
1622            actor_splits,
1623            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1624            sink_schema_change: auto_refresh_schema_sinks
1625                .as_ref()
1626                .into_iter()
1627                .flat_map(|sinks| {
1628                    sinks.iter().map(|sink| {
1629                        (
1630                            sink.original_sink.id.as_raw_id(),
1631                            PbSinkSchemaChange {
1632                                original_schema: sink
1633                                    .original_sink
1634                                    .columns
1635                                    .iter()
1636                                    .map(|col| PbField {
1637                                        data_type: Some(
1638                                            col.column_desc
1639                                                .as_ref()
1640                                                .unwrap()
1641                                                .column_type
1642                                                .as_ref()
1643                                                .unwrap()
1644                                                .clone(),
1645                                        ),
1646                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1647                                    })
1648                                    .collect(),
1649                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1650                                    fields: sink
1651                                        .newly_add_fields
1652                                        .iter()
1653                                        .map(|field| field.to_prost())
1654                                        .collect(),
1655                                })),
1656                            },
1657                        )
1658                    })
1659                })
1660                .collect(),
1661            ..Default::default()
1662        }))
1663    }
1664}
1665
1666impl Command {
1667    #[expect(clippy::type_complexity)]
1668    pub(super) fn collect_database_partial_graph_actor_upstreams(
1669        actor_dispatchers: impl Iterator<
1670            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1671        >,
1672        reschedule_dispatcher_update: Option<(
1673            &HashMap<FragmentId, Reschedule>,
1674            &HashMap<FragmentId, HashSet<ActorId>>,
1675        )>,
1676        database_info: &InflightDatabaseInfo,
1677        control_stream_manager: &ControlStreamManager,
1678    ) -> HashMap<ActorId, ActorUpstreams> {
1679        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1680        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1681            let upstream_fragment = database_info.fragment(upstream_fragment_id);
1682            for (upstream_actor_id, dispatchers) in upstream_actors {
1683                let upstream_actor_location =
1684                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1685                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1686                for downstream_actor_id in dispatchers
1687                    .iter()
1688                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1689                {
1690                    actor_upstreams
1691                        .entry(*downstream_actor_id)
1692                        .or_default()
1693                        .entry(upstream_fragment_id)
1694                        .or_default()
1695                        .insert(
1696                            upstream_actor_id,
1697                            PbActorInfo {
1698                                actor_id: upstream_actor_id,
1699                                host: Some(upstream_actor_host.clone()),
1700                                partial_graph_id: to_partial_graph_id(
1701                                    database_info.database_id,
1702                                    None,
1703                                ),
1704                            },
1705                        );
1706                }
1707            }
1708        }
1709        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1710            for reschedule in reschedules.values() {
1711                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1712                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1713                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1714                    for upstream_actor_id in fragment_actors
1715                        .get(upstream_fragment_id)
1716                        .expect("should exist")
1717                    {
1718                        let upstream_actor_location =
1719                            upstream_fragment.actors[upstream_actor_id].worker_id;
1720                        let upstream_actor_host =
1721                            control_stream_manager.host_addr(upstream_actor_location);
1722                        if let Some(upstream_reschedule) = upstream_reschedule
1723                            && upstream_reschedule
1724                                .removed_actors
1725                                .contains(upstream_actor_id)
1726                        {
1727                            continue;
1728                        }
1729                        for (_, downstream_actor_id) in
1730                            reschedule
1731                                .added_actors
1732                                .iter()
1733                                .flat_map(|(worker_id, actors)| {
1734                                    actors.iter().map(|actor| (*worker_id, *actor))
1735                                })
1736                        {
1737                            actor_upstreams
1738                                .entry(downstream_actor_id)
1739                                .or_default()
1740                                .entry(*upstream_fragment_id)
1741                                .or_default()
1742                                .insert(
1743                                    *upstream_actor_id,
1744                                    PbActorInfo {
1745                                        actor_id: *upstream_actor_id,
1746                                        host: Some(upstream_actor_host.clone()),
1747                                        partial_graph_id: to_partial_graph_id(
1748                                            database_info.database_id,
1749                                            None,
1750                                        ),
1751                                    },
1752                                );
1753                        }
1754                    }
1755                }
1756            }
1757        }
1758        actor_upstreams
1759    }
1760}