risingwave_meta/barrier/
command.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Formatter;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::ActorMapping;
23use risingwave_common::must_match;
24use risingwave_common::types::Timestamptz;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_connector::source::SplitImpl;
27use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
28use risingwave_meta_model::WorkerId;
29use risingwave_pb::catalog::{CreateType, Table};
30use risingwave_pb::common::ActorInfo;
31use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
32use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
33use risingwave_pb::stream_plan::barrier_mutation::Mutation;
34use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
35use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
36use risingwave_pb::stream_plan::update_mutation::*;
37use risingwave_pb::stream_plan::{
38    AddMutation, BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatcher,
39    Dispatchers, DropSubscriptionsMutation, PauseMutation, ResumeMutation,
40    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
41    SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
42};
43use risingwave_pb::stream_service::BarrierCompleteResponse;
44use tracing::warn;
45
46use super::info::{CommandFragmentChanges, InflightDatabaseInfo, InflightStreamingJobInfo};
47use crate::barrier::InflightSubscriptionInfo;
48use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
49use crate::barrier::edge_builder::FragmentEdgeBuildResult;
50use crate::barrier::info::BarrierInfo;
51use crate::barrier::rpc::ControlStreamManager;
52use crate::barrier::utils::collect_resp_info;
53use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
54use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
55use crate::manager::{StreamingJob, StreamingJobType};
56use crate::model::{
57    ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
58    FragmentId, FragmentReplaceUpstream, StreamActorWithDispatchers, StreamJobActorsToCreate,
59    StreamJobFragments, StreamJobFragmentsToCreate,
60};
61use crate::stream::{
62    ConnectorPropsChange, FragmentBackfillOrder, JobReschedulePostUpdates, SplitAssignment,
63    ThrottleConfig, build_actor_connector_splits,
64};
65
66/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
67/// in some fragment, like scaling or migrating.
68#[derive(Debug, Clone)]
69pub struct Reschedule {
70    /// Added actors in this fragment.
71    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,
72
73    /// Removed actors in this fragment.
74    pub removed_actors: HashSet<ActorId>,
75
76    /// Vnode bitmap updates for some actors in this fragment.
77    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,
78
79    /// The upstream fragments of this fragment, and the dispatchers that should be updated.
80    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
81    /// New hash mapping of the upstream dispatcher to be updated.
82    ///
83    /// This field exists only when there's upstream fragment and the current fragment is
84    /// hash-sharded.
85    pub upstream_dispatcher_mapping: Option<ActorMapping>,
86
87    /// The downstream fragments of this fragment.
88    pub downstream_fragment_ids: Vec<FragmentId>,
89
90    /// Reassigned splits for source actors.
91    /// It becomes the `actor_splits` in [`UpdateMutation`].
92    /// `Source` and `SourceBackfill` are handled together here.
93    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
94
95    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
96}
97
98/// Replacing an old job with a new one. All actors in the job will be rebuilt.
99///
100/// Current use cases:
101/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
102/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
103///   will replace a table job's plan.
104#[derive(Debug, Clone)]
105pub struct ReplaceStreamJobPlan {
106    pub old_fragments: StreamJobFragments,
107    pub new_fragments: StreamJobFragmentsToCreate,
108    /// Downstream jobs of the replaced job need to update their `Merge` node to
109    /// connect to the new fragment.
110    pub replace_upstream: FragmentReplaceUpstream,
111    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
112    /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
113    /// We need to reassign splits for it.
114    ///
115    /// Note that there's no `SourceBackfillExecutor` involved for table with connector, so we don't need to worry about
116    /// `backfill_splits`.
117    pub init_split_assignment: SplitAssignment,
118    /// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
119    pub streaming_job: StreamingJob,
120    /// The temporary dummy job fragments id of new table fragment
121    pub tmp_id: u32,
122    /// The state table ids to be dropped.
123    pub to_drop_state_table_ids: Vec<TableId>,
124}
125
126impl ReplaceStreamJobPlan {
127    fn fragment_changes(&self) -> HashMap<FragmentId, CommandFragmentChanges> {
128        let mut fragment_changes = HashMap::new();
129        for (fragment_id, new_fragment) in self.new_fragments.new_fragment_info() {
130            let fragment_change =
131                CommandFragmentChanges::NewFragment(self.streaming_job.id().into(), new_fragment);
132            fragment_changes
133                .try_insert(fragment_id, fragment_change)
134                .expect("non-duplicate");
135        }
136        for fragment in self.old_fragments.fragments.values() {
137            fragment_changes
138                .try_insert(fragment.fragment_id, CommandFragmentChanges::RemoveFragment)
139                .expect("non-duplicate");
140        }
141        for (fragment_id, replace_map) in &self.replace_upstream {
142            fragment_changes
143                .try_insert(
144                    *fragment_id,
145                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map.clone()),
146                )
147                .expect("non-duplicate");
148        }
149        fragment_changes
150    }
151
152    /// `old_fragment_id` -> `new_fragment_id`
153    pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
154        let mut fragment_replacements = HashMap::new();
155        for (upstream_fragment_id, new_upstream_fragment_id) in
156            self.replace_upstream.values().flatten()
157        {
158            {
159                let r =
160                    fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
161                if let Some(r) = r {
162                    assert_eq!(
163                        *new_upstream_fragment_id, r,
164                        "one fragment is replaced by multiple fragments"
165                    );
166                }
167            }
168        }
169        fragment_replacements
170    }
171}
172
173#[derive(educe::Educe, Clone)]
174#[educe(Debug)]
175pub struct CreateStreamingJobCommandInfo {
176    #[educe(Debug(ignore))]
177    pub stream_job_fragments: StreamJobFragmentsToCreate,
178    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
179    pub init_split_assignment: SplitAssignment,
180    pub definition: String,
181    pub job_type: StreamingJobType,
182    pub create_type: CreateType,
183    pub streaming_job: StreamingJob,
184    pub internal_tables: Vec<Table>,
185    pub fragment_backfill_ordering: FragmentBackfillOrder,
186}
187
188impl StreamJobFragments {
189    pub(super) fn new_fragment_info(
190        &self,
191    ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + '_ {
192        self.fragments.values().map(|fragment| {
193            (
194                fragment.fragment_id,
195                InflightFragmentInfo {
196                    fragment_id: fragment.fragment_id,
197                    distribution_type: fragment.distribution_type.into(),
198                    nodes: fragment.nodes.clone(),
199                    actors: fragment
200                        .actors
201                        .iter()
202                        .map(|actor| {
203                            (
204                                actor.actor_id,
205                                InflightActorInfo {
206                                    worker_id: self
207                                        .actor_status
208                                        .get(&actor.actor_id)
209                                        .expect("should exist")
210                                        .worker_id()
211                                        as WorkerId,
212                                    vnode_bitmap: actor.vnode_bitmap.clone(),
213                                },
214                            )
215                        })
216                        .collect(),
217                    state_table_ids: fragment
218                        .state_table_ids
219                        .iter()
220                        .map(|table_id| TableId::new(*table_id))
221                        .collect(),
222                },
223            )
224        })
225    }
226}
227
228#[derive(Debug, Clone)]
229pub struct SnapshotBackfillInfo {
230    /// `table_id` -> `Some(snapshot_backfill_epoch)`
231    /// The `snapshot_backfill_epoch` should be None at the beginning, and be filled
232    /// by global barrier worker when handling the command.
233    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
234}
235
236#[derive(Debug, Clone)]
237pub enum CreateStreamingJobType {
238    Normal,
239    SinkIntoTable(ReplaceStreamJobPlan),
240    SnapshotBackfill(SnapshotBackfillInfo),
241}
242
243/// [`Command`] is the input of [`crate::barrier::worker::GlobalBarrierWorker`]. For different commands,
244/// it will [build different barriers to send](Self::to_mutation),
245/// and may [do different stuffs after the barrier is collected](CommandContext::post_collect).
246// FIXME: this enum is significantly large on stack, box it
247#[derive(Debug)]
248pub enum Command {
249    /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
250    /// all messages before the checkpoint barrier should have been committed.
251    Flush,
252
253    /// `Pause` command generates a `Pause` barrier **only if**
254    /// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
255    Pause,
256
257    /// `Resume` command generates a `Resume` barrier **only
258    /// if** the cluster is paused with the same reason. Otherwise, a barrier with no mutation
259    /// will be generated.
260    Resume,
261
262    /// `DropStreamingJobs` command generates a `Stop` barrier to stop the given
263    /// [`Vec<ActorId>`]. The catalog has ensured that these streaming jobs are safe to be
264    /// dropped by reference counts before.
265    ///
266    /// Barriers from the actors to be dropped will STILL be collected.
267    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
268    /// drop actors, and then delete the job fragments info from meta store.
269    DropStreamingJobs {
270        table_fragments_ids: HashSet<TableId>,
271        actors: Vec<ActorId>,
272        unregistered_state_table_ids: HashSet<TableId>,
273        unregistered_fragment_ids: HashSet<FragmentId>,
274    },
275
276    /// `CreateStreamingJob` command generates a `Add` barrier by given info.
277    ///
278    /// Barriers from the actors to be created, which is marked as `Inactive` at first, will STILL
279    /// be collected since the barrier should be passthrough.
280    ///
281    /// After the barrier is collected, these newly created actors will be marked as `Running`. And
282    /// it adds the job fragments info to meta store. However, the creating progress will **last
283    /// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
284    /// will be set to `Created`.
285    CreateStreamingJob {
286        info: CreateStreamingJobCommandInfo,
287        job_type: CreateStreamingJobType,
288        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
289    },
290    MergeSnapshotBackfillStreamingJobs(
291        HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>,
292    ),
293
294    /// `Reschedule` command generates a `Update` barrier by the [`Reschedule`] of each fragment.
295    /// Mainly used for scaling and migration.
296    ///
297    /// Barriers from which actors should be collected, and the post behavior of this command are
298    /// very similar to `Create` and `Drop` commands, for added and removed actors, respectively.
299    RescheduleFragment {
300        reschedules: HashMap<FragmentId, Reschedule>,
301        // Should contain the actor ids in upstream and downstream fragment of `reschedules`
302        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
303        // Used for updating additional metadata after the barrier ends
304        post_updates: JobReschedulePostUpdates,
305    },
306
307    /// `ReplaceStreamJob` command generates a `Update` barrier with the given `replace_upstream`. This is
308    /// essentially switching the downstream of the old job fragments to the new ones, and
309    /// dropping the old job fragments. Used for schema change.
310    ///
311    /// This can be treated as a special case of `RescheduleFragment`, while the upstream fragment
312    /// of the Merge executors are changed additionally.
313    ReplaceStreamJob(ReplaceStreamJobPlan),
314
315    /// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
316    /// changed splits.
317    SourceChangeSplit(SplitAssignment),
318
319    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
320    /// the `rate_limit` of `FlowControl` Executor after `StreamScan` or Source.
321    Throttle(ThrottleConfig),
322
323    /// `CreateSubscription` command generates a `CreateSubscriptionMutation` to notify
324    /// materialize executor to start storing old value for subscription.
325    CreateSubscription {
326        subscription_id: u32,
327        upstream_mv_table_id: TableId,
328        retention_second: u64,
329    },
330
331    /// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
332    /// materialize executor to stop storing old value when there is no
333    /// subscription depending on it.
334    DropSubscription {
335        subscription_id: u32,
336        upstream_mv_table_id: TableId,
337    },
338
339    ConnectorPropsChange(ConnectorPropsChange),
340
341    /// `StartFragmentBackfill` command will trigger backfilling for specified scans by `fragment_id`.
342    StartFragmentBackfill {
343        fragment_ids: Vec<FragmentId>,
344    },
345}
346
347// For debugging and observability purposes. Can add more details later if needed.
348impl std::fmt::Display for Command {
349    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
350        match self {
351            Command::Flush => write!(f, "Flush"),
352            Command::Pause => write!(f, "Pause"),
353            Command::Resume => write!(f, "Resume"),
354            Command::DropStreamingJobs {
355                table_fragments_ids,
356                ..
357            } => {
358                write!(
359                    f,
360                    "DropStreamingJobs: {}",
361                    table_fragments_ids.iter().sorted().join(", ")
362                )
363            }
364            Command::CreateStreamingJob { info, .. } => {
365                write!(f, "CreateStreamingJob: {}", info.streaming_job)
366            }
367            Command::MergeSnapshotBackfillStreamingJobs(_) => {
368                write!(f, "MergeSnapshotBackfillStreamingJobs")
369            }
370            Command::RescheduleFragment { .. } => write!(f, "RescheduleFragment"),
371            Command::ReplaceStreamJob(plan) => {
372                write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
373            }
374            Command::SourceChangeSplit(_) => write!(f, "SourceChangeSplit"),
375            Command::Throttle(_) => write!(f, "Throttle"),
376            Command::CreateSubscription {
377                subscription_id, ..
378            } => write!(f, "CreateSubscription: {subscription_id}"),
379            Command::DropSubscription {
380                subscription_id, ..
381            } => write!(f, "DropSubscription: {subscription_id}"),
382            Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
383            Command::StartFragmentBackfill { .. } => write!(f, "StartFragmentBackfill"),
384        }
385    }
386}
387
388impl Command {
389    pub fn pause() -> Self {
390        Self::Pause
391    }
392
393    pub fn resume() -> Self {
394        Self::Resume
395    }
396
397    pub fn cancel(table_fragments: &StreamJobFragments) -> Self {
398        Self::DropStreamingJobs {
399            table_fragments_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
400            actors: table_fragments.actor_ids(),
401            unregistered_state_table_ids: table_fragments
402                .all_table_ids()
403                .map(TableId::new)
404                .collect(),
405            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
406        }
407    }
408
409    pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
410        match self {
411            Command::Flush => None,
412            Command::Pause => None,
413            Command::Resume => None,
414            Command::DropStreamingJobs {
415                unregistered_fragment_ids,
416                ..
417            } => Some(
418                unregistered_fragment_ids
419                    .iter()
420                    .map(|fragment_id| (*fragment_id, CommandFragmentChanges::RemoveFragment))
421                    .collect(),
422            ),
423            Command::CreateStreamingJob { info, job_type, .. } => {
424                assert!(
425                    !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_)),
426                    "should handle fragment changes separately for snapshot backfill"
427                );
428                let mut changes: HashMap<_, _> = info
429                    .stream_job_fragments
430                    .new_fragment_info()
431                    .map(|(fragment_id, fragment_info)| {
432                        (
433                            fragment_id,
434                            CommandFragmentChanges::NewFragment(
435                                info.streaming_job.id().into(),
436                                fragment_info,
437                            ),
438                        )
439                    })
440                    .collect();
441
442                if let CreateStreamingJobType::SinkIntoTable(plan) = job_type {
443                    let extra_change = plan.fragment_changes();
444                    changes.extend(extra_change);
445                }
446
447                Some(changes)
448            }
449            Command::RescheduleFragment { reschedules, .. } => Some(
450                reschedules
451                    .iter()
452                    .map(|(fragment_id, reschedule)| {
453                        (
454                            *fragment_id,
455                            CommandFragmentChanges::Reschedule {
456                                new_actors: reschedule
457                                    .added_actors
458                                    .iter()
459                                    .flat_map(|(node_id, actors)| {
460                                        actors.iter().map(|actor_id| {
461                                            (
462                                                *actor_id,
463                                                InflightActorInfo {
464                                                    worker_id: *node_id,
465                                                    vnode_bitmap: reschedule
466                                                        .newly_created_actors
467                                                        .get(actor_id)
468                                                        .expect("should exist")
469                                                        .0
470                                                        .0
471                                                        .vnode_bitmap
472                                                        .clone(),
473                                                },
474                                            )
475                                        })
476                                    })
477                                    .collect(),
478                                actor_update_vnode_bitmap: reschedule
479                                    .vnode_bitmap_updates
480                                    .iter()
481                                    .filter(|(actor_id, _)| {
482                                        // only keep the existing actors
483                                        !reschedule.newly_created_actors.contains_key(actor_id)
484                                    })
485                                    .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
486                                    .collect(),
487                                to_remove: reschedule.removed_actors.iter().cloned().collect(),
488                            },
489                        )
490                    })
491                    .collect(),
492            ),
493            Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
494            Command::MergeSnapshotBackfillStreamingJobs(_) => None,
495            Command::SourceChangeSplit(_) => None,
496            Command::Throttle(_) => None,
497            Command::CreateSubscription { .. } => None,
498            Command::DropSubscription { .. } => None,
499            Command::ConnectorPropsChange(_) => None,
500            Command::StartFragmentBackfill { .. } => None,
501        }
502    }
503
504    pub fn need_checkpoint(&self) -> bool {
505        // todo! Reviewing the flow of different command to reduce the amount of checkpoint
506        !matches!(self, Command::Resume)
507    }
508}
509
510#[derive(Debug, Clone)]
511pub enum BarrierKind {
512    Initial,
513    Barrier,
514    /// Hold a list of previous non-checkpoint prev-epoch + current prev-epoch
515    Checkpoint(Vec<u64>),
516}
517
518impl BarrierKind {
519    pub fn to_protobuf(&self) -> PbBarrierKind {
520        match self {
521            BarrierKind::Initial => PbBarrierKind::Initial,
522            BarrierKind::Barrier => PbBarrierKind::Barrier,
523            BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
524        }
525    }
526
527    pub fn is_checkpoint(&self) -> bool {
528        matches!(self, BarrierKind::Checkpoint(_))
529    }
530
531    pub fn is_initial(&self) -> bool {
532        matches!(self, BarrierKind::Initial)
533    }
534
535    pub fn as_str_name(&self) -> &'static str {
536        match self {
537            BarrierKind::Initial => "Initial",
538            BarrierKind::Barrier => "Barrier",
539            BarrierKind::Checkpoint(_) => "Checkpoint",
540        }
541    }
542}
543
544/// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given
545/// [`Command`].
546pub(super) struct CommandContext {
547    subscription_info: InflightSubscriptionInfo,
548
549    pub(super) barrier_info: BarrierInfo,
550
551    pub(super) table_ids_to_commit: HashSet<TableId>,
552
553    pub(super) command: Option<Command>,
554
555    /// The tracing span of this command.
556    ///
557    /// Differs from [`crate::barrier::TracedEpoch`], this span focuses on the lifetime of the corresponding
558    /// barrier, including the process of waiting for the barrier to be sent, flowing through the
559    /// stream graph on compute nodes, and finishing its `post_collect` stuffs.
560    _span: tracing::Span,
561}
562
563impl std::fmt::Debug for CommandContext {
564    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
565        f.debug_struct("CommandContext")
566            .field("barrier_info", &self.barrier_info)
567            .field("command", &self.command)
568            .finish()
569    }
570}
571
572impl std::fmt::Display for CommandContext {
573    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
574        write!(
575            f,
576            "prev_epoch={}, curr_epoch={}, kind={}",
577            self.barrier_info.prev_epoch.value().0,
578            self.barrier_info.curr_epoch.value().0,
579            self.barrier_info.kind.as_str_name()
580        )?;
581        if let Some(command) = &self.command {
582            write!(f, ", command={}", command)?;
583        }
584        Ok(())
585    }
586}
587
588impl CommandContext {
589    pub(super) fn new(
590        barrier_info: BarrierInfo,
591        subscription_info: InflightSubscriptionInfo,
592        table_ids_to_commit: HashSet<TableId>,
593        command: Option<Command>,
594        span: tracing::Span,
595    ) -> Self {
596        Self {
597            subscription_info,
598            barrier_info,
599            table_ids_to_commit,
600            command,
601            _span: span,
602        }
603    }
604
605    fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
606        let Some(truncate_timestamptz) = Timestamptz::from_secs(
607            self.barrier_info
608                .prev_epoch
609                .value()
610                .as_timestamptz()
611                .timestamp()
612                - retention_second as i64,
613        ) else {
614            warn!(retention_second, prev_epoch = ?self.barrier_info.prev_epoch.value(), "invalid retention second value");
615            return self.barrier_info.prev_epoch.value();
616        };
617        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
618    }
619
620    pub(super) fn collect_commit_epoch_info(
621        &self,
622        info: &mut CommitEpochInfo,
623        resps: Vec<BarrierCompleteResponse>,
624        backfill_pinned_log_epoch: HashMap<TableId, (u64, HashSet<TableId>)>,
625    ) {
626        let (sst_to_context, synced_ssts, new_table_watermarks, old_value_ssts) =
627            collect_resp_info(resps);
628
629        let new_table_fragment_infos =
630            if let Some(Command::CreateStreamingJob { info, job_type, .. }) = &self.command
631                && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
632            {
633                let table_fragments = &info.stream_job_fragments;
634                let mut table_ids: HashSet<_> = table_fragments
635                    .internal_table_ids()
636                    .into_iter()
637                    .map(TableId::new)
638                    .collect();
639                if let Some(mv_table_id) = table_fragments.mv_table_id() {
640                    table_ids.insert(TableId::new(mv_table_id));
641                }
642
643                vec![NewTableFragmentInfo { table_ids }]
644            } else {
645                vec![]
646            };
647
648        let mut mv_log_store_truncate_epoch = HashMap::new();
649        // TODO: may collect cross db snapshot backfill
650        let mut update_truncate_epoch =
651            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch
652                .entry(table_id.table_id)
653            {
654                Entry::Occupied(mut entry) => {
655                    let prev_truncate_epoch = entry.get_mut();
656                    if truncate_epoch < *prev_truncate_epoch {
657                        *prev_truncate_epoch = truncate_epoch;
658                    }
659                }
660                Entry::Vacant(entry) => {
661                    entry.insert(truncate_epoch);
662                }
663            };
664        for (mv_table_id, subscriptions) in &self.subscription_info.mv_depended_subscriptions {
665            if let Some(truncate_epoch) = subscriptions
666                .values()
667                .max()
668                .map(|max_retention| self.get_truncate_epoch(*max_retention).0)
669            {
670                update_truncate_epoch(*mv_table_id, truncate_epoch);
671            }
672        }
673        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
674            for mv_table_id in upstream_mv_table_ids {
675                update_truncate_epoch(mv_table_id, backfill_epoch);
676            }
677        }
678
679        let table_new_change_log = build_table_change_log_delta(
680            old_value_ssts.into_iter(),
681            synced_ssts.iter().map(|sst| &sst.sst_info),
682            must_match!(&self.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
683            mv_log_store_truncate_epoch.into_iter(),
684        );
685
686        let epoch = self.barrier_info.prev_epoch();
687        for table_id in &self.table_ids_to_commit {
688            info.tables_to_commit
689                .try_insert(*table_id, epoch)
690                .expect("non duplicate");
691        }
692
693        info.sstables.extend(synced_ssts);
694        info.new_table_watermarks.extend(new_table_watermarks);
695        info.sst_to_context.extend(sst_to_context);
696        info.new_table_fragment_infos
697            .extend(new_table_fragment_infos);
698        info.change_log_delta.extend(table_new_change_log);
699    }
700}
701
702impl Command {
703    /// Generate a mutation for the given command.
704    ///
705    /// `edges` contains the information of `dispatcher`s of `DispatchExecutor` and `actor_upstreams`s of `MergeNode`
706    pub(super) fn to_mutation(
707        &self,
708        is_currently_paused: bool,
709        edges: &mut Option<FragmentEdgeBuildResult>,
710        control_stream_manager: &ControlStreamManager,
711    ) -> Option<Mutation> {
712        match self {
713            Command::Flush => None,
714
715            Command::Pause => {
716                // Only pause when the cluster is not already paused.
717                // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
718                if !is_currently_paused {
719                    Some(Mutation::Pause(PauseMutation {}))
720                } else {
721                    None
722                }
723            }
724
725            Command::Resume => {
726                // Only resume when the cluster is paused with the same reason.
727                if is_currently_paused {
728                    Some(Mutation::Resume(ResumeMutation {}))
729                } else {
730                    None
731                }
732            }
733
734            Command::SourceChangeSplit(change) => {
735                let mut diff = HashMap::new();
736
737                for actor_splits in change.values() {
738                    diff.extend(actor_splits.clone());
739                }
740
741                Some(Mutation::Splits(SourceChangeSplitMutation {
742                    actor_splits: build_actor_connector_splits(&diff),
743                }))
744            }
745
746            Command::Throttle(config) => {
747                let mut actor_to_apply = HashMap::new();
748                for per_fragment in config.values() {
749                    actor_to_apply.extend(
750                        per_fragment
751                            .iter()
752                            .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit })),
753                    );
754                }
755
756                Some(Mutation::Throttle(ThrottleMutation {
757                    actor_throttle: actor_to_apply,
758                }))
759            }
760
761            Command::DropStreamingJobs { actors, .. } => Some(Mutation::Stop(StopMutation {
762                actors: actors.clone(),
763            })),
764
765            Command::CreateStreamingJob {
766                info:
767                    CreateStreamingJobCommandInfo {
768                        stream_job_fragments: table_fragments,
769                        init_split_assignment: split_assignment,
770                        upstream_fragment_downstreams,
771                        fragment_backfill_ordering,
772                        ..
773                    },
774                job_type,
775                ..
776            } => {
777                let edges = edges.as_mut().expect("should exist");
778                let added_actors = table_fragments.actor_ids();
779                let actor_splits = split_assignment
780                    .values()
781                    .flat_map(build_actor_connector_splits)
782                    .collect();
783                let subscriptions_to_add =
784                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) =
785                        job_type
786                    {
787                        snapshot_backfill_info
788                            .upstream_mv_table_id_to_backfill_epoch
789                            .keys()
790                            .map(|table_id| SubscriptionUpstreamInfo {
791                                subscriber_id: table_fragments.stream_job_id().table_id,
792                                upstream_mv_table_id: table_id.table_id,
793                            })
794                            .collect()
795                    } else {
796                        Default::default()
797                    };
798                let backfill_nodes_to_pause: Vec<_> =
799                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
800                        .into_iter()
801                        .collect();
802                let add = Some(Mutation::Add(AddMutation {
803                    actor_dispatchers: edges
804                        .dispatchers
805                        .extract_if(|fragment_id, _| {
806                            upstream_fragment_downstreams.contains_key(fragment_id)
807                        })
808                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
809                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
810                        .collect(),
811                    added_actors,
812                    actor_splits,
813                    // If the cluster is already paused, the new actors should be paused too.
814                    pause: is_currently_paused,
815                    subscriptions_to_add,
816                    backfill_nodes_to_pause,
817                }));
818
819                if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan {
820                    old_fragments,
821                    init_split_assignment,
822                    replace_upstream,
823                    upstream_fragment_downstreams,
824                    ..
825                }) = job_type
826                {
827                    let merge_updates = edges
828                        .merge_updates
829                        .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
830                        .collect();
831                    let dispatchers = edges
832                        .dispatchers
833                        .extract_if(|fragment_id, _| {
834                            upstream_fragment_downstreams.contains_key(fragment_id)
835                        })
836                        .collect();
837                    let update = Self::generate_update_mutation_for_replace_table(
838                        old_fragments,
839                        merge_updates,
840                        dispatchers,
841                        init_split_assignment,
842                    );
843
844                    Some(Mutation::Combined(CombinedMutation {
845                        mutations: vec![
846                            BarrierMutation { mutation: add },
847                            BarrierMutation { mutation: update },
848                        ],
849                    }))
850                } else {
851                    add
852                }
853            }
854            Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) => {
855                Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
856                    info: jobs_to_merge
857                        .iter()
858                        .flat_map(|(table_id, (backfill_upstream_tables, _))| {
859                            backfill_upstream_tables
860                                .iter()
861                                .map(move |upstream_table_id| SubscriptionUpstreamInfo {
862                                    subscriber_id: table_id.table_id,
863                                    upstream_mv_table_id: upstream_table_id.table_id,
864                                })
865                        })
866                        .collect(),
867                }))
868            }
869
870            Command::ReplaceStreamJob(ReplaceStreamJobPlan {
871                old_fragments,
872                replace_upstream,
873                upstream_fragment_downstreams,
874                init_split_assignment,
875                ..
876            }) => {
877                let edges = edges.as_mut().expect("should exist");
878                let merge_updates = edges
879                    .merge_updates
880                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
881                    .collect();
882                let dispatchers = edges
883                    .dispatchers
884                    .extract_if(|fragment_id, _| {
885                        upstream_fragment_downstreams.contains_key(fragment_id)
886                    })
887                    .collect();
888                Self::generate_update_mutation_for_replace_table(
889                    old_fragments,
890                    merge_updates,
891                    dispatchers,
892                    init_split_assignment,
893                )
894            }
895
896            Command::RescheduleFragment {
897                reschedules,
898                fragment_actors,
899                ..
900            } => {
901                let mut dispatcher_update = HashMap::new();
902                for reschedule in reschedules.values() {
903                    for &(upstream_fragment_id, dispatcher_id) in
904                        &reschedule.upstream_fragment_dispatcher_ids
905                    {
906                        // Find the actors of the upstream fragment.
907                        let upstream_actor_ids = fragment_actors
908                            .get(&upstream_fragment_id)
909                            .expect("should contain");
910
911                        let upstream_reschedule = reschedules.get(&upstream_fragment_id);
912
913                        // Record updates for all actors.
914                        for &actor_id in upstream_actor_ids {
915                            let added_downstream_actor_id = if upstream_reschedule
916                                .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
917                                .unwrap_or(true)
918                            {
919                                reschedule
920                                    .added_actors
921                                    .values()
922                                    .flatten()
923                                    .cloned()
924                                    .collect()
925                            } else {
926                                Default::default()
927                            };
928                            // Index with the dispatcher id to check duplicates.
929                            dispatcher_update
930                                .try_insert(
931                                    (actor_id, dispatcher_id),
932                                    DispatcherUpdate {
933                                        actor_id,
934                                        dispatcher_id,
935                                        hash_mapping: reschedule
936                                            .upstream_dispatcher_mapping
937                                            .as_ref()
938                                            .map(|m| m.to_protobuf()),
939                                        added_downstream_actor_id,
940                                        removed_downstream_actor_id: reschedule
941                                            .removed_actors
942                                            .iter()
943                                            .cloned()
944                                            .collect(),
945                                    },
946                                )
947                                .unwrap();
948                        }
949                    }
950                }
951                let dispatcher_update = dispatcher_update.into_values().collect();
952
953                let mut merge_update = HashMap::new();
954                for (&fragment_id, reschedule) in reschedules {
955                    for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
956                        // Find the actors of the downstream fragment.
957                        let downstream_actor_ids = fragment_actors
958                            .get(&downstream_fragment_id)
959                            .expect("should contain");
960
961                        // Downstream removed actors should be skipped
962                        // Newly created actors of the current fragment will not dispatch Update
963                        // barriers to them
964                        let downstream_removed_actors: HashSet<_> = reschedules
965                            .get(&downstream_fragment_id)
966                            .map(|downstream_reschedule| {
967                                downstream_reschedule
968                                    .removed_actors
969                                    .iter()
970                                    .copied()
971                                    .collect()
972                            })
973                            .unwrap_or_default();
974
975                        // Record updates for all actors.
976                        for &actor_id in downstream_actor_ids {
977                            if downstream_removed_actors.contains(&actor_id) {
978                                continue;
979                            }
980
981                            // Index with the fragment id to check duplicates.
982                            merge_update
983                                .try_insert(
984                                    (actor_id, fragment_id),
985                                    MergeUpdate {
986                                        actor_id,
987                                        upstream_fragment_id: fragment_id,
988                                        new_upstream_fragment_id: None,
989                                        added_upstream_actors: reschedule
990                                            .added_actors
991                                            .iter()
992                                            .flat_map(|(worker_id, actors)| {
993                                                let host =
994                                                    control_stream_manager.host_addr(*worker_id);
995                                                actors.iter().map(move |actor_id| ActorInfo {
996                                                    actor_id: *actor_id,
997                                                    host: Some(host.clone()),
998                                                })
999                                            })
1000                                            .collect(),
1001                                        removed_upstream_actor_id: reschedule
1002                                            .removed_actors
1003                                            .iter()
1004                                            .cloned()
1005                                            .collect(),
1006                                    },
1007                                )
1008                                .unwrap();
1009                        }
1010                    }
1011                }
1012                let merge_update = merge_update.into_values().collect();
1013
1014                let mut actor_vnode_bitmap_update = HashMap::new();
1015                for reschedule in reschedules.values() {
1016                    // Record updates for all actors in this fragment.
1017                    for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1018                        let bitmap = bitmap.to_protobuf();
1019                        actor_vnode_bitmap_update
1020                            .try_insert(actor_id, bitmap)
1021                            .unwrap();
1022                    }
1023                }
1024                let dropped_actors = reschedules
1025                    .values()
1026                    .flat_map(|r| r.removed_actors.iter().copied())
1027                    .collect();
1028                let mut actor_splits = HashMap::new();
1029
1030                for reschedule in reschedules.values() {
1031                    for (actor_id, splits) in &reschedule.actor_splits {
1032                        actor_splits.insert(
1033                            *actor_id as ActorId,
1034                            ConnectorSplits {
1035                                splits: splits.iter().map(ConnectorSplit::from).collect(),
1036                            },
1037                        );
1038                    }
1039                }
1040
1041                // we don't create dispatchers in reschedule scenario
1042                let actor_new_dispatchers = HashMap::new();
1043
1044                let mutation = Mutation::Update(UpdateMutation {
1045                    dispatcher_update,
1046                    merge_update,
1047                    actor_vnode_bitmap_update,
1048                    dropped_actors,
1049                    actor_splits,
1050                    actor_new_dispatchers,
1051                });
1052                tracing::debug!("update mutation: {mutation:?}");
1053                Some(mutation)
1054            }
1055
1056            Command::CreateSubscription {
1057                upstream_mv_table_id,
1058                subscription_id,
1059                ..
1060            } => Some(Mutation::Add(AddMutation {
1061                actor_dispatchers: Default::default(),
1062                added_actors: vec![],
1063                actor_splits: Default::default(),
1064                pause: false,
1065                subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1066                    upstream_mv_table_id: upstream_mv_table_id.table_id,
1067                    subscriber_id: *subscription_id,
1068                }],
1069                backfill_nodes_to_pause: vec![],
1070            })),
1071            Command::DropSubscription {
1072                upstream_mv_table_id,
1073                subscription_id,
1074            } => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
1075                info: vec![SubscriptionUpstreamInfo {
1076                    subscriber_id: *subscription_id,
1077                    upstream_mv_table_id: upstream_mv_table_id.table_id,
1078                }],
1079            })),
1080            Command::ConnectorPropsChange(config) => {
1081                let mut connector_props_infos = HashMap::default();
1082                for (k, v) in config {
1083                    connector_props_infos.insert(
1084                        *k,
1085                        ConnectorPropsInfo {
1086                            connector_props_info: v.clone(),
1087                        },
1088                    );
1089                }
1090                Some(Mutation::ConnectorPropsChange(
1091                    ConnectorPropsChangeMutation {
1092                        connector_props_infos,
1093                    },
1094                ))
1095            }
1096            Command::StartFragmentBackfill { fragment_ids } => Some(
1097                Mutation::StartFragmentBackfill(StartFragmentBackfillMutation {
1098                    fragment_ids: fragment_ids.clone(),
1099                }),
1100            ),
1101        }
1102    }
1103
1104    pub(super) fn actors_to_create(
1105        &self,
1106        graph_info: &InflightDatabaseInfo,
1107        edges: &mut Option<FragmentEdgeBuildResult>,
1108        control_stream_manager: &ControlStreamManager,
1109    ) -> Option<StreamJobActorsToCreate> {
1110        match self {
1111            Command::CreateStreamingJob { info, job_type, .. } => {
1112                let sink_into_table_replace_plan = match job_type {
1113                    CreateStreamingJobType::Normal => None,
1114                    CreateStreamingJobType::SinkIntoTable(replace_table) => Some(replace_table),
1115                    CreateStreamingJobType::SnapshotBackfill(_) => {
1116                        // for snapshot backfill, the actors to create is measured separately
1117                        return None;
1118                    }
1119                };
1120                let get_actors_to_create = || {
1121                    sink_into_table_replace_plan
1122                        .map(|plan| plan.new_fragments.actors_to_create())
1123                        .into_iter()
1124                        .flatten()
1125                        .chain(info.stream_job_fragments.actors_to_create())
1126                };
1127                let edges = edges.as_mut().expect("should exist");
1128                Some(edges.collect_actors_to_create(get_actors_to_create()))
1129            }
1130            Command::RescheduleFragment {
1131                reschedules,
1132                fragment_actors,
1133                ..
1134            } => {
1135                let mut actor_upstreams = Self::collect_actor_upstreams(
1136                    reschedules.iter().map(|(fragment_id, reschedule)| {
1137                        (
1138                            *fragment_id,
1139                            reschedule.newly_created_actors.values().map(
1140                                |((actor, dispatchers), _)| {
1141                                    (actor.actor_id, dispatchers.as_slice())
1142                                },
1143                            ),
1144                        )
1145                    }),
1146                    Some((reschedules, fragment_actors)),
1147                    graph_info,
1148                    control_stream_manager,
1149                );
1150                let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>)>> = HashMap::new();
1151                for (fragment_id, (actor, dispatchers), worker_id) in
1152                    reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1153                        reschedule
1154                            .newly_created_actors
1155                            .values()
1156                            .map(|(actors, status)| (*fragment_id, actors, status))
1157                    })
1158                {
1159                    let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1160                    map.entry(*worker_id)
1161                        .or_default()
1162                        .entry(fragment_id)
1163                        .or_insert_with(|| {
1164                            let node = graph_info.fragment(fragment_id).nodes.clone();
1165                            (node, vec![])
1166                        })
1167                        .1
1168                        .push((actor.clone(), upstreams, dispatchers.clone()));
1169                }
1170                Some(map)
1171            }
1172            Command::ReplaceStreamJob(replace_table) => {
1173                let edges = edges.as_mut().expect("should exist");
1174                Some(edges.collect_actors_to_create(replace_table.new_fragments.actors_to_create()))
1175            }
1176            _ => None,
1177        }
1178    }
1179
1180    fn generate_update_mutation_for_replace_table(
1181        old_fragments: &StreamJobFragments,
1182        merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1183        dispatchers: FragmentActorDispatchers,
1184        init_split_assignment: &SplitAssignment,
1185    ) -> Option<Mutation> {
1186        let dropped_actors = old_fragments.actor_ids();
1187
1188        let actor_new_dispatchers = dispatchers
1189            .into_values()
1190            .flatten()
1191            .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1192            .collect();
1193
1194        let actor_splits = init_split_assignment
1195            .values()
1196            .flat_map(build_actor_connector_splits)
1197            .collect();
1198
1199        Some(Mutation::Update(UpdateMutation {
1200            actor_new_dispatchers,
1201            merge_update: merge_updates.into_values().flatten().collect(),
1202            dropped_actors,
1203            actor_splits,
1204            ..Default::default()
1205        }))
1206    }
1207
1208    /// For `CancelStreamingJob`, returns the table id of the target table.
1209    pub fn tables_to_drop(&self) -> impl Iterator<Item = TableId> + '_ {
1210        match self {
1211            Command::DropStreamingJobs {
1212                table_fragments_ids,
1213                ..
1214            } => Some(table_fragments_ids.iter().cloned()),
1215            _ => None,
1216        }
1217        .into_iter()
1218        .flatten()
1219    }
1220}
1221
1222impl Command {
1223    #[expect(clippy::type_complexity)]
1224    pub(super) fn collect_actor_upstreams(
1225        actor_dispatchers: impl Iterator<
1226            Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1227        >,
1228        reschedule_dispatcher_update: Option<(
1229            &HashMap<FragmentId, Reschedule>,
1230            &HashMap<FragmentId, HashSet<ActorId>>,
1231        )>,
1232        graph_info: &InflightDatabaseInfo,
1233        control_stream_manager: &ControlStreamManager,
1234    ) -> HashMap<ActorId, ActorUpstreams> {
1235        let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1236        for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1237            let upstream_fragment = graph_info.fragment(upstream_fragment_id);
1238            for (upstream_actor_id, dispatchers) in upstream_actors {
1239                let upstream_actor_location =
1240                    upstream_fragment.actors[&upstream_actor_id].worker_id;
1241                let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1242                for downstream_actor_id in dispatchers
1243                    .iter()
1244                    .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1245                {
1246                    actor_upstreams
1247                        .entry(*downstream_actor_id)
1248                        .or_default()
1249                        .entry(upstream_fragment_id)
1250                        .or_default()
1251                        .insert(
1252                            upstream_actor_id,
1253                            ActorInfo {
1254                                actor_id: upstream_actor_id,
1255                                host: Some(upstream_actor_host.clone()),
1256                            },
1257                        );
1258                }
1259            }
1260        }
1261        if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1262            for reschedule in reschedules.values() {
1263                for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1264                    let upstream_fragment = graph_info.fragment(*upstream_fragment_id);
1265                    let upstream_reschedule = reschedules.get(upstream_fragment_id);
1266                    for upstream_actor_id in fragment_actors
1267                        .get(upstream_fragment_id)
1268                        .expect("should exist")
1269                    {
1270                        let upstream_actor_location =
1271                            upstream_fragment.actors[upstream_actor_id].worker_id;
1272                        let upstream_actor_host =
1273                            control_stream_manager.host_addr(upstream_actor_location);
1274                        if let Some(upstream_reschedule) = upstream_reschedule
1275                            && upstream_reschedule
1276                                .removed_actors
1277                                .contains(upstream_actor_id)
1278                        {
1279                            continue;
1280                        }
1281                        for (_, downstream_actor_id) in
1282                            reschedule
1283                                .added_actors
1284                                .iter()
1285                                .flat_map(|(worker_id, actors)| {
1286                                    actors.iter().map(|actor| (*worker_id, *actor))
1287                                })
1288                        {
1289                            actor_upstreams
1290                                .entry(downstream_actor_id)
1291                                .or_default()
1292                                .entry(*upstream_fragment_id)
1293                                .or_default()
1294                                .insert(
1295                                    *upstream_actor_id,
1296                                    ActorInfo {
1297                                        actor_id: *upstream_actor_id,
1298                                        host: Some(upstream_actor_host.clone()),
1299                                    },
1300                                );
1301                        }
1302                    }
1303                }
1304            }
1305        }
1306        actor_upstreams
1307    }
1308}