use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RawRwLock;
use parking_lot::lock_api::RwLockReadGuard;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::WorkerId;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_pb::ddl_service::PbBackfillType;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SubscriberId;
use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
use risingwave_pb::meta::subscribe_response::Operation;
use risingwave_pb::source::PbCdcTableSnapshotSplits;
use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{info, warn};

use crate::MetaResult;
use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::{BackfillProgress, BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::rebuild_fragment_mapping;
use crate::manager::NotificationManagerRef;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};

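/// Snapshot of a single actor's runtime state (worker assignment, vnode bitmap
/// and source splits) exposed through [`SharedActorInfos`].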
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

impl From<&InflightActorInfo> for SharedActorInfo {
    fn from(value: &InflightActorInfo) -> Self {
        Self {
            worker_id: value.worker_id,
            vnode_bitmap: value.vnode_bitmap.clone(),
            splits: value.splits.clone(),
        }
    }
}

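/// Snapshot of a fragment and its actors, derived from an
/// [`InflightFragmentInfo`] together with the id of the job it belongs to.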
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}

impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
        let (info, job_id) = pair;

        let InflightFragmentInfo {
            fragment_id,
            distribution_type,
            fragment_type_mask,
            actors,
            vnode_count,
            ..
        } = info;

        Self {
            fragment_id: *fragment_id,
            job_id,
            distribution_type: *distribution_type,
            fragment_type_mask: *fragment_type_mask,
            actors: actors
                .iter()
                .map(|(actor_id, actor)| (*actor_id, actor.into()))
                .collect(),
            vnode_count: *vnode_count,
        }
    }
}

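/// Fragment snapshots grouped by database. Fragment ids are globally unique,
/// so lookups by fragment id scan across databases.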
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}

impl SharedActorInfosInner {
    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
        self.info
            .values()
            .find_map(|database| database.get(&fragment_id))
    }

    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
        self.info.values().flatten()
    }
}

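/// Shared, lock-protected view of the in-flight fragments maintained by the
/// barrier manager. Mutations also notify subscribers (e.g. frontend nodes) of
/// the resulting fragment worker-slot mapping changes.
///
/// Illustrative usage (not a doctest; `notification_manager` is assumed to be
/// an existing `NotificationManagerRef`):
///
/// ```ignore
/// let infos = SharedActorInfos::new(notification_manager);
/// // Current source split assignment for every actor across all databases.
/// let assignments = infos.list_assignments();
/// ```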
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}

impl SharedActorInfos {
    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
        self.inner.read()
    }

    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
        let core = self.inner.read();
        core.iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
            })
            .collect()
    }

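    /// Collects the splits previously assigned to `prev_actor_ids` of the given
    /// fragment and redistributes them across `curr_actor_ids` via
    /// `reassign_splits`, e.g. when the fragment's actors are rebuilt during a
    /// reschedule.
    ///
    /// Sketch of a call site (hypothetical ids, not a doctest):
    ///
    /// ```ignore
    /// let diff = shared_actor_infos
    ///     .migrate_splits_for_source_actors(fragment_id, &[1, 2], &[3, 4])?;
    /// // `diff` maps each new actor to the splits it should now read.
    /// ```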
    pub fn migrate_splits_for_source_actors(
        &self,
        fragment_id: FragmentId,
        prev_actor_ids: &[ActorId],
        curr_actor_ids: &[ActorId],
    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
        let guard = self.read_guard();

        let prev_splits = prev_actor_ids
            .iter()
            .flat_map(|actor_id| {
                guard
                    .get_fragment(fragment_id)
                    .and_then(|info| info.actors.get(actor_id))
                    .map(|actor| actor.splits.clone())
                    .unwrap_or_default()
            })
            .map(|split| (split.id(), split))
            .collect();

        let empty_actor_splits = curr_actor_ids
            .iter()
            .map(|actor_id| (*actor_id, vec![]))
            .collect();

        let diff = crate::stream::source_manager::reassign_splits(
            fragment_id,
            empty_actor_splits,
            &prev_splits,
            Default::default(),
        )
        .unwrap_or_default();

        Ok(diff)
    }
}

impl SharedActorInfos {
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}

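/// Write handle over the shared actor info map that buffers fragment mapping
/// changes while the write lock is held; `finish` flushes them as
/// `Add`/`Update`/`Delete` notifications.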
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}

impl SharedActorInfoWriter<'_> {
    pub(super) fn upsert(
        &mut self,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let database = self.write_guard.info.entry(self.database_id).or_default();
        for info @ (fragment, _) in infos {
            match database.entry(fragment.fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info = info.into();
                    self.updated_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
                Entry::Vacant(entry) => {
                    let info = info.into();
                    self.added_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
            }
        }
    }

    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
            && let Some(fragment) = database.remove(&info.fragment_id)
        {
            self.deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
    }

    pub(super) fn finish(self) {
        if let Some(mapping) = self.added_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Add, mapping);
        }
        if let Some(mapping) = self.updated_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Update, mapping);
        }
        if let Some(mapping) = self.deleted_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }
}

#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}

impl BarrierInfo {
    pub(super) fn prev_epoch(&self) -> u64 {
        self.prev_epoch.value().0
    }

    pub(super) fn curr_epoch(&self) -> u64 {
        self.curr_epoch.value().0
    }
}

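/// Per-fragment change derived from a barrier command. Additive changes are
/// applied in `pre_apply` before the barrier is injected, while removals are
/// deferred to `post_apply` after the barrier is collected.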
#[derive(Debug)]
pub(super) enum CommandFragmentChanges {
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
    },
    AddNodeUpstream(PbUpstreamSinkInfo),
    DropNodeUpstream(Vec<FragmentId>),
    /// Map from the original upstream fragment id to the new one.
    ReplaceNodeUpstream(HashMap<FragmentId, FragmentId>),
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    RemoveFragment,
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}

pub(super) enum PostApplyFragmentChanges {
    Reschedule { to_remove: HashSet<ActorId> },
    RemoveFragment,
}

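/// A subscriber of a streaming job: either a user subscription carrying its
/// retention duration, or an ongoing snapshot backfill.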
#[derive(Clone, Debug)]
pub enum SubscriberType {
    Subscription(u64),
    SnapshotBackfill,
}

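/// Creation lifecycle of a streaming job: `Init` until the first barrier is
/// collected, then `Creating` while backfill progress is tracked, and finally
/// `Created` once backfill finishes.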
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    Init,
    Creating { tracker: CreateMviewProgressTracker },
    Created,
}

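/// In-flight state of a single streaming job: its fragments, subscribers,
/// creation status and, for CDC table jobs, the CDC table backfill tracker.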
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    pub status: CreateStreamingJobStatus,
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}

impl InflightStreamingJobInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.fragment_infos.values()
    }

    pub fn snapshot_backfill_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> impl Iterator<Item = ActorId> + '_ {
        fragment_infos
            .values()
            .filter(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
            })
            .flat_map(|fragment| fragment.actors.keys().copied())
    }

    pub fn tracking_progress_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        )
    }
}

impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}

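/// In-flight streaming graph of a single database, indexed by job with a
/// reverse map from fragment id to the owning job. Fragment changes are
/// mirrored into `shared_actor_infos`.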
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    pub(super) database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}

impl InflightDatabaseInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.jobs.values().flat_map(|job| job.fragment_infos())
    }

    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }

    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs
            .get(&job_id)
            .expect("should exist")
            .fragment_infos
            .get(&fragment_id)
            .expect("should exist")
    }

    pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
        self.jobs
            .iter()
            .filter_map(|(job_id, job)| match &job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating { tracker } => {
                    let progress = tracker.gen_backfill_progress();
                    Some((
                        *job_id,
                        BackfillProgress {
                            progress,
                            backfill_type: PbBackfillType::NormalBackfill,
                        },
                    ))
                }
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
        self.jobs.iter().filter_map(|(job_id, job)| {
            job.cdc_table_backfill_tracker
                .as_ref()
                .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
        })
    }

    pub(super) fn may_assign_fragment_cdc_backfill_splits(
        &mut self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
        let job_id = self.fragment_location[&fragment_id];
        let job = self.jobs.get_mut(&job_id).expect("should exist");
        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
            if cdc_scan_fragment_id != fragment_id {
                return Ok(None);
            }
            let actors = job.fragment_infos[&cdc_scan_fragment_id]
                .actors
                .keys()
                .copied()
                .collect();
            tracker.reassign_splits(actors).map(Some)
        } else {
            Ok(None)
        }
    }

    pub(super) fn assign_cdc_backfill_splits(
        &mut self,
        job_id: JobId,
    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
        let job = self.jobs.get_mut(&job_id).expect("should exist");
        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
            let actors = job.fragment_infos[&cdc_scan_fragment_id]
                .actors
                .keys()
                .copied()
                .collect();
            tracker.reassign_splits(actors).map(Some)
        } else {
            Ok(None)
        }
    }

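    /// Applies the side effects of a collected barrier command: switches a
    /// newly created job from `Init` to `Creating` on its first barrier,
    /// refreshes progress trackers after a reschedule, and feeds the backfill /
    /// CDC progress reported in `resps` into the corresponding trackers.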
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(info, version_stats),
                            },
                        ) else {
                            unreachable!("should be init before collecting the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly created job was cancelled before the first barrier was collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // Progress of snapshot backfill jobs is not tracked here.
                }
            }
        }
        if let Some(Command::RescheduleFragment { reschedules, .. }) = command {
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "RescheduleFragment should not carry vnode bitmap updates when actors are rebuilt"
            );

            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which could have been cancelled"
                );
                continue;
            };
            let CreateStreamingJobStatus::Creating { tracker, .. } =
                &mut self.jobs.get_mut(job_id).expect("should exist").status
            else {
                warn!("update the progress of an already-created streaming job: {progress:?}");
                continue;
            };
            tracker.apply_progress(progress, version_stats);
        }
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of a non-existent creating streaming job: {progress:?}, which could have been cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of a streaming job without a cdc backfill tracker: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        for cdc_offset_updated in resps
            .iter()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }

    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
        self.jobs.values().filter_map(|job| match &job.status {
            CreateStreamingJobStatus::Init => None,
            CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
            CreateStreamingJobStatus::Created => None,
        })
    }

    fn iter_mut_creating_job_tracker(
        &mut self,
    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
        self.jobs
            .values_mut()
            .filter_map(|job| match &mut job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        self.iter_creating_job_tracker()
            .any(|tracker| tracker.is_finished())
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        self.iter_mut_creating_job_tracker()
            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
            .collect()
    }

    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        let mut finished_cdc_table_backfill = vec![];
        for (job_id, job) in &mut self.jobs {
            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    let CreateStreamingJobStatus::Creating { tracker, .. } =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
                && tracker.take_pre_completed()
            {
                finished_cdc_table_backfill.push(*job_id);
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
            finished_cdc_table_backfill,
        }
    }

    pub fn fragment_subscribers(
        &self,
        fragment_id: FragmentId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
        self.jobs
            .iter()
            .filter_map(|(job_id, info)| {
                info.subscribers
                    .values()
                    .filter_map(|subscriber| match subscriber {
                        SubscriberType::Subscription(retention) => Some(*retention),
                        SubscriberType::SnapshotBackfill => None,
                    })
                    .max()
                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
            })
            .collect()
    }

    pub fn register_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
        subscriber: SubscriberType,
    ) {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .try_insert(subscriber_id, subscriber)
            .expect("non duplicate");
    }

    pub fn unregister_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
    ) -> Option<SubscriberType> {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .remove(&subscriber_id)
    }

    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
        let job_id = self.fragment_location[&fragment_id];
        let fragment = self
            .jobs
            .get_mut(&job_id)
            .expect("should exist")
            .fragment_infos
            .get_mut(&fragment_id)
            .expect("should exist");
        (fragment, job_id)
    }

    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            jobs: Default::default(),
            fragment_location: Default::default(),
            shared_actor_infos,
        }
    }

    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }

    pub fn recover(
        database_id: DatabaseId,
        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
        shared_actor_infos: SharedActorInfos,
    ) -> Self {
        let mut info = Self::empty_inner(database_id, shared_actor_infos);
        for job in jobs {
            info.add_existing(job);
        }
        info
    }

    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }

    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
        let InflightStreamingJobInfo {
            job_id,
            fragment_infos,
            subscribers,
            status,
            cdc_table_backfill_tracker,
        } = job;
        self.jobs
            .try_insert(
                job_id,
                InflightStreamingJobInfo {
                    job_id,
                    subscribers,
                    fragment_infos: Default::default(),
                    status,
                    cdc_table_backfill_tracker,
                },
            )
            .expect("non-duplicate");
        let post_apply_changes =
            self.apply_add(fragment_infos.into_iter().map(move |(fragment_id, info)| {
                (
                    fragment_id,
                    CommandFragmentChanges::NewFragment { job_id, info },
                )
            }));
        self.post_apply(post_apply_changes);
    }

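    /// First phase of applying a command to the in-flight graph: registers the
    /// new job (if any) and applies the additive part of `fragment_changes`.
    /// The returned changes must be handed to `post_apply` after the barrier is
    /// collected so that actor and fragment removals take effect afterwards.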
    pub(crate) fn pre_apply(
        &mut self,
        new_job: Option<(JobId, Option<CdcTableBackfillTracker>)>,
        fragment_changes: HashMap<FragmentId, CommandFragmentChanges>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        if let Some((job_id, cdc_table_backfill_tracker)) = new_job {
            self.jobs
                .try_insert(
                    job_id,
                    InflightStreamingJobInfo {
                        job_id,
                        fragment_infos: Default::default(),
                        subscribers: Default::default(),
                        status: CreateStreamingJobStatus::Init,
                        cdc_table_backfill_tracker,
                    },
                )
                .expect("non-duplicate");
        }
        self.apply_add(fragment_changes.into_iter())
    }

    fn apply_add(
        &mut self,
        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        let mut post_apply = HashMap::new();
        {
            let shared_infos = self.shared_actor_infos.clone();
            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
            for (fragment_id, change) in fragment_changes {
                match change {
                    CommandFragmentChanges::NewFragment { job_id, info } => {
                        let job = self.jobs.get_mut(&job_id).expect("should exist");
                        shared_actor_writer.upsert([(&info, job_id)]);
                        job.fragment_infos
                            .try_insert(fragment_id, info)
                            .expect("non duplicate");
                        self.fragment_location
                            .try_insert(fragment_id, job_id)
                            .expect("non duplicate");
                    }
                    CommandFragmentChanges::Reschedule {
                        new_actors,
                        actor_update_vnode_bitmap,
                        to_remove,
                        actor_splits,
                    } => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
                            actors
                                .get_mut(&actor_id)
                                .expect("should exist")
                                .vnode_bitmap = Some(new_vnodes);
                        }
                        for (actor_id, actor) in new_actors {
                            actors
                                .try_insert(actor_id as _, actor)
                                .expect("non-duplicate");
                        }
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }

                        post_apply.insert(
                            fragment_id,
                            PostApplyFragmentChanges::Reschedule { to_remove },
                        );
                    }
                    CommandFragmentChanges::RemoveFragment => {
                        post_apply.insert(fragment_id, PostApplyFragmentChanges::RemoveFragment);
                    }
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
                        let mut remaining_fragment_ids: HashSet<_> =
                            replace_map.keys().cloned().collect();
                        let (info, _) = self.fragment_mut(fragment_id);
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::Merge(m) = node
                                && let Some(new_upstream_fragment_id) =
                                    replace_map.get(&m.upstream_fragment_id)
                            {
                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
                                    if cfg!(debug_assertions) {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            m, replace_map
                                        );
                                    } else {
                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
                                    }
                                }
                                m.upstream_fragment_id = *new_upstream_fragment_id;
                            }
                        });
                        if cfg!(debug_assertions) {
                            assert!(
                                remaining_fragment_ids.is_empty(),
                                "non-existing fragment to replace: {:?} {:?} {:?}",
                                remaining_fragment_ids,
                                info.nodes,
                                replace_map
                            );
                        } else if !remaining_fragment_ids.is_empty() {
                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
                        }
                    }
                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut injected = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<_>>();
                                    if current_upstream_fragment_ids
                                        .contains(&new_upstream_info.upstream_fragment_id)
                                    {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            u, new_upstream_info
                                        );
                                    }
                                }
                                u.init_upstreams.push(new_upstream_info.clone());
                                injected = true;
                            }
                        });
                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut removed = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<FragmentId>>();
                                    for drop_fragment_id in &drop_upstream_fragment_ids {
                                        if !current_upstream_fragment_ids.contains(drop_fragment_id) {
                                            panic!(
                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
                                                u, drop_upstream_fragment_ids, drop_fragment_id
                                            );
                                        }
                                    }
                                }
                                u.init_upstreams.retain(|upstream| {
                                    !drop_upstream_fragment_ids
                                        .contains(&upstream.upstream_fragment_id)
                                });
                                removed = true;
                            }
                        });
                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
                        let (info, job_id) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                }
            }
            shared_actor_writer.finish();
        }
        post_apply
    }

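    /// Builds the fragment edges required by the given command, wiring newly
    /// created fragments to the existing fragments they connect to. Returns
    /// `None` for commands that create no new fragments or edges.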
    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        let (info, replace_job, new_upstream_sink) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit { .. }
                | Command::Throttle { .. }
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::Refresh { .. }
                | Command::ListFinish { .. }
                | Command::LoadFinish { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
                        new_upstream_sink,
                    ) = job_type
                    {
                        Some(new_upstream_sink)
                    } else {
                        None
                    };
                    let is_snapshot_backfill =
                        matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_));
                    (Some((info, is_snapshot_backfill)), None, new_upstream_sink)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
                Command::ResetSource { .. } => (None, None, None),
            },
        };
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|(info, _)| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|(info, is_snapshot_backfill)| {
                let partial_graph_id = to_partial_graph_id(
                    self.database_id,
                    is_snapshot_backfill.then_some(info.streaming_job.id()),
                );
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(move |(fragment_id, info)| (fragment_id, info, partial_graph_id))
            })
            .chain(
                replace_job
                    .into_iter()
                    .flat_map(|replace_job| {
                        replace_job
                            .new_fragments
                            .new_fragment_info(&replace_job.init_split_assignment)
                            .chain(
                                replace_job
                                    .auto_refresh_schema_sinks
                                    .as_ref()
                                    .into_iter()
                                    .flat_map(|sinks| {
                                        sinks.iter().map(|sink| {
                                            (
                                                sink.new_fragment.fragment_id,
                                                sink.new_fragment_info(),
                                            )
                                        })
                                    }),
                            )
                    })
                    .map(|(fragment_id, fragment)| {
                        (
                            fragment_id,
                            fragment,
                            to_partial_graph_id(self.database_id, None),
                        )
                    }),
            )
            .collect_vec();
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| {
                    (
                        self.fragment(fragment_id),
                        to_partial_graph_id(self.database_id, None),
                    )
                })
                .chain(
                    new_fragment_infos
                        .iter()
                        .map(|(_, info, partial_graph_id)| (info, *partial_graph_id)),
                ),
            control_stream_manager,
        );
        if let Some((info, _)) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }

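    /// Second phase of applying a command: removes the actors and fragments
    /// scheduled for removal by `pre_apply` and publishes the updated fragment
    /// mappings through the shared actor info writer.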
    pub(crate) fn post_apply(
        &mut self,
        fragment_changes: HashMap<FragmentId, PostApplyFragmentChanges>,
    ) {
        let inner = self.shared_actor_infos.clone();
        let mut shared_actor_writer = inner.start_writer(self.database_id);
        {
            for (fragment_id, changes) in fragment_changes {
                match changes {
                    PostApplyFragmentChanges::Reschedule { to_remove } => {
                        let job_id = self.fragment_location[&fragment_id];
                        let info = self
                            .jobs
                            .get_mut(&job_id)
                            .expect("should exist")
                            .fragment_infos
                            .get_mut(&fragment_id)
                            .expect("should exist");
                        for actor_id in to_remove {
                            assert!(info.actors.remove(&actor_id).is_some());
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                    PostApplyFragmentChanges::RemoveFragment => {
                        let job_id = self
                            .fragment_location
                            .remove(&fragment_id)
                            .expect("should exist");
                        let job = self.jobs.get_mut(&job_id).expect("should exist");
                        let fragment = job
                            .fragment_infos
                            .remove(&fragment_id)
                            .expect("should exist");
                        shared_actor_writer.remove(&fragment);
                        if job.fragment_infos.is_empty() {
                            self.jobs.remove(&job_id).expect("should exist");
                        }
                    }
                }
            }
        }
        shared_actor_writer.finish();
    }
}

impl InflightFragmentInfo {
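    /// Groups the actors of the given fragments by the worker they run on,
    /// i.e. the set of actors a barrier must be collected from on each worker.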
    pub(crate) fn actor_ids_to_collect(
        infos: impl IntoIterator<Item = &Self>,
    ) -> HashMap<WorkerId, HashSet<ActorId>> {
        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
            assert!(
                ret.entry(actor.worker_id)
                    .or_default()
                    .insert(*actor_id as _)
            )
        }
        ret
    }

    pub fn existing_table_ids<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = TableId> + 'a {
        infos
            .into_iter()
            .flat_map(|info| info.state_table_ids.iter().cloned())
    }

    pub fn workers<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = WorkerId> + 'a {
        infos
            .into_iter()
            .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
    }

    pub fn contains_worker<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
        worker_id: WorkerId,
    ) -> bool {
        Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
    }
}

impl InflightDatabaseInfo {
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}