risingwave_meta/barrier/
command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37    PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkDropColumnsOp,
49    PbSinkSchemaChange, PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation,
50    StartFragmentBackfillMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
51    UpdateMutation,
52};
53use risingwave_pb::stream_service::BarrierCompleteResponse;
54use tracing::warn;
55
56use super::info::InflightDatabaseInfo;
57use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
58use crate::barrier::edge_builder::FragmentEdgeBuildResult;
59use crate::barrier::info::BarrierInfo;
60use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
61use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
62use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
63use crate::controller::scale::LoadedFragmentContext;
64use crate::controller::utils::StreamingJobExtraInfo;
65use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
66use crate::manager::{StreamingJob, StreamingJobType};
67use crate::model::{
68    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
69    FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
70    StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
71};
72use crate::stream::{
73    AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
74    ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
75    build_actor_connector_splits,
76};
77use crate::{MetaError, MetaResult};
78
/// [`Reschedule`] describes per-fragment changes in a resolved reschedule plan,
/// used for actor scaling or migration.
#[derive(Debug, Clone)]
pub struct Reschedule {
    /// Added actors in this fragment.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Removed actors in this fragment.
    pub removed_actors: HashSet<ActorId>,

    /// Vnode bitmap updates for some actors in this fragment.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// New hash mapping of the upstream dispatcher to be updated.
    ///
    /// This field exists only when there's upstream fragment and the current fragment is
    /// hash-sharded.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// The downstream fragments of this fragment.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Reassigned splits for source actors.
    /// It becomes the `actor_splits` in [`UpdateMutation`].
    /// `Source` and `SourceBackfill` are handled together here.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Actors to be newly built for this fragment (with their dispatchers), keyed by
    /// actor id, together with the worker each actor is placed on.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
110
/// A resolved reschedule execution plan, covering one or more fragments.
#[derive(Debug, Clone)]
pub struct ReschedulePlan {
    /// Per-fragment reschedule changes.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Should contain the actor ids in upstream and downstream fragments referenced by
    /// `reschedules`.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
118
/// Preloaded context for rescheduling, built outside the barrier worker.
#[derive(Debug, Clone)]
pub struct RescheduleContext {
    /// The loaded fragment graph of the jobs involved in the reschedule.
    pub loaded: LoadedFragmentContext,
    /// Extra meta-store info for each job present in `loaded`.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// For each fragment, its upstream fragments and the dispatcher type of each edge.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// For each fragment, its downstream fragments and the dispatcher type of each edge.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Relation models keyed by `(source_fragment_id, target_fragment_id)`. Ownership of
    /// an edge is attributed to the source fragment when filtering or routing this map.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
128
129impl RescheduleContext {
130    pub fn empty() -> Self {
131        Self {
132            loaded: LoadedFragmentContext::default(),
133            job_extra_info: HashMap::new(),
134            upstream_fragments: HashMap::new(),
135            downstream_fragments: HashMap::new(),
136            downstream_relations: HashMap::new(),
137        }
138    }
139
140    pub fn is_empty(&self) -> bool {
141        self.loaded.is_empty()
142    }
143
144    pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
145        let loaded = self.loaded.for_database(database_id)?;
146        let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
147        // Use the filtered loaded context as the source of truth so every side map is pruned by
148        // the same fragment set.
149        let fragment_ids: HashSet<FragmentId> = loaded
150            .job_fragments
151            .values()
152            .flat_map(|fragments| fragments.keys().copied())
153            .collect();
154
155        let job_extra_info = self
156            .job_extra_info
157            .iter()
158            .filter(|(job_id, _)| job_ids.contains(*job_id))
159            .map(|(job_id, info)| (*job_id, info.clone()))
160            .collect();
161
162        let upstream_fragments = self
163            .upstream_fragments
164            .iter()
165            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
166            .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
167            .collect();
168
169        let downstream_fragments = self
170            .downstream_fragments
171            .iter()
172            .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
173            .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
174            .collect();
175
176        let downstream_relations = self
177            .downstream_relations
178            .iter()
179            // Ownership of this map is source-fragment based. We keep all downstream edges for
180            // selected sources because the target side can still be referenced during dispatcher
181            // reconstruction even if that target fragment is not being rescheduled.
182            .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
183            .map(|(key, relation)| (*key, relation.clone()))
184            .collect();
185
186        Some(Self {
187            loaded,
188            job_extra_info,
189            upstream_fragments,
190            downstream_fragments,
191            downstream_relations,
192        })
193    }
194
195    /// Split this context into per-database contexts without cloning the large loaded graph
196    /// payloads.
197    pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
198        let Self {
199            loaded,
200            job_extra_info,
201            upstream_fragments,
202            downstream_fragments,
203            downstream_relations,
204        } = self;
205
206        let mut contexts: HashMap<_, _> = loaded
207            .into_database_contexts()
208            .into_iter()
209            .map(|(database_id, loaded)| {
210                (
211                    database_id,
212                    Self {
213                        loaded,
214                        job_extra_info: HashMap::new(),
215                        upstream_fragments: HashMap::new(),
216                        downstream_fragments: HashMap::new(),
217                        downstream_relations: HashMap::new(),
218                    },
219                )
220            })
221            .collect();
222
223        if contexts.is_empty() {
224            return contexts;
225        }
226
227        let mut job_databases = HashMap::new();
228        let mut fragment_databases = HashMap::new();
229        for (&database_id, context) in &contexts {
230            for job_id in context.loaded.job_map.keys().copied() {
231                job_databases.insert(job_id, database_id);
232            }
233            for fragment_id in context
234                .loaded
235                .job_fragments
236                .values()
237                .flat_map(|fragments| fragments.keys().copied())
238            {
239                fragment_databases.insert(fragment_id, database_id);
240            }
241        }
242
243        for (job_id, info) in job_extra_info {
244            if let Some(database_id) = job_databases.get(&job_id).copied() {
245                contexts
246                    .get_mut(&database_id)
247                    .expect("database context should exist for job")
248                    .job_extra_info
249                    .insert(job_id, info);
250            }
251        }
252
253        for (fragment_id, upstreams) in upstream_fragments {
254            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
255                contexts
256                    .get_mut(&database_id)
257                    .expect("database context should exist for fragment")
258                    .upstream_fragments
259                    .insert(fragment_id, upstreams);
260            }
261        }
262
263        for (fragment_id, downstreams) in downstream_fragments {
264            if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
265                contexts
266                    .get_mut(&database_id)
267                    .expect("database context should exist for fragment")
268                    .downstream_fragments
269                    .insert(fragment_id, downstreams);
270            }
271        }
272
273        for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
274            // Route by source fragment ownership. A target may be outside of current reschedule
275            // set, but this edge still belongs to the source-side command.
276            if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
277                contexts
278                    .get_mut(&database_id)
279                    .expect("database context should exist for relation source")
280                    .downstream_relations
281                    .insert((source_fragment_id, target_fragment_id), relation);
282            }
283        }
284
285        contexts
286    }
287}
288
/// Replacing an old job with a new one. All actors in the job will be rebuilt.
///
/// Current use cases:
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
///   will replace a table job's plan.
#[derive(Debug, Clone)]
pub struct ReplaceStreamJobPlan {
    /// The fragments of the job being replaced, dropped once the switch-over happens.
    pub old_fragments: StreamJobFragments,
    /// The newly built fragments that take over from `old_fragments`.
    pub new_fragments: StreamJobFragmentsToCreate,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Downstream jobs of the replaced job need to update their `Merge` node to
    /// connect to the new fragment.
    pub replace_upstream: FragmentReplaceUpstream,
    /// Dispatcher edges from existing upstream fragments to the new fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// Split plan for the replace job. Determines how splits are resolved in Phase 2
    /// inside the barrier worker.
    pub split_plan: ReplaceJobSplitPlan,
    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
    pub streaming_job: StreamingJob,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
    /// The temporary dummy job fragments id of new table fragment
    pub tmp_id: JobId,
    /// The state table ids to be dropped.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Contexts of sinks whose schema is auto-refreshed along with this replace, if any.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
318
319impl ReplaceStreamJobPlan {
320    /// `old_fragment_id` -> `new_fragment_id`
321    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
322        let mut fragment_replacements = HashMap::new();
323        for (upstream_fragment_id, new_upstream_fragment_id) in
324            self.replace_upstream.values().flatten()
325        {
326            {
327                let r =
328                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
329                if let Some(r) = r {
330                    assert_eq!(
331                        *new_upstream_fragment_id, r,
332                        "one fragment is replaced by multiple fragments"
333                    );
334                }
335            }
336        }
337        fragment_replacements
338    }
339}
340
/// All info needed to build the barrier for creating a streaming job.
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
pub struct CreateStreamingJobCommandInfo {
    /// The fragments to create for the job. Excluded from `Debug` output.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Dispatcher edges from existing upstream fragments to the new fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    /// The resource group of the database this job belongs to.
    pub database_resource_group: String,
    /// Source-level split assignment (Phase 1). Resolved to actor-level in the barrier worker.
    pub init_split_assignment: SourceSplitAssignment,
    /// Definition of the job (presumably its SQL text — confirm against producer).
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    /// Backfill ordering among the job's fragments.
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    /// Precomputed CDC table snapshot splits, when applicable.
    /// NOTE(review): presence condition inferred from the type — confirm.
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    /// State tables per fragment for locality handling — NOTE(review): confirm semantics.
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    /// The `streaming_job::Model` for this job, loaded from meta store.
    pub streaming_job_model: streaming_job::Model,
}
362
363impl StreamJobFragments {
364    /// Build fragment-level info for new fragments, populating actor infos from the
365    /// rendered actors and their locations, and applying split assignment to actor splits.
366    pub(super) fn new_fragment_info<'a>(
367        &'a self,
368        stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
369        actor_location: &'a HashMap<ActorId, WorkerId>,
370        assignment: &'a SplitAssignment,
371    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
372        self.fragments.values().map(|fragment| {
373            (
374                fragment.fragment_id,
375                InflightFragmentInfo {
376                    fragment_id: fragment.fragment_id,
377                    distribution_type: fragment.distribution_type.into(),
378                    fragment_type_mask: fragment.fragment_type_mask,
379                    vnode_count: fragment.vnode_count(),
380                    nodes: fragment.nodes.clone(),
381                    actors: stream_actors
382                        .get(&fragment.fragment_id)
383                        .into_iter()
384                        .flatten()
385                        .map(|actor| {
386                            (
387                                actor.actor_id,
388                                InflightActorInfo {
389                                    worker_id: actor_location[&actor.actor_id],
390                                    vnode_bitmap: actor.vnode_bitmap.clone(),
391                                    splits: assignment
392                                        .get(&fragment.fragment_id)
393                                        .and_then(|s| s.get(&actor.actor_id))
394                                        .cloned()
395                                        .unwrap_or_default(),
396                                },
397                            )
398                        })
399                        .collect(),
400                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
401                },
402            )
403        })
404    }
405}
406
/// Info about the upstream materialized views that a snapshot-backfill job reads from.
#[derive(Debug, Clone)]
pub struct SnapshotBackfillInfo {
    /// `table_id` -> `Some(snapshot_backfill_epoch)`
    /// The `snapshot_backfill_epoch` should be None at the beginning, and be filled
    /// by global barrier worker when handling the command.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
414
/// The flavor of a [`Command::CreateStreamingJob`], deciding how the job is brought up.
#[derive(Debug, Clone)]
pub enum CreateStreamingJobType {
    /// A regular streaming job with no special handling.
    Normal,
    /// `CREATE SINK INTO table`: the new sink connects into an existing table's fragments.
    SinkIntoTable(UpstreamSinkInfo),
    /// Backfill from a snapshot of the upstream materialized views
    /// (see [`SnapshotBackfillInfo`]).
    SnapshotBackfill(SnapshotBackfillInfo),
}
421
/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send via corresponding `*_to_mutation` helpers,
/// and may [do different stuffs after the barrier is collected](PostCollectCommand::post_collect).
// FIXME: this enum is significantly large on stack, box it
#[derive(Debug)]
pub enum Command {
    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
    /// all messages before the checkpoint barrier should have been committed.
    Flush,

    /// `Pause` command generates a `Pause` barrier **only if**
    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
    Pause,

    /// `Resume` command generates a `Resume` barrier **only
    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
    /// will be generated.
    Resume,

    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
    /// [`Vec<ActorId>`]. The catalog has ensured that these streaming jobs are safe to be
    /// dropped by reference counts before.
    ///
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then delete the job fragments info from meta store.
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
        unregistered_fragment_ids: HashSet<FragmentId>,
        // target_fragment -> [sink_fragments]
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// `CreateStreamingJob` command generates a `Add` barrier by given info.
    ///
    /// Barriers from the actors to be created, which is marked as `Inactive` at first, will STILL
    /// be collected since the barrier should be passthrough.
    ///
    /// After the barrier is collected, these newly created actors will be marked as `Running`. And
    /// it adds the job fragments info to meta store. However, the creating progress will **last
    /// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
    /// will be set to `Created`.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedule context. It must be resolved inside the barrier worker before injection.
    RescheduleIntent {
        context: RescheduleContext,
        /// Filled by the barrier worker after resolving `context` against current worker topology.
        ///
        /// We keep unresolved `context` outside of checkpoint state and only materialize this
        /// execution plan right before injection, then drop `context` to release memory earlier.
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// `ReplaceStreamJob` command generates a `Update` barrier with the given `replace_upstream`. This is
    /// essentially switching the downstream of the old job fragments to the new ones, and
    /// dropping the old job fragments. Used for schema change.
    ///
    /// This can be treated as a special case of reschedule, while the upstream fragment
    /// of the Merge executors are changed additionally.
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
    /// changed splits.
    SourceChangeSplit(SplitState),

    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of executors. `throttle_type` specifies which executor kinds should apply it.
    Throttle {
        jobs: HashSet<JobId>,
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
    /// materialize executor to start storing old value for subscription.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
    /// materialize executor to stop storing old value when there is no
    /// subscription depending on it.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// `AlterSubscriptionRetention` command updates the subscription retention time.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    /// Carries updated connector properties for running connectors.
    /// NOTE(review): presumably generates a `ConnectorPropsChangeMutation` — confirm.
    ConnectorPropsChange(ConnectorPropsChange),

    /// `Refresh` command generates a barrier to refresh a table by truncating state
    /// and reloading data from source.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Marks the listing phase of a table refresh as finished.
    /// NOTE(review): presumably generates a `ListFinishMutation`, pairing with
    /// [`Command::Refresh`] — confirm.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Marks the data-loading phase of a table refresh as finished.
    /// NOTE(review): presumably generates a `LoadFinishMutation`, pairing with
    /// [`Command::Refresh`] — confirm.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// `ResetSource` command generates a barrier to reset CDC source offset to latest.
    /// Used when upstream binlog/oplog has expired.
    ResetSource {
        source_id: SourceId,
    },

    /// `ResumeBackfill` command generates a `StartFragmentBackfill` barrier to force backfill
    /// to resume for troubleshooting.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// `InjectSourceOffsets` command generates a barrier to inject specific offsets
    /// into source splits (UNSAFE - admin only).
    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Split ID -> offset (JSON-encoded based on connector type)
        split_offsets: HashMap<String, String>,
    },
}
561
/// The scope a [`Command::ResumeBackfill`] applies to.
#[derive(Debug, Clone, Copy)]
pub enum ResumeBackfillTarget {
    /// Resume backfill for a whole job.
    Job(JobId),
    /// Resume backfill for a single fragment.
    Fragment(FragmentId),
}
567
568// For debugging and observability purposes. Can add more details later if needed.
569impl std::fmt::Display for Command {
570    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
571        match self {
572            Command::Flush => write!(f, "Flush"),
573            Command::Pause => write!(f, "Pause"),
574            Command::Resume => write!(f, "Resume"),
575            Command::DropStreamingJobs {
576                streaming_job_ids, ..
577            } => {
578                write!(
579                    f,
580                    "DropStreamingJobs: {}",
581                    streaming_job_ids.iter().sorted().join(", ")
582                )
583            }
584            Command::CreateStreamingJob { info, .. } => {
585                write!(f, "CreateStreamingJob: {}", info.streaming_job)
586            }
587            Command::RescheduleIntent {
588                reschedule_plan, ..
589            } => {
590                if reschedule_plan.is_some() {
591                    write!(f, "RescheduleIntent(planned)")
592                } else {
593                    write!(f, "RescheduleIntent")
594                }
595            }
596            Command::ReplaceStreamJob(plan) => {
597                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
598            }
599            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
600            Command::Throttle { .. } => write!(f, "Throttle"),
601            Command::CreateSubscription {
602                subscription_id, ..
603            } => write!(f, "CreateSubscription: {subscription_id}"),
604            Command::DropSubscription {
605                subscription_id, ..
606            } => write!(f, "DropSubscription: {subscription_id}"),
607            Command::AlterSubscriptionRetention {
608                subscription_id,
609                retention_second,
610                ..
611            } => write!(
612                f,
613                "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
614            ),
615            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
616            Command::Refresh {
617                table_id,
618                associated_source_id,
619            } => write!(
620                f,
621                "Refresh: {} (source: {})",
622                table_id, associated_source_id
623            ),
624            Command::ListFinish {
625                table_id,
626                associated_source_id,
627            } => write!(
628                f,
629                "ListFinish: {} (source: {})",
630                table_id, associated_source_id
631            ),
632            Command::LoadFinish {
633                table_id,
634                associated_source_id,
635            } => write!(
636                f,
637                "LoadFinish: {} (source: {})",
638                table_id, associated_source_id
639            ),
640            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
641            Command::ResumeBackfill { target } => match target {
642                ResumeBackfillTarget::Job(job_id) => {
643                    write!(f, "ResumeBackfill: job={job_id}")
644                }
645                ResumeBackfillTarget::Fragment(fragment_id) => {
646                    write!(f, "ResumeBackfill: fragment={fragment_id}")
647                }
648            },
649            Command::InjectSourceOffsets {
650                source_id,
651                split_offsets,
652            } => write!(
653                f,
654                "InjectSourceOffsets: {} ({} splits)",
655                source_id,
656                split_offsets.len()
657            ),
658        }
659    }
660}
661
662impl Command {
663    pub fn pause() -> Self {
664        Self::Pause
665    }
666
667    pub fn resume() -> Self {
668        Self::Resume
669    }
670
671    pub fn need_checkpoint(&self) -> bool {
672        // todo! Reviewing the flow of different command to reduce the amount of checkpoint
673        !matches!(self, Command::Resume)
674    }
675}
676
/// The post-collection form of a [`Command`]: the data retained for processing after the
/// barrier carrying the command has been collected.
#[derive(Debug)]
pub enum PostCollectCommand {
    /// A command with no dedicated post-collect payload; holds only a name used for
    /// logging/metrics (see [`PostCollectCommand::barrier`]).
    Command(String),
    /// Post-collect payload of [`Command::DropStreamingJobs`].
    DropStreamingJobs {
        streaming_job_ids: HashSet<JobId>,
        unregistered_state_table_ids: HashSet<TableId>,
    },
    /// Post-collect payload of [`Command::CreateStreamingJob`].
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        /// Actor-level split assignment resolved inside the barrier worker.
        resolved_split_assignment: SplitAssignment,
    },
    /// Resolved per-fragment reschedules from [`Command::RescheduleIntent`].
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    /// Post-collect payload of [`Command::ReplaceStreamJob`].
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        /// Actor-level split assignment resolved inside the barrier worker.
        resolved_split_assignment: SplitAssignment,
    },
    /// Post-collect payload of [`Command::SourceChangeSplit`].
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    /// Post-collect payload of [`Command::CreateSubscription`].
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    /// Post-collect payload of [`Command::ConnectorPropsChange`].
    ConnectorPropsChange(ConnectorPropsChange),
    /// Post-collect payload of [`Command::ResumeBackfill`].
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
708
709impl PostCollectCommand {
710    pub fn barrier() -> Self {
711        PostCollectCommand::Command("barrier".to_owned())
712    }
713
714    pub fn command_name(&self) -> &str {
715        match self {
716            PostCollectCommand::Command(name) => name.as_str(),
717            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
718            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
719            PostCollectCommand::Reschedule { .. } => "Reschedule",
720            PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
721            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
722            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
723            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
724            PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
725        }
726    }
727}
728
729impl Display for PostCollectCommand {
730    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
731        f.write_str(self.command_name())
732    }
733}
734
/// The kind of a barrier, mirroring `PbBarrierKind` but carrying extra epoch data for
/// checkpoints.
#[derive(Debug, Clone)]
pub enum BarrierKind {
    /// The initial barrier; maps to `PbBarrierKind::Initial`.
    Initial,
    /// A regular, non-checkpoint barrier.
    Barrier,
    /// Hold a list of previous non-checkpoint prev-epoch + current prev-epoch
    Checkpoint(Vec<u64>),
}
742
743impl BarrierKind {
744    pub fn to_protobuf(&self) -> PbBarrierKind {
745        match self {
746            BarrierKind::Initial => PbBarrierKind::Initial,
747            BarrierKind::Barrier => PbBarrierKind::Barrier,
748            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
749        }
750    }
751
752    pub fn is_checkpoint(&self) -> bool {
753        matches!(self, BarrierKind::Checkpoint(_))
754    }
755
756    pub fn is_initial(&self) -> bool {
757        matches!(self, BarrierKind::Initial)
758    }
759
760    pub fn as_str_name(&self) -> &'static str {
761        match self {
762            BarrierKind::Initial => "Initial",
763            BarrierKind::Barrier => "Barrier",
764            BarrierKind::Checkpoint(_) => "Checkpoint",
765        }
766    }
767}
768
/// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given
/// [`Command`].
pub(super) struct CommandContext {
    /// For each subscribed mv table: the maximum subscription retention in
    /// seconds, used by `collect_commit_epoch_info` to derive the log-store
    /// truncate epoch via `get_truncate_epoch`.
    mv_subscription_max_retention: HashMap<TableId, u64>,

    /// Epoch/kind information of the barrier this command is attached to.
    pub(super) barrier_info: BarrierInfo,

    /// State tables whose `prev_epoch` must be committed when this barrier completes.
    pub(super) table_ids_to_commit: HashSet<TableId>,

    /// The command to be executed after the barrier is collected.
    pub(super) command: PostCollectCommand,

    /// The tracing span of this command.
    ///
    /// Differs from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
    /// stream graph on compute nodes, and finishing its `post_collect` stuffs.
    _span: tracing::Span,
}
787
788impl std::fmt::Debug for CommandContext {
789    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
790        f.debug_struct("CommandContext")
791            .field("barrier_info", &self.barrier_info)
792            .field("command", &self.command.command_name())
793            .finish()
794    }
795}
796
impl CommandContext {
    /// Create a new context for the barrier described by `barrier_info`.
    ///
    /// `span` is only stored to keep the tracing span alive for the lifetime
    /// of this context (see the `_span` field docs).
    pub(super) fn new(
        barrier_info: BarrierInfo,
        mv_subscription_max_retention: HashMap<TableId, u64>,
        table_ids_to_commit: HashSet<TableId>,
        command: PostCollectCommand,
        span: tracing::Span,
    ) -> Self {
        Self {
            mv_subscription_max_retention,
            barrier_info,
            table_ids_to_commit,
            command,
            _span: span,
        }
    }

    /// Compute the epoch up to which an mv log store may be truncated:
    /// the prev-epoch's timestamp minus `retention_second` seconds.
    ///
    /// If the subtraction yields an invalid timestamp, falls back to the
    /// prev epoch itself (i.e. no retention window) after logging a warning.
    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.barrier_info
                .prev_epoch
                .value()
                .as_timestamptz()
                .timestamp()
                - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
            return self.barrier_info.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }

    /// Fold the `BarrierCompleteResponse`s from compute nodes into `info`.
    ///
    /// Must only be called for checkpoint barriers: the `must_match!` below
    /// panics if `barrier_info.kind` is not `Checkpoint`.
    pub(super) fn collect_commit_epoch_info(
        &self,
        info: &mut CommitEpochInfo,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
        ) = collect_resp_info(resps);

        // For a newly created (non-snapshot-backfill) streaming job, register
        // its internal tables plus the mv table as a new table fragment info.
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        let mut mv_log_store_truncate_epoch = HashMap::new();
        // TODO: may collect cross db snapshot backfill
        // Keep the *minimum* truncate epoch per table: a table must not be
        // truncated past the epoch needed by any subscriber or backfill.
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        // Retention-based truncate epochs from subscriptions.
        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
            update_truncate_epoch(*mv_table_id, truncate_epoch);
        }
        // Epochs pinned by in-progress backfills on upstream mv tables.
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        // Every table tracked by this barrier commits at the same prev epoch;
        // duplicates would indicate a bookkeeping bug.
        let epoch = self.barrier_info.prev_epoch();
        for table_id in &self.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index additionally records an `Init` delta.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
    }
}
926
927impl Command {
928    /// Build the `Pause` mutation.
929    pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
930        {
931            {
932                // Only pause when the cluster is not already paused.
933                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
934                if !is_currently_paused {
935                    Some(Mutation::Pause(PauseMutation {}))
936                } else {
937                    None
938                }
939            }
940        }
941    }
942
943    /// Build the `Resume` mutation.
944    pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
945        {
946            {
947                // Only resume when the cluster is paused with the same reason.
948                if is_currently_paused {
949                    Some(Mutation::Resume(ResumeMutation {}))
950                } else {
951                    None
952                }
953            }
954        }
955    }
956
957    /// Build the `Splits` mutation for `SourceChangeSplit`.
958    pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
959        {
960            {
961                let mut diff = HashMap::new();
962
963                for actor_splits in split_assignment.values() {
964                    diff.extend(actor_splits.clone());
965                }
966
967                Mutation::Splits(SourceChangeSplitMutation {
968                    actor_splits: build_actor_connector_splits(&diff),
969                })
970            }
971        }
972    }
973
974    /// Build the `Throttle` mutation.
975    pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
976        {
977            {
978                let config = config.clone();
979                Mutation::Throttle(ThrottleMutation {
980                    fragment_throttle: config,
981                })
982            }
983        }
984    }
985
986    /// Build the `Stop` mutation for `DropStreamingJobs`.
987    pub(super) fn drop_streaming_jobs_to_mutation(
988        actors: &Vec<ActorId>,
989        dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
990    ) -> Mutation {
991        {
992            Mutation::Stop(StopMutation {
993                actors: actors.clone(),
994                dropped_sink_fragments: dropped_sink_fragment_by_targets
995                    .values()
996                    .flatten()
997                    .cloned()
998                    .collect(),
999            })
1000        }
1001    }
1002
    /// Build the `Add` mutation for `CreateStreamingJob`.
    ///
    /// Collects the newly added actors, their source splits, CDC snapshot
    /// splits, subscriptions (for snapshot backfill) and — for sink-into-table
    /// jobs — the new upstream sink info, into a single [`AddMutation`].
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                // All actors of the new job, across all its fragments.
                let added_actors: Vec<ActorId> = stream_actors
                    .values()
                    .flatten()
                    .map(|actor| actor.actor_id)
                    .collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // For snapshot backfill, the new job subscribes to every upstream
                // mv table it backfills from.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
                        job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // For sink-into-table, tell the downstream (table) fragment about
                // the new upstream sink fragment and its actors' locations.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_actors
                            .get(sink_fragment_id)
                            .unwrap_or_else(|| {
                                panic!("upstream sink fragment {sink_fragment_id} not exist")
                            })
                            .iter()
                            .map(|actor| {
                                let worker_id = actor_location[&actor.actor_id];
                                PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                }
                            });
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // Take (and remove from `edges`) the dispatchers of the
                    // upstream fragments that now feed the new job.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    // If the cluster is already paused, the new actors should be paused too.
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1121
    /// Build the `Update` mutation for `ReplaceStreamJob`.
    ///
    /// Moves the relevant merge updates and dispatchers out of `edges` and
    /// delegates the assembly to `generate_update_mutation_for_replace_table`.
    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                // Take (and remove from `edges`) the merge updates of exactly the
                // fragments whose upstream is being replaced.
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                // Likewise take the dispatchers of the upstream fragments that get
                // new downstreams.
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                // Re-assign CDC backfill snapshot splits for the replaced job, if any.
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                let old_fragments = old_fragments.fragments.keys().copied();
                let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
                    .as_ref()
                    .into_iter()
                    .flat_map(|sinks| sinks.iter())
                    .map(|sink| sink.original_fragment.fragment_id);
                Ok(Self::generate_update_mutation_for_replace_table(
                    // All actors of the old fragments, plus those of any
                    // auto-refresh-schema sink fragments, receive the update.
                    old_fragments
                        .chain(auto_refresh_sink_fragment_ids)
                        .flat_map(|fragment_id| {
                            database_info.fragment(fragment_id).actors.keys().copied()
                        }),
                    merge_updates,
                    dispatchers,
                    split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                ))
            }
        }
    }
1171
    /// Build the `Update` mutation for `RescheduleIntent`.
    ///
    /// Assembles, from the per-fragment `reschedules`:
    /// 1. dispatcher updates for upstream fragments' actors,
    /// 2. merge updates for downstream fragments' actors,
    /// 3. vnode bitmap updates, dropped actors, and split re-assignments.
    pub(super) fn reschedule_to_mutation(
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        control_stream_manager: &ControlStreamManager,
        database_info: &mut InflightDatabaseInfo,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let database_id = database_info.database_id;
                // 1. Dispatcher updates: patch the dispatchers of each rescheduled
                // fragment's upstream actors.
                let mut dispatcher_update = HashMap::new();
                for reschedule in reschedules.values() {
                    for &(upstream_fragment_id, dispatcher_id) in
                        &reschedule.upstream_fragment_dispatcher_ids
                    {
                        // Find the actors of the upstream fragment.
                        let upstream_actor_ids = fragment_actors
                            .get(&upstream_fragment_id)
                            .expect("should contain");

                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);

                        // Record updates for all actors.
                        for &actor_id in upstream_actor_ids {
                            // An upstream actor that is itself being removed gets no
                            // added downstream actors.
                            let added_downstream_actor_id = if upstream_reschedule
                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
                                .unwrap_or(true)
                            {
                                reschedule
                                    .added_actors
                                    .values()
                                    .flatten()
                                    .cloned()
                                    .collect()
                            } else {
                                Default::default()
                            };
                            // Index with the dispatcher id to check duplicates.
                            dispatcher_update
                                .try_insert(
                                    (actor_id, dispatcher_id),
                                    DispatcherUpdate {
                                        actor_id,
                                        dispatcher_id,
                                        hash_mapping: reschedule
                                            .upstream_dispatcher_mapping
                                            .as_ref()
                                            .map(|m| m.to_protobuf()),
                                        added_downstream_actor_id,
                                        removed_downstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let dispatcher_update = dispatcher_update.into_values().collect();

                // 2. Merge updates: patch the merge executors of each rescheduled
                // fragment's downstream actors.
                let mut merge_update = HashMap::new();
                for (&fragment_id, reschedule) in reschedules {
                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
                        // Find the actors of the downstream fragment.
                        let downstream_actor_ids = fragment_actors
                            .get(&downstream_fragment_id)
                            .expect("should contain");

                        // Downstream removed actors should be skipped
                        // Newly created actors of the current fragment will not dispatch Update
                        // barriers to them
                        let downstream_removed_actors: HashSet<_> = reschedules
                            .get(&downstream_fragment_id)
                            .map(|downstream_reschedule| {
                                downstream_reschedule
                                    .removed_actors
                                    .iter()
                                    .copied()
                                    .collect()
                            })
                            .unwrap_or_default();

                        // Record updates for all actors.
                        for &actor_id in downstream_actor_ids {
                            if downstream_removed_actors.contains(&actor_id) {
                                continue;
                            }

                            // Index with the fragment id to check duplicates.
                            merge_update
                                .try_insert(
                                    (actor_id, fragment_id),
                                    MergeUpdate {
                                        actor_id,
                                        upstream_fragment_id: fragment_id,
                                        new_upstream_fragment_id: None,
                                        added_upstream_actors: reschedule
                                            .added_actors
                                            .iter()
                                            .flat_map(|(worker_id, actors)| {
                                                let host =
                                                    control_stream_manager.host_addr(*worker_id);
                                                actors.iter().map(move |&actor_id| PbActorInfo {
                                                    actor_id,
                                                    host: Some(host.clone()),
                                                    // we assume that we only scale the partial graph of database
                                                    partial_graph_id: to_partial_graph_id(
                                                        database_id,
                                                        None,
                                                    ),
                                                })
                                            })
                                            .collect(),
                                        removed_upstream_actor_id: reschedule
                                            .removed_actors
                                            .iter()
                                            .cloned()
                                            .collect(),
                                    },
                                )
                                .unwrap();
                        }
                    }
                }
                let merge_update = merge_update.into_values().collect();

                // 3. Vnode bitmap updates for actors whose vnode ownership changed.
                let mut actor_vnode_bitmap_update = HashMap::new();
                for reschedule in reschedules.values() {
                    // Record updates for all actors in this fragment.
                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
                        let bitmap = bitmap.to_protobuf();
                        actor_vnode_bitmap_update
                            .try_insert(actor_id, bitmap)
                            .unwrap();
                    }
                }
                let dropped_actors = reschedules
                    .values()
                    .flat_map(|r| r.removed_actors.iter().copied())
                    .collect();
                // Re-assign source splits and CDC backfill snapshot splits.
                let mut actor_splits = HashMap::new();
                let mut actor_cdc_table_snapshot_splits = HashMap::new();
                for (fragment_id, reschedule) in reschedules {
                    for (actor_id, splits) in &reschedule.actor_splits {
                        actor_splits.insert(
                            *actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        );
                    }

                    if let Some(assignment) =
                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
                    {
                        actor_cdc_table_snapshot_splits.extend(assignment)
                    }
                }

                // we don't create dispatchers in reschedule scenario
                let actor_new_dispatchers = HashMap::new();
                let mutation = Mutation::Update(UpdateMutation {
                    dispatcher_update,
                    merge_update,
                    actor_vnode_bitmap_update,
                    dropped_actors,
                    actor_splits,
                    actor_new_dispatchers,
                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                        splits: actor_cdc_table_snapshot_splits,
                    }),
                    sink_schema_change: Default::default(),
                    subscriptions_to_drop: vec![],
                });
                tracing::debug!("update mutation: {mutation:?}");
                Ok(Some(mutation))
            }
        }
    }
1353
1354    /// Build the `Add` mutation for `CreateSubscription`.
1355    pub(super) fn create_subscription_to_mutation(
1356        upstream_mv_table_id: TableId,
1357        subscription_id: SubscriptionId,
1358    ) -> Mutation {
1359        {
1360            Mutation::Add(AddMutation {
1361                actor_dispatchers: Default::default(),
1362                added_actors: vec![],
1363                actor_splits: Default::default(),
1364                pause: false,
1365                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1366                    upstream_mv_table_id,
1367                    subscriber_id: subscription_id.as_subscriber_id(),
1368                }],
1369                backfill_nodes_to_pause: vec![],
1370                actor_cdc_table_snapshot_splits: None,
1371                new_upstream_sinks: Default::default(),
1372            })
1373        }
1374    }
1375
1376    /// Build the `DropSubscriptions` mutation for `DropSubscription`.
1377    pub(super) fn drop_subscription_to_mutation(
1378        upstream_mv_table_id: TableId,
1379        subscription_id: SubscriptionId,
1380    ) -> Mutation {
1381        {
1382            Mutation::DropSubscriptions(DropSubscriptionsMutation {
1383                info: vec![SubscriptionUpstreamInfo {
1384                    subscriber_id: subscription_id.as_subscriber_id(),
1385                    upstream_mv_table_id,
1386                }],
1387            })
1388        }
1389    }
1390
1391    /// Build the `ConnectorPropsChange` mutation.
1392    pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1393        {
1394            {
1395                let mut connector_props_infos = HashMap::default();
1396                for (k, v) in config {
1397                    connector_props_infos.insert(
1398                        k.as_raw_id(),
1399                        ConnectorPropsInfo {
1400                            connector_props_info: v.clone(),
1401                        },
1402                    );
1403                }
1404                Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1405                    connector_props_infos,
1406                })
1407            }
1408        }
1409    }
1410
1411    /// Build the `RefreshStart` mutation.
1412    pub(super) fn refresh_to_mutation(
1413        table_id: TableId,
1414        associated_source_id: SourceId,
1415    ) -> Mutation {
1416        Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1417            table_id,
1418            associated_source_id,
1419        })
1420    }
1421
1422    /// Build the `ListFinish` mutation.
1423    pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1424        Mutation::ListFinish(ListFinishMutation {
1425            associated_source_id,
1426        })
1427    }
1428
1429    /// Build the `LoadFinish` mutation.
1430    pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1431        Mutation::LoadFinish(LoadFinishMutation {
1432            associated_source_id,
1433        })
1434    }
1435
1436    /// Build the `ResetSource` mutation.
1437    pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1438        Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1439            source_id: source_id.as_raw_id(),
1440        })
1441    }
1442
1443    /// Build the `StartFragmentBackfill` mutation for `ResumeBackfill`.
1444    pub(super) fn resume_backfill_to_mutation(
1445        target: &ResumeBackfillTarget,
1446        database_info: &InflightDatabaseInfo,
1447    ) -> MetaResult<Option<Mutation>> {
1448        {
1449            {
1450                let fragment_ids: HashSet<_> = match target {
1451                    ResumeBackfillTarget::Job(job_id) => {
1452                        database_info.backfill_fragment_ids_for_job(*job_id)?
1453                    }
1454                    ResumeBackfillTarget::Fragment(fragment_id) => {
1455                        if !database_info.is_backfill_fragment(*fragment_id)? {
1456                            return Err(MetaError::invalid_parameter(format!(
1457                                "fragment {} is not a backfill node",
1458                                fragment_id
1459                            )));
1460                        }
1461                        HashSet::from([*fragment_id])
1462                    }
1463                };
1464                if fragment_ids.is_empty() {
1465                    warn!(
1466                        ?target,
1467                        "resume backfill command ignored because no backfill fragments found"
1468                    );
1469                    Ok(None)
1470                } else {
1471                    Ok(Some(Mutation::StartFragmentBackfill(
1472                        StartFragmentBackfillMutation {
1473                            fragment_ids: fragment_ids.into_iter().collect(),
1474                        },
1475                    )))
1476                }
1477            }
1478        }
1479    }
1480
1481    /// Build the `InjectSourceOffsets` mutation.
1482    pub(super) fn inject_source_offsets_to_mutation(
1483        source_id: SourceId,
1484        split_offsets: &HashMap<String, String>,
1485    ) -> Mutation {
1486        Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1487            source_id: source_id.as_raw_id(),
1488            split_offsets: split_offsets.clone(),
1489        })
1490    }
1491
1492    /// Collect actors to create for `CreateStreamingJob` (non-snapshot-backfill).
1493    pub(super) fn create_streaming_job_actors_to_create(
1494        info: &CreateStreamingJobCommandInfo,
1495        edges: &mut FragmentEdgeBuildResult,
1496        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1497        actor_location: &HashMap<ActorId, WorkerId>,
1498    ) -> StreamJobActorsToCreate {
1499        {
1500            {
1501                edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1502                    |fragment| {
1503                        let actors = stream_actors
1504                            .get(&fragment.fragment_id)
1505                            .into_iter()
1506                            .flatten()
1507                            .map(|actor| (actor, actor_location[&actor.actor_id]));
1508                        (
1509                            fragment.fragment_id,
1510                            &fragment.nodes,
1511                            actors,
1512                            [], // no subscriber for new job to create
1513                        )
1514                    },
1515                ))
1516            }
1517        }
1518    }
1519
1520    /// Collect actors to create for `RescheduleIntent`.
1521    pub(super) fn reschedule_actors_to_create(
1522        reschedules: &HashMap<FragmentId, Reschedule>,
1523        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1524        database_info: &InflightDatabaseInfo,
1525        control_stream_manager: &ControlStreamManager,
1526    ) -> StreamJobActorsToCreate {
1527        {
1528            {
1529                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1530                    reschedules.iter().map(|(fragment_id, reschedule)| {
1531                        (
1532                            *fragment_id,
1533                            reschedule.newly_created_actors.values().map(
1534                                |((actor, dispatchers), _)| {
1535                                    (actor.actor_id, dispatchers.as_slice())
1536                                },
1537                            ),
1538                        )
1539                    }),
1540                    Some((reschedules, fragment_actors)),
1541                    database_info,
1542                    control_stream_manager,
1543                );
1544                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1545                for (fragment_id, (actor, dispatchers), worker_id) in
1546                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1547                        reschedule
1548                            .newly_created_actors
1549                            .values()
1550                            .map(|(actors, status)| (*fragment_id, actors, status))
1551                    })
1552                {
1553                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1554                    map.entry(*worker_id)
1555                        .or_default()
1556                        .entry(fragment_id)
1557                        .or_insert_with(|| {
1558                            let node = database_info.fragment(fragment_id).nodes.clone();
1559                            let subscribers =
1560                                database_info.fragment_subscribers(fragment_id).collect();
1561                            (node, vec![], subscribers)
1562                        })
1563                        .1
1564                        .push((actor.clone(), upstreams, dispatchers.clone()));
1565                }
1566                map
1567            }
1568        }
1569    }
1570
1571    /// Collect actors to create for `ReplaceStreamJob`.
1572    pub(super) fn replace_stream_job_actors_to_create(
1573        replace_table: &ReplaceStreamJobPlan,
1574        edges: &mut FragmentEdgeBuildResult,
1575        database_info: &InflightDatabaseInfo,
1576        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1577        actor_location: &HashMap<ActorId, WorkerId>,
1578    ) -> StreamJobActorsToCreate {
1579        {
1580            {
1581                let mut actors = edges.collect_actors_to_create(
1582                    replace_table
1583                        .new_fragments
1584                        .fragments
1585                        .values()
1586                        .map(|fragment| {
1587                            let actors = stream_actors
1588                                .get(&fragment.fragment_id)
1589                                .into_iter()
1590                                .flatten()
1591                                .map(|actor| (actor, actor_location[&actor.actor_id]));
1592                            (
1593                                fragment.fragment_id,
1594                                &fragment.nodes,
1595                                actors,
1596                                database_info
1597                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1598                            )
1599                        }),
1600                );
1601
1602                // Handle auto-refresh schema sinks
1603                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1604                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1605                        (
1606                            sink.new_fragment.fragment_id,
1607                            &sink.new_fragment.nodes,
1608                            stream_actors
1609                                .get(&sink.new_fragment.fragment_id)
1610                                .into_iter()
1611                                .flatten()
1612                                .map(|actor| (actor, actor_location[&actor.actor_id])),
1613                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1614                        )
1615                    }));
1616                    for (worker_id, fragment_actors) in sink_actors {
1617                        actors.entry(worker_id).or_default().extend(fragment_actors);
1618                    }
1619                }
1620                actors
1621            }
1622        }
1623    }
1624
1625    fn generate_update_mutation_for_replace_table(
1626        dropped_actors: impl IntoIterator<Item = ActorId>,
1627        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1628        dispatchers: FragmentActorDispatchers,
1629        split_assignment: &SplitAssignment,
1630        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1631        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1632    ) -> Option<Mutation> {
1633        let dropped_actors = dropped_actors.into_iter().collect();
1634
1635        let actor_new_dispatchers = dispatchers
1636            .into_values()
1637            .flatten()
1638            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1639            .collect();
1640
1641        let actor_splits = split_assignment
1642            .values()
1643            .flat_map(build_actor_connector_splits)
1644            .collect();
1645        Some(Mutation::Update(UpdateMutation {
1646            actor_new_dispatchers,
1647            merge_update: merge_updates.into_values().flatten().collect(),
1648            dropped_actors,
1649            actor_splits,
1650            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1651            sink_schema_change: auto_refresh_schema_sinks
1652                .as_ref()
1653                .into_iter()
1654                .flat_map(|sinks| {
1655                    sinks.iter().map(|sink| {
1656                        let op = if !sink.removed_column_names.is_empty() {
1657                            PbSinkSchemaChangeOp::DropColumns(PbSinkDropColumnsOp {
1658                                column_names: sink.removed_column_names.clone(),
1659                            })
1660                        } else {
1661                            PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1662                                fields: sink
1663                                    .newly_add_fields
1664                                    .iter()
1665                                    .map(|field| field.to_prost())
1666                                    .collect(),
1667                            })
1668                        };
1669                        (
1670                            sink.original_sink.id.as_raw_id(),
1671                            PbSinkSchemaChange {
1672                                original_schema: sink
1673                                    .original_sink
1674                                    .columns
1675                                    .iter()
1676                                    .map(|col| PbField {
1677                                        data_type: Some(
1678                                            col.column_desc
1679                                                .as_ref()
1680                                                .unwrap()
1681                                                .column_type
1682                                                .as_ref()
1683                                                .unwrap()
1684                                                .clone(),
1685                                        ),
1686                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1687                                    })
1688                                    .collect(),
1689                                op: Some(op),
1690                            },
1691                        )
1692                    })
1693                })
1694                .collect(),
1695            ..Default::default()
1696        }))
1697    }
1698}
1699
impl Command {
    /// Resolve, for each downstream actor, the upstream actor infos it must
    /// register: a map `downstream actor id -> upstream fragment id ->
    /// {upstream actor id -> PbActorInfo}`.
    ///
    /// `actor_dispatchers` supplies, per upstream fragment, the (actor id,
    /// dispatchers) pairs whose downstream links should be recorded. When
    /// `reschedule_dispatcher_update` is provided, surviving upstream actors
    /// of rescheduled fragments are additionally linked to the newly added
    /// downstream actors of each reschedule.
    #[expect(clippy::type_complexity)]
    pub(super) fn collect_database_partial_graph_actor_upstreams(
        actor_dispatchers: impl Iterator<
            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
        >,
        reschedule_dispatcher_update: Option<(
            &HashMap<FragmentId, Reschedule>,
            &HashMap<FragmentId, HashSet<ActorId>>,
        )>,
        database_info: &InflightDatabaseInfo,
        control_stream_manager: &ControlStreamManager,
    ) -> HashMap<ActorId, ActorUpstreams> {
        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
        // Phase 1: walk every provided dispatcher and record its upstream
        // actor (with host address) under each downstream actor it targets.
        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
            let upstream_fragment = database_info.fragment(upstream_fragment_id);
            for (upstream_actor_id, dispatchers) in upstream_actors {
                // Look up where the upstream actor runs to build its host info.
                let upstream_actor_location =
                    upstream_fragment.actors[&upstream_actor_id].worker_id;
                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
                for downstream_actor_id in dispatchers
                    .iter()
                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
                {
                    actor_upstreams
                        .entry(*downstream_actor_id)
                        .or_default()
                        .entry(upstream_fragment_id)
                        .or_default()
                        .insert(
                            upstream_actor_id,
                            PbActorInfo {
                                actor_id: upstream_actor_id,
                                host: Some(upstream_actor_host.clone()),
                                // No creating-job id here: the actors belong to
                                // the database's own partial graph.
                                partial_graph_id: to_partial_graph_id(
                                    database_info.database_id,
                                    None,
                                ),
                            },
                        );
                }
            }
        }
        // Phase 2 (reschedule only): for each rescheduled fragment, connect
        // the surviving actors of its upstream fragments to the fragment's
        // newly added actors.
        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
            for reschedule in reschedules.values() {
                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
                    // The upstream fragment may itself be rescheduled; its
                    // removed actors must not be registered as upstreams.
                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
                    for upstream_actor_id in fragment_actors
                        .get(upstream_fragment_id)
                        .expect("should exist")
                    {
                        let upstream_actor_location =
                            upstream_fragment.actors[upstream_actor_id].worker_id;
                        let upstream_actor_host =
                            control_stream_manager.host_addr(upstream_actor_location);
                        // Skip upstream actors that are being removed.
                        if let Some(upstream_reschedule) = upstream_reschedule
                            && upstream_reschedule
                                .removed_actors
                                .contains(upstream_actor_id)
                        {
                            continue;
                        }
                        for (_, downstream_actor_id) in
                            reschedule
                                .added_actors
                                .iter()
                                .flat_map(|(worker_id, actors)| {
                                    actors.iter().map(|actor| (*worker_id, *actor))
                                })
                        {
                            actor_upstreams
                                .entry(downstream_actor_id)
                                .or_default()
                                .entry(*upstream_fragment_id)
                                .or_default()
                                .insert(
                                    *upstream_actor_id,
                                    PbActorInfo {
                                        actor_id: *upstream_actor_id,
                                        host: Some(upstream_actor_host.clone()),
                                        partial_graph_id: to_partial_graph_id(
                                            database_info.database_id,
                                            None,
                                        ),
                                    },
                                );
                        }
                    }
                }
            }
        }
        actor_upstreams
    }
}