risingwave_meta/barrier/
command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37    PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
49    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
50    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
51};
52use risingwave_pb::stream_service::BarrierCompleteResponse;
53use tracing::warn;
54
55use super::info::InflightDatabaseInfo;
56use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
57use crate::barrier::edge_builder::FragmentEdgeBuildResult;
58use crate::barrier::info::BarrierInfo;
59use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
60use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
61use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
62use crate::controller::scale::LoadedFragmentContext;
63use crate::controller::utils::StreamingJobExtraInfo;
64use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
65use crate::manager::{StreamingJob, StreamingJobType};
66use crate::model::{
67    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
68    FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
69    StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
70};
71use crate::stream::{
72    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
73    ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
74    build_actor_connector_splits,
75};
76use crate::{MetaError, MetaResult};
77
/// [`Reschedule`] describes per-fragment changes in a resolved reschedule plan,
/// used for actor scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment, keyed by the worker each actor is placed on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there's upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors newly built for this fragment: the rendered actor (with its dispatchers)
    /// together with the worker it is scheduled to.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
109
/// A fully-resolved reschedule plan: the per-fragment [`Reschedule`]s plus the actor
/// sets of the fragments they reference.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Per-fragment changes to apply.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Should contain the actor ids in upstream and downstream fragments referenced by
    /// `reschedules`.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
117
/// Preloaded context for rescheduling, built outside the barrier worker.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Loaded fragment graph payloads; the source of truth for which jobs and
    /// fragments this context covers (see `for_database`).
    pub loaded: LoadedFragmentContext,
    /// Extra info per streaming job, pruned by the job set of `loaded`.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// fragment -> (upstream fragment -> dispatcher type).
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// fragment -> (downstream fragment -> dispatcher type).
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// (source fragment, target fragment) -> relation model. Ownership is
    /// source-fragment based (see the pruning/routing logic in the impl).
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
127
128impl RescheduleContext {
129    pub fn empty() -> Self {
130        Self {
131            loaded: LoadedFragmentContext::default(),
132            job_extra_info: HashMap::new(),
133            upstream_fragments: HashMap::new(),
134            downstream_fragments: HashMap::new(),
135            downstream_relations: HashMap::new(),
136        }
137    }
138
139    pub fn is_empty(&self) -> bool {
140        self.loaded.is_empty()
141    }
142
143    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
144        let loaded = self.loaded.for_database(database_id)?;
145        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
146        // Use the filtered loaded context as the source of truth so every side map is pruned by
147        // the same fragment set.
148        let fragment_ids: HashSet<FragmentId> = loaded
149            .job_fragments
150            .values()
151            .flat_map(|fragments| fragments.keys().copied())
152            .collect();
153
154        let job_extra_info = self
155            .job_extra_info
156            .iter()
157            .filter(|(job_id, _)| job_ids.contains(*job_id))
158            .map(|(job_id, info)| (*job_id, info.clone()))
159            .collect();
160
161        let upstream_fragments = self
162            .upstream_fragments
163            .iter()
164            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
165            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
166            .collect();
167
168        let downstream_fragments = self
169            .downstream_fragments
170            .iter()
171            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
172            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
173            .collect();
174
175        let downstream_relations = self
176            .downstream_relations
177            .iter()
178            // Ownership of this map is source-fragment based. We keep all downstream edges for
179            // selected sources because the target side can still be referenced during dispatcher
180            // reconstruction even if that target fragment is not being rescheduled.
181            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
182            .map(|(key, relation)| (*key, relation.clone()))
183            .collect();
184
185        Some(Self {
186            loaded,
187            job_extra_info,
188            upstream_fragments,
189            downstream_fragments,
190            downstream_relations,
191        })
192    }
193
194    /// Split this context into per-database contexts without cloning the large loaded graph
195    /// payloads.
196    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
197        let Self {
198            loaded,
199            job_extra_info,
200            upstream_fragments,
201            downstream_fragments,
202            downstream_relations,
203        } = self;
204
205        let mut contexts: HashMap<_, _> = loaded
206            .into_database_contexts()
207            .into_iter()
208            .map(|(database_id, loaded)| {
209                (
210                    database_id,
211                    Self {
212                        loaded,
213                        job_extra_info: HashMap::new(),
214                        upstream_fragments: HashMap::new(),
215                        downstream_fragments: HashMap::new(),
216                        downstream_relations: HashMap::new(),
217                    },
218                )
219            })
220            .collect();
221
222        if contexts.is_empty() {
223            return contexts;
224        }
225
226        let mut job_databases = HashMap::new();
227        let mut fragment_databases = HashMap::new();
228        for (&database_id, context) in &contexts {
229            for job_id in context.loaded.job_map.keys().copied() {
230                job_databases.insert(job_id, database_id);
231            }
232            for fragment_id in context
233                .loaded
234                .job_fragments
235                .values()
236                .flat_map(|fragments| fragments.keys().copied())
237            {
238                fragment_databases.insert(fragment_id, database_id);
239            }
240        }
241
242        for (job_id, info) in job_extra_info {
243            if let Some(database_id) = job_databases.get(&job_id).copied() {
244                contexts
245                    .get_mut(&database_id)
246                    .expect("database context should exist for job")
247                    .job_extra_info
248                    .insert(job_id, info);
249            }
250        }
251
252        for (fragment_id, upstreams) in upstream_fragments {
253            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
254                contexts
255                    .get_mut(&database_id)
256                    .expect("database context should exist for fragment")
257                    .upstream_fragments
258                    .insert(fragment_id, upstreams);
259            }
260        }
261
262        for (fragment_id, downstreams) in downstream_fragments {
263            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
264                contexts
265                    .get_mut(&database_id)
266                    .expect("database context should exist for fragment")
267                    .downstream_fragments
268                    .insert(fragment_id, downstreams);
269            }
270        }
271
272        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
273            // Route by source fragment ownership. A target may be outside of current reschedule
274            // set, but this edge still belongs to the source-side command.
275            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
276                contexts
277                    .get_mut(&database_id)
278                    .expect("database context should exist for relation source")
279                    .downstream_relations
280                    .insert((source_fragment_id, target_fragment_id), relation);
281            }
282        }
283
284        contexts
285    }
286}
287
/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job being replaced.
    pub old_fragments: StreamJobFragments,
    /// Fragments that replace the old ones.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    // Downstream relation entries keyed by the upstream fragments of this job.
    // NOTE(review): exact edge direction inferred from the type name — confirm at call sites.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Split plan for the replace job. Determines how splits are resolved in Phase 2
    /// inside the barrier worker.
    pub split_plan: ReplaceJobSplitPlan,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
    pub streaming_job: StreamingJob,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
    /// The temporary dummy job fragments id of new table fragment
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sink contexts when sink schemas are auto-refreshed together with this
    /// replacement, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
317
318impl ReplaceStreamJobPlan {
319    /// `old_fragment_id` -> `new_fragment_id`
320    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
321        let mut fragment_replacements = HashMap::new();
322        for (upstream_fragment_id, new_upstream_fragment_id) in
323            self.replace_upstream.values().flatten()
324        {
325            {
326                let r =
327                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
328                if let Some(r) = r {
329                    assert_eq!(
330                        *new_upstream_fragment_id, r,
331                        "one fragment is replaced by multiple fragments"
332                    );
333                }
334            }
335        }
336        fragment_replacements
337    }
338}
339
/// All the information needed to build the `Add` barrier for a new streaming job.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    // Large graph payload; excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    // Downstream relation entries keyed by the upstream fragments of this job.
    // NOTE(review): exact edge direction inferred from the type name — confirm at call sites.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Source-level split assignment (Phase 1). Resolved to actor-level in the barrier worker.
    pub init_split_assignment: SourceSplitAssignment,
    // Textual definition of the job (presumably the originating DDL — confirm with caller).
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    /// Backfill ordering between the fragments of this job.
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// Pre-computed snapshot splits when the job backfills from a CDC table, if any.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// fragment id -> state table ids (locality-related).
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    /// Whether the job is created in serverless mode.
    pub is_serverless: bool,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
}
361
362impl StreamJobFragments {
363    /// Build fragment-level info for new fragments, populating actor infos from the
364    /// rendered actors and their locations, and applying split assignment to actor splits.
365    pub(super) fn new_fragment_info<'a>(
366        &'a self,
367        stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
368        actor_location: &'a HashMap<ActorId, WorkerId>,
369        assignment: &'a SplitAssignment,
370    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
371        self.fragments.values().map(|fragment| {
372            (
373                fragment.fragment_id,
374                InflightFragmentInfo {
375                    fragment_id: fragment.fragment_id,
376                    distribution_type: fragment.distribution_type.into(),
377                    fragment_type_mask: fragment.fragment_type_mask,
378                    vnode_count: fragment.vnode_count(),
379                    nodes: fragment.nodes.clone(),
380                    actors: stream_actors
381                        .get(&fragment.fragment_id)
382                        .into_iter()
383                        .flatten()
384                        .map(|actor| {
385                            (
386                                actor.actor_id,
387                                InflightActorInfo {
388                                    worker_id: actor_location[&actor.actor_id],
389                                    vnode_bitmap: actor.vnode_bitmap.clone(),
390                                    splits: assignment
391                                        .get(&fragment.fragment_id)
392                                        .and_then(|s| s.get(&actor.actor_id))
393                                        .cloned()
394                                        .unwrap_or_default(),
395                                },
396                            )
397                        })
398                        .collect(),
399                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
400                },
401            )
402        })
403    }
404}
405
/// Info needed to perform snapshot backfill against upstream materialized views.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// Map from upstream MV `table_id` to its `snapshot_backfill_epoch`.
    /// The epoch is `None` when the command is created, and is filled in by the
    /// global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
413
/// The flavor of a [`Command::CreateStreamingJob`].
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// A regular streaming job creation.
    Normal,
    /// `CREATE SINK INTO table`: carries the upstream sink info.
    SinkIntoTable(UpstreamSinkInfo),
    /// Creation that snapshot-backfills from upstream materialized views.
    SnapshotBackfill(SnapshotBackfillInfo),
}
420
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send via corresponding `*_to_mutation` helpers,
/// and may [do different stuffs after the barrier is collected](PostCollectCommand::post_collect).
// FIXME: this enum is significantly large on stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has ensured that these streaming jobs are safe to be
    /// dropped by reference counts before.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then delete the job fragments info from meta store.
    DropStreamingJobs {
        /// Jobs to be dropped.
        streaming_job_ids: HashSet<JobId>,
        /// State tables to unregister along with the drop.
        unregistered_state_table_ids: HashSet<TableId>,
        /// Fragments removed from the inflight graph by this drop.
        unregistered_fragment_ids: HashSet<FragmentId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates a `Add` barrier by given info.
    ///
    /// Barriers from the actors to be created, which is marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passthrough.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`. And
    /// it adds the job fragments info to meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
    /// will be set to `Created`.
    CreateStreamingJob {
        /// Everything needed to build the `Add` barrier and register the job.
        info: CreateStreamingJobCommandInfo,
        /// The flavor of the creation (normal / sink-into-table / snapshot backfill).
        job_type: CreateStreamingJobType,
        /// Snapshot backfill info for upstream MVs residing in other databases.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedule context. It must be resolved inside the barrier worker before injection.
    RescheduleIntent {
        context: RescheduleContext,
        /// Filled by the barrier worker after resolving `context` against current worker topology.
        ///
        /// We keep unresolved `context` outside of checkpoint state and only materialize this
        /// execution plan right before injection, then drop `context` to release memory earlier.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// `ReplaceStreamJob` command generates a `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of reschedule, while the upstream fragment
    /// of the Merge executors are changed additionally.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of executors. `throttle_type` specifies which executor kinds should apply it.
    Throttle {
        /// Jobs affected by this throttle change.
        jobs: HashSet<JobId>,
        /// Per-fragment throttle (rate limit) configuration.
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// materialize executor to start storing old value for subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// Retention time for the subscription, in seconds.
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// materialize executor to stop storing old value when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// `AlterSubscriptionRetention` command updates the subscription retention time.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// New retention time for the subscription, in seconds.
        retention_second: u64,
    },

    /// Propagates changed connector properties to the affected executors.
    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Finish of the "list" phase of a table refresh (counterpart of [`Command::Refresh`]).
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Finish of the "load" phase of a table refresh (counterpart of [`Command::Refresh`]).
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// `ResetSource` command generates a barrier to reset CDC source offset to latest.
    /// Used when upstream binlog/oplog has expired.
    ResetSource {
        source_id: SourceId,
    },

    /// `ResumeBackfill` command generates a `StartFragmentBackfill` barrier to force backfill
    /// to resume for troubleshooting.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// `InjectSourceOffsets` command generates a barrier to inject specific offsets
    /// into source splits (UNSAFE - admin only).
    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Split ID -> offset (JSON-encoded based on connector type)
        split_offsets: HashMap<String, String>,
    },
}
560
/// The scope a [`Command::ResumeBackfill`] applies to: a whole job or a single fragment.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Resume backfill for every fragment of the given job.
    Job(JobId),
    /// Resume backfill for a single fragment.
    Fragment(FragmentId),
}
566
567// For debugging and observability purposes. Can add more details later if needed.
568impl std::fmt::Display for Command {
569    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
570        match self {
571            Command::Flush => write!(f, "Flush"),
572            Command::Pause => write!(f, "Pause"),
573            Command::Resume => write!(f, "Resume"),
574            Command::DropStreamingJobs {
575                streaming_job_ids, ..
576            } => {
577                write!(
578                    f,
579                    "DropStreamingJobs: {}",
580                    streaming_job_ids.iter().sorted().join(", ")
581                )
582            }
583            Command::CreateStreamingJob { info, .. } => {
584                write!(f, "CreateStreamingJob: {}", info.streaming_job)
585            }
586            Command::RescheduleIntent {
587                reschedule_plan, ..
588            } => {
589                if reschedule_plan.is_some() {
590                    write!(f, "RescheduleIntent(planned)")
591                } else {
592                    write!(f, "RescheduleIntent")
593                }
594            }
595            Command::ReplaceStreamJob(plan) => {
596                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
597            }
598            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
599            Command::Throttle { .. } => write!(f, "Throttle"),
600            Command::CreateSubscription {
601                subscription_id, ..
602            } => write!(f, "CreateSubscription: {subscription_id}"),
603            Command::DropSubscription {
604                subscription_id, ..
605            } => write!(f, "DropSubscription: {subscription_id}"),
606            Command::AlterSubscriptionRetention {
607                subscription_id,
608                retention_second,
609                ..
610            } => write!(
611                f,
612                "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
613            ),
614            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
615            Command::Refresh {
616                table_id,
617                associated_source_id,
618            } => write!(
619                f,
620                "Refresh: {} (source: {})",
621                table_id, associated_source_id
622            ),
623            Command::ListFinish {
624                table_id,
625                associated_source_id,
626            } => write!(
627                f,
628                "ListFinish: {} (source: {})",
629                table_id, associated_source_id
630            ),
631            Command::LoadFinish {
632                table_id,
633                associated_source_id,
634            } => write!(
635                f,
636                "LoadFinish: {} (source: {})",
637                table_id, associated_source_id
638            ),
639            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
640            Command::ResumeBackfill { target } => match target {
641                ResumeBackfillTarget::Job(job_id) => {
642                    write!(f, "ResumeBackfill: job={job_id}")
643                }
644                ResumeBackfillTarget::Fragment(fragment_id) => {
645                    write!(f, "ResumeBackfill: fragment={fragment_id}")
646                }
647            },
648            Command::InjectSourceOffsets {
649                source_id,
650                split_offsets,
651            } => write!(
652                f,
653                "InjectSourceOffsets: {} ({} splits)",
654                source_id,
655                split_offsets.len()
656            ),
657        }
658    }
659}
660
661impl Command {
662    pub fn pause() -> Self {
663        Self::Pause
664    }
665
666    pub fn resume() -> Self {
667        Self::Resume
668    }
669
670    pub fn need_checkpoint(&self) -> bool {
671        // todo! Reviewing the flow of different command to reduce the amount of checkpoint
672        !matches!(self, Command::Resume)
673    }
674}
675
/// What remains of a [`Command`] once its barrier has been collected, carrying only the
/// data needed for post-collect handling.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command with no dedicated post-collect payload; holds its display name.
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// Actor-level split assignment resolved inside the barrier worker
        /// (from the source-level Phase 1 assignment).
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// Actor-level split assignment resolved inside the barrier worker.
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
707
708impl PostCollectCommand {
709    pub fn barrier() -> Self {
710        PostCollectCommand::Command("barrier".to_owned())
711    }
712
713    pub fn command_name(&self) -> &str {
714        match self {
715            PostCollectCommand::Command(name) => name.as_str(),
716            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
717            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
718            PostCollectCommand::Reschedule { .. } => "Reschedule",
719            PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
720            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
721            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
722            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
723            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
724        }
725    }
726}
727
728impl Display for PostCollectCommand {
729    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
730        f.write_str(self.command_name())
731    }
732}
733
/// The kind of a barrier (see also the protobuf counterpart `PbBarrierKind`).
#[derive(Debug, Clone)]
pub enum BarrierKind {
    /// The initial barrier, i.e. the first one injected (see `is_initial`).
    Initial,
    /// A regular, non-checkpoint barrier.
    Barrier,
    /// Hold a list of previous non-checkpoint prev-epoch + current prev-epoch
    Checkpoint(Vec<u64>),
}
741
742impl BarrierKind {
743    pub fn to_protobuf(&self) -> PbBarrierKind {
744        match self {
745            BarrierKind::Initial => PbBarrierKind::Initial,
746            BarrierKind::Barrier => PbBarrierKind::Barrier,
747            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
748        }
749    }
750
751    pub fn is_checkpoint(&self) -> bool {
752        matches!(self, BarrierKind::Checkpoint(_))
753    }
754
755    pub fn is_initial(&self) -> bool {
756        matches!(self, BarrierKind::Initial)
757    }
758
759    pub fn as_str_name(&self) -> &'static str {
760        match self {
761            BarrierKind::Initial => "Initial",
762            BarrierKind::Barrier => "Barrier",
763            BarrierKind::Checkpoint(_) => "Checkpoint",
764        }
765    }
766}
767
/// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given
/// [`Command`].
pub(super) struct CommandContext {
    /// Maximum log retention in seconds per subscribed upstream mv table,
    /// used by `get_truncate_epoch` to compute log-store truncate epochs at
    /// commit time (the value is subtracted from `prev_epoch`'s wall-clock
    /// seconds).
    mv_subscription_max_retention: HashMap<TableId, u64>,

    /// Epoch/kind information of the barrier this command is attached to.
    pub(super) barrier_info: BarrierInfo,

    /// State tables whose data is committed at this barrier's prev epoch.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    /// The command whose post-collect work this context drives.
    pub(super) command: PostCollectCommand,

    /// The tracing span of this command.
    ///
    /// Differs from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` stuffs.
    _span: tracing::Span,
}
786
787impl std::fmt::Debug for CommandContext {
788    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
789        f.debug_struct("CommandContext")
790            .field("barrier_info", &self.barrier_info)
791            .field("command", &self.command.command_name())
792            .finish()
793    }
794}
795
impl CommandContext {
    /// Bundle the barrier metadata and the post-collect command into a context.
    ///
    /// `span` is only held for its lifetime; see the doc on `_span`.
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

    /// Compute the epoch up to which an mv log store may be truncated for a
    /// subscriber retaining `retention_second` seconds of history: the epoch
    /// at `prev_epoch`'s wall-clock time minus `retention_second`.
    ///
    /// Falls back to `prev_epoch` itself (no truncation beyond it) when the
    /// subtraction yields a timestamp that `Timestamptz::from_secs` rejects.
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    /// Merge the barrier-complete responses of this barrier into `info`, the
    /// accumulator for the upcoming hummock commit.
    ///
    /// `backfill_pinned_log_epoch` maps a backfilling job to the epoch it has
    /// pinned and the upstream mv tables it reads; those epochs cap log
    /// truncation below.
    ///
    /// NOTE: the `must_match!` on the barrier kind means this must only be
    /// called for checkpoint barriers.
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // For a newly created job (asserted not snapshot-backfill), register
        // its state tables (internal tables plus the mv table, if any) as a
        // new table fragment group.
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
        // Keep the *minimum* truncate epoch per mv table, i.e. the most
        // conservative truncation among all consumers.
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        // Subscriptions bound truncation by their retention window...
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        // ...and in-flight backfills by the epoch they have pinned.
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // All tracked tables commit at this barrier's prev epoch, exactly once.
        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index additionally records its init delta.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
925
926impl Command {
927    /// Build the `Pause` mutation.
928    pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
929        {
930            {
931                // Only pause when the cluster is not already paused.
932                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
933                if !is_currently_paused {
934                    Some(Mutation::Pause(PauseMutation {}))
935                } else {
936                    None
937                }
938            }
939        }
940    }
941
942    /// Build the `Resume` mutation.
943    pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
944        {
945            {
946                // Only resume when the cluster is paused with the same reason.
947                if is_currently_paused {
948                    Some(Mutation::Resume(ResumeMutation {}))
949                } else {
950                    None
951                }
952            }
953        }
954    }
955
956    /// Build the `Splits` mutation for `SourceChangeSplit`.
957    pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
958        {
959            {
960                let mut diff = HashMap::new();
961
962                for actor_splits in split_assignment.values() {
963                    diff.extend(actor_splits.clone());
964                }
965
966                Mutation::Splits(SourceChangeSplitMutation {
967                    actor_splits: build_actor_connector_splits(&diff),
968                })
969            }
970        }
971    }
972
973    /// Build the `Throttle` mutation.
974    pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
975        {
976            {
977                let config = config.clone();
978                Mutation::Throttle(ThrottleMutation {
979                    fragment_throttle: config,
980                })
981            }
982        }
983    }
984
985    /// Build the `Stop` mutation for `DropStreamingJobs`.
986    pub(super) fn drop_streaming_jobs_to_mutation(
987        actors: &Vec<ActorId>,
988        dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
989    ) -> Mutation {
990        {
991            Mutation::Stop(StopMutation {
992                actors: actors.clone(),
993                dropped_sink_fragments: dropped_sink_fragment_by_targets
994                    .values()
995                    .flatten()
996                    .cloned()
997                    .collect(),
998            })
999        }
1000    }
1001
    /// Build the `Add` mutation for `CreateStreamingJob`.
    ///
    /// Gathers the newly created actors, their initial source split
    /// assignments and the dispatchers of their upstream fragments, plus
    /// job-type specific extras:
    /// - `SnapshotBackfill`: registers the new job as a subscriber on each
    ///   upstream mv table it backfills from;
    /// - `SinkIntoTable`: announces the new upstream sink (its actors, output
    ///   schema and projection) to the downstream fragment.
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                // Every actor of the new job counts as "added".
                let added_actors: Vec<ActorId> = stream_actors
                    .values()
                    .flatten()
                    .map(|actor| actor.actor_id)
                    .collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // Snapshot-backfill jobs subscribe to every upstream mv table
                // they backfill from, with the job id as the subscriber id.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                // Backfill nodes that depend on other backfills start paused;
                // they are targeted later via a `StartFragmentBackfill`
                // mutation (see `resume_backfill_to_mutation`).
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // For sink-into-table, tell the downstream fragment about the
                // new upstream sink: its schema/projection and actor locations.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_actors
                            .get(sink_fragment_id)
                            .unwrap_or_else(|| {
                                panic!("upstream sink fragment {sink_fragment_id} not exist")
                            })
                            .iter()
                            .map(|actor| {
                                let worker_id = actor_location[&actor.actor_id];
                                PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                }
                            });
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // Remove the dispatchers of the new job's upstream
                    // fragments from `edges` and attach them to the mutation.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1120
    /// Build the `Update` mutation for `ReplaceStreamJob`.
    ///
    /// Takes the merge updates of fragments whose upstream is replaced and the
    /// dispatchers of the replacement's upstream fragments out of `edges`,
    /// reassigns CDC backfill splits for the job, then delegates to
    /// `generate_update_mutation_for_replace_table` over the actors of all old
    /// fragments plus any auto-refresh-schema sink fragments.
    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                // Merge updates for fragments whose upstream is being replaced.
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                // Dispatchers of the replacement job's upstream fragments.
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                let old_fragments = old_fragments.fragments.keys().copied();
                let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
                    .as_ref()
                    .into_iter()
                    .flat_map(|sinks| sinks.iter())
                    .map(|sink| sink.original_fragment.fragment_id);
                Ok(Self::generate_update_mutation_for_replace_table(
                    // All actors of the fragments being replaced.
                    old_fragments
                        .chain(auto_refresh_sink_fragment_ids)
                        .flat_map(|fragment_id| {
                            database_info.fragment(fragment_id).actors.keys().copied()
                        }),
                    merge_updates,
                    dispatchers,
                    split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                ))
            }
        }
    }
1170
    /// Build the `Update` mutation for `RescheduleIntent`.
    ///
    /// For each rescheduled fragment this produces, over the *existing* actors:
    /// - dispatcher updates on upstream actors (new hash mapping, added and
    ///   removed downstream actors),
    /// - merge updates on surviving downstream actors (added and removed
    ///   upstream actors),
    /// - vnode bitmap updates for actors whose vnode ownership changes,
    /// - the set of dropped actors, source split reassignments, and CDC
    ///   backfill split reassignments.
    ///
    /// Always returns `Ok(Some(_))` on success; reschedule never creates new
    /// dispatchers, it only rewires existing ones.
    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let database_id = database_info.database_id;
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        // Record updates for all actors.
                        for &actor_id in upstream_actor_ids {
                            // Only upstream actors that survive the reschedule
                            // (or whose fragment is not rescheduled at all)
                            // receive the added downstream actors.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // Index with the dispatcher id to check duplicates.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Downstream removed actors should be skipped
                        // Newly created actors of the current fragment will not dispatch Update
                        // barriers to them
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the fragment id to check duplicates.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    // we assume that we only scale the partial graph of database
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record updates for all actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    // Reassign CDC backfill snapshot splits for fragments that
                    // carry them.
                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // we don't create dispatchers in reschedule scenario
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Ok(Some(mutation))
            }
        }
    }
1352
1353    /// Build the `Add` mutation for `CreateSubscription`.
1354    pub(super) fn create_subscription_to_mutation(
1355        upstream_mv_table_id: TableId,
1356        subscription_id: SubscriptionId,
1357    ) -> Mutation {
1358        {
1359            Mutation::Add(AddMutation {
1360                actor_dispatchers: Default::default(),
1361                added_actors: vec![],
1362                actor_splits: Default::default(),
1363                pause: false,
1364                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1365                    upstream_mv_table_id,
1366                    subscriber_id: subscription_id.as_subscriber_id(),
1367                }],
1368                backfill_nodes_to_pause: vec![],
1369                actor_cdc_table_snapshot_splits: None,
1370                new_upstream_sinks: Default::default(),
1371            })
1372        }
1373    }
1374
1375    /// Build the `DropSubscriptions` mutation for `DropSubscription`.
1376    pub(super) fn drop_subscription_to_mutation(
1377        upstream_mv_table_id: TableId,
1378        subscription_id: SubscriptionId,
1379    ) -> Mutation {
1380        {
1381            Mutation::DropSubscriptions(DropSubscriptionsMutation {
1382                info: vec![SubscriptionUpstreamInfo {
1383                    subscriber_id: subscription_id.as_subscriber_id(),
1384                    upstream_mv_table_id,
1385                }],
1386            })
1387        }
1388    }
1389
1390    /// Build the `ConnectorPropsChange` mutation.
1391    pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1392        {
1393            {
1394                let mut connector_props_infos = HashMap::default();
1395                for (k, v) in config {
1396                    connector_props_infos.insert(
1397                        k.as_raw_id(),
1398                        ConnectorPropsInfo {
1399                            connector_props_info: v.clone(),
1400                        },
1401                    );
1402                }
1403                Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1404                    connector_props_infos,
1405                })
1406            }
1407        }
1408    }
1409
1410    /// Build the `RefreshStart` mutation.
1411    pub(super) fn refresh_to_mutation(
1412        table_id: TableId,
1413        associated_source_id: SourceId,
1414    ) -> Mutation {
1415        Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1416            table_id,
1417            associated_source_id,
1418        })
1419    }
1420
1421    /// Build the `ListFinish` mutation.
1422    pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1423        Mutation::ListFinish(ListFinishMutation {
1424            associated_source_id,
1425        })
1426    }
1427
1428    /// Build the `LoadFinish` mutation.
1429    pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1430        Mutation::LoadFinish(LoadFinishMutation {
1431            associated_source_id,
1432        })
1433    }
1434
1435    /// Build the `ResetSource` mutation.
1436    pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1437        Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1438            source_id: source_id.as_raw_id(),
1439        })
1440    }
1441
1442    /// Build the `StartFragmentBackfill` mutation for `ResumeBackfill`.
1443    pub(super) fn resume_backfill_to_mutation(
1444        target: &ResumeBackfillTarget,
1445        database_info: &InflightDatabaseInfo,
1446    ) -> MetaResult<Option<Mutation>> {
1447        {
1448            {
1449                let fragment_ids: HashSet<_> = match target {
1450                    ResumeBackfillTarget::Job(job_id) => {
1451                        database_info.backfill_fragment_ids_for_job(*job_id)?
1452                    }
1453                    ResumeBackfillTarget::Fragment(fragment_id) => {
1454                        if !database_info.is_backfill_fragment(*fragment_id)? {
1455                            return Err(MetaError::invalid_parameter(format!(
1456                                "fragment {} is not a backfill node",
1457                                fragment_id
1458                            )));
1459                        }
1460                        HashSet::from([*fragment_id])
1461                    }
1462                };
1463                if fragment_ids.is_empty() {
1464                    warn!(
1465                        ?target,
1466                        "resume backfill command ignored because no backfill fragments found"
1467                    );
1468                    Ok(None)
1469                } else {
1470                    Ok(Some(Mutation::StartFragmentBackfill(
1471                        StartFragmentBackfillMutation {
1472                            fragment_ids: fragment_ids.into_iter().collect(),
1473                        },
1474                    )))
1475                }
1476            }
1477        }
1478    }
1479
1480    /// Build the `InjectSourceOffsets` mutation.
1481    pub(super) fn inject_source_offsets_to_mutation(
1482        source_id: SourceId,
1483        split_offsets: &HashMap<String, String>,
1484    ) -> Mutation {
1485        Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1486            source_id: source_id.as_raw_id(),
1487            split_offsets: split_offsets.clone(),
1488        })
1489    }
1490
1491    /// Collect actors to create for `CreateStreamingJob` (non-snapshot-backfill).
1492    pub(super) fn create_streaming_job_actors_to_create(
1493        info: &CreateStreamingJobCommandInfo,
1494        edges: &mut FragmentEdgeBuildResult,
1495        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1496        actor_location: &HashMap<ActorId, WorkerId>,
1497    ) -> StreamJobActorsToCreate {
1498        {
1499            {
1500                edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1501                    |fragment| {
1502                        let actors = stream_actors
1503                            .get(&fragment.fragment_id)
1504                            .into_iter()
1505                            .flatten()
1506                            .map(|actor| (actor, actor_location[&actor.actor_id]));
1507                        (
1508                            fragment.fragment_id,
1509                            &fragment.nodes,
1510                            actors,
1511                            [], // no subscriber for new job to create
1512                        )
1513                    },
1514                ))
1515            }
1516        }
1517    }
1518
1519    /// Collect actors to create for `RescheduleIntent`.
1520    pub(super) fn reschedule_actors_to_create(
1521        reschedules: &HashMap<FragmentId, Reschedule>,
1522        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1523        database_info: &InflightDatabaseInfo,
1524        control_stream_manager: &ControlStreamManager,
1525    ) -> StreamJobActorsToCreate {
1526        {
1527            {
1528                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1529                    reschedules.iter().map(|(fragment_id, reschedule)| {
1530                        (
1531                            *fragment_id,
1532                            reschedule.newly_created_actors.values().map(
1533                                |((actor, dispatchers), _)| {
1534                                    (actor.actor_id, dispatchers.as_slice())
1535                                },
1536                            ),
1537                        )
1538                    }),
1539                    Some((reschedules, fragment_actors)),
1540                    database_info,
1541                    control_stream_manager,
1542                );
1543                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1544                for (fragment_id, (actor, dispatchers), worker_id) in
1545                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1546                        reschedule
1547                            .newly_created_actors
1548                            .values()
1549                            .map(|(actors, status)| (*fragment_id, actors, status))
1550                    })
1551                {
1552                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1553                    map.entry(*worker_id)
1554                        .or_default()
1555                        .entry(fragment_id)
1556                        .or_insert_with(|| {
1557                            let node = database_info.fragment(fragment_id).nodes.clone();
1558                            let subscribers =
1559                                database_info.fragment_subscribers(fragment_id).collect();
1560                            (node, vec![], subscribers)
1561                        })
1562                        .1
1563                        .push((actor.clone(), upstreams, dispatchers.clone()));
1564                }
1565                map
1566            }
1567        }
1568    }
1569
1570    /// Collect actors to create for `ReplaceStreamJob`.
1571    pub(super) fn replace_stream_job_actors_to_create(
1572        replace_table: &ReplaceStreamJobPlan,
1573        edges: &mut FragmentEdgeBuildResult,
1574        database_info: &InflightDatabaseInfo,
1575        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1576        actor_location: &HashMap<ActorId, WorkerId>,
1577    ) -> StreamJobActorsToCreate {
1578        {
1579            {
1580                let mut actors = edges.collect_actors_to_create(
1581                    replace_table
1582                        .new_fragments
1583                        .fragments
1584                        .values()
1585                        .map(|fragment| {
1586                            let actors = stream_actors
1587                                .get(&fragment.fragment_id)
1588                                .into_iter()
1589                                .flatten()
1590                                .map(|actor| (actor, actor_location[&actor.actor_id]));
1591                            (
1592                                fragment.fragment_id,
1593                                &fragment.nodes,
1594                                actors,
1595                                database_info
1596                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1597                            )
1598                        }),
1599                );
1600
1601                // Handle auto-refresh schema sinks
1602                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1603                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1604                        (
1605                            sink.new_fragment.fragment_id,
1606                            &sink.new_fragment.nodes,
1607                            stream_actors
1608                                .get(&sink.new_fragment.fragment_id)
1609                                .into_iter()
1610                                .flatten()
1611                                .map(|actor| (actor, actor_location[&actor.actor_id])),
1612                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1613                        )
1614                    }));
1615                    for (worker_id, fragment_actors) in sink_actors {
1616                        actors.entry(worker_id).or_default().extend(fragment_actors);
1617                    }
1618                }
1619                actors
1620            }
1621        }
1622    }
1623
1624    fn generate_update_mutation_for_replace_table(
1625        dropped_actors: impl IntoIterator<Item = ActorId>,
1626        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1627        dispatchers: FragmentActorDispatchers,
1628        split_assignment: &SplitAssignment,
1629        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1630        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1631    ) -> Option<Mutation> {
1632        let dropped_actors = dropped_actors.into_iter().collect();
1633
1634        let actor_new_dispatchers = dispatchers
1635            .into_values()
1636            .flatten()
1637            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1638            .collect();
1639
1640        let actor_splits = split_assignment
1641            .values()
1642            .flat_map(build_actor_connector_splits)
1643            .collect();
1644        Some(Mutation::Update(UpdateMutation {
1645            actor_new_dispatchers,
1646            merge_update: merge_updates.into_values().flatten().collect(),
1647            dropped_actors,
1648            actor_splits,
1649            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1650            sink_schema_change: auto_refresh_schema_sinks
1651                .as_ref()
1652                .into_iter()
1653                .flat_map(|sinks| {
1654                    sinks.iter().map(|sink| {
1655                        (
1656                            sink.original_sink.id.as_raw_id(),
1657                            PbSinkSchemaChange {
1658                                original_schema: sink
1659                                    .original_sink
1660                                    .columns
1661                                    .iter()
1662                                    .map(|col| PbField {
1663                                        data_type: Some(
1664                                            col.column_desc
1665                                                .as_ref()
1666                                                .unwrap()
1667                                                .column_type
1668                                                .as_ref()
1669                                                .unwrap()
1670                                                .clone(),
1671                                        ),
1672                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1673                                    })
1674                                    .collect(),
1675                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1676                                    fields: sink
1677                                        .newly_add_fields
1678                                        .iter()
1679                                        .map(|field| field.to_prost())
1680                                        .collect(),
1681                                })),
1682                            },
1683                        )
1684                    })
1685                })
1686                .collect(),
1687            ..Default::default()
1688        }))
1689    }
1690}
1691
impl Command {
    /// Resolve, for every downstream actor reachable through the given
    /// dispatchers, the set of upstream actors it will receive data from,
    /// together with each upstream actor's host and partial-graph id.
    ///
    /// `actor_dispatchers` yields `(upstream_fragment_id, iterator of
    /// (upstream_actor_id, dispatchers))`. When `reschedule_dispatcher_update`
    /// is provided, upstreams of rescheduled fragments are additionally
    /// resolved from the reschedule plan, skipping upstream actors the
    /// reschedule removes.
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        // Phase 1: walk the explicitly provided dispatchers and record, for
        // each downstream actor they target, the dispatching upstream actor.
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                // Locate the upstream actor's worker to obtain its host address.
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    // downstream actor -> upstream fragment -> upstream actor info
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        // Phase 2 (reschedule only): for every rescheduled fragment, register
        // each surviving upstream actor as an upstream of every newly added
        // downstream actor.
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    // The upstream fragment may itself be rescheduled; its
                    // removed actors must not be recorded as upstreams.
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        // Skip upstream actors that the reschedule removes.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        // Fan the upstream actor out to every actor newly added
                        // by this reschedule, regardless of worker.
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}