Skip to main content

risingwave_meta/barrier/
command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::{ColumnCatalog as PbColumnCatalog, PbField};
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37    PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkDropColumnsOp,
49    PbSinkSchemaChange, PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation,
50    StartFragmentBackfillMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
51    UpdateMutation,
52};
53use risingwave_pb::stream_service::BarrierCompleteResponse;
54use tracing::warn;
55
56use super::info::InflightDatabaseInfo;
57use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
58use crate::barrier::complete_task::CompleteBarrierTask;
59use crate::barrier::edge_builder::FragmentEdgeBuildResult;
60use crate::barrier::info::BarrierInfo;
61use crate::barrier::partial_graph::PartialGraphBarrierInfo;
62use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
63use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
64use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
65use crate::controller::scale::LoadedFragmentContext;
66use crate::controller::utils::StreamingJobExtraInfo;
67use crate::hummock::NewTableFragmentInfo;
68use crate::manager::{StreamingJob, StreamingJobType};
69use crate::model::{
70    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
71    FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
72    StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
73};
74use crate::stream::{
75    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
76    ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
77    build_actor_connector_splits,
78};
79use crate::{MetaError, MetaResult};
80
/// [`Reschedule`] describes per-fragment changes in a resolved reschedule plan,
/// used for actor scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment, keyed by worker id.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field is `Some` only when there is an upstream fragment and the current fragment
    /// is hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors newly created by this reschedule, keyed by actor id. Each entry pairs the actor
    /// (together with its dispatchers) with a worker id.
    // NOTE(review): the worker id is presumably where the new actor is placed — confirm.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
112
/// A reschedule plan: the per-fragment [`Reschedule`]s together with the actor sets of the
/// fragments they reference.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Per-fragment changes, keyed by fragment id.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Should contain the actor ids in upstream and downstream fragments referenced by
    /// `reschedules`.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
120
/// Preloaded context for rescheduling, built outside the barrier worker.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// The loaded fragment context. Its job and fragment sets decide which entries of the
    /// side maps below belong to a database when the context is filtered or split.
    pub loaded: LoadedFragmentContext,
    /// Extra per-job information, keyed by job id.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// Keyed by fragment id; the inner map is keyed by another fragment id.
    // NOTE(review): presumably fragment -> (upstream fragment -> dispatcher type) — confirm.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Keyed by fragment id; the inner map is keyed by another fragment id.
    // NOTE(review): presumably fragment -> (downstream fragment -> dispatcher type) — confirm.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Fragment relations keyed by `(source_fragment_id, target_fragment_id)`. Entries are
    /// owned by the source fragment when the context is filtered or split by database.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
130
131impl RescheduleContext {
132    pub fn empty() -> Self {
133        Self {
134            loaded: LoadedFragmentContext::default(),
135            job_extra_info: HashMap::new(),
136            upstream_fragments: HashMap::new(),
137            downstream_fragments: HashMap::new(),
138            downstream_relations: HashMap::new(),
139        }
140    }
141
142    pub fn is_empty(&self) -> bool {
143        self.loaded.is_empty()
144    }
145
146    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
147        let loaded = self.loaded.for_database(database_id)?;
148        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
149        // Use the filtered loaded context as the source of truth so every side map is pruned by
150        // the same fragment set.
151        let fragment_ids: HashSet<FragmentId> = loaded
152            .job_fragments
153            .values()
154            .flat_map(|fragments| fragments.keys().copied())
155            .collect();
156
157        let job_extra_info = self
158            .job_extra_info
159            .iter()
160            .filter(|(job_id, _)| job_ids.contains(*job_id))
161            .map(|(job_id, info)| (*job_id, info.clone()))
162            .collect();
163
164        let upstream_fragments = self
165            .upstream_fragments
166            .iter()
167            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
168            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
169            .collect();
170
171        let downstream_fragments = self
172            .downstream_fragments
173            .iter()
174            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
175            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
176            .collect();
177
178        let downstream_relations = self
179            .downstream_relations
180            .iter()
181            // Ownership of this map is source-fragment based. We keep all downstream edges for
182            // selected sources because the target side can still be referenced during dispatcher
183            // reconstruction even if that target fragment is not being rescheduled.
184            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
185            .map(|(key, relation)| (*key, relation.clone()))
186            .collect();
187
188        Some(Self {
189            loaded,
190            job_extra_info,
191            upstream_fragments,
192            downstream_fragments,
193            downstream_relations,
194        })
195    }
196
197    /// Split this context into per-database contexts without cloning the large loaded graph
198    /// payloads.
199    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
200        let Self {
201            loaded,
202            job_extra_info,
203            upstream_fragments,
204            downstream_fragments,
205            downstream_relations,
206        } = self;
207
208        let mut contexts: HashMap<_, _> = loaded
209            .into_database_contexts()
210            .into_iter()
211            .map(|(database_id, loaded)| {
212                (
213                    database_id,
214                    Self {
215                        loaded,
216                        job_extra_info: HashMap::new(),
217                        upstream_fragments: HashMap::new(),
218                        downstream_fragments: HashMap::new(),
219                        downstream_relations: HashMap::new(),
220                    },
221                )
222            })
223            .collect();
224
225        if contexts.is_empty() {
226            return contexts;
227        }
228
229        let mut job_databases = HashMap::new();
230        let mut fragment_databases = HashMap::new();
231        for (&database_id, context) in &contexts {
232            for job_id in context.loaded.job_map.keys().copied() {
233                job_databases.insert(job_id, database_id);
234            }
235            for fragment_id in context
236                .loaded
237                .job_fragments
238                .values()
239                .flat_map(|fragments| fragments.keys().copied())
240            {
241                fragment_databases.insert(fragment_id, database_id);
242            }
243        }
244
245        for (job_id, info) in job_extra_info {
246            if let Some(database_id) = job_databases.get(&job_id).copied() {
247                contexts
248                    .get_mut(&database_id)
249                    .expect("database context should exist for job")
250                    .job_extra_info
251                    .insert(job_id, info);
252            }
253        }
254
255        for (fragment_id, upstreams) in upstream_fragments {
256            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
257                contexts
258                    .get_mut(&database_id)
259                    .expect("database context should exist for fragment")
260                    .upstream_fragments
261                    .insert(fragment_id, upstreams);
262            }
263        }
264
265        for (fragment_id, downstreams) in downstream_fragments {
266            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
267                contexts
268                    .get_mut(&database_id)
269                    .expect("database context should exist for fragment")
270                    .downstream_fragments
271                    .insert(fragment_id, downstreams);
272            }
273        }
274
275        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
276            // Route by source fragment ownership. A target may be outside of current reschedule
277            // set, but this edge still belongs to the source-side command.
278            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
279                contexts
280                    .get_mut(&database_id)
281                    .expect("database context should exist for relation source")
282                    .downstream_relations
283                    .insert((source_fragment_id, target_fragment_id), relation);
284            }
285        }
286
287        contexts
288    }
289}
290
/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// The fragments of the job being replaced.
    pub old_fragments: StreamJobFragments,
    /// The fragments of the new job to be created in place of `old_fragments`.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    // NOTE(review): presumably the downstream relations of the upstream fragments that feed
    // the new job — confirm against the producer of this plan.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Split plan for the replace job. Determines how splits are resolved in Phase 2
    /// inside the barrier worker.
    pub split_plan: ReplaceJobSplitPlan,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
    // NOTE(review): the type-level docs also list `ALTER SOURCE` as a use case, which seems to
    // conflict with "must be `StreamingJob::Table`" — confirm which statement is current.
    pub streaming_job: StreamingJob,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
    /// The temporary dummy job id assigned to the new job fragments.
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Contexts of auto-refresh-schema sinks, if any.
    // NOTE(review): presumably sinks whose schema is refreshed along with this replace — confirm.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
320
321impl ReplaceStreamJobPlan {
322    /// `old_fragment_id` -> `new_fragment_id`
323    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
324        let mut fragment_replacements = HashMap::new();
325        for (upstream_fragment_id, new_upstream_fragment_id) in
326            self.replace_upstream.values().flatten()
327        {
328            {
329                let r =
330                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
331                if let Some(r) = r {
332                    assert_eq!(
333                        *new_upstream_fragment_id, r,
334                        "one fragment is replaced by multiple fragments"
335                    );
336                }
337            }
338        }
339        fragment_replacements
340    }
341}
342
/// The information carried by a create-streaming-job command.
///
/// `stream_job_fragments` is excluded from the `Debug` output.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// The fragments of the job to create.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    // NOTE(review): presumably the new downstream relations added to upstream fragments by
    // this job — confirm against the producer of this info.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Source-level split assignment (Phase 1). Resolved to actor-level in the barrier worker.
    pub init_split_assignment: SourceSplitAssignment,
    /// The definition of the job.
    // NOTE(review): presumably the SQL definition text — confirm.
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// Raw CDC table snapshot splits, if any.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// Table ids associated with each fragment, keyed by fragment id.
    // NOTE(review): presumably state tables of locality-related fragments — confirm.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
    /// Batch refresh interval in seconds. If set, the MV uses batch refresh semantics.
    pub refresh_interval_sec: Option<u64>,
}
366
367impl StreamJobFragments {
368    /// Build fragment-level info for new fragments, populating actor infos from the
369    /// rendered actors and their locations, and applying split assignment to actor splits.
370    pub(super) fn new_fragment_info<'a>(
371        &'a self,
372        stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
373        actor_location: &'a HashMap<ActorId, WorkerId>,
374        assignment: &'a SplitAssignment,
375    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
376        self.fragments.values().map(|fragment| {
377            (
378                fragment.fragment_id,
379                InflightFragmentInfo {
380                    fragment_id: fragment.fragment_id,
381                    distribution_type: fragment.distribution_type.into(),
382                    fragment_type_mask: fragment.fragment_type_mask,
383                    vnode_count: fragment.vnode_count(),
384                    nodes: fragment.nodes.clone(),
385                    actors: stream_actors
386                        .get(&fragment.fragment_id)
387                        .into_iter()
388                        .flatten()
389                        .map(|actor| {
390                            (
391                                actor.actor_id,
392                                InflightActorInfo {
393                                    worker_id: actor_location[&actor.actor_id],
394                                    vnode_bitmap: actor.vnode_bitmap.clone(),
395                                    splits: assignment
396                                        .get(&fragment.fragment_id)
397                                        .and_then(|s| s.get(&actor.actor_id))
398                                        .cloned()
399                                        .unwrap_or_default(),
400                                },
401                            )
402                        })
403                        .collect(),
404                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
405                },
406            )
407        })
408    }
409}
410
/// Upstream tables of a snapshot backfill and the epoch each one is backfilled at.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    /// The `snapshot_backfill_epoch` should be `None` at the beginning, and is filled in
    /// by the global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
418
/// Payload of [`CreateStreamingJobType::BatchRefresh`]: snapshot backfill info plus a refresh
/// interval.
#[derive(Debug, Clone)]
pub struct BatchRefreshInfo {
    /// The snapshot backfill info of the job.
    pub snapshot_backfill_info: SnapshotBackfillInfo,
    /// The refresh interval, in seconds.
    pub refresh_interval_sec: u64,
}
424
/// The flavor of a create-streaming-job command, with the extra info each flavor carries.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// A regular job with no extra info.
    Normal,
    /// Carries the upstream sink info.
    // NOTE(review): presumably used for `CREATE SINK INTO table` — confirm.
    SinkIntoTable(UpstreamSinkInfo),
    /// Carries the snapshot backfill info.
    SnapshotBackfill(SnapshotBackfillInfo),
    /// Carries the snapshot backfill info and the refresh interval.
    BatchRefresh(BatchRefreshInfo),
}
432
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send via corresponding `*_to_mutation` helpers,
/// and may [do different things after the barrier is collected](PostCollectCommand::post_collect).
// FIXME: this enum is significantly large on stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has ensured that these streaming jobs are safe to be
    /// dropped by reference counts before.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then delete the job fragments info from meta store.
    // NOTE(review): the variant carries job ids rather than a `Vec<ActorId>`; the first
    // sentence above may be stale — confirm.
    DropStreamingJobs {
        /// The ids of the streaming jobs to drop.
        streaming_job_ids: HashSet<JobId>,
        /// Used by recovery quick path when draining buffered drop/cancel commands.
        unregistered_state_table_ids: HashSet<TableId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates a `Add` barrier by given info.
    ///
    /// Barriers from the actors to be created, which is marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passthrough.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`. And
    /// it adds the job fragments info to meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
    /// will be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedule context. It must be resolved inside the barrier worker before injection.
    RescheduleIntent {
        context: RescheduleContext,
        /// Filled by the barrier worker after resolving `context` against current worker topology.
        ///
        /// We keep unresolved `context` outside of checkpoint state and only materialize this
        /// execution plan right before injection, then drop `context` to release memory earlier.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// `ReplaceStreamJob` command generates a `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of reschedule, while the upstream fragment
    /// of the Merge executors are changed additionally.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of executors. `throttle_type` specifies which executor kinds should apply it.
    // NOTE(review): `throttle_type` is not a field of this variant; presumably it lives inside
    // `ThrottleConfig` — confirm.
    Throttle {
        jobs: HashSet<JobId>,
        /// Throttle config per fragment, keyed by fragment id.
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// materialize executor to start storing old value for subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// materialize executor to stop storing old value when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// `AlterSubscriptionRetention` command updates the subscription retention time.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `ConnectorPropsChange` command carries a [`ConnectorPropsChange`].
    // NOTE(review): presumably turned into a `ConnectorPropsChangeMutation` — confirm.
    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    // NOTE(review): `ListFinish` presumably generates a `ListFinishMutation` for the given
    // table and its associated source — confirm.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    // NOTE(review): `LoadFinish` presumably generates a `LoadFinishMutation` for the given
    // table and its associated source — confirm.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// `ResetSource` command generates a barrier to reset CDC source offset to latest.
    /// Used when upstream binlog/oplog has expired.
    ResetSource {
        source_id: SourceId,
    },

    /// `ResumeBackfill` command generates a `StartFragmentBackfill` barrier to force backfill
    /// to resume for troubleshooting.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// `InjectSourceOffsets` command generates a barrier to inject specific offsets
    /// into source splits (UNSAFE - admin only).
    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Split ID -> offset (JSON-encoded based on connector type)
        split_offsets: HashMap<String, String>,
    },
}
572
/// The target of a [`Command::ResumeBackfill`]: either a whole job or a single fragment.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Target identified by job id.
    Job(JobId),
    /// Target identified by fragment id.
    Fragment(FragmentId),
}
578
579// For debugging and observability purposes. Can add more details later if needed.
580impl std::fmt::Display for Command {
581    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
582        match self {
583            Command::Flush => write!(f, "Flush"),
584            Command::Pause => write!(f, "Pause"),
585            Command::Resume => write!(f, "Resume"),
586            Command::DropStreamingJobs {
587                streaming_job_ids, ..
588            } => {
589                write!(
590                    f,
591                    "DropStreamingJobs: {}",
592                    streaming_job_ids.iter().sorted().join(", ")
593                )
594            }
595            Command::CreateStreamingJob { info, .. } => {
596                write!(f, "CreateStreamingJob: {}", info.streaming_job)
597            }
598            Command::RescheduleIntent {
599                reschedule_plan, ..
600            } => {
601                if reschedule_plan.is_some() {
602                    write!(f, "RescheduleIntent(planned)")
603                } else {
604                    write!(f, "RescheduleIntent")
605                }
606            }
607            Command::ReplaceStreamJob(plan) => {
608                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
609            }
610            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
611            Command::Throttle { .. } => write!(f, "Throttle"),
612            Command::CreateSubscription {
613                subscription_id, ..
614            } => write!(f, "CreateSubscription: {subscription_id}"),
615            Command::DropSubscription {
616                subscription_id, ..
617            } => write!(f, "DropSubscription: {subscription_id}"),
618            Command::AlterSubscriptionRetention {
619                subscription_id,
620                retention_second,
621                ..
622            } => write!(
623                f,
624                "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
625            ),
626            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
627            Command::Refresh {
628                table_id,
629                associated_source_id,
630            } => write!(
631                f,
632                "Refresh: {} (source: {})",
633                table_id, associated_source_id
634            ),
635            Command::ListFinish {
636                table_id,
637                associated_source_id,
638            } => write!(
639                f,
640                "ListFinish: {} (source: {})",
641                table_id, associated_source_id
642            ),
643            Command::LoadFinish {
644                table_id,
645                associated_source_id,
646            } => write!(
647                f,
648                "LoadFinish: {} (source: {})",
649                table_id, associated_source_id
650            ),
651            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
652            Command::ResumeBackfill { target } => match target {
653                ResumeBackfillTarget::Job(job_id) => {
654                    write!(f, "ResumeBackfill: job={job_id}")
655                }
656                ResumeBackfillTarget::Fragment(fragment_id) => {
657                    write!(f, "ResumeBackfill: fragment={fragment_id}")
658                }
659            },
660            Command::InjectSourceOffsets {
661                source_id,
662                split_offsets,
663            } => write!(
664                f,
665                "InjectSourceOffsets: {} ({} splits)",
666                source_id,
667                split_offsets.len()
668            ),
669        }
670    }
671}
672
673impl Command {
674    pub fn pause() -> Self {
675        Self::Pause
676    }
677
678    pub fn resume() -> Self {
679        Self::Resume
680    }
681
682    pub fn need_checkpoint(&self) -> bool {
683        // todo! Reviewing the flow of different command to reduce the amount of checkpoint
684        !matches!(self, Command::Resume)
685    }
686}
687
/// The post-collect counterpart of a command, holding the data kept for that stage.
// NOTE(review): presumably derived from a `Command` once its barrier has been built — confirm.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command identified only by its name; `should_checkpoint` returns `false` for it.
    Command(String),
    DropStreamingJobs,
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// The split assignment after resolution.
        // NOTE(review): presumably the actor-level resolution of `info.init_split_assignment` — confirm.
        resolved_split_assignment: SplitAssignment,
    },
    Reschedule {
        /// Per-fragment changes, keyed by fragment id.
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// The split assignment after resolution.
        // NOTE(review): presumably resolved from `plan.split_plan` — confirm.
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
716
717impl PostCollectCommand {
718    pub fn barrier() -> Self {
719        PostCollectCommand::Command("barrier".to_owned())
720    }
721
722    pub fn should_checkpoint(&self) -> bool {
723        match self {
724            PostCollectCommand::DropStreamingJobs
725            | PostCollectCommand::CreateStreamingJob { .. }
726            | PostCollectCommand::Reschedule { .. }
727            | PostCollectCommand::ReplaceStreamJob { .. }
728            | PostCollectCommand::SourceChangeSplit { .. }
729            | PostCollectCommand::CreateSubscription { .. }
730            | PostCollectCommand::ConnectorPropsChange(_)
731            | PostCollectCommand::ResumeBackfill { .. } => true,
732            PostCollectCommand::Command(_) => false,
733        }
734    }
735
736    pub fn command_name(&self) -> &str {
737        match self {
738            PostCollectCommand::Command(name) => name.as_str(),
739            PostCollectCommand::DropStreamingJobs => "DropStreamingJobs",
740            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
741            PostCollectCommand::Reschedule { .. } => "Reschedule",
742            PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
743            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
744            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
745            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
746            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
747        }
748    }
749}
750
751impl Display for PostCollectCommand {
752    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753        f.write_str(self.command_name())
754    }
755}
756
/// The kind of a barrier. `to_protobuf` converts it to the matching [`PbBarrierKind`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BarrierKind {
    /// Converted to `PbBarrierKind::Initial`.
    Initial,
    /// Converted to `PbBarrierKind::Barrier`.
    Barrier,
    /// Holds a list of epochs: the prev-epochs of the previous non-checkpoint barriers plus the
    /// prev-epoch of the current barrier.
    Checkpoint(Vec<u64>),
}
764
765impl BarrierKind {
766    pub fn to_protobuf(&self) -> PbBarrierKind {
767        match self {
768            BarrierKind::Initial => PbBarrierKind::Initial,
769            BarrierKind::Barrier => PbBarrierKind::Barrier,
770            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
771        }
772    }
773
774    pub fn is_checkpoint(&self) -> bool {
775        matches!(self, BarrierKind::Checkpoint(_))
776    }
777
778    pub fn is_initial(&self) -> bool {
779        matches!(self, BarrierKind::Initial)
780    }
781
782    pub fn as_str_name(&self) -> &'static str {
783        match self {
784            BarrierKind::Initial => "Initial",
785            BarrierKind::Barrier => "Barrier",
786            BarrierKind::Checkpoint(_) => "Checkpoint",
787        }
788    }
789}
790
791fn sink_original_schema_fields(columns: &[PbColumnCatalog]) -> Vec<PbField> {
792    columns
793        .iter()
794        .filter(|col| !col.is_hidden)
795        .map(|col| {
796            let column_desc = col
797                .column_desc
798                .as_ref()
799                .expect("sink column catalog should have a column descriptor");
800            PbField {
801                data_type: Some(
802                    column_desc
803                        .column_type
804                        .as_ref()
805                        .expect("sink column descriptor should have a column type")
806                        .clone(),
807                ),
808                name: column_desc.name.clone(),
809            }
810        })
811        .collect()
812}
813
814impl BarrierInfo {
815    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
816        let Some(truncate_timestamptz) = Timestamptz::from_secs(
817            self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
818        ) else {
819            warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
820            return self.prev_epoch.value();
821        };
822        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
823    }
824}
825
826impl Command {
827    pub(super) fn collect_commit_epoch_info(
828        database_info: &InflightDatabaseInfo,
829        barrier_info: &PartialGraphBarrierInfo,
830        task: &mut CompleteBarrierTask,
831        resps: Vec<BarrierCompleteResponse>,
832        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
833    ) {
834        let (
835            sst_to_context,
836            synced_ssts,
837            new_table_watermarks,
838            old_value_ssts,
839            vector_index_adds,
840            truncate_tables,
841            iceberg_v3_sink_metadata,
842        ) = collect_resp_info(resps);
843
844        let new_table_fragment_infos =
845            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } =
846                &barrier_info.post_collect_command
847            {
848                assert!(!matches!(
849                    job_type,
850                    CreateStreamingJobType::SnapshotBackfill(_)
851                        | CreateStreamingJobType::BatchRefresh(_)
852                ));
853                let table_fragments = &info.stream_job_fragments;
854                let mut table_ids: HashSet<_> =
855                    table_fragments.internal_table_ids().into_iter().collect();
856                if let Some(mv_table_id) = table_fragments.mv_table_id() {
857                    table_ids.insert(mv_table_id);
858                }
859
860                vec![NewTableFragmentInfo { table_ids }]
861            } else {
862                vec![]
863            };
864
865        let mut mv_log_store_truncate_epoch = HashMap::new();
866        // TODO: may collect cross db snapshot backfill
867        let mut update_truncate_epoch =
868            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
869                Entry::Occupied(mut entry) => {
870                    let prev_truncate_epoch = entry.get_mut();
871                    if truncate_epoch < *prev_truncate_epoch {
872                        *prev_truncate_epoch = truncate_epoch;
873                    }
874                }
875                Entry::Vacant(entry) => {
876                    entry.insert(truncate_epoch);
877                }
878            };
879        for (mv_table_id, max_retention) in database_info.max_subscription_retention() {
880            let truncate_epoch = barrier_info
881                .barrier_info
882                .get_truncate_epoch(max_retention)
883                .0;
884            update_truncate_epoch(mv_table_id, truncate_epoch);
885        }
886        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
887            for mv_table_id in upstream_mv_table_ids {
888                update_truncate_epoch(mv_table_id, backfill_epoch);
889            }
890        }
891
892        let table_new_change_log = build_table_change_log_delta(
893            old_value_ssts.into_iter(),
894            synced_ssts.iter().map(|sst| &sst.sst_info),
895            must_match!(&barrier_info.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
896            mv_log_store_truncate_epoch.into_iter(),
897        );
898
899        let epoch = barrier_info.barrier_info.prev_epoch();
900        let info = &mut task.commit_info;
901        for table_id in &barrier_info.table_ids_to_commit {
902            info.tables_to_commit
903                .try_insert(*table_id, epoch)
904                .expect("non duplicate");
905        }
906
907        info.sstables.extend(synced_ssts);
908        info.new_table_watermarks.extend(new_table_watermarks);
909        info.sst_to_context.extend(sst_to_context);
910        info.new_table_fragment_infos
911            .extend(new_table_fragment_infos);
912        info.change_log_delta.extend(table_new_change_log);
913        for (table_id, vector_index_adds) in vector_index_adds {
914            info.vector_index_delta
915                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
916                .expect("non-duplicate");
917        }
918        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } =
919            &barrier_info.post_collect_command
920            && let Some(index_table) = collect_new_vector_index_info(job_info)
921        {
922            info.vector_index_delta
923                .try_insert(
924                    index_table.id,
925                    VectorIndexDelta::Init(PbVectorIndexInit {
926                        info: Some(index_table.vector_index_info.unwrap()),
927                    }),
928                )
929                .expect("non-duplicate");
930        }
931        info.truncate_tables.extend(truncate_tables);
932        task.iceberg_v3_sink_metadata
933            .extend(iceberg_v3_sink_metadata);
934    }
935}
936
937impl Command {
938    /// Build the `Pause` mutation.
939    pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
940        {
941            {
942                // Only pause when the cluster is not already paused.
943                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
944                if !is_currently_paused {
945                    Some(Mutation::Pause(PauseMutation {}))
946                } else {
947                    None
948                }
949            }
950        }
951    }
952
953    /// Build the `Resume` mutation.
954    pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
955        {
956            {
957                // Only resume when the cluster is paused with the same reason.
958                if is_currently_paused {
959                    Some(Mutation::Resume(ResumeMutation {}))
960                } else {
961                    None
962                }
963            }
964        }
965    }
966
967    /// Build the `Splits` mutation for `SourceChangeSplit`.
968    pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
969        {
970            {
971                let mut diff = HashMap::new();
972
973                for actor_splits in split_assignment.values() {
974                    diff.extend(actor_splits.clone());
975                }
976
977                Mutation::Splits(SourceChangeSplitMutation {
978                    actor_splits: build_actor_connector_splits(&diff),
979                })
980            }
981        }
982    }
983
984    /// Build the `Throttle` mutation.
985    pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
986        {
987            {
988                let config = config.clone();
989                Mutation::Throttle(ThrottleMutation {
990                    fragment_throttle: config,
991                })
992            }
993        }
994    }
995
996    /// Build the `Stop` mutation for `DropStreamingJobs`.
997    pub(super) fn drop_streaming_jobs_to_mutation(
998        actors: &Vec<ActorId>,
999        dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
1000    ) -> Mutation {
1001        {
1002            Mutation::Stop(StopMutation {
1003                actors: actors.clone(),
1004                dropped_sink_fragments: dropped_sink_fragment_by_targets
1005                    .values()
1006                    .flatten()
1007                    .cloned()
1008                    .collect(),
1009            })
1010        }
1011    }
1012
1013    /// Build the `Add` mutation for `CreateStreamingJob`.
1014    pub(super) fn create_streaming_job_to_mutation(
1015        info: &CreateStreamingJobCommandInfo,
1016        job_type: &CreateStreamingJobType,
1017        is_currently_paused: bool,
1018        edges: &mut FragmentEdgeBuildResult,
1019        control_stream_manager: &ControlStreamManager,
1020        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
1021        split_assignment: &SplitAssignment,
1022        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1023        actor_location: &HashMap<ActorId, WorkerId>,
1024    ) -> MetaResult<Mutation> {
1025        {
1026            {
1027                let CreateStreamingJobCommandInfo {
1028                    stream_job_fragments,
1029                    upstream_fragment_downstreams,
1030                    fragment_backfill_ordering,
1031                    streaming_job,
1032                    ..
1033                } = info;
1034                let database_id = streaming_job.database_id();
1035                let added_actors: Vec<ActorId> = stream_actors
1036                    .values()
1037                    .flatten()
1038                    .map(|actor| actor.actor_id)
1039                    .collect();
1040                let actor_splits = split_assignment
1041                    .values()
1042                    .flat_map(build_actor_connector_splits)
1043                    .collect();
1044                let subscriptions_to_add =
1045                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
1046                    | CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
1047                        snapshot_backfill_info,
1048                        ..
1049                    }) = job_type
1050                    {
1051                        snapshot_backfill_info
1052                            .upstream_mv_table_id_to_backfill_epoch
1053                            .keys()
1054                            .map(|table_id| SubscriptionUpstreamInfo {
1055                                subscriber_id: stream_job_fragments
1056                                    .stream_job_id()
1057                                    .as_subscriber_id(),
1058                                upstream_mv_table_id: *table_id,
1059                            })
1060                            .collect()
1061                    } else {
1062                        Default::default()
1063                    };
1064                let backfill_nodes_to_pause: Vec<_> =
1065                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
1066                        .into_iter()
1067                        .collect();
1068
1069                let new_upstream_sinks =
1070                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
1071                        sink_fragment_id,
1072                        sink_output_fields,
1073                        project_exprs,
1074                        new_sink_downstream,
1075                        ..
1076                    }) = job_type
1077                    {
1078                        let new_sink_actors = stream_actors
1079                            .get(sink_fragment_id)
1080                            .unwrap_or_else(|| {
1081                                panic!("upstream sink fragment {sink_fragment_id} not exist")
1082                            })
1083                            .iter()
1084                            .map(|actor| {
1085                                let worker_id = actor_location[&actor.actor_id];
1086                                PbActorInfo {
1087                                    actor_id: actor.actor_id,
1088                                    host: Some(control_stream_manager.host_addr(worker_id)),
1089                                    partial_graph_id: to_partial_graph_id(database_id, None),
1090                                }
1091                            });
1092                        let new_upstream_sink = PbNewUpstreamSink {
1093                            info: Some(PbUpstreamSinkInfo {
1094                                upstream_fragment_id: *sink_fragment_id,
1095                                sink_output_schema: sink_output_fields.clone(),
1096                                project_exprs: project_exprs.clone(),
1097                            }),
1098                            upstream_actors: new_sink_actors.collect(),
1099                        };
1100                        HashMap::from([(
1101                            new_sink_downstream.downstream_fragment_id,
1102                            new_upstream_sink,
1103                        )])
1104                    } else {
1105                        HashMap::new()
1106                    };
1107
1108                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
1109                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1110
1111                let add_mutation = AddMutation {
1112                    actor_dispatchers: edges
1113                        .dispatchers
1114                        .extract_if(|fragment_id, _| {
1115                            upstream_fragment_downstreams.contains_key(fragment_id)
1116                        })
1117                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1118                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1119                        .collect(),
1120                    added_actors,
1121                    actor_splits,
1122                    // If the cluster is already paused, the new actors should be paused too.
1123                    pause: is_currently_paused,
1124                    subscriptions_to_add,
1125                    backfill_nodes_to_pause,
1126                    actor_cdc_table_snapshot_splits,
1127                    new_upstream_sinks,
1128                };
1129
1130                Ok(Mutation::Add(add_mutation))
1131            }
1132        }
1133    }
1134
1135    /// Build the `Update` mutation for `ReplaceStreamJob`.
1136    pub(super) fn replace_stream_job_to_mutation(
1137        ReplaceStreamJobPlan {
1138            old_fragments,
1139            replace_upstream,
1140            upstream_fragment_downstreams,
1141            auto_refresh_schema_sinks,
1142            ..
1143        }: &ReplaceStreamJobPlan,
1144        edges: &mut FragmentEdgeBuildResult,
1145        database_info: &mut InflightDatabaseInfo,
1146        split_assignment: &SplitAssignment,
1147    ) -> MetaResult<Option<Mutation>> {
1148        {
1149            {
1150                let merge_updates = edges
1151                    .merge_updates
1152                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1153                    .collect();
1154                let dispatchers = edges
1155                    .dispatchers
1156                    .extract_if(|fragment_id, _| {
1157                        upstream_fragment_downstreams.contains_key(fragment_id)
1158                    })
1159                    .collect();
1160                let actor_cdc_table_snapshot_splits = database_info
1161                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1162                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1163                let old_fragments = old_fragments.fragments.keys().copied();
1164                let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
1165                    .as_ref()
1166                    .into_iter()
1167                    .flat_map(|sinks| sinks.iter())
1168                    .map(|sink| sink.original_fragment.fragment_id);
1169                Ok(Self::generate_update_mutation_for_replace_table(
1170                    old_fragments
1171                        .chain(auto_refresh_sink_fragment_ids)
1172                        .flat_map(|fragment_id| {
1173                            database_info.fragment(fragment_id).actors.keys().copied()
1174                        }),
1175                    merge_updates,
1176                    dispatchers,
1177                    split_assignment,
1178                    actor_cdc_table_snapshot_splits,
1179                    auto_refresh_schema_sinks.as_ref(),
1180                ))
1181            }
1182        }
1183    }
1184
1185    /// Build the `Update` mutation for `RescheduleIntent`.
1186    pub(super) fn reschedule_to_mutation(
1187        reschedules: &HashMap<FragmentId, Reschedule>,
1188        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1189        control_stream_manager: &ControlStreamManager,
1190        database_info: &mut InflightDatabaseInfo,
1191    ) -> MetaResult<Option<Mutation>> {
1192        {
1193            {
1194                let database_id = database_info.database_id;
1195                let mut dispatcher_update = HashMap::new();
1196                for reschedule in reschedules.values() {
1197                    for &(upstream_fragment_id, dispatcher_id) in
1198                        &reschedule.upstream_fragment_dispatcher_ids
1199                    {
1200                        // Find the actors of the upstream fragment.
1201                        let upstream_actor_ids = fragment_actors
1202                            .get(&upstream_fragment_id)
1203                            .expect("should contain");
1204
1205                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1206
1207                        // Record updates for all actors.
1208                        for &actor_id in upstream_actor_ids {
1209                            let added_downstream_actor_id = if upstream_reschedule
1210                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1211                                .unwrap_or(true)
1212                            {
1213                                reschedule
1214                                    .added_actors
1215                                    .values()
1216                                    .flatten()
1217                                    .cloned()
1218                                    .collect()
1219                            } else {
1220                                Default::default()
1221                            };
1222                            // Index with the dispatcher id to check duplicates.
1223                            dispatcher_update
1224                                .try_insert(
1225                                    (actor_id, dispatcher_id),
1226                                    DispatcherUpdate {
1227                                        actor_id,
1228                                        dispatcher_id,
1229                                        hash_mapping: reschedule
1230                                            .upstream_dispatcher_mapping
1231                                            .as_ref()
1232                                            .map(|m| m.to_protobuf()),
1233                                        added_downstream_actor_id,
1234                                        removed_downstream_actor_id: reschedule
1235                                            .removed_actors
1236                                            .iter()
1237                                            .cloned()
1238                                            .collect(),
1239                                    },
1240                                )
1241                                .unwrap();
1242                        }
1243                    }
1244                }
1245                let dispatcher_update = dispatcher_update.into_values().collect();
1246
1247                let mut merge_update = HashMap::new();
1248                for (&fragment_id, reschedule) in reschedules {
1249                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1250                        // Find the actors of the downstream fragment.
1251                        let downstream_actor_ids = fragment_actors
1252                            .get(&downstream_fragment_id)
1253                            .expect("should contain");
1254
1255                        // Downstream removed actors should be skipped
1256                        // Newly created actors of the current fragment will not dispatch Update
1257                        // barriers to them
1258                        let downstream_removed_actors: HashSet<_> = reschedules
1259                            .get(&downstream_fragment_id)
1260                            .map(|downstream_reschedule| {
1261                                downstream_reschedule
1262                                    .removed_actors
1263                                    .iter()
1264                                    .copied()
1265                                    .collect()
1266                            })
1267                            .unwrap_or_default();
1268
1269                        // Record updates for all actors.
1270                        for &actor_id in downstream_actor_ids {
1271                            if downstream_removed_actors.contains(&actor_id) {
1272                                continue;
1273                            }
1274
1275                            // Index with the fragment id to check duplicates.
1276                            merge_update
1277                                .try_insert(
1278                                    (actor_id, fragment_id),
1279                                    MergeUpdate {
1280                                        actor_id,
1281                                        upstream_fragment_id: fragment_id,
1282                                        new_upstream_fragment_id: None,
1283                                        added_upstream_actors: reschedule
1284                                            .added_actors
1285                                            .iter()
1286                                            .flat_map(|(worker_id, actors)| {
1287                                                let host =
1288                                                    control_stream_manager.host_addr(*worker_id);
1289                                                actors.iter().map(move |&actor_id| PbActorInfo {
1290                                                    actor_id,
1291                                                    host: Some(host.clone()),
1292                                                    // we assume that we only scale the partial graph of database
1293                                                    partial_graph_id: to_partial_graph_id(
1294                                                        database_id,
1295                                                        None,
1296                                                    ),
1297                                                })
1298                                            })
1299                                            .collect(),
1300                                        removed_upstream_actor_id: reschedule
1301                                            .removed_actors
1302                                            .iter()
1303                                            .cloned()
1304                                            .collect(),
1305                                    },
1306                                )
1307                                .unwrap();
1308                        }
1309                    }
1310                }
1311                let merge_update = merge_update.into_values().collect();
1312
1313                let mut actor_vnode_bitmap_update = HashMap::new();
1314                for reschedule in reschedules.values() {
1315                    // Record updates for all actors in this fragment.
1316                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1317                        let bitmap = bitmap.to_protobuf();
1318                        actor_vnode_bitmap_update
1319                            .try_insert(actor_id, bitmap)
1320                            .unwrap();
1321                    }
1322                }
1323                let dropped_actors = reschedules
1324                    .values()
1325                    .flat_map(|r| r.removed_actors.iter().copied())
1326                    .collect();
1327                let mut actor_splits = HashMap::new();
1328                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1329                for (fragment_id, reschedule) in reschedules {
1330                    for (actor_id, splits) in &reschedule.actor_splits {
1331                        actor_splits.insert(
1332                            *actor_id,
1333                            ConnectorSplits {
1334                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1335                            },
1336                        );
1337                    }
1338
1339                    if let Some(assignment) =
1340                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1341                    {
1342                        actor_cdc_table_snapshot_splits.extend(assignment)
1343                    }
1344                }
1345
1346                // we don't create dispatchers in reschedule scenario
1347                let actor_new_dispatchers = HashMap::new();
1348                let mutation = Mutation::Update(UpdateMutation {
1349                    dispatcher_update,
1350                    merge_update,
1351                    actor_vnode_bitmap_update,
1352                    dropped_actors,
1353                    actor_splits,
1354                    actor_new_dispatchers,
1355                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1356                        splits: actor_cdc_table_snapshot_splits,
1357                    }),
1358                    sink_schema_change: Default::default(),
1359                    subscriptions_to_drop: vec![],
1360                });
1361                tracing::debug!("update mutation: {mutation:?}");
1362                Ok(Some(mutation))
1363            }
1364        }
1365    }
1366
1367    /// Build the `Add` mutation for `CreateSubscription`.
1368    pub(super) fn create_subscription_to_mutation(
1369        upstream_mv_table_id: TableId,
1370        subscription_id: SubscriptionId,
1371    ) -> Mutation {
1372        {
1373            Mutation::Add(AddMutation {
1374                actor_dispatchers: Default::default(),
1375                added_actors: vec![],
1376                actor_splits: Default::default(),
1377                pause: false,
1378                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1379                    upstream_mv_table_id,
1380                    subscriber_id: subscription_id.as_subscriber_id(),
1381                }],
1382                backfill_nodes_to_pause: vec![],
1383                actor_cdc_table_snapshot_splits: None,
1384                new_upstream_sinks: Default::default(),
1385            })
1386        }
1387    }
1388
1389    /// Build the `DropSubscriptions` mutation for `DropSubscription`.
1390    pub(super) fn drop_subscription_to_mutation(
1391        upstream_mv_table_id: TableId,
1392        subscription_id: SubscriptionId,
1393    ) -> Mutation {
1394        {
1395            Mutation::DropSubscriptions(DropSubscriptionsMutation {
1396                info: vec![SubscriptionUpstreamInfo {
1397                    subscriber_id: subscription_id.as_subscriber_id(),
1398                    upstream_mv_table_id,
1399                }],
1400            })
1401        }
1402    }
1403
1404    /// Build the `ConnectorPropsChange` mutation.
1405    pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1406        {
1407            {
1408                let mut connector_props_infos = HashMap::default();
1409                for (k, v) in config {
1410                    connector_props_infos.insert(
1411                        k.as_raw_id(),
1412                        ConnectorPropsInfo {
1413                            connector_props_info: v.clone(),
1414                        },
1415                    );
1416                }
1417                Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1418                    connector_props_infos,
1419                })
1420            }
1421        }
1422    }
1423
1424    /// Build the `RefreshStart` mutation.
1425    pub(super) fn refresh_to_mutation(
1426        table_id: TableId,
1427        associated_source_id: SourceId,
1428    ) -> Mutation {
1429        Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1430            table_id,
1431            associated_source_id,
1432        })
1433    }
1434
1435    /// Build the `ListFinish` mutation.
1436    pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1437        Mutation::ListFinish(ListFinishMutation {
1438            associated_source_id,
1439        })
1440    }
1441
1442    /// Build the `LoadFinish` mutation.
1443    pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1444        Mutation::LoadFinish(LoadFinishMutation {
1445            associated_source_id,
1446        })
1447    }
1448
1449    /// Build the `ResetSource` mutation.
1450    pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1451        Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1452            source_id: source_id.as_raw_id(),
1453        })
1454    }
1455
1456    /// Build the `StartFragmentBackfill` mutation for `ResumeBackfill`.
1457    pub(super) fn resume_backfill_to_mutation(
1458        target: &ResumeBackfillTarget,
1459        database_info: &InflightDatabaseInfo,
1460    ) -> MetaResult<Option<Mutation>> {
1461        {
1462            {
1463                let fragment_ids: HashSet<_> = match target {
1464                    ResumeBackfillTarget::Job(job_id) => {
1465                        database_info.backfill_fragment_ids_for_job(*job_id)?
1466                    }
1467                    ResumeBackfillTarget::Fragment(fragment_id) => {
1468                        if !database_info.is_backfill_fragment(*fragment_id)? {
1469                            return Err(MetaError::invalid_parameter(format!(
1470                                "fragment {} is not a backfill node",
1471                                fragment_id
1472                            )));
1473                        }
1474                        HashSet::from([*fragment_id])
1475                    }
1476                };
1477                if fragment_ids.is_empty() {
1478                    warn!(
1479                        ?target,
1480                        "resume backfill command ignored because no backfill fragments found"
1481                    );
1482                    Ok(None)
1483                } else {
1484                    Ok(Some(Mutation::StartFragmentBackfill(
1485                        StartFragmentBackfillMutation {
1486                            fragment_ids: fragment_ids.into_iter().collect(),
1487                        },
1488                    )))
1489                }
1490            }
1491        }
1492    }
1493
1494    /// Build the `InjectSourceOffsets` mutation.
1495    pub(super) fn inject_source_offsets_to_mutation(
1496        source_id: SourceId,
1497        split_offsets: &HashMap<String, String>,
1498    ) -> Mutation {
1499        Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1500            source_id: source_id.as_raw_id(),
1501            split_offsets: split_offsets.clone(),
1502        })
1503    }
1504
1505    /// Collect actors to create for `CreateStreamingJob` (non-snapshot-backfill).
1506    pub(super) fn create_streaming_job_actors_to_create(
1507        info: &CreateStreamingJobCommandInfo,
1508        edges: &mut FragmentEdgeBuildResult,
1509        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1510        actor_location: &HashMap<ActorId, WorkerId>,
1511    ) -> StreamJobActorsToCreate {
1512        {
1513            {
1514                edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1515                    |fragment| {
1516                        let actors = stream_actors
1517                            .get(&fragment.fragment_id)
1518                            .into_iter()
1519                            .flatten()
1520                            .map(|actor| (actor, actor_location[&actor.actor_id]));
1521                        (
1522                            fragment.fragment_id,
1523                            &fragment.nodes,
1524                            actors,
1525                            [], // no subscriber for new job to create
1526                        )
1527                    },
1528                ))
1529            }
1530        }
1531    }
1532
1533    /// Collect actors to create for `RescheduleIntent`.
1534    pub(super) fn reschedule_actors_to_create(
1535        reschedules: &HashMap<FragmentId, Reschedule>,
1536        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1537        database_info: &InflightDatabaseInfo,
1538        control_stream_manager: &ControlStreamManager,
1539    ) -> StreamJobActorsToCreate {
1540        {
1541            {
1542                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1543                    reschedules.iter().map(|(fragment_id, reschedule)| {
1544                        (
1545                            *fragment_id,
1546                            reschedule.newly_created_actors.values().map(
1547                                |((actor, dispatchers), _)| {
1548                                    (actor.actor_id, dispatchers.as_slice())
1549                                },
1550                            ),
1551                        )
1552                    }),
1553                    Some((reschedules, fragment_actors)),
1554                    database_info,
1555                    control_stream_manager,
1556                );
1557                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1558                for (fragment_id, (actor, dispatchers), worker_id) in
1559                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1560                        reschedule
1561                            .newly_created_actors
1562                            .values()
1563                            .map(|(actors, status)| (*fragment_id, actors, status))
1564                    })
1565                {
1566                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1567                    map.entry(*worker_id)
1568                        .or_default()
1569                        .entry(fragment_id)
1570                        .or_insert_with(|| {
1571                            let node = database_info.fragment(fragment_id).nodes.clone();
1572                            let subscribers =
1573                                database_info.fragment_subscribers(fragment_id).collect();
1574                            (node, vec![], subscribers)
1575                        })
1576                        .1
1577                        .push((actor.clone(), upstreams, dispatchers.clone()));
1578                }
1579                map
1580            }
1581        }
1582    }
1583
1584    /// Collect actors to create for `ReplaceStreamJob`.
1585    pub(super) fn replace_stream_job_actors_to_create(
1586        replace_table: &ReplaceStreamJobPlan,
1587        edges: &mut FragmentEdgeBuildResult,
1588        database_info: &InflightDatabaseInfo,
1589        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1590        actor_location: &HashMap<ActorId, WorkerId>,
1591    ) -> StreamJobActorsToCreate {
1592        {
1593            {
1594                let mut actors = edges.collect_actors_to_create(
1595                    replace_table
1596                        .new_fragments
1597                        .fragments
1598                        .values()
1599                        .map(|fragment| {
1600                            let actors = stream_actors
1601                                .get(&fragment.fragment_id)
1602                                .into_iter()
1603                                .flatten()
1604                                .map(|actor| (actor, actor_location[&actor.actor_id]));
1605                            (
1606                                fragment.fragment_id,
1607                                &fragment.nodes,
1608                                actors,
1609                                database_info
1610                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1611                            )
1612                        }),
1613                );
1614
1615                // Handle auto-refresh schema sinks
1616                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1617                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1618                        (
1619                            sink.new_fragment.fragment_id,
1620                            &sink.new_fragment.nodes,
1621                            stream_actors
1622                                .get(&sink.new_fragment.fragment_id)
1623                                .into_iter()
1624                                .flatten()
1625                                .map(|actor| (actor, actor_location[&actor.actor_id])),
1626                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1627                        )
1628                    }));
1629                    for (worker_id, fragment_actors) in sink_actors {
1630                        actors.entry(worker_id).or_default().extend(fragment_actors);
1631                    }
1632                }
1633                actors
1634            }
1635        }
1636    }
1637
1638    fn generate_update_mutation_for_replace_table(
1639        dropped_actors: impl IntoIterator<Item = ActorId>,
1640        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1641        dispatchers: FragmentActorDispatchers,
1642        split_assignment: &SplitAssignment,
1643        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1644        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1645    ) -> Option<Mutation> {
1646        let dropped_actors = dropped_actors.into_iter().collect();
1647
1648        let actor_new_dispatchers = dispatchers
1649            .into_values()
1650            .flatten()
1651            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1652            .collect();
1653
1654        let actor_splits = split_assignment
1655            .values()
1656            .flat_map(build_actor_connector_splits)
1657            .collect();
1658        Some(Mutation::Update(UpdateMutation {
1659            actor_new_dispatchers,
1660            merge_update: merge_updates.into_values().flatten().collect(),
1661            dropped_actors,
1662            actor_splits,
1663            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1664            sink_schema_change: auto_refresh_schema_sinks
1665                .as_ref()
1666                .into_iter()
1667                .flat_map(|sinks| {
1668                    sinks.iter().map(|sink| {
1669                        let op = if !sink.removed_column_names.is_empty() {
1670                            PbSinkSchemaChangeOp::DropColumns(PbSinkDropColumnsOp {
1671                                column_names: sink.removed_column_names.clone(),
1672                            })
1673                        } else {
1674                            PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1675                                fields: sink
1676                                    .newly_add_fields
1677                                    .iter()
1678                                    .map(|field| field.to_prost())
1679                                    .collect(),
1680                            })
1681                        };
1682                        (
1683                            sink.original_sink.id.as_raw_id(),
1684                            PbSinkSchemaChange {
1685                                original_schema: sink_original_schema_fields(
1686                                    &sink.original_sink.columns,
1687                                ),
1688                                op: Some(op),
1689                            },
1690                        )
1691                    })
1692                })
1693                .collect(),
1694            ..Default::default()
1695        }))
1696    }
1697}
1698
1699impl Command {
1700    #[expect(clippy::type_complexity)]
1701    pub(super) fn collect_database_partial_graph_actor_upstreams(
1702        actor_dispatchers: impl Iterator<
1703            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1704        >,
1705        reschedule_dispatcher_update: Option<(
1706            &HashMap<FragmentId, Reschedule>,
1707            &HashMap<FragmentId, HashSet<ActorId>>,
1708        )>,
1709        database_info: &InflightDatabaseInfo,
1710        control_stream_manager: &ControlStreamManager,
1711    ) -> HashMap<ActorId, ActorUpstreams> {
1712        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1713        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1714            let upstream_fragment = database_info.fragment(upstream_fragment_id);
1715            for (upstream_actor_id, dispatchers) in upstream_actors {
1716                let upstream_actor_location =
1717                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1718                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1719                for downstream_actor_id in dispatchers
1720                    .iter()
1721                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1722                {
1723                    actor_upstreams
1724                        .entry(*downstream_actor_id)
1725                        .or_default()
1726                        .entry(upstream_fragment_id)
1727                        .or_default()
1728                        .insert(
1729                            upstream_actor_id,
1730                            PbActorInfo {
1731                                actor_id: upstream_actor_id,
1732                                host: Some(upstream_actor_host.clone()),
1733                                partial_graph_id: to_partial_graph_id(
1734                                    database_info.database_id,
1735                                    None,
1736                                ),
1737                            },
1738                        );
1739                }
1740            }
1741        }
1742        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1743            for reschedule in reschedules.values() {
1744                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1745                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1746                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1747                    for upstream_actor_id in fragment_actors
1748                        .get(upstream_fragment_id)
1749                        .expect("should exist")
1750                    {
1751                        let upstream_actor_location =
1752                            upstream_fragment.actors[upstream_actor_id].worker_id;
1753                        let upstream_actor_host =
1754                            control_stream_manager.host_addr(upstream_actor_location);
1755                        if let Some(upstream_reschedule) = upstream_reschedule
1756                            && upstream_reschedule
1757                                .removed_actors
1758                                .contains(upstream_actor_id)
1759                        {
1760                            continue;
1761                        }
1762                        for (_, downstream_actor_id) in
1763                            reschedule
1764                                .added_actors
1765                                .iter()
1766                                .flat_map(|(worker_id, actors)| {
1767                                    actors.iter().map(|actor| (*worker_id, *actor))
1768                                })
1769                        {
1770                            actor_upstreams
1771                                .entry(downstream_actor_id)
1772                                .or_default()
1773                                .entry(*upstream_fragment_id)
1774                                .or_default()
1775                                .insert(
1776                                    *upstream_actor_id,
1777                                    PbActorInfo {
1778                                        actor_id: *upstream_actor_id,
1779                                        host: Some(upstream_actor_host.clone()),
1780                                        partial_graph_id: to_partial_graph_id(
1781                                            database_info.database_id,
1782                                            None,
1783                                        ),
1784                                    },
1785                                );
1786                        }
1787                    }
1788                }
1789            }
1790        }
1791        actor_upstreams
1792    }
1793}
1794
#[cfg(test)]
mod tests {
    use risingwave_pb::data::PbDataType;
    use risingwave_pb::data::data_type::PbTypeName;
    use risingwave_pb::plan_common::{ColumnCatalog as PbColumnCatalog, ColumnDesc};

    use super::sink_original_schema_fields;

    /// Build a column catalog entry with the given name, type name and hidden flag;
    /// every other descriptor field keeps its default value.
    fn column(name: &str, type_name: PbTypeName, is_hidden: bool) -> PbColumnCatalog {
        let column_type = PbDataType {
            type_name: type_name as i32,
            ..Default::default()
        };
        let column_desc = ColumnDesc {
            column_type: Some(column_type),
            name: name.to_owned(),
            ..Default::default()
        };
        PbColumnCatalog {
            column_desc: Some(column_desc),
            is_hidden,
        }
    }

    /// Hidden columns must not appear in the schema fields derived for a sink.
    #[test]
    fn test_sink_original_schema_fields_skips_hidden_columns() {
        let visible_key = column("k", PbTypeName::Int32, false);
        let visible_value = column("v", PbTypeName::Varchar, false);
        let hidden_row_id = column("_row_id", PbTypeName::Serial, true);
        let columns = vec![visible_key, visible_value, hidden_row_id];

        let fields = sink_original_schema_fields(&columns);
        let field_names: Vec<&str> = fields.iter().map(|field| field.name.as_str()).collect();

        assert_eq!(field_names, ["k", "v"]);
    }
}