risingwave_meta/barrier/command.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
29use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
30use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
31use risingwave_meta_model::WorkerId;
32use risingwave_pb::catalog::CreateType;
33use risingwave_pb::catalog::table::PbTableType;
34use risingwave_pb::common::PbActorInfo;
35use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
36use risingwave_pb::plan_common::PbField;
37use risingwave_pb::source::{
38    ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
39};
40use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
41use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
42use risingwave_pb::stream_plan::barrier_mutation::Mutation;
43use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
44use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
45use risingwave_pb::stream_plan::stream_node::NodeBody;
46use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
47use risingwave_pb::stream_plan::update_mutation::*;
48use risingwave_pb::stream_plan::{
49    AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
50    ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkSchemaChange,
51    PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation, StopMutation,
52    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
53};
54use risingwave_pb::stream_service::BarrierCompleteResponse;
55use tracing::warn;
56
57use super::info::{CommandFragmentChanges, InflightDatabaseInfo};
58use crate::MetaResult;
59use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
60use crate::barrier::cdc_progress::CdcTableBackfillTracker;
61use crate::barrier::edge_builder::FragmentEdgeBuildResult;
62use crate::barrier::info::BarrierInfo;
63use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
64use crate::barrier::utils::collect_resp_info;
65use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
66use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
67use crate::manager::{StreamingJob, StreamingJobType};
68use crate::model::{
69    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
70    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
71    StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
72};
73use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
74use crate::stream::{
75    AutoRefreshSchemaSinkContext, ConnectorPropsChange, FragmentBackfillOrder, SplitAssignment,
76    SplitState, UpstreamSinkInfo, build_actor_connector_splits,
77};
78
79/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used to reschedule actors
80/// within a fragment, e.g., for scaling or migration.
81#[derive(Debug, Clone)]
82pub struct Reschedule {
83    /// Added actors in this fragment.
84    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
85
86    /// Removed actors in this fragment.
87    pub removed_actors: HashSet<ActorId>,
88
89    /// Vnode bitmap updates for some actors in this fragment.
90    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
91
92    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
93    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
94    /// New hash mapping of the upstream dispatcher to be updated.
95    ///
96    /// This field exists only when there's an upstream fragment and the current fragment is
97    /// hash-sharded.
98    pub upstream_dispatcher_mapping: Option<ActorMapping>,
99
100    /// The downstream fragments of this fragment.
101    pub downstream_fragment_ids: Vec<FragmentId>,
102
103    /// Reassigned splits for source actors.
104    /// It becomes the `actor_splits` in [`UpdateMutation`].
105    /// `Source` and `SourceBackfill` are handled together here.
106    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
107
108    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
109}
110
111/// Replacing an old job with a new one. All actors in the job will be rebuilt.
112///
113/// Current use cases:
114/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
115/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
116///   will replace a table job's plan.
117#[derive(Debug, Clone)]
118pub struct ReplaceStreamJobPlan {
119    pub old_fragments: StreamJobFragments,
120    pub new_fragments: StreamJobFragmentsToCreate,
121    /// Downstream jobs of the replaced job need to update their `Merge` node to
122    /// connect to the new fragment.
123    pub replace_upstream: FragmentReplaceUpstream,
124    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
125    /// For a table with a connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
126    /// We need to reassign splits for it.
127    ///
128    /// Note that there's no `SourceBackfillExecutor` involved for tables with connectors, so we don't need to
129    /// worry about `backfill_splits`.
130    pub init_split_assignment: SplitAssignment,
131    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
132    pub streaming_job: StreamingJob,
133    /// The temporary dummy job id used for the new job fragments.
134    pub tmp_id: JobId,
135    /// The state table ids to be dropped.
136    pub to_drop_state_table_ids: Vec<TableId>,
137    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
138}
139
140impl ReplaceStreamJobPlan {
141    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
142        let mut fragment_changes = HashMap::new();
143        for (fragment_id, new_fragment) in self
144            .new_fragments
145            .new_fragment_info(&self.init_split_assignment)
146        {
147            let fragment_change = CommandFragmentChanges::NewFragment {
148                job_id: self.streaming_job.id(),
149                info: new_fragment,
150            };
151            fragment_changes
152                .try_insert(fragment_id, fragment_change)
153                .expect("non-duplicate");
154        }
155        for fragment in self.old_fragments.fragments.values() {
156            fragment_changes
157                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
158                .expect("non-duplicate");
159        }
160        for (fragment_id, replace_map) in &self.replace_upstream {
161            fragment_changes
162                .try_insert(
163                    *fragment_id,
164                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
165                )
166                .expect("non-duplicate");
167        }
168        if let Some(sinks) = &self.auto_refresh_schema_sinks {
169            for sink in sinks {
170                let fragment_change = CommandFragmentChanges::NewFragment {
171                    job_id: sink.original_sink.id.as_job_id(),
172                    info: sink.new_fragment_info(),
173                };
174                fragment_changes
175                    .try_insert(sink.new_fragment.fragment_id, fragment_change)
176                    .expect("non-duplicate");
177                fragment_changes
178                    .try_insert(
179                        sink.original_fragment.fragment_id,
180                        CommandFragmentChanges::RemoveFragment,
181                    )
182                    .expect("non-duplicate");
183            }
184        }
185        fragment_changes
186    }
187
188    /// `old_fragment_id` -> `new_fragment_id`
189    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
190        let mut fragment_replacements = HashMap::new();
191        for (upstream_fragment_id, new_upstream_fragment_id) in
192            self.replace_upstream.values().flatten()
193        {
194            {
195                let r =
196                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
197                if let Some(r) = r {
198                    assert_eq!(
199                        *new_upstream_fragment_id, r,
200                        "one fragment is replaced by multiple fragments"
201                    );
202                }
203            }
204        }
205        fragment_replacements
206    }
207}
208
209#[derive(educe::Educe, Clone)]
210#[educe(Debug)]
211pub struct CreateStreamingJobCommandInfo {
212    #[educe(Debug(ignore))]
213    pub stream_job_fragments: StreamJobFragmentsToCreate,
214    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
215    pub init_split_assignment: SplitAssignment,
216    pub definition: String,
217    pub job_type: StreamingJobType,
218    pub create_type: CreateType,
219    pub streaming_job: StreamingJob,
220    pub fragment_backfill_ordering: FragmentBackfillOrder,
221    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
222    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
223    pub is_serverless: bool,
224}
225
226impl StreamJobFragments {
227    pub(super) fn new_fragment_info<'a>(
228        &'a self,
229        assignment: &'a SplitAssignment,
230    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
231        self.fragments.values().map(|fragment| {
232            let mut fragment_splits = assignment
233                .get(&fragment.fragment_id)
234                .cloned()
235                .unwrap_or_default();
236
237            (
238                fragment.fragment_id,
239                InflightFragmentInfo {
240                    fragment_id: fragment.fragment_id,
241                    distribution_type: fragment.distribution_type.into(),
242                    fragment_type_mask: fragment.fragment_type_mask,
243                    vnode_count: fragment.vnode_count(),
244                    nodes: fragment.nodes.clone(),
245                    actors: fragment
246                        .actors
247                        .iter()
248                        .map(|actor| {
249                            (
250                                actor.actor_id,
251                                InflightActorInfo {
252                                    worker_id: self
253                                        .actor_status
254                                        .get(&actor.actor_id)
255                                        .expect("should exist")
256                                        .worker_id(),
257                                    vnode_bitmap: actor.vnode_bitmap.clone(),
258                                    splits: fragment_splits
259                                        .remove(&actor.actor_id)
260                                        .unwrap_or_default(),
261                                },
262                            )
263                        })
264                        .collect(),
265                    state_table_ids: fragment.state_table_ids.iter().copied().collect(),
266                },
267            )
268        })
269    }
270}
271
272#[derive(Debug, Clone)]
273pub struct SnapshotBackfillInfo {
274    /// `table_id` -> `snapshot_backfill_epoch`
275    /// The `snapshot_backfill_epoch` is `None` at the beginning, and is filled in
276    /// by the global barrier worker when handling the command.
277    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
278}
279
280#[derive(Debug, Clone)]
281pub enum CreateStreamingJobType {
282    Normal,
283    SinkIntoTable(UpstreamSinkInfo),
284    SnapshotBackfill(SnapshotBackfillInfo),
285}
286
287/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
288/// it will [build different barriers to send](Self::to_mutation),
289/// and may [do different things after the barrier is collected](CommandContext::post_collect).
290// FIXME: this enum is significantly large on the stack; consider boxing it
291#[derive(Debug)]
292pub enum Command {
293    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
294    /// all messages before the checkpoint barrier should have been committed.
295    Flush,
296
297    /// `Pause` command generates a `Pause` barrier **only if**
298    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
299    Pause,
300
301    /// `Resume` command generates a `Resume` barrier **only
302    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
303    /// will be generated.
304    Resume,
305
306    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
307    /// [`Vec<ActorId>`]. The catalog has already ensured, via reference counting, that these
308    /// streaming jobs are safe to drop.
309    ///
310    /// Barriers from the actors to be dropped will STILL be collected.
311    /// After the barrier is collected, it notifies the local stream managers of the compute nodes to
312    /// drop the actors, and then deletes the job fragments info from the meta store.
313    DropStreamingJobs {
314        streaming_job_ids: HashSet<JobId>,
315        actors: Vec<ActorId>,
316        unregistered_state_table_ids: HashSet<TableId>,
317        unregistered_fragment_ids: HashSet<FragmentId>,
318        // target_fragment -> [sink_fragments]
319        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
320    },
321
322    /// `CreateStreamingJob` command generates an `Add` barrier from the given info.
323    ///
324    /// Barriers from the actors to be created, which are marked as `Inactive` at first, will STILL
325    /// be collected since the barrier should pass through them.
326    ///
327    /// After the barrier is collected, these newly created actors will be marked as `Running`, and
328    /// the job fragments info is added to the meta store. However, the creation progress will **last
329    /// for a while** until the `finish` channel is signaled; then the state of `TableFragments`
330    /// will be set to `Created`.
331    CreateStreamingJob {
332        info: CreateStreamingJobCommandInfo,
333        job_type: CreateStreamingJobType,
334        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
335    },
336
337    /// `Reschedule` command generates an `Update` barrier from the [`Reschedule`] of each fragment.
338    /// Mainly used for scaling and migration.
339    ///
340    /// Which actors' barriers should be collected, and the post-collect behavior of this command,
341    /// closely resemble those of the `Create` and `Drop` commands, for the added and removed actors respectively.
342    RescheduleFragment {
343        reschedules: HashMap<FragmentId, Reschedule>,
344        // Should contain the actor ids in the upstream and downstream fragments of `reschedules`
345        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
346    },
347
348    /// `ReplaceStreamJob` command generates an `Update` barrier with the given `replace_upstream`. This is
349    /// essentially switching the downstream of the old job fragments to the new ones, and
350    /// dropping the old job fragments. Used for schema change.
351    ///
352    /// This can be treated as a special case of `RescheduleFragment`, except that the upstream
353    /// fragments of the Merge executors are additionally changed.
354    ReplaceStreamJob(ReplaceStreamJobPlan),
355
356    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
357    /// changed splits.
358    SourceChangeSplit(SplitState),
359
360    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
361    /// the `rate_limit` of the `FlowControl` executor after `StreamScan` or `Source`.
362    Throttle {
363        jobs: HashSet<JobId>,
364        config: HashMap<FragmentId, Option<u32>>,
365    },
366
367    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify the
368    /// materialize executor to start storing old values for the subscription.
369    CreateSubscription {
370        subscription_id: SubscriptionId,
371        upstream_mv_table_id: TableId,
372        retention_second: u64,
373    },
374
375    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify the
376    /// materialize executor to stop storing old values when there is no
377    /// subscription depending on it.
378    DropSubscription {
379        subscription_id: SubscriptionId,
380        upstream_mv_table_id: TableId,
381    },
382
383    ConnectorPropsChange(ConnectorPropsChange),
384
385    /// `Refresh` command generates a barrier to refresh a table by truncating its state
386    /// and reloading data from the source.
387    Refresh {
388        table_id: TableId,
389        associated_source_id: SourceId,
390    },
391    ListFinish {
392        table_id: TableId,
393        associated_source_id: SourceId,
394    },
395    LoadFinish {
396        table_id: TableId,
397        associated_source_id: SourceId,
398    },
399
400    /// `ResetSource` command generates a barrier to reset the CDC source offset to the latest.
401    /// Used when the upstream binlog/oplog has expired.
402    ResetSource {
403        source_id: SourceId,
404    },
405}
406
407// For debugging and observability purposes. Can add more details later if needed.
408impl std::fmt::Display for Command {
409    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
410        match self {
411            Command::Flush => write!(f, "Flush"),
412            Command::Pause => write!(f, "Pause"),
413            Command::Resume => write!(f, "Resume"),
414            Command::DropStreamingJobs {
415                streaming_job_ids, ..
416            } => {
417                write!(
418                    f,
419                    "DropStreamingJobs: {}",
420                    streaming_job_ids.iter().sorted().join(", ")
421                )
422            }
423            Command::CreateStreamingJob { info, .. } => {
424                write!(f, "CreateStreamingJob: {}", info.streaming_job)
425            }
426            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
427            Command::ReplaceStreamJob(plan) => {
428                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
429            }
430            Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
431            Command::Throttle { .. } => write!(f, "Throttle"),
432            Command::CreateSubscription {
433                subscription_id, ..
434            } => write!(f, "CreateSubscription: {subscription_id}"),
435            Command::DropSubscription {
436                subscription_id, ..
437            } => write!(f, "DropSubscription: {subscription_id}"),
438            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
439            Command::Refresh {
440                table_id,
441                associated_source_id,
442            } => write!(
443                f,
444                "Refresh: {} (source: {})",
445                table_id, associated_source_id
446            ),
447            Command::ListFinish {
448                table_id,
449                associated_source_id,
450            } => write!(
451                f,
452                "ListFinish: {} (source: {})",
453                table_id, associated_source_id
454            ),
455            Command::LoadFinish {
456                table_id,
457                associated_source_id,
458            } => write!(
459                f,
460                "LoadFinish: {} (source: {})",
461                table_id, associated_source_id
462            ),
463            Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
464        }
465    }
466}
467
468impl Command {
469    pub fn pause() -> Self {
470        Self::Pause
471    }
472
473    pub fn resume() -> Self {
474        Self::Resume
475    }
476
477    #[expect(clippy::type_complexity)]
478    pub(super) fn fragment_changes(
479        &self,
480    ) -> Option<(
481        Option<(JobId, Option<CdcTableBackfillTracker>)>,
482        HashMap<FragmentId, CommandFragmentChanges>,
483    )> {
484        match self {
485            Command::Flush => None,
486            Command::Pause => None,
487            Command::Resume => None,
488            Command::DropStreamingJobs {
489                unregistered_fragment_ids,
490                dropped_sink_fragment_by_targets,
491                ..
492            } => {
493                let changes = unregistered_fragment_ids
494                    .iter()
495                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
496                    .chain(dropped_sink_fragment_by_targets.iter().map(
497                        |(target_fragment, sink_fragments)| {
498                            (
499                                *target_fragment,
500                                CommandFragmentChanges::DropNodeUpstream(sink_fragments.clone()),
501                            )
502                        },
503                    ))
504                    .collect();
505
506                Some((None, changes))
507            }
508            Command::CreateStreamingJob { info, job_type, .. } => {
509                assert!(
510                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
511                    "should handle fragment changes separately for snapshot backfill"
512                );
513                let mut changes: HashMap<_, _> = info
514                    .stream_job_fragments
515                    .new_fragment_info(&info.init_split_assignment)
516                    .map(|(fragment_id, fragment_infos)| {
517                        (
518                            fragment_id,
519                            CommandFragmentChanges::NewFragment {
520                                job_id: info.streaming_job.id(),
521                                info: fragment_infos,
522                            },
523                        )
524                    })
525                    .collect();
526
527                if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
528                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
529                    changes.insert(
530                        downstream_fragment_id,
531                        CommandFragmentChanges::AddNodeUpstream(PbUpstreamSinkInfo {
532                            upstream_fragment_id: ctx.sink_fragment_id,
533                            sink_output_schema: ctx.sink_output_fields.clone(),
534                            project_exprs: ctx.project_exprs.clone(),
535                        }),
536                    );
537                }
538
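                // Added note: if the job carries pre-computed CDC table snapshot splits, locate
                // its parallelized CDC backfill fragment and create a `CdcTableBackfillTracker`
                // over that fragment id and the splits, so that snapshot-split assignment and
                // backfill progress can be tracked for this job.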
539                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
540                    let (fragment, _) =
541                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
542                            .expect("should have parallel cdc fragment");
543                    Some(CdcTableBackfillTracker::new(
544                        fragment.fragment_id,
545                        splits.clone(),
546                    ))
547                } else {
548                    None
549                };
550
551                Some((Some((info.streaming_job.id(), cdc_tracker)), changes))
552            }
553            Command::RescheduleFragment { reschedules, .. } => Some((
554                None,
555                reschedules
556                    .iter()
557                    .map(|(fragment_id, reschedule)| {
558                        (
559                            *fragment_id,
560                            CommandFragmentChanges::Reschedule {
561                                new_actors: reschedule
562                                    .added_actors
563                                    .iter()
564                                    .flat_map(|(node_id, actors)| {
565                                        actors.iter().map(|actor_id| {
566                                            (
567                                                *actor_id,
568                                                InflightActorInfo {
569                                                    worker_id: *node_id,
570                                                    vnode_bitmap: reschedule
571                                                        .newly_created_actors
572                                                        .get(actor_id)
573                                                        .expect("should exist")
574                                                        .0
575                                                        .0
576                                                        .vnode_bitmap
577                                                        .clone(),
578                                                    splits: reschedule
579                                                        .actor_splits
580                                                        .get(actor_id)
581                                                        .cloned()
582                                                        .unwrap_or_default(),
583                                                },
584                                            )
585                                        })
586                                    })
587                                    .collect(),
588                                actor_update_vnode_bitmap: reschedule
589                                    .vnode_bitmap_updates
590                                    .iter()
591                                    .filter(|(actor_id, _)| {
592                                        // only keep the existing actors
593                                        !reschedule.newly_created_actors.contains_key(actor_id)
594                                    })
595                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
596                                    .collect(),
597                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
598                                actor_splits: reschedule.actor_splits.clone(),
599                            },
600                        )
601                    })
602                    .collect(),
603            )),
604            Command::ReplaceStreamJob(plan) => Some((None, plan.fragment_changes())),
605            Command::SourceChangeSplit(SplitState {
606                split_assignment, ..
607            }) => Some((
608                None,
609                split_assignment
610                    .iter()
611                    .map(|(&fragment_id, splits)| {
612                        (
613                            fragment_id,
614                            CommandFragmentChanges::SplitAssignment {
615                                actor_splits: splits.clone(),
616                            },
617                        )
618                    })
619                    .collect(),
620            )),
621            Command::Throttle { .. } => None,
622            Command::CreateSubscription { .. } => None,
623            Command::DropSubscription { .. } => None,
624            Command::ConnectorPropsChange(_) => None,
625            Command::Refresh { .. } => None, // Refresh doesn't change fragment structure
626            Command::ListFinish { .. } => None, // ListFinish doesn't change fragment structure
627            Command::LoadFinish { .. } => None, // LoadFinish doesn't change fragment structure
628            Command::ResetSource { .. } => None, // ResetSource doesn't change fragment structure
629        }
630    }
631
632    pub fn need_checkpoint(&self) -> bool {
633        // TODO: review the flow of different commands to reduce the number of checkpoints
634        !matches!(self, Command::Resume)
635    }
636}
637
638#[derive(Debug, Clone)]
639pub enum BarrierKind {
640    Initial,
641    Barrier,
642    /// Holds the prev-epochs of the previous non-checkpoint barriers plus the current prev-epoch.
643    Checkpoint(Vec<u64>),
644}
645
646impl BarrierKind {
647    pub fn to_protobuf(&self) -> PbBarrierKind {
648        match self {
649            BarrierKind::Initial => PbBarrierKind::Initial,
650            BarrierKind::Barrier => PbBarrierKind::Barrier,
651            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
652        }
653    }
654
655    pub fn is_checkpoint(&self) -> bool {
656        matches!(self, BarrierKind::Checkpoint(_))
657    }
658
659    pub fn is_initial(&self) -> bool {
660        matches!(self, BarrierKind::Initial)
661    }
662
663    pub fn as_str_name(&self) -> &'static str {
664        match self {
665            BarrierKind::Initial => "Initial",
666            BarrierKind::Barrier => "Barrier",
667            BarrierKind::Checkpoint(_) => "Checkpoint",
668        }
669    }
670}
671
672/// [`CommandContext`] is used for generating barriers and doing post-collect work according to
673/// the given [`Command`].
674pub(super) struct CommandContext {
675    mv_subscription_max_retention: HashMap<TableId, u64>,
676
677    pub(super) barrier_info: BarrierInfo,
678
679    pub(super) table_ids_to_commit: HashSet<TableId>,
680
681    pub(super) command: Option<Command>,
682
683    /// The tracing span of this command.
684    ///
685    /// Unlike [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
686    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
687    /// stream graph on compute nodes, and finishing its `post_collect` work.
688    _span: tracing::Span,
689}
690
691impl std::fmt::Debug for CommandContext {
692    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
693        f.debug_struct("CommandContext")
694            .field("barrier_info", &self.barrier_info)
695            .field("command", &self.command)
696            .finish()
697    }
698}
699
700impl std::fmt::Display for CommandContext {
701    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
702        write!(
703            f,
704            "prev_epoch={}, curr_epoch={}, kind={}",
705            self.barrier_info.prev_epoch(),
706            self.barrier_info.curr_epoch(),
707            self.barrier_info.kind.as_str_name()
708        )?;
709        if let Some(command) = &self.command {
710            write!(f, ", command={}", command)?;
711        }
712        Ok(())
713    }
714}
715
716impl CommandContext {
717    pub(super) fn new(
718        barrier_info: BarrierInfo,
719        mv_subscription_max_retention: HashMap<TableId, u64>,
720        table_ids_to_commit: HashSet<TableId>,
721        command: Option<Command>,
722        span: tracing::Span,
723    ) -> Self {
724        Self {
725            mv_subscription_max_retention,
726            barrier_info,
727            table_ids_to_commit,
728            command,
729            _span: span,
730        }
731    }
732
733    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
734        let Some(truncate_timestamptz) = Timestamptz::from_secs(
735            self.barrier_info
736                .prev_epoch
737                .value()
738                .as_timestamptz()
739                .timestamp()
740                - retention_second as i64,
741        ) else {
742            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
743            return self.barrier_info.prev_epoch.value();
744        };
745        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
746    }
747
748    pub(super) fn collect_commit_epoch_info(
749        &self,
750        info: &mut CommitEpochInfo,
751        resps: Vec<BarrierCompleteResponse>,
752        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
753    ) {
754        let (
755            sst_to_context,
756            synced_ssts,
757            new_table_watermarks,
758            old_value_ssts,
759            vector_index_adds,
760            truncate_tables,
761        ) = collect_resp_info(resps);
762
763        let new_table_fragment_infos =
764            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
765                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
766            {
767                let table_fragments = &info.stream_job_fragments;
768                let mut table_ids: HashSet<_> =
769                    table_fragments.internal_table_ids().into_iter().collect();
770                if let Some(mv_table_id) = table_fragments.mv_table_id() {
771                    table_ids.insert(mv_table_id);
772                }
773
774                vec![NewTableFragmentInfo { table_ids }]
775            } else {
776                vec![]
777            };
778
779        let mut mv_log_store_truncate_epoch = HashMap::new();
780        // TODO: may collect cross db snapshot backfill
781        let mut update_truncate_epoch =
782            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
783                Entry::Occupied(mut entry) => {
784                    let prev_truncate_epoch = entry.get_mut();
785                    if truncate_epoch < *prev_truncate_epoch {
786                        *prev_truncate_epoch = truncate_epoch;
787                    }
788                }
789                Entry::Vacant(entry) => {
790                    entry.insert(truncate_epoch);
791                }
792            };
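        // Added note with a worked example: the closure keeps the *smallest* truncate epoch per
        // table, since a smaller epoch means more log data is retained. E.g. if a subscription's
        // retention yields truncate epoch 100 for a table but a backfill job still pins epoch 80
        // on the same table, 80 is kept.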
793        for (mv_table_id, max_retention) in &self.mv_subscription_max_retention {
794            let truncate_epoch = self.get_truncate_epoch(*max_retention).0;
795            update_truncate_epoch(*mv_table_id, truncate_epoch);
796        }
797        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
798            for mv_table_id in upstream_mv_table_ids {
799                update_truncate_epoch(mv_table_id, backfill_epoch);
800            }
801        }
802
803        let table_new_change_log = build_table_change_log_delta(
804            old_value_ssts.into_iter(),
805            synced_ssts.iter().map(|sst| &sst.sst_info),
806            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
807            mv_log_store_truncate_epoch.into_iter(),
808        );
809
810        let epoch = self.barrier_info.prev_epoch();
811        for table_id in &self.table_ids_to_commit {
812            info.tables_to_commit
813                .try_insert(*table_id, epoch)
814                .expect("non duplicate");
815        }
816
817        info.sstables.extend(synced_ssts);
818        info.new_table_watermarks.extend(new_table_watermarks);
819        info.sst_to_context.extend(sst_to_context);
820        info.new_table_fragment_infos
821            .extend(new_table_fragment_infos);
822        info.change_log_delta.extend(table_new_change_log);
823        for (table_id, vector_index_adds) in vector_index_adds {
824            info.vector_index_delta
825                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
826                .expect("non-duplicate");
827        }
828        if let Some(Command::CreateStreamingJob { info: job_info, .. }) = &self.command {
829            for fragment in job_info.stream_job_fragments.fragments.values() {
830                visit_stream_node_cont(&fragment.nodes, |node| {
831                    match node.node_body.as_ref().unwrap() {
832                        NodeBody::VectorIndexWrite(vector_index_write) => {
833                            let index_table = vector_index_write.table.as_ref().unwrap();
834                            assert_eq!(index_table.table_type, PbTableType::VectorIndex as i32);
835                            info.vector_index_delta
836                                .try_insert(
837                                    index_table.id,
838                                    VectorIndexDelta::Init(PbVectorIndexInit {
839                                        info: Some(index_table.vector_index_info.unwrap()),
840                                    }),
841                                )
842                                .expect("non-duplicate");
843                            false
844                        }
845                        _ => true,
846                    }
847                })
848            }
849        }
850        info.truncate_tables.extend(truncate_tables);
851    }
852}
853
854impl Command {
855    /// Generate a mutation for the given command.
856    ///
857    /// `edges` contains the `dispatcher`s of the `DispatchExecutor`s and the `actor_upstreams` of the `MergeNode`s.
858    pub(super) fn to_mutation(
859        &self,
860        is_currently_paused: bool,
861        edges: &mut Option<FragmentEdgeBuildResult>,
862        control_stream_manager: &ControlStreamManager,
863        database_info: &mut InflightDatabaseInfo,
864    ) -> MetaResult<Option<Mutation>> {
865        let mutation = match self {
866            Command::Flush => None,
867
868            Command::Pause => {
869                // Only pause when the cluster is not already paused.
870                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
871                if !is_currently_paused {
872                    Some(Mutation::Pause(PauseMutation {}))
873                } else {
874                    None
875                }
876            }
877
878            Command::Resume => {
879                // Only resume when the cluster is paused with the same reason.
880                if is_currently_paused {
881                    Some(Mutation::Resume(ResumeMutation {}))
882                } else {
883                    None
884                }
885            }
886
887            Command::SourceChangeSplit(SplitState {
888                split_assignment, ..
889            }) => {
890                let mut diff = HashMap::new();
891
892                for actor_splits in split_assignment.values() {
893                    diff.extend(actor_splits.clone());
894                }
895
896                Some(Mutation::Splits(SourceChangeSplitMutation {
897                    actor_splits: build_actor_connector_splits(&diff),
898                }))
899            }
900
901            Command::Throttle { config, .. } => {
902                let mut fragment_to_apply = HashMap::new();
903                for (fragment_id, limit) in config {
904                    fragment_to_apply.insert(*fragment_id, RateLimit { rate_limit: *limit });
905                }
906
907                Some(Mutation::Throttle(ThrottleMutation {
908                    fragment_throttle: fragment_to_apply,
909                }))
910            }
911
912            Command::DropStreamingJobs {
913                actors,
914                dropped_sink_fragment_by_targets,
915                ..
916            } => Some(Mutation::Stop(StopMutation {
917                actors: actors.clone(),
918                dropped_sink_fragments: dropped_sink_fragment_by_targets
919                    .values()
920                    .flatten()
921                    .cloned()
922                    .collect(),
923            })),
924
925            Command::CreateStreamingJob {
926                info:
927                    CreateStreamingJobCommandInfo {
928                        stream_job_fragments,
929                        init_split_assignment: split_assignment,
930                        upstream_fragment_downstreams,
931                        fragment_backfill_ordering,
932                        ..
933                    },
934                job_type,
935                ..
936            } => {
937                let edges = edges.as_mut().expect("should exist");
938                let added_actors = stream_job_fragments.actor_ids().collect();
939                let actor_splits = split_assignment
940                    .values()
941                    .flat_map(build_actor_connector_splits)
942                    .collect();
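                // Added note: a snapshot-backfill job registers itself as a subscriber on each
                // upstream MV it backfills from, so that the upstream log stores keep the data the
                // backfill still needs to replay (cf. `backfill_pinned_log_epoch` in
                // `collect_commit_epoch_info`).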
943                let subscriptions_to_add =
944                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
945                        job_type
946                    {
947                        snapshot_backfill_info
948                            .upstream_mv_table_id_to_backfill_epoch
949                            .keys()
950                            .map(|table_id| SubscriptionUpstreamInfo {
951                                subscriber_id: stream_job_fragments
952                                    .stream_job_id()
953                                    .as_subscriber_id(),
954                                upstream_mv_table_id: *table_id,
955                            })
956                            .collect()
957                    } else {
958                        Default::default()
959                    };
960                let backfill_nodes_to_pause: Vec<_> =
961                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
962                        .into_iter()
963                        .collect();
964
965                let new_upstream_sinks =
966                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
967                        sink_fragment_id,
968                        sink_output_fields,
969                        project_exprs,
970                        new_sink_downstream,
971                        ..
972                    }) = job_type
973                    {
974                        let new_sink_actors = stream_job_fragments
975                            .actors_to_create()
976                            .filter(|(fragment_id, _, _)| *fragment_id == *sink_fragment_id)
977                            .exactly_one()
978                            .map(|(_, _, actors)| {
979                                actors.into_iter().map(|(actor, worker_id)| PbActorInfo {
980                                    actor_id: actor.actor_id,
981                                    host: Some(control_stream_manager.host_addr(worker_id)),
982                                    partial_graph_id: to_partial_graph_id(None),
983                                })
984                            })
985                            .unwrap_or_else(|_| panic!("should have exactly one sink actor"));
986                        let new_upstream_sink = PbNewUpstreamSink {
987                            info: Some(PbUpstreamSinkInfo {
988                                upstream_fragment_id: *sink_fragment_id,
989                                sink_output_schema: sink_output_fields.clone(),
990                                project_exprs: project_exprs.clone(),
991                            }),
992                            upstream_actors: new_sink_actors.collect(),
993                        };
994                        HashMap::from([(
995                            new_sink_downstream.downstream_fragment_id,
996                            new_upstream_sink,
997                        )])
998                    } else {
999                        HashMap::new()
1000                    };
1001
1002                let actor_cdc_table_snapshot_splits =
1003                    if !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)) {
1004                        database_info
1005                            .assign_cdc_backfill_splits(stream_job_fragments.stream_job_id)?
1006                            .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits })
1007                    } else {
1008                        None
1009                    };
1010
1011                let add_mutation = AddMutation {
1012                    actor_dispatchers: edges
1013                        .dispatchers
1014                        .extract_if(|fragment_id, _| {
1015                            upstream_fragment_downstreams.contains_key(fragment_id)
1016                        })
1017                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
1018                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1019                        .collect(),
1020                    added_actors,
1021                    actor_splits,
1022                    // If the cluster is already paused, the new actors should be paused too.
1023                    pause: is_currently_paused,
1024                    subscriptions_to_add,
1025                    backfill_nodes_to_pause,
1026                    actor_cdc_table_snapshot_splits,
1027                    new_upstream_sinks,
1028                };
1029
1030                Some(Mutation::Add(add_mutation))
1031            }
1032
1033            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
1034                old_fragments,
1035                replace_upstream,
1036                upstream_fragment_downstreams,
1037                init_split_assignment,
1038                auto_refresh_schema_sinks,
1039                ..
1040            }) => {
1041                let edges = edges.as_mut().expect("should exist");
1042                let merge_updates = edges
1043                    .merge_updates
1044                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
1045                    .collect();
1046                let dispatchers = edges
1047                    .dispatchers
1048                    .extract_if(|fragment_id, _| {
1049                        upstream_fragment_downstreams.contains_key(fragment_id)
1050                    })
1051                    .collect();
1052                let actor_cdc_table_snapshot_splits = database_info
1053                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
1054                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
1055                Self::generate_update_mutation_for_replace_table(
1056                    old_fragments.actor_ids().chain(
1057                        auto_refresh_schema_sinks
1058                            .as_ref()
1059                            .into_iter()
1060                            .flat_map(|sinks| {
1061                                sinks.iter().flat_map(|sink| {
1062                                    sink.original_fragment
1063                                        .actors
1064                                        .iter()
1065                                        .map(|actor| actor.actor_id)
1066                                })
1067                            }),
1068                    ),
1069                    merge_updates,
1070                    dispatchers,
1071                    init_split_assignment,
1072                    actor_cdc_table_snapshot_splits,
1073                    auto_refresh_schema_sinks.as_ref(),
1074                )
1075            }
1076
1077            Command::RescheduleFragment {
1078                reschedules,
1079                fragment_actors,
1080                ..
1081            } => {
1082                let mut dispatcher_update = HashMap::new();
1083                for reschedule in reschedules.values() {
1084                    for &(upstream_fragment_id, dispatcher_id) in
1085                        &reschedule.upstream_fragment_dispatcher_ids
1086                    {
1087                        // Find the actors of the upstream fragment.
1088                        let upstream_actor_ids = fragment_actors
1089                            .get(&upstream_fragment_id)
1090                            .expect("should contain");
1091
1092                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1093
1094                        // Record updates for all actors.
1095                        for &actor_id in upstream_actor_ids {
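                            // Added note: if this upstream actor is itself removed by its own
                            // reschedule, it will not dispatch to the newly added downstream
                            // actors, so its `added_downstream_actor_id` list stays empty.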
1096                            let added_downstream_actor_id = if upstream_reschedule
1097                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1098                                .unwrap_or(true)
1099                            {
1100                                reschedule
1101                                    .added_actors
1102                                    .values()
1103                                    .flatten()
1104                                    .cloned()
1105                                    .collect()
1106                            } else {
1107                                Default::default()
1108                            };
1109                            // Index with the dispatcher id to check duplicates.
1110                            dispatcher_update
1111                                .try_insert(
1112                                    (actor_id, dispatcher_id),
1113                                    DispatcherUpdate {
1114                                        actor_id,
1115                                        dispatcher_id,
1116                                        hash_mapping: reschedule
1117                                            .upstream_dispatcher_mapping
1118                                            .as_ref()
1119                                            .map(|m| m.to_protobuf()),
1120                                        added_downstream_actor_id,
1121                                        removed_downstream_actor_id: reschedule
1122                                            .removed_actors
1123                                            .iter()
1124                                            .cloned()
1125                                            .collect(),
1126                                    },
1127                                )
1128                                .unwrap();
1129                        }
1130                    }
1131                }
1132                let dispatcher_update = dispatcher_update.into_values().collect();
1133
1134                let mut merge_update = HashMap::new();
1135                for (&fragment_id, reschedule) in reschedules {
1136                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1137                        // Find the actors of the downstream fragment.
1138                        let downstream_actor_ids = fragment_actors
1139                            .get(&downstream_fragment_id)
1140                            .expect("should contain");
1141
1142                        // Removed downstream actors should be skipped:
1143                        // newly created actors of the current fragment will not dispatch `Update`
1144                        // barriers to them.
1145                        let downstream_removed_actors: HashSet<_> = reschedules
1146                            .get(&downstream_fragment_id)
1147                            .map(|downstream_reschedule| {
1148                                downstream_reschedule
1149                                    .removed_actors
1150                                    .iter()
1151                                    .copied()
1152                                    .collect()
1153                            })
1154                            .unwrap_or_default();
1155
1156                        // Record updates for all actors.
1157                        for &actor_id in downstream_actor_ids {
1158                            if downstream_removed_actors.contains(&actor_id) {
1159                                continue;
1160                            }
1161
1162                            // Index with the fragment id to check duplicates.
1163                            merge_update
1164                                .try_insert(
1165                                    (actor_id, fragment_id),
1166                                    MergeUpdate {
1167                                        actor_id,
1168                                        upstream_fragment_id: fragment_id,
1169                                        new_upstream_fragment_id: None,
1170                                        added_upstream_actors: reschedule
1171                                            .added_actors
1172                                            .iter()
1173                                            .flat_map(|(worker_id, actors)| {
1174                                                let host =
1175                                                    control_stream_manager.host_addr(*worker_id);
1176                                                actors.iter().map(move |&actor_id| PbActorInfo {
1177                                                    actor_id,
1178                                                    host: Some(host.clone()),
1179                                                    // we assume that we only scale the partial graph of database
1180                                                    partial_graph_id: to_partial_graph_id(None),
1181                                                })
1182                                            })
1183                                            .collect(),
1184                                        removed_upstream_actor_id: reschedule
1185                                            .removed_actors
1186                                            .iter()
1187                                            .cloned()
1188                                            .collect(),
1189                                    },
1190                                )
1191                                .unwrap();
1192                        }
1193                    }
1194                }
1195                let merge_update = merge_update.into_values().collect();
1196
1197                let mut actor_vnode_bitmap_update = HashMap::new();
1198                for reschedule in reschedules.values() {
1199                    // Record updates for all actors in this fragment.
1200                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1201                        let bitmap = bitmap.to_protobuf();
1202                        actor_vnode_bitmap_update
1203                            .try_insert(actor_id, bitmap)
1204                            .unwrap();
1205                    }
1206                }
1207                let dropped_actors = reschedules
1208                    .values()
1209                    .flat_map(|r| r.removed_actors.iter().copied())
1210                    .collect();
1211                let mut actor_splits = HashMap::new();
1212                let mut actor_cdc_table_snapshot_splits = HashMap::new();
1213                for (fragment_id, reschedule) in reschedules {
1214                    for (actor_id, splits) in &reschedule.actor_splits {
1215                        actor_splits.insert(
1216                            *actor_id,
1217                            ConnectorSplits {
1218                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1219                            },
1220                        );
1221                    }
1222
1223                    if let Some(assignment) =
1224                        database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1225                    {
1226                        actor_cdc_table_snapshot_splits.extend(assignment)
1227                    }
1228                }
1229
1230                // No new dispatchers are created in the reschedule scenario.
1231                let actor_new_dispatchers = HashMap::new();
1232                let mutation = Mutation::Update(UpdateMutation {
1233                    dispatcher_update,
1234                    merge_update,
1235                    actor_vnode_bitmap_update,
1236                    dropped_actors,
1237                    actor_splits,
1238                    actor_new_dispatchers,
1239                    actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1240                        splits: actor_cdc_table_snapshot_splits,
1241                    }),
1242                    sink_schema_change: Default::default(),
1243                    subscriptions_to_drop: vec![],
1244                });
1245                tracing::debug!("update mutation: {mutation:?}");
1246                Some(mutation)
1247            }
1248
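            // Creating a subscription is broadcast as an `Add` mutation carrying the subscriber
            // info; dropping one uses a dedicated `DropSubscriptions` mutation.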
1249            Command::CreateSubscription {
1250                upstream_mv_table_id,
1251                subscription_id,
1252                ..
1253            } => Some(Mutation::Add(AddMutation {
1254                actor_dispatchers: Default::default(),
1255                added_actors: vec![],
1256                actor_splits: Default::default(),
1257                pause: false,
1258                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1259                    upstream_mv_table_id: *upstream_mv_table_id,
1260                    subscriber_id: subscription_id.as_subscriber_id(),
1261                }],
1262                backfill_nodes_to_pause: vec![],
1263                actor_cdc_table_snapshot_splits: None,
1264                new_upstream_sinks: Default::default(),
1265            })),
1266            Command::DropSubscription {
1267                upstream_mv_table_id,
1268                subscription_id,
1269            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1270                info: vec![SubscriptionUpstreamInfo {
1271                    subscriber_id: subscription_id.as_subscriber_id(),
1272                    upstream_mv_table_id: *upstream_mv_table_id,
1273                }],
1274            })),
1275            Command::ConnectorPropsChange(config) => {
1276                let mut connector_props_infos = HashMap::default();
1277                for (k, v) in config {
1278                    connector_props_infos.insert(
1279                        k.as_raw_id(),
1280                        ConnectorPropsInfo {
1281                            connector_props_info: v.clone(),
1282                        },
1283                    );
1284                }
1285                Some(Mutation::ConnectorPropsChange(
1286                    ConnectorPropsChangeMutation {
1287                        connector_props_infos,
1288                    },
1289                ))
1290            }
1291            Command::Refresh {
1292                table_id,
1293                associated_source_id,
1294            } => Some(Mutation::RefreshStart(
1295                risingwave_pb::stream_plan::RefreshStartMutation {
1296                    table_id: *table_id,
1297                    associated_source_id: *associated_source_id,
1298                },
1299            )),
1300            Command::ListFinish {
1301                table_id: _,
1302                associated_source_id,
1303            } => Some(Mutation::ListFinish(ListFinishMutation {
1304                associated_source_id: *associated_source_id,
1305            })),
1306            Command::LoadFinish {
1307                table_id: _,
1308                associated_source_id,
1309            } => Some(Mutation::LoadFinish(LoadFinishMutation {
1310                associated_source_id: *associated_source_id,
1311            })),
1312            Command::ResetSource { source_id } => Some(Mutation::ResetSource(
1313                risingwave_pb::stream_plan::ResetSourceMutation {
1314                    source_id: source_id.as_raw_id(),
1315                },
1316            )),
1317        };
1318        Ok(mutation)
1319    }
1320
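    /// Collects the actors that this command requires to be created, grouped by worker and
    /// then by fragment. Returns `None` for commands that do not create any new actors.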
1321    pub(super) fn actors_to_create(
1322        &self,
1323        database_info: &InflightDatabaseInfo,
1324        edges: &mut Option<FragmentEdgeBuildResult>,
1325        control_stream_manager: &ControlStreamManager,
1326    ) -> Option<StreamJobActorsToCreate> {
1327        match self {
1328            Command::CreateStreamingJob { info, job_type, .. } => {
1329                if let CreateStreamingJobType::SnapshotBackfill(_) = job_type {
1330                    // For snapshot backfill, the actors to create are handled separately.
1331                    return None;
1332                }
1333                let actors_to_create = info.stream_job_fragments.actors_to_create();
1334                let edges = edges.as_mut().expect("should exist");
1335                Some(edges.collect_actors_to_create(actors_to_create.map(
1336                    |(fragment_id, node, actors)| {
1337                        (
1338                            fragment_id,
1339                            node,
1340                            actors,
1341                            [], // no subscribers for a newly created job
1342                        )
1343                    },
1344                )))
1345            }
1346            Command::RescheduleFragment {
1347                reschedules,
1348                fragment_actors,
1349                ..
1350            } => {
1351                // We assume that only actors in the database's partial graph are scaled.
1352                let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1353                    reschedules.iter().map(|(fragment_id, reschedule)| {
1354                        (
1355                            *fragment_id,
1356                            reschedule.newly_created_actors.values().map(
1357                                |((actor, dispatchers), _)| {
1358                                    (actor.actor_id, dispatchers.as_slice())
1359                                },
1360                            ),
1361                        )
1362                    }),
1363                    Some((reschedules, fragment_actors)),
1364                    database_info,
1365                    control_stream_manager,
1366                );
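                // Group the newly created actors by worker and fragment, attaching each
                // fragment's node and subscribers, and each actor's upstreams and dispatchers.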
1367                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1368                for (fragment_id, (actor, dispatchers), worker_id) in
1369                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1370                        reschedule
1371                            .newly_created_actors
1372                            .values()
1373                            .map(|(actors, status)| (*fragment_id, actors, status))
1374                    })
1375                {
1376                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1377                    map.entry(*worker_id)
1378                        .or_default()
1379                        .entry(fragment_id)
1380                        .or_insert_with(|| {
1381                            let node = database_info.fragment(fragment_id).nodes.clone();
1382                            let subscribers =
1383                                database_info.fragment_subscribers(fragment_id).collect();
1384                            (node, vec![], subscribers)
1385                        })
1386                        .1
1387                        .push((actor.clone(), upstreams, dispatchers.clone()));
1388                }
1389                Some(map)
1390            }
1391            Command::ReplaceStreamJob(replace_table) => {
1392                let edges = edges.as_mut().expect("should exist");
1393                let mut actors = edges.collect_actors_to_create(
1394                    replace_table.new_fragments.actors_to_create().map(
1395                        |(fragment_id, node, actors)| {
1396                            (
1397                                fragment_id,
1398                                node,
1399                                actors,
1400                                database_info
1401                                    .job_subscribers(replace_table.old_fragments.stream_job_id),
1402                            )
1403                        },
1404                    ),
1405                );
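                // If the replacement also refreshes sink schemas, the actors of the new sink
                // fragments are created on their assigned workers as well.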
1406                if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1407                    let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1408                        (
1409                            sink.new_fragment.fragment_id,
1410                            &sink.new_fragment.nodes,
1411                            sink.new_fragment.actors.iter().map(|actor| {
1412                                (
1413                                    actor,
1414                                    sink.actor_status[&actor.actor_id]
1415                                        .location
1416                                        .as_ref()
1417                                        .unwrap()
1418                                        .worker_node_id,
1419                                )
1420                            }),
1421                            database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1422                        )
1423                    }));
1424                    for (worker_id, fragment_actors) in sink_actors {
1425                        actors.entry(worker_id).or_default().extend(fragment_actors);
1426                    }
1427                }
1428                Some(actors)
1429            }
1430            _ => None,
1431        }
1432    }
1433
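    /// Builds the `Update` mutation for replacing a stream job: drop the old actors, apply the
    /// merge updates and new dispatchers, carry the initial split assignment and CDC snapshot
    /// splits, and attach schema changes for any auto-refresh-schema sinks.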
1434    fn generate_update_mutation_for_replace_table(
1435        dropped_actors: impl IntoIterator<Item = ActorId>,
1436        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1437        dispatchers: FragmentActorDispatchers,
1438        init_split_assignment: &SplitAssignment,
1439        cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1440        auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1441    ) -> Option<Mutation> {
1442        let dropped_actors = dropped_actors.into_iter().collect();
1443
1444        let actor_new_dispatchers = dispatchers
1445            .into_values()
1446            .flatten()
1447            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1448            .collect();
1449
1450        let actor_splits = init_split_assignment
1451            .values()
1452            .flat_map(build_actor_connector_splits)
1453            .collect();
1454        Some(Mutation::Update(UpdateMutation {
1455            actor_new_dispatchers,
1456            merge_update: merge_updates.into_values().flatten().collect(),
1457            dropped_actors,
1458            actor_splits,
1459            actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1460            sink_schema_change: auto_refresh_schema_sinks
1461                .as_ref()
1462                .into_iter()
1463                .flat_map(|sinks| {
1464                    sinks.iter().map(|sink| {
1465                        (
1466                            sink.original_sink.id.as_raw_id(),
1467                            PbSinkSchemaChange {
1468                                original_schema: sink
1469                                    .original_sink
1470                                    .columns
1471                                    .iter()
1472                                    .map(|col| PbField {
1473                                        data_type: Some(
1474                                            col.column_desc
1475                                                .as_ref()
1476                                                .unwrap()
1477                                                .column_type
1478                                                .as_ref()
1479                                                .unwrap()
1480                                                .clone(),
1481                                        ),
1482                                        name: col.column_desc.as_ref().unwrap().name.clone(),
1483                                    })
1484                                    .collect(),
1485                                op: Some(PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1486                                    fields: sink
1487                                        .newly_add_fields
1488                                        .iter()
1489                                        .map(|field| field.to_prost())
1490                                        .collect(),
1491                                })),
1492                            },
1493                        )
1494                    })
1495                })
1496                .collect(),
1497            ..Default::default()
1498        }))
1499    }
1500
1501    /// For `DropStreamingJobs`, returns the ids of the streaming jobs to drop; empty otherwise.
1502    pub fn jobs_to_drop(&self) -> impl Iterator<Item = JobId> + '_ {
1503        match self {
1504            Command::DropStreamingJobs {
1505                streaming_job_ids, ..
1506            } => Some(streaming_job_ids.iter().cloned()),
1507            _ => None,
1508        }
1509        .into_iter()
1510        .flatten()
1511    }
1512}
1513
1514impl Command {
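    /// Resolves, for each downstream actor, the upstream actors it should register per upstream
    /// fragment (with their host info), based on the given dispatchers and, for reschedules, the
    /// upstream fragments' surviving actors. Only the database partial graph is considered.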
1515    #[expect(clippy::type_complexity)]
1516    pub(super) fn collect_database_partial_graph_actor_upstreams(
1517        actor_dispatchers: impl Iterator<
1518            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1519        >,
1520        reschedule_dispatcher_update: Option<(
1521            &HashMap<FragmentId, Reschedule>,
1522            &HashMap<FragmentId, HashSet<ActorId>>,
1523        )>,
1524        database_info: &InflightDatabaseInfo,
1525        control_stream_manager: &ControlStreamManager,
1526    ) -> HashMap<ActorId, ActorUpstreams> {
1527        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1528        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1529            let upstream_fragment = database_info.fragment(upstream_fragment_id);
1530            for (upstream_actor_id, dispatchers) in upstream_actors {
1531                let upstream_actor_location =
1532                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1533                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1534                for downstream_actor_id in dispatchers
1535                    .iter()
1536                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1537                {
1538                    actor_upstreams
1539                        .entry(*downstream_actor_id)
1540                        .or_default()
1541                        .entry(upstream_fragment_id)
1542                        .or_default()
1543                        .insert(
1544                            upstream_actor_id,
1545                            PbActorInfo {
1546                                actor_id: upstream_actor_id,
1547                                host: Some(upstream_actor_host.clone()),
1548                                partial_graph_id: to_partial_graph_id(None),
1549                            },
1550                        );
1551                }
1552            }
1553        }
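        // For reschedules, each newly added actor must additionally learn about the existing
        // upstream actors that survive the reschedule; removed upstream actors are skipped.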
1554        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1555            for reschedule in reschedules.values() {
1556                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1557                    let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1558                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1559                    for upstream_actor_id in fragment_actors
1560                        .get(upstream_fragment_id)
1561                        .expect("should exist")
1562                    {
1563                        let upstream_actor_location =
1564                            upstream_fragment.actors[upstream_actor_id].worker_id;
1565                        let upstream_actor_host =
1566                            control_stream_manager.host_addr(upstream_actor_location);
1567                        if let Some(upstream_reschedule) = upstream_reschedule
1568                            && upstream_reschedule
1569                                .removed_actors
1570                                .contains(upstream_actor_id)
1571                        {
1572                            continue;
1573                        }
1574                        for (_, downstream_actor_id) in
1575                            reschedule
1576                                .added_actors
1577                                .iter()
1578                                .flat_map(|(worker_id, actors)| {
1579                                    actors.iter().map(|actor| (*worker_id, *actor))
1580                                })
1581                        {
1582                            actor_upstreams
1583                                .entry(downstream_actor_id)
1584                                .or_default()
1585                                .entry(*upstream_fragment_id)
1586                                .or_default()
1587                                .insert(
1588                                    *upstream_actor_id,
1589                                    PbActorInfo {
1590                                        actor_id: *upstream_actor_id,
1591                                        host: Some(upstream_actor_host.clone()),
1592                                        partial_graph_id: to_partial_graph_id(None),
1593                                    },
1594                                );
1595                        }
1596                    }
1597                }
1598            }
1599        }
1600        actor_upstreams
1601    }
1602}