risingwave_meta/barrier/
command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37    PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
49    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
50    StopMutation, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
51};
52use risingwave_pb::stream_service::BarrierCompleteResponse;
53use tracing::warn;
54
55use super::info::InflightDatabaseInfo;
56use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
57use crate::barrier::edge_builder::FragmentEdgeBuildResult;
58use crate::barrier::info::BarrierInfo;
59use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
60use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
61use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
62use crate::controller::scale::LoadedFragmentContext;
63use crate::controller::utils::StreamingJobExtraInfo;
64use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
65use crate::manager::{StreamingJob, StreamingJobType};
66use crate::model::{
67    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
68    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
69    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
70};
71use crate::stream::{
72    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
73    ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
74    build_actor_connector_splits,
75};
76use crate::{MetaError, MetaResult};
77
/// [`Reschedule`] describes per-fragment changes in a resolved reschedule plan,
/// used for actor scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment, keyed by the worker they are scheduled on.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there's upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors newly created by this reschedule, each paired with its dispatchers
    /// and the worker it is scheduled on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
109
/// A fully-resolved reschedule execution plan, materialized from a
/// [`RescheduleContext`] right before barrier injection.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Per-fragment reschedule changes.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Should contain the actor ids in upstream and downstream fragments referenced by
    /// `reschedules`.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
117
/// Preloaded context for rescheduling, built outside the barrier worker.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Loaded fragment graph payloads for the jobs involved in the reschedule.
    pub loaded: LoadedFragmentContext,
    /// Extra info per streaming job involved in the reschedule.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// `fragment_id` -> its upstream fragments with the dispatcher type of each edge.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// `fragment_id` -> its downstream fragments with the dispatcher type of each edge.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// `(source_fragment_id, target_fragment_id)` -> relation model.
    /// Ownership of an edge is source-fragment based (see `for_database`).
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
127
128impl RescheduleContext {
129    pub fn empty() -> Self {
130        Self {
131            loaded: LoadedFragmentContext::default(),
132            job_extra_info: HashMap::new(),
133            upstream_fragments: HashMap::new(),
134            downstream_fragments: HashMap::new(),
135            downstream_relations: HashMap::new(),
136        }
137    }
138
139    pub fn is_empty(&self) -> bool {
140        self.loaded.is_empty()
141    }
142
143    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
144        let loaded = self.loaded.for_database(database_id)?;
145        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
146        // Use the filtered loaded context as the source of truth so every side map is pruned by
147        // the same fragment set.
148        let fragment_ids: HashSet<FragmentId> = loaded
149            .job_fragments
150            .values()
151            .flat_map(|fragments| fragments.keys().copied())
152            .collect();
153
154        let job_extra_info = self
155            .job_extra_info
156            .iter()
157            .filter(|(job_id, _)| job_ids.contains(*job_id))
158            .map(|(job_id, info)| (*job_id, info.clone()))
159            .collect();
160
161        let upstream_fragments = self
162            .upstream_fragments
163            .iter()
164            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
165            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
166            .collect();
167
168        let downstream_fragments = self
169            .downstream_fragments
170            .iter()
171            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
172            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
173            .collect();
174
175        let downstream_relations = self
176            .downstream_relations
177            .iter()
178            // Ownership of this map is source-fragment based. We keep all downstream edges for
179            // selected sources because the target side can still be referenced during dispatcher
180            // reconstruction even if that target fragment is not being rescheduled.
181            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
182            .map(|(key, relation)| (*key, relation.clone()))
183            .collect();
184
185        Some(Self {
186            loaded,
187            job_extra_info,
188            upstream_fragments,
189            downstream_fragments,
190            downstream_relations,
191        })
192    }
193
194    /// Split this context into per-database contexts without cloning the large loaded graph
195    /// payloads.
196    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
197        let Self {
198            loaded,
199            job_extra_info,
200            upstream_fragments,
201            downstream_fragments,
202            downstream_relations,
203        } = self;
204
205        let mut contexts: HashMap<_, _> = loaded
206            .into_database_contexts()
207            .into_iter()
208            .map(|(database_id, loaded)| {
209                (
210                    database_id,
211                    Self {
212                        loaded,
213                        job_extra_info: HashMap::new(),
214                        upstream_fragments: HashMap::new(),
215                        downstream_fragments: HashMap::new(),
216                        downstream_relations: HashMap::new(),
217                    },
218                )
219            })
220            .collect();
221
222        if contexts.is_empty() {
223            return contexts;
224        }
225
226        let mut job_databases = HashMap::new();
227        let mut fragment_databases = HashMap::new();
228        for (&database_id, context) in &contexts {
229            for job_id in context.loaded.job_map.keys().copied() {
230                job_databases.insert(job_id, database_id);
231            }
232            for fragment_id in context
233                .loaded
234                .job_fragments
235                .values()
236                .flat_map(|fragments| fragments.keys().copied())
237            {
238                fragment_databases.insert(fragment_id, database_id);
239            }
240        }
241
242        for (job_id, info) in job_extra_info {
243            if let Some(database_id) = job_databases.get(&job_id).copied() {
244                contexts
245                    .get_mut(&database_id)
246                    .expect("database context should exist for job")
247                    .job_extra_info
248                    .insert(job_id, info);
249            }
250        }
251
252        for (fragment_id, upstreams) in upstream_fragments {
253            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
254                contexts
255                    .get_mut(&database_id)
256                    .expect("database context should exist for fragment")
257                    .upstream_fragments
258                    .insert(fragment_id, upstreams);
259            }
260        }
261
262        for (fragment_id, downstreams) in downstream_fragments {
263            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
264                contexts
265                    .get_mut(&database_id)
266                    .expect("database context should exist for fragment")
267                    .downstream_fragments
268                    .insert(fragment_id, downstreams);
269            }
270        }
271
272        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
273            // Route by source fragment ownership. A target may be outside of current reschedule
274            // set, but this edge still belongs to the source-side command.
275            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
276                contexts
277                    .get_mut(&database_id)
278                    .expect("database context should exist for relation source")
279                    .downstream_relations
280                    .insert((source_fragment_id, target_fragment_id), relation);
281            }
282        }
283
284        contexts
285    }
286}
287
/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job being replaced (to be dropped after the switch).
    pub old_fragments: StreamJobFragments,
    /// Fragments to be created for the new version of the job.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    // NOTE(review): presumably the downstream edges from upstream fragments towards the
    // new fragments — confirm against the plan builder.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Split plan for the replace job. Determines how splits are resolved in Phase 2
    /// inside the barrier worker.
    pub split_plan: ReplaceJobSplitPlan,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
    pub streaming_job: StreamingJob,
    /// The temporary dummy job fragments id of new table fragment
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Contexts for sinks whose schema is auto-refreshed alongside this replace, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
313
314impl ReplaceStreamJobPlan {
315    /// `old_fragment_id` -> `new_fragment_id`
316    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
317        let mut fragment_replacements = HashMap::new();
318        for (upstream_fragment_id, new_upstream_fragment_id) in
319            self.replace_upstream.values().flatten()
320        {
321            {
322                let r =
323                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
324                if let Some(r) = r {
325                    assert_eq!(
326                        *new_upstream_fragment_id, r,
327                        "one fragment is replaced by multiple fragments"
328                    );
329                }
330            }
331        }
332        fragment_replacements
333    }
334}
335
/// All information needed to build the `Add` barrier for a new streaming job.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    // Excluded from `Debug` output: the fragment graph payload is large.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Downstream relations to establish for the upstream fragments of this job.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Source-level split assignment (Phase 1). Resolved to actor-level in the barrier worker.
    pub init_split_assignment: SourceSplitAssignment,
    /// The definition of the job — presumably its SQL text; confirm against the caller.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    /// Backfill ordering constraints between fragments of this job.
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// Pre-computed CDC table snapshot splits, if this job backfills a CDC table.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// Fragment -> state tables mapping — NOTE(review): presumably used for
    /// locality-aware scheduling; confirm against usage.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
}
353
354impl StreamJobFragments {
355    /// Build fragment-level info for new fragments, populating actor splits from the given assignment.
356    pub(super) fn new_fragment_info<'a>(
357        &'a self,
358        assignment: &'a SplitAssignment,
359    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
360        self.fragments.values().map(|fragment| {
361            let mut fragment_splits = assignment
362                .get(&fragment.fragment_id)
363                .cloned()
364                .unwrap_or_default();
365
366            (
367                fragment.fragment_id,
368                InflightFragmentInfo {
369                    fragment_id: fragment.fragment_id,
370                    distribution_type: fragment.distribution_type.into(),
371                    fragment_type_mask: fragment.fragment_type_mask,
372                    vnode_count: fragment.vnode_count(),
373                    nodes: fragment.nodes.clone(),
374                    actors: fragment
375                        .actors
376                        .iter()
377                        .map(|actor| {
378                            (
379                                actor.actor_id,
380                                InflightActorInfo {
381                                    worker_id: self
382                                        .actor_status
383                                        .get(&actor.actor_id)
384                                        .expect("should exist")
385                                        .worker_id(),
386                                    vnode_bitmap: actor.vnode_bitmap.clone(),
387                                    splits: fragment_splits
388                                        .remove(&actor.actor_id)
389                                        .unwrap_or_default(),
390                                },
391                            )
392                        })
393                        .collect(),
394                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
395                },
396            )
397        })
398    }
399}
400
/// Info for a snapshot-backfill creation: which upstream MV tables are backfilled
/// and at which epoch.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `snapshot_backfill_epoch`.
    /// The epoch is `None` at the beginning, and is filled in by the global barrier
    /// worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
408
/// How a streaming job is created (see [`Command::CreateStreamingJob`]).
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// Regular creation with no special handling.
    Normal,
    /// `CREATE SINK INTO table`: carries the upstream sink info for the target table.
    SinkIntoTable(UpstreamSinkInfo),
    /// Creation that backfills from upstream MV snapshots.
    SnapshotBackfill(SnapshotBackfillInfo),
}
415
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send via corresponding `*_to_mutation` helpers,
/// and may [do different stuffs after the barrier is collected](PostCollectCommand::post_collect).
// FIXME: this enum is significantly large on stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has ensured that these streaming jobs are safe to be
    /// dropped by reference counts before.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then delete the job fragments info from meta store.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        actors: Vec<ActorId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        /// `target_fragment` -> the sink fragments that were writing into it.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates a `Add` barrier by given info.
    ///
    /// Barriers from the actors to be created, which is marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passthrough.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`. And
    /// it adds the job fragments info to meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
    /// will be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedule context. It must be resolved inside the barrier worker before injection.
    RescheduleIntent {
        context: RescheduleContext,
        /// Filled by the barrier worker after resolving `context` against current worker topology.
        ///
        /// We keep unresolved `context` outside of checkpoint state and only materialize this
        /// execution plan right before injection, then drop `context` to release memory earlier.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// `ReplaceStreamJob` command generates a `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of reschedule, while the upstream fragment
    /// of the Merge executors are changed additionally.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given per-fragment
    /// throttle configs to change the `rate_limit` of executors in the affected jobs.
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// materialize executor to start storing old value for subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// materialize executor to stop storing old value when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// `AlterSubscriptionRetention` command updates the subscription retention time.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `ConnectorPropsChange` command generates a barrier to propagate changed connector
    /// properties to the affected jobs — presumably via `ConnectorPropsChangeMutation`;
    /// confirm in the mutation builder.
    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signals the end of the listing phase of a table refresh — presumably builds a
    /// `ListFinishMutation`; confirm in the mutation builder.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signals the end of the data-loading phase of a table refresh — presumably builds
    /// a `LoadFinishMutation`; confirm in the mutation builder.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// `ResetSource` command generates a barrier to reset CDC source offset to latest.
    /// Used when upstream binlog/oplog has expired.
    ResetSource {
        source_id: SourceId,
    },

    /// `ResumeBackfill` command generates a `StartFragmentBackfill` barrier to force backfill
    /// to resume for troubleshooting.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// `InjectSourceOffsets` command generates a barrier to inject specific offsets
    /// into source splits (UNSAFE - admin only).
    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Split ID -> offset (JSON-encoded based on connector type)
        split_offsets: HashMap<String, String>,
    },
}
556
/// Target granularity of a [`Command::ResumeBackfill`]: a whole job or a single fragment.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Resume backfill by job id.
    Job(JobId),
    /// Resume backfill by fragment id.
    Fragment(FragmentId),
}
562
563// For debugging and observability purposes. Can add more details later if needed.
564impl std::fmt::Display for Command {
565    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
566        match self {
567            Command::Flush => write!(f, "Flush"),
568            Command::Pause => write!(f, "Pause"),
569            Command::Resume => write!(f, "Resume"),
570            Command::DropStreamingJobs {
571                streaming_job_ids, ..
572            } => {
573                write!(
574                    f,
575                    "DropStreamingJobs: {}",
576                    streaming_job_ids.iter().sorted().join(", ")
577                )
578            }
579            Command::CreateStreamingJob { info, .. } => {
580                write!(f, "CreateStreamingJob: {}", info.streaming_job)
581            }
582            Command::RescheduleIntent {
583                reschedule_plan, ..
584            } => {
585                if reschedule_plan.is_some() {
586                    write!(f, "RescheduleIntent(planned)")
587                } else {
588                    write!(f, "RescheduleIntent")
589                }
590            }
591            Command::ReplaceStreamJob(plan) => {
592                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
593            }
594            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
595            Command::Throttle { .. } => write!(f, "Throttle"),
596            Command::CreateSubscription {
597                subscription_id, ..
598            } => write!(f, "CreateSubscription: {subscription_id}"),
599            Command::DropSubscription {
600                subscription_id, ..
601            } => write!(f, "DropSubscription: {subscription_id}"),
602            Command::AlterSubscriptionRetention {
603                subscription_id,
604                retention_second,
605                ..
606            } => write!(
607                f,
608                "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
609            ),
610            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
611            Command::Refresh {
612                table_id,
613                associated_source_id,
614            } => write!(
615                f,
616                "Refresh: {} (source: {})",
617                table_id, associated_source_id
618            ),
619            Command::ListFinish {
620                table_id,
621                associated_source_id,
622            } => write!(
623                f,
624                "ListFinish: {} (source: {})",
625                table_id, associated_source_id
626            ),
627            Command::LoadFinish {
628                table_id,
629                associated_source_id,
630            } => write!(
631                f,
632                "LoadFinish: {} (source: {})",
633                table_id, associated_source_id
634            ),
635            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
636            Command::ResumeBackfill { target } => match target {
637                ResumeBackfillTarget::Job(job_id) => {
638                    write!(f, "ResumeBackfill: job={job_id}")
639                }
640                ResumeBackfillTarget::Fragment(fragment_id) => {
641                    write!(f, "ResumeBackfill: fragment={fragment_id}")
642                }
643            },
644            Command::InjectSourceOffsets {
645                source_id,
646                split_offsets,
647            } => write!(
648                f,
649                "InjectSourceOffsets: {} ({} splits)",
650                source_id,
651                split_offsets.len()
652            ),
653        }
654    }
655}
656
657impl Command {
658    pub fn pause() -> Self {
659        Self::Pause
660    }
661
662    pub fn resume() -> Self {
663        Self::Resume
664    }
665
666    pub fn need_checkpoint(&self) -> bool {
667        // todo! Reviewing the flow of different command to reduce the amount of checkpoint
668        !matches!(self, Command::Resume)
669    }
670}
671
/// Post-collect counterpart of [`Command`]: retains only the data needed after the
/// barrier has been collected.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A generic command carrying only a display name (e.g. a plain barrier,
    /// see `PostCollectCommand::barrier`).
    Command(String),
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// Actor-level split assignment resolved by the barrier worker.
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// Actor-level split assignment resolved by the barrier worker.
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
703
704impl PostCollectCommand {
705    pub fn barrier() -> Self {
706        PostCollectCommand::Command("barrier".to_owned())
707    }
708
709    pub fn command_name(&self) -> &str {
710        match self {
711            PostCollectCommand::Command(name) => name.as_str(),
712            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
713            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
714            PostCollectCommand::Reschedule { .. } => "Reschedule",
715            PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
716            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
717            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
718            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
719            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
720        }
721    }
722}
723
724impl Display for PostCollectCommand {
725    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726        f.write_str(self.command_name())
727    }
728}
729
/// The kind of a barrier to be injected.
#[derive(Debug, Clone)]
pub enum BarrierKind {
    /// The very first barrier — NOTE(review): presumably injected on startup/recovery; confirm.
    Initial,
    /// A regular, non-checkpoint barrier.
    Barrier,
    /// Hold a list of previous non-checkpoint prev-epoch + current prev-epoch
    Checkpoint(Vec<u64>),
}
737
738impl BarrierKind {
739    pub fn to_protobuf(&self) -> PbBarrierKind {
740        match self {
741            BarrierKind::Initial => PbBarrierKind::Initial,
742            BarrierKind::Barrier => PbBarrierKind::Barrier,
743            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
744        }
745    }
746
747    pub fn is_checkpoint(&self) -> bool {
748        matches!(self, BarrierKind::Checkpoint(_))
749    }
750
751    pub fn is_initial(&self) -> bool {
752        matches!(self, BarrierKind::Initial)
753    }
754
755    pub fn as_str_name(&self) -> &'static str {
756        match self {
757            BarrierKind::Initial => "Initial",
758            BarrierKind::Barrier => "Barrier",
759            BarrierKind::Checkpoint(_) => "Checkpoint",
760        }
761    }
762}
763
/// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given
/// [`Command`].
pub(super) struct CommandContext {
    /// For each subscribed materialized view, the maximum retention (in seconds)
    /// across its subscriptions. Used to derive log-store truncate epochs when
    /// committing a checkpoint (see `get_truncate_epoch`).
    mv_subscription_max_retention: HashMap<TableId, u64>,

    /// Metadata (prev epoch, barrier kind, ...) of the barrier this command rides on.
    pub(super) barrier_info: BarrierInfo,

    /// State tables whose epoch is committed together with this barrier.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    /// The command to execute after the barrier has been collected.
    pub(super) command: PostCollectCommand,

    /// The tracing span of this command.
    ///
    /// Differs from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` stuffs.
    _span: tracing::Span,
}
782
783impl std::fmt::Debug for CommandContext {
784    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
785        f.debug_struct("CommandContext")
786            .field("barrier_info", &self.barrier_info)
787            .field("command", &self.command.command_name())
788            .finish()
789    }
790}
791
impl CommandContext {
    /// Assemble a context from the barrier metadata and the post-collect command.
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

    /// Compute the epoch a log store may be truncated to: the prev epoch's
    /// timestamp shifted back by `retention_second` seconds. Falls back to the
    /// prev epoch itself (i.e. no retention window) when the shifted timestamp
    /// is out of range for `Timestamptz`.
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    /// Fold the collected `BarrierCompleteResponse`s of this barrier into
    /// `info`, the accumulated commit-epoch payload.
    ///
    /// Must only be called for a checkpoint barrier: building the change log
    /// delta `must_match!`es on `BarrierKind::Checkpoint`.
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // A newly created streaming job (asserted not to be snapshot backfill)
        // registers a fragment info covering its internal tables and, if any,
        // its MV table.
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        // For each MV, keep the *minimum* truncate epoch among all contributors
        // (subscription retention and pinned backfill epochs below).
        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        // Backfill jobs pin the log of their upstream MVs at the backfill epoch.
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // All tables of this command commit at the barrier's prev epoch.
        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index contributes an `Init` delta for its
        // index table.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
921
922impl Command {
923    /// Build the `Pause` mutation.
924    pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
925        {
926            {
927                // Only pause when the cluster is not already paused.
928                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
929                if !is_currently_paused {
930                    Some(Mutation::Pause(PauseMutation {}))
931                } else {
932                    None
933                }
934            }
935        }
936    }
937
938    /// Build the `Resume` mutation.
939    pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
940        {
941            {
942                // Only resume when the cluster is paused with the same reason.
943                if is_currently_paused {
944                    Some(Mutation::Resume(ResumeMutation {}))
945                } else {
946                    None
947                }
948            }
949        }
950    }
951
952    /// Build the `Splits` mutation for `SourceChangeSplit`.
953    pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
954        {
955            {
956                let mut diff = HashMap::new();
957
958                for actor_splits in split_assignment.values() {
959                    diff.extend(actor_splits.clone());
960                }
961
962                Mutation::Splits(SourceChangeSplitMutation {
963                    actor_splits: build_actor_connector_splits(&diff),
964                })
965            }
966        }
967    }
968
969    /// Build the `Throttle` mutation.
970    pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
971        {
972            {
973                let config = config.clone();
974                Mutation::Throttle(ThrottleMutation {
975                    fragment_throttle: config,
976                })
977            }
978        }
979    }
980
981    /// Build the `Stop` mutation for `DropStreamingJobs`.
982    pub(super) fn drop_streaming_jobs_to_mutation(
983        actors: &Vec<ActorId>,
984        dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
985    ) -> Mutation {
986        {
987            Mutation::Stop(StopMutation {
988                actors: actors.clone(),
989                dropped_sink_fragments: dropped_sink_fragment_by_targets
990                    .values()
991                    .flatten()
992                    .cloned()
993                    .collect(),
994            })
995        }
996    }
997
    /// Build the `Add` mutation for `CreateStreamingJob`.
    ///
    /// Collects the new actors, their dispatchers (extracted from `edges`),
    /// the source split assignment and, depending on `job_type`, either the
    /// snapshot-backfill subscriptions or the sink-into-table upstream sink
    /// info to attach to the mutation.
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                let added_actors = stream_job_fragments.actor_ids().collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // A snapshot-backfill job subscribes to each of its upstream MVs,
                // with the new job itself as the subscriber.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // For sink-into-table, announce the newly created sink actors to
                // the downstream (table) fragment.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        // There must be exactly one fragment matching the sink
                        // fragment id among the actors to create.
                        let new_sink_actors = stream_job_fragments
                            .actors_to_create()
                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
                            .exactly_one()
                            .map(|(_, _, actors)| {
                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                })
                            })
                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // Take only the dispatchers of fragments that are upstream
                    // of the new job; the rest stay in `edges` for later use.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1108
    /// Build the `Update` mutation for `ReplaceStreamJob`.
    ///
    /// Extracts from `edges` the merge updates of fragments whose upstream is
    /// replaced and the dispatchers of fragments upstream of the new job, then
    /// delegates to `generate_update_mutation_for_replace_table` together with
    /// the actors of the old fragments (including auto-refresh-schema sink
    /// actors, if any).
    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                // Re-assign CDC backfill splits for the replaced job, if it has any.
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                Ok(Self::generate_update_mutation_for_replace_table(
                    old_fragments.actor_ids().chain(
                        auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().flat_map(|sink| {
                                    sink.original_fragment
                                        .actors
                                        .iter()
                                        .map(|actor| actor.actor_id)
                                })
                            }),
                    ),
                    merge_updates,
                    dispatchers,
                    split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                ))
            }
        }
    }
1160
    /// Build the `Update` mutation for `RescheduleIntent`.
    ///
    /// Produces dispatcher updates for upstream actors, merge updates for
    /// downstream actors, vnode bitmap updates, dropped actors, source split
    /// re-assignments and CDC backfill split re-assignments, all derived from
    /// the given `reschedules`.
    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let database_id = database_info.database_id;
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        // Record updates for all actors.
                        for &actor_id in upstream_actor_ids {
                            // Only upstream actors that survive the reschedule
                            // receive the newly added downstream actors.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // Index with the dispatcher id to check duplicates.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Downstream removed actors should be skipped
                        // Newly created actors of the current fragment will not dispatch Update
                        // barriers to them
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the fragment id to check duplicates.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    // we assume that we only scale the partial graph of database
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record updates for all actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    // Re-assign CDC backfill splits for fragments that have them.
                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // we don't create dispatchers in reschedule scenario
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Ok(Some(mutation))
            }
        }
    }
1342
1343    /// Build the `Add` mutation for `CreateSubscription`.
1344    pub(super) fn create_subscription_to_mutation(
1345        upstream_mv_table_id: TableId,
1346        subscription_id: SubscriptionId,
1347    ) -> Mutation {
1348        {
1349            Mutation::Add(AddMutation {
1350                actor_dispatchers: Default::default(),
1351                added_actors: vec![],
1352                actor_splits: Default::default(),
1353                pause: false,
1354                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1355                    upstream_mv_table_id,
1356                    subscriber_id: subscription_id.as_subscriber_id(),
1357                }],
1358                backfill_nodes_to_pause: vec![],
1359                actor_cdc_table_snapshot_splits: None,
1360                new_upstream_sinks: Default::default(),
1361            })
1362        }
1363    }
1364
1365    /// Build the `DropSubscriptions` mutation for `DropSubscription`.
1366    pub(super) fn drop_subscription_to_mutation(
1367        upstream_mv_table_id: TableId,
1368        subscription_id: SubscriptionId,
1369    ) -> Mutation {
1370        {
1371            Mutation::DropSubscriptions(DropSubscriptionsMutation {
1372                info: vec![SubscriptionUpstreamInfo {
1373                    subscriber_id: subscription_id.as_subscriber_id(),
1374                    upstream_mv_table_id,
1375                }],
1376            })
1377        }
1378    }
1379
1380    /// Build the `ConnectorPropsChange` mutation.
1381    pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1382        {
1383            {
1384                let mut connector_props_infos = HashMap::default();
1385                for (k, v) in config {
1386                    connector_props_infos.insert(
1387                        k.as_raw_id(),
1388                        ConnectorPropsInfo {
1389                            connector_props_info: v.clone(),
1390                        },
1391                    );
1392                }
1393                Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1394                    connector_props_infos,
1395                })
1396            }
1397        }
1398    }
1399
1400    /// Build the `RefreshStart` mutation.
1401    pub(super) fn refresh_to_mutation(
1402        table_id: TableId,
1403        associated_source_id: SourceId,
1404    ) -> Mutation {
1405        Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1406            table_id,
1407            associated_source_id,
1408        })
1409    }
1410
1411    /// Build the `ListFinish` mutation.
1412    pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1413        Mutation::ListFinish(ListFinishMutation {
1414            associated_source_id,
1415        })
1416    }
1417
1418    /// Build the `LoadFinish` mutation.
1419    pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1420        Mutation::LoadFinish(LoadFinishMutation {
1421            associated_source_id,
1422        })
1423    }
1424
1425    /// Build the `ResetSource` mutation.
1426    pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1427        Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1428            source_id: source_id.as_raw_id(),
1429        })
1430    }
1431
1432    /// Build the `StartFragmentBackfill` mutation for `ResumeBackfill`.
1433    pub(super) fn resume_backfill_to_mutation(
1434        target: &ResumeBackfillTarget,
1435        database_info: &InflightDatabaseInfo,
1436    ) -> MetaResult<Option<Mutation>> {
1437        {
1438            {
1439                let fragment_ids: HashSet<_> = match target {
1440                    ResumeBackfillTarget::Job(job_id) => {
1441                        database_info.backfill_fragment_ids_for_job(*job_id)?
1442                    }
1443                    ResumeBackfillTarget::Fragment(fragment_id) => {
1444                        if !database_info.is_backfill_fragment(*fragment_id)? {
1445                            return Err(MetaError::invalid_parameter(format!(
1446                                "fragment {} is not a backfill node",
1447                                fragment_id
1448                            )));
1449                        }
1450                        HashSet::from([*fragment_id])
1451                    }
1452                };
1453                if fragment_ids.is_empty() {
1454                    warn!(
1455                        ?target,
1456                        "resume backfill command ignored because no backfill fragments found"
1457                    );
1458                    Ok(None)
1459                } else {
1460                    Ok(Some(Mutation::StartFragmentBackfill(
1461                        StartFragmentBackfillMutation {
1462                            fragment_ids: fragment_ids.into_iter().collect(),
1463                        },
1464                    )))
1465                }
1466            }
1467        }
1468    }
1469
1470    /// Build the `InjectSourceOffsets` mutation.
1471    pub(super) fn inject_source_offsets_to_mutation(
1472        source_id: SourceId,
1473        split_offsets: &HashMap<String, String>,
1474    ) -> Mutation {
1475        Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1476            source_id: source_id.as_raw_id(),
1477            split_offsets: split_offsets.clone(),
1478        })
1479    }
1480
1481    /// Collect actors to create for `CreateStreamingJob` (non-snapshot-backfill).
1482    pub(super) fn create_streaming_job_actors_to_create(
1483        info: &CreateStreamingJobCommandInfo,
1484        edges: &mut FragmentEdgeBuildResult,
1485    ) -> StreamJobActorsToCreate {
1486        {
1487            {
1488                let actors_to_create = info.stream_job_fragments.actors_to_create();
1489                edges.collect_actors_to_create(actors_to_create.map(
1490                    |(fragment_id, node, actors)| {
1491                        (
1492                            fragment_id,
1493                            node,
1494                            actors,
1495                            [], // no subscriber for new job to create
1496                        )
1497                    },
1498                ))
1499            }
1500        }
1501    }
1502
1503    /// Collect actors to create for `RescheduleIntent`.
1504    pub(super) fn reschedule_actors_to_create(
1505        reschedules: &HashMap<FragmentId, Reschedule>,
1506        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1507        database_info: &InflightDatabaseInfo,
1508        control_stream_manager: &ControlStreamManager,
1509    ) -> StreamJobActorsToCreate {
1510        {
1511            {
1512                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1513                    reschedules.iter().map(|(fragment_id, reschedule)| {
1514                        (
1515                            *fragment_id,
1516                            reschedule.newly_created_actors.values().map(
1517                                |((actor, dispatchers), _)| {
1518                                    (actor.actor_id, dispatchers.as_slice())
1519                                },
1520                            ),
1521                        )
1522                    }),
1523                    Some((reschedules, fragment_actors)),
1524                    database_info,
1525                    control_stream_manager,
1526                );
1527                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1528                for (fragment_id, (actor, dispatchers), worker_id) in
1529                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1530                        reschedule
1531                            .newly_created_actors
1532                            .values()
1533                            .map(|(actors, status)| (*fragment_id, actors, status))
1534                    })
1535                {
1536                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1537                    map.entry(*worker_id)
1538                        .or_default()
1539                        .entry(fragment_id)
1540                        .or_insert_with(|| {
1541                            let node = database_info.fragment(fragment_id).nodes.clone();
1542                            let subscribers =
1543                                database_info.fragment_subscribers(fragment_id).collect();
1544                            (node, vec![], subscribers)
1545                        })
1546                        .1
1547                        .push((actor.clone(), upstreams, dispatchers.clone()));
1548                }
1549                map
1550            }
1551        }
1552    }
1553
1554    /// Collect actors to create for `ReplaceStreamJob`.
1555    pub(super) fn replace_stream_job_actors_to_create(
1556        replace_table: &ReplaceStreamJobPlan,
1557        edges: &mut FragmentEdgeBuildResult,
1558        database_info: &InflightDatabaseInfo,
1559    ) -> StreamJobActorsToCreate {
1560        {
1561            {
1562                let mut actors = edges.collect_actors_to_create(
1563                    replace_table.new_fragments.actors_to_create().map(
1564                        |(fragment_id, node, actors)| {
1565                            (
1566                                fragment_id,
1567                                node,
1568                                actors,
1569                                database_info
1570                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1571                            )
1572                        },
1573                    ),
1574                );
1575                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1576                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1577                        (
1578                            sink.new_fragment.fragment_id,
1579                            &sink.new_fragment.nodes,
1580                            sink.new_fragment.actors.iter().map(|actor| {
1581                                (
1582                                    actor,
1583                                    sink.actor_status[&actor.actor_id]
1584                                        .location
1585                                        .as_ref()
1586                                        .unwrap()
1587                                        .worker_node_id,
1588                                )
1589                            }),
1590                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1591                        )
1592                    }));
1593                    for (worker_id, fragment_actors) in sink_actors {
1594                        actors.entry(worker_id).or_default().extend(fragment_actors);
1595                    }
1596                }
1597                actors
1598            }
1599        }
1600    }
1601
1602    fn generate_update_mutation_for_replace_table(
1603        dropped_actors: impl IntoIterator<Item = ActorId>,
1604        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1605        dispatchers: FragmentActorDispatchers,
1606        split_assignment: &SplitAssignment,
1607        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1608        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1609    ) -> Option<Mutation> {
1610        let dropped_actors = dropped_actors.into_iter().collect();
1611
1612        let actor_new_dispatchers = dispatchers
1613            .into_values()
1614            .flatten()
1615            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1616            .collect();
1617
1618        let actor_splits = split_assignment
1619            .values()
1620            .flat_map(build_actor_connector_splits)
1621            .collect();
1622        Some(Mutation::Update(UpdateMutation {
1623            actor_new_dispatchers,
1624            merge_update: merge_updates.into_values().flatten().collect(),
1625            dropped_actors,
1626            actor_splits,
1627            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1628            sink_schema_change: auto_refresh_schema_sinks
1629                .as_ref()
1630                .into_iter()
1631                .flat_map(|sinks| {
1632                    sinks.iter().map(|sink| {
1633                        (
1634                            sink.original_sink.id.as_raw_id(),
1635                            PbSinkSchemaChange {
1636                                original_schema: sink
1637                                    .original_sink
1638                                    .columns
1639                                    .iter()
1640                                    .map(|col| PbField {
1641                                        data_type: Some(
1642                                            col.column_desc
1643                                                .as_ref()
1644                                                .unwrap()
1645                                                .column_type
1646                                                .as_ref()
1647                                                .unwrap()
1648                                                .clone(),
1649                                        ),
1650                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1651                                    })
1652                                    .collect(),
1653                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1654                                    fields: sink
1655                                        .newly_add_fields
1656                                        .iter()
1657                                        .map(|field| field.to_prost())
1658                                        .collect(),
1659                                })),
1660                            },
1661                        )
1662                    })
1663                })
1664                .collect(),
1665            ..Default::default()
1666        }))
1667    }
1668}
1669
impl Command {
    /// Compute, for each downstream actor, its `ActorUpstreams`
    /// (upstream fragment id -> upstream actor id -> `PbActorInfo`) within a
    /// database partial graph.
    ///
    /// Upstream infos are gathered from two sources:
    /// 1. `actor_dispatchers`: explicit (upstream actor, dispatchers) lists per
    ///    upstream fragment — every dispatch target is registered as a downstream.
    /// 2. `reschedule_dispatcher_update`: for reschedules, the actors of each
    ///    upstream fragment (from `fragment_actors`) are registered as upstreams
    ///    of the reschedule's newly added actors, skipping upstream actors that
    ///    the upstream fragment's own reschedule removes.
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        // Source 1: walk the provided dispatcher lists.
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                // Resolve the worker hosting the upstream actor to build its host addr.
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                // Every actor appearing as a dispatch target gains this actor
                // as an upstream.
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                // NOTE(review): `None` presumably selects the
                                // database-level partial graph — confirm
                                // `to_partial_graph_id` semantics.
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        // Source 2: derive upstream infos implied by reschedules.
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    // The upstream fragment may itself be rescheduled; its
                    // removed actors must not be registered as upstreams.
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            // Skip upstream actors that are being removed.
                            continue;
                        }
                        // Register this upstream actor for every newly added
                        // downstream actor (the worker id of the added actor is
                        // not needed here).
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}