1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_meta_model::WorkerId;
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_pb::common::ThrottleType;
32use risingwave_pb::ddl_service::PbBackfillType;
33use risingwave_pb::hummock::HummockVersionStats;
34use risingwave_pb::id::SubscriberId;
35use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
36use risingwave_pb::meta::subscribe_response::Operation;
37use risingwave_pb::source::PbCdcTableSnapshotSplits;
38use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
39use risingwave_pb::stream_plan::stream_node::NodeBody;
40use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
41use risingwave_pb::stream_service::BarrierCompleteResponse;
42use tracing::{info, warn};
43
44use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
45use crate::barrier::command::{
46 CreateStreamingJobCommandInfo, PostCollectCommand, ReplaceStreamJobPlan,
47};
48use crate::barrier::edge_builder::{
49 EdgeBuilderFragmentInfo, FragmentEdgeBuildResult, FragmentEdgeBuilder,
50};
51use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
52use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
53use crate::barrier::{
54 BackfillProgress, BarrierKind, CreateStreamingJobType, FragmentBackfillProgress, TracedEpoch,
55};
56use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
57use crate::controller::utils::rebuild_fragment_mapping;
58use crate::manager::NotificationManagerRef;
59use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamActor, StreamJobFragments};
60use crate::stream::UpstreamSinkInfo;
61use crate::{MetaError, MetaResult};
62
63#[derive(Debug, Clone)]
64pub struct SharedActorInfo {
65 pub worker_id: WorkerId,
66 pub vnode_bitmap: Option<Bitmap>,
67 pub splits: Vec<SplitImpl>,
68}
69
70impl From<&InflightActorInfo> for SharedActorInfo {
71 fn from(value: &InflightActorInfo) -> Self {
72 Self {
73 worker_id: value.worker_id,
74 vnode_bitmap: value.vnode_bitmap.clone(),
75 splits: value.splits.clone(),
76 }
77 }
78}
79
80#[derive(Debug, Clone)]
81pub struct SharedFragmentInfo {
82 pub fragment_id: FragmentId,
83 pub job_id: JobId,
84 pub distribution_type: DistributionType,
85 pub actors: HashMap<ActorId, SharedActorInfo>,
86 pub vnode_count: usize,
87 pub fragment_type_mask: FragmentTypeMask,
88}
89
90impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
91 fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
92 let (info, job_id) = pair;
93
94 let InflightFragmentInfo {
95 fragment_id,
96 distribution_type,
97 fragment_type_mask,
98 actors,
99 vnode_count,
100 ..
101 } = info;
102
103 Self {
104 fragment_id: *fragment_id,
105 job_id,
106 distribution_type: *distribution_type,
107 fragment_type_mask: *fragment_type_mask,
108 actors: actors
109 .iter()
110 .map(|(actor_id, actor)| (*actor_id, actor.into()))
111 .collect(),
112 vnode_count: *vnode_count,
113 }
114 }
115}
116
117#[derive(Default, Debug)]
118pub struct SharedActorInfosInner {
119 info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
120}
121
122impl SharedActorInfosInner {
123 pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
124 self.info
125 .values()
126 .find_map(|database| database.get(&fragment_id))
127 }
128
129 pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
130 self.info.values().flatten()
131 }
132}
133
134#[derive(Clone, educe::Educe)]
135#[educe(Debug)]
136pub struct SharedActorInfos {
137 inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
138 #[educe(Debug(ignore))]
139 notification_manager: NotificationManagerRef,
140}
141
142impl SharedActorInfos {
143 pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
144 self.inner.read()
145 }
146
147 pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
148 let core = self.inner.read();
149 core.iter_over_fragments()
150 .flat_map(|(_, fragment)| {
151 fragment
152 .actors
153 .iter()
154 .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
155 })
156 .collect()
157 }
158
159 pub fn migrate_splits_for_source_actors(
165 &self,
166 fragment_id: FragmentId,
167 prev_actor_ids: &[ActorId],
168 curr_actor_ids: &[ActorId],
169 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
170 let guard = self.read_guard();
171
172 let prev_splits = prev_actor_ids
173 .iter()
174 .flat_map(|actor_id| {
175 guard
177 .get_fragment(fragment_id)
178 .and_then(|info| info.actors.get(actor_id))
179 .map(|actor| actor.splits.clone())
180 .unwrap_or_default()
181 })
182 .map(|split| (split.id(), split))
183 .collect();
184
185 let empty_actor_splits = curr_actor_ids
186 .iter()
187 .map(|actor_id| (*actor_id, vec![]))
188 .collect();
189
190 let diff = crate::stream::source_manager::reassign_splits(
191 fragment_id,
192 empty_actor_splits,
193 &prev_splits,
194 std::default::Default::default(),
196 )
197 .unwrap_or_default();
198
199 Ok(diff)
200 }
201}
202
203impl SharedActorInfos {
204 pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
205 Self {
206 inner: Arc::new(Default::default()),
207 notification_manager,
208 }
209 }
210
211 pub(super) fn remove_database(&self, database_id: DatabaseId) {
212 if let Some(database) = self.inner.write().info.remove(&database_id) {
213 let mapping = database
214 .into_values()
215 .map(|fragment| rebuild_fragment_mapping(&fragment))
216 .collect_vec();
217 if !mapping.is_empty() {
218 self.notification_manager
219 .notify_streaming_fragment_mapping(Operation::Delete, mapping);
220 }
221 }
222 }
223
224 pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
225 let database_ids: HashSet<_> = database_ids.into_iter().collect();
226
227 let mut mapping = Vec::new();
228 for fragment in self
229 .inner
230 .write()
231 .info
232 .extract_if(|database_id, _| !database_ids.contains(database_id))
233 .flat_map(|(_, fragments)| fragments.into_values())
234 {
235 mapping.push(rebuild_fragment_mapping(&fragment));
236 }
237 if !mapping.is_empty() {
238 self.notification_manager
239 .notify_streaming_fragment_mapping(Operation::Delete, mapping);
240 }
241 }
242
243 pub(super) fn recover_database(
244 &self,
245 database_id: DatabaseId,
246 fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
247 ) {
248 let mut remaining_fragments: HashMap<_, _> = fragments
249 .map(|info @ (fragment, _)| (fragment.fragment_id, info))
250 .collect();
251 let mut writer = self.start_writer(database_id);
253 let database = writer.write_guard.info.entry(database_id).or_default();
254 for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
255 if let Some(info) = remaining_fragments.remove(fragment_id) {
256 let info = info.into();
257 writer
258 .updated_fragment_mapping
259 .get_or_insert_default()
260 .push(rebuild_fragment_mapping(&info));
261 *fragment_infos = info;
262 false
263 } else {
264 true
265 }
266 }) {
267 writer
268 .deleted_fragment_mapping
269 .get_or_insert_default()
270 .push(rebuild_fragment_mapping(&fragment));
271 }
272 for (fragment_id, info) in remaining_fragments {
273 let info = info.into();
274 writer
275 .added_fragment_mapping
276 .get_or_insert_default()
277 .push(rebuild_fragment_mapping(&info));
278 database.insert(fragment_id, info);
279 }
280 writer.finish();
281 }
282
283 pub(super) fn upsert(
284 &self,
285 database_id: DatabaseId,
286 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
287 ) {
288 let mut writer = self.start_writer(database_id);
289 writer.upsert(infos);
290 writer.finish();
291 }
292
293 pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
294 SharedActorInfoWriter {
295 database_id,
296 write_guard: self.inner.write(),
297 notification_manager: &self.notification_manager,
298 added_fragment_mapping: None,
299 updated_fragment_mapping: None,
300 deleted_fragment_mapping: None,
301 }
302 }
303}
304
305pub(super) struct SharedActorInfoWriter<'a> {
306 database_id: DatabaseId,
307 write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
308 notification_manager: &'a NotificationManagerRef,
309 added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
310 updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
311 deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
312}
313
314impl SharedActorInfoWriter<'_> {
315 pub(super) fn upsert(
316 &mut self,
317 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
318 ) {
319 let database = self.write_guard.info.entry(self.database_id).or_default();
320 for info @ (fragment, _) in infos {
321 match database.entry(fragment.fragment_id) {
322 Entry::Occupied(mut entry) => {
323 let info = info.into();
324 self.updated_fragment_mapping
325 .get_or_insert_default()
326 .push(rebuild_fragment_mapping(&info));
327 entry.insert(info);
328 }
329 Entry::Vacant(entry) => {
330 let info = info.into();
331 self.added_fragment_mapping
332 .get_or_insert_default()
333 .push(rebuild_fragment_mapping(&info));
334 entry.insert(info);
335 }
336 }
337 }
338 }
339
340 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
341 if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
342 && let Some(fragment) = database.remove(&info.fragment_id)
343 {
344 self.deleted_fragment_mapping
345 .get_or_insert_default()
346 .push(rebuild_fragment_mapping(&fragment));
347 }
348 }
349
350 pub(super) fn finish(self) {
351 if let Some(mapping) = self.added_fragment_mapping {
352 self.notification_manager
353 .notify_streaming_fragment_mapping(Operation::Add, mapping);
354 }
355 if let Some(mapping) = self.updated_fragment_mapping {
356 self.notification_manager
357 .notify_streaming_fragment_mapping(Operation::Update, mapping);
358 }
359 if let Some(mapping) = self.deleted_fragment_mapping {
360 self.notification_manager
361 .notify_streaming_fragment_mapping(Operation::Delete, mapping);
362 }
363 }
364}
365
366#[derive(Debug, Clone)]
367pub(super) struct BarrierInfo {
368 pub prev_epoch: TracedEpoch,
369 pub curr_epoch: TracedEpoch,
370 pub kind: BarrierKind,
371}
372
373impl BarrierInfo {
374 pub(super) fn prev_epoch(&self) -> u64 {
375 self.prev_epoch.value().0
376 }
377
378 pub(super) fn curr_epoch(&self) -> u64 {
379 self.curr_epoch.value().0
380 }
381
382 pub(super) fn epoch(&self) -> EpochPair {
383 EpochPair {
384 curr: self.curr_epoch(),
385 prev: self.prev_epoch(),
386 }
387 }
388}
389
/// Kind of subscriber registered on a streaming job.
#[derive(Debug, Clone)]
pub enum SubscriberType {
    /// A subscription; the payload is its retention.
    Subscription(u64),
    /// A snapshot backfill subscriber; it carries no retention.
    SnapshotBackfill,
}
395
396#[derive(Debug)]
397pub(super) enum CreateStreamingJobStatus {
398 Init,
399 Creating { tracker: CreateMviewProgressTracker },
400 Created,
401}
402
403#[derive(Debug)]
404pub(super) struct InflightStreamingJobInfo {
405 pub job_id: JobId,
406 pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
407 pub subscribers: HashMap<SubscriberId, SubscriberType>,
408 pub status: CreateStreamingJobStatus,
409 pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
410}
411
412impl InflightStreamingJobInfo {
413 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
414 self.fragment_infos.values()
415 }
416
417 pub fn snapshot_backfill_actor_ids(
418 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
419 ) -> impl Iterator<Item = ActorId> + '_ {
420 fragment_infos
421 .values()
422 .filter(|fragment| {
423 fragment
424 .fragment_type_mask
425 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
426 })
427 .flat_map(|fragment| fragment.actors.keys().copied())
428 }
429
430 pub fn tracking_progress_actor_ids(
431 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
432 ) -> Vec<(ActorId, BackfillUpstreamType)> {
433 StreamJobFragments::tracking_progress_actor_ids_impl(
434 fragment_infos
435 .values()
436 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
437 )
438 }
439}
440
441impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
442 type Item = &'a InflightFragmentInfo;
443
444 type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
445
446 fn into_iter(self) -> Self::IntoIter {
447 self.fragment_infos()
448 }
449}
450
451#[derive(Debug)]
452pub struct InflightDatabaseInfo {
453 pub(super) database_id: DatabaseId,
454 jobs: HashMap<JobId, InflightStreamingJobInfo>,
455 fragment_location: HashMap<FragmentId, JobId>,
456 pub(super) shared_actor_infos: SharedActorInfos,
457}
458
459impl InflightDatabaseInfo {
460 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
461 self.jobs.values().flat_map(|job| job.fragment_infos())
462 }
463
464 pub fn contains_job(&self, job_id: JobId) -> bool {
465 self.jobs.contains_key(&job_id)
466 }
467
468 pub(super) fn job_id_by_fragment(&self, fragment_id: FragmentId) -> Option<JobId> {
469 self.fragment_location.get(&fragment_id).copied()
470 }
471
472 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
473 let job_id = self.fragment_location[&fragment_id];
474 self.jobs
475 .get(&job_id)
476 .expect("should exist")
477 .fragment_infos
478 .get(&fragment_id)
479 .expect("should exist")
480 }
481
482 pub(super) fn backfill_fragment_ids_for_job(
483 &self,
484 job_id: JobId,
485 ) -> MetaResult<HashSet<FragmentId>> {
486 let job = self
487 .jobs
488 .get(&job_id)
489 .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
490 Ok(job
491 .fragment_infos
492 .iter()
493 .filter_map(|(fragment_id, fragment)| {
494 fragment
495 .fragment_type_mask
496 .contains_any([
497 FragmentTypeFlag::StreamScan,
498 FragmentTypeFlag::SourceScan,
499 FragmentTypeFlag::LocalityProvider,
500 ])
501 .then_some(*fragment_id)
502 })
503 .collect())
504 }
505
506 pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
507 let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
508 MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
509 })?;
510 let fragment = self
511 .jobs
512 .get(job_id)
513 .expect("should exist")
514 .fragment_infos
515 .get(&fragment_id)
516 .expect("should exist");
517 Ok(fragment.fragment_type_mask.contains_any([
518 FragmentTypeFlag::StreamScan,
519 FragmentTypeFlag::SourceScan,
520 FragmentTypeFlag::LocalityProvider,
521 ]))
522 }
523
524 pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
525 self.jobs
526 .iter()
527 .filter_map(|(job_id, job)| match &job.status {
528 CreateStreamingJobStatus::Init => None,
529 CreateStreamingJobStatus::Creating { tracker } => {
530 let progress = tracker.gen_backfill_progress();
531 Some((
532 *job_id,
533 BackfillProgress {
534 progress,
535 backfill_type: PbBackfillType::NormalBackfill,
536 },
537 ))
538 }
539 CreateStreamingJobStatus::Created => None,
540 })
541 }
542
543 pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
544 self.jobs.iter().filter_map(|(job_id, job)| {
545 job.cdc_table_backfill_tracker
546 .as_ref()
547 .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
548 })
549 }
550
551 pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
552 let mut result = Vec::new();
553 for job in self.jobs.values() {
554 let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
555 continue;
556 };
557 let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
558 result.extend(fragment_progress);
559 }
560 result
561 }
562
563 pub(super) fn may_assign_fragment_cdc_backfill_splits(
564 &mut self,
565 fragment_id: FragmentId,
566 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
567 let job_id = self.fragment_location[&fragment_id];
568 let job = self.jobs.get_mut(&job_id).expect("should exist");
569 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
570 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
571 if cdc_scan_fragment_id != fragment_id {
572 return Ok(None);
573 }
574 let actors = job.fragment_infos[&cdc_scan_fragment_id]
575 .actors
576 .keys()
577 .copied()
578 .collect();
579 tracker.reassign_splits(actors).map(Some)
580 } else {
581 Ok(None)
582 }
583 }
584
585 pub(super) fn assign_cdc_backfill_splits(
586 &mut self,
587 job_id: JobId,
588 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
589 let job = self.jobs.get_mut(&job_id).expect("should exist");
590 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
591 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
592 let actors = job.fragment_infos[&cdc_scan_fragment_id]
593 .actors
594 .keys()
595 .copied()
596 .collect();
597 tracker.reassign_splits(actors).map(Some)
598 } else {
599 Ok(None)
600 }
601 }
602
603 pub(super) fn apply_collected_command(
604 &mut self,
605 command: &PostCollectCommand,
606 resps: &HashMap<WorkerId, BarrierCompleteResponse>,
607 version_stats: &HummockVersionStats,
608 ) {
609 if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
610 match job_type {
611 CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
612 let job_id = info.streaming_job.id();
613 if let Some(job_info) = self.jobs.get_mut(&job_id) {
614 let CreateStreamingJobStatus::Init = replace(
615 &mut job_info.status,
616 CreateStreamingJobStatus::Creating {
617 tracker: CreateMviewProgressTracker::new(
618 info,
619 version_stats,
620 &job_info.fragment_infos,
621 ),
622 },
623 ) else {
624 unreachable!("should be init before collect the first barrier")
625 };
626 } else {
627 info!(%job_id, "newly create job get cancelled before first barrier is collected")
628 }
629 }
630 CreateStreamingJobType::SnapshotBackfill(_)
631 | CreateStreamingJobType::BatchRefresh(_) => {
632 }
634 }
635 }
636 if let PostCollectCommand::Reschedule { reschedules, .. } = command {
637 debug_assert!(
639 reschedules
640 .values()
641 .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
642 "Reschedule should not carry vnode bitmap updates when actors are rebuilt"
643 );
644
645 let related_job_ids = reschedules
647 .keys()
648 .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
649 .cloned()
650 .collect::<HashSet<_>>();
651 for job_id in related_job_ids {
652 if let Some(job) = self.jobs.get_mut(&job_id)
653 && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
654 {
655 tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
656 }
657 }
658 }
659 for progress in resps.values().flat_map(|resp| &resp.create_mview_progress) {
660 let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
661 warn!(
662 "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
663 );
664 continue;
665 };
666 let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
667 CreateStreamingJobStatus::Init => {
668 continue;
669 }
670 CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
671 CreateStreamingJobStatus::Created => {
672 if !progress.done {
673 warn!("update the progress of an created streaming job: {progress:?}");
674 }
675 continue;
676 }
677 };
678 tracker.apply_progress(progress, version_stats);
679 }
680 for progress in resps
681 .values()
682 .flat_map(|resp| &resp.cdc_table_backfill_progress)
683 {
684 let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
685 warn!(
686 "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
687 );
688 continue;
689 };
690 let Some(tracker) = &mut self
691 .jobs
692 .get_mut(job_id)
693 .expect("should exist")
694 .cdc_table_backfill_tracker
695 else {
696 warn!("update the cdc progress of an created streaming job: {progress:?}");
697 continue;
698 };
699 tracker.update_split_progress(progress);
700 }
701 for cdc_offset_updated in resps
703 .values()
704 .flat_map(|resp| &resp.cdc_source_offset_updated)
705 {
706 use risingwave_common::id::SourceId;
707 let source_id = SourceId::new(cdc_offset_updated.source_id);
708 let job_id = source_id.as_share_source_job_id();
709 if let Some(job) = self.jobs.get_mut(&job_id) {
710 if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
711 tracker.mark_cdc_source_finished();
712 }
713 } else {
714 warn!(
715 "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
716 cdc_offset_updated.source_id, job_id
717 );
718 }
719 }
720 }
721
722 fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
723 self.jobs.values().filter_map(|job| match &job.status {
724 CreateStreamingJobStatus::Init => None,
725 CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
726 CreateStreamingJobStatus::Created => None,
727 })
728 }
729
730 fn iter_mut_creating_job_tracker(
731 &mut self,
732 ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
733 self.jobs
734 .values_mut()
735 .filter_map(|job| match &mut job.status {
736 CreateStreamingJobStatus::Init => None,
737 CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
738 CreateStreamingJobStatus::Created => None,
739 })
740 }
741
742 pub(super) fn has_pending_finished_jobs(&self) -> bool {
743 self.iter_creating_job_tracker()
744 .any(|tracker| tracker.is_finished())
745 }
746
747 pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
748 self.iter_mut_creating_job_tracker()
749 .flat_map(|tracker| tracker.take_pending_backfill_nodes())
750 .collect()
751 }
752
753 pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
754 let mut finished_jobs = vec![];
755 let mut table_ids_to_truncate = vec![];
756 let mut finished_cdc_table_backfill = vec![];
757 for (job_id, job) in &mut self.jobs {
758 if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
759 let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
760 table_ids_to_truncate.extend(truncate_table_ids);
761 if is_finished {
762 let CreateStreamingJobStatus::Creating { tracker, .. } =
763 replace(&mut job.status, CreateStreamingJobStatus::Created)
764 else {
765 unreachable!()
766 };
767 finished_jobs.push(tracker.into_tracking_job());
768 }
769 }
770 if let Some(tracker) = &mut job.cdc_table_backfill_tracker
771 && tracker.take_pre_completed()
772 {
773 finished_cdc_table_backfill.push(*job_id);
774 }
775 }
776 StagingCommitInfo {
777 finished_jobs,
778 table_ids_to_truncate,
779 finished_cdc_table_backfill,
780 }
781 }
782
783 pub fn fragment_subscribers(
784 &self,
785 fragment_id: FragmentId,
786 ) -> impl Iterator<Item = SubscriberId> + '_ {
787 let job_id = self.fragment_location[&fragment_id];
788 self.jobs[&job_id].subscribers.keys().copied()
789 }
790
791 pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
792 self.jobs[&job_id].subscribers.keys().copied()
793 }
794
795 pub fn max_subscription_retention(&self) -> impl Iterator<Item = (TableId, u64)> + '_ {
796 self.jobs.iter().filter_map(|(job_id, info)| {
797 info.subscribers
798 .values()
799 .filter_map(|subscriber| match subscriber {
800 SubscriberType::Subscription(retention) => Some(*retention),
801 SubscriberType::SnapshotBackfill => None,
802 })
803 .max()
804 .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
805 })
806 }
807
808 pub fn register_subscriber(
809 &mut self,
810 job_id: JobId,
811 subscriber_id: SubscriberId,
812 subscriber: SubscriberType,
813 ) {
814 self.jobs
815 .get_mut(&job_id)
816 .expect("should exist")
817 .subscribers
818 .try_insert(subscriber_id, subscriber)
819 .expect("non duplicate");
820 }
821
822 pub fn unregister_subscriber(
823 &mut self,
824 job_id: JobId,
825 subscriber_id: SubscriberId,
826 ) -> Option<SubscriberType> {
827 self.jobs
828 .get_mut(&job_id)
829 .expect("should exist")
830 .subscribers
831 .remove(&subscriber_id)
832 }
833
834 pub fn update_subscription_retention(
835 &mut self,
836 job_id: JobId,
837 subscriber_id: SubscriberId,
838 retention_second: u64,
839 ) {
840 let job = self.jobs.get_mut(&job_id).expect("should exist");
841 match job.subscribers.get_mut(&subscriber_id) {
842 Some(SubscriberType::Subscription(current_retention)) => {
843 *current_retention = retention_second;
844 }
845 Some(SubscriberType::SnapshotBackfill) => {
846 warn!(
847 %job_id,
848 %subscriber_id,
849 "cannot update retention for snapshot backfill subscriber"
850 );
851 }
852 None => {
853 warn!(%job_id, %subscriber_id, "subscription subscriber not found");
854 }
855 }
856 }
857
858 fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
859 let job_id = self.fragment_location[&fragment_id];
860 let fragment = self
861 .jobs
862 .get_mut(&job_id)
863 .expect("should exist")
864 .fragment_infos
865 .get_mut(&fragment_id)
866 .expect("should exist");
867 (fragment, job_id)
868 }
869
870 fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
871 Self {
872 database_id,
873 jobs: Default::default(),
874 fragment_location: Default::default(),
875 shared_actor_infos,
876 }
877 }
878
879 pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
880 shared_actor_infos.remove_database(database_id);
882 Self::empty_inner(database_id, shared_actor_infos)
883 }
884
885 pub fn recover(
886 database_id: DatabaseId,
887 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
888 shared_actor_infos: SharedActorInfos,
889 ) -> Self {
890 let mut info = Self::empty_inner(database_id, shared_actor_infos);
891 for job in jobs {
892 info.add_existing(job);
893 }
894 info
895 }
896
897 pub fn is_empty(&self) -> bool {
898 self.jobs.is_empty()
899 }
900
901 pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
902 let InflightStreamingJobInfo {
903 job_id,
904 fragment_infos,
905 subscribers,
906 status,
907 cdc_table_backfill_tracker,
908 } = job;
909 self.jobs
910 .try_insert(
911 job_id,
912 InflightStreamingJobInfo {
913 job_id,
914 subscribers,
915 fragment_infos: Default::default(), status,
917 cdc_table_backfill_tracker,
918 },
919 )
920 .expect("non-duplicate");
921 self.pre_apply_new_fragments(
922 fragment_infos
923 .into_iter()
924 .map(|(fragment_id, info)| (fragment_id, job_id, info)),
925 );
926 }
927
928 pub(crate) fn pre_apply_new_job(
930 &mut self,
931 job_id: JobId,
932 cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
933 ) {
934 {
935 self.jobs
936 .try_insert(
937 job_id,
938 InflightStreamingJobInfo {
939 job_id,
940 fragment_infos: Default::default(),
941 subscribers: Default::default(), status: CreateStreamingJobStatus::Init,
943 cdc_table_backfill_tracker,
944 },
945 )
946 .expect("non-duplicate");
947 }
948 }
949
950 pub(crate) fn pre_apply_new_fragments(
952 &mut self,
953 fragments: impl IntoIterator<Item = (FragmentId, JobId, InflightFragmentInfo)>,
954 ) {
955 {
956 let shared_infos = self.shared_actor_infos.clone();
957 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
958 for (fragment_id, job_id, info) in fragments {
959 {
960 {
961 let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
962 shared_actor_writer.upsert([(&info, job_id)]);
963 fragment_infos
964 .fragment_infos
965 .try_insert(fragment_id, info)
966 .expect("non duplicate");
967 self.fragment_location
968 .try_insert(fragment_id, job_id)
969 .expect("non duplicate");
970 }
971 }
972 }
973 shared_actor_writer.finish();
974 }
975 }
976
977 pub(crate) fn pre_apply_reschedule(
980 &mut self,
981 fragment_id: FragmentId,
982 new_actors: HashMap<ActorId, InflightActorInfo>,
983 actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
984 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
985 ) {
986 {
987 {
988 {
989 {
990 let (info, _) = self.fragment_mut(fragment_id);
991 let actors = &mut info.actors;
992 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
993 actors
994 .get_mut(&actor_id)
995 .expect("should exist")
996 .vnode_bitmap = Some(new_vnodes);
997 }
998 for (actor_id, actor) in new_actors {
999 actors
1000 .try_insert(actor_id as _, actor)
1001 .expect("non-duplicate");
1002 }
1003 for (actor_id, splits) in actor_splits {
1004 actors.get_mut(&actor_id).expect("should exist").splits = splits;
1005 }
1006 }
1008 }
1009 }
1010 }
1011 }
1012
1013 pub(crate) fn pre_apply_replace_node_upstream(
1015 &mut self,
1016 fragment_id: FragmentId,
1017 replace_map: &HashMap<FragmentId, FragmentId>,
1018 ) {
1019 {
1020 {
1021 {
1022 {
1023 let mut remaining_fragment_ids: HashSet<_> =
1024 replace_map.keys().cloned().collect();
1025 let (info, _) = self.fragment_mut(fragment_id);
1026 visit_stream_node_mut(&mut info.nodes, |node| {
1027 if let NodeBody::Merge(m) = node
1028 && let Some(new_upstream_fragment_id) =
1029 replace_map.get(&m.upstream_fragment_id)
1030 {
1031 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1032 if cfg!(debug_assertions) {
1033 panic!(
1034 "duplicate upstream fragment: {:?} {:?}",
1035 m, replace_map
1036 );
1037 } else {
1038 warn!(?m, ?replace_map, "duplicate upstream fragment");
1039 }
1040 }
1041 m.upstream_fragment_id = *new_upstream_fragment_id;
1042 }
1043 });
1044 if cfg!(debug_assertions) {
1045 assert!(
1046 remaining_fragment_ids.is_empty(),
1047 "non-existing fragment to replace: {:?} {:?} {:?}",
1048 remaining_fragment_ids,
1049 info.nodes,
1050 replace_map
1051 );
1052 } else {
1053 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1054 }
1055 }
1056 }
1057 }
1058 }
1059 }
1060
1061 pub(crate) fn pre_apply_add_node_upstream(
1063 &mut self,
1064 fragment_id: FragmentId,
1065 new_upstream_info: &PbUpstreamSinkInfo,
1066 ) {
1067 {
1068 {
1069 {
1070 {
1071 let (info, _) = self.fragment_mut(fragment_id);
1072 let mut injected = false;
1073 visit_stream_node_mut(&mut info.nodes, |node| {
1074 if let NodeBody::UpstreamSinkUnion(u) = node {
1075 if cfg!(debug_assertions) {
1076 let current_upstream_fragment_ids = u
1077 .init_upstreams
1078 .iter()
1079 .map(|upstream| upstream.upstream_fragment_id)
1080 .collect::<HashSet<_>>();
1081 if current_upstream_fragment_ids
1082 .contains(&new_upstream_info.upstream_fragment_id)
1083 {
1084 panic!(
1085 "duplicate upstream fragment: {:?} {:?}",
1086 u, new_upstream_info
1087 );
1088 }
1089 }
1090 u.init_upstreams.push(new_upstream_info.clone());
1091 injected = true;
1092 }
1093 });
1094 assert!(injected, "should inject upstream into UpstreamSinkUnion");
1095 }
1096 }
1097 }
1098 }
1099 }
1100
1101 pub(crate) fn pre_apply_drop_node_upstream(
1103 &mut self,
1104 fragment_id: FragmentId,
1105 drop_upstream_fragment_ids: &[FragmentId],
1106 ) {
1107 if !self.fragment_location.contains_key(&fragment_id) {
1108 warn!(
1109 target_fragment_id = %fragment_id,
1110 drop_upstream_fragment_ids = ?drop_upstream_fragment_ids,
1111 "skip dropping upstream sink fragments for non-existing target fragment"
1112 );
1113 return;
1114 }
1115 {
1116 {
1117 {
1118 {
1119 let (info, _) = self.fragment_mut(fragment_id);
1120 let mut removed = false;
1121 visit_stream_node_mut(&mut info.nodes, |node| {
1122 if let NodeBody::UpstreamSinkUnion(u) = node {
1123 if cfg!(debug_assertions) {
1124 let current_upstream_fragment_ids = u
1125 .init_upstreams
1126 .iter()
1127 .map(|upstream| upstream.upstream_fragment_id)
1128 .collect::<HashSet<FragmentId>>();
1129 for drop_fragment_id in drop_upstream_fragment_ids {
1130 if !current_upstream_fragment_ids.contains(drop_fragment_id)
1131 {
1132 panic!(
1133 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1134 u, drop_upstream_fragment_ids, drop_fragment_id
1135 );
1136 }
1137 }
1138 }
1139 u.init_upstreams.retain(|upstream| {
1140 !drop_upstream_fragment_ids
1141 .contains(&upstream.upstream_fragment_id)
1142 });
1143 removed = true;
1144 }
1145 });
1146 assert!(removed, "should remove upstream from UpstreamSinkUnion");
1147 }
1148 }
1149 }
1150 }
1151 }
1152
1153 pub(crate) fn pre_apply_throttle(&mut self, fragment_id: FragmentId, config: &ThrottleConfig) {
1156 if !self.fragment_location.contains_key(&fragment_id) {
1159 return;
1160 }
1161 let throttle_type = config.throttle_type();
1162 let rate_limit = config.rate_limit;
1163 let (info, _) = self.fragment_mut(fragment_id);
1164
1165 visit_stream_node_mut(&mut info.nodes, |node| match throttle_type {
1166 ThrottleType::Source => {
1167 if let NodeBody::Source(node) = node
1168 && let Some(node_inner) = &mut node.source_inner
1169 {
1170 node_inner.rate_limit = rate_limit;
1171 }
1172 if let NodeBody::StreamFsFetch(node) = node
1173 && let Some(node_inner) = &mut node.node_inner
1174 {
1175 node_inner.rate_limit = rate_limit;
1176 }
1177 }
1178 ThrottleType::Backfill => match node {
1179 NodeBody::StreamCdcScan(node) => node.rate_limit = rate_limit,
1180 NodeBody::StreamScan(node) => node.rate_limit = rate_limit,
1181 NodeBody::SourceBackfill(node) => node.rate_limit = rate_limit,
1182 _ => {}
1183 },
1184 ThrottleType::Sink => {
1185 if let NodeBody::Sink(node) = node {
1186 node.rate_limit = rate_limit;
1187 }
1188 }
1189 ThrottleType::Dml => {
1190 if let NodeBody::Dml(node) = node {
1191 node.rate_limit = rate_limit;
1192 }
1193 }
1194 ThrottleType::Unspecified => {}
1195 });
1196 }
1197
1198 pub(crate) fn pre_apply_split_assignments(
1200 &mut self,
1201 assignments: impl IntoIterator<Item = (FragmentId, HashMap<ActorId, Vec<SplitImpl>>)>,
1202 ) {
1203 {
1204 let shared_infos = self.shared_actor_infos.clone();
1205 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
1206 {
1207 {
1208 for (fragment_id, actor_splits) in assignments {
1209 let (info, job_id) = self.fragment_mut(fragment_id);
1210 let actors = &mut info.actors;
1211 for (actor_id, splits) in actor_splits {
1212 actors.get_mut(&actor_id).expect("should exist").splits = splits;
1213 }
1214 shared_actor_writer.upsert([(&*info, job_id)]);
1215 }
1216 }
1217 }
1218 shared_actor_writer.finish();
1219 }
1220 }
1221
1222 pub(super) fn build_edge(
1223 &self,
1224 info: Option<(&CreateStreamingJobCommandInfo, bool)>,
1225 replace_job: Option<&ReplaceStreamJobPlan>,
1226 new_upstream_sink: Option<&UpstreamSinkInfo>,
1227 control_stream_manager: &ControlStreamManager,
1228 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1229 actor_location: &HashMap<ActorId, WorkerId>,
1230 ) -> FragmentEdgeBuildResult {
1231 let existing_fragment_ids = info
1239 .into_iter()
1240 .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
1241 .chain(replace_job.into_iter().flat_map(|replace_job| {
1242 replace_job
1243 .upstream_fragment_downstreams
1244 .keys()
1245 .filter(|fragment_id| {
1246 info.map(|(info, _)| {
1247 !info
1248 .stream_job_fragments
1249 .fragments
1250 .contains_key(*fragment_id)
1251 })
1252 .unwrap_or(true)
1253 })
1254 .chain(replace_job.replace_upstream.keys())
1255 }))
1256 .chain(
1257 new_upstream_sink
1258 .into_iter()
1259 .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
1260 )
1261 .cloned();
1262 let new_fragments = info
1264 .into_iter()
1265 .flat_map(|(info, is_snapshot_backfill)| {
1266 let partial_graph_id = to_partial_graph_id(
1267 self.database_id,
1268 is_snapshot_backfill.then_some(info.streaming_job.id()),
1269 );
1270 info.stream_job_fragments
1271 .fragments
1272 .values()
1273 .map(move |fragment| (partial_graph_id, fragment))
1274 })
1275 .chain(replace_job.into_iter().flat_map(|replace_job| {
1276 replace_job
1277 .new_fragments
1278 .fragments
1279 .values()
1280 .chain(
1281 replace_job
1282 .auto_refresh_schema_sinks
1283 .as_ref()
1284 .into_iter()
1285 .flat_map(move |sinks| sinks.iter().map(|sink| &sink.new_fragment)),
1286 )
1287 .map(|fragment| {
1288 (
1289 to_partial_graph_id(self.database_id, None),
1291 fragment,
1292 )
1293 })
1294 }));
1295
1296 let mut builder = FragmentEdgeBuilder::new(
1297 existing_fragment_ids
1299 .map(|fragment_id| {
1300 (
1301 fragment_id,
1302 EdgeBuilderFragmentInfo::from_inflight(
1303 self.fragment(fragment_id),
1304 to_partial_graph_id(self.database_id, None),
1305 control_stream_manager,
1306 ),
1307 )
1308 })
1309 .chain(new_fragments.map(|(partial_graph_id, fragment)| {
1311 (
1312 fragment.fragment_id,
1313 EdgeBuilderFragmentInfo::from_fragment(
1314 fragment,
1315 stream_actors,
1316 actor_location,
1317 partial_graph_id,
1318 control_stream_manager,
1319 ),
1320 )
1321 })),
1322 );
1323 if let Some((info, _)) = info {
1324 builder.add_relations(&info.upstream_fragment_downstreams);
1325 builder.add_relations(&info.stream_job_fragments.downstreams);
1326 }
1327 if let Some(replace_job) = replace_job {
1328 builder.add_relations(&replace_job.upstream_fragment_downstreams);
1329 builder.add_relations(&replace_job.new_fragments.downstreams);
1330 }
1331 if let Some(new_upstream_sink) = new_upstream_sink {
1332 let sink_fragment_id = new_upstream_sink.sink_fragment_id;
1333 let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
1334 builder.add_edge(sink_fragment_id, new_sink_downstream);
1335 }
1336 if let Some(replace_job) = replace_job {
1337 for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
1338 for (original_upstream_fragment_id, new_upstream_fragment_id) in
1339 fragment_replacement
1340 {
1341 builder.replace_upstream(
1342 *fragment_id,
1343 *original_upstream_fragment_id,
1344 *new_upstream_fragment_id,
1345 );
1346 }
1347 }
1348 }
1349 builder.build()
1350 }
1351
1352 pub(crate) fn post_apply_reschedules(
1354 &mut self,
1355 reschedules: impl IntoIterator<Item = (FragmentId, HashSet<ActorId>)>,
1356 ) {
1357 let inner = self.shared_actor_infos.clone();
1358 let mut shared_actor_writer = inner.start_writer(self.database_id);
1359 {
1360 {
1361 {
1362 for (fragment_id, to_remove) in reschedules {
1363 let job_id = self.fragment_location[&fragment_id];
1364 let info = self
1365 .jobs
1366 .get_mut(&job_id)
1367 .expect("should exist")
1368 .fragment_infos
1369 .get_mut(&fragment_id)
1370 .expect("should exist");
1371 for actor_id in to_remove {
1372 assert!(info.actors.remove(&actor_id).is_some());
1373 }
1374 shared_actor_writer.upsert([(&*info, job_id)]);
1375 }
1376 }
1377 }
1378 }
1379 shared_actor_writer.finish();
1380 }
1381
1382 pub(crate) fn post_apply_remove_fragments(
1384 &mut self,
1385 fragment_ids: impl IntoIterator<Item = FragmentId>,
1386 ) {
1387 let inner = self.shared_actor_infos.clone();
1388 let mut shared_actor_writer = inner.start_writer(self.database_id);
1389 {
1390 {
1391 {
1392 for fragment_id in fragment_ids {
1393 let job_id = self
1394 .fragment_location
1395 .remove(&fragment_id)
1396 .expect("should exist");
1397 let job = self.jobs.get_mut(&job_id).expect("should exist");
1398 let fragment = job
1399 .fragment_infos
1400 .remove(&fragment_id)
1401 .expect("should exist");
1402 shared_actor_writer.remove(&fragment);
1403 if job.fragment_infos.is_empty() {
1404 self.jobs.remove(&job_id).expect("should exist");
1405 }
1406 }
1407 }
1408 }
1409 }
1410 shared_actor_writer.finish();
1411 }
1412
1413 pub(crate) fn post_apply_remove_job(
1414 &mut self,
1415 job_id: JobId,
1416 ) -> Option<InflightStreamingJobInfo> {
1417 let job = self.jobs.remove(&job_id)?;
1418 let inner = self.shared_actor_infos.clone();
1419 let mut shared_actor_writer = inner.start_writer(self.database_id);
1420 for (fragment_id, fragment) in &job.fragment_infos {
1421 self.fragment_location
1422 .remove(fragment_id)
1423 .expect("should exist");
1424 shared_actor_writer.remove(fragment);
1425 }
1426 shared_actor_writer.finish();
1427 Some(job)
1428 }
1429}
1430
1431impl InflightFragmentInfo {
1432 pub(crate) fn actor_ids_to_collect(
1434 infos: impl IntoIterator<Item = &Self>,
1435 ) -> HashMap<WorkerId, HashSet<ActorId>> {
1436 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1437 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1438 assert!(
1439 ret.entry(actor.worker_id)
1440 .or_default()
1441 .insert(*actor_id as _)
1442 )
1443 }
1444 ret
1445 }
1446
1447 pub fn existing_table_ids<'a>(
1448 infos: impl IntoIterator<Item = &'a Self> + 'a,
1449 ) -> impl Iterator<Item = TableId> + 'a {
1450 infos
1451 .into_iter()
1452 .flat_map(|info| info.state_table_ids.iter().cloned())
1453 }
1454
1455 pub fn workers<'a>(
1456 infos: impl IntoIterator<Item = &'a Self> + 'a,
1457 ) -> impl Iterator<Item = WorkerId> + 'a {
1458 infos
1459 .into_iter()
1460 .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1461 }
1462
1463 pub fn contains_worker<'a>(
1464 infos: impl IntoIterator<Item = &'a Self> + 'a,
1465 worker_id: WorkerId,
1466 ) -> bool {
1467 Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1468 }
1469}
1470
1471impl InflightDatabaseInfo {
1472 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
1473 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
1474 }
1475
1476 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
1477 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
1478 }
1479}