risingwave_meta/barrier/command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::WorkerId;
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::PbField;
35use risingwave_pb::source::{
36    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
37};
38use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
39use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
40use risingwave_pb::stream_plan::barrier_mutation::Mutation;
41use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
42use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
43use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
44use risingwave_pb::stream_plan::update_mutation::*;
45use risingwave_pb::stream_plan::{
46    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
47    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
48    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StopMutation,
49    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
50};
51use risingwave_pb::stream_service::BarrierCompleteResponse;
52use tracing::warn;
53
54use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
55use crate::MetaResult;
56use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
57use crate::barrier::cdc_progress::CdcTableBackfillTracker;
58use crate::barrier::edge_builder::FragmentEdgeBuildResult;
59use crate::barrier::info::BarrierInfo;
60use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
61use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
62use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
63use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
64use crate::manager::{StreamingJob, StreamingJobType};
65use crate::model::{
66    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
67    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
68    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
69};
70use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
71use crate::stream::{
72    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
73    SplitState, UpstreamSinkInfo, build_actor_connector_splits,
74};
75
76/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
77/// in some fragments, e.g. for scaling or migration.
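/// # Example
///
/// A minimal sketch (illustrative only, not part of the original source): a pure scale-in that
/// removes two actors from this fragment and informs the single upstream dispatcher of the
/// removal. `upstream_fragment_id` and `dispatcher_id` are placeholder names.
///
/// ```ignore
/// let scale_in = Reschedule {
///     added_actors: HashMap::new(),
///     removed_actors: HashSet::from([1000, 1001]),
///     // Bitmaps of the remaining actors would go here for hash-sharded fragments.
///     vnode_bitmap_updates: HashMap::new(),
///     upstream_fragment_dispatcher_ids: vec![(upstream_fragment_id, dispatcher_id)],
///     upstream_dispatcher_mapping: None,
///     downstream_fragment_ids: vec![],
///     actor_splits: HashMap::new(),
///     // Only filled when the reschedule creates new actors (scale-out).
///     newly_created_actors: HashMap::new(),
/// };
/// ```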
78#[derive(Debug, Clone)]
79pub struct Reschedule {
80    /// Added actors in this fragment.
81    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
82
83    /// Removed actors in this fragment.
84    pub removed_actors: HashSet<ActorId>,
85
86    /// Vnode bitmap updates for some actors in this fragment.
87    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
88
89    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
90    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
91    /// New hash mapping of the upstream dispatcher to be updated.
92    ///
93    /// This field exists only when there's an upstream fragment and the current fragment is
94    /// hash-sharded.
95    pub upstream_dispatcher_mapping: Option<ActorMapping>,
96
97    /// The downstream fragments of this fragment.
98    pub downstream_fragment_ids: Vec<FragmentId>,
99
100    /// Reassigned splits for source actors.
101    /// It becomes the `actor_splits` in [`UpdateMutation`].
102    /// `Source` and `SourceBackfill` are handled together here.
103    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
104
105    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
106}
107
108/// Replacing an old job with a new one. All actors in the job will be rebuilt.
109///
110/// Current use cases:
111/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
112/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
113///   will replace a table job's plan.
114#[derive(Debug, Clone)]
115pub struct ReplaceStreamJobPlan {
116    pub old_fragments: StreamJobFragments,
117    pub new_fragments: StreamJobFragmentsToCreate,
118    /// Downstream jobs of the replaced job need to update their `Merge` node to
119    /// connect to the new fragment.
120    pub replace_upstream: FragmentReplaceUpstream,
121    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
122    /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
123    /// We need to reassign splits for it.
124    ///
125    /// Note that there's no `SourceBackfillExecutor` involved for a table with connector, so we don't need to worry about
126    /// `backfill_splits`.
127    pub init_split_assignment: SplitAssignment,
128    /// The `StreamingJob` info of the job to be replaced.
129    pub streaming_job: StreamingJob,
130    /// The temporary dummy job id for the new job fragments.
131    pub tmp_id: JobId,
132    /// The state table ids to be dropped.
133    pub to_drop_state_table_ids: Vec<TableId>,
134    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
135}
136
137impl ReplaceStreamJobPlan {
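    /// Collect the per-fragment changes implied by this replace plan: the new fragments to add,
    /// the old fragments to remove, the `Merge`-node upstream replacements for downstream
    /// fragments, and, if present, the fragment swaps for auto-refresh-schema sinks.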
138    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
139        let mut fragment_changes = HashMap::new();
140        for (fragment_id, new_fragment) in self
141            .new_fragments
142            .new_fragment_info(&self.init_split_assignment)
143        {
144            let fragment_change = CommandFragmentChanges::NewFragment {
145                job_id: self.streaming_job.id(),
146                info: new_fragment,
147            };
148            fragment_changes
149                .try_insert(fragment_id, fragment_change)
150                .expect("non-duplicate");
151        }
152        for fragment in self.old_fragments.fragments.values() {
153            fragment_changes
154                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
155                .expect("non-duplicate");
156        }
157        for (fragment_id, replace_map) in &self.replace_upstream {
158            fragment_changes
159                .try_insert(
160                    *fragment_id,
161                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
162                )
163                .expect("non-duplicate");
164        }
165        if let Some(sinks) = &self.auto_refresh_schema_sinks {
166            for sink in sinks {
167                let fragment_change = CommandFragmentChanges::NewFragment {
168                    job_id: sink.original_sink.id.as_job_id(),
169                    info: sink.new_fragment_info(),
170                };
171                fragment_changes
172                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
173                    .expect("non-duplicate");
174                fragment_changes
175                    .try_insert(
176                        sink.original_fragment.fragment_id,
177                        CommandFragmentChanges::RemoveFragment,
178                    )
179                    .expect("non-duplicate");
180            }
181        }
182        fragment_changes
183    }
184
185    /// `old_fragment_id` -> `new_fragment_id`
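    ///
    /// For example (illustrative), if a downstream `Merge` node is switched from the old upstream
    /// fragment `10` to the new fragment `20`, the returned map contains `10 -> 20`. Mapping one
    /// old fragment to two different new fragments would trip the assertion below.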
186    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
187        let mut fragment_replacements = HashMap::new();
188        for (upstream_fragment_id, new_upstream_fragment_id) in
189            self.replace_upstream.values().flatten()
190        {
191            {
192                let r =
193                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
194                if let Some(r) = r {
195                    assert_eq!(
196                        *new_upstream_fragment_id, r,
197                        "one fragment is replaced by multiple fragments"
198                    );
199                }
200            }
201        }
202        fragment_replacements
203    }
204}
205
206#[derive(educe::Educe, Clone)]
207#[educe(Debug)]
208pub struct CreateStreamingJobCommandInfo {
209    #[educe(Debug(ignore))]
210    pub stream_job_fragments: StreamJobFragmentsToCreate,
211    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
212    pub init_split_assignment: SplitAssignment,
213    pub definition: String,
214    pub job_type: StreamingJobType,
215    pub create_type: CreateType,
216    pub streaming_job: StreamingJob,
217    pub fragment_backfill_ordering: FragmentBackfillOrder,
218    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
219    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
220    pub is_serverless: bool,
221}
222
223impl StreamJobFragments {
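    /// Build the in-flight info of every fragment in this job, attaching each source actor's
    /// initial splits from `assignment` and looking up each actor's worker from `actor_status`.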
224    pub(super) fn new_fragment_info<'a>(
225        &'a self,
226        assignment: &'a SplitAssignment,
227    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
228        self.fragments.values().map(|fragment| {
229            let mut fragment_splits = assignment
230                .get(&fragment.fragment_id)
231                .cloned()
232                .unwrap_or_default();
233
234            (
235                fragment.fragment_id,
236                InflightFragmentInfo {
237                    fragment_id: fragment.fragment_id,
238                    distribution_type: fragment.distribution_type.into(),
239                    fragment_type_mask: fragment.fragment_type_mask,
240                    vnode_count: fragment.vnode_count(),
241                    nodes: fragment.nodes.clone(),
242                    actors: fragment
243                        .actors
244                        .iter()
245                        .map(|actor| {
246                            (
247                                actor.actor_id,
248                                InflightActorInfo {
249                                    worker_id: self
250                                        .actor_status
251                                        .get(&actor.actor_id)
252                                        .expect("should exist")
253                                        .worker_id(),
254                                    vnode_bitmap: actor.vnode_bitmap.clone(),
255                                    splits: fragment_splits
256                                        .remove(&actor.actor_id)
257                                        .unwrap_or_default(),
258                                },
259                            )
260                        })
261                        .collect(),
262                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
263                },
264            )
265        })
266    }
267}
268
269#[derive(Debug, Clone)]
270pub struct SnapshotBackfillInfo {
271    /// `table_id` -> `Some(snapshot_backfill_epoch)`
272    /// The `snapshot_backfill_epoch` should be `None` at the beginning, and will be filled in
273    /// by the global barrier worker when handling the command.
274    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
275}
276
277#[derive(Debug, Clone)]
278pub enum CreateStreamingJobType {
279    Normal,
280    SinkIntoTable(UpstreamSinkInfo),
281    SnapshotBackfill(SnapshotBackfillInfo),
282}
283
284/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
285/// it will [build different barriers to send](Self::to_mutation),
286/// and may [perform different work after the barrier is collected](PostCollectCommand::post_collect).
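/// # Example
///
/// A minimal sketch (illustrative only, not part of the original source) of how a command is
/// expressed: dropping a finished streaming job becomes a `Stop` mutation via
/// [`Self::to_mutation`]. `job_id` and `actors_of_job` are placeholder names.
///
/// ```ignore
/// let command = Command::DropStreamingJobs {
///     streaming_job_ids: HashSet::from([job_id]),
///     actors: actors_of_job,
///     unregistered_state_table_ids: HashSet::new(),
///     unregistered_fragment_ids: HashSet::new(),
///     dropped_sink_fragment_by_targets: HashMap::new(),
/// };
/// ```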
287// FIXME: this enum is significantly large on stack, box it
288#[derive(Debug)]
289pub enum Command {
290    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
291    /// all messages before the checkpoint barrier should have been committed.
292    Flush,
293
294    /// `Pause` command generates a `Pause` barrier **only if**
295    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
296    Pause,
297
298    /// `Resume` command generates a `Resume` barrier **only if**
299    /// the cluster is currently paused. Otherwise, a barrier with no mutation
300    /// will be generated.
301    Resume,
302
303    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
304    /// [`Vec<ActorId>`]. The catalog has already ensured, via reference counting, that these
305    /// streaming jobs are safe to drop.
306    ///
307    /// Barriers from the actors to be dropped will STILL be collected.
308    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
309    /// drop the actors, and then deletes the job fragments info from the meta store.
310    DropStreamingJobs {
311        streaming_job_ids: HashSet<JobId>,
312        actors: Vec<ActorId>,
313        unregistered_state_table_ids: HashSet<TableId>,
314        unregistered_fragment_ids: HashSet<FragmentId>,
315        // target_fragment -> [sink_fragments]
316        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
317    },
318
319    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
320    ///
321    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
322    /// be collected since the barrier should be passed through.
323    ///
324    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
325    /// the job fragments info is added to the meta store. However, the creation progress will **last
326    /// for a while** until the `finish` channel is signaled; only then is the state of `TableFragments`
327    /// set to `Created`.
328    CreateStreamingJob {
329        info: CreateStreamingJobCommandInfo,
330        job_type: CreateStreamingJobType,
331        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
332    },
333
334    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
335    /// Mainly used for scaling and migration.
336    ///
337    /// Which actors to collect barriers from, and the post-collection behavior of this command, are
338    /// very similar to those of the `Create` and `Drop` commands, for the added and removed actors respectively.
339    RescheduleFragment {
340        reschedules: HashMap<FragmentId, Reschedule>,
341        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`
342        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
343    },
344
345    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
346    /// essentially switching the downstream of the old job fragments to the new ones, and
347    /// dropping the old job fragments. Used for schema changes.
348    ///
349    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream fragments
350    /// of the Merge executors are changed additionally.
351    ReplaceStreamJob(ReplaceStreamJobPlan),
352
353    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
354    /// changed splits.
355    SourceChangeSplit(SplitState),
356
357    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
358    /// the `rate_limit` of executors. `throttle_type` specifies which executor kinds should apply it.
359    Throttle {
360        jobs: HashSet<JobId>,
361        config: HashMap<FragmentId, ThrottleConfig>,
362    },
363
364    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
365    /// the materialize executor to start storing old values for the subscription.
366    CreateSubscription {
367        subscription_id: SubscriptionId,
368        upstream_mv_table_id: TableId,
369        retention_second: u64,
370    },
371
372    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
373    /// the materialize executor to stop storing old values when there is no
374    /// subscription depending on it.
375    DropSubscription {
376        subscription_id: SubscriptionId,
377        upstream_mv_table_id: TableId,
378    },
379
380    ConnectorPropsChange(ConnectorPropsChange),
381
382    /// `Refresh` command generates a barrier to refresh a table by truncating state
383    /// and reloading data from source.
384    Refresh {
385        table_id: TableId,
386        associated_source_id: SourceId,
387    },
388    ListFinish {
389        table_id: TableId,
390        associated_source_id: SourceId,
391    },
392    LoadFinish {
393        table_id: TableId,
394        associated_source_id: SourceId,
395    },
396
397    /// `ResetSource` command generates a barrier to reset the CDC source offset to the latest.
398    /// Used when the upstream binlog/oplog has expired.
399    ResetSource {
400        source_id: SourceId,
401    },
402}
403
404// For debugging and observability purposes. Can add more details later if needed.
405impl std::fmt::Display for Command {
406    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
407        match self {
408            Command::Flush => write!(f, "Flush"),
409            Command::Pause => write!(f, "Pause"),
410            Command::Resume => write!(f, "Resume"),
411            Command::DropStreamingJobs {
412                streaming_job_ids, ..
413            } => {
414                write!(
415                    f,
416                    "DropStreamingJobs: {}",
417                    streaming_job_ids.iter().sorted().join(", ")
418                )
419            }
420            Command::CreateStreamingJob { info, .. } => {
421                write!(f, "CreateStreamingJob: {}", info.streaming_job)
422            }
423            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
424            Command::ReplaceStreamJob(plan) => {
425                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
426            }
427            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
428            Command::Throttle { .. } => write!(f, "Throttle"),
429            Command::CreateSubscription {
430                subscription_id, ..
431            } => write!(f, "CreateSubscription: {subscription_id}"),
432            Command::DropSubscription {
433                subscription_id, ..
434            } => write!(f, "DropSubscription: {subscription_id}"),
435            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
436            Command::Refresh {
437                table_id,
438                associated_source_id,
439            } => write!(
440                f,
441                "Refresh: {} (source: {})",
442                table_id, associated_source_id
443            ),
444            Command::ListFinish {
445                table_id,
446                associated_source_id,
447            } => write!(
448                f,
449                "ListFinish: {} (source: {})",
450                table_id, associated_source_id
451            ),
452            Command::LoadFinish {
453                table_id,
454                associated_source_id,
455            } => write!(
456                f,
457                "LoadFinish: {} (source: {})",
458                table_id, associated_source_id
459            ),
460            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
461        }
462    }
463}
464
465impl Command {
466    pub fn pause() -> Self {
467        Self::Pause
468    }
469
470    pub fn resume() -> Self {
471        Self::Resume
472    }
473
474    #[expect(clippy::type_complexity)]
475    pub(super) fn fragment_changes(
476        &self,
477    ) -> Option<(
478        Option<(JobId, Option<CdcTableBackfillTracker>)>,
479        HashMap<FragmentId, CommandFragmentChanges>,
480    )> {
481        match self {
482            Command::Flush => None,
483            Command::Pause => None,
484            Command::Resume => None,
485            Command::DropStreamingJobs {
486                unregistered_fragment_ids,
487                dropped_sink_fragment_by_targets,
488                ..
489            } => {
490                let changes = unregistered_fragment_ids
491                    .iter()
492                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
493                    .chain(dropped_sink_fragment_by_targets.iter().map(
494                        |(target_fragment, sink_fragments)| {
495                            (
496                                *target_fragment,
497                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
498                            )
499                        },
500                    ))
501                    .collect();
502
503                Some((None, changes))
504            }
505            Command::CreateStreamingJob { info, job_type, .. } => {
506                assert!(
507                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
508                    "should handle fragment changes separately for snapshot backfill"
509                );
510                let mut changes: HashMap<_, _> = info
511                    .stream_job_fragments
512                    .new_fragment_info(&info.init_split_assignment)
513                    .map(|(fragment_id, fragment_infos)| {
514                        (
515                            fragment_id,
516                            CommandFragmentChanges::NewFragment {
517                                job_id: info.streaming_job.id(),
518                                info: fragment_infos,
519                            },
520                        )
521                    })
522                    .collect();
523
524                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
525                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
526                    changes.insert(
527                        downstream_fragment_id,
528                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
529                            upstream_fragment_id: ctx.sink_fragment_id,
530                            sink_output_schema: ctx.sink_output_fields.clone(),
531                            project_exprs: ctx.project_exprs.clone(),
532                        }),
533                    );
534                }
535
536                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
537                    let (fragment, _) =
538                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
539                            .expect("should have parallel cdc fragment");
540                    Some(CdcTableBackfillTracker::new(
541                        fragment.fragment_id,
542                        splits.clone(),
543                    ))
544                } else {
545                    None
546                };
547
548                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
549            }
550            Command::RescheduleFragment { reschedules, .. } => Some((
551                None,
552                reschedules
553                    .iter()
554                    .map(|(fragment_id, reschedule)| {
555                        (
556                            *fragment_id,
557                            CommandFragmentChanges::Reschedule {
558                                new_actors: reschedule
559                                    .added_actors
560                                    .iter()
561                                    .flat_map(|(node_id, actors)| {
562                                        actors.iter().map(|actor_id| {
563                                            (
564                                                *actor_id,
565                                                InflightActorInfo {
566                                                    worker_id: *node_id,
567                                                    vnode_bitmap: reschedule
568                                                        .newly_created_actors
569                                                        .get(actor_id)
570                                                        .expect("should exist")
571                                                        .0
572                                                        .0
573                                                        .vnode_bitmap
574                                                        .clone(),
575                                                    splits: reschedule
576                                                        .actor_splits
577                                                        .get(actor_id)
578                                                        .cloned()
579                                                        .unwrap_or_default(),
580                                                },
581                                            )
582                                        })
583                                    })
584                                    .collect(),
585                                actor_update_vnode_bitmap: reschedule
586                                    .vnode_bitmap_updates
587                                    .iter()
588                                    .filter(|(actor_id, _)| {
589                                        // only keep the existing actors
590                                        !reschedule.newly_created_actors.contains_key(actor_id)
591                                    })
592                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
593                                    .collect(),
594                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
595                                actor_splits: reschedule.actor_splits.clone(),
596                            },
597                        )
598                    })
599                    .collect(),
600            )),
601            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
602            Command::SourceChangeSplit(SplitState {
603                split_assignment, ..
604            }) => Some((
605                None,
606                split_assignment
607                    .iter()
608                    .map(|(&fragment_id, splits)| {
609                        (
610                            fragment_id,
611                            CommandFragmentChanges::SplitAssignment {
612                                actor_splits: splits.clone(),
613                            },
614                        )
615                    })
616                    .collect(),
617            )),
618            Command::Throttle { .. } => None,
619            Command::CreateSubscription { .. } => None,
620            Command::DropSubscription { .. } => None,
621            Command::ConnectorPropsChange(_) => None,
622            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
623            Command::ListFinish { .. } => None, // ListFinish doesn't change fragment structure
624            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
625            Command::ResetSource { .. } => None, // ResetSource doesn't change fragment structure
626        }
627    }
628
629    pub fn need_checkpoint(&self) -> bool {
630        // TODO: review the flow of different commands to reduce the number of checkpoints
631        !matches!(self, Command::Resume)
632    }
633}
634
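/// The part of a [`Command`] that is retained after its barrier has been collected, used for
/// post-collection handling. Produced by [`Command::into_post_collect`].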
635#[derive(Debug)]
636pub enum PostCollectCommand {
637    Command(String),
638    DropStreamingJobs {
639        streaming_job_ids: HashSet<JobId>,
640        unregistered_state_table_ids: HashSet<TableId>,
641    },
642    CreateStreamingJob {
643        info: CreateStreamingJobCommandInfo,
644        job_type: CreateStreamingJobType,
645        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
646    },
647    RescheduleFragment {
648        reschedules: HashMap<FragmentId, Reschedule>,
649    },
650    ReplaceStreamJob(ReplaceStreamJobPlan),
651    SourceChangeSplit {
652        split_assignment: SplitAssignment,
653    },
654    CreateSubscription {
655        subscription_id: SubscriptionId,
656    },
657    ConnectorPropsChange(ConnectorPropsChange),
658}
659
660impl PostCollectCommand {
661    pub fn barrier() -> Self {
662        PostCollectCommand::Command("barrier".to_owned())
663    }
664
665    pub fn command_name(&self) -> &str {
666        match self {
667            PostCollectCommand::Command(name) => name.as_str(),
668            PostCollectCommand::DropStreamingJobs { .. } => "DropStreamingJobs",
669            PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
670            PostCollectCommand::RescheduleFragment { .. } => "RescheduleFragment",
671            PostCollectCommand::ReplaceStreamJob(_) => "ReplaceStreamJob",
672            PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
673            PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
674            PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
675        }
676    }
677}
678
679impl Display for PostCollectCommand {
680    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
681        f.write_str(self.command_name())
682    }
683}
684
685impl Command {
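    /// Convert this command into the [`PostCollectCommand`] kept for post-collection handling.
    /// Commands with no dedicated post-collection work are reduced to their plain name.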
686    pub(super) fn into_post_collect(self) -> PostCollectCommand {
687        match self {
688            Command::DropStreamingJobs {
689                streaming_job_ids,
690                unregistered_state_table_ids,
691                ..
692            } => PostCollectCommand::DropStreamingJobs {
693                streaming_job_ids,
694                unregistered_state_table_ids,
695            },
696            Command::CreateStreamingJob {
697                info,
698                job_type,
699                cross_db_snapshot_backfill_info,
700            } => match job_type {
701                CreateStreamingJobType::SnapshotBackfill(_) => PostCollectCommand::barrier(),
702                job_type => PostCollectCommand::CreateStreamingJob {
703                    info,
704                    job_type,
705                    cross_db_snapshot_backfill_info,
706                },
707            },
708            Command::RescheduleFragment { reschedules, .. } => {
709                PostCollectCommand::RescheduleFragment { reschedules }
710            }
711            Command::ReplaceStreamJob(plan) => PostCollectCommand::ReplaceStreamJob(plan),
712            Command::SourceChangeSplit(SplitState { split_assignment }) => {
713                PostCollectCommand::SourceChangeSplit { split_assignment }
714            }
715            Command::CreateSubscription {
716                subscription_id, ..
717            } => PostCollectCommand::CreateSubscription { subscription_id },
718            Command::ConnectorPropsChange(connector_props_change) => {
719                PostCollectCommand::ConnectorPropsChange(connector_props_change)
720            }
721            Command::Flush => PostCollectCommand::Command("Flush".to_owned()),
722            Command::Pause => PostCollectCommand::Command("Pause".to_owned()),
723            Command::Resume => PostCollectCommand::Command("Resume".to_owned()),
724            Command::Throttle { .. } => PostCollectCommand::Command("Throttle".to_owned()),
725            Command::DropSubscription { .. } => {
726                PostCollectCommand::Command("DropSubscription".to_owned())
727            }
728            Command::Refresh { .. } => PostCollectCommand::Command("Refresh".to_owned()),
729            Command::ListFinish { .. } => PostCollectCommand::Command("ListFinish".to_owned()),
730            Command::LoadFinish { .. } => PostCollectCommand::Command("LoadFinish".to_owned()),
731            Command::ResetSource { .. } => PostCollectCommand::Command("ResetSource".to_owned()),
732        }
733    }
734}
735
736#[derive(Debug, Clone)]
737pub enum BarrierKind {
738    Initial,
739    Barrier,
740    /// Holds the prev-epochs of the preceding non-checkpoint barriers plus the current prev-epoch.
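    ///
    /// For example (illustrative), if the barriers at prev-epochs `e1` and `e2` were
    /// non-checkpoint and the current barrier at prev-epoch `e3` is a checkpoint, this variant
    /// holds `vec![e1, e2, e3]`.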
741    Checkpoint(Vec<u64>),
742}
743
744impl BarrierKind {
745    pub fn to_protobuf(&self) -> PbBarrierKind {
746        match self {
747            BarrierKind::Initial => PbBarrierKind::Initial,
748            BarrierKind::Barrier => PbBarrierKind::Barrier,
749            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
750        }
751    }
752
753    pub fn is_checkpoint(&self) -> bool {
754        matches!(self, BarrierKind::Checkpoint(_))
755    }
756
757    pub fn is_initial(&self) -> bool {
758        matches!(self, BarrierKind::Initial)
759    }
760
761    pub fn as_str_name(&self) -> &'static str {
762        match self {
763            BarrierKind::Initial => "Initial",
764            BarrierKind::Barrier => "Barrier",
765            BarrierKind::Checkpoint(_) => "Checkpoint",
766        }
767    }
768}
769
770/// [`CommandContext`] is used for generating the barrier and doing post-collection work according to the given
771/// [`Command`].
772pub(super) struct CommandContext {
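    /// Maximum retention (in seconds) of the subscriptions on each upstream MV table, used to
    /// compute the log store truncate epoch of that table.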
773    mv_subscription_max_retention: HashMap<TableId, u64>,
774
775    pub(super) barrier_info: BarrierInfo,
776
777    pub(super) table_ids_to_commit: HashSet<TableId>,
778
779    pub(super) command: PostCollectCommand,
780
781    /// The tracing span of this command.
782    ///
783    /// Different from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
784    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
785    /// stream graph on compute nodes, and finishing its `post_collect` work.
786    _span: tracing::Span,
787}
788
789impl std::fmt::Debug for CommandContext {
790    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
791        f.debug_struct("CommandContext")
792            .field("barrier_info", &self.barrier_info)
793            .field("command", &self.command.command_name())
794            .finish()
795    }
796}
797
798impl CommandContext {
799    pub(super) fn new(
800        barrier_info: BarrierInfo,
801        mv_subscription_max_retention: HashMap<TableId, u64>,
802        table_ids_to_commit: HashSet<TableId>,
803        command: PostCollectCommand,
804        span: tracing::Span,
805    ) -> Self {
806        Self {
807            mv_subscription_max_retention,
808            barrier_info,
809            table_ids_to_commit,
810            command,
811            _span: span,
812        }
813    }
814
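    /// Compute the epoch to which subscribed log stores may be truncated: the prev-epoch's
    /// timestamp minus `retention_second`. For example (illustrative), with a prev-epoch at
    /// `2024-01-01T00:01:40Z` and `retention_second = 100`, the returned epoch corresponds to
    /// `2024-01-01T00:00:00Z`. If the subtraction falls outside the valid timestamp range, a
    /// warning is logged and the prev-epoch itself is returned.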
815    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
816        let Some(truncate_timestamptz) = Timestamptz::from_secs(
817            self.barrier_info
818                .prev_epoch
819                .value()
820                .as_timestamptz()
821                .timestamp()
822                - retention_second as i64,
823        ) else {
824            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
825            return self.barrier_info.prev_epoch.value();
826        };
827        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
828    }
829
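    /// Merge the `BarrierCompleteResponse`s collected from compute nodes into `info`: synced
    /// SSTs, table watermarks, vector index deltas, the change-log delta built from this
    /// checkpoint's epochs, and the per-table commit epochs.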
830    pub(super) fn collect_commit_epoch_info(
831        &self,
832        info: &mut CommitEpochInfo,
833        resps: Vec<BarrierCompleteResponse>,
834        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
835    ) {
836        let (
837            sst_to_context,
838            synced_ssts,
839            new_table_watermarks,
840            old_value_ssts,
841            vector_index_adds,
842            truncate_tables,
843        ) = collect_resp_info(resps);
844
845        let new_table_fragment_infos =
846            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = &self.command {
847                assert!(!matches!(
848                    job_type,
849                    CreateStreamingJobType::SnapshotBackfill(_)
850                ));
851                let table_fragments = &info.stream_job_fragments;
852                let mut table_ids: HashSet<_> =
853                    table_fragments.internal_table_ids().into_iter().collect();
854                if let Some(mv_table_id) = table_fragments.mv_table_id() {
855                    table_ids.insert(mv_table_id);
856                }
857
858                vec![NewTableFragmentInfo { table_ids }]
859            } else {
860                vec![]
861            };
862
863        let mut mv_log_store_truncate_epoch = HashMap::new();
864        // TODO: may collect cross db snapshot backfill
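        // Keep the minimum truncate epoch per upstream MV table: the log store must retain data
        // back to the most lagging subscription or backfill job.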
865        let mut update_truncate_epoch =
866            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
867                Entry::Occupied(mut entry) => {
868                    let prev_truncate_epoch = entry.get_mut();
869                    if truncate_epoch < *prev_truncate_epoch {
870                        *prev_truncate_epoch = truncate_epoch;
871                    }
872                }
873                Entry::Vacant(entry) => {
874                    entry.insert(truncate_epoch);
875                }
876            };
877        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
878            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
879            update_truncate_epoch(*mv_table_id, truncate_epoch);
880        }
881        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
882            for mv_table_id in upstream_mv_table_ids {
883                update_truncate_epoch(mv_table_id, backfill_epoch);
884            }
885        }
886
887        let table_new_change_log = build_table_change_log_delta(
888            old_value_ssts.into_iter(),
889            synced_ssts.iter().map(|sst| &sst.sst_info),
890            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
891            mv_log_store_truncate_epoch.into_iter(),
892        );
893
894        let epoch = self.barrier_info.prev_epoch();
895        for table_id in &self.table_ids_to_commit {
896            info.tables_to_commit
897                .try_insert(*table_id, epoch)
898                .expect("non duplicate");
899        }
900
901        info.sstables.extend(synced_ssts);
902        info.new_table_watermarks.extend(new_table_watermarks);
903        info.sst_to_context.extend(sst_to_context);
904        info.new_table_fragment_infos
905            .extend(new_table_fragment_infos);
906        info.change_log_delta.extend(table_new_change_log);
907        for (table_id, vector_index_adds) in vector_index_adds {
908            info.vector_index_delta
909                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
910                .expect("non-duplicate");
911        }
912        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } = &self.command
913            && let Some(index_table) = collect_new_vector_index_info(job_info)
914        {
915            info.vector_index_delta
916                .try_insert(
917                    index_table.id,
918                    VectorIndexDelta::Init(PbVectorIndexInit {
919                        info: Some(index_table.vector_index_info.unwrap()),
920                    }),
921                )
922                .expect("non-duplicate");
923        }
924        info.truncate_tables.extend(truncate_tables);
925    }
926}
927
928impl Command {
929    /// Generate a mutation for the given command.
930    ///
931    /// `edges` contains the information of the `dispatcher`s of `DispatchExecutor`s and the `actor_upstreams` of `MergeNode`s.
932    pub(super) fn to_mutation(
933        &self,
934        is_currently_paused: bool,
935        edges: &mut Option<FragmentEdgeBuildResult>,
936        control_stream_manager: &ControlStreamManager,
937        database_info: &mut InflightDatabaseInfo,
938    ) -> MetaResult<Option<Mutation>> {
939        let database_id = database_info.database_id;
940        let mutation = match self {
941            Command::Flush => None,
942
943            Command::Pause => {
944                // Only pause when the cluster is not already paused.
945                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
946                if !is_currently_paused {
947                    Some(Mutation::Pause(PauseMutation {}))
948                } else {
949                    None
950                }
951            }
952
953            Command::Resume => {
954                // Only resume when the cluster is currently paused.
955                if is_currently_paused {
956                    Some(Mutation::Resume(ResumeMutation {}))
957                } else {
958                    None
959                }
960            }
961
962            Command::SourceChangeSplit(SplitState {
963                split_assignment, ..
964            }) => {
965                let mut diff = HashMap::new();
966
967                for actor_splits in split_assignment.values() {
968                    diff.extend(actor_splits.clone());
969                }
970
971                Some(Mutation::Splits(SourceChangeSplitMutation {
972                    actor_splits: build_actor_connector_splits(&diff),
973                }))
974            }
975
976            Command::Throttle { config, .. } => {
977                let config = config.clone();
978                Some(Mutation::Throttle(ThrottleMutation {
979                    fragment_throttle: config,
980                }))
981            }
982
983            Command::DropStreamingJobs {
984                actors,
985                dropped_sink_fragment_by_targets,
986                ..
987            } => Some(Mutation::Stop(StopMutation {
988                actors: actors.clone(),
989                dropped_sink_fragments: dropped_sink_fragment_by_targets
990                    .values()
991                    .flatten()
992                    .cloned()
993                    .collect(),
994            })),
995
996            Command::CreateStreamingJob {
997                info:
998                    CreateStreamingJobCommandInfo {
999                        stream_job_fragments,
1000                        init_split_assignment: split_assignment,
1001                        upstream_fragment_downstreams,
1002                        fragment_backfill_ordering,
1003                        ..
1004                    },
1005                job_type,
1006                ..
1007            } => {
1008                let edges = edges.as_mut().expect("should exist");
1009                let added_actors = stream_job_fragments.actor_ids().collect();
1010                let actor_splits = split_assignment
1011                    .values()
1012                    .flat_map(build_actor_connector_splits)
1013                    .collect();
1014                let subscriptions_to_add =
1015                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
1016                        job_type
1017                    {
1018                        snapshot_backfill_info
1019                            .upstream_mv_table_id_to_backfill_epoch
1020                            .keys()
1021                            .map(|table_id| SubscriptionUpstreamInfo {
1022                                subscriber_id: stream_job_fragments
1023                                    .stream_job_id()
1024                                    .as_subscriber_id(),
1025                                upstream_mv_table_id: *table_id,
1026                            })
1027                            .collect()
1028                    } else {
1029                        Default::default()
1030                    };
1031                let backfill_nodes_to_pause: Vec<_> =
1032                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
1033                        .into_iter()
1034                        .collect();
1035
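                // For `CREATE SINK INTO table`, tell the downstream fragment about the new
                // upstream sink fragment and the actors it should receive data from.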
1036                let new_upstream_sinks =
1037                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
1038                        sink_fragment_id,
1039                        sink_output_fields,
1040                        project_exprs,
1041                        new_sink_downstream,
1042                        ..
1043                    }) = job_type
1044                    {
1045                        let new_sink_actors = stream_job_fragments
1046                            .actors_to_create()
1047                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
1048                            .exactly_one()
1049                            .map(|(_, _, actors)| {
1050                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
1051                                    actor_id: actor.actor_id,
1052                                    host: Some(control_stream_manager.host_addr(worker_id)),
1053                                    partial_graph_id: to_partial_graph_id(database_id, None),
1054                                })
1055                            })
1056                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
1057                        let new_upstream_sink = PbNewUpstreamSink {
1058                            info: Some(PbUpstreamSinkInfo {
1059                                upstream_fragment_id: *sink_fragment_id,
1060                                sink_output_schema: sink_output_fields.clone(),
1061                                project_exprs: project_exprs.clone(),
1062                            }),
1063                            upstream_actors: new_sink_actors.collect(),
1064                        };
1065                        HashMap::from([(
1066                            new_sink_downstream.downstream_fragment_id,
1067                            new_upstream_sink,
1068                        )])
1069                    } else {
1070                        HashMap::new()
1071                    };
1072
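                // Assign CDC table backfill snapshot splits to the newly created actors
                // (not applicable to snapshot-backfill jobs).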
1073                let actor_cdc_table_snapshot_splits =
1074                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
1075                        database_info
1076                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
1077                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
1078                    } else {
1079                        None
1080                    };
1081
1082                let add_mutation = AddMutation {
1083                    actor_dispatchers: edges
1084                        .dispatchers
1085                        .extract_if(|fragment_id, _| {
1086                            upstream_fragment_downstreams.contains_key(fragment_id)
1087                        })
1088                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1089                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1090                        .collect(),
1091                    added_actors,
1092                    actor_splits,
1093                    // If the cluster is already paused, the new actors should be paused too.
1094                    pause: is_currently_paused,
1095                    subscriptions_to_add,
1096                    backfill_nodes_to_pause,
1097                    actor_cdc_table_snapshot_splits,
1098                    new_upstream_sinks,
1099                };
1100
1101                Some(Mutation::Add(add_mutation))
1102            }
1103
1104            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1105                old_fragments,
1106                replace_upstream,
1107                upstream_fragment_downstreams,
1108                init_split_assignment,
1109                auto_refresh_schema_sinks,
1110                ..
1111            }) => {
1112                let edges = edges.as_mut().expect("should exist");
1113                let merge_updates = edges
1114                    .merge_updates
1115                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1116                    .collect();
1117                let dispatchers = edges
1118                    .dispatchers
1119                    .extract_if(|fragment_id, _| {
1120                        upstream_fragment_downstreams.contains_key(fragment_id)
1121                    })
1122                    .collect();
1123                let actor_cdc_table_snapshot_splits = database_info
1124                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1125                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1126                Self::generate_update_mutation_for_replace_table(
1127                    old_fragments.actor_ids().chain(
1128                        auto_refresh_schema_sinks
1129                            .as_ref()
1130                            .into_iter()
1131                            .flat_map(|sinks| {
1132                                sinks.iter().flat_map(|sink| {
1133                                    sink.original_fragment
1134                                        .actors
1135                                        .iter()
1136                                        .map(|actor| actor.actor_id)
1137                                })
1138                            }),
1139                    ),
1140                    merge_updates,
1141                    dispatchers,
1142                    init_split_assignment,
1143                    actor_cdc_table_snapshot_splits,
1144                    auto_refresh_schema_sinks.as_ref(),
1145                )
1146            }
1147
1148            Command::RescheduleFragment {
1149                reschedules,
1150                fragment_actors,
1151                ..
1152            } => {
1153                let mut dispatcher_update = HashMap::new();
1154                for reschedule in reschedules.values() {
1155                    for &(upstream_fragment_id, dispatcher_id) in
1156                        &reschedule.upstream_fragment_dispatcher_ids
1157                    {
1158                        // Find the actors of the upstream fragment.
1159                        let upstream_actor_ids = fragment_actors
1160                            .get(&upstream_fragment_id)
1161                            .expect("should contain");
1162
1163                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1164
1165                        // Record updates for all actors.
1166                        for &actor_id in upstream_actor_ids {
1167                            let added_downstream_actor_id = if upstream_reschedule
1168                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1169                                .unwrap_or(true)
1170                            {
1171                                reschedule
1172                                    .added_actors
1173                                    .values()
1174                                    .flatten()
1175                                    .cloned()
1176                                    .collect()
1177                            } else {
1178                                Default::default()
1179                            };
1180                            // Index with the dispatcher id to check duplicates.
1181                            dispatcher_update
1182                                .try_insert(
1183                                    (actor_id, dispatcher_id),
1184                                    DispatcherUpdate {
1185                                        actor_id,
1186                                        dispatcher_id,
1187                                        hash_mapping: reschedule
1188                                            .upstream_dispatcher_mapping
1189                                            .as_ref()
1190                                            .map(|m| m.to_protobuf()),
1191                                        added_downstream_actor_id,
1192                                        removed_downstream_actor_id: reschedule
1193                                            .removed_actors
1194                                            .iter()
1195                                            .cloned()
1196                                            .collect(),
1197                                    },
1198                                )
1199                                .unwrap();
1200                        }
1201                    }
1202                }
1203                let dispatcher_update = dispatcher_update.into_values().collect();
1204
1205                let mut merge_update = HashMap::new();
1206                for (&fragment_id, reschedule) in reschedules {
1207                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1208                        // Find the actors of the downstream fragment.
1209                        let downstream_actor_ids = fragment_actors
1210                            .get(&downstream_fragment_id)
1211                            .expect("should contain");
1212
1213                        // Downstream removed actors should be skipped
1214                        // Newly created actors of the current fragment will not dispatch Update
1215                        // barriers to them
1216                        let downstream_removed_actors: HashSet<_> = reschedules
1217                            .get(&downstream_fragment_id)
1218                            .map(|downstream_reschedule| {
1219                                downstream_reschedule
1220                                    .removed_actors
1221                                    .iter()
1222                                    .copied()
1223                                    .collect()
1224                            })
1225                            .unwrap_or_default();
1226
1227                        // Record updates for all actors.
1228                        for &actor_id in downstream_actor_ids {
1229                            if downstream_removed_actors.contains(&actor_id) {
1230                                continue;
1231                            }
1232
1233                            // Index with the fragment id to check duplicates.
1234                            merge_update
1235                                .try_insert(
1236                                    (actor_id, fragment_id),
1237                                    MergeUpdate {
1238                                        actor_id,
1239                                        upstream_fragment_id: fragment_id,
1240                                        new_upstream_fragment_id: None,
1241                                        added_upstream_actors: reschedule
1242                                            .added_actors
1243                                            .iter()
1244                                            .flat_map(|(worker_id, actors)| {
1245                                                let host =
1246                                                    control_stream_manager.host_addr(*worker_id);
1247                                                actors.iter().map(move |&actor_id| PbActorInfo {
1248                                                    actor_id,
1249                                                    host: Some(host.clone()),
1250                                                    // We assume that we only scale within the database's partial graph.
1251                                                    partial_graph_id: to_partial_graph_id(
1252                                                        database_id,
1253                                                        None,
1254                                                    ),
1255                                                })
1256                                            })
1257                                            .collect(),
1258                                        removed_upstream_actor_id: reschedule
1259                                            .removed_actors
1260                                            .iter()
1261                                            .cloned()
1262                                            .collect(),
1263                                    },
1264                                )
1265                                .unwrap();
1266                        }
1267                    }
1268                }
1269                let merge_update = merge_update.into_values().collect();
1270
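                // Collect the updated vnode bitmap for every actor whose vnode ownership changed.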
1271                let mut actor_vnode_bitmap_update = HashMap::new();
1272                for reschedule in reschedules.values() {
1273                    // Record updates for all actors in this fragment.
1274                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1275                        let bitmap = bitmap.to_protobuf();
1276                        actor_vnode_bitmap_update
1277                            .try_insert(actor_id, bitmap)
1278                            .unwrap();
1279                    }
1280                }
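                // Actors removed by any reschedule are reported as dropped in this mutation.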
1281                let dropped_actors = reschedules
1282                    .values()
1283                    .flat_map(|r| r.removed_actors.iter().copied())
1284                    .collect();
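                // Re-assign connector source splits and, where applicable, CDC table snapshot
                // splits to the actors after rescheduling.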
1285                let mut actor_splits = HashMap::new();
1286                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1287                for (fragment_id, reschedule) in reschedules {
1288                    for (actor_id, splits) in &reschedule.actor_splits {
1289                        actor_splits.insert(
1290                            *actor_id,
1291                            ConnectorSplits {
1292                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1293                            },
1294                        );
1295                    }
1296
1297                    if let Some(assignment) =
1298                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1299                    {
1300                        actor_cdc_table_snapshot_splits.extend(assignment)
1301                    }
1302                }
1303
1304                // We don't create new dispatchers in the reschedule scenario.
1305                let actor_new_dispatchers = HashMap::new();
1306                let mutation = Mutation::Update(UpdateMutation {
1307                    dispatcher_update,
1308                    merge_update,
1309                    actor_vnode_bitmap_update,
1310                    dropped_actors,
1311                    actor_splits,
1312                    actor_new_dispatchers,
1313                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1314                        splits: actor_cdc_table_snapshot_splits,
1315                    }),
1316                    sink_schema_change: Default::default(),
1317                    subscriptions_to_drop: vec![],
1318                });
1319                tracing::debug!("update mutation: {mutation:?}");
1320                Some(mutation)
1321            }
1322
1323            Command::CreateSubscription {
1324                upstream_mv_table_id,
1325                subscription_id,
1326                ..
1327            } => Some(Mutation::Add(AddMutation {
1328                actor_dispatchers: Default::default(),
1329                added_actors: vec![],
1330                actor_splits: Default::default(),
1331                pause: false,
1332                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1333                    upstream_mv_table_id: *upstream_mv_table_id,
1334                    subscriber_id: subscription_id.as_subscriber_id(),
1335                }],
1336                backfill_nodes_to_pause: vec![],
1337                actor_cdc_table_snapshot_splits: None,
1338                new_upstream_sinks: Default::default(),
1339            })),
1340            Command::DropSubscription {
1341                upstream_mv_table_id,
1342                subscription_id,
1343            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1344                info: vec![SubscriptionUpstreamInfo {
1345                    subscriber_id: subscription_id.as_subscriber_id(),
1346                    upstream_mv_table_id: *upstream_mv_table_id,
1347                }],
1348            })),
1349            Command::ConnectorPropsChange(config) => {
1350                let mut connector_props_infos = HashMap::default();
1351                for (k, v) in config {
1352                    connector_props_infos.insert(
1353                        k.as_raw_id(),
1354                        ConnectorPropsInfo {
1355                            connector_props_info: v.clone(),
1356                        },
1357                    );
1358                }
1359                Some(Mutation::ConnectorPropsChange(
1360                    ConnectorPropsChangeMutation {
1361                        connector_props_infos,
1362                    },
1363                ))
1364            }
1365            Command::Refresh {
1366                table_id,
1367                associated_source_id,
1368            } => Some(Mutation::RefreshStart(
1369                risingwave_pb::stream_plan::RefreshStartMutation {
1370                    table_id: *table_id,
1371                    associated_source_id: *associated_source_id,
1372                },
1373            )),
1374            Command::ListFinish {
1375                table_id: _,
1376                associated_source_id,
1377            } => Some(Mutation::ListFinish(ListFinishMutation {
1378                associated_source_id: *associated_source_id,
1379            })),
1380            Command::LoadFinish {
1381                table_id: _,
1382                associated_source_id,
1383            } => Some(Mutation::LoadFinish(LoadFinishMutation {
1384                associated_source_id: *associated_source_id,
1385            })),
1386            Command::ResetSource { source_id } => Some(Mutation::ResetSource(
1387                risingwave_pb::stream_plan::ResetSourceMutation {
1388                    source_id: source_id.as_raw_id(),
1389                },
1390            )),
1391        };
1392        Ok(mutation)
1393    }
1394
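    /// Collects the actors that this command requires to be newly created, grouped by worker
    /// and then by fragment. Returns `None` if the command creates no actors; snapshot-backfill
    /// jobs are also excluded here since their actors are handled separately.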
1395    pub(super) fn actors_to_create(
1396        &self,
1397        database_info: &InflightDatabaseInfo,
1398        edges: &mut Option<FragmentEdgeBuildResult>,
1399        control_stream_manager: &ControlStreamManager,
1400    ) -> Option<StreamJobActorsToCreate> {
1401        match self {
1402            Command::CreateStreamingJob { info, job_type, .. } => {
1403                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1404                    // For snapshot backfill, the actors to create are handled separately.
1405                    return None;
1406                }
1407                let actors_to_create = info.stream_job_fragments.actors_to_create();
1408                let edges = edges.as_mut().expect("should exist");
1409                Some(edges.collect_actors_to_create(actors_to_create.map(
1410                    |(fragment_id, node, actors)| {
1411                        (
1412                            fragment_id,
1413                            node,
1414                            actors,
1415                            [], // no subscribers for the newly created job
1416                        )
1417                    },
1418                )))
1419            }
1420            Command::RescheduleFragment {
1421                reschedules,
1422                fragment_actors,
1423                ..
1424            } => {
1425                // We assume that we only scale actors within the database's partial graph.
1426                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1427                    reschedules.iter().map(|(fragment_id, reschedule)| {
1428                        (
1429                            *fragment_id,
1430                            reschedule.newly_created_actors.values().map(
1431                                |((actor, dispatchers), _)| {
1432                                    (actor.actor_id, dispatchers.as_slice())
1433                                },
1434                            ),
1435                        )
1436                    }),
1437                    Some((reschedules, fragment_actors)),
1438                    database_info,
1439                    control_stream_manager,
1440                );
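                // Group the newly created actors by worker and fragment; each entry carries the
                // fragment node, the actors with their upstreams and dispatchers, and the
                // fragment's current subscribers.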
1441                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1442                for (fragment_id, (actor, dispatchers), worker_id) in
1443                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1444                        reschedule
1445                            .newly_created_actors
1446                            .values()
1447                            .map(|(actors, status)| (*fragment_id, actors, status))
1448                    })
1449                {
1450                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1451                    map.entry(*worker_id)
1452                        .or_default()
1453                        .entry(fragment_id)
1454                        .or_insert_with(|| {
1455                            let node = database_info.fragment(fragment_id).nodes.clone();
1456                            let subscribers =
1457                                database_info.fragment_subscribers(fragment_id).collect();
1458                            (node, vec![], subscribers)
1459                        })
1460                        .1
1461                        .push((actor.clone(), upstreams, dispatchers.clone()));
1462                }
1463                Some(map)
1464            }
1465            Command::ReplaceStreamJob(replace_table) => {
1466                let edges = edges.as_mut().expect("should exist");
1467                let mut actors = edges.collect_actors_to_create(
1468                    replace_table.new_fragments.actors_to_create().map(
1469                        |(fragment_id, node, actors)| {
1470                            (
1471                                fragment_id,
1472                                node,
1473                                actors,
1474                                database_info
1475                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1476                            )
1477                        },
1478                    ),
1479                );
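                // Sinks that auto-refresh their schema get new fragments as part of this
                // replacement; merge their actors into the same per-worker map.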
1480                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1481                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1482                        (
1483                            sink.new_fragment.fragment_id,
1484                            &sink.new_fragment.nodes,
1485                            sink.new_fragment.actors.iter().map(|actor| {
1486                                (
1487                                    actor,
1488                                    sink.actor_status[&actor.actor_id]
1489                                        .location
1490                                        .as_ref()
1491                                        .unwrap()
1492                                        .worker_node_id,
1493                                )
1494                            }),
1495                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1496                        )
1497                    }));
1498                    for (worker_id, fragment_actors) in sink_actors {
1499                        actors.entry(worker_id).or_default().extend(fragment_actors);
1500                    }
1501                }
1502                Some(actors)
1503            }
1504            _ => None,
1505        }
1506    }
1507
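    /// Builds the `Update` mutation for replacing a stream job's fragments: the old actors are
    /// dropped, downstream merges are rewired via `merge_updates`, the new dispatchers and split
    /// assignments are installed, and any auto-refresh-schema sinks carry their schema change
    /// (added columns).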
1508    fn generate_update_mutation_for_replace_table(
1509        dropped_actors: impl IntoIterator<Item = ActorId>,
1510        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1511        dispatchers: FragmentActorDispatchers,
1512        init_split_assignment: &SplitAssignment,
1513        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1514        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1515    ) -> Option<Mutation> {
1516        let dropped_actors = dropped_actors.into_iter().collect();
1517
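        // Flatten the per-fragment dispatcher map into per-actor `Dispatchers`.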
1518        let actor_new_dispatchers = dispatchers
1519            .into_values()
1520            .flatten()
1521            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1522            .collect();
1523
1524        let actor_splits = init_split_assignment
1525            .values()
1526            .flat_map(build_actor_connector_splits)
1527            .collect();
1528        Some(Mutation::Update(UpdateMutation {
1529            actor_new_dispatchers,
1530            merge_update: merge_updates.into_values().flatten().collect(),
1531            dropped_actors,
1532            actor_splits,
1533            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1534            sink_schema_change: auto_refresh_schema_sinks
1535                .as_ref()
1536                .into_iter()
1537                .flat_map(|sinks| {
1538                    sinks.iter().map(|sink| {
1539                        (
1540                            sink.original_sink.id.as_raw_id(),
1541                            PbSinkSchemaChange {
1542                                original_schema: sink
1543                                    .original_sink
1544                                    .columns
1545                                    .iter()
1546                                    .map(|col| PbField {
1547                                        data_type: Some(
1548                                            col.column_desc
1549                                                .as_ref()
1550                                                .unwrap()
1551                                                .column_type
1552                                                .as_ref()
1553                                                .unwrap()
1554                                                .clone(),
1555                                        ),
1556                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1557                                    })
1558                                    .collect(),
1559                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1560                                    fields: sink
1561                                        .newly_add_fields
1562                                        .iter()
1563                                        .map(|field| field.to_prost())
1564                                        .collect(),
1565                                })),
1566                            },
1567                        )
1568                    })
1569                })
1570                .collect(),
1571            ..Default::default()
1572        }))
1573    }
1574}
1575
1576impl Command {
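    /// Builds a map from downstream actor id to the upstream actor infos (actor id, host and
    /// partial graph id) it should register, derived from the given dispatchers and, for
    /// reschedules, from the surviving actors of each upstream fragment.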
1577    #[expect(clippy::type_complexity)]
1578    pub(super) fn collect_database_partial_graph_actor_upstreams(
1579        actor_dispatchers: impl Iterator<
1580            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1581        >,
1582        reschedule_dispatcher_update: Option<(
1583            &HashMap<FragmentId, Reschedule>,
1584            &HashMap<FragmentId, HashSet<ActorId>>,
1585        )>,
1586        database_info: &InflightDatabaseInfo,
1587        control_stream_manager: &ControlStreamManager,
1588    ) -> HashMap<ActorId, ActorUpstreams> {
1589        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
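        // From the given dispatchers, record each dispatching actor as an upstream of every
        // actor it dispatches to.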
1590        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1591            let upstream_fragment = database_info.fragment(upstream_fragment_id);
1592            for (upstream_actor_id, dispatchers) in upstream_actors {
1593                let upstream_actor_location =
1594                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1595                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1596                for downstream_actor_id in dispatchers
1597                    .iter()
1598                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1599                {
1600                    actor_upstreams
1601                        .entry(*downstream_actor_id)
1602                        .or_default()
1603                        .entry(upstream_fragment_id)
1604                        .or_default()
1605                        .insert(
1606                            upstream_actor_id,
1607                            PbActorInfo {
1608                                actor_id: upstream_actor_id,
1609                                host: Some(upstream_actor_host.clone()),
1610                                partial_graph_id: to_partial_graph_id(
1611                                    database_info.database_id,
1612                                    None,
1613                                ),
1614                            },
1615                        );
1616                }
1617            }
1618        }
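        // For reschedules, additionally register the surviving actors of each upstream fragment
        // as upstreams of the actors newly added to the rescheduled fragment.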
1619        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1620            for reschedule in reschedules.values() {
1621                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1622                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1623                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1624                    for upstream_actor_id in fragment_actors
1625                        .get(upstream_fragment_id)
1626                        .expect("should exist")
1627                    {
1628                        let upstream_actor_location =
1629                            upstream_fragment.actors[upstream_actor_id].worker_id;
1630                        let upstream_actor_host =
1631                            control_stream_manager.host_addr(upstream_actor_location);
1632                        if let Some(upstream_reschedule) = upstream_reschedule
1633                            && upstream_reschedule
1634                                .removed_actors
1635                                .contains(upstream_actor_id)
1636                        {
1637                            continue;
1638                        }
1639                        for (_, downstream_actor_id) in
1640                            reschedule
1641                                .added_actors
1642                                .iter()
1643                                .flat_map(|(worker_id, actors)| {
1644                                    actors.iter().map(|actor| (*worker_id, *actor))
1645                                })
1646                        {
1647                            actor_upstreams
1648                                .entry(downstream_actor_id)
1649                                .or_default()
1650                                .entry(*upstream_fragment_id)
1651                                .or_default()
1652                                .insert(
1653                                    *upstream_actor_id,
1654                                    PbActorInfo {
1655                                        actor_id: *upstream_actor_id,
1656                                        host: Some(upstream_actor_host.clone()),
1657                                        partial_graph_id: to_partial_graph_id(
1658                                            database_info.database_id,
1659                                            None,
1660                                        ),
1661                                    },
1662                                );
1663                        }
1664                    }
1665                }
1666            }
1667        }
1668        actor_upstreams
1669    }
1670}