risingwave_meta/barrier/
command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37    PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkDropColumnsOp,
49    PbSinkSchemaChange, PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation,
50    StartFragmentBackfillMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
51    UpdateMutation,
52};
53use risingwave_pb::stream_service::BarrierCompleteResponse;
54use tracing::warn;
55
56use super::info::InflightDatabaseInfo;
57use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
58use crate::barrier::edge_builder::FragmentEdgeBuildResult;
59use crate::barrier::info::BarrierInfo;
60use crate::barrier::partial_graph::PartialGraphBarrierInfo;
61use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
62use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
63use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
64use crate::controller::scale::LoadedFragmentContext;
65use crate::controller::utils::StreamingJobExtraInfo;
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70    FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
71    StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::{
74    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
75    ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
76    build_actor_connector_splits,
77};
78use crate::{MetaError, MetaResult};
79
/// [`Reschedule`] describes per-fragment changes in a resolved reschedule plan,
/// used for actor scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there's upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors newly created by this reschedule, each with its rendered definition
    /// (including dispatchers) and the worker it is placed on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
111
/// A fully-resolved reschedule execution plan, materialized from a
/// [`RescheduleContext`] inside the barrier worker before injection.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Per-fragment changes of this reschedule.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Should contain the actor ids in upstream and downstream fragments referenced by
    /// `reschedules`.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
119
/// Preloaded context for rescheduling, built outside the barrier worker.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// Loaded fragment graph payloads of the jobs involved in the reschedule.
    pub loaded: LoadedFragmentContext,
    /// Extra streaming-job info, keyed by job id.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// For each fragment, its upstream fragments and the dispatcher type of each edge.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each fragment, its downstream fragments and the dispatcher type of each edge.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Relation models keyed by `(source_fragment_id, target_fragment_id)`;
    /// ownership of an edge follows its source fragment.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
129
130impl RescheduleContext {
131    pub fn empty() -> Self {
132        Self {
133            loaded: LoadedFragmentContext::default(),
134            job_extra_info: HashMap::new(),
135            upstream_fragments: HashMap::new(),
136            downstream_fragments: HashMap::new(),
137            downstream_relations: HashMap::new(),
138        }
139    }
140
141    pub fn is_empty(&self) -> bool {
142        self.loaded.is_empty()
143    }
144
145    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
146        let loaded = self.loaded.for_database(database_id)?;
147        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
148        // Use the filtered loaded context as the source of truth so every side map is pruned by
149        // the same fragment set.
150        let fragment_ids: HashSet<FragmentId> = loaded
151            .job_fragments
152            .values()
153            .flat_map(|fragments| fragments.keys().copied())
154            .collect();
155
156        let job_extra_info = self
157            .job_extra_info
158            .iter()
159            .filter(|(job_id, _)| job_ids.contains(*job_id))
160            .map(|(job_id, info)| (*job_id, info.clone()))
161            .collect();
162
163        let upstream_fragments = self
164            .upstream_fragments
165            .iter()
166            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
167            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
168            .collect();
169
170        let downstream_fragments = self
171            .downstream_fragments
172            .iter()
173            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
174            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
175            .collect();
176
177        let downstream_relations = self
178            .downstream_relations
179            .iter()
180            // Ownership of this map is source-fragment based. We keep all downstream edges for
181            // selected sources because the target side can still be referenced during dispatcher
182            // reconstruction even if that target fragment is not being rescheduled.
183            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
184            .map(|(key, relation)| (*key, relation.clone()))
185            .collect();
186
187        Some(Self {
188            loaded,
189            job_extra_info,
190            upstream_fragments,
191            downstream_fragments,
192            downstream_relations,
193        })
194    }
195
196    /// Split this context into per-database contexts without cloning the large loaded graph
197    /// payloads.
198    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
199        let Self {
200            loaded,
201            job_extra_info,
202            upstream_fragments,
203            downstream_fragments,
204            downstream_relations,
205        } = self;
206
207        let mut contexts: HashMap<_, _> = loaded
208            .into_database_contexts()
209            .into_iter()
210            .map(|(database_id, loaded)| {
211                (
212                    database_id,
213                    Self {
214                        loaded,
215                        job_extra_info: HashMap::new(),
216                        upstream_fragments: HashMap::new(),
217                        downstream_fragments: HashMap::new(),
218                        downstream_relations: HashMap::new(),
219                    },
220                )
221            })
222            .collect();
223
224        if contexts.is_empty() {
225            return contexts;
226        }
227
228        let mut job_databases = HashMap::new();
229        let mut fragment_databases = HashMap::new();
230        for (&database_id, context) in &contexts {
231            for job_id in context.loaded.job_map.keys().copied() {
232                job_databases.insert(job_id, database_id);
233            }
234            for fragment_id in context
235                .loaded
236                .job_fragments
237                .values()
238                .flat_map(|fragments| fragments.keys().copied())
239            {
240                fragment_databases.insert(fragment_id, database_id);
241            }
242        }
243
244        for (job_id, info) in job_extra_info {
245            if let Some(database_id) = job_databases.get(&job_id).copied() {
246                contexts
247                    .get_mut(&database_id)
248                    .expect("database context should exist for job")
249                    .job_extra_info
250                    .insert(job_id, info);
251            }
252        }
253
254        for (fragment_id, upstreams) in upstream_fragments {
255            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
256                contexts
257                    .get_mut(&database_id)
258                    .expect("database context should exist for fragment")
259                    .upstream_fragments
260                    .insert(fragment_id, upstreams);
261            }
262        }
263
264        for (fragment_id, downstreams) in downstream_fragments {
265            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
266                contexts
267                    .get_mut(&database_id)
268                    .expect("database context should exist for fragment")
269                    .downstream_fragments
270                    .insert(fragment_id, downstreams);
271            }
272        }
273
274        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
275            // Route by source fragment ownership. A target may be outside of current reschedule
276            // set, but this edge still belongs to the source-side command.
277            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
278                contexts
279                    .get_mut(&database_id)
280                    .expect("database context should exist for relation source")
281                    .downstream_relations
282                    .insert((source_fragment_id, target_fragment_id), relation);
283            }
284        }
285
286        contexts
287    }
288}
289
/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job being replaced.
    pub old_fragments: StreamJobFragments,
    /// Newly built fragments that will replace `old_fragments`.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    // NOTE(review): presumably the downstream relations of the new fragments' upstreams —
    // confirm direction against `FragmentDownstreamRelation` usage.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Split plan for the replace job. Determines how splits are resolved in Phase 2
    /// inside the barrier worker.
    pub split_plan: ReplaceJobSplitPlan,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
    pub streaming_job: StreamingJob,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
    /// The temporary dummy job fragments id of new table fragment
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Contexts for sinks whose schema is auto-refreshed along with this replacement, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
319
320impl ReplaceStreamJobPlan {
321    /// `old_fragment_id` -> `new_fragment_id`
322    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
323        let mut fragment_replacements = HashMap::new();
324        for (upstream_fragment_id, new_upstream_fragment_id) in
325            self.replace_upstream.values().flatten()
326        {
327            {
328                let r =
329                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
330                if let Some(r) = r {
331                    assert_eq!(
332                        *new_upstream_fragment_id, r,
333                        "one fragment is replaced by multiple fragments"
334                    );
335                }
336            }
337        }
338        fragment_replacements
339    }
340}
341
/// All info carried by [`Command::CreateStreamingJob`] to build the `Add` barrier
/// for a newly created streaming job.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    // Excluded from `Debug` output because the fragment graph payload is large.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Downstream relations of the upstream fragments of this job.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Source-level split assignment (Phase 1). Resolved to actor-level in the barrier worker.
    pub init_split_assignment: SourceSplitAssignment,
    /// The definition text of the job.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    /// Backfill ordering among fragments of this job.
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    // NOTE(review): presumably pre-computed snapshot splits when the job backfills a
    // CDC table — confirm against the producer of this field.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// State tables grouped by the fragment they belong to, for locality handling.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
}
363
364impl StreamJobFragments {
365    /// Build fragment-level info for new fragments, populating actor infos from the
366    /// rendered actors and their locations, and applying split assignment to actor splits.
367    pub(super) fn new_fragment_info<'a>(
368        &'a self,
369        stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
370        actor_location: &'a HashMap<ActorId, WorkerId>,
371        assignment: &'a SplitAssignment,
372    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
373        self.fragments.values().map(|fragment| {
374            (
375                fragment.fragment_id,
376                InflightFragmentInfo {
377                    fragment_id: fragment.fragment_id,
378                    distribution_type: fragment.distribution_type.into(),
379                    fragment_type_mask: fragment.fragment_type_mask,
380                    vnode_count: fragment.vnode_count(),
381                    nodes: fragment.nodes.clone(),
382                    actors: stream_actors
383                        .get(&fragment.fragment_id)
384                        .into_iter()
385                        .flatten()
386                        .map(|actor| {
387                            (
388                                actor.actor_id,
389                                InflightActorInfo {
390                                    worker_id: actor_location[&actor.actor_id],
391                                    vnode_bitmap: actor.vnode_bitmap.clone(),
392                                    splits: assignment
393                                        .get(&fragment.fragment_id)
394                                        .and_then(|s| s.get(&actor.actor_id))
395                                        .cloned()
396                                        .unwrap_or_default(),
397                                },
398                            )
399                        })
400                        .collect(),
401                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
402                },
403            )
404        })
405    }
406}
407
/// Info about the upstream materialized views that a snapshot-backfill job reads from.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    /// The `snapshot_backfill_epoch` should be None at the beginning, and be filled
    /// by global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
415
/// Distinguishes how a streaming job is being created; see [`Command::CreateStreamingJob`].
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// A regular streaming job.
    Normal,
    /// Created by `CREATE SINK INTO table` (see [`ReplaceStreamJobPlan`] docs).
    SinkIntoTable(UpstreamSinkInfo),
    /// Created with snapshot backfill from upstream materialized views.
    SnapshotBackfill(SnapshotBackfillInfo),
}
422
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send via corresponding `*_to_mutation` helpers,
/// and may [do different stuffs after the barrier is collected](PostCollectCommand::post_collect).
// FIXME: this enum is significantly large on stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has ensured that these streaming jobs are safe to be
    /// dropped by reference counts before.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then delete the job fragments info from meta store.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        /// Used by recovery quick path when draining buffered drop/cancel commands.
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates a `Add` barrier by given info.
    ///
    /// Barriers from the actors to be created, which is marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passthrough.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`. And
    /// it adds the job fragments info to meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
    /// will be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedule context. It must be resolved inside the barrier worker before injection.
    RescheduleIntent {
        context: RescheduleContext,
        /// Filled by the barrier worker after resolving `context` against current worker topology.
        ///
        /// We keep unresolved `context` outside of checkpoint state and only materialize this
        /// execution plan right before injection, then drop `context` to release memory earlier.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// `ReplaceStreamJob` command generates a `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of reschedule, while the upstream fragment
    /// of the Merge executors are changed additionally.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of executors. `throttle_type` specifies which executor kinds should apply it.
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// materialize executor to start storing old value for subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// materialize executor to stop storing old value when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// `AlterSubscriptionRetention` command updates the subscription retention time.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `ConnectorPropsChange` command generates a `ConnectorPropsChangeMutation` to apply
    /// updated connector properties to running jobs.
    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// `ListFinish` command generates a `ListFinishMutation`. Part of the refresh flow
    /// started by [`Command::Refresh`] (NOTE(review): confirm exact phase semantics).
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// `LoadFinish` command generates a `LoadFinishMutation`. Part of the refresh flow
    /// started by [`Command::Refresh`] (NOTE(review): confirm exact phase semantics).
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// `ResetSource` command generates a barrier to reset CDC source offset to latest.
    /// Used when upstream binlog/oplog has expired.
    ResetSource {
        source_id: SourceId,
    },

    /// `ResumeBackfill` command generates a `StartFragmentBackfill` barrier to force backfill
    /// to resume for troubleshooting.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// `InjectSourceOffsets` command generates a barrier to inject specific offsets
    /// into source splits (UNSAFE - admin only).
    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Split ID -> offset (JSON-encoded based on connector type)
        split_offsets: HashMap<String, String>,
    },
}
563
/// Target granularity of [`Command::ResumeBackfill`]: a whole job or a single fragment.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Resume backfill for the given job.
    Job(JobId),
    /// Resume backfill for the given fragment.
    Fragment(FragmentId),
}
569
570// For debugging and observability purposes. Can add more details later if needed.
571impl std::fmt::Display for Command {
572    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
573        match self {
574            Command::Flush => write!(f, "Flush"),
575            Command::Pause => write!(f, "Pause"),
576            Command::Resume => write!(f, "Resume"),
577            Command::DropStreamingJobs {
578                streaming_job_ids, ..
579            } => {
580                write!(
581                    f,
582                    "DropStreamingJobs: {}",
583                    streaming_job_ids.iter().sorted().join(", ")
584                )
585            }
586            Command::CreateStreamingJob { info, .. } => {
587                write!(f, "CreateStreamingJob: {}", info.streaming_job)
588            }
589            Command::RescheduleIntent {
590                reschedule_plan, ..
591            } => {
592                if reschedule_plan.is_some() {
593                    write!(f, "RescheduleIntent(planned)")
594                } else {
595                    write!(f, "RescheduleIntent")
596                }
597            }
598            Command::ReplaceStreamJob(plan) => {
599                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
600            }
601            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
602            Command::Throttle { .. } => write!(f, "Throttle"),
603            Command::CreateSubscription {
604                subscription_id, ..
605            } => write!(f, "CreateSubscription: {subscription_id}"),
606            Command::DropSubscription {
607                subscription_id, ..
608            } => write!(f, "DropSubscription: {subscription_id}"),
609            Command::AlterSubscriptionRetention {
610                subscription_id,
611                retention_second,
612                ..
613            } => write!(
614                f,
615                "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
616            ),
617            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
618            Command::Refresh {
619                table_id,
620                associated_source_id,
621            } => write!(
622                f,
623                "Refresh: {} (source: {})",
624                table_id, associated_source_id
625            ),
626            Command::ListFinish {
627                table_id,
628                associated_source_id,
629            } => write!(
630                f,
631                "ListFinish: {} (source: {})",
632                table_id, associated_source_id
633            ),
634            Command::LoadFinish {
635                table_id,
636                associated_source_id,
637            } => write!(
638                f,
639                "LoadFinish: {} (source: {})",
640                table_id, associated_source_id
641            ),
642            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
643            Command::ResumeBackfill { target } => match target {
644                ResumeBackfillTarget::Job(job_id) => {
645                    write!(f, "ResumeBackfill: job={job_id}")
646                }
647                ResumeBackfillTarget::Fragment(fragment_id) => {
648                    write!(f, "ResumeBackfill: fragment={fragment_id}")
649                }
650            },
651            Command::InjectSourceOffsets {
652                source_id,
653                split_offsets,
654            } => write!(
655                f,
656                "InjectSourceOffsets: {} ({} splits)",
657                source_id,
658                split_offsets.len()
659            ),
660        }
661    }
662}
663
664impl Command {
665    pub fn pause() -> Self {
666        Self::Pause
667    }
668
669    pub fn resume() -> Self {
670        Self::Resume
671    }
672
673    pub fn need_checkpoint(&self) -> bool {
674        // todo! Reviewing the flow of different command to reduce the amount of checkpoint
675        !matches!(self, Command::Resume)
676    }
677}
678
/// The post-collection counterpart of [`Command`]: the data needed to finish a
/// command after its barrier has been collected.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A plain named command (e.g. a bare "barrier") that needs no checkpoint or
    /// structured post-collect handling.
    Command(String),
    DropStreamingJobs,
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// Actor-level split assignment resolved from the Phase-1 source-level one.
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// Actor-level split assignment resolved inside the barrier worker.
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
707
708impl PostCollectCommand {
709    pub fn barrier() -> Self {
710        PostCollectCommand::Command("barrier".to_owned())
711    }
712
713    pub fn should_checkpoint(&self) -> bool {
714        match self {
715            PostCollectCommand::DropStreamingJobs
716            | PostCollectCommand::CreateStreamingJob { .. }
717            | PostCollectCommand::Reschedule { .. }
718            | PostCollectCommand::ReplaceStreamJob { .. }
719            | PostCollectCommand::SourceChangeSplit { .. }
720            | PostCollectCommand::CreateSubscription { .. }
721            | PostCollectCommand::ConnectorPropsChange(_)
722            | PostCollectCommand::ResumeBackfill { .. } => true,
723            PostCollectCommand::Command(_) => false,
724        }
725    }
726
727    pub fn command_name(&self) -> &str {
728        match self {
729            PostCollectCommand::Command(name) => name.as_str(),
730            PostCollectCommand::DropStreamingJobs => "DropStreamingJobs",
731            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
732            PostCollectCommand::Reschedule { .. } => "Reschedule",
733            PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
734            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
735            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
736            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
737            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
738        }
739    }
740}
741
742impl Display for PostCollectCommand {
743    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
744        f.write_str(self.command_name())
745    }
746}
747
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BarrierKind {
    /// The initial barrier (see `is_initial`).
    Initial,
    /// An ordinary, non-checkpoint barrier.
    Barrier,
    /// Hold a list of previous non-checkpoint prev-epoch + current prev-epoch
    Checkpoint(Vec<u64>),
}
755
756impl BarrierKind {
757    pub fn to_protobuf(&self) -> PbBarrierKind {
758        match self {
759            BarrierKind::Initial => PbBarrierKind::Initial,
760            BarrierKind::Barrier => PbBarrierKind::Barrier,
761            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
762        }
763    }
764
765    pub fn is_checkpoint(&self) -> bool {
766        matches!(self, BarrierKind::Checkpoint(_))
767    }
768
769    pub fn is_initial(&self) -> bool {
770        matches!(self, BarrierKind::Initial)
771    }
772
773    pub fn as_str_name(&self) -> &'static str {
774        match self {
775            BarrierKind::Initial => "Initial",
776            BarrierKind::Barrier => "Barrier",
777            BarrierKind::Checkpoint(_) => "Checkpoint",
778        }
779    }
780}
781
782impl BarrierInfo {
783    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
784        let Some(truncate_timestamptz) = Timestamptz::from_secs(
785            self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
786        ) else {
787            warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
788            return self.prev_epoch.value();
789        };
790        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
791    }
792}
793
impl Command {
    /// Aggregates the `BarrierCompleteResponse`s of one collected barrier into
    /// the shared `CommitEpochInfo` (`info`) that will later be committed to
    /// the hummock version.
    ///
    /// In addition to merging synced SSTs, table watermarks and vector-index
    /// additions from `resps`, this:
    /// - registers the new table fragments when the post-collect command is a
    ///   (non-snapshot-backfill) `CreateStreamingJob`;
    /// - derives per-mv-table log-store truncate epochs from subscription
    ///   retention and from backfill-pinned epochs, and builds the table
    ///   change-log delta for this checkpoint;
    /// - records the commit epoch for every table of this partial graph.
    ///
    /// NOTE(review): callers presumably only invoke this on checkpoint
    /// barriers — the `must_match!` below panics otherwise; confirm upstream.
    pub(super) fn collect_commit_epoch_info(
        database_info: &InflightDatabaseInfo,
        barrier_info: &PartialGraphBarrierInfo,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        // Split the worker responses into their per-category components.
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // A newly created streaming job contributes a fresh table-fragment
        // registration: its internal tables plus the mv table, if any.
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } =
                &barrier_info.post_collect_command
            {
                // Snapshot-backfill jobs are handled elsewhere and must not
                // reach this path.
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        // For each upstream mv table, keep the *minimum* truncate epoch seen
        // across all subscribers/backfills — the log must be retained up to
        // the most conservative (smallest) epoch.
        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        // Subscriptions: retain the log for the longest retention window.
        for (mv_table_id, max_retention) in database_info.max_subscription_retention() {
            let truncate_epoch = barrier_info
                .barrier_info
                .get_truncate_epoch(max_retention)
                .0;
            update_truncate_epoch(mv_table_id, truncate_epoch);
        }
        // In-flight backfills: pin the log at the backfill epoch of each
        // upstream mv table.
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        // Build the change-log delta for this checkpoint; panics if the
        // barrier is not a checkpoint (see NOTE above).
        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&barrier_info.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // Every table of this partial graph commits at the barrier's prev epoch.
        let epoch = barrier_info.barrier_info.prev_epoch();
        for table_id in &barrier_info.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        // Merge everything into the shared commit info.
        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index additionally records its Init delta.
        // NOTE(review): `vector_index_info.unwrap()` assumes the helper only
        // returns tables that carry vector-index info — confirm in
        // `collect_new_vector_index_info`.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } =
            &barrier_info.post_collect_command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
899
900impl Command {
901    /// Build the `Pause` mutation.
902    pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
903        {
904            {
905                // Only pause when the cluster is not already paused.
906                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
907                if !is_currently_paused {
908                    Some(Mutation::Pause(PauseMutation {}))
909                } else {
910                    None
911                }
912            }
913        }
914    }
915
916    /// Build the `Resume` mutation.
917    pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
918        {
919            {
920                // Only resume when the cluster is paused with the same reason.
921                if is_currently_paused {
922                    Some(Mutation::Resume(ResumeMutation {}))
923                } else {
924                    None
925                }
926            }
927        }
928    }
929
930    /// Build the `Splits` mutation for `SourceChangeSplit`.
931    pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
932        {
933            {
934                let mut diff = HashMap::new();
935
936                for actor_splits in split_assignment.values() {
937                    diff.extend(actor_splits.clone());
938                }
939
940                Mutation::Splits(SourceChangeSplitMutation {
941                    actor_splits: build_actor_connector_splits(&diff),
942                })
943            }
944        }
945    }
946
947    /// Build the `Throttle` mutation.
948    pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
949        {
950            {
951                let config = config.clone();
952                Mutation::Throttle(ThrottleMutation {
953                    fragment_throttle: config,
954                })
955            }
956        }
957    }
958
959    /// Build the `Stop` mutation for `DropStreamingJobs`.
960    pub(super) fn drop_streaming_jobs_to_mutation(
961        actors: &Vec<ActorId>,
962        dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
963    ) -> Mutation {
964        {
965            Mutation::Stop(StopMutation {
966                actors: actors.clone(),
967                dropped_sink_fragments: dropped_sink_fragment_by_targets
968                    .values()
969                    .flatten()
970                    .cloned()
971                    .collect(),
972            })
973        }
974    }
975
    /// Build the `Add` mutation for `CreateStreamingJob`.
    ///
    /// Assembles everything the workers need to bring the new job online:
    /// the dispatchers of upstream fragments towards the new fragments, the
    /// newly added actors and their source split assignment, subscriptions to
    /// register for snapshot backfill, backfill nodes to start paused, CDC
    /// snapshot splits, and (for sink-into-table) the new upstream sink
    /// registration for the target table's fragment.
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                // All actors of the new job, across all its fragments.
                let added_actors: Vec<ActorId> = stream_actors
                    .values()
                    .flatten()
                    .map(|actor| actor.actor_id)
                    .collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // Snapshot-backfill jobs implicitly subscribe to each upstream
                // mv table, with the job itself as the subscriber.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                // Backfill nodes that depend on other backfills start paused
                // and are resumed in dependency order.
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // For sink-into-table, tell the target table's downstream
                // fragment about its new upstream sink fragment and actors.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_actors
                            .get(sink_fragment_id)
                            .unwrap_or_else(|| {
                                panic!("upstream sink fragment {sink_fragment_id} not exist")
                            })
                            .iter()
                            .map(|actor| {
                                let worker_id = actor_location[&actor.actor_id];
                                PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                }
                            });
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // Move (extract) the dispatchers of upstream fragments that
                    // now point at the new job out of the edge-build result.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1094
    /// Build the `Update` mutation for `ReplaceStreamJob`.
    ///
    /// Extracts, from the edge-build result, the merge updates of fragments
    /// whose upstream is being replaced and the dispatchers of upstream
    /// fragments towards the new fragments, then delegates to
    /// `generate_update_mutation_for_replace_table` with the actors of the
    /// old fragments (plus any auto-refresh-schema sink fragments).
    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                // Take (extract) only the entries relevant to this plan,
                // leaving the rest in `edges` for other commands.
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                // Re-assign CDC backfill snapshot splits for the replaced job,
                // if it has any.
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                let old_fragments = old_fragments.fragments.keys().copied();
                let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
                    .as_ref()
                    .into_iter()
                    .flat_map(|sinks| sinks.iter())
                    .map(|sink| sink.original_fragment.fragment_id);
                Ok(Self::generate_update_mutation_for_replace_table(
                    // All actors of the fragments being replaced.
                    old_fragments
                        .chain(auto_refresh_sink_fragment_ids)
                        .flat_map(|fragment_id| {
                            database_info.fragment(fragment_id).actors.keys().copied()
                        }),
                    merge_updates,
                    dispatchers,
                    split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                ))
            }
        }
    }
1144
    /// Build the `Update` mutation for `RescheduleIntent`.
    ///
    /// For each rescheduled fragment this produces:
    /// - `dispatcher_update`s for the actors of its upstream fragments
    ///   (new/removed downstream actors and the new hash mapping);
    /// - `merge_update`s for the actors of its downstream fragments
    ///   (added/removed upstream actors);
    /// - vnode bitmap updates and source split re-assignments for surviving
    ///   actors, the set of dropped actors, and refreshed CDC backfill splits.
    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let database_id = database_info.database_id;
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        // Record updates for all actors.
                        for &actor_id in upstream_actor_ids {
                            // An upstream actor that is itself being removed
                            // gets no added downstreams — only removals.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // Index with the dispatcher id to check duplicates.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Downstream removed actors should be skipped
                        // Newly created actors of the current fragment will not dispatch Update
                        // barriers to them
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the fragment id to check duplicates.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    // we assume that we only scale the partial graph of database
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record updates for all actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    // Refresh CDC backfill split assignment for fragments that
                    // have one.
                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // we don't create dispatchers in reschedule scenario
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Ok(Some(mutation))
            }
        }
    }
1326
1327    /// Build the `Add` mutation for `CreateSubscription`.
1328    pub(super) fn create_subscription_to_mutation(
1329        upstream_mv_table_id: TableId,
1330        subscription_id: SubscriptionId,
1331    ) -> Mutation {
1332        {
1333            Mutation::Add(AddMutation {
1334                actor_dispatchers: Default::default(),
1335                added_actors: vec![],
1336                actor_splits: Default::default(),
1337                pause: false,
1338                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1339                    upstream_mv_table_id,
1340                    subscriber_id: subscription_id.as_subscriber_id(),
1341                }],
1342                backfill_nodes_to_pause: vec![],
1343                actor_cdc_table_snapshot_splits: None,
1344                new_upstream_sinks: Default::default(),
1345            })
1346        }
1347    }
1348
1349    /// Build the `DropSubscriptions` mutation for `DropSubscription`.
1350    pub(super) fn drop_subscription_to_mutation(
1351        upstream_mv_table_id: TableId,
1352        subscription_id: SubscriptionId,
1353    ) -> Mutation {
1354        {
1355            Mutation::DropSubscriptions(DropSubscriptionsMutation {
1356                info: vec![SubscriptionUpstreamInfo {
1357                    subscriber_id: subscription_id.as_subscriber_id(),
1358                    upstream_mv_table_id,
1359                }],
1360            })
1361        }
1362    }
1363
1364    /// Build the `ConnectorPropsChange` mutation.
1365    pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1366        {
1367            {
1368                let mut connector_props_infos = HashMap::default();
1369                for (k, v) in config {
1370                    connector_props_infos.insert(
1371                        k.as_raw_id(),
1372                        ConnectorPropsInfo {
1373                            connector_props_info: v.clone(),
1374                        },
1375                    );
1376                }
1377                Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1378                    connector_props_infos,
1379                })
1380            }
1381        }
1382    }
1383
1384    /// Build the `RefreshStart` mutation.
1385    pub(super) fn refresh_to_mutation(
1386        table_id: TableId,
1387        associated_source_id: SourceId,
1388    ) -> Mutation {
1389        Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1390            table_id,
1391            associated_source_id,
1392        })
1393    }
1394
1395    /// Build the `ListFinish` mutation.
1396    pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1397        Mutation::ListFinish(ListFinishMutation {
1398            associated_source_id,
1399        })
1400    }
1401
1402    /// Build the `LoadFinish` mutation.
1403    pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1404        Mutation::LoadFinish(LoadFinishMutation {
1405            associated_source_id,
1406        })
1407    }
1408
1409    /// Build the `ResetSource` mutation.
1410    pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1411        Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1412            source_id: source_id.as_raw_id(),
1413        })
1414    }
1415
1416    /// Build the `StartFragmentBackfill` mutation for `ResumeBackfill`.
1417    pub(super) fn resume_backfill_to_mutation(
1418        target: &ResumeBackfillTarget,
1419        database_info: &InflightDatabaseInfo,
1420    ) -> MetaResult<Option<Mutation>> {
1421        {
1422            {
1423                let fragment_ids: HashSet<_> = match target {
1424                    ResumeBackfillTarget::Job(job_id) => {
1425                        database_info.backfill_fragment_ids_for_job(*job_id)?
1426                    }
1427                    ResumeBackfillTarget::Fragment(fragment_id) => {
1428                        if !database_info.is_backfill_fragment(*fragment_id)? {
1429                            return Err(MetaError::invalid_parameter(format!(
1430                                "fragment {} is not a backfill node",
1431                                fragment_id
1432                            )));
1433                        }
1434                        HashSet::from([*fragment_id])
1435                    }
1436                };
1437                if fragment_ids.is_empty() {
1438                    warn!(
1439                        ?target,
1440                        "resume backfill command ignored because no backfill fragments found"
1441                    );
1442                    Ok(None)
1443                } else {
1444                    Ok(Some(Mutation::StartFragmentBackfill(
1445                        StartFragmentBackfillMutation {
1446                            fragment_ids: fragment_ids.into_iter().collect(),
1447                        },
1448                    )))
1449                }
1450            }
1451        }
1452    }
1453
1454    /// Build the `InjectSourceOffsets` mutation.
1455    pub(super) fn inject_source_offsets_to_mutation(
1456        source_id: SourceId,
1457        split_offsets: &HashMap<String, String>,
1458    ) -> Mutation {
1459        Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1460            source_id: source_id.as_raw_id(),
1461            split_offsets: split_offsets.clone(),
1462        })
1463    }
1464
1465    /// Collect actors to create for `CreateStreamingJob` (non-snapshot-backfill).
1466    pub(super) fn create_streaming_job_actors_to_create(
1467        info: &CreateStreamingJobCommandInfo,
1468        edges: &mut FragmentEdgeBuildResult,
1469        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1470        actor_location: &HashMap<ActorId, WorkerId>,
1471    ) -> StreamJobActorsToCreate {
1472        {
1473            {
1474                edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1475                    |fragment| {
1476                        let actors = stream_actors
1477                            .get(&fragment.fragment_id)
1478                            .into_iter()
1479                            .flatten()
1480                            .map(|actor| (actor, actor_location[&actor.actor_id]));
1481                        (
1482                            fragment.fragment_id,
1483                            &fragment.nodes,
1484                            actors,
1485                            [], // no subscriber for new job to create
1486                        )
1487                    },
1488                ))
1489            }
1490        }
1491    }
1492
1493    /// Collect actors to create for `RescheduleIntent`.
1494    pub(super) fn reschedule_actors_to_create(
1495        reschedules: &HashMap<FragmentId, Reschedule>,
1496        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1497        database_info: &InflightDatabaseInfo,
1498        control_stream_manager: &ControlStreamManager,
1499    ) -> StreamJobActorsToCreate {
1500        {
1501            {
1502                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1503                    reschedules.iter().map(|(fragment_id, reschedule)| {
1504                        (
1505                            *fragment_id,
1506                            reschedule.newly_created_actors.values().map(
1507                                |((actor, dispatchers), _)| {
1508                                    (actor.actor_id, dispatchers.as_slice())
1509                                },
1510                            ),
1511                        )
1512                    }),
1513                    Some((reschedules, fragment_actors)),
1514                    database_info,
1515                    control_stream_manager,
1516                );
1517                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1518                for (fragment_id, (actor, dispatchers), worker_id) in
1519                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1520                        reschedule
1521                            .newly_created_actors
1522                            .values()
1523                            .map(|(actors, status)| (*fragment_id, actors, status))
1524                    })
1525                {
1526                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1527                    map.entry(*worker_id)
1528                        .or_default()
1529                        .entry(fragment_id)
1530                        .or_insert_with(|| {
1531                            let node = database_info.fragment(fragment_id).nodes.clone();
1532                            let subscribers =
1533                                database_info.fragment_subscribers(fragment_id).collect();
1534                            (node, vec![], subscribers)
1535                        })
1536                        .1
1537                        .push((actor.clone(), upstreams, dispatchers.clone()));
1538                }
1539                map
1540            }
1541        }
1542    }
1543
1544    /// Collect actors to create for `ReplaceStreamJob`.
1545    pub(super) fn replace_stream_job_actors_to_create(
1546        replace_table: &ReplaceStreamJobPlan,
1547        edges: &mut FragmentEdgeBuildResult,
1548        database_info: &InflightDatabaseInfo,
1549        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1550        actor_location: &HashMap<ActorId, WorkerId>,
1551    ) -> StreamJobActorsToCreate {
1552        {
1553            {
1554                let mut actors = edges.collect_actors_to_create(
1555                    replace_table
1556                        .new_fragments
1557                        .fragments
1558                        .values()
1559                        .map(|fragment| {
1560                            let actors = stream_actors
1561                                .get(&fragment.fragment_id)
1562                                .into_iter()
1563                                .flatten()
1564                                .map(|actor| (actor, actor_location[&actor.actor_id]));
1565                            (
1566                                fragment.fragment_id,
1567                                &fragment.nodes,
1568                                actors,
1569                                database_info
1570                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1571                            )
1572                        }),
1573                );
1574
1575                // Handle auto-refresh schema sinks
1576                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1577                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1578                        (
1579                            sink.new_fragment.fragment_id,
1580                            &sink.new_fragment.nodes,
1581                            stream_actors
1582                                .get(&sink.new_fragment.fragment_id)
1583                                .into_iter()
1584                                .flatten()
1585                                .map(|actor| (actor, actor_location[&actor.actor_id])),
1586                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1587                        )
1588                    }));
1589                    for (worker_id, fragment_actors) in sink_actors {
1590                        actors.entry(worker_id).or_default().extend(fragment_actors);
1591                    }
1592                }
1593                actors
1594            }
1595        }
1596    }
1597
1598    fn generate_update_mutation_for_replace_table(
1599        dropped_actors: impl IntoIterator<Item = ActorId>,
1600        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1601        dispatchers: FragmentActorDispatchers,
1602        split_assignment: &SplitAssignment,
1603        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1604        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1605    ) -> Option<Mutation> {
1606        let dropped_actors = dropped_actors.into_iter().collect();
1607
1608        let actor_new_dispatchers = dispatchers
1609            .into_values()
1610            .flatten()
1611            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1612            .collect();
1613
1614        let actor_splits = split_assignment
1615            .values()
1616            .flat_map(build_actor_connector_splits)
1617            .collect();
1618        Some(Mutation::Update(UpdateMutation {
1619            actor_new_dispatchers,
1620            merge_update: merge_updates.into_values().flatten().collect(),
1621            dropped_actors,
1622            actor_splits,
1623            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1624            sink_schema_change: auto_refresh_schema_sinks
1625                .as_ref()
1626                .into_iter()
1627                .flat_map(|sinks| {
1628                    sinks.iter().map(|sink| {
1629                        let op = if !sink.removed_column_names.is_empty() {
1630                            PbSinkSchemaChangeOp::DropColumns(PbSinkDropColumnsOp {
1631                                column_names: sink.removed_column_names.clone(),
1632                            })
1633                        } else {
1634                            PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1635                                fields: sink
1636                                    .newly_add_fields
1637                                    .iter()
1638                                    .map(|field| field.to_prost())
1639                                    .collect(),
1640                            })
1641                        };
1642                        (
1643                            sink.original_sink.id.as_raw_id(),
1644                            PbSinkSchemaChange {
1645                                original_schema: sink
1646                                    .original_sink
1647                                    .columns
1648                                    .iter()
1649                                    .map(|col| PbField {
1650                                        data_type: Some(
1651                                            col.column_desc
1652                                                .as_ref()
1653                                                .unwrap()
1654                                                .column_type
1655                                                .as_ref()
1656                                                .unwrap()
1657                                                .clone(),
1658                                        ),
1659                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1660                                    })
1661                                    .collect(),
1662                                op: Some(op),
1663                            },
1664                        )
1665                    })
1666                })
1667                .collect(),
1668            ..Default::default()
1669        }))
1670    }
1671}
1672
impl Command {
    /// Resolve, for every downstream actor, the upstream actors it must know
    /// about: a map of downstream `ActorId` -> upstream `FragmentId` ->
    /// upstream `PbActorInfo` (with the upstream actor's host address).
    ///
    /// Two inputs feed the result:
    /// 1. `actor_dispatchers`: explicit dispatcher lists per upstream fragment;
    ///    every downstream actor id appearing in a dispatcher gains the
    ///    dispatching upstream actor as an upstream.
    /// 2. `reschedule_dispatcher_update` (optional): for each rescheduled
    ///    fragment, every actor of each of its upstream fragments (taken from
    ///    `fragment_actors`) is recorded as an upstream of every actor the
    ///    reschedule adds — except upstream actors that the upstream fragment's
    ///    own reschedule removes.
    ///
    /// Hosts are looked up through `control_stream_manager`; the partial graph
    /// id is always the database-level one (`None` job id).
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        // Pass 1: record upstreams implied by explicit dispatcher lists.
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                // Panics if the actor is unknown to the fragment — an
                // inconsistency in the inflight database info.
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        // Pass 2 (reschedule only): wire every surviving upstream actor to
        // every actor the reschedule newly adds.
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    // The upstream fragment may itself be rescheduled; its
                    // removed actors must not be registered as upstreams.
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        // Fan out to all actors added by this reschedule,
                        // across all workers (the worker id is unused here).
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}