risingwave_meta/barrier/command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37    PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkDropColumnsOp,
49    PbSinkSchemaChange, PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation,
50    StartFragmentBackfillMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
51    UpdateMutation,
52};
53use risingwave_pb::stream_service::BarrierCompleteResponse;
54use tracing::warn;
55
56use super::info::InflightDatabaseInfo;
57use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
58use crate::barrier::edge_builder::FragmentEdgeBuildResult;
59use crate::barrier::info::BarrierInfo;
60use crate::barrier::partial_graph::PartialGraphBarrierInfo;
61use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
62use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
63use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
64use crate::controller::scale::LoadedFragmentContext;
65use crate::controller::utils::StreamingJobExtraInfo;
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70    FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
71    StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::{
74    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
75    ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
76    build_actor_connector_splits,
77};
78use crate::{MetaError, MetaResult};
79
/// [`Reschedule`] describes per-fragment changes in a resolved reschedule plan,
/// used for actor scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there's upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors newly created by this reschedule, each with its dispatchers and the
    /// worker it is placed on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
111
/// A resolved reschedule execution plan: per-fragment [`Reschedule`]s plus the actor
/// sets of the fragments they reference.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Per-fragment reschedule descriptions.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Should contain the actor ids in upstream and downstream fragments referenced by
    /// `reschedules`.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
119
/// Preloaded context for rescheduling, built outside the barrier worker.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Loaded job/fragment payloads; the source of truth for which jobs and fragments
    /// this context covers (see `for_database` / `into_database_contexts`).
    pub loaded: LoadedFragmentContext,
    /// Extra info for each job referenced by `loaded`, keyed by job id.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// Fragment id -> its upstream fragments, with the dispatcher type of each edge.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Fragment id -> its downstream fragments, with the dispatcher type of each edge.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// (source fragment, target fragment) -> the relation model of that edge.
    /// Ownership is source-fragment based (see the routing logic in the impl).
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
129
130impl RescheduleContext {
131    pub fn empty() -> Self {
132        Self {
133            loaded: LoadedFragmentContext::default(),
134            job_extra_info: HashMap::new(),
135            upstream_fragments: HashMap::new(),
136            downstream_fragments: HashMap::new(),
137            downstream_relations: HashMap::new(),
138        }
139    }
140
141    pub fn is_empty(&self) -> bool {
142        self.loaded.is_empty()
143    }
144
145    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
146        let loaded = self.loaded.for_database(database_id)?;
147        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
148        // Use the filtered loaded context as the source of truth so every side map is pruned by
149        // the same fragment set.
150        let fragment_ids: HashSet<FragmentId> = loaded
151            .job_fragments
152            .values()
153            .flat_map(|fragments| fragments.keys().copied())
154            .collect();
155
156        let job_extra_info = self
157            .job_extra_info
158            .iter()
159            .filter(|(job_id, _)| job_ids.contains(*job_id))
160            .map(|(job_id, info)| (*job_id, info.clone()))
161            .collect();
162
163        let upstream_fragments = self
164            .upstream_fragments
165            .iter()
166            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
167            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
168            .collect();
169
170        let downstream_fragments = self
171            .downstream_fragments
172            .iter()
173            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
174            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
175            .collect();
176
177        let downstream_relations = self
178            .downstream_relations
179            .iter()
180            // Ownership of this map is source-fragment based. We keep all downstream edges for
181            // selected sources because the target side can still be referenced during dispatcher
182            // reconstruction even if that target fragment is not being rescheduled.
183            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
184            .map(|(key, relation)| (*key, relation.clone()))
185            .collect();
186
187        Some(Self {
188            loaded,
189            job_extra_info,
190            upstream_fragments,
191            downstream_fragments,
192            downstream_relations,
193        })
194    }
195
196    /// Split this context into per-database contexts without cloning the large loaded graph
197    /// payloads.
198    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
199        let Self {
200            loaded,
201            job_extra_info,
202            upstream_fragments,
203            downstream_fragments,
204            downstream_relations,
205        } = self;
206
207        let mut contexts: HashMap<_, _> = loaded
208            .into_database_contexts()
209            .into_iter()
210            .map(|(database_id, loaded)| {
211                (
212                    database_id,
213                    Self {
214                        loaded,
215                        job_extra_info: HashMap::new(),
216                        upstream_fragments: HashMap::new(),
217                        downstream_fragments: HashMap::new(),
218                        downstream_relations: HashMap::new(),
219                    },
220                )
221            })
222            .collect();
223
224        if contexts.is_empty() {
225            return contexts;
226        }
227
228        let mut job_databases = HashMap::new();
229        let mut fragment_databases = HashMap::new();
230        for (&database_id, context) in &contexts {
231            for job_id in context.loaded.job_map.keys().copied() {
232                job_databases.insert(job_id, database_id);
233            }
234            for fragment_id in context
235                .loaded
236                .job_fragments
237                .values()
238                .flat_map(|fragments| fragments.keys().copied())
239            {
240                fragment_databases.insert(fragment_id, database_id);
241            }
242        }
243
244        for (job_id, info) in job_extra_info {
245            if let Some(database_id) = job_databases.get(&job_id).copied() {
246                contexts
247                    .get_mut(&database_id)
248                    .expect("database context should exist for job")
249                    .job_extra_info
250                    .insert(job_id, info);
251            }
252        }
253
254        for (fragment_id, upstreams) in upstream_fragments {
255            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
256                contexts
257                    .get_mut(&database_id)
258                    .expect("database context should exist for fragment")
259                    .upstream_fragments
260                    .insert(fragment_id, upstreams);
261            }
262        }
263
264        for (fragment_id, downstreams) in downstream_fragments {
265            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
266                contexts
267                    .get_mut(&database_id)
268                    .expect("database context should exist for fragment")
269                    .downstream_fragments
270                    .insert(fragment_id, downstreams);
271            }
272        }
273
274        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
275            // Route by source fragment ownership. A target may be outside of current reschedule
276            // set, but this edge still belongs to the source-side command.
277            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
278                contexts
279                    .get_mut(&database_id)
280                    .expect("database context should exist for relation source")
281                    .downstream_relations
282                    .insert((source_fragment_id, target_fragment_id), relation);
283            }
284        }
285
286        contexts
287    }
288}
289
/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job being replaced; dropped once the switch completes.
    pub old_fragments: StreamJobFragments,
    /// Fragments replacing the old ones; all of their actors are newly built.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Split plan for the replace job. Determines how splits are resolved in Phase 2
    /// inside the barrier worker.
    pub split_plan: ReplaceJobSplitPlan,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
    pub streaming_job: StreamingJob,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
    /// The temporary dummy job fragments id of new table fragment
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
319
320impl ReplaceStreamJobPlan {
321    /// `old_fragment_id` -> `new_fragment_id`
322    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
323        let mut fragment_replacements = HashMap::new();
324        for (upstream_fragment_id, new_upstream_fragment_id) in
325            self.replace_upstream.values().flatten()
326        {
327            {
328                let r =
329                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
330                if let Some(r) = r {
331                    assert_eq!(
332                        *new_upstream_fragment_id, r,
333                        "one fragment is replaced by multiple fragments"
334                    );
335                }
336            }
337        }
338        fragment_replacements
339    }
340}
341
/// Everything needed to build the `Add` barrier for a new streaming job.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Source-level split assignment (Phase 1). Resolved to actor-level in the barrier worker.
    pub init_split_assignment: SourceSplitAssignment,
    /// The job's definition text.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// Pre-computed CDC table snapshot splits, if any.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
}
363
364impl StreamJobFragments {
365    /// Build fragment-level info for new fragments, populating actor infos from the
366    /// rendered actors and their locations, and applying split assignment to actor splits.
367    pub(super) fn new_fragment_info<'a>(
368        &'a self,
369        stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
370        actor_location: &'a HashMap<ActorId, WorkerId>,
371        assignment: &'a SplitAssignment,
372    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
373        self.fragments.values().map(|fragment| {
374            (
375                fragment.fragment_id,
376                InflightFragmentInfo {
377                    fragment_id: fragment.fragment_id,
378                    distribution_type: fragment.distribution_type.into(),
379                    fragment_type_mask: fragment.fragment_type_mask,
380                    vnode_count: fragment.vnode_count(),
381                    nodes: fragment.nodes.clone(),
382                    actors: stream_actors
383                        .get(&fragment.fragment_id)
384                        .into_iter()
385                        .flatten()
386                        .map(|actor| {
387                            (
388                                actor.actor_id,
389                                InflightActorInfo {
390                                    worker_id: actor_location[&actor.actor_id],
391                                    vnode_bitmap: actor.vnode_bitmap.clone(),
392                                    splits: assignment
393                                        .get(&fragment.fragment_id)
394                                        .and_then(|s| s.get(&actor.actor_id))
395                                        .cloned()
396                                        .unwrap_or_default(),
397                                },
398                            )
399                        })
400                        .collect(),
401                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
402                },
403            )
404        })
405    }
406}
407
/// Upstream materialized views to snapshot-backfill from, with their backfill epochs.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    /// The `snapshot_backfill_epoch` should be None at the beginning, and be filled
    /// by global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
415
/// How a streaming job is being created, which affects the barrier handling.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// A regular streaming job with no special handling.
    Normal,
    /// `CREATE SINK INTO table`: carries the info of the upstream sink being attached.
    SinkIntoTable(UpstreamSinkInfo),
    /// A job created via snapshot backfill from upstream materialized views.
    SnapshotBackfill(SnapshotBackfillInfo),
}
422
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send via corresponding `*_to_mutation` helpers,
/// and may [do different stuffs after the barrier is collected](PostCollectCommand::post_collect).
// FIXME: this enum is significantly large on stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has ensured that these streaming jobs are safe to be
    /// dropped by reference counts before.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then delete the job fragments info from meta store.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates a `Add` barrier by given info.
    ///
    /// Barriers from the actors to be created, which is marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passthrough.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`. And
    /// it adds the job fragments info to meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
    /// will be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedule context. It must be resolved inside the barrier worker before injection.
    RescheduleIntent {
        context: RescheduleContext,
        /// Filled by the barrier worker after resolving `context` against current worker topology.
        ///
        /// We keep unresolved `context` outside of checkpoint state and only materialize this
        /// execution plan right before injection, then drop `context` to release memory earlier.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// `ReplaceStreamJob` command generates a `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of reschedule, while the upstream fragment
    /// of the Merge executors are changed additionally.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of executors. `throttle_type` specifies which executor kinds should apply it.
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// materialize executor to start storing old value for subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// materialize executor to stop storing old value when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// `AlterSubscriptionRetention` command updates the subscription retention time.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `ConnectorPropsChange` command generates a `ConnectorPropsChangeMutation` to update
    /// connector properties of the affected executors.
    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// `ListFinish` command generates a `ListFinishMutation`; part of the refreshable-table
    /// flow together with [`Command::Refresh`] and [`Command::LoadFinish`].
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// `LoadFinish` command generates a `LoadFinishMutation` for the same refreshable-table
    /// flow as [`Command::Refresh`].
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// `ResetSource` command generates a barrier to reset CDC source offset to latest.
    /// Used when upstream binlog/oplog has expired.
    ResetSource {
        source_id: SourceId,
    },

    /// `ResumeBackfill` command generates a `StartFragmentBackfill` barrier to force backfill
    /// to resume for troubleshooting.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// `InjectSourceOffsets` command generates a barrier to inject specific offsets
    /// into source splits (UNSAFE - admin only).
    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Split ID -> offset (JSON-encoded based on connector type)
        split_offsets: HashMap<String, String>,
    },
}
562
/// What a [`Command::ResumeBackfill`] applies to.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Target a whole streaming job.
    Job(JobId),
    /// Target a single fragment.
    Fragment(FragmentId),
}
568
569// For debugging and observability purposes. Can add more details later if needed.
570impl std::fmt::Display for Command {
571    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
572        match self {
573            Command::Flush => write!(f, "Flush"),
574            Command::Pause => write!(f, "Pause"),
575            Command::Resume => write!(f, "Resume"),
576            Command::DropStreamingJobs {
577                streaming_job_ids, ..
578            } => {
579                write!(
580                    f,
581                    "DropStreamingJobs: {}",
582                    streaming_job_ids.iter().sorted().join(", ")
583                )
584            }
585            Command::CreateStreamingJob { info, .. } => {
586                write!(f, "CreateStreamingJob: {}", info.streaming_job)
587            }
588            Command::RescheduleIntent {
589                reschedule_plan, ..
590            } => {
591                if reschedule_plan.is_some() {
592                    write!(f, "RescheduleIntent(planned)")
593                } else {
594                    write!(f, "RescheduleIntent")
595                }
596            }
597            Command::ReplaceStreamJob(plan) => {
598                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
599            }
600            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
601            Command::Throttle { .. } => write!(f, "Throttle"),
602            Command::CreateSubscription {
603                subscription_id, ..
604            } => write!(f, "CreateSubscription: {subscription_id}"),
605            Command::DropSubscription {
606                subscription_id, ..
607            } => write!(f, "DropSubscription: {subscription_id}"),
608            Command::AlterSubscriptionRetention {
609                subscription_id,
610                retention_second,
611                ..
612            } => write!(
613                f,
614                "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
615            ),
616            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
617            Command::Refresh {
618                table_id,
619                associated_source_id,
620            } => write!(
621                f,
622                "Refresh: {} (source: {})",
623                table_id, associated_source_id
624            ),
625            Command::ListFinish {
626                table_id,
627                associated_source_id,
628            } => write!(
629                f,
630                "ListFinish: {} (source: {})",
631                table_id, associated_source_id
632            ),
633            Command::LoadFinish {
634                table_id,
635                associated_source_id,
636            } => write!(
637                f,
638                "LoadFinish: {} (source: {})",
639                table_id, associated_source_id
640            ),
641            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
642            Command::ResumeBackfill { target } => match target {
643                ResumeBackfillTarget::Job(job_id) => {
644                    write!(f, "ResumeBackfill: job={job_id}")
645                }
646                ResumeBackfillTarget::Fragment(fragment_id) => {
647                    write!(f, "ResumeBackfill: fragment={fragment_id}")
648                }
649            },
650            Command::InjectSourceOffsets {
651                source_id,
652                split_offsets,
653            } => write!(
654                f,
655                "InjectSourceOffsets: {} ({} splits)",
656                source_id,
657                split_offsets.len()
658            ),
659        }
660    }
661}
662
663impl Command {
664    pub fn pause() -> Self {
665        Self::Pause
666    }
667
668    pub fn resume() -> Self {
669        Self::Resume
670    }
671
672    pub fn need_checkpoint(&self) -> bool {
673        // todo! Reviewing the flow of different command to reduce the amount of checkpoint
674        !matches!(self, Command::Resume)
675    }
676}
677
/// The post-collection form of a [`Command`]: the data needed after the barrier injected
/// for the command has been fully collected.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command that needs no dedicated post-collect handling; carries the name used
    /// for display/logging (see [`PostCollectCommand::command_name`]).
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// Actor-level split assignment resolved inside the barrier worker.
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// Actor-level split assignment resolved inside the barrier worker.
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
709
710impl PostCollectCommand {
711    pub fn barrier() -> Self {
712        PostCollectCommand::Command("barrier".to_owned())
713    }
714
715    pub fn should_checkpoint(&self) -> bool {
716        match self {
717            PostCollectCommand::DropStreamingJobs { .. }
718            | PostCollectCommand::CreateStreamingJob { .. }
719            | PostCollectCommand::Reschedule { .. }
720            | PostCollectCommand::ReplaceStreamJob { .. }
721            | PostCollectCommand::SourceChangeSplit { .. }
722            | PostCollectCommand::CreateSubscription { .. }
723            | PostCollectCommand::ConnectorPropsChange(_)
724            | PostCollectCommand::ResumeBackfill { .. } => true,
725            PostCollectCommand::Command(_) => false,
726        }
727    }
728
729    pub fn command_name(&self) -> &str {
730        match self {
731            PostCollectCommand::Command(name) => name.as_str(),
732            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
733            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
734            PostCollectCommand::Reschedule { .. } => "Reschedule",
735            PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
736            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
737            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
738            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
739            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
740        }
741    }
742}
743
744impl Display for PostCollectCommand {
745    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
746        f.write_str(self.command_name())
747    }
748}
749
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BarrierKind {
    /// The initial barrier (see [`BarrierKind::is_initial`]); not a checkpoint.
    Initial,
    /// A regular barrier that does not trigger a checkpoint.
    Barrier,
    /// Hold a list of previous non-checkpoint prev-epoch + current prev-epoch.
    Checkpoint(Vec<u64>),
}
757
758impl BarrierKind {
759    pub fn to_protobuf(&self) -> PbBarrierKind {
760        match self {
761            BarrierKind::Initial => PbBarrierKind::Initial,
762            BarrierKind::Barrier => PbBarrierKind::Barrier,
763            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
764        }
765    }
766
767    pub fn is_checkpoint(&self) -> bool {
768        matches!(self, BarrierKind::Checkpoint(_))
769    }
770
771    pub fn is_initial(&self) -> bool {
772        matches!(self, BarrierKind::Initial)
773    }
774
775    pub fn as_str_name(&self) -> &'static str {
776        match self {
777            BarrierKind::Initial => "Initial",
778            BarrierKind::Barrier => "Barrier",
779            BarrierKind::Checkpoint(_) => "Checkpoint",
780        }
781    }
782}
783
784impl BarrierInfo {
785    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
786        let Some(truncate_timestamptz) = Timestamptz::from_secs(
787            self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
788        ) else {
789            warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
790            return self.prev_epoch.value();
791        };
792        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
793    }
794}
795
impl Command {
    /// Aggregates the per-worker `BarrierCompleteResponse`s of a collected
    /// checkpoint barrier into `info`, the commit payload for this epoch.
    ///
    /// Merges synced SSTs, table watermarks and vector index deltas reported
    /// by workers, registers the commit epoch for every table of this partial
    /// graph, records new table fragment infos for a freshly created
    /// streaming job, and builds the table change-log delta whose truncate
    /// epochs are bounded by subscription retention and backfill-pinned
    /// epochs.
    pub(super) fn collect_commit_epoch_info(
        database_info: &InflightDatabaseInfo,
        barrier_info: &PartialGraphBarrierInfo,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        // Flatten the per-worker responses into their constituent parts.
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // A newly created streaming job contributes a `NewTableFragmentInfo`
        // covering its internal tables and (if present) its mv table.
        // Snapshot-backfill jobs must not reach this path (asserted below).
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } =
                &barrier_info.post_collect_command
            {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        // For each upstream mv table, keep the smallest (most conservative)
        // truncate epoch across all constraints recorded below.
        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        // Constraint 1: per-table subscription retention windows.
        for (mv_table_id, max_retention) in database_info.max_subscription_retention() {
            let truncate_epoch = barrier_info
                .barrier_info
                .get_truncate_epoch(max_retention)
                .0;
            update_truncate_epoch(mv_table_id, truncate_epoch);
        }
        // Constraint 2: epochs pinned by in-progress backfills on upstream
        // mv tables must not be truncated away.
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        // Build the change-log delta from old-value SSTs; only valid for a
        // checkpoint barrier (enforced by `must_match!`).
        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&barrier_info.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // Every table of this partial graph commits at the barrier's prev
        // epoch; a duplicate insertion would indicate a bug upstream.
        let epoch = barrier_info.barrier_info.prev_epoch();
        for table_id in &barrier_info.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index additionally records an `Init` delta.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } =
            &barrier_info.post_collect_command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
901
902impl Command {
903    /// Build the `Pause` mutation.
904    pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
905        {
906            {
907                // Only pause when the cluster is not already paused.
908                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
909                if !is_currently_paused {
910                    Some(Mutation::Pause(PauseMutation {}))
911                } else {
912                    None
913                }
914            }
915        }
916    }
917
918    /// Build the `Resume` mutation.
919    pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
920        {
921            {
922                // Only resume when the cluster is paused with the same reason.
923                if is_currently_paused {
924                    Some(Mutation::Resume(ResumeMutation {}))
925                } else {
926                    None
927                }
928            }
929        }
930    }
931
932    /// Build the `Splits` mutation for `SourceChangeSplit`.
933    pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
934        {
935            {
936                let mut diff = HashMap::new();
937
938                for actor_splits in split_assignment.values() {
939                    diff.extend(actor_splits.clone());
940                }
941
942                Mutation::Splits(SourceChangeSplitMutation {
943                    actor_splits: build_actor_connector_splits(&diff),
944                })
945            }
946        }
947    }
948
949    /// Build the `Throttle` mutation.
950    pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
951        {
952            {
953                let config = config.clone();
954                Mutation::Throttle(ThrottleMutation {
955                    fragment_throttle: config,
956                })
957            }
958        }
959    }
960
961    /// Build the `Stop` mutation for `DropStreamingJobs`.
962    pub(super) fn drop_streaming_jobs_to_mutation(
963        actors: &Vec<ActorId>,
964        dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
965    ) -> Mutation {
966        {
967            Mutation::Stop(StopMutation {
968                actors: actors.clone(),
969                dropped_sink_fragments: dropped_sink_fragment_by_targets
970                    .values()
971                    .flatten()
972                    .cloned()
973                    .collect(),
974            })
975        }
976    }
977
    /// Build the `Add` mutation for `CreateStreamingJob`.
    ///
    /// Assembles everything the creation barrier carries: the dispatchers of
    /// the job's upstream fragments, the added actors and their source
    /// splits, snapshot-backfill subscriptions, backfill nodes to pause, CDC
    /// snapshot splits, and (for sink-into-table) the new upstream sink
    /// registration for the downstream fragment.
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                // All actors of the new job, across all of its fragments.
                let added_actors: Vec<ActorId> = stream_actors
                    .values()
                    .flatten()
                    .map(|actor| actor.actor_id)
                    .collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // Snapshot-backfill jobs subscribe to each upstream mv table
                // they backfill from, with the job id as the subscriber id.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                // Backfill nodes with backfill dependencies (per the
                // ordering) — presumably started paused and resumed later;
                // see `StartFragmentBackfillMutation` usage elsewhere.
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // For sink-into-table: tell the downstream (table) fragment
                // about its new upstream sink fragment and its actors.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_actors
                            .get(sink_fragment_id)
                            .unwrap_or_else(|| {
                                panic!("upstream sink fragment {sink_fragment_id} not exist")
                            })
                            .iter()
                            .map(|actor| {
                                let worker_id = actor_location[&actor.actor_id];
                                PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                }
                            });
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // Move out (extract) the dispatchers belonging to this
                    // job's upstream fragments; the rest stay in `edges`.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1096
1097    /// Build the `Update` mutation for `ReplaceStreamJob`.
1098    pub(super) fn replace_stream_job_to_mutation(
1099        ReplaceStreamJobPlan {
1100            old_fragments,
1101            replace_upstream,
1102            upstream_fragment_downstreams,
1103            auto_refresh_schema_sinks,
1104            ..
1105        }: &ReplaceStreamJobPlan,
1106        edges: &mut FragmentEdgeBuildResult,
1107        database_info: &mut InflightDatabaseInfo,
1108        split_assignment: &SplitAssignment,
1109    ) -> MetaResult<Option<Mutation>> {
1110        {
1111            {
1112                let merge_updates = edges
1113                    .merge_updates
1114                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1115                    .collect();
1116                let dispatchers = edges
1117                    .dispatchers
1118                    .extract_if(|fragment_id, _| {
1119                        upstream_fragment_downstreams.contains_key(fragment_id)
1120                    })
1121                    .collect();
1122                let actor_cdc_table_snapshot_splits = database_info
1123                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1124                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1125                let old_fragments = old_fragments.fragments.keys().copied();
1126                let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
1127                    .as_ref()
1128                    .into_iter()
1129                    .flat_map(|sinks| sinks.iter())
1130                    .map(|sink| sink.original_fragment.fragment_id);
1131                Ok(Self::generate_update_mutation_for_replace_table(
1132                    old_fragments
1133                        .chain(auto_refresh_sink_fragment_ids)
1134                        .flat_map(|fragment_id| {
1135                            database_info.fragment(fragment_id).actors.keys().copied()
1136                        }),
1137                    merge_updates,
1138                    dispatchers,
1139                    split_assignment,
1140                    actor_cdc_table_snapshot_splits,
1141                    auto_refresh_schema_sinks.as_ref(),
1142                ))
1143            }
1144        }
1145    }
1146
    /// Build the `Update` mutation for `RescheduleIntent`.
    ///
    /// From the per-fragment `Reschedule` plans, this derives:
    /// - dispatcher updates for upstream actors (new hash mapping plus
    ///   added/removed downstream actors),
    /// - merge updates for downstream actors (added/removed upstream actors),
    /// - vnode bitmap updates, dropped actors, source split re-assignments,
    ///   and CDC backfill split re-assignments.
    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let database_id = database_info.database_id;
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        // Record updates for all actors.
                        for &actor_id in upstream_actor_ids {
                            // An upstream actor that is itself being removed by
                            // the upstream's own reschedule gets no added
                            // downstream actors.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // Index with the dispatcher id to check duplicates.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Downstream removed actors should be skipped
                        // Newly created actors of the current fragment will not dispatch Update
                        // barriers to them
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the fragment id to check duplicates.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    // we assume that we only scale the partial graph of database
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record updates for all actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                // Re-assign source splits and CDC backfill splits for the
                // rescheduled fragments.
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // we don't create dispatchers in reschedule scenario
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Ok(Some(mutation))
            }
        }
    }
1328
1329    /// Build the `Add` mutation for `CreateSubscription`.
1330    pub(super) fn create_subscription_to_mutation(
1331        upstream_mv_table_id: TableId,
1332        subscription_id: SubscriptionId,
1333    ) -> Mutation {
1334        {
1335            Mutation::Add(AddMutation {
1336                actor_dispatchers: Default::default(),
1337                added_actors: vec![],
1338                actor_splits: Default::default(),
1339                pause: false,
1340                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1341                    upstream_mv_table_id,
1342                    subscriber_id: subscription_id.as_subscriber_id(),
1343                }],
1344                backfill_nodes_to_pause: vec![],
1345                actor_cdc_table_snapshot_splits: None,
1346                new_upstream_sinks: Default::default(),
1347            })
1348        }
1349    }
1350
1351    /// Build the `DropSubscriptions` mutation for `DropSubscription`.
1352    pub(super) fn drop_subscription_to_mutation(
1353        upstream_mv_table_id: TableId,
1354        subscription_id: SubscriptionId,
1355    ) -> Mutation {
1356        {
1357            Mutation::DropSubscriptions(DropSubscriptionsMutation {
1358                info: vec![SubscriptionUpstreamInfo {
1359                    subscriber_id: subscription_id.as_subscriber_id(),
1360                    upstream_mv_table_id,
1361                }],
1362            })
1363        }
1364    }
1365
1366    /// Build the `ConnectorPropsChange` mutation.
1367    pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1368        {
1369            {
1370                let mut connector_props_infos = HashMap::default();
1371                for (k, v) in config {
1372                    connector_props_infos.insert(
1373                        k.as_raw_id(),
1374                        ConnectorPropsInfo {
1375                            connector_props_info: v.clone(),
1376                        },
1377                    );
1378                }
1379                Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1380                    connector_props_infos,
1381                })
1382            }
1383        }
1384    }
1385
1386    /// Build the `RefreshStart` mutation.
1387    pub(super) fn refresh_to_mutation(
1388        table_id: TableId,
1389        associated_source_id: SourceId,
1390    ) -> Mutation {
1391        Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1392            table_id,
1393            associated_source_id,
1394        })
1395    }
1396
1397    /// Build the `ListFinish` mutation.
1398    pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1399        Mutation::ListFinish(ListFinishMutation {
1400            associated_source_id,
1401        })
1402    }
1403
1404    /// Build the `LoadFinish` mutation.
1405    pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1406        Mutation::LoadFinish(LoadFinishMutation {
1407            associated_source_id,
1408        })
1409    }
1410
1411    /// Build the `ResetSource` mutation.
1412    pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1413        Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1414            source_id: source_id.as_raw_id(),
1415        })
1416    }
1417
1418    /// Build the `StartFragmentBackfill` mutation for `ResumeBackfill`.
1419    pub(super) fn resume_backfill_to_mutation(
1420        target: &ResumeBackfillTarget,
1421        database_info: &InflightDatabaseInfo,
1422    ) -> MetaResult<Option<Mutation>> {
1423        {
1424            {
1425                let fragment_ids: HashSet<_> = match target {
1426                    ResumeBackfillTarget::Job(job_id) => {
1427                        database_info.backfill_fragment_ids_for_job(*job_id)?
1428                    }
1429                    ResumeBackfillTarget::Fragment(fragment_id) => {
1430                        if !database_info.is_backfill_fragment(*fragment_id)? {
1431                            return Err(MetaError::invalid_parameter(format!(
1432                                "fragment {} is not a backfill node",
1433                                fragment_id
1434                            )));
1435                        }
1436                        HashSet::from([*fragment_id])
1437                    }
1438                };
1439                if fragment_ids.is_empty() {
1440                    warn!(
1441                        ?target,
1442                        "resume backfill command ignored because no backfill fragments found"
1443                    );
1444                    Ok(None)
1445                } else {
1446                    Ok(Some(Mutation::StartFragmentBackfill(
1447                        StartFragmentBackfillMutation {
1448                            fragment_ids: fragment_ids.into_iter().collect(),
1449                        },
1450                    )))
1451                }
1452            }
1453        }
1454    }
1455
1456    /// Build the `InjectSourceOffsets` mutation.
1457    pub(super) fn inject_source_offsets_to_mutation(
1458        source_id: SourceId,
1459        split_offsets: &HashMap<String, String>,
1460    ) -> Mutation {
1461        Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1462            source_id: source_id.as_raw_id(),
1463            split_offsets: split_offsets.clone(),
1464        })
1465    }
1466
1467    /// Collect actors to create for `CreateStreamingJob` (non-snapshot-backfill).
1468    pub(super) fn create_streaming_job_actors_to_create(
1469        info: &CreateStreamingJobCommandInfo,
1470        edges: &mut FragmentEdgeBuildResult,
1471        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1472        actor_location: &HashMap<ActorId, WorkerId>,
1473    ) -> StreamJobActorsToCreate {
1474        {
1475            {
1476                edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1477                    |fragment| {
1478                        let actors = stream_actors
1479                            .get(&fragment.fragment_id)
1480                            .into_iter()
1481                            .flatten()
1482                            .map(|actor| (actor, actor_location[&actor.actor_id]));
1483                        (
1484                            fragment.fragment_id,
1485                            &fragment.nodes,
1486                            actors,
1487                            [], // no subscriber for new job to create
1488                        )
1489                    },
1490                ))
1491            }
1492        }
1493    }
1494
1495    /// Collect actors to create for `RescheduleIntent`.
1496    pub(super) fn reschedule_actors_to_create(
1497        reschedules: &HashMap<FragmentId, Reschedule>,
1498        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1499        database_info: &InflightDatabaseInfo,
1500        control_stream_manager: &ControlStreamManager,
1501    ) -> StreamJobActorsToCreate {
1502        {
1503            {
1504                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1505                    reschedules.iter().map(|(fragment_id, reschedule)| {
1506                        (
1507                            *fragment_id,
1508                            reschedule.newly_created_actors.values().map(
1509                                |((actor, dispatchers), _)| {
1510                                    (actor.actor_id, dispatchers.as_slice())
1511                                },
1512                            ),
1513                        )
1514                    }),
1515                    Some((reschedules, fragment_actors)),
1516                    database_info,
1517                    control_stream_manager,
1518                );
1519                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1520                for (fragment_id, (actor, dispatchers), worker_id) in
1521                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1522                        reschedule
1523                            .newly_created_actors
1524                            .values()
1525                            .map(|(actors, status)| (*fragment_id, actors, status))
1526                    })
1527                {
1528                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1529                    map.entry(*worker_id)
1530                        .or_default()
1531                        .entry(fragment_id)
1532                        .or_insert_with(|| {
1533                            let node = database_info.fragment(fragment_id).nodes.clone();
1534                            let subscribers =
1535                                database_info.fragment_subscribers(fragment_id).collect();
1536                            (node, vec![], subscribers)
1537                        })
1538                        .1
1539                        .push((actor.clone(), upstreams, dispatchers.clone()));
1540                }
1541                map
1542            }
1543        }
1544    }
1545
1546    /// Collect actors to create for `ReplaceStreamJob`.
1547    pub(super) fn replace_stream_job_actors_to_create(
1548        replace_table: &ReplaceStreamJobPlan,
1549        edges: &mut FragmentEdgeBuildResult,
1550        database_info: &InflightDatabaseInfo,
1551        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1552        actor_location: &HashMap<ActorId, WorkerId>,
1553    ) -> StreamJobActorsToCreate {
1554        {
1555            {
1556                let mut actors = edges.collect_actors_to_create(
1557                    replace_table
1558                        .new_fragments
1559                        .fragments
1560                        .values()
1561                        .map(|fragment| {
1562                            let actors = stream_actors
1563                                .get(&fragment.fragment_id)
1564                                .into_iter()
1565                                .flatten()
1566                                .map(|actor| (actor, actor_location[&actor.actor_id]));
1567                            (
1568                                fragment.fragment_id,
1569                                &fragment.nodes,
1570                                actors,
1571                                database_info
1572                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1573                            )
1574                        }),
1575                );
1576
1577                // Handle auto-refresh schema sinks
1578                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1579                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1580                        (
1581                            sink.new_fragment.fragment_id,
1582                            &sink.new_fragment.nodes,
1583                            stream_actors
1584                                .get(&sink.new_fragment.fragment_id)
1585                                .into_iter()
1586                                .flatten()
1587                                .map(|actor| (actor, actor_location[&actor.actor_id])),
1588                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1589                        )
1590                    }));
1591                    for (worker_id, fragment_actors) in sink_actors {
1592                        actors.entry(worker_id).or_default().extend(fragment_actors);
1593                    }
1594                }
1595                actors
1596            }
1597        }
1598    }
1599
    /// Build the `Update` mutation applied when a table/stream job is replaced.
    ///
    /// Combines the dropped actors, merge updates, new dispatchers, source
    /// split assignments, the optional CDC table snapshot split assignment and
    /// the optional per-sink schema changes into a single `UpdateMutation`.
    ///
    /// Always returns `Some`; the `Option` return type presumably exists for
    /// symmetry with other mutation builders — confirm at call sites.
    fn generate_update_mutation_for_replace_table(
        dropped_actors: impl IntoIterator<Item = ActorId>,
        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
        dispatchers: FragmentActorDispatchers,
        split_assignment: &SplitAssignment,
        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
    ) -> Option<Mutation> {
        let dropped_actors = dropped_actors.into_iter().collect();

        // Flatten the per-fragment dispatcher map into a per-actor map.
        let actor_new_dispatchers = dispatchers
            .into_values()
            .flatten()
            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
            .collect();

        // Flatten per-source split assignments into per-actor connector splits.
        let actor_splits = split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        Some(Mutation::Update(UpdateMutation {
            actor_new_dispatchers,
            merge_update: merge_updates.into_values().flatten().collect(),
            dropped_actors,
            actor_splits,
            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
            // One schema-change entry per auto-refresh-schema sink, keyed by the
            // sink's raw id: a drop-columns op when any columns were removed,
            // otherwise an add-columns op for the newly added fields.
            sink_schema_change: auto_refresh_schema_sinks
                .as_ref()
                .into_iter()
                .flat_map(|sinks| {
                    sinks.iter().map(|sink| {
                        let op = if !sink.removed_column_names.is_empty() {
                            PbSinkSchemaChangeOp::DropColumns(PbSinkDropColumnsOp {
                                column_names: sink.removed_column_names.clone(),
                            })
                        } else {
                            PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
                                fields: sink
                                    .newly_add_fields
                                    .iter()
                                    .map(|field| field.to_prost())
                                    .collect(),
                            })
                        };
                        (
                            sink.original_sink.id.as_raw_id(),
                            PbSinkSchemaChange {
                                // Snapshot of the sink's schema before the change.
                                // NOTE(review): `column_desc`/`column_type` are
                                // unwrapped — assumes catalog columns always carry
                                // both; confirm this invariant upstream.
                                original_schema: sink
                                    .original_sink
                                    .columns
                                    .iter()
                                    .map(|col| PbField {
                                        data_type: Some(
                                            col.column_desc
                                                .as_ref()
                                                .unwrap()
                                                .column_type
                                                .as_ref()
                                                .unwrap()
                                                .clone(),
                                        ),
                                        name: col.column_desc.as_ref().unwrap().name.clone(),
                                    })
                                    .collect(),
                                op: Some(op),
                            },
                        )
                    })
                })
                .collect(),
            ..Default::default()
        }))
    }
1673}
1674
impl Command {
    /// Collect, for each downstream actor, the [`PbActorInfo`] (actor id, host
    /// address, partial graph id) of every one of its upstream actors.
    ///
    /// Returns a map: downstream actor id -> upstream fragment id ->
    /// upstream actor id -> `PbActorInfo`.
    ///
    /// Upstream edges are gathered from two sources:
    /// 1. `actor_dispatchers`: explicit dispatcher lists, one entry per
    ///    upstream fragment carrying each upstream actor's dispatchers.
    /// 2. `reschedule_dispatcher_update`: when present, for each rescheduled
    ///    fragment, every surviving (non-removed) actor of each upstream
    ///    fragment is recorded as an upstream of every newly added actor.
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        // Phase 1: record upstreams from the explicit dispatcher lists.
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                // Resolve the upstream actor's worker, then its host address.
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                // `None` presumably denotes the database-level
                                // partial graph (no creating job) — confirm.
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        // Phase 2: for reschedules, connect each upstream fragment's surviving
        // actors to the newly added actors of the rescheduled fragment.
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    // The upstream fragment may itself be rescheduled; its
                    // removed actors must not be recorded as upstreams.
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        // Skip upstream actors removed by their own reschedule.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}