1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_meta_model::WorkerId;
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_pb::common::ThrottleType;
32use risingwave_pb::ddl_service::PbBackfillType;
33use risingwave_pb::hummock::HummockVersionStats;
34use risingwave_pb::id::SubscriberId;
35use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
36use risingwave_pb::meta::subscribe_response::Operation;
37use risingwave_pb::source::PbCdcTableSnapshotSplits;
38use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
39use risingwave_pb::stream_plan::stream_node::NodeBody;
40use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
41use risingwave_pb::stream_service::BarrierCompleteResponse;
42use tracing::{info, warn};
43
44use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
45use crate::barrier::command::{
46 CreateStreamingJobCommandInfo, PostCollectCommand, ReplaceStreamJobPlan,
47};
48use crate::barrier::edge_builder::{
49 EdgeBuilderFragmentInfo, FragmentEdgeBuildResult, FragmentEdgeBuilder,
50};
51use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
52use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
53use crate::barrier::{
54 BackfillProgress, BarrierKind, CreateStreamingJobType, FragmentBackfillProgress, TracedEpoch,
55};
56use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
57use crate::controller::utils::rebuild_fragment_mapping;
58use crate::manager::NotificationManagerRef;
59use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamActor, StreamJobFragments};
60use crate::stream::UpstreamSinkInfo;
61use crate::{MetaError, MetaResult};
62
63#[derive(Debug, Clone)]
64pub struct SharedActorInfo {
65 pub worker_id: WorkerId,
66 pub vnode_bitmap: Option<Bitmap>,
67 pub splits: Vec<SplitImpl>,
68}
69
70impl From<&InflightActorInfo> for SharedActorInfo {
71 fn from(value: &InflightActorInfo) -> Self {
72 Self {
73 worker_id: value.worker_id,
74 vnode_bitmap: value.vnode_bitmap.clone(),
75 splits: value.splits.clone(),
76 }
77 }
78}
79
80#[derive(Debug, Clone)]
81pub struct SharedFragmentInfo {
82 pub fragment_id: FragmentId,
83 pub job_id: JobId,
84 pub distribution_type: DistributionType,
85 pub actors: HashMap<ActorId, SharedActorInfo>,
86 pub vnode_count: usize,
87 pub fragment_type_mask: FragmentTypeMask,
88}
89
90impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
91 fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
92 let (info, job_id) = pair;
93
94 let InflightFragmentInfo {
95 fragment_id,
96 distribution_type,
97 fragment_type_mask,
98 actors,
99 vnode_count,
100 ..
101 } = info;
102
103 Self {
104 fragment_id: *fragment_id,
105 job_id,
106 distribution_type: *distribution_type,
107 fragment_type_mask: *fragment_type_mask,
108 actors: actors
109 .iter()
110 .map(|(actor_id, actor)| (*actor_id, actor.into()))
111 .collect(),
112 vnode_count: *vnode_count,
113 }
114 }
115}
116
117#[derive(Default, Debug)]
118pub struct SharedActorInfosInner {
119 info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
120}
121
122impl SharedActorInfosInner {
123 pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
124 self.info
125 .values()
126 .find_map(|database| database.get(&fragment_id))
127 }
128
129 pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
130 self.info.values().flatten()
131 }
132}
133
134#[derive(Clone, educe::Educe)]
135#[educe(Debug)]
136pub struct SharedActorInfos {
137 inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
138 #[educe(Debug(ignore))]
139 notification_manager: NotificationManagerRef,
140}
141
142impl SharedActorInfos {
143 pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
144 self.inner.read()
145 }
146
147 pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
148 let core = self.inner.read();
149 core.iter_over_fragments()
150 .flat_map(|(_, fragment)| {
151 fragment
152 .actors
153 .iter()
154 .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
155 })
156 .collect()
157 }
158
159 pub fn migrate_splits_for_source_actors(
165 &self,
166 fragment_id: FragmentId,
167 prev_actor_ids: &[ActorId],
168 curr_actor_ids: &[ActorId],
169 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
170 let guard = self.read_guard();
171
172 let prev_splits = prev_actor_ids
173 .iter()
174 .flat_map(|actor_id| {
175 guard
177 .get_fragment(fragment_id)
178 .and_then(|info| info.actors.get(actor_id))
179 .map(|actor| actor.splits.clone())
180 .unwrap_or_default()
181 })
182 .map(|split| (split.id(), split))
183 .collect();
184
185 let empty_actor_splits = curr_actor_ids
186 .iter()
187 .map(|actor_id| (*actor_id, vec![]))
188 .collect();
189
190 let diff = crate::stream::source_manager::reassign_splits(
191 fragment_id,
192 empty_actor_splits,
193 &prev_splits,
194 std::default::Default::default(),
196 )
197 .unwrap_or_default();
198
199 Ok(diff)
200 }
201}
202
203impl SharedActorInfos {
204 pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
205 Self {
206 inner: Arc::new(Default::default()),
207 notification_manager,
208 }
209 }
210
211 pub(super) fn remove_database(&self, database_id: DatabaseId) {
212 if let Some(database) = self.inner.write().info.remove(&database_id) {
213 let mapping = database
214 .into_values()
215 .map(|fragment| rebuild_fragment_mapping(&fragment))
216 .collect_vec();
217 if !mapping.is_empty() {
218 self.notification_manager
219 .notify_streaming_fragment_mapping(Operation::Delete, mapping);
220 }
221 }
222 }
223
224 pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
225 let database_ids: HashSet<_> = database_ids.into_iter().collect();
226
227 let mut mapping = Vec::new();
228 for fragment in self
229 .inner
230 .write()
231 .info
232 .extract_if(|database_id, _| !database_ids.contains(database_id))
233 .flat_map(|(_, fragments)| fragments.into_values())
234 {
235 mapping.push(rebuild_fragment_mapping(&fragment));
236 }
237 if !mapping.is_empty() {
238 self.notification_manager
239 .notify_streaming_fragment_mapping(Operation::Delete, mapping);
240 }
241 }
242
243 pub(super) fn recover_database(
244 &self,
245 database_id: DatabaseId,
246 fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
247 ) {
248 let mut remaining_fragments: HashMap<_, _> = fragments
249 .map(|info @ (fragment, _)| (fragment.fragment_id, info))
250 .collect();
251 let mut writer = self.start_writer(database_id);
253 let database = writer.write_guard.info.entry(database_id).or_default();
254 for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
255 if let Some(info) = remaining_fragments.remove(fragment_id) {
256 let info = info.into();
257 writer
258 .updated_fragment_mapping
259 .get_or_insert_default()
260 .push(rebuild_fragment_mapping(&info));
261 *fragment_infos = info;
262 false
263 } else {
264 true
265 }
266 }) {
267 writer
268 .deleted_fragment_mapping
269 .get_or_insert_default()
270 .push(rebuild_fragment_mapping(&fragment));
271 }
272 for (fragment_id, info) in remaining_fragments {
273 let info = info.into();
274 writer
275 .added_fragment_mapping
276 .get_or_insert_default()
277 .push(rebuild_fragment_mapping(&info));
278 database.insert(fragment_id, info);
279 }
280 writer.finish();
281 }
282
283 pub(super) fn upsert(
284 &self,
285 database_id: DatabaseId,
286 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
287 ) {
288 let mut writer = self.start_writer(database_id);
289 writer.upsert(infos);
290 writer.finish();
291 }
292
293 pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
294 SharedActorInfoWriter {
295 database_id,
296 write_guard: self.inner.write(),
297 notification_manager: &self.notification_manager,
298 added_fragment_mapping: None,
299 updated_fragment_mapping: None,
300 deleted_fragment_mapping: None,
301 }
302 }
303}
304
305pub(super) struct SharedActorInfoWriter<'a> {
306 database_id: DatabaseId,
307 write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
308 notification_manager: &'a NotificationManagerRef,
309 added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
310 updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
311 deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
312}
313
314impl SharedActorInfoWriter<'_> {
315 pub(super) fn upsert(
316 &mut self,
317 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
318 ) {
319 let database = self.write_guard.info.entry(self.database_id).or_default();
320 for info @ (fragment, _) in infos {
321 match database.entry(fragment.fragment_id) {
322 Entry::Occupied(mut entry) => {
323 let info = info.into();
324 self.updated_fragment_mapping
325 .get_or_insert_default()
326 .push(rebuild_fragment_mapping(&info));
327 entry.insert(info);
328 }
329 Entry::Vacant(entry) => {
330 let info = info.into();
331 self.added_fragment_mapping
332 .get_or_insert_default()
333 .push(rebuild_fragment_mapping(&info));
334 entry.insert(info);
335 }
336 }
337 }
338 }
339
340 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
341 if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
342 && let Some(fragment) = database.remove(&info.fragment_id)
343 {
344 self.deleted_fragment_mapping
345 .get_or_insert_default()
346 .push(rebuild_fragment_mapping(&fragment));
347 }
348 }
349
350 pub(super) fn finish(self) {
351 if let Some(mapping) = self.added_fragment_mapping {
352 self.notification_manager
353 .notify_streaming_fragment_mapping(Operation::Add, mapping);
354 }
355 if let Some(mapping) = self.updated_fragment_mapping {
356 self.notification_manager
357 .notify_streaming_fragment_mapping(Operation::Update, mapping);
358 }
359 if let Some(mapping) = self.deleted_fragment_mapping {
360 self.notification_manager
361 .notify_streaming_fragment_mapping(Operation::Delete, mapping);
362 }
363 }
364}
365
366#[derive(Debug, Clone)]
367pub(super) struct BarrierInfo {
368 pub prev_epoch: TracedEpoch,
369 pub curr_epoch: TracedEpoch,
370 pub kind: BarrierKind,
371}
372
373impl BarrierInfo {
374 pub(super) fn prev_epoch(&self) -> u64 {
375 self.prev_epoch.value().0
376 }
377
378 pub(super) fn curr_epoch(&self) -> u64 {
379 self.curr_epoch.value().0
380 }
381
382 pub(super) fn epoch(&self) -> EpochPair {
383 EpochPair {
384 curr: self.curr_epoch(),
385 prev: self.prev_epoch(),
386 }
387 }
388}
389
390#[derive(Clone, Debug)]
391pub enum SubscriberType {
392 Subscription(u64),
393 SnapshotBackfill,
394}
395
396#[derive(Debug)]
397pub(super) enum CreateStreamingJobStatus {
398 Init,
399 Creating { tracker: CreateMviewProgressTracker },
400 Created,
401}
402
403#[derive(Debug)]
404pub(super) struct InflightStreamingJobInfo {
405 pub job_id: JobId,
406 pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
407 pub subscribers: HashMap<SubscriberId, SubscriberType>,
408 pub status: CreateStreamingJobStatus,
409 pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
410}
411
412impl InflightStreamingJobInfo {
413 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
414 self.fragment_infos.values()
415 }
416
417 pub fn snapshot_backfill_actor_ids(
418 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
419 ) -> impl Iterator<Item = ActorId> + '_ {
420 fragment_infos
421 .values()
422 .filter(|fragment| {
423 fragment
424 .fragment_type_mask
425 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
426 })
427 .flat_map(|fragment| fragment.actors.keys().copied())
428 }
429
430 pub fn tracking_progress_actor_ids(
431 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
432 ) -> Vec<(ActorId, BackfillUpstreamType)> {
433 StreamJobFragments::tracking_progress_actor_ids_impl(
434 fragment_infos
435 .values()
436 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
437 )
438 }
439}
440
441impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
442 type Item = &'a InflightFragmentInfo;
443
444 type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
445
446 fn into_iter(self) -> Self::IntoIter {
447 self.fragment_infos()
448 }
449}
450
451#[derive(Debug)]
452pub struct InflightDatabaseInfo {
453 pub(super) database_id: DatabaseId,
454 jobs: HashMap<JobId, InflightStreamingJobInfo>,
455 fragment_location: HashMap<FragmentId, JobId>,
456 pub(super) shared_actor_infos: SharedActorInfos,
457}
458
459impl InflightDatabaseInfo {
460 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
461 self.jobs.values().flat_map(|job| job.fragment_infos())
462 }
463
464 pub fn contains_job(&self, job_id: JobId) -> bool {
465 self.jobs.contains_key(&job_id)
466 }
467
468 pub(super) fn job_id_by_fragment(&self, fragment_id: FragmentId) -> Option<JobId> {
469 self.fragment_location.get(&fragment_id).copied()
470 }
471
472 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
473 let job_id = self.fragment_location[&fragment_id];
474 self.jobs
475 .get(&job_id)
476 .expect("should exist")
477 .fragment_infos
478 .get(&fragment_id)
479 .expect("should exist")
480 }
481
482 pub(super) fn backfill_fragment_ids_for_job(
483 &self,
484 job_id: JobId,
485 ) -> MetaResult<HashSet<FragmentId>> {
486 let job = self
487 .jobs
488 .get(&job_id)
489 .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
490 Ok(job
491 .fragment_infos
492 .iter()
493 .filter_map(|(fragment_id, fragment)| {
494 fragment
495 .fragment_type_mask
496 .contains_any([
497 FragmentTypeFlag::StreamScan,
498 FragmentTypeFlag::SourceScan,
499 FragmentTypeFlag::LocalityProvider,
500 ])
501 .then_some(*fragment_id)
502 })
503 .collect())
504 }
505
506 pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
507 let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
508 MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
509 })?;
510 let fragment = self
511 .jobs
512 .get(job_id)
513 .expect("should exist")
514 .fragment_infos
515 .get(&fragment_id)
516 .expect("should exist");
517 Ok(fragment.fragment_type_mask.contains_any([
518 FragmentTypeFlag::StreamScan,
519 FragmentTypeFlag::SourceScan,
520 FragmentTypeFlag::LocalityProvider,
521 ]))
522 }
523
524 pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
525 self.jobs
526 .iter()
527 .filter_map(|(job_id, job)| match &job.status {
528 CreateStreamingJobStatus::Init => None,
529 CreateStreamingJobStatus::Creating { tracker } => {
530 let progress = tracker.gen_backfill_progress();
531 Some((
532 *job_id,
533 BackfillProgress {
534 progress,
535 backfill_type: PbBackfillType::NormalBackfill,
536 },
537 ))
538 }
539 CreateStreamingJobStatus::Created => None,
540 })
541 }
542
543 pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
544 self.jobs.iter().filter_map(|(job_id, job)| {
545 job.cdc_table_backfill_tracker
546 .as_ref()
547 .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
548 })
549 }
550
551 pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
552 let mut result = Vec::new();
553 for job in self.jobs.values() {
554 let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
555 continue;
556 };
557 let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
558 result.extend(fragment_progress);
559 }
560 result
561 }
562
563 pub(super) fn may_assign_fragment_cdc_backfill_splits(
564 &mut self,
565 fragment_id: FragmentId,
566 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
567 let job_id = self.fragment_location[&fragment_id];
568 let job = self.jobs.get_mut(&job_id).expect("should exist");
569 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
570 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
571 if cdc_scan_fragment_id != fragment_id {
572 return Ok(None);
573 }
574 let actors = job.fragment_infos[&cdc_scan_fragment_id]
575 .actors
576 .keys()
577 .copied()
578 .collect();
579 tracker.reassign_splits(actors).map(Some)
580 } else {
581 Ok(None)
582 }
583 }
584
585 pub(super) fn assign_cdc_backfill_splits(
586 &mut self,
587 job_id: JobId,
588 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
589 let job = self.jobs.get_mut(&job_id).expect("should exist");
590 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
591 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
592 let actors = job.fragment_infos[&cdc_scan_fragment_id]
593 .actors
594 .keys()
595 .copied()
596 .collect();
597 tracker.reassign_splits(actors).map(Some)
598 } else {
599 Ok(None)
600 }
601 }
602
603 pub(super) fn apply_collected_command(
604 &mut self,
605 command: &PostCollectCommand,
606 resps: &HashMap<WorkerId, BarrierCompleteResponse>,
607 version_stats: &HummockVersionStats,
608 ) {
609 if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
610 match job_type {
611 CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
612 let job_id = info.streaming_job.id();
613 if let Some(job_info) = self.jobs.get_mut(&job_id) {
614 let CreateStreamingJobStatus::Init = replace(
615 &mut job_info.status,
616 CreateStreamingJobStatus::Creating {
617 tracker: CreateMviewProgressTracker::new(
618 info,
619 version_stats,
620 &job_info.fragment_infos,
621 ),
622 },
623 ) else {
624 unreachable!("should be init before collect the first barrier")
625 };
626 } else {
627 info!(%job_id, "newly create job get cancelled before first barrier is collected")
628 }
629 }
630 CreateStreamingJobType::SnapshotBackfill(_)
631 | CreateStreamingJobType::BatchRefresh(_) => {
632 }
634 }
635 }
636 if let PostCollectCommand::Reschedule { reschedules, .. } = command {
637 debug_assert!(
639 reschedules
640 .values()
641 .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
642 "Reschedule should not carry vnode bitmap updates when actors are rebuilt"
643 );
644
645 let related_job_ids = reschedules
647 .keys()
648 .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
649 .cloned()
650 .collect::<HashSet<_>>();
651 for job_id in related_job_ids {
652 if let Some(job) = self.jobs.get_mut(&job_id)
653 && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
654 {
655 tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
656 }
657 }
658 }
659 for progress in resps.values().flat_map(|resp| &resp.create_mview_progress) {
660 let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
661 warn!(
662 "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
663 );
664 continue;
665 };
666 let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
667 CreateStreamingJobStatus::Init => {
668 continue;
669 }
670 CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
671 CreateStreamingJobStatus::Created => {
672 if !progress.done {
673 warn!("update the progress of an created streaming job: {progress:?}");
674 }
675 continue;
676 }
677 };
678 tracker.apply_progress(progress, version_stats);
679 }
680 for progress in resps
681 .values()
682 .flat_map(|resp| &resp.cdc_table_backfill_progress)
683 {
684 let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
685 warn!(
686 "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
687 );
688 continue;
689 };
690 let Some(tracker) = &mut self
691 .jobs
692 .get_mut(job_id)
693 .expect("should exist")
694 .cdc_table_backfill_tracker
695 else {
696 warn!("update the cdc progress of an created streaming job: {progress:?}");
697 continue;
698 };
699 tracker.update_split_progress(progress);
700 }
701 for cdc_offset_updated in resps
703 .values()
704 .flat_map(|resp| &resp.cdc_source_offset_updated)
705 {
706 let source_id = cdc_offset_updated.source_id;
707 let job_id = source_id.as_share_source_job_id();
708 if let Some(job) = self.jobs.get_mut(&job_id) {
709 if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
710 tracker.mark_cdc_source_finished();
711 }
712 } else {
713 warn!(
714 "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
715 cdc_offset_updated.source_id, job_id
716 );
717 }
718 }
719 }
720
721 fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
722 self.jobs.values().filter_map(|job| match &job.status {
723 CreateStreamingJobStatus::Init => None,
724 CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
725 CreateStreamingJobStatus::Created => None,
726 })
727 }
728
729 fn iter_mut_creating_job_tracker(
730 &mut self,
731 ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
732 self.jobs
733 .values_mut()
734 .filter_map(|job| match &mut job.status {
735 CreateStreamingJobStatus::Init => None,
736 CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
737 CreateStreamingJobStatus::Created => None,
738 })
739 }
740
741 pub(super) fn has_pending_finished_jobs(&self) -> bool {
742 self.iter_creating_job_tracker()
743 .any(|tracker| tracker.is_finished())
744 }
745
746 pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
747 self.iter_mut_creating_job_tracker()
748 .flat_map(|tracker| tracker.take_pending_backfill_nodes())
749 .collect()
750 }
751
752 pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
753 let mut finished_jobs = vec![];
754 let mut table_ids_to_truncate = vec![];
755 let mut finished_cdc_table_backfill = vec![];
756 for (job_id, job) in &mut self.jobs {
757 if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
758 let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
759 table_ids_to_truncate.extend(truncate_table_ids);
760 if is_finished {
761 let CreateStreamingJobStatus::Creating { tracker, .. } =
762 replace(&mut job.status, CreateStreamingJobStatus::Created)
763 else {
764 unreachable!()
765 };
766 finished_jobs.push(tracker.into_tracking_job());
767 }
768 }
769 if let Some(tracker) = &mut job.cdc_table_backfill_tracker
770 && tracker.take_pre_completed()
771 {
772 finished_cdc_table_backfill.push(*job_id);
773 }
774 }
775 StagingCommitInfo {
776 finished_jobs,
777 table_ids_to_truncate,
778 finished_cdc_table_backfill,
779 }
780 }
781
782 pub fn fragment_subscribers(
783 &self,
784 fragment_id: FragmentId,
785 ) -> impl Iterator<Item = SubscriberId> + '_ {
786 let job_id = self.fragment_location[&fragment_id];
787 self.jobs[&job_id].subscribers.keys().copied()
788 }
789
790 pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
791 self.jobs[&job_id].subscribers.keys().copied()
792 }
793
794 pub fn max_subscription_retention(&self) -> impl Iterator<Item = (TableId, u64)> + '_ {
795 self.jobs.iter().filter_map(|(job_id, info)| {
796 info.subscribers
797 .values()
798 .filter_map(|subscriber| match subscriber {
799 SubscriberType::Subscription(retention) => Some(*retention),
800 SubscriberType::SnapshotBackfill => None,
801 })
802 .max()
803 .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
804 })
805 }
806
807 pub fn register_subscriber(
808 &mut self,
809 job_id: JobId,
810 subscriber_id: SubscriberId,
811 subscriber: SubscriberType,
812 ) {
813 self.jobs
814 .get_mut(&job_id)
815 .expect("should exist")
816 .subscribers
817 .try_insert(subscriber_id, subscriber)
818 .expect("non duplicate");
819 }
820
821 pub fn unregister_subscriber(
822 &mut self,
823 job_id: JobId,
824 subscriber_id: SubscriberId,
825 ) -> Option<SubscriberType> {
826 self.jobs
827 .get_mut(&job_id)
828 .expect("should exist")
829 .subscribers
830 .remove(&subscriber_id)
831 }
832
833 pub fn update_subscription_retention(
834 &mut self,
835 job_id: JobId,
836 subscriber_id: SubscriberId,
837 retention_second: u64,
838 ) {
839 let job = self.jobs.get_mut(&job_id).expect("should exist");
840 match job.subscribers.get_mut(&subscriber_id) {
841 Some(SubscriberType::Subscription(current_retention)) => {
842 *current_retention = retention_second;
843 }
844 Some(SubscriberType::SnapshotBackfill) => {
845 warn!(
846 %job_id,
847 %subscriber_id,
848 "cannot update retention for snapshot backfill subscriber"
849 );
850 }
851 None => {
852 warn!(%job_id, %subscriber_id, "subscription subscriber not found");
853 }
854 }
855 }
856
857 fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
858 let job_id = self.fragment_location[&fragment_id];
859 let fragment = self
860 .jobs
861 .get_mut(&job_id)
862 .expect("should exist")
863 .fragment_infos
864 .get_mut(&fragment_id)
865 .expect("should exist");
866 (fragment, job_id)
867 }
868
869 fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
870 Self {
871 database_id,
872 jobs: Default::default(),
873 fragment_location: Default::default(),
874 shared_actor_infos,
875 }
876 }
877
878 pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
879 shared_actor_infos.remove_database(database_id);
881 Self::empty_inner(database_id, shared_actor_infos)
882 }
883
884 pub fn recover(
885 database_id: DatabaseId,
886 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
887 shared_actor_infos: SharedActorInfos,
888 ) -> Self {
889 let mut info = Self::empty_inner(database_id, shared_actor_infos);
890 for job in jobs {
891 info.add_existing(job);
892 }
893 info
894 }
895
896 pub fn is_empty(&self) -> bool {
897 self.jobs.is_empty()
898 }
899
900 pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
901 let InflightStreamingJobInfo {
902 job_id,
903 fragment_infos,
904 subscribers,
905 status,
906 cdc_table_backfill_tracker,
907 } = job;
908 self.jobs
909 .try_insert(
910 job_id,
911 InflightStreamingJobInfo {
912 job_id,
913 subscribers,
914 fragment_infos: Default::default(), status,
916 cdc_table_backfill_tracker,
917 },
918 )
919 .expect("non-duplicate");
920 self.pre_apply_new_fragments(
921 fragment_infos
922 .into_iter()
923 .map(|(fragment_id, info)| (fragment_id, job_id, info)),
924 );
925 }
926
927 pub(crate) fn pre_apply_new_job(
929 &mut self,
930 job_id: JobId,
931 cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
932 ) {
933 {
934 self.jobs
935 .try_insert(
936 job_id,
937 InflightStreamingJobInfo {
938 job_id,
939 fragment_infos: Default::default(),
940 subscribers: Default::default(), status: CreateStreamingJobStatus::Init,
942 cdc_table_backfill_tracker,
943 },
944 )
945 .expect("non-duplicate");
946 }
947 }
948
949 pub(crate) fn pre_apply_new_fragments(
951 &mut self,
952 fragments: impl IntoIterator<Item = (FragmentId, JobId, InflightFragmentInfo)>,
953 ) {
954 {
955 let shared_infos = self.shared_actor_infos.clone();
956 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
957 for (fragment_id, job_id, info) in fragments {
958 {
959 {
960 let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
961 shared_actor_writer.upsert([(&info, job_id)]);
962 fragment_infos
963 .fragment_infos
964 .try_insert(fragment_id, info)
965 .expect("non duplicate");
966 self.fragment_location
967 .try_insert(fragment_id, job_id)
968 .expect("non duplicate");
969 }
970 }
971 }
972 shared_actor_writer.finish();
973 }
974 }
975
976 pub(crate) fn pre_apply_reschedule(
979 &mut self,
980 fragment_id: FragmentId,
981 new_actors: HashMap<ActorId, InflightActorInfo>,
982 actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
983 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
984 ) {
985 {
986 {
987 {
988 {
989 let (info, _) = self.fragment_mut(fragment_id);
990 let actors = &mut info.actors;
991 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
992 actors
993 .get_mut(&actor_id)
994 .expect("should exist")
995 .vnode_bitmap = Some(new_vnodes);
996 }
997 for (actor_id, actor) in new_actors {
998 actors
999 .try_insert(actor_id as _, actor)
1000 .expect("non-duplicate");
1001 }
1002 for (actor_id, splits) in actor_splits {
1003 actors.get_mut(&actor_id).expect("should exist").splits = splits;
1004 }
1005 }
1007 }
1008 }
1009 }
1010 }
1011
1012 pub(crate) fn pre_apply_replace_node_upstream(
1014 &mut self,
1015 fragment_id: FragmentId,
1016 replace_map: &HashMap<FragmentId, FragmentId>,
1017 ) {
1018 {
1019 {
1020 {
1021 {
1022 let mut remaining_fragment_ids: HashSet<_> =
1023 replace_map.keys().cloned().collect();
1024 let (info, _) = self.fragment_mut(fragment_id);
1025 visit_stream_node_mut(&mut info.nodes, |node| {
1026 if let NodeBody::Merge(m) = node
1027 && let Some(new_upstream_fragment_id) =
1028 replace_map.get(&m.upstream_fragment_id)
1029 {
1030 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1031 if cfg!(debug_assertions) {
1032 panic!(
1033 "duplicate upstream fragment: {:?} {:?}",
1034 m, replace_map
1035 );
1036 } else {
1037 warn!(?m, ?replace_map, "duplicate upstream fragment");
1038 }
1039 }
1040 m.upstream_fragment_id = *new_upstream_fragment_id;
1041 }
1042 });
1043 if cfg!(debug_assertions) {
1044 assert!(
1045 remaining_fragment_ids.is_empty(),
1046 "non-existing fragment to replace: {:?} {:?} {:?}",
1047 remaining_fragment_ids,
1048 info.nodes,
1049 replace_map
1050 );
1051 } else {
1052 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1053 }
1054 }
1055 }
1056 }
1057 }
1058 }
1059
1060 pub(crate) fn pre_apply_add_node_upstream(
1062 &mut self,
1063 fragment_id: FragmentId,
1064 new_upstream_info: &PbUpstreamSinkInfo,
1065 ) {
1066 {
1067 {
1068 {
1069 {
1070 let (info, _) = self.fragment_mut(fragment_id);
1071 let mut injected = false;
1072 visit_stream_node_mut(&mut info.nodes, |node| {
1073 if let NodeBody::UpstreamSinkUnion(u) = node {
1074 if cfg!(debug_assertions) {
1075 let current_upstream_fragment_ids = u
1076 .init_upstreams
1077 .iter()
1078 .map(|upstream| upstream.upstream_fragment_id)
1079 .collect::<HashSet<_>>();
1080 if current_upstream_fragment_ids
1081 .contains(&new_upstream_info.upstream_fragment_id)
1082 {
1083 panic!(
1084 "duplicate upstream fragment: {:?} {:?}",
1085 u, new_upstream_info
1086 );
1087 }
1088 }
1089 u.init_upstreams.push(new_upstream_info.clone());
1090 injected = true;
1091 }
1092 });
1093 assert!(injected, "should inject upstream into UpstreamSinkUnion");
1094 }
1095 }
1096 }
1097 }
1098 }
1099
1100 pub(crate) fn pre_apply_drop_node_upstream(
1102 &mut self,
1103 fragment_id: FragmentId,
1104 drop_upstream_fragment_ids: &[FragmentId],
1105 ) {
1106 if !self.fragment_location.contains_key(&fragment_id) {
1107 warn!(
1108 target_fragment_id = %fragment_id,
1109 drop_upstream_fragment_ids = ?drop_upstream_fragment_ids,
1110 "skip dropping upstream sink fragments for non-existing target fragment"
1111 );
1112 return;
1113 }
1114 {
1115 {
1116 {
1117 {
1118 let (info, _) = self.fragment_mut(fragment_id);
1119 let mut removed = false;
1120 visit_stream_node_mut(&mut info.nodes, |node| {
1121 if let NodeBody::UpstreamSinkUnion(u) = node {
1122 if cfg!(debug_assertions) {
1123 let current_upstream_fragment_ids = u
1124 .init_upstreams
1125 .iter()
1126 .map(|upstream| upstream.upstream_fragment_id)
1127 .collect::<HashSet<FragmentId>>();
1128 for drop_fragment_id in drop_upstream_fragment_ids {
1129 if !current_upstream_fragment_ids.contains(drop_fragment_id)
1130 {
1131 panic!(
1132 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1133 u, drop_upstream_fragment_ids, drop_fragment_id
1134 );
1135 }
1136 }
1137 }
1138 u.init_upstreams.retain(|upstream| {
1139 !drop_upstream_fragment_ids
1140 .contains(&upstream.upstream_fragment_id)
1141 });
1142 removed = true;
1143 }
1144 });
1145 assert!(removed, "should remove upstream from UpstreamSinkUnion");
1146 }
1147 }
1148 }
1149 }
1150 }
1151
1152 pub(crate) fn pre_apply_throttle(&mut self, fragment_id: FragmentId, config: &ThrottleConfig) {
1155 if !self.fragment_location.contains_key(&fragment_id) {
1158 return;
1159 }
1160 let throttle_type = config.throttle_type();
1161 let rate_limit = config.rate_limit;
1162 let (info, _) = self.fragment_mut(fragment_id);
1163
1164 visit_stream_node_mut(&mut info.nodes, |node| match throttle_type {
1165 ThrottleType::Source => {
1166 if let NodeBody::Source(node) = node
1167 && let Some(node_inner) = &mut node.source_inner
1168 {
1169 node_inner.rate_limit = rate_limit;
1170 }
1171 if let NodeBody::StreamFsFetch(node) = node
1172 && let Some(node_inner) = &mut node.node_inner
1173 {
1174 node_inner.rate_limit = rate_limit;
1175 }
1176 }
1177 ThrottleType::Backfill => match node {
1178 NodeBody::StreamCdcScan(node) => node.rate_limit = rate_limit,
1179 NodeBody::StreamScan(node) => node.rate_limit = rate_limit,
1180 NodeBody::SourceBackfill(node) => node.rate_limit = rate_limit,
1181 _ => {}
1182 },
1183 ThrottleType::Sink => {
1184 if let NodeBody::Sink(node) = node {
1185 node.rate_limit = rate_limit;
1186 }
1187 }
1188 ThrottleType::Dml => {
1189 if let NodeBody::Dml(node) = node {
1190 node.rate_limit = rate_limit;
1191 }
1192 }
1193 ThrottleType::Unspecified => {}
1194 });
1195 }
1196
1197 pub(crate) fn pre_apply_split_assignments(
1199 &mut self,
1200 assignments: impl IntoIterator<Item = (FragmentId, HashMap<ActorId, Vec<SplitImpl>>)>,
1201 ) {
1202 {
1203 let shared_infos = self.shared_actor_infos.clone();
1204 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
1205 {
1206 {
1207 for (fragment_id, actor_splits) in assignments {
1208 let (info, job_id) = self.fragment_mut(fragment_id);
1209 let actors = &mut info.actors;
1210 for (actor_id, splits) in actor_splits {
1211 actors.get_mut(&actor_id).expect("should exist").splits = splits;
1212 }
1213 shared_actor_writer.upsert([(&*info, job_id)]);
1214 }
1215 }
1216 }
1217 shared_actor_writer.finish();
1218 }
1219 }
1220
1221 pub(super) fn build_edge(
1222 &self,
1223 info: Option<(&CreateStreamingJobCommandInfo, bool)>,
1224 replace_job: Option<&ReplaceStreamJobPlan>,
1225 new_upstream_sink: Option<&UpstreamSinkInfo>,
1226 control_stream_manager: &ControlStreamManager,
1227 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1228 actor_location: &HashMap<ActorId, WorkerId>,
1229 ) -> FragmentEdgeBuildResult {
1230 let existing_fragment_ids = info
1238 .into_iter()
1239 .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
1240 .chain(replace_job.into_iter().flat_map(|replace_job| {
1241 replace_job
1242 .upstream_fragment_downstreams
1243 .keys()
1244 .filter(|fragment_id| {
1245 info.map(|(info, _)| {
1246 !info
1247 .stream_job_fragments
1248 .fragments
1249 .contains_key(*fragment_id)
1250 })
1251 .unwrap_or(true)
1252 })
1253 .chain(replace_job.replace_upstream.keys())
1254 }))
1255 .chain(
1256 new_upstream_sink
1257 .into_iter()
1258 .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
1259 )
1260 .cloned();
1261 let new_fragments = info
1263 .into_iter()
1264 .flat_map(|(info, is_snapshot_backfill)| {
1265 let partial_graph_id = to_partial_graph_id(
1266 self.database_id,
1267 is_snapshot_backfill.then_some(info.streaming_job.id()),
1268 );
1269 info.stream_job_fragments
1270 .fragments
1271 .values()
1272 .map(move |fragment| (partial_graph_id, fragment))
1273 })
1274 .chain(replace_job.into_iter().flat_map(|replace_job| {
1275 replace_job
1276 .new_fragments
1277 .fragments
1278 .values()
1279 .chain(
1280 replace_job
1281 .auto_refresh_schema_sinks
1282 .as_ref()
1283 .into_iter()
1284 .flat_map(move |sinks| sinks.iter().map(|sink| &sink.new_fragment)),
1285 )
1286 .map(|fragment| {
1287 (
1288 to_partial_graph_id(self.database_id, None),
1290 fragment,
1291 )
1292 })
1293 }));
1294
1295 let mut builder = FragmentEdgeBuilder::new(
1296 existing_fragment_ids
1298 .map(|fragment_id| {
1299 (
1300 fragment_id,
1301 EdgeBuilderFragmentInfo::from_inflight(
1302 self.fragment(fragment_id),
1303 to_partial_graph_id(self.database_id, None),
1304 control_stream_manager,
1305 ),
1306 )
1307 })
1308 .chain(new_fragments.map(|(partial_graph_id, fragment)| {
1310 (
1311 fragment.fragment_id,
1312 EdgeBuilderFragmentInfo::from_fragment(
1313 fragment,
1314 stream_actors,
1315 actor_location,
1316 partial_graph_id,
1317 control_stream_manager,
1318 ),
1319 )
1320 })),
1321 );
1322 if let Some((info, _)) = info {
1323 builder.add_relations(&info.upstream_fragment_downstreams);
1324 builder.add_relations(&info.stream_job_fragments.downstreams);
1325 }
1326 if let Some(replace_job) = replace_job {
1327 builder.add_relations(&replace_job.upstream_fragment_downstreams);
1328 builder.add_relations(&replace_job.new_fragments.downstreams);
1329 }
1330 if let Some(new_upstream_sink) = new_upstream_sink {
1331 let sink_fragment_id = new_upstream_sink.sink_fragment_id;
1332 let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
1333 builder.add_edge(sink_fragment_id, new_sink_downstream);
1334 }
1335 if let Some(replace_job) = replace_job {
1336 for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
1337 for (original_upstream_fragment_id, new_upstream_fragment_id) in
1338 fragment_replacement
1339 {
1340 builder.replace_upstream(
1341 *fragment_id,
1342 *original_upstream_fragment_id,
1343 *new_upstream_fragment_id,
1344 );
1345 }
1346 }
1347 }
1348 builder.build()
1349 }
1350
1351 pub(crate) fn post_apply_reschedules(
1353 &mut self,
1354 reschedules: impl IntoIterator<Item = (FragmentId, HashSet<ActorId>)>,
1355 ) {
1356 let inner = self.shared_actor_infos.clone();
1357 let mut shared_actor_writer = inner.start_writer(self.database_id);
1358 {
1359 {
1360 {
1361 for (fragment_id, to_remove) in reschedules {
1362 let job_id = self.fragment_location[&fragment_id];
1363 let info = self
1364 .jobs
1365 .get_mut(&job_id)
1366 .expect("should exist")
1367 .fragment_infos
1368 .get_mut(&fragment_id)
1369 .expect("should exist");
1370 for actor_id in to_remove {
1371 assert!(info.actors.remove(&actor_id).is_some());
1372 }
1373 shared_actor_writer.upsert([(&*info, job_id)]);
1374 }
1375 }
1376 }
1377 }
1378 shared_actor_writer.finish();
1379 }
1380
1381 pub(crate) fn post_apply_remove_fragments(
1383 &mut self,
1384 fragment_ids: impl IntoIterator<Item = FragmentId>,
1385 ) {
1386 let inner = self.shared_actor_infos.clone();
1387 let mut shared_actor_writer = inner.start_writer(self.database_id);
1388 {
1389 {
1390 {
1391 for fragment_id in fragment_ids {
1392 let job_id = self
1393 .fragment_location
1394 .remove(&fragment_id)
1395 .expect("should exist");
1396 let job = self.jobs.get_mut(&job_id).expect("should exist");
1397 let fragment = job
1398 .fragment_infos
1399 .remove(&fragment_id)
1400 .expect("should exist");
1401 shared_actor_writer.remove(&fragment);
1402 if job.fragment_infos.is_empty() {
1403 self.jobs.remove(&job_id).expect("should exist");
1404 }
1405 }
1406 }
1407 }
1408 }
1409 shared_actor_writer.finish();
1410 }
1411
1412 pub(crate) fn post_apply_remove_job(
1413 &mut self,
1414 job_id: JobId,
1415 ) -> Option<InflightStreamingJobInfo> {
1416 let job = self.jobs.remove(&job_id)?;
1417 let inner = self.shared_actor_infos.clone();
1418 let mut shared_actor_writer = inner.start_writer(self.database_id);
1419 for (fragment_id, fragment) in &job.fragment_infos {
1420 self.fragment_location
1421 .remove(fragment_id)
1422 .expect("should exist");
1423 shared_actor_writer.remove(fragment);
1424 }
1425 shared_actor_writer.finish();
1426 Some(job)
1427 }
1428}
1429
1430impl InflightFragmentInfo {
1431 pub(crate) fn actor_ids_to_collect(
1433 infos: impl IntoIterator<Item = &Self>,
1434 ) -> HashMap<WorkerId, HashSet<ActorId>> {
1435 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1436 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1437 assert!(
1438 ret.entry(actor.worker_id)
1439 .or_default()
1440 .insert(*actor_id as _)
1441 )
1442 }
1443 ret
1444 }
1445
1446 pub fn existing_table_ids<'a>(
1447 infos: impl IntoIterator<Item = &'a Self> + 'a,
1448 ) -> impl Iterator<Item = TableId> + 'a {
1449 infos
1450 .into_iter()
1451 .flat_map(|info| info.state_table_ids.iter().cloned())
1452 }
1453
1454 pub fn workers<'a>(
1455 infos: impl IntoIterator<Item = &'a Self> + 'a,
1456 ) -> impl Iterator<Item = WorkerId> + 'a {
1457 infos
1458 .into_iter()
1459 .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1460 }
1461
1462 pub fn contains_worker<'a>(
1463 infos: impl IntoIterator<Item = &'a Self> + 'a,
1464 worker_id: WorkerId,
1465 ) -> bool {
1466 Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1467 }
1468}
1469
1470impl InflightDatabaseInfo {
1471 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
1472 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
1473 }
1474
1475 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
1476 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
1477 }
1478}