risingwave_meta/barrier/
info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
23use risingwave_meta_model::WorkerId;
24use risingwave_meta_model::fragment::DistributionType;
25use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
26use risingwave_pb::meta::subscribe_response::Operation;
27use risingwave_pb::stream_plan::stream_node::NodeBody;
28use risingwave_pb::stream_plan::{PbSubscriptionUpstreamInfo, PbUpstreamSinkInfo};
29use tracing::warn;
30
31use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
32use crate::barrier::rpc::ControlStreamManager;
33use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
34use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
35use crate::controller::utils::rebuild_fragment_mapping;
36use crate::manager::NotificationManagerRef;
37use crate::model::{ActorId, FragmentId, SubscriptionId};
38
/// Snapshot of a fragment's id, distribution type and actors, built from an
/// `InflightFragmentInfo` and stored in `SharedActorInfos`.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    /// Id of the fragment this snapshot describes.
    pub fragment_id: FragmentId,
    /// Distribution type copied from the source fragment info.
    pub distribution_type: DistributionType,
    /// Actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
}
45
46impl From<&InflightFragmentInfo> for SharedFragmentInfo {
47    fn from(info: &InflightFragmentInfo) -> Self {
48        Self {
49            fragment_id: info.fragment_id,
50            distribution_type: info.distribution_type,
51            actors: info.actors.clone(),
52        }
53    }
54}
55
/// Fragment snapshots grouped first by database id, then by fragment id.
type SharedActorInfosInner = HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>;
57
/// Clonable handle to the per-database fragment snapshots, together with the notification
/// manager through which fragment mapping changes are published.
///
/// Clones share the same underlying map because it is held behind an `Arc`.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub(crate) struct SharedActorInfos {
    /// Shared map of fragment snapshots, guarded by a read-write lock.
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    /// Receives the `Add` / `Update` / `Delete` fragment mapping notifications.
    /// Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
65
66impl SharedActorInfos {
67    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
68        Self {
69            inner: Arc::new(Default::default()),
70            notification_manager,
71        }
72    }
73
74    pub(super) fn remove_database(&self, database_id: DatabaseId) {
75        if let Some(database) = self.inner.write().remove(&database_id) {
76            let mapping = database
77                .into_values()
78                .map(|fragment| rebuild_fragment_mapping(&fragment))
79                .collect_vec();
80            if !mapping.is_empty() {
81                self.notification_manager
82                    .notify_fragment_mapping(Operation::Delete, mapping);
83            }
84        }
85    }
86
87    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
88        let database_ids: HashSet<_> = database_ids.into_iter().collect();
89
90        let mut mapping = Vec::new();
91        for fragment in self
92            .inner
93            .write()
94            .extract_if(|database_id, _| !database_ids.contains(database_id))
95            .flat_map(|(_, fragments)| fragments.into_values())
96        {
97            mapping.push(rebuild_fragment_mapping(&fragment));
98        }
99        if !mapping.is_empty() {
100            self.notification_manager
101                .notify_fragment_mapping(Operation::Delete, mapping);
102        }
103    }
104
105    pub(super) fn recover_database(
106        &self,
107        database_id: DatabaseId,
108        fragments: impl Iterator<Item = &InflightFragmentInfo>,
109    ) {
110        let mut remaining_fragments: HashMap<_, _> = fragments
111            .map(|fragment| (fragment.fragment_id, fragment))
112            .collect();
113        // delete the fragments that exist previously, but not included in the recovered fragments
114        let mut writer = self.start_writer(database_id);
115        let database = writer.write_guard.entry(database_id).or_default();
116        for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
117            if let Some(info) = remaining_fragments.remove(fragment_id) {
118                let info = info.into();
119                writer
120                    .updated_fragment_mapping
121                    .get_or_insert_default()
122                    .push(rebuild_fragment_mapping(&info));
123                *fragment_info = info;
124                false
125            } else {
126                true
127            }
128        }) {
129            writer
130                .deleted_fragment_mapping
131                .get_or_insert_default()
132                .push(rebuild_fragment_mapping(&fragment));
133        }
134        for (fragment_id, fragment) in remaining_fragments {
135            let info = fragment.into();
136            writer
137                .added_fragment_mapping
138                .get_or_insert_default()
139                .push(rebuild_fragment_mapping(&info));
140            database.insert(fragment_id, info);
141        }
142        writer.finish();
143    }
144
145    pub(super) fn upsert(
146        &self,
147        database_id: DatabaseId,
148        infos: impl IntoIterator<Item = &InflightFragmentInfo>,
149    ) {
150        let mut writer = self.start_writer(database_id);
151        writer.upsert(infos);
152        writer.finish();
153    }
154
155    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
156        SharedActorInfoWriter {
157            database_id,
158            write_guard: self.inner.write(),
159            notification_manager: &self.notification_manager,
160            added_fragment_mapping: None,
161            updated_fragment_mapping: None,
162            deleted_fragment_mapping: None,
163        }
164    }
165}
166
/// Write session over the shared fragment snapshots of one database.
///
/// It holds the write lock for its whole lifetime and collects the fragment mappings that
/// `finish` sends out as `Add`, `Update` and `Delete` notifications.
pub(super) struct SharedActorInfoWriter<'a> {
    /// Database whose fragments `upsert` and `remove` operate on.
    database_id: DatabaseId,
    /// Write guard over the whole shared map.
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    /// Target of the notifications sent by `finish`.
    notification_manager: &'a NotificationManagerRef,
    /// Mappings of newly inserted fragments; `None` until the first one is recorded.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of overwritten fragments; `None` until the first one is recorded.
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of removed fragments; `None` until the first one is recorded.
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
175
176impl SharedActorInfoWriter<'_> {
177    pub(super) fn upsert(&mut self, infos: impl IntoIterator<Item = &InflightFragmentInfo>) {
178        let database = self.write_guard.entry(self.database_id).or_default();
179        for info in infos {
180            match database.entry(info.fragment_id) {
181                Entry::Occupied(mut entry) => {
182                    let info = info.into();
183                    self.updated_fragment_mapping
184                        .get_or_insert_default()
185                        .push(rebuild_fragment_mapping(&info));
186                    entry.insert(info);
187                }
188                Entry::Vacant(entry) => {
189                    let info = info.into();
190                    self.added_fragment_mapping
191                        .get_or_insert_default()
192                        .push(rebuild_fragment_mapping(&info));
193                    entry.insert(info);
194                }
195            }
196        }
197    }
198
199    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
200        if let Some(database) = self.write_guard.get_mut(&self.database_id)
201            && let Some(fragment) = database.remove(&info.fragment_id)
202        {
203            self.deleted_fragment_mapping
204                .get_or_insert_default()
205                .push(rebuild_fragment_mapping(&fragment));
206        }
207    }
208
209    pub(super) fn finish(self) {
210        if let Some(mapping) = self.added_fragment_mapping {
211            self.notification_manager
212                .notify_fragment_mapping(Operation::Add, mapping);
213        }
214        if let Some(mapping) = self.updated_fragment_mapping {
215            self.notification_manager
216                .notify_fragment_mapping(Operation::Update, mapping);
217        }
218        if let Some(mapping) = self.deleted_fragment_mapping {
219            self.notification_manager
220                .notify_fragment_mapping(Operation::Delete, mapping);
221        }
222    }
223}
224
/// Epoch pair and kind describing a barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// Previous epoch of the barrier.
    pub prev_epoch: TracedEpoch,
    /// Current epoch of the barrier.
    pub curr_epoch: TracedEpoch,
    /// Kind of the barrier.
    pub kind: BarrierKind,
}
231
232impl BarrierInfo {
233    pub(super) fn prev_epoch(&self) -> u64 {
234        self.prev_epoch.value().0
235    }
236}
237
/// Change applied to a single fragment by a barrier command.
///
/// `InflightDatabaseInfo::pre_apply` handles the additive parts of a change and
/// `InflightDatabaseInfo::post_apply` handles the removals.
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    /// Registers a fragment under the streaming job `job_id`.
    NewFragment {
        job_id: TableId,
        info: InflightFragmentInfo,
        /// Whether the fragment already exists before added. This is used
        /// when snapshot backfill is finished and add its fragment info
        /// back to the database.
        is_existing: bool,
    },
    /// Appends the given upstream to the `UpstreamSinkUnion` node of the fragment.
    AddNodeUpstream(PbUpstreamSinkInfo),
    /// Removes the listed upstream fragments from the `UpstreamSinkUnion` node of the fragment.
    DropNodeUpstream(Vec<FragmentId>),
    /// Rewrites the `upstream_fragment_id` of the fragment's `Merge` nodes.
    ReplaceNodeUpstream(
        /// old `fragment_id` -> new `fragment_id`
        HashMap<FragmentId, FragmentId>,
    ),
    /// Changes the set of actors of the fragment.
    Reschedule {
        /// Actors inserted into the fragment in `pre_apply`.
        new_actors: HashMap<ActorId, InflightActorInfo>,
        /// New vnode bitmaps assigned to existing actors in `pre_apply`.
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        /// Actors removed from the fragment in `post_apply`.
        to_remove: HashSet<ActorId>,
    },
    /// Removes the fragment in `post_apply`.
    RemoveFragment,
}
261
/// Subscriptions currently tracked, grouped by the upstream table they depend on.
#[derive(Default, Clone, Debug)]
pub struct InflightSubscriptionInfo {
    /// `mv_table_id` => `subscription_id` => retention seconds
    pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
}
267
/// Fragments that belong to one streaming job.
#[derive(Clone, Debug)]
pub struct InflightStreamingJobInfo {
    /// Id of the streaming job.
    pub job_id: TableId,
    /// Fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
}
273
274impl InflightStreamingJobInfo {
275    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
276        self.fragment_infos.values()
277    }
278
279    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
280        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
281    }
282}
283
284impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
285    type Item = &'a InflightFragmentInfo;
286
287    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
288
289    fn into_iter(self) -> Self::IntoIter {
290        self.fragment_infos()
291    }
292}
293
/// Streaming jobs and fragments tracked for one database.
#[derive(Clone, Debug)]
pub struct InflightDatabaseInfo {
    /// Id of the database described by this info.
    database_id: DatabaseId,
    /// Streaming jobs of the database, keyed by job id.
    jobs: HashMap<TableId, InflightStreamingJobInfo>,
    /// Maps every tracked fragment to the id of the job that owns it.
    fragment_location: HashMap<FragmentId, TableId>,
    /// Shared fragment snapshots that are updated as fragment changes are applied.
    pub(super) shared_actor_infos: SharedActorInfos,
}
301
302impl InflightDatabaseInfo {
303    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
304        self.jobs.values().flat_map(|job| job.fragment_infos())
305    }
306
    /// Returns `true` if a job with id `job_id` is tracked in this database.
    pub fn contains_job(&self, job_id: TableId) -> bool {
        self.jobs.contains_key(&job_id)
    }
310
311    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
312        let job_id = self.fragment_location[&fragment_id];
313        self.jobs
314            .get(&job_id)
315            .expect("should exist")
316            .fragment_infos
317            .get(&fragment_id)
318            .expect("should exist")
319    }
320
321    fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
322        let job_id = self.fragment_location[&fragment_id];
323        self.jobs
324            .get_mut(&job_id)
325            .expect("should exist")
326            .fragment_infos
327            .get_mut(&fragment_id)
328            .expect("should exist")
329    }
330
331    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
332        Self {
333            database_id,
334            jobs: Default::default(),
335            fragment_location: Default::default(),
336            shared_actor_infos,
337        }
338    }
339
340    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
341        // remove the database because it's empty.
342        shared_actor_infos.remove_database(database_id);
343        Self::empty_inner(database_id, shared_actor_infos)
344    }
345
346    pub fn recover(
347        database_id: DatabaseId,
348        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
349        shared_actor_infos: SharedActorInfos,
350    ) -> Self {
351        let mut info = Self::empty_inner(database_id, shared_actor_infos);
352        for job in jobs {
353            info.add_existing(job);
354        }
355        info
356    }
357
    /// Returns `true` if no job is tracked in this database.
    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }
361
362    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
363        self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
364            (
365                fragment_id,
366                CommandFragmentChanges::NewFragment {
367                    job_id: job.job_id,
368                    info,
369                    is_existing: true,
370                },
371            )
372        }))
373    }
374
375    /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
376    /// the info correspondingly.
377    pub(crate) fn pre_apply(
378        &mut self,
379        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
380    ) {
381        self.apply_add(
382            fragment_changes
383                .iter()
384                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
385        )
386    }
387
388    fn apply_add(
389        &mut self,
390        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
391    ) {
392        {
393            let shared_infos = self.shared_actor_infos.clone();
394            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
395            for (fragment_id, change) in fragment_changes {
396                match change {
397                    CommandFragmentChanges::NewFragment {
398                        job_id,
399                        info,
400                        is_existing,
401                    } => {
402                        let fragment_infos =
403                            self.jobs
404                                .entry(job_id)
405                                .or_insert_with(|| InflightStreamingJobInfo {
406                                    job_id,
407                                    fragment_infos: Default::default(),
408                                });
409                        if !is_existing {
410                            shared_actor_writer.upsert([&info]);
411                        }
412                        fragment_infos
413                            .fragment_infos
414                            .try_insert(fragment_id, info)
415                            .expect("non duplicate");
416                        self.fragment_location
417                            .try_insert(fragment_id, job_id)
418                            .expect("non duplicate");
419                    }
420                    CommandFragmentChanges::Reschedule {
421                        new_actors,
422                        actor_update_vnode_bitmap,
423                        ..
424                    } => {
425                        let info = self.fragment_mut(fragment_id);
426                        let actors = &mut info.actors;
427                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
428                            actors
429                                .get_mut(&actor_id)
430                                .expect("should exist")
431                                .vnode_bitmap = Some(new_vnodes);
432                        }
433                        for (actor_id, actor) in new_actors {
434                            actors
435                                .try_insert(actor_id as _, actor)
436                                .expect("non-duplicate");
437                        }
438                    }
439                    CommandFragmentChanges::RemoveFragment => {}
440                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
441                        let mut remaining_fragment_ids: HashSet<_> =
442                            replace_map.keys().cloned().collect();
443                        let info = self.fragment_mut(fragment_id);
444                        visit_stream_node_mut(&mut info.nodes, |node| {
445                            if let NodeBody::Merge(m) = node
446                                && let Some(new_upstream_fragment_id) =
447                                    replace_map.get(&m.upstream_fragment_id)
448                            {
449                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
450                                    if cfg!(debug_assertions) {
451                                        panic!(
452                                            "duplicate upstream fragment: {:?} {:?}",
453                                            m, replace_map
454                                        );
455                                    } else {
456                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
457                                    }
458                                }
459                                m.upstream_fragment_id = *new_upstream_fragment_id;
460                            }
461                        });
462                        if cfg!(debug_assertions) {
463                            assert!(
464                                remaining_fragment_ids.is_empty(),
465                                "non-existing fragment to replace: {:?} {:?} {:?}",
466                                remaining_fragment_ids,
467                                info.nodes,
468                                replace_map
469                            );
470                        } else {
471                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
472                        }
473                    }
474                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
475                        let info = self.fragment_mut(fragment_id);
476                        let mut injected = false;
477                        visit_stream_node_mut(&mut info.nodes, |node| {
478                            if let NodeBody::UpstreamSinkUnion(u) = node {
479                                if cfg!(debug_assertions) {
480                                    let current_upstream_fragment_ids = u
481                                        .init_upstreams
482                                        .iter()
483                                        .map(|upstream| upstream.upstream_fragment_id)
484                                        .collect::<HashSet<_>>();
485                                    if current_upstream_fragment_ids
486                                        .contains(&new_upstream_info.upstream_fragment_id)
487                                    {
488                                        panic!(
489                                            "duplicate upstream fragment: {:?} {:?}",
490                                            u, new_upstream_info
491                                        );
492                                    }
493                                }
494                                u.init_upstreams.push(new_upstream_info.clone());
495                                injected = true;
496                            }
497                        });
498                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
499                    }
500                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
501                        let info = self.fragment_mut(fragment_id);
502                        let mut removed = false;
503                        visit_stream_node_mut(&mut info.nodes, |node| {
504                            if let NodeBody::UpstreamSinkUnion(u) = node {
505                                if cfg!(debug_assertions) {
506                                    let current_upstream_fragment_ids = u
507                                        .init_upstreams
508                                        .iter()
509                                        .map(|upstream| upstream.upstream_fragment_id)
510                                        .collect::<HashSet<_>>();
511                                    for drop_fragment_id in &drop_upstream_fragment_ids {
512                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
513                                        {
514                                            panic!(
515                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
516                                                u, drop_upstream_fragment_ids, drop_fragment_id
517                                            );
518                                        }
519                                    }
520                                }
521                                u.init_upstreams.retain(|upstream| {
522                                    !drop_upstream_fragment_ids
523                                        .contains(&upstream.upstream_fragment_id)
524                                });
525                                removed = true;
526                            }
527                        });
528                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
529                    }
530                }
531            }
532            shared_actor_writer.finish();
533        }
534    }
535
    /// Builds the fragment edges required by `command`.
    ///
    /// Returns `None` when there is no command or when the command is neither
    /// `Command::CreateStreamingJob` nor `Command::ReplaceStreamJob`.
    ///
    /// # Panics
    ///
    /// Panics (through `Self::fragment`) if one of the collected existing fragment ids is
    /// not tracked in this database.
    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        // At most one of `info` and `replace_job` is `Some`; `new_upstream_sink` can only be
        // `Some` together with `info`.
        let (info, replace_job, new_upstream_sink) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::MergeSnapshotBackfillStreamingJobs(_)
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit(_)
                | Command::Throttle(_)
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::StartFragmentBackfill { .. }
                | Command::Refresh { .. }
                | Command::LoadFinish { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
                        new_upstream_sink,
                    ) = job_type
                    {
                        Some(new_upstream_sink)
                    } else {
                        None
                    };
                    (Some(info), None, new_upstream_sink)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
            },
        };
        // `existing_fragment_ids` consists of
        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id`s of the upstream fragments of the
        // newly created job
        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id`s of the upstream fragments
        // of `replace_job`, if the upstream fragment previously exists
        //  - keys of `replace_job.replace_upstream`, which are the `fragment_id`s of downstream fragments that will
        // update their upstream fragments
        //  - the `fragment_id` of the downstream table, if creating a new sink-into-table
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|info| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        // Skip the upstream fragments that are among the fragments of the job in `info`.
                        info.map(|info| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        // Fragments introduced by the command itself; they are not looked up in `self`.
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|info| info.stream_job_fragments.new_fragment_info())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job.new_fragments.new_fragment_info().chain(
                    replace_job
                        .auto_refresh_schema_sinks
                        .as_ref()
                        .into_iter()
                        .flat_map(|sinks| {
                            sinks.iter().map(|sink| {
                                (sink.new_fragment.fragment_id, sink.new_fragment_info())
                            })
                        }),
                )
            }))
            .collect_vec();
        // The builder is given both the existing fragments and the new ones.
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| self.fragment(fragment_id))
                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
            control_stream_manager,
        );
        if let Some(info) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        // Upstream replacements are registered after all relations and edges were added.
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }
658}
659
660impl InflightSubscriptionInfo {
661    pub fn pre_apply(&mut self, command: &Command) {
662        if let Command::CreateSubscription {
663            subscription_id,
664            upstream_mv_table_id,
665            retention_second,
666        } = command
667            && let Some(prev_retiontion) = self
668                .mv_depended_subscriptions
669                .entry(*upstream_mv_table_id)
670                .or_default()
671                .insert(*subscription_id, *retention_second)
672        {
673            warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retiontion, "add an existing subscription id");
674        }
675    }
676}
677
678impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
679    type Item = PbSubscriptionUpstreamInfo;
680
681    type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;
682
683    fn into_iter(self) -> Self::IntoIter {
684        self.mv_depended_subscriptions
685            .iter()
686            .flat_map(|(table_id, subscriptions)| {
687                subscriptions
688                    .keys()
689                    .map(|subscriber_id| PbSubscriptionUpstreamInfo {
690                        subscriber_id: *subscriber_id,
691                        upstream_mv_table_id: table_id.table_id,
692                    })
693            })
694    }
695}
696
697impl InflightDatabaseInfo {
698    /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
699    /// remove that from the snapshot correspondingly.
700    pub(crate) fn post_apply(
701        &mut self,
702        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
703    ) {
704        let inner = self.shared_actor_infos.clone();
705        let mut shared_actor_writer = inner.start_writer(self.database_id);
706        {
707            for (fragment_id, changes) in fragment_changes {
708                match changes {
709                    CommandFragmentChanges::NewFragment { .. } => {}
710                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
711                        let job_id = self.fragment_location[fragment_id];
712                        let info = self
713                            .jobs
714                            .get_mut(&job_id)
715                            .expect("should exist")
716                            .fragment_infos
717                            .get_mut(fragment_id)
718                            .expect("should exist");
719                        for actor_id in to_remove {
720                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
721                        }
722                        shared_actor_writer.upsert([&*info]);
723                    }
724                    CommandFragmentChanges::RemoveFragment => {
725                        let job_id = self
726                            .fragment_location
727                            .remove(fragment_id)
728                            .expect("should exist");
729                        let job = self.jobs.get_mut(&job_id).expect("should exist");
730                        let fragment = job
731                            .fragment_infos
732                            .remove(fragment_id)
733                            .expect("should exist");
734                        shared_actor_writer.remove(&fragment);
735                        if job.fragment_infos.is_empty() {
736                            self.jobs.remove(&job_id).expect("should exist");
737                        }
738                    }
739                    CommandFragmentChanges::ReplaceNodeUpstream(_)
740                    | CommandFragmentChanges::AddNodeUpstream(_)
741                    | CommandFragmentChanges::DropNodeUpstream(_) => {}
742                }
743            }
744        }
745        shared_actor_writer.finish();
746    }
747}
748
749impl InflightSubscriptionInfo {
750    pub fn post_apply(&mut self, command: &Command) {
751        if let Command::DropSubscription {
752            subscription_id,
753            upstream_mv_table_id,
754        } = command
755        {
756            let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
757                Some(subscriptions) => {
758                    let removed = subscriptions.remove(subscription_id).is_some();
759                    if removed && subscriptions.is_empty() {
760                        self.mv_depended_subscriptions.remove(upstream_mv_table_id);
761                    }
762                    removed
763                }
764                None => false,
765            };
766            if !removed {
767                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
768            }
769        }
770    }
771}
772
773impl InflightFragmentInfo {
774    /// Returns actor list to collect in the target worker node.
775    pub(crate) fn actor_ids_to_collect(
776        infos: impl IntoIterator<Item = &Self>,
777    ) -> HashMap<WorkerId, HashSet<ActorId>> {
778        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
779        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
780            assert!(
781                ret.entry(actor.worker_id)
782                    .or_default()
783                    .insert(*actor_id as _)
784            )
785        }
786        ret
787    }
788
789    pub fn existing_table_ids<'a>(
790        infos: impl IntoIterator<Item = &'a Self> + 'a,
791    ) -> impl Iterator<Item = TableId> + 'a {
792        infos
793            .into_iter()
794            .flat_map(|info| info.state_table_ids.iter().cloned())
795    }
796
797    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
798        infos.into_iter().any(|fragment| {
799            fragment
800                .actors
801                .values()
802                .any(|actor| (actor.worker_id) == worker_id)
803        })
804    }
805
806    pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
807        infos
808            .into_iter()
809            .flat_map(|info| info.actors.values())
810            .map(|actor| actor.worker_id)
811            .collect()
812    }
813}
814
815impl InflightDatabaseInfo {
816    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
817        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
818    }
819
820    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
821        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
822    }
823}