risingwave_meta/barrier/
command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37    PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkDropColumnsOp,
49    PbSinkSchemaChange, PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation,
50    StartFragmentBackfillMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
51    UpdateMutation,
52};
53use risingwave_pb::stream_service::BarrierCompleteResponse;
54use tracing::warn;
55
56use super::info::InflightDatabaseInfo;
57use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
58use crate::barrier::edge_builder::FragmentEdgeBuildResult;
59use crate::barrier::info::BarrierInfo;
60use crate::barrier::partial_graph::PartialGraphBarrierInfo;
61use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
62use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
63use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
64use crate::controller::scale::LoadedFragmentContext;
65use crate::controller::utils::StreamingJobExtraInfo;
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70    FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
71    StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::{
74    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
75    ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
76    build_actor_connector_splits,
77};
78use crate::{MetaError, MetaResult};
79
/// [`Reschedule`] describes per-fragment changes in a resolved reschedule plan,
/// used for actor scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment, keyed by worker id.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there's upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Newly created actors, keyed by actor id. Each entry pairs the actor (together with
    /// its dispatchers) with a worker id.
    // NOTE(review): the worker id is presumably the worker the actor is placed on, and the
    // key set presumably matches the ids listed in `added_actors` -- confirm against the
    // code that builds this struct.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
111
/// A resolved reschedule plan: the per-fragment [`Reschedule`]s plus the actor sets of the
/// fragments those reschedules refer to.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Per-fragment changes, keyed by the id of the fragment being rescheduled.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Should contain the actor ids in upstream and downstream fragments referenced by
    /// `reschedules`.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
119
/// Preloaded context for rescheduling, built outside the barrier worker.
///
/// The side maps below are pruned and routed by the jobs and fragments found in `loaded`;
/// see [`RescheduleContext::for_database`] and [`RescheduleContext::into_database_contexts`].
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// The loaded fragment graph. Its `job_map` and `job_fragments` define which jobs and
    /// fragments this context covers.
    pub loaded: LoadedFragmentContext,
    /// Extra per-job info, keyed by job id.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// Keyed by fragment id; the inner map goes from a fragment id to a dispatcher type.
    // NOTE(review): presumably the inner keys are the upstream fragments of the outer key --
    // confirm against the loader.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Keyed by fragment id; the inner map goes from a fragment id to a dispatcher type.
    // NOTE(review): presumably the inner keys are the downstream fragments of the outer key --
    // confirm against the loader.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Fragment relation models keyed by `(source_fragment_id, target_fragment_id)`.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
129
130impl RescheduleContext {
131    pub fn empty() -> Self {
132        Self {
133            loaded: LoadedFragmentContext::default(),
134            job_extra_info: HashMap::new(),
135            upstream_fragments: HashMap::new(),
136            downstream_fragments: HashMap::new(),
137            downstream_relations: HashMap::new(),
138        }
139    }
140
141    pub fn is_empty(&self) -> bool {
142        self.loaded.is_empty()
143    }
144
145    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
146        let loaded = self.loaded.for_database(database_id)?;
147        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
148        // Use the filtered loaded context as the source of truth so every side map is pruned by
149        // the same fragment set.
150        let fragment_ids: HashSet<FragmentId> = loaded
151            .job_fragments
152            .values()
153            .flat_map(|fragments| fragments.keys().copied())
154            .collect();
155
156        let job_extra_info = self
157            .job_extra_info
158            .iter()
159            .filter(|(job_id, _)| job_ids.contains(*job_id))
160            .map(|(job_id, info)| (*job_id, info.clone()))
161            .collect();
162
163        let upstream_fragments = self
164            .upstream_fragments
165            .iter()
166            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
167            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
168            .collect();
169
170        let downstream_fragments = self
171            .downstream_fragments
172            .iter()
173            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
174            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
175            .collect();
176
177        let downstream_relations = self
178            .downstream_relations
179            .iter()
180            // Ownership of this map is source-fragment based. We keep all downstream edges for
181            // selected sources because the target side can still be referenced during dispatcher
182            // reconstruction even if that target fragment is not being rescheduled.
183            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
184            .map(|(key, relation)| (*key, relation.clone()))
185            .collect();
186
187        Some(Self {
188            loaded,
189            job_extra_info,
190            upstream_fragments,
191            downstream_fragments,
192            downstream_relations,
193        })
194    }
195
196    /// Split this context into per-database contexts without cloning the large loaded graph
197    /// payloads.
198    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
199        let Self {
200            loaded,
201            job_extra_info,
202            upstream_fragments,
203            downstream_fragments,
204            downstream_relations,
205        } = self;
206
207        let mut contexts: HashMap<_, _> = loaded
208            .into_database_contexts()
209            .into_iter()
210            .map(|(database_id, loaded)| {
211                (
212                    database_id,
213                    Self {
214                        loaded,
215                        job_extra_info: HashMap::new(),
216                        upstream_fragments: HashMap::new(),
217                        downstream_fragments: HashMap::new(),
218                        downstream_relations: HashMap::new(),
219                    },
220                )
221            })
222            .collect();
223
224        if contexts.is_empty() {
225            return contexts;
226        }
227
228        let mut job_databases = HashMap::new();
229        let mut fragment_databases = HashMap::new();
230        for (&database_id, context) in &contexts {
231            for job_id in context.loaded.job_map.keys().copied() {
232                job_databases.insert(job_id, database_id);
233            }
234            for fragment_id in context
235                .loaded
236                .job_fragments
237                .values()
238                .flat_map(|fragments| fragments.keys().copied())
239            {
240                fragment_databases.insert(fragment_id, database_id);
241            }
242        }
243
244        for (job_id, info) in job_extra_info {
245            if let Some(database_id) = job_databases.get(&job_id).copied() {
246                contexts
247                    .get_mut(&database_id)
248                    .expect("database context should exist for job")
249                    .job_extra_info
250                    .insert(job_id, info);
251            }
252        }
253
254        for (fragment_id, upstreams) in upstream_fragments {
255            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
256                contexts
257                    .get_mut(&database_id)
258                    .expect("database context should exist for fragment")
259                    .upstream_fragments
260                    .insert(fragment_id, upstreams);
261            }
262        }
263
264        for (fragment_id, downstreams) in downstream_fragments {
265            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
266                contexts
267                    .get_mut(&database_id)
268                    .expect("database context should exist for fragment")
269                    .downstream_fragments
270                    .insert(fragment_id, downstreams);
271            }
272        }
273
274        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
275            // Route by source fragment ownership. A target may be outside of current reschedule
276            // set, but this edge still belongs to the source-side command.
277            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
278                contexts
279                    .get_mut(&database_id)
280                    .expect("database context should exist for relation source")
281                    .downstream_relations
282                    .insert((source_fragment_id, target_fragment_id), relation);
283            }
284        }
285
286        contexts
287    }
288}
289
/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// The fragments of the job being replaced.
    pub old_fragments: StreamJobFragments,
    /// The fragments that replace `old_fragments`.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    ///
    /// [`ReplaceStreamJobPlan::fragment_replacements`] flattens the values of this map into
    /// `old_fragment_id` -> `new_fragment_id` pairs.
    pub replace_upstream: FragmentReplaceUpstream,
    // NOTE(review): presumably the downstream relations to attach to upstream fragments of
    // the new job -- confirm against the code that builds this plan.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Split plan for the replace job. Determines how splits are resolved in Phase 2
    /// inside the barrier worker.
    pub split_plan: ReplaceJobSplitPlan,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
    // NOTE(review): the struct-level docs also list `ALTER SOURCE` as a use case, which
    // looks inconsistent with "must be `StreamingJob::Table`" -- confirm which one is stale.
    pub streaming_job: StreamingJob,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
    /// The temporary dummy job fragments id of new table fragment
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    // NOTE(review): presumably the sinks whose schema is refreshed together with this
    // replacement; `None` when there are none -- confirm.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
319
320impl ReplaceStreamJobPlan {
321    /// `old_fragment_id` -> `new_fragment_id`
322    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
323        let mut fragment_replacements = HashMap::new();
324        for (upstream_fragment_id, new_upstream_fragment_id) in
325            self.replace_upstream.values().flatten()
326        {
327            {
328                let r =
329                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
330                if let Some(r) = r {
331                    assert_eq!(
332                        *new_upstream_fragment_id, r,
333                        "one fragment is replaced by multiple fragments"
334                    );
335                }
336            }
337        }
338        fragment_replacements
339    }
340}
341
/// The payload shared by [`Command::CreateStreamingJob`] and
/// [`PostCollectCommand::CreateStreamingJob`].
///
/// `Debug` is derived through `educe`, with `stream_job_fragments` excluded from the output.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// The fragments of the job to create. Skipped in `Debug` output.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    // NOTE(review): presumably the new downstream relations to attach to existing upstream
    // fragments -- confirm against the code that builds this info.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Source-level split assignment (Phase 1). Resolved to actor-level in the barrier worker.
    pub init_split_assignment: SourceSplitAssignment,
    // NOTE(review): presumably the SQL definition of the job -- confirm.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    /// The job being created. Its `Display` output is used when formatting
    /// [`Command::CreateStreamingJob`].
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    // NOTE(review): presumably only `Some` for CDC table jobs -- confirm.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// Fragment id -> state table ids.
    // NOTE(review): the "locality" semantics are not visible in this file -- confirm.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
    /// Batch refresh interval in seconds. If set, the MV uses batch refresh semantics.
    pub refresh_interval_sec: Option<u64>,
}
365
366impl StreamJobFragments {
367    /// Build fragment-level info for new fragments, populating actor infos from the
368    /// rendered actors and their locations, and applying split assignment to actor splits.
369    pub(super) fn new_fragment_info<'a>(
370        &'a self,
371        stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
372        actor_location: &'a HashMap<ActorId, WorkerId>,
373        assignment: &'a SplitAssignment,
374    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
375        self.fragments.values().map(|fragment| {
376            (
377                fragment.fragment_id,
378                InflightFragmentInfo {
379                    fragment_id: fragment.fragment_id,
380                    distribution_type: fragment.distribution_type.into(),
381                    fragment_type_mask: fragment.fragment_type_mask,
382                    vnode_count: fragment.vnode_count(),
383                    nodes: fragment.nodes.clone(),
384                    actors: stream_actors
385                        .get(&fragment.fragment_id)
386                        .into_iter()
387                        .flatten()
388                        .map(|actor| {
389                            (
390                                actor.actor_id,
391                                InflightActorInfo {
392                                    worker_id: actor_location[&actor.actor_id],
393                                    vnode_bitmap: actor.vnode_bitmap.clone(),
394                                    splits: assignment
395                                        .get(&fragment.fragment_id)
396                                        .and_then(|s| s.get(&actor.actor_id))
397                                        .cloned()
398                                        .unwrap_or_default(),
399                                },
400                            )
401                        })
402                        .collect(),
403                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
404                },
405            )
406        })
407    }
408}
409
/// The upstream MV tables a snapshot-backfill job reads from, with the epoch each one is
/// backfilled at once that epoch is known.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    /// The `snapshot_backfill_epoch` should be None at the beginning, and be filled
    /// by global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
417
/// Payload of [`CreateStreamingJobType::BatchRefresh`]: snapshot backfill info plus a
/// refresh interval.
#[derive(Debug, Clone)]
pub struct BatchRefreshInfo {
    /// The snapshot backfill info of the job.
    pub snapshot_backfill_info: SnapshotBackfillInfo,
    /// Refresh interval in seconds.
    // NOTE(review): presumably mirrors `CreateStreamingJobCommandInfo::refresh_interval_sec`
    // when that is `Some` -- confirm.
    pub refresh_interval_sec: u64,
}
423
/// The flavor of a [`Command::CreateStreamingJob`], together with the extra payload that
/// flavor needs.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// No extra payload.
    Normal,
    /// Carries an [`UpstreamSinkInfo`].
    // NOTE(review): presumably used for `CREATE SINK INTO table` -- confirm.
    SinkIntoTable(UpstreamSinkInfo),
    /// Carries the [`SnapshotBackfillInfo`] of the job.
    SnapshotBackfill(SnapshotBackfillInfo),
    /// Carries a [`BatchRefreshInfo`] (snapshot backfill info plus a refresh interval).
    BatchRefresh(BatchRefreshInfo),
}
431
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send via corresponding `*_to_mutation` helpers,
/// and may [do different stuffs after the barrier is collected](PostCollectCommand::post_collect).
// FIXME: this enum is significantly large on stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has ensured that these streaming jobs are safe to be
    /// dropped by reference counts before.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then delete the job fragments info from meta store.
    // NOTE(review): this variant no longer carries a `Vec<ActorId>`; the actors to stop are
    // presumably derived from `streaming_job_ids` -- confirm and update the doc above.
    DropStreamingJobs {
        /// The ids of the streaming jobs to drop.
        streaming_job_ids: HashSet<JobId>,
        /// Used by recovery quick path when draining buffered drop/cancel commands.
        unregistered_state_table_ids: HashSet<TableId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates a `Add` barrier by given info.
    ///
    /// Barriers from the actors to be created, which is marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passthrough.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`. And
    /// it adds the job fragments info to meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
    /// will be set to `Created`.
    CreateStreamingJob {
        /// Description of the job to create.
        info: CreateStreamingJobCommandInfo,
        /// Flavor of the job, with the extra payload that flavor needs.
        job_type: CreateStreamingJobType,
        // NOTE(review): presumably the snapshot backfill info for upstream tables that live
        // in other databases -- confirm.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedule context. It must be resolved inside the barrier worker before injection.
    RescheduleIntent {
        /// The preloaded, not yet resolved reschedule context.
        context: RescheduleContext,
        /// Filled by the barrier worker after resolving `context` against current worker topology.
        ///
        /// We keep unresolved `context` outside of checkpoint state and only materialize this
        /// execution plan right before injection, then drop `context` to release memory earlier.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// `ReplaceStreamJob` command generates a `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of reschedule, while the upstream fragment
    /// of the Merge executors are changed additionally.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of executors. `throttle_type` specifies which executor kinds should apply it.
    // NOTE(review): `throttle_type` is not a field of this variant; presumably it lives in
    // `ThrottleConfig` -- confirm.
    Throttle {
        /// The jobs this throttle command applies to.
        jobs: HashSet<JobId>,
        /// The throttle config per fragment.
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// materialize executor to start storing old value for subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// Retention of the subscription, in seconds.
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// materialize executor to stop storing old value when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// `AlterSubscriptionRetention` command updates the subscription retention time.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// The new retention of the subscription, in seconds.
        retention_second: u64,
    },

    /// Carries a [`ConnectorPropsChange`] payload.
    // NOTE(review): presumably generates a `ConnectorPropsChangeMutation` barrier -- confirm.
    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    // NOTE(review): presumably generates a `ListFinishMutation` barrier for the table being
    // refreshed -- confirm.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    // NOTE(review): presumably generates a `LoadFinishMutation` barrier for the table being
    // refreshed -- confirm.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// `ResetSource` command generates a barrier to reset CDC source offset to latest.
    /// Used when upstream binlog/oplog has expired.
    ResetSource {
        source_id: SourceId,
    },

    /// `ResumeBackfill` command generates a `StartFragmentBackfill` barrier to force backfill
    /// to resume for troubleshooting.
    ResumeBackfill {
        /// The job or fragment whose backfill is resumed.
        target: ResumeBackfillTarget,
    },

    /// `InjectSourceOffsets` command generates a barrier to inject specific offsets
    /// into source splits (UNSAFE - admin only).
    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Split ID -> offset (JSON-encoded based on connector type)
        split_offsets: HashMap<String, String>,
    },
}
571
/// What a `ResumeBackfill` command applies to: a whole job or a single fragment.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Target identified by a job id.
    Job(JobId),
    /// Target identified by a fragment id.
    Fragment(FragmentId),
}
577
578// For debugging and observability purposes. Can add more details later if needed.
579impl std::fmt::Display for Command {
580    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
581        match self {
582            Command::Flush => write!(f, "Flush"),
583            Command::Pause => write!(f, "Pause"),
584            Command::Resume => write!(f, "Resume"),
585            Command::DropStreamingJobs {
586                streaming_job_ids, ..
587            } => {
588                write!(
589                    f,
590                    "DropStreamingJobs: {}",
591                    streaming_job_ids.iter().sorted().join(", ")
592                )
593            }
594            Command::CreateStreamingJob { info, .. } => {
595                write!(f, "CreateStreamingJob: {}", info.streaming_job)
596            }
597            Command::RescheduleIntent {
598                reschedule_plan, ..
599            } => {
600                if reschedule_plan.is_some() {
601                    write!(f, "RescheduleIntent(planned)")
602                } else {
603                    write!(f, "RescheduleIntent")
604                }
605            }
606            Command::ReplaceStreamJob(plan) => {
607                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
608            }
609            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
610            Command::Throttle { .. } => write!(f, "Throttle"),
611            Command::CreateSubscription {
612                subscription_id, ..
613            } => write!(f, "CreateSubscription: {subscription_id}"),
614            Command::DropSubscription {
615                subscription_id, ..
616            } => write!(f, "DropSubscription: {subscription_id}"),
617            Command::AlterSubscriptionRetention {
618                subscription_id,
619                retention_second,
620                ..
621            } => write!(
622                f,
623                "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
624            ),
625            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
626            Command::Refresh {
627                table_id,
628                associated_source_id,
629            } => write!(
630                f,
631                "Refresh: {} (source: {})",
632                table_id, associated_source_id
633            ),
634            Command::ListFinish {
635                table_id,
636                associated_source_id,
637            } => write!(
638                f,
639                "ListFinish: {} (source: {})",
640                table_id, associated_source_id
641            ),
642            Command::LoadFinish {
643                table_id,
644                associated_source_id,
645            } => write!(
646                f,
647                "LoadFinish: {} (source: {})",
648                table_id, associated_source_id
649            ),
650            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
651            Command::ResumeBackfill { target } => match target {
652                ResumeBackfillTarget::Job(job_id) => {
653                    write!(f, "ResumeBackfill: job={job_id}")
654                }
655                ResumeBackfillTarget::Fragment(fragment_id) => {
656                    write!(f, "ResumeBackfill: fragment={fragment_id}")
657                }
658            },
659            Command::InjectSourceOffsets {
660                source_id,
661                split_offsets,
662            } => write!(
663                f,
664                "InjectSourceOffsets: {} ({} splits)",
665                source_id,
666                split_offsets.len()
667            ),
668        }
669    }
670}
671
672impl Command {
673    pub fn pause() -> Self {
674        Self::Pause
675    }
676
677    pub fn resume() -> Self {
678        Self::Resume
679    }
680
681    pub fn need_checkpoint(&self) -> bool {
682        // todo! Reviewing the flow of different command to reduce the amount of checkpoint
683        !matches!(self, Command::Resume)
684    }
685}
686
/// The part of a command that is kept around after its barrier has been sent.
///
/// Every variant except [`PostCollectCommand::Command`] reports `true` from
/// [`PostCollectCommand::should_checkpoint`].
// NOTE(review): how each `Command` is converted into one of these variants is not visible
// in this part of the file -- confirm before relying on a 1:1 mapping.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command identified only by the name held in the string. This name is what
    /// [`PostCollectCommand::command_name`] returns, and [`PostCollectCommand::barrier`]
    /// builds this variant with the name `"barrier"`.
    Command(String),
    DropStreamingJobs,
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// The split assignment in its resolved [`SplitAssignment`] form.
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        /// Per-fragment reschedules, keyed by fragment id.
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// The split assignment in its resolved [`SplitAssignment`] form.
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
715
716impl PostCollectCommand {
717    pub fn barrier() -> Self {
718        PostCollectCommand::Command("barrier".to_owned())
719    }
720
721    pub fn should_checkpoint(&self) -> bool {
722        match self {
723            PostCollectCommand::DropStreamingJobs
724            | PostCollectCommand::CreateStreamingJob { .. }
725            | PostCollectCommand::Reschedule { .. }
726            | PostCollectCommand::ReplaceStreamJob { .. }
727            | PostCollectCommand::SourceChangeSplit { .. }
728            | PostCollectCommand::CreateSubscription { .. }
729            | PostCollectCommand::ConnectorPropsChange(_)
730            | PostCollectCommand::ResumeBackfill { .. } => true,
731            PostCollectCommand::Command(_) => false,
732        }
733    }
734
735    pub fn command_name(&self) -> &str {
736        match self {
737            PostCollectCommand::Command(name) => name.as_str(),
738            PostCollectCommand::DropStreamingJobs => "DropStreamingJobs",
739            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
740            PostCollectCommand::Reschedule { .. } => "Reschedule",
741            PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
742            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
743            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
744            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
745            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
746        }
747    }
748}
749
750impl Display for PostCollectCommand {
751    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
752        f.write_str(self.command_name())
753    }
754}
755
/// Kind of a barrier. `to_protobuf` converts it to the protobuf `PbBarrierKind`; the epoch
/// list of `Checkpoint` is not part of that conversion.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BarrierKind {
    /// Maps to `PbBarrierKind::Initial`; `is_initial` returns `true` only for this variant.
    Initial,
    /// Maps to `PbBarrierKind::Barrier`; `is_checkpoint` returns `false` for it.
    Barrier,
    /// Holds the prev-epochs of the previous non-checkpoint barriers plus the current
    /// prev-epoch. `Command::collect_commit_epoch_info` passes this list to
    /// `build_table_change_log_delta`.
    Checkpoint(Vec<u64>),
}
763
764impl BarrierKind {
765    pub fn to_protobuf(&self) -> PbBarrierKind {
766        match self {
767            BarrierKind::Initial => PbBarrierKind::Initial,
768            BarrierKind::Barrier => PbBarrierKind::Barrier,
769            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
770        }
771    }
772
773    pub fn is_checkpoint(&self) -> bool {
774        matches!(self, BarrierKind::Checkpoint(_))
775    }
776
777    pub fn is_initial(&self) -> bool {
778        matches!(self, BarrierKind::Initial)
779    }
780
781    pub fn as_str_name(&self) -> &'static str {
782        match self {
783            BarrierKind::Initial => "Initial",
784            BarrierKind::Barrier => "Barrier",
785            BarrierKind::Checkpoint(_) => "Checkpoint",
786        }
787    }
788}
789
790impl BarrierInfo {
791    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
792        let Some(truncate_timestamptz) = Timestamptz::from_secs(
793            self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
794        ) else {
795            warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
796            return self.prev_epoch.value();
797        };
798        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
799    }
800}
801
802impl Command {
803    pub(super) fn collect_commit_epoch_info(
804        database_info: &InflightDatabaseInfo,
805        barrier_info: &PartialGraphBarrierInfo,
806        info: &mut CommitEpochInfo,
807        resps: Vec<BarrierCompleteResponse>,
808        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
809    ) {
810        let (
811            sst_to_context,
812            synced_ssts,
813            new_table_watermarks,
814            old_value_ssts,
815            vector_index_adds,
816            truncate_tables,
817        ) = collect_resp_info(resps);
818
819        let new_table_fragment_infos =
820            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } =
821                &barrier_info.post_collect_command
822            {
823                assert!(!matches!(
824                    job_type,
825                    CreateStreamingJobType::SnapshotBackfill(_)
826                        | CreateStreamingJobType::BatchRefresh(_)
827                ));
828                let table_fragments = &info.stream_job_fragments;
829                let mut table_ids: HashSet<_> =
830                    table_fragments.internal_table_ids().into_iter().collect();
831                if let Some(mv_table_id) = table_fragments.mv_table_id() {
832                    table_ids.insert(mv_table_id);
833                }
834
835                vec![NewTableFragmentInfo { table_ids }]
836            } else {
837                vec![]
838            };
839
840        let mut mv_log_store_truncate_epoch = HashMap::new();
841        // TODO: may collect cross db snapshot backfill
842        let mut update_truncate_epoch =
843            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
844                Entry::Occupied(mut entry) => {
845                    let prev_truncate_epoch = entry.get_mut();
846                    if truncate_epoch < *prev_truncate_epoch {
847                        *prev_truncate_epoch = truncate_epoch;
848                    }
849                }
850                Entry::Vacant(entry) => {
851                    entry.insert(truncate_epoch);
852                }
853            };
854        for (mv_table_id, max_retention) in database_info.max_subscription_retention() {
855            let truncate_epoch = barrier_info
856                .barrier_info
857                .get_truncate_epoch(max_retention)
858                .0;
859            update_truncate_epoch(mv_table_id, truncate_epoch);
860        }
861        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
862            for mv_table_id in upstream_mv_table_ids {
863                update_truncate_epoch(mv_table_id, backfill_epoch);
864            }
865        }
866
867        let table_new_change_log = build_table_change_log_delta(
868            old_value_ssts.into_iter(),
869            synced_ssts.iter().map(|sst| &sst.sst_info),
870            must_match!(&barrier_info.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
871            mv_log_store_truncate_epoch.into_iter(),
872        );
873
874        let epoch = barrier_info.barrier_info.prev_epoch();
875        for table_id in &barrier_info.table_ids_to_commit {
876            info.tables_to_commit
877                .try_insert(*table_id, epoch)
878                .expect("non duplicate");
879        }
880
881        info.sstables.extend(synced_ssts);
882        info.new_table_watermarks.extend(new_table_watermarks);
883        info.sst_to_context.extend(sst_to_context);
884        info.new_table_fragment_infos
885            .extend(new_table_fragment_infos);
886        info.change_log_delta.extend(table_new_change_log);
887        for (table_id, vector_index_adds) in vector_index_adds {
888            info.vector_index_delta
889                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
890                .expect("non-duplicate");
891        }
892        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } =
893            &barrier_info.post_collect_command
894            && let Some(index_table) = collect_new_vector_index_info(job_info)
895        {
896            info.vector_index_delta
897                .try_insert(
898                    index_table.id,
899                    VectorIndexDelta::Init(PbVectorIndexInit {
900                        info: Some(index_table.vector_index_info.unwrap()),
901                    }),
902                )
903                .expect("non-duplicate");
904        }
905        info.truncate_tables.extend(truncate_tables);
906    }
907}
908
909impl Command {
910    /// Build the `Pause` mutation.
911    pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
912        {
913            {
914                // Only pause when the cluster is not already paused.
915                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
916                if !is_currently_paused {
917                    Some(Mutation::Pause(PauseMutation {}))
918                } else {
919                    None
920                }
921            }
922        }
923    }
924
925    /// Build the `Resume` mutation.
926    pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
927        {
928            {
929                // Only resume when the cluster is paused with the same reason.
930                if is_currently_paused {
931                    Some(Mutation::Resume(ResumeMutation {}))
932                } else {
933                    None
934                }
935            }
936        }
937    }
938
939    /// Build the `Splits` mutation for `SourceChangeSplit`.
940    pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
941        {
942            {
943                let mut diff = HashMap::new();
944
945                for actor_splits in split_assignment.values() {
946                    diff.extend(actor_splits.clone());
947                }
948
949                Mutation::Splits(SourceChangeSplitMutation {
950                    actor_splits: build_actor_connector_splits(&diff),
951                })
952            }
953        }
954    }
955
956    /// Build the `Throttle` mutation.
957    pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
958        {
959            {
960                let config = config.clone();
961                Mutation::Throttle(ThrottleMutation {
962                    fragment_throttle: config,
963                })
964            }
965        }
966    }
967
968    /// Build the `Stop` mutation for `DropStreamingJobs`.
969    pub(super) fn drop_streaming_jobs_to_mutation(
970        actors: &Vec<ActorId>,
971        dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
972    ) -> Mutation {
973        {
974            Mutation::Stop(StopMutation {
975                actors: actors.clone(),
976                dropped_sink_fragments: dropped_sink_fragment_by_targets
977                    .values()
978                    .flatten()
979                    .cloned()
980                    .collect(),
981            })
982        }
983    }
984
985    /// Build the `Add` mutation for `CreateStreamingJob`.
986    pub(super) fn create_streaming_job_to_mutation(
987        info: &CreateStreamingJobCommandInfo,
988        job_type: &CreateStreamingJobType,
989        is_currently_paused: bool,
990        edges: &mut FragmentEdgeBuildResult,
991        control_stream_manager: &ControlStreamManager,
992        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
993        split_assignment: &SplitAssignment,
994        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
995        actor_location: &HashMap<ActorId, WorkerId>,
996    ) -> MetaResult<Mutation> {
997        {
998            {
999                let CreateStreamingJobCommandInfo {
1000                    stream_job_fragments,
1001                    upstream_fragment_downstreams,
1002                    fragment_backfill_ordering,
1003                    streaming_job,
1004                    ..
1005                } = info;
1006                let database_id = streaming_job.database_id();
1007                let added_actors: Vec<ActorId> = stream_actors
1008                    .values()
1009                    .flatten()
1010                    .map(|actor| actor.actor_id)
1011                    .collect();
1012                let actor_splits = split_assignment
1013                    .values()
1014                    .flat_map(build_actor_connector_splits)
1015                    .collect();
1016                let subscriptions_to_add =
1017                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
1018                    | CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
1019                        snapshot_backfill_info,
1020                        ..
1021                    }) = job_type
1022                    {
1023                        snapshot_backfill_info
1024                            .upstream_mv_table_id_to_backfill_epoch
1025                            .keys()
1026                            .map(|table_id| SubscriptionUpstreamInfo {
1027                                subscriber_id: stream_job_fragments
1028                                    .stream_job_id()
1029                                    .as_subscriber_id(),
1030                                upstream_mv_table_id: *table_id,
1031                            })
1032                            .collect()
1033                    } else {
1034                        Default::default()
1035                    };
1036                let backfill_nodes_to_pause: Vec<_> =
1037                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
1038                        .into_iter()
1039                        .collect();
1040
1041                let new_upstream_sinks =
1042                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
1043                        sink_fragment_id,
1044                        sink_output_fields,
1045                        project_exprs,
1046                        new_sink_downstream,
1047                        ..
1048                    }) = job_type
1049                    {
1050                        let new_sink_actors = stream_actors
1051                            .get(sink_fragment_id)
1052                            .unwrap_or_else(|| {
1053                                panic!("upstream sink fragment {sink_fragment_id} not exist")
1054                            })
1055                            .iter()
1056                            .map(|actor| {
1057                                let worker_id = actor_location[&actor.actor_id];
1058                                PbActorInfo {
1059                                    actor_id: actor.actor_id,
1060                                    host: Some(control_stream_manager.host_addr(worker_id)),
1061                                    partial_graph_id: to_partial_graph_id(database_id, None),
1062                                }
1063                            });
1064                        let new_upstream_sink = PbNewUpstreamSink {
1065                            info: Some(PbUpstreamSinkInfo {
1066                                upstream_fragment_id: *sink_fragment_id,
1067                                sink_output_schema: sink_output_fields.clone(),
1068                                project_exprs: project_exprs.clone(),
1069                            }),
1070                            upstream_actors: new_sink_actors.collect(),
1071                        };
1072                        HashMap::from([(
1073                            new_sink_downstream.downstream_fragment_id,
1074                            new_upstream_sink,
1075                        )])
1076                    } else {
1077                        HashMap::new()
1078                    };
1079
1080                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
1081                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1082
1083                let add_mutation = AddMutation {
1084                    actor_dispatchers: edges
1085                        .dispatchers
1086                        .extract_if(|fragment_id, _| {
1087                            upstream_fragment_downstreams.contains_key(fragment_id)
1088                        })
1089                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1090                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1091                        .collect(),
1092                    added_actors,
1093                    actor_splits,
1094                    // If the cluster is already paused, the new actors should be paused too.
1095                    pause: is_currently_paused,
1096                    subscriptions_to_add,
1097                    backfill_nodes_to_pause,
1098                    actor_cdc_table_snapshot_splits,
1099                    new_upstream_sinks,
1100                };
1101
1102                Ok(Mutation::Add(add_mutation))
1103            }
1104        }
1105    }
1106
1107    /// Build the `Update` mutation for `ReplaceStreamJob`.
1108    pub(super) fn replace_stream_job_to_mutation(
1109        ReplaceStreamJobPlan {
1110            old_fragments,
1111            replace_upstream,
1112            upstream_fragment_downstreams,
1113            auto_refresh_schema_sinks,
1114            ..
1115        }: &ReplaceStreamJobPlan,
1116        edges: &mut FragmentEdgeBuildResult,
1117        database_info: &mut InflightDatabaseInfo,
1118        split_assignment: &SplitAssignment,
1119    ) -> MetaResult<Option<Mutation>> {
1120        {
1121            {
1122                let merge_updates = edges
1123                    .merge_updates
1124                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1125                    .collect();
1126                let dispatchers = edges
1127                    .dispatchers
1128                    .extract_if(|fragment_id, _| {
1129                        upstream_fragment_downstreams.contains_key(fragment_id)
1130                    })
1131                    .collect();
1132                let actor_cdc_table_snapshot_splits = database_info
1133                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1134                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1135                let old_fragments = old_fragments.fragments.keys().copied();
1136                let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
1137                    .as_ref()
1138                    .into_iter()
1139                    .flat_map(|sinks| sinks.iter())
1140                    .map(|sink| sink.original_fragment.fragment_id);
1141                Ok(Self::generate_update_mutation_for_replace_table(
1142                    old_fragments
1143                        .chain(auto_refresh_sink_fragment_ids)
1144                        .flat_map(|fragment_id| {
1145                            database_info.fragment(fragment_id).actors.keys().copied()
1146                        }),
1147                    merge_updates,
1148                    dispatchers,
1149                    split_assignment,
1150                    actor_cdc_table_snapshot_splits,
1151                    auto_refresh_schema_sinks.as_ref(),
1152                ))
1153            }
1154        }
1155    }
1156
1157    /// Build the `Update` mutation for `RescheduleIntent`.
1158    pub(super) fn reschedule_to_mutation(
1159        reschedules: &HashMap<FragmentId, Reschedule>,
1160        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1161        control_stream_manager: &ControlStreamManager,
1162        database_info: &mut InflightDatabaseInfo,
1163    ) -> MetaResult<Option<Mutation>> {
1164        {
1165            {
1166                let database_id = database_info.database_id;
1167                let mut dispatcher_update = HashMap::new();
1168                for reschedule in reschedules.values() {
1169                    for &(upstream_fragment_id, dispatcher_id) in
1170                        &reschedule.upstream_fragment_dispatcher_ids
1171                    {
1172                        // Find the actors of the upstream fragment.
1173                        let upstream_actor_ids = fragment_actors
1174                            .get(&upstream_fragment_id)
1175                            .expect("should contain");
1176
1177                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1178
1179                        // Record updates for all actors.
1180                        for &actor_id in upstream_actor_ids {
1181                            let added_downstream_actor_id = if upstream_reschedule
1182                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1183                                .unwrap_or(true)
1184                            {
1185                                reschedule
1186                                    .added_actors
1187                                    .values()
1188                                    .flatten()
1189                                    .cloned()
1190                                    .collect()
1191                            } else {
1192                                Default::default()
1193                            };
1194                            // Index with the dispatcher id to check duplicates.
1195                            dispatcher_update
1196                                .try_insert(
1197                                    (actor_id, dispatcher_id),
1198                                    DispatcherUpdate {
1199                                        actor_id,
1200                                        dispatcher_id,
1201                                        hash_mapping: reschedule
1202                                            .upstream_dispatcher_mapping
1203                                            .as_ref()
1204                                            .map(|m| m.to_protobuf()),
1205                                        added_downstream_actor_id,
1206                                        removed_downstream_actor_id: reschedule
1207                                            .removed_actors
1208                                            .iter()
1209                                            .cloned()
1210                                            .collect(),
1211                                    },
1212                                )
1213                                .unwrap();
1214                        }
1215                    }
1216                }
1217                let dispatcher_update = dispatcher_update.into_values().collect();
1218
1219                let mut merge_update = HashMap::new();
1220                for (&fragment_id, reschedule) in reschedules {
1221                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1222                        // Find the actors of the downstream fragment.
1223                        let downstream_actor_ids = fragment_actors
1224                            .get(&downstream_fragment_id)
1225                            .expect("should contain");
1226
1227                        // Downstream removed actors should be skipped
1228                        // Newly created actors of the current fragment will not dispatch Update
1229                        // barriers to them
1230                        let downstream_removed_actors: HashSet<_> = reschedules
1231                            .get(&downstream_fragment_id)
1232                            .map(|downstream_reschedule| {
1233                                downstream_reschedule
1234                                    .removed_actors
1235                                    .iter()
1236                                    .copied()
1237                                    .collect()
1238                            })
1239                            .unwrap_or_default();
1240
1241                        // Record updates for all actors.
1242                        for &actor_id in downstream_actor_ids {
1243                            if downstream_removed_actors.contains(&actor_id) {
1244                                continue;
1245                            }
1246
1247                            // Index with the fragment id to check duplicates.
1248                            merge_update
1249                                .try_insert(
1250                                    (actor_id, fragment_id),
1251                                    MergeUpdate {
1252                                        actor_id,
1253                                        upstream_fragment_id: fragment_id,
1254                                        new_upstream_fragment_id: None,
1255                                        added_upstream_actors: reschedule
1256                                            .added_actors
1257                                            .iter()
1258                                            .flat_map(|(worker_id, actors)| {
1259                                                let host =
1260                                                    control_stream_manager.host_addr(*worker_id);
1261                                                actors.iter().map(move |&actor_id| PbActorInfo {
1262                                                    actor_id,
1263                                                    host: Some(host.clone()),
1264                                                    // we assume that we only scale the partial graph of database
1265                                                    partial_graph_id: to_partial_graph_id(
1266                                                        database_id,
1267                                                        None,
1268                                                    ),
1269                                                })
1270                                            })
1271                                            .collect(),
1272                                        removed_upstream_actor_id: reschedule
1273                                            .removed_actors
1274                                            .iter()
1275                                            .cloned()
1276                                            .collect(),
1277                                    },
1278                                )
1279                                .unwrap();
1280                        }
1281                    }
1282                }
1283                let merge_update = merge_update.into_values().collect();
1284
1285                let mut actor_vnode_bitmap_update = HashMap::new();
1286                for reschedule in reschedules.values() {
1287                    // Record updates for all actors in this fragment.
1288                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1289                        let bitmap = bitmap.to_protobuf();
1290                        actor_vnode_bitmap_update
1291                            .try_insert(actor_id, bitmap)
1292                            .unwrap();
1293                    }
1294                }
1295                let dropped_actors = reschedules
1296                    .values()
1297                    .flat_map(|r| r.removed_actors.iter().copied())
1298                    .collect();
1299                let mut actor_splits = HashMap::new();
1300                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1301                for (fragment_id, reschedule) in reschedules {
1302                    for (actor_id, splits) in &reschedule.actor_splits {
1303                        actor_splits.insert(
1304                            *actor_id,
1305                            ConnectorSplits {
1306                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1307                            },
1308                        );
1309                    }
1310
1311                    if let Some(assignment) =
1312                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1313                    {
1314                        actor_cdc_table_snapshot_splits.extend(assignment)
1315                    }
1316                }
1317
1318                // we don't create dispatchers in reschedule scenario
1319                let actor_new_dispatchers = HashMap::new();
1320                let mutation = Mutation::Update(UpdateMutation {
1321                    dispatcher_update,
1322                    merge_update,
1323                    actor_vnode_bitmap_update,
1324                    dropped_actors,
1325                    actor_splits,
1326                    actor_new_dispatchers,
1327                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1328                        splits: actor_cdc_table_snapshot_splits,
1329                    }),
1330                    sink_schema_change: Default::default(),
1331                    subscriptions_to_drop: vec![],
1332                });
1333                tracing::debug!("update mutation: {mutation:?}");
1334                Ok(Some(mutation))
1335            }
1336        }
1337    }
1338
1339    /// Build the `Add` mutation for `CreateSubscription`.
1340    pub(super) fn create_subscription_to_mutation(
1341        upstream_mv_table_id: TableId,
1342        subscription_id: SubscriptionId,
1343    ) -> Mutation {
1344        {
1345            Mutation::Add(AddMutation {
1346                actor_dispatchers: Default::default(),
1347                added_actors: vec![],
1348                actor_splits: Default::default(),
1349                pause: false,
1350                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1351                    upstream_mv_table_id,
1352                    subscriber_id: subscription_id.as_subscriber_id(),
1353                }],
1354                backfill_nodes_to_pause: vec![],
1355                actor_cdc_table_snapshot_splits: None,
1356                new_upstream_sinks: Default::default(),
1357            })
1358        }
1359    }
1360
1361    /// Build the `DropSubscriptions` mutation for `DropSubscription`.
1362    pub(super) fn drop_subscription_to_mutation(
1363        upstream_mv_table_id: TableId,
1364        subscription_id: SubscriptionId,
1365    ) -> Mutation {
1366        {
1367            Mutation::DropSubscriptions(DropSubscriptionsMutation {
1368                info: vec![SubscriptionUpstreamInfo {
1369                    subscriber_id: subscription_id.as_subscriber_id(),
1370                    upstream_mv_table_id,
1371                }],
1372            })
1373        }
1374    }
1375
1376    /// Build the `ConnectorPropsChange` mutation.
1377    pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1378        {
1379            {
1380                let mut connector_props_infos = HashMap::default();
1381                for (k, v) in config {
1382                    connector_props_infos.insert(
1383                        k.as_raw_id(),
1384                        ConnectorPropsInfo {
1385                            connector_props_info: v.clone(),
1386                        },
1387                    );
1388                }
1389                Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1390                    connector_props_infos,
1391                })
1392            }
1393        }
1394    }
1395
1396    /// Build the `RefreshStart` mutation.
1397    pub(super) fn refresh_to_mutation(
1398        table_id: TableId,
1399        associated_source_id: SourceId,
1400    ) -> Mutation {
1401        Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1402            table_id,
1403            associated_source_id,
1404        })
1405    }
1406
1407    /// Build the `ListFinish` mutation.
1408    pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1409        Mutation::ListFinish(ListFinishMutation {
1410            associated_source_id,
1411        })
1412    }
1413
1414    /// Build the `LoadFinish` mutation.
1415    pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1416        Mutation::LoadFinish(LoadFinishMutation {
1417            associated_source_id,
1418        })
1419    }
1420
1421    /// Build the `ResetSource` mutation.
1422    pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1423        Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1424            source_id: source_id.as_raw_id(),
1425        })
1426    }
1427
1428    /// Build the `StartFragmentBackfill` mutation for `ResumeBackfill`.
1429    pub(super) fn resume_backfill_to_mutation(
1430        target: &ResumeBackfillTarget,
1431        database_info: &InflightDatabaseInfo,
1432    ) -> MetaResult<Option<Mutation>> {
1433        {
1434            {
1435                let fragment_ids: HashSet<_> = match target {
1436                    ResumeBackfillTarget::Job(job_id) => {
1437                        database_info.backfill_fragment_ids_for_job(*job_id)?
1438                    }
1439                    ResumeBackfillTarget::Fragment(fragment_id) => {
1440                        if !database_info.is_backfill_fragment(*fragment_id)? {
1441                            return Err(MetaError::invalid_parameter(format!(
1442                                "fragment {} is not a backfill node",
1443                                fragment_id
1444                            )));
1445                        }
1446                        HashSet::from([*fragment_id])
1447                    }
1448                };
1449                if fragment_ids.is_empty() {
1450                    warn!(
1451                        ?target,
1452                        "resume backfill command ignored because no backfill fragments found"
1453                    );
1454                    Ok(None)
1455                } else {
1456                    Ok(Some(Mutation::StartFragmentBackfill(
1457                        StartFragmentBackfillMutation {
1458                            fragment_ids: fragment_ids.into_iter().collect(),
1459                        },
1460                    )))
1461                }
1462            }
1463        }
1464    }
1465
1466    /// Build the `InjectSourceOffsets` mutation.
1467    pub(super) fn inject_source_offsets_to_mutation(
1468        source_id: SourceId,
1469        split_offsets: &HashMap<String, String>,
1470    ) -> Mutation {
1471        Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1472            source_id: source_id.as_raw_id(),
1473            split_offsets: split_offsets.clone(),
1474        })
1475    }
1476
1477    /// Collect actors to create for `CreateStreamingJob` (non-snapshot-backfill).
1478    pub(super) fn create_streaming_job_actors_to_create(
1479        info: &CreateStreamingJobCommandInfo,
1480        edges: &mut FragmentEdgeBuildResult,
1481        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1482        actor_location: &HashMap<ActorId, WorkerId>,
1483    ) -> StreamJobActorsToCreate {
1484        {
1485            {
1486                edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1487                    |fragment| {
1488                        let actors = stream_actors
1489                            .get(&fragment.fragment_id)
1490                            .into_iter()
1491                            .flatten()
1492                            .map(|actor| (actor, actor_location[&actor.actor_id]));
1493                        (
1494                            fragment.fragment_id,
1495                            &fragment.nodes,
1496                            actors,
1497                            [], // no subscriber for new job to create
1498                        )
1499                    },
1500                ))
1501            }
1502        }
1503    }
1504
1505    /// Collect actors to create for `RescheduleIntent`.
1506    pub(super) fn reschedule_actors_to_create(
1507        reschedules: &HashMap<FragmentId, Reschedule>,
1508        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1509        database_info: &InflightDatabaseInfo,
1510        control_stream_manager: &ControlStreamManager,
1511    ) -> StreamJobActorsToCreate {
1512        {
1513            {
1514                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1515                    reschedules.iter().map(|(fragment_id, reschedule)| {
1516                        (
1517                            *fragment_id,
1518                            reschedule.newly_created_actors.values().map(
1519                                |((actor, dispatchers), _)| {
1520                                    (actor.actor_id, dispatchers.as_slice())
1521                                },
1522                            ),
1523                        )
1524                    }),
1525                    Some((reschedules, fragment_actors)),
1526                    database_info,
1527                    control_stream_manager,
1528                );
1529                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1530                for (fragment_id, (actor, dispatchers), worker_id) in
1531                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1532                        reschedule
1533                            .newly_created_actors
1534                            .values()
1535                            .map(|(actors, status)| (*fragment_id, actors, status))
1536                    })
1537                {
1538                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1539                    map.entry(*worker_id)
1540                        .or_default()
1541                        .entry(fragment_id)
1542                        .or_insert_with(|| {
1543                            let node = database_info.fragment(fragment_id).nodes.clone();
1544                            let subscribers =
1545                                database_info.fragment_subscribers(fragment_id).collect();
1546                            (node, vec![], subscribers)
1547                        })
1548                        .1
1549                        .push((actor.clone(), upstreams, dispatchers.clone()));
1550                }
1551                map
1552            }
1553        }
1554    }
1555
1556    /// Collect actors to create for `ReplaceStreamJob`.
1557    pub(super) fn replace_stream_job_actors_to_create(
1558        replace_table: &ReplaceStreamJobPlan,
1559        edges: &mut FragmentEdgeBuildResult,
1560        database_info: &InflightDatabaseInfo,
1561        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1562        actor_location: &HashMap<ActorId, WorkerId>,
1563    ) -> StreamJobActorsToCreate {
1564        {
1565            {
1566                let mut actors = edges.collect_actors_to_create(
1567                    replace_table
1568                        .new_fragments
1569                        .fragments
1570                        .values()
1571                        .map(|fragment| {
1572                            let actors = stream_actors
1573                                .get(&fragment.fragment_id)
1574                                .into_iter()
1575                                .flatten()
1576                                .map(|actor| (actor, actor_location[&actor.actor_id]));
1577                            (
1578                                fragment.fragment_id,
1579                                &fragment.nodes,
1580                                actors,
1581                                database_info
1582                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1583                            )
1584                        }),
1585                );
1586
1587                // Handle auto-refresh schema sinks
1588                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1589                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1590                        (
1591                            sink.new_fragment.fragment_id,
1592                            &sink.new_fragment.nodes,
1593                            stream_actors
1594                                .get(&sink.new_fragment.fragment_id)
1595                                .into_iter()
1596                                .flatten()
1597                                .map(|actor| (actor, actor_location[&actor.actor_id])),
1598                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1599                        )
1600                    }));
1601                    for (worker_id, fragment_actors) in sink_actors {
1602                        actors.entry(worker_id).or_default().extend(fragment_actors);
1603                    }
1604                }
1605                actors
1606            }
1607        }
1608    }
1609
1610    fn generate_update_mutation_for_replace_table(
1611        dropped_actors: impl IntoIterator<Item = ActorId>,
1612        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1613        dispatchers: FragmentActorDispatchers,
1614        split_assignment: &SplitAssignment,
1615        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1616        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1617    ) -> Option<Mutation> {
1618        let dropped_actors = dropped_actors.into_iter().collect();
1619
1620        let actor_new_dispatchers = dispatchers
1621            .into_values()
1622            .flatten()
1623            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1624            .collect();
1625
1626        let actor_splits = split_assignment
1627            .values()
1628            .flat_map(build_actor_connector_splits)
1629            .collect();
1630        Some(Mutation::Update(UpdateMutation {
1631            actor_new_dispatchers,
1632            merge_update: merge_updates.into_values().flatten().collect(),
1633            dropped_actors,
1634            actor_splits,
1635            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1636            sink_schema_change: auto_refresh_schema_sinks
1637                .as_ref()
1638                .into_iter()
1639                .flat_map(|sinks| {
1640                    sinks.iter().map(|sink| {
1641                        let op = if !sink.removed_column_names.is_empty() {
1642                            PbSinkSchemaChangeOp::DropColumns(PbSinkDropColumnsOp {
1643                                column_names: sink.removed_column_names.clone(),
1644                            })
1645                        } else {
1646                            PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1647                                fields: sink
1648                                    .newly_add_fields
1649                                    .iter()
1650                                    .map(|field| field.to_prost())
1651                                    .collect(),
1652                            })
1653                        };
1654                        (
1655                            sink.original_sink.id.as_raw_id(),
1656                            PbSinkSchemaChange {
1657                                original_schema: sink
1658                                    .original_sink
1659                                    .columns
1660                                    .iter()
1661                                    .map(|col| PbField {
1662                                        data_type: Some(
1663                                            col.column_desc
1664                                                .as_ref()
1665                                                .unwrap()
1666                                                .column_type
1667                                                .as_ref()
1668                                                .unwrap()
1669                                                .clone(),
1670                                        ),
1671                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1672                                    })
1673                                    .collect(),
1674                                op: Some(op),
1675                            },
1676                        )
1677                    })
1678                })
1679                .collect(),
1680            ..Default::default()
1681        }))
1682    }
1683}
1684
1685impl Command {
1686    #[expect(clippy::type_complexity)]
1687    pub(super) fn collect_database_partial_graph_actor_upstreams(
1688        actor_dispatchers: impl Iterator<
1689            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1690        >,
1691        reschedule_dispatcher_update: Option<(
1692            &HashMap<FragmentId, Reschedule>,
1693            &HashMap<FragmentId, HashSet<ActorId>>,
1694        )>,
1695        database_info: &InflightDatabaseInfo,
1696        control_stream_manager: &ControlStreamManager,
1697    ) -> HashMap<ActorId, ActorUpstreams> {
1698        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1699        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1700            let upstream_fragment = database_info.fragment(upstream_fragment_id);
1701            for (upstream_actor_id, dispatchers) in upstream_actors {
1702                let upstream_actor_location =
1703                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1704                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1705                for downstream_actor_id in dispatchers
1706                    .iter()
1707                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1708                {
1709                    actor_upstreams
1710                        .entry(*downstream_actor_id)
1711                        .or_default()
1712                        .entry(upstream_fragment_id)
1713                        .or_default()
1714                        .insert(
1715                            upstream_actor_id,
1716                            PbActorInfo {
1717                                actor_id: upstream_actor_id,
1718                                host: Some(upstream_actor_host.clone()),
1719                                partial_graph_id: to_partial_graph_id(
1720                                    database_info.database_id,
1721                                    None,
1722                                ),
1723                            },
1724                        );
1725                }
1726            }
1727        }
1728        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1729            for reschedule in reschedules.values() {
1730                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1731                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1732                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1733                    for upstream_actor_id in fragment_actors
1734                        .get(upstream_fragment_id)
1735                        .expect("should exist")
1736                    {
1737                        let upstream_actor_location =
1738                            upstream_fragment.actors[upstream_actor_id].worker_id;
1739                        let upstream_actor_host =
1740                            control_stream_manager.host_addr(upstream_actor_location);
1741                        if let Some(upstream_reschedule) = upstream_reschedule
1742                            && upstream_reschedule
1743                                .removed_actors
1744                                .contains(upstream_actor_id)
1745                        {
1746                            continue;
1747                        }
1748                        for (_, downstream_actor_id) in
1749                            reschedule
1750                                .added_actors
1751                                .iter()
1752                                .flat_map(|(worker_id, actors)| {
1753                                    actors.iter().map(|actor| (*worker_id, *actor))
1754                                })
1755                        {
1756                            actor_upstreams
1757                                .entry(downstream_actor_id)
1758                                .or_default()
1759                                .entry(*upstream_fragment_id)
1760                                .or_default()
1761                                .insert(
1762                                    *upstream_actor_id,
1763                                    PbActorInfo {
1764                                        actor_id: *upstream_actor_id,
1765                                        host: Some(upstream_actor_host.clone()),
1766                                        partial_graph_id: to_partial_graph_id(
1767                                            database_info.database_id,
1768                                            None,
1769                                        ),
1770                                    },
1771                                );
1772                        }
1773                    }
1774                }
1775            }
1776        }
1777        actor_upstreams
1778    }
1779}