risingwave_meta/barrier/
info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
23use risingwave_meta_model::WorkerId;
24use risingwave_meta_model::fragment::DistributionType;
25use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
26use risingwave_pb::meta::subscribe_response::Operation;
27use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
28use risingwave_pb::stream_plan::stream_node::NodeBody;
29use tracing::warn;
30
31use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
32use crate::barrier::rpc::ControlStreamManager;
33use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
34use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
35use crate::controller::utils::rebuild_fragment_mapping;
36use crate::manager::NotificationManagerRef;
37use crate::model::{ActorId, FragmentId, SubscriptionId};
38
39#[derive(Debug, Clone)]
40pub struct SharedFragmentInfo {
41    pub fragment_id: FragmentId,
42    pub distribution_type: DistributionType,
43    pub actors: HashMap<ActorId, InflightActorInfo>,
44}
45
46impl From<&InflightFragmentInfo> for SharedFragmentInfo {
47    fn from(info: &InflightFragmentInfo) -> Self {
48        Self {
49            fragment_id: info.fragment_id,
50            distribution_type: info.distribution_type,
51            actors: info.actors.clone(),
52        }
53    }
54}
55
56type SharedActorInfosInner = HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>;
57
58#[derive(Clone, educe::Educe)]
59#[educe(Debug)]
60pub(crate) struct SharedActorInfos {
61    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
62    #[educe(Debug(ignore))]
63    notification_manager: NotificationManagerRef,
64}
65
66impl SharedActorInfos {
67    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
68        Self {
69            inner: Arc::new(Default::default()),
70            notification_manager,
71        }
72    }
73
74    pub(super) fn remove_database(&self, database_id: DatabaseId) {
75        if let Some(database) = self.inner.write().remove(&database_id) {
76            let mapping = database
77                .into_values()
78                .map(|fragment| rebuild_fragment_mapping(&fragment))
79                .collect_vec();
80            if !mapping.is_empty() {
81                self.notification_manager
82                    .notify_fragment_mapping(Operation::Delete, mapping);
83            }
84        }
85    }
86
87    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
88        let database_ids: HashSet<_> = database_ids.into_iter().collect();
89
90        let mut mapping = Vec::new();
91        for fragment in self
92            .inner
93            .write()
94            .extract_if(|database_id, _| !database_ids.contains(database_id))
95            .flat_map(|(_, fragments)| fragments.into_values())
96        {
97            mapping.push(rebuild_fragment_mapping(&fragment));
98        }
99        if !mapping.is_empty() {
100            self.notification_manager
101                .notify_fragment_mapping(Operation::Delete, mapping);
102        }
103    }
104
105    pub(super) fn recover_database(
106        &self,
107        database_id: DatabaseId,
108        fragments: impl Iterator<Item = &InflightFragmentInfo>,
109    ) {
110        let mut remaining_fragments: HashMap<_, _> = fragments
111            .map(|fragment| (fragment.fragment_id, fragment))
112            .collect();
113        // delete the fragments that exist previously, but not included in the recovered fragments
114        let mut writer = self.start_writer(database_id);
115        let database = writer.write_guard.entry(database_id).or_default();
116        for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
117            if let Some(info) = remaining_fragments.remove(fragment_id) {
118                let info = info.into();
119                writer
120                    .updated_fragment_mapping
121                    .get_or_insert_default()
122                    .push(rebuild_fragment_mapping(&info));
123                *fragment_info = info;
124                false
125            } else {
126                true
127            }
128        }) {
129            writer
130                .deleted_fragment_mapping
131                .get_or_insert_default()
132                .push(rebuild_fragment_mapping(&fragment));
133        }
134        for (fragment_id, fragment) in remaining_fragments {
135            let info = fragment.into();
136            writer
137                .added_fragment_mapping
138                .get_or_insert_default()
139                .push(rebuild_fragment_mapping(&info));
140            database.insert(fragment_id, info);
141        }
142        writer.finish();
143    }
144
145    pub(super) fn upsert(
146        &self,
147        database_id: DatabaseId,
148        infos: impl IntoIterator<Item = &InflightFragmentInfo>,
149    ) {
150        let mut writer = self.start_writer(database_id);
151        writer.upsert(infos);
152        writer.finish();
153    }
154
155    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
156        SharedActorInfoWriter {
157            database_id,
158            write_guard: self.inner.write(),
159            notification_manager: &self.notification_manager,
160            added_fragment_mapping: None,
161            updated_fragment_mapping: None,
162            deleted_fragment_mapping: None,
163        }
164    }
165}
166
167pub(super) struct SharedActorInfoWriter<'a> {
168    database_id: DatabaseId,
169    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
170    notification_manager: &'a NotificationManagerRef,
171    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
172    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
173    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
174}
175
176impl SharedActorInfoWriter<'_> {
177    pub(super) fn upsert(&mut self, infos: impl IntoIterator<Item = &InflightFragmentInfo>) {
178        let database = self.write_guard.entry(self.database_id).or_default();
179        for info in infos {
180            match database.entry(info.fragment_id) {
181                Entry::Occupied(mut entry) => {
182                    let info = info.into();
183                    self.updated_fragment_mapping
184                        .get_or_insert_default()
185                        .push(rebuild_fragment_mapping(&info));
186                    entry.insert(info);
187                }
188                Entry::Vacant(entry) => {
189                    let info = info.into();
190                    self.added_fragment_mapping
191                        .get_or_insert_default()
192                        .push(rebuild_fragment_mapping(&info));
193                    entry.insert(info);
194                }
195            }
196        }
197    }
198
199    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
200        if let Some(database) = self.write_guard.get_mut(&self.database_id)
201            && let Some(fragment) = database.remove(&info.fragment_id)
202        {
203            self.deleted_fragment_mapping
204                .get_or_insert_default()
205                .push(rebuild_fragment_mapping(&fragment));
206        }
207    }
208
209    pub(super) fn finish(self) {
210        if let Some(mapping) = self.added_fragment_mapping {
211            self.notification_manager
212                .notify_fragment_mapping(Operation::Add, mapping);
213        }
214        if let Some(mapping) = self.updated_fragment_mapping {
215            self.notification_manager
216                .notify_fragment_mapping(Operation::Update, mapping);
217        }
218        if let Some(mapping) = self.deleted_fragment_mapping {
219            self.notification_manager
220                .notify_fragment_mapping(Operation::Delete, mapping);
221        }
222    }
223}
224
225#[derive(Debug, Clone)]
226pub(super) struct BarrierInfo {
227    pub prev_epoch: TracedEpoch,
228    pub curr_epoch: TracedEpoch,
229    pub kind: BarrierKind,
230}
231
232impl BarrierInfo {
233    pub(super) fn prev_epoch(&self) -> u64 {
234        self.prev_epoch.value().0
235    }
236}
237
238#[derive(Debug, Clone)]
239pub(crate) enum CommandFragmentChanges {
240    NewFragment {
241        job_id: TableId,
242        info: InflightFragmentInfo,
243        /// Whether the fragment already exists before added. This is used
244        /// when snapshot backfill is finished and add its fragment info
245        /// back to the database.
246        is_existing: bool,
247    },
248    ReplaceNodeUpstream(
249        /// old `fragment_id` -> new `fragment_id`
250        HashMap<FragmentId, FragmentId>,
251    ),
252    Reschedule {
253        new_actors: HashMap<ActorId, InflightActorInfo>,
254        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
255        to_remove: HashSet<ActorId>,
256    },
257    RemoveFragment,
258}
259
260#[derive(Default, Clone, Debug)]
261pub struct InflightSubscriptionInfo {
262    /// `mv_table_id` => `subscription_id` => retention seconds
263    pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
264}
265
266#[derive(Clone, Debug)]
267pub struct InflightStreamingJobInfo {
268    pub job_id: TableId,
269    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
270}
271
272impl InflightStreamingJobInfo {
273    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
274        self.fragment_infos.values()
275    }
276
277    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
278        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
279    }
280}
281
282impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
283    type Item = &'a InflightFragmentInfo;
284
285    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
286
287    fn into_iter(self) -> Self::IntoIter {
288        self.fragment_infos()
289    }
290}
291
292#[derive(Clone, Debug)]
293pub struct InflightDatabaseInfo {
294    database_id: DatabaseId,
295    jobs: HashMap<TableId, InflightStreamingJobInfo>,
296    fragment_location: HashMap<FragmentId, TableId>,
297    pub(super) shared_actor_infos: SharedActorInfos,
298}
299
300impl InflightDatabaseInfo {
301    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
302        self.jobs.values().flat_map(|job| job.fragment_infos())
303    }
304
305    pub fn contains_job(&self, job_id: TableId) -> bool {
306        self.jobs.contains_key(&job_id)
307    }
308
309    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
310        let job_id = self.fragment_location[&fragment_id];
311        self.jobs
312            .get(&job_id)
313            .expect("should exist")
314            .fragment_infos
315            .get(&fragment_id)
316            .expect("should exist")
317    }
318
319    fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
320        let job_id = self.fragment_location[&fragment_id];
321        self.jobs
322            .get_mut(&job_id)
323            .expect("should exist")
324            .fragment_infos
325            .get_mut(&fragment_id)
326            .expect("should exist")
327    }
328
329    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
330        Self {
331            database_id,
332            jobs: Default::default(),
333            fragment_location: Default::default(),
334            shared_actor_infos,
335        }
336    }
337
338    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
339        // remove the database because it's empty.
340        shared_actor_infos.remove_database(database_id);
341        Self::empty_inner(database_id, shared_actor_infos)
342    }
343
344    pub fn recover(
345        database_id: DatabaseId,
346        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
347        shared_actor_infos: SharedActorInfos,
348    ) -> Self {
349        let mut info = Self::empty_inner(database_id, shared_actor_infos);
350        for job in jobs {
351            info.add_existing(job);
352        }
353        info
354    }
355
356    pub fn is_empty(&self) -> bool {
357        self.jobs.is_empty()
358    }
359
360    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
361        self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
362            (
363                fragment_id,
364                CommandFragmentChanges::NewFragment {
365                    job_id: job.job_id,
366                    info,
367                    is_existing: true,
368                },
369            )
370        }))
371    }
372
373    /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
374    /// the info correspondingly.
375    pub(crate) fn pre_apply(
376        &mut self,
377        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
378    ) {
379        self.apply_add(
380            fragment_changes
381                .iter()
382                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
383        )
384    }
385
386    fn apply_add(
387        &mut self,
388        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
389    ) {
390        {
391            let shared_infos = self.shared_actor_infos.clone();
392            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
393            for (fragment_id, change) in fragment_changes {
394                match change {
395                    CommandFragmentChanges::NewFragment {
396                        job_id,
397                        info,
398                        is_existing,
399                    } => {
400                        let fragment_infos =
401                            self.jobs
402                                .entry(job_id)
403                                .or_insert_with(|| InflightStreamingJobInfo {
404                                    job_id,
405                                    fragment_infos: Default::default(),
406                                });
407                        if !is_existing {
408                            shared_actor_writer.upsert([&info]);
409                        }
410                        fragment_infos
411                            .fragment_infos
412                            .try_insert(fragment_id, info)
413                            .expect("non duplicate");
414                        self.fragment_location
415                            .try_insert(fragment_id, job_id)
416                            .expect("non duplicate");
417                    }
418                    CommandFragmentChanges::Reschedule {
419                        new_actors,
420                        actor_update_vnode_bitmap,
421                        ..
422                    } => {
423                        let info = self.fragment_mut(fragment_id);
424                        let actors = &mut info.actors;
425                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
426                            actors
427                                .get_mut(&actor_id)
428                                .expect("should exist")
429                                .vnode_bitmap = Some(new_vnodes);
430                        }
431                        for (actor_id, actor) in new_actors {
432                            actors
433                                .try_insert(actor_id as _, actor)
434                                .expect("non-duplicate");
435                        }
436                    }
437                    CommandFragmentChanges::RemoveFragment => {}
438                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
439                        let mut remaining_fragment_ids: HashSet<_> =
440                            replace_map.keys().cloned().collect();
441                        let info = self.fragment_mut(fragment_id);
442                        visit_stream_node_mut(&mut info.nodes, |node| {
443                            if let NodeBody::Merge(m) = node
444                                && let Some(new_upstream_fragment_id) =
445                                    replace_map.get(&m.upstream_fragment_id)
446                            {
447                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
448                                    if cfg!(debug_assertions) {
449                                        panic!(
450                                            "duplicate upstream fragment: {:?} {:?}",
451                                            m, replace_map
452                                        );
453                                    } else {
454                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
455                                    }
456                                }
457                                m.upstream_fragment_id = *new_upstream_fragment_id;
458                            }
459                        });
460                        if cfg!(debug_assertions) {
461                            assert!(
462                                remaining_fragment_ids.is_empty(),
463                                "non-existing fragment to replace: {:?} {:?} {:?}",
464                                remaining_fragment_ids,
465                                info.nodes,
466                                replace_map
467                            );
468                        } else {
469                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
470                        }
471                    }
472                }
473            }
474            shared_actor_writer.finish();
475        }
476    }
477
478    pub(super) fn build_edge(
479        &self,
480        command: Option<&Command>,
481        control_stream_manager: &ControlStreamManager,
482    ) -> Option<FragmentEdgeBuildResult> {
483        let (info, replace_job) = match command {
484            None => {
485                return None;
486            }
487            Some(command) => match command {
488                Command::Flush
489                | Command::Pause
490                | Command::Resume
491                | Command::DropStreamingJobs { .. }
492                | Command::MergeSnapshotBackfillStreamingJobs(_)
493                | Command::RescheduleFragment { .. }
494                | Command::SourceChangeSplit(_)
495                | Command::Throttle(_)
496                | Command::CreateSubscription { .. }
497                | Command::DropSubscription { .. }
498                | Command::ConnectorPropsChange(_)
499                | Command::StartFragmentBackfill { .. }
500                | Command::Refresh { .. }
501                | Command::LoadFinish { .. } => {
502                    return None;
503                }
504                Command::CreateStreamingJob { info, job_type, .. } => {
505                    let replace_job = match job_type {
506                        CreateStreamingJobType::Normal
507                        | CreateStreamingJobType::SnapshotBackfill(_) => None,
508                        CreateStreamingJobType::SinkIntoTable(replace_job) => Some(replace_job),
509                    };
510                    (Some(info), replace_job)
511                }
512                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job)),
513            },
514        };
515        // `existing_fragment_ids` consists of
516        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
517        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
518        // if the upstream fragment previously exists
519        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments.
520        let existing_fragment_ids = info
521            .into_iter()
522            .flat_map(|info| info.upstream_fragment_downstreams.keys())
523            .chain(replace_job.into_iter().flat_map(|replace_job| {
524                replace_job
525                    .upstream_fragment_downstreams
526                    .keys()
527                    .filter(|fragment_id| {
528                        info.map(|info| {
529                            !info
530                                .stream_job_fragments
531                                .fragments
532                                .contains_key(fragment_id)
533                        })
534                        .unwrap_or(true)
535                    })
536                    .chain(replace_job.replace_upstream.keys())
537            }))
538            .cloned();
539        let new_fragment_infos = info
540            .into_iter()
541            .flat_map(|info| info.stream_job_fragments.new_fragment_info())
542            .chain(replace_job.into_iter().flat_map(|replace_job| {
543                replace_job.new_fragments.new_fragment_info().chain(
544                    replace_job
545                        .auto_refresh_schema_sinks
546                        .as_ref()
547                        .into_iter()
548                        .flat_map(|sinks| {
549                            sinks.iter().map(|sink| {
550                                (sink.new_fragment.fragment_id, sink.new_fragment_info())
551                            })
552                        }),
553                )
554            }))
555            .collect_vec();
556        let mut builder = FragmentEdgeBuilder::new(
557            existing_fragment_ids
558                .map(|fragment_id| self.fragment(fragment_id))
559                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
560            control_stream_manager,
561        );
562        if let Some(info) = info {
563            builder.add_relations(&info.upstream_fragment_downstreams);
564            builder.add_relations(&info.stream_job_fragments.downstreams);
565        }
566        if let Some(replace_job) = replace_job {
567            builder.add_relations(&replace_job.upstream_fragment_downstreams);
568            builder.add_relations(&replace_job.new_fragments.downstreams);
569        }
570        if let Some(replace_job) = replace_job {
571            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
572                for (original_upstream_fragment_id, new_upstream_fragment_id) in
573                    fragment_replacement
574                {
575                    builder.replace_upstream(
576                        *fragment_id,
577                        *original_upstream_fragment_id,
578                        *new_upstream_fragment_id,
579                    );
580                }
581            }
582        }
583        Some(builder.build())
584    }
585}
586
587impl InflightSubscriptionInfo {
588    pub fn pre_apply(&mut self, command: &Command) {
589        if let Command::CreateSubscription {
590            subscription_id,
591            upstream_mv_table_id,
592            retention_second,
593        } = command
594            && let Some(prev_retiontion) = self
595                .mv_depended_subscriptions
596                .entry(*upstream_mv_table_id)
597                .or_default()
598                .insert(*subscription_id, *retention_second)
599        {
600            warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retiontion, "add an existing subscription id");
601        }
602    }
603}
604
605impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
606    type Item = PbSubscriptionUpstreamInfo;
607
608    type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;
609
610    fn into_iter(self) -> Self::IntoIter {
611        self.mv_depended_subscriptions
612            .iter()
613            .flat_map(|(table_id, subscriptions)| {
614                subscriptions
615                    .keys()
616                    .map(|subscriber_id| PbSubscriptionUpstreamInfo {
617                        subscriber_id: *subscriber_id,
618                        upstream_mv_table_id: table_id.table_id,
619                    })
620            })
621    }
622}
623
624impl InflightDatabaseInfo {
625    /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
626    /// remove that from the snapshot correspondingly.
627    pub(crate) fn post_apply(
628        &mut self,
629        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
630    ) {
631        let inner = self.shared_actor_infos.clone();
632        let mut shared_actor_writer = inner.start_writer(self.database_id);
633        {
634            for (fragment_id, changes) in fragment_changes {
635                match changes {
636                    CommandFragmentChanges::NewFragment { .. } => {}
637                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
638                        let job_id = self.fragment_location[fragment_id];
639                        let info = self
640                            .jobs
641                            .get_mut(&job_id)
642                            .expect("should exist")
643                            .fragment_infos
644                            .get_mut(fragment_id)
645                            .expect("should exist");
646                        for actor_id in to_remove {
647                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
648                        }
649                        shared_actor_writer.upsert([&*info]);
650                    }
651                    CommandFragmentChanges::RemoveFragment => {
652                        let job_id = self
653                            .fragment_location
654                            .remove(fragment_id)
655                            .expect("should exist");
656                        let job = self.jobs.get_mut(&job_id).expect("should exist");
657                        let fragment = job
658                            .fragment_infos
659                            .remove(fragment_id)
660                            .expect("should exist");
661                        shared_actor_writer.remove(&fragment);
662                        if job.fragment_infos.is_empty() {
663                            self.jobs.remove(&job_id).expect("should exist");
664                        }
665                    }
666                    CommandFragmentChanges::ReplaceNodeUpstream(_) => {}
667                }
668            }
669        }
670        shared_actor_writer.finish();
671    }
672}
673
674impl InflightSubscriptionInfo {
675    pub fn post_apply(&mut self, command: &Command) {
676        if let Command::DropSubscription {
677            subscription_id,
678            upstream_mv_table_id,
679        } = command
680        {
681            let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
682                Some(subscriptions) => {
683                    let removed = subscriptions.remove(subscription_id).is_some();
684                    if removed && subscriptions.is_empty() {
685                        self.mv_depended_subscriptions.remove(upstream_mv_table_id);
686                    }
687                    removed
688                }
689                None => false,
690            };
691            if !removed {
692                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
693            }
694        }
695    }
696}
697
698impl InflightFragmentInfo {
699    /// Returns actor list to collect in the target worker node.
700    pub(crate) fn actor_ids_to_collect(
701        infos: impl IntoIterator<Item = &Self>,
702    ) -> HashMap<WorkerId, HashSet<ActorId>> {
703        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
704        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
705            assert!(
706                ret.entry(actor.worker_id)
707                    .or_default()
708                    .insert(*actor_id as _)
709            )
710        }
711        ret
712    }
713
714    pub fn existing_table_ids<'a>(
715        infos: impl IntoIterator<Item = &'a Self> + 'a,
716    ) -> impl Iterator<Item = TableId> + 'a {
717        infos
718            .into_iter()
719            .flat_map(|info| info.state_table_ids.iter().cloned())
720    }
721
722    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
723        infos.into_iter().any(|fragment| {
724            fragment
725                .actors
726                .values()
727                .any(|actor| (actor.worker_id) == worker_id)
728        })
729    }
730
731    pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
732        infos
733            .into_iter()
734            .flat_map(|info| info.actors.values())
735            .map(|actor| actor.worker_id)
736            .collect()
737    }
738}
739
740impl InflightDatabaseInfo {
741    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
742        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
743    }
744
745    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
746        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
747    }
748}