use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_meta_model::WorkerId;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
use risingwave_pb::meta::subscribe_response::Operation;
use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use tracing::warn;

use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::rebuild_fragment_mapping;
use crate::manager::NotificationManagerRef;
use crate::model::{ActorId, FragmentId, SubscriptionId};

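/// A trimmed-down copy of [`InflightFragmentInfo`] (fragment id, distribution type and
/// actors) as stored in [`SharedActorInfos`].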
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    pub actors: HashMap<ActorId, InflightActorInfo>,
}

impl From<&InflightFragmentInfo> for SharedFragmentInfo {
    fn from(info: &InflightFragmentInfo) -> Self {
        Self {
            fragment_id: info.fragment_id,
            distribution_type: info.distribution_type,
            actors: info.actors.clone(),
        }
    }
}

type SharedActorInfosInner = HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>;

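/// Fragment info of all databases, shared behind an `Arc<RwLock<_>>`. Every mutation
/// goes through [`SharedActorInfoWriter`], which mirrors the change to subscribers as
/// fragment worker-slot mapping notifications (`Add`/`Update`/`Delete`) via the
/// notification manager.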
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub(crate) struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}

impl SharedActorInfos {
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

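    /// Removes all fragment info of the database and notifies a `Delete` of the
    /// corresponding fragment mappings.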
    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

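    /// Rebuilds the entry of a database from the fragments known after recovery:
    /// fragments present on both sides are updated, fragments only in the shared map
    /// are deleted, and fragments only in the recovered set are added, each with the
    /// matching mapping notification.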
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = &InflightFragmentInfo>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|fragment| (fragment.fragment_id, fragment))
            .collect();
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.entry(database_id).or_default();
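        // Keep fragments that still exist after recovery (refreshing their info) and
        // drain the ones that are gone so that their mappings can be deleted below.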
        for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_info = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        for (fragment_id, fragment) in remaining_fragments {
            let info = fragment.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = &InflightFragmentInfo>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

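    /// Starts a buffered write over the shared info. The returned writer holds the
    /// write lock; call [`SharedActorInfoWriter::finish`] to send the buffered
    /// notifications.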
    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}

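/// A write handle over [`SharedActorInfos`] scoped to a single database. It buffers the
/// added/updated/deleted fragment mappings while the write lock is held and sends them
/// as notifications in [`Self::finish`].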
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}

impl SharedActorInfoWriter<'_> {
    pub(super) fn upsert(&mut self, infos: impl IntoIterator<Item = &InflightFragmentInfo>) {
        let database = self.write_guard.entry(self.database_id).or_default();
        for info in infos {
            match database.entry(info.fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info = info.into();
                    self.updated_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
                Entry::Vacant(entry) => {
                    let info = info.into();
                    self.added_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
            }
        }
    }

    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
        if let Some(database) = self.write_guard.get_mut(&self.database_id)
            && let Some(fragment) = database.remove(&info.fragment_id)
        {
            self.deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
    }

    pub(super) fn finish(self) {
        if let Some(mapping) = self.added_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Add, mapping);
        }
        if let Some(mapping) = self.updated_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Update, mapping);
        }
        if let Some(mapping) = self.deleted_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }
}

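/// The previous/current epoch pair and the kind of a barrier.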
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}

impl BarrierInfo {
    pub(super) fn prev_epoch(&self) -> u64 {
        self.prev_epoch.value().0
    }
}

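/// Per-fragment change derived from a [`Command`]. The additive part is applied in
/// [`InflightDatabaseInfo::pre_apply`] and the subtractive part in
/// [`InflightDatabaseInfo::post_apply`]. For `ReplaceNodeUpstream`, the map is keyed by
/// the original upstream fragment id, with the new upstream fragment id as the value.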
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    NewFragment {
        job_id: TableId,
        info: InflightFragmentInfo,
        is_existing: bool,
    },
    ReplaceNodeUpstream(HashMap<FragmentId, FragmentId>),
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
    },
    RemoveFragment,
}

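/// For each upstream materialized view, the subscriptions that depend on it, mapped to
/// their retention in seconds.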
#[derive(Default, Clone, Debug)]
pub struct InflightSubscriptionInfo {
    pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
}

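/// The in-flight fragment info of a single streaming job.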
#[derive(Clone, Debug)]
pub struct InflightStreamingJobInfo {
    pub job_id: TableId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
}

impl InflightStreamingJobInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.fragment_infos.values()
    }

    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}

impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}

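/// The in-flight streaming jobs of a database, keyed by job id, together with a reverse
/// index from fragment id to the owning job. All mutations are mirrored into
/// [`SharedActorInfos`].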
#[derive(Clone, Debug)]
pub struct InflightDatabaseInfo {
    database_id: DatabaseId,
    jobs: HashMap<TableId, InflightStreamingJobInfo>,
    fragment_location: HashMap<FragmentId, TableId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}

impl InflightDatabaseInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.jobs.values().flat_map(|job| job.fragment_infos())
    }

    pub fn contains_job(&self, job_id: TableId) -> bool {
        self.jobs.contains_key(&job_id)
    }

    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs
            .get(&job_id)
            .expect("should exist")
            .fragment_infos
            .get(&fragment_id)
            .expect("should exist")
    }

    fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .fragment_infos
            .get_mut(&fragment_id)
            .expect("should exist")
    }

    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            jobs: Default::default(),
            fragment_location: Default::default(),
            shared_actor_infos,
        }
    }

    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }

    pub fn recover(
        database_id: DatabaseId,
        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
        shared_actor_infos: SharedActorInfos,
    ) -> Self {
        let mut info = Self::empty_inner(database_id, shared_actor_infos);
        for job in jobs {
            info.add_existing(job);
        }
        info
    }

    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }

    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
        self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
            (
                fragment_id,
                CommandFragmentChanges::NewFragment {
                    job_id: job.job_id,
                    info,
                    is_existing: true,
                },
            )
        }))
    }

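    /// Applies the additive part of a command's fragment changes: newly created
    /// fragments, newly added or rescheduled actors, and upstream replacement of merge
    /// nodes.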
    pub(crate) fn pre_apply(
        &mut self,
        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
    ) {
        self.apply_add(
            fragment_changes
                .iter()
                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
        )
    }

    fn apply_add(
        &mut self,
        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
    ) {
        {
            let shared_infos = self.shared_actor_infos.clone();
            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
            for (fragment_id, change) in fragment_changes {
                match change {
                    CommandFragmentChanges::NewFragment {
                        job_id,
                        info,
                        is_existing,
                    } => {
                        let fragment_infos =
                            self.jobs
                                .entry(job_id)
                                .or_insert_with(|| InflightStreamingJobInfo {
                                    job_id,
                                    fragment_infos: Default::default(),
                                });
                        if !is_existing {
                            shared_actor_writer.upsert([&info]);
                        }
                        fragment_infos
                            .fragment_infos
                            .try_insert(fragment_id, info)
                            .expect("non duplicate");
                        self.fragment_location
                            .try_insert(fragment_id, job_id)
                            .expect("non duplicate");
                    }
                    CommandFragmentChanges::Reschedule {
                        new_actors,
                        actor_update_vnode_bitmap,
                        ..
                    } => {
                        let info = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
                            actors
                                .get_mut(&actor_id)
                                .expect("should exist")
                                .vnode_bitmap = Some(new_vnodes);
                        }
                        for (actor_id, actor) in new_actors {
                            actors
                                .try_insert(actor_id as _, actor)
                                .expect("non-duplicate");
                        }
                    }
                    CommandFragmentChanges::RemoveFragment => {}
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
                        let mut remaining_fragment_ids: HashSet<_> =
                            replace_map.keys().cloned().collect();
                        let info = self.fragment_mut(fragment_id);
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::Merge(m) = node
                                && let Some(new_upstream_fragment_id) =
                                    replace_map.get(&m.upstream_fragment_id)
                            {
                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
                                    if cfg!(debug_assertions) {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            m, replace_map
                                        );
                                    } else {
                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
                                    }
                                }
                                m.upstream_fragment_id = *new_upstream_fragment_id;
                            }
                        });
                        if cfg!(debug_assertions) {
                            assert!(
                                remaining_fragment_ids.is_empty(),
                                "non-existing fragment to replace: {:?} {:?} {:?}",
                                remaining_fragment_ids,
                                info.nodes,
                                replace_map
                            );
                        } else if !remaining_fragment_ids.is_empty() {
                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
                        }
                    }
                }
            }
            shared_actor_writer.finish();
        }
    }

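    /// Builds the fragment edges required by the given command, or `None` if the
    /// command does not create new fragments. Only `CreateStreamingJob` and
    /// `ReplaceStreamJob` contribute edges, between their new fragments and the
    /// existing upstream/downstream fragments they connect to.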
    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        let (info, replace_job) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::MergeSnapshotBackfillStreamingJobs(_)
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit(_)
                | Command::Throttle(_)
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::StartFragmentBackfill { .. }
                | Command::Refresh { .. }
                | Command::LoadFinish { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    let replace_job = match job_type {
                        CreateStreamingJobType::Normal
                        | CreateStreamingJobType::SnapshotBackfill(_) => None,
                        CreateStreamingJobType::SinkIntoTable(replace_job) => Some(replace_job),
                    };
                    (Some(info), replace_job)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job)),
            },
        };
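        // Upstream fragments that already exist in this database and that the new or
        // replaced job connects to. For a replace job, fragments that are part of the
        // job being created in the same command are excluded, since they are covered by
        // `new_fragment_infos` below.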
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|info| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|info| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .cloned();
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|info| info.stream_job_fragments.new_fragment_info())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job.new_fragments.new_fragment_info().chain(
                    replace_job
                        .auto_refresh_schema_sinks
                        .as_ref()
                        .into_iter()
                        .flat_map(|sinks| {
                            sinks.iter().map(|sink| {
                                (sink.new_fragment.fragment_id, sink.new_fragment_info())
                            })
                        }),
                )
            }))
            .collect_vec();
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| self.fragment(fragment_id))
                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
            control_stream_manager,
        );
        if let Some(info) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }
}

impl InflightSubscriptionInfo {
    pub fn pre_apply(&mut self, command: &Command) {
        if let Command::CreateSubscription {
            subscription_id,
            upstream_mv_table_id,
            retention_second,
        } = command
            && let Some(prev_retention) = self
                .mv_depended_subscriptions
                .entry(*upstream_mv_table_id)
                .or_default()
                .insert(*subscription_id, *retention_second)
        {
            warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retention, "add an existing subscription id");
        }
    }
}

impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
    type Item = PbSubscriptionUpstreamInfo;

    type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.mv_depended_subscriptions
            .iter()
            .flat_map(|(table_id, subscriptions)| {
                subscriptions
                    .keys()
                    .map(|subscriber_id| PbSubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: table_id.table_id,
                    })
            })
    }
}

impl InflightDatabaseInfo {
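    /// Applies the subtractive part of a command's fragment changes: actors removed by
    /// a reschedule and fragments removed by the command, dropping jobs that become
    /// empty.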
    pub(crate) fn post_apply(
        &mut self,
        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
    ) {
        let inner = self.shared_actor_infos.clone();
        let mut shared_actor_writer = inner.start_writer(self.database_id);
        {
            for (fragment_id, changes) in fragment_changes {
                match changes {
                    CommandFragmentChanges::NewFragment { .. } => {}
                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
                        let job_id = self.fragment_location[fragment_id];
                        let info = self
                            .jobs
                            .get_mut(&job_id)
                            .expect("should exist")
                            .fragment_infos
                            .get_mut(fragment_id)
                            .expect("should exist");
                        for actor_id in to_remove {
                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
                        }
                        shared_actor_writer.upsert([&*info]);
                    }
                    CommandFragmentChanges::RemoveFragment => {
                        let job_id = self
                            .fragment_location
                            .remove(fragment_id)
                            .expect("should exist");
                        let job = self.jobs.get_mut(&job_id).expect("should exist");
                        let fragment = job
                            .fragment_infos
                            .remove(fragment_id)
                            .expect("should exist");
                        shared_actor_writer.remove(&fragment);
                        if job.fragment_infos.is_empty() {
                            self.jobs.remove(&job_id).expect("should exist");
                        }
                    }
                    CommandFragmentChanges::ReplaceNodeUpstream(_) => {}
                }
            }
        }
        shared_actor_writer.finish();
    }
}

impl InflightSubscriptionInfo {
    pub fn post_apply(&mut self, command: &Command) {
        if let Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id,
        } = command
        {
            let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
                Some(subscriptions) => {
                    let removed = subscriptions.remove(subscription_id).is_some();
                    if removed && subscriptions.is_empty() {
                        self.mv_depended_subscriptions.remove(upstream_mv_table_id);
                    }
                    removed
                }
                None => false,
            };
            if !removed {
                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
            }
        }
    }
}

impl InflightFragmentInfo {
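    /// Groups the actors of the given fragments by worker: for each worker, the set of
    /// actor ids from which the barrier needs to be collected.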
    pub(crate) fn actor_ids_to_collect(
        infos: impl IntoIterator<Item = &Self>,
    ) -> HashMap<WorkerId, HashSet<ActorId>> {
        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
            assert!(
                ret.entry(actor.worker_id)
                    .or_default()
                    .insert(*actor_id as _)
            )
        }
        ret
    }

    pub fn existing_table_ids<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = TableId> + 'a {
        infos
            .into_iter()
            .flat_map(|info| info.state_table_ids.iter().cloned())
    }

    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
        infos.into_iter().any(|fragment| {
            fragment
                .actors
                .values()
                .any(|actor| actor.worker_id == worker_id)
        })
    }

    pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
        infos
            .into_iter()
            .flat_map(|info| info.actors.values())
            .map(|actor| actor.worker_id)
            .collect()
    }
}

impl InflightDatabaseInfo {
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}