1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
27use risingwave_connector::source::{SplitImpl, SplitMetaData};
28use risingwave_meta_model::WorkerId;
29use risingwave_meta_model::fragment::DistributionType;
30use risingwave_pb::ddl_service::DdlProgress;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::SubscriberId;
33use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
34use risingwave_pb::meta::subscribe_response::Operation;
35use risingwave_pb::source::PbCdcTableSnapshotSplits;
36use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
37use risingwave_pb::stream_plan::stream_node::NodeBody;
38use risingwave_pb::stream_service::BarrierCompleteResponse;
39use tracing::{info, warn};
40
41use crate::MetaResult;
42use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
43use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
44use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
45use crate::barrier::rpc::ControlStreamManager;
46use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
47use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
48use crate::controller::utils::rebuild_fragment_mapping;
49use crate::manager::NotificationManagerRef;
50use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
51
/// Read-only snapshot of a single actor's scheduling state, stored inside
/// [`SharedActorInfos`] for readers outside the barrier worker.
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    /// Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    /// Vnode ownership bitmap of the actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to this actor.
    pub splits: Vec<SplitImpl>,
}
58
59impl From<&InflightActorInfo> for SharedActorInfo {
60 fn from(value: &InflightActorInfo) -> Self {
61 Self {
62 worker_id: value.worker_id,
63 vnode_bitmap: value.vnode_bitmap.clone(),
64 splits: value.splits.clone(),
65 }
66 }
67}
68
/// Read-only snapshot of a fragment and its actors, stored inside
/// [`SharedActorInfos`].
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    /// The streaming job this fragment belongs to.
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    /// Per-actor scheduling snapshots, keyed by actor id.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}
78
79impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
80 fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
81 let (info, job_id) = pair;
82
83 let InflightFragmentInfo {
84 fragment_id,
85 distribution_type,
86 fragment_type_mask,
87 actors,
88 vnode_count,
89 ..
90 } = info;
91
92 Self {
93 fragment_id: *fragment_id,
94 job_id,
95 distribution_type: *distribution_type,
96 fragment_type_mask: *fragment_type_mask,
97 actors: actors
98 .iter()
99 .map(|(actor_id, actor)| (*actor_id, actor.into()))
100 .collect(),
101 vnode_count: *vnode_count,
102 }
103 }
104}
105
/// Lock-protected payload of [`SharedActorInfos`]: fragment snapshots grouped
/// by database.
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    // database -> fragment -> snapshot
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
110
111impl SharedActorInfosInner {
112 pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
113 self.info
114 .values()
115 .find_map(|database| database.get(&fragment_id))
116 }
117
118 pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
119 self.info.values().flatten()
120 }
121}
122
/// Cloneable handle to the fragment/actor snapshot map shared across barrier
/// workers. Every mutation also pushes fragment worker-slot mapping
/// notifications through the attached notification manager.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    // Not `Debug`; only used to broadcast mapping changes.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
130
131impl SharedActorInfos {
132 pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
133 self.inner.read()
134 }
135
136 pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
137 let core = self.inner.read();
138 core.iter_over_fragments()
139 .flat_map(|(_, fragment)| {
140 fragment
141 .actors
142 .iter()
143 .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
144 })
145 .collect()
146 }
147
148 pub fn migrate_splits_for_source_actors(
154 &self,
155 fragment_id: FragmentId,
156 prev_actor_ids: &[ActorId],
157 curr_actor_ids: &[ActorId],
158 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
159 let guard = self.read_guard();
160
161 let prev_splits = prev_actor_ids
162 .iter()
163 .flat_map(|actor_id| {
164 guard
166 .get_fragment(fragment_id)
167 .and_then(|info| info.actors.get(actor_id))
168 .map(|actor| actor.splits.clone())
169 .unwrap_or_default()
170 })
171 .map(|split| (split.id(), split))
172 .collect();
173
174 let empty_actor_splits = curr_actor_ids
175 .iter()
176 .map(|actor_id| (*actor_id, vec![]))
177 .collect();
178
179 let diff = crate::stream::source_manager::reassign_splits(
180 fragment_id,
181 empty_actor_splits,
182 &prev_splits,
183 std::default::Default::default(),
185 )
186 .unwrap_or_default();
187
188 Ok(diff)
189 }
190}
191
impl SharedActorInfos {
    /// Create an empty shared map bound to `notification_manager`, which
    /// receives fragment-mapping notifications on every mutation.
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    /// Drop all fragments of `database_id` and notify subscribers that their
    /// worker-slot mappings were deleted.
    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    /// Keep only the given databases; fragments of every other database are
    /// removed and announced as deleted.
    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    /// Reconcile the stored state of `database_id` with the authoritative
    /// post-recovery `fragments`:
    /// - fragments present in both are refreshed in place (Update),
    /// - fragments only in the stored map are dropped (Delete),
    /// - fragments only in `fragments` are inserted (Add).
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        // `extract_if` visits every stored fragment: ones still known after
        // recovery are overwritten in place (closure returns `false`, so the
        // entry is kept); unknown ones are extracted and recorded as deleted.
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        // Whatever remains was not stored before recovery: insert as new.
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    /// Insert or refresh the given fragments and immediately send the
    /// corresponding notifications.
    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    /// Begin a batched mutation of `database_id`; the write lock is held until
    /// the returned writer is `finish`ed (or dropped).
    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}
293
/// Batched mutator for [`SharedActorInfos`]: holds the write lock and
/// accumulates mapping notifications, which are emitted by [`Self::finish`].
pub(super) struct SharedActorInfoWriter<'a> {
    /// Database whose fragments this writer mutates.
    database_id: DatabaseId,
    /// Exclusive lock over the shared map, held for the writer's lifetime.
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    /// Pending `Operation::Add` notifications, allocated lazily.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Pending `Operation::Update` notifications, allocated lazily.
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Pending `Operation::Delete` notifications, allocated lazily.
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
302
303impl SharedActorInfoWriter<'_> {
304 pub(super) fn upsert(
305 &mut self,
306 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
307 ) {
308 let database = self.write_guard.info.entry(self.database_id).or_default();
309 for info @ (fragment, _) in infos {
310 match database.entry(fragment.fragment_id) {
311 Entry::Occupied(mut entry) => {
312 let info = info.into();
313 self.updated_fragment_mapping
314 .get_or_insert_default()
315 .push(rebuild_fragment_mapping(&info));
316 entry.insert(info);
317 }
318 Entry::Vacant(entry) => {
319 let info = info.into();
320 self.added_fragment_mapping
321 .get_or_insert_default()
322 .push(rebuild_fragment_mapping(&info));
323 entry.insert(info);
324 }
325 }
326 }
327 }
328
329 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
330 if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
331 && let Some(fragment) = database.remove(&info.fragment_id)
332 {
333 self.deleted_fragment_mapping
334 .get_or_insert_default()
335 .push(rebuild_fragment_mapping(&fragment));
336 }
337 }
338
339 pub(super) fn finish(self) {
340 if let Some(mapping) = self.added_fragment_mapping {
341 self.notification_manager
342 .notify_fragment_mapping(Operation::Add, mapping);
343 }
344 if let Some(mapping) = self.updated_fragment_mapping {
345 self.notification_manager
346 .notify_fragment_mapping(Operation::Update, mapping);
347 }
348 if let Some(mapping) = self.deleted_fragment_mapping {
349 self.notification_manager
350 .notify_fragment_mapping(Operation::Delete, mapping);
351 }
352 }
353}
354
/// Epoch pair and kind of a single barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// Epoch before this barrier.
    pub prev_epoch: TracedEpoch,
    /// Epoch introduced by this barrier.
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}
361
impl BarrierInfo {
    /// Raw `u64` value of the previous epoch.
    pub(super) fn prev_epoch(&self) -> u64 {
        self.prev_epoch.value().0
    }
}
367
/// Per-fragment change derived from a barrier [`Command`], applied to the
/// in-flight graph state before the barrier is injected.
#[derive(Debug)]
pub(super) enum CommandFragmentChanges {
    /// A fragment newly tracked by this database.
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
        /// `true` when the fragment already existed before (e.g. on recovery),
        /// in which case no Add notification is pushed to the shared map.
        is_existing: bool,
    },
    /// Inject a new upstream into the fragment's `UpstreamSinkUnion` node.
    AddNodeUpstream(PbUpstreamSinkInfo),
    /// Remove the given upstream fragments from the `UpstreamSinkUnion` node.
    DropNodeUpstream(Vec<FragmentId>),
    /// Rewrite `Merge` nodes: old upstream fragment id -> new one.
    ReplaceNodeUpstream(
        HashMap<FragmentId, FragmentId>,
    ),
    /// Actor-level reschedule of the fragment.
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        /// Actors removed only after the barrier is collected (`post_apply`).
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    /// Drop the fragment once the barrier is collected.
    RemoveFragment,
    /// Update source split ownership of the fragment's actors.
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}
395
/// Deferred half of [`CommandFragmentChanges`], applied after the barrier has
/// been collected (see `InflightDatabaseInfo::post_apply`).
pub(super) enum PostApplyFragmentChanges {
    /// Remove the rescheduled-away actors of the fragment.
    Reschedule { to_remove: HashSet<ActorId> },
    /// Remove the whole fragment (and its job, once empty).
    RemoveFragment,
}
400
/// Kind of consumer subscribed to a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A user subscription carrying its retention value
    /// (unit not visible here — see subscription catalog; TODO confirm).
    Subscription(u64),
    /// A snapshot-backfill job temporarily subscribing to its upstream.
    SnapshotBackfill,
}
406
/// Lifecycle of a creating streaming job, advanced as barriers are collected.
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    /// Job registered; first barrier not yet collected.
    Init,
    /// Backfill in progress, tracked by the embedded progress tracker.
    Creating(CreateMviewProgressTracker),
    /// Backfill finished; the job is fully created.
    Created,
}
413
/// All in-flight state of a single streaming job within a database.
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    /// Fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Active subscribers (subscriptions / snapshot backfills) of this job.
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    /// Creation lifecycle status.
    pub status: CreateStreamingJobStatus,
    /// Present only for jobs performing CDC table backfill.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
422
423impl InflightStreamingJobInfo {
424 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
425 self.fragment_infos.values()
426 }
427
428 pub fn snapshot_backfill_actor_ids(
429 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
430 ) -> impl Iterator<Item = ActorId> + '_ {
431 fragment_infos
432 .values()
433 .filter(|fragment| {
434 fragment
435 .fragment_type_mask
436 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
437 })
438 .flat_map(|fragment| fragment.actors.keys().copied())
439 }
440
441 pub fn tracking_progress_actor_ids(
442 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
443 ) -> Vec<(ActorId, BackfillUpstreamType)> {
444 StreamJobFragments::tracking_progress_actor_ids_impl(
445 fragment_infos
446 .values()
447 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
448 )
449 }
450}
451
impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    // Opaque iterator type (impl-trait-in-assoc-type); simply delegates to
    // `fragment_infos()`.
    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}
461
/// In-flight streaming graph state of one database, owned by its barrier
/// worker.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    database_id: DatabaseId,
    /// Jobs of the database, keyed by job id.
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    /// Reverse index: which job owns each fragment.
    fragment_location: HashMap<FragmentId, JobId>,
    /// Cross-database shared snapshot map, kept in sync with `jobs`.
    pub(super) shared_actor_infos: SharedActorInfos,
}
469
470impl InflightDatabaseInfo {
    /// Iterate over every fragment of every job in this database.
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.jobs.values().flat_map(|job| job.fragment_infos())
    }
474
    /// Whether the database currently tracks the given job.
    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }
478
479 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
480 let job_id = self.fragment_location[&fragment_id];
481 self.jobs
482 .get(&job_id)
483 .expect("should exist")
484 .fragment_infos
485 .get(&fragment_id)
486 .expect("should exist")
487 }
488
489 pub fn gen_ddl_progress(&self) -> impl Iterator<Item = (JobId, DdlProgress)> + '_ {
490 self.jobs
491 .iter()
492 .filter_map(|(job_id, job)| match &job.status {
493 CreateStreamingJobStatus::Init => None,
494 CreateStreamingJobStatus::Creating(tracker) => {
495 Some((*job_id, tracker.gen_ddl_progress()))
496 }
497 CreateStreamingJobStatus::Created => None,
498 })
499 }
500
501 pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
502 self.jobs.iter().filter_map(|(job_id, job)| {
503 job.cdc_table_backfill_tracker
504 .as_ref()
505 .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
506 })
507 }
508
509 pub(super) fn may_assign_fragment_cdc_backfill_splits(
510 &mut self,
511 fragment_id: FragmentId,
512 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
513 let job_id = self.fragment_location[&fragment_id];
514 let job = self.jobs.get_mut(&job_id).expect("should exist");
515 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
516 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
517 if cdc_scan_fragment_id != fragment_id {
518 return Ok(None);
519 }
520 let actors = job.fragment_infos[&cdc_scan_fragment_id]
521 .actors
522 .keys()
523 .copied()
524 .collect();
525 tracker.reassign_splits(actors).map(Some)
526 } else {
527 Ok(None)
528 }
529 }
530
531 pub(super) fn assign_cdc_backfill_splits(
532 &mut self,
533 job_id: JobId,
534 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
535 let job = self.jobs.get_mut(&job_id).expect("should exist");
536 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
537 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
538 let actors = job.fragment_infos[&cdc_scan_fragment_id]
539 .actors
540 .keys()
541 .copied()
542 .collect();
543 tracker.reassign_splits(actors).map(Some)
544 } else {
545 Ok(None)
546 }
547 }
548
    /// Apply the effects of a fully collected barrier: flip a newly created
    /// job from `Init` to `Creating`, then feed the per-actor progress
    /// reported in `resps` into the owning jobs' trackers.
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // Collecting the first barrier creates the progress
                        // tracker; any other prior status is a logic error.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::new(
                                info,
                                version_stats,
                            )),
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // No MV progress tracker is set up here for
                    // snapshot-backfill jobs.
                }
            }
        }
        // MV backfill progress reported along with the collected barrier.
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let CreateStreamingJobStatus::Creating(tracker) =
                &mut self.jobs.get_mut(job_id).expect("should exist").status
            else {
                warn!("update the progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.apply_progress(progress, version_stats);
        }
        // CDC table backfill progress, routed to the owning job's CDC tracker.
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
    }
615
616 fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
617 self.jobs.values().filter_map(|job| match &job.status {
618 CreateStreamingJobStatus::Init => None,
619 CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
620 CreateStreamingJobStatus::Created => None,
621 })
622 }
623
624 fn iter_mut_creating_job_tracker(
625 &mut self,
626 ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
627 self.jobs
628 .values_mut()
629 .filter_map(|job| match &mut job.status {
630 CreateStreamingJobStatus::Init => None,
631 CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
632 CreateStreamingJobStatus::Created => None,
633 })
634 }
635
    /// Whether any creating job has finished backfilling but is not yet
    /// committed as `Created`.
    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        self.iter_creating_job_tracker()
            .any(|tracker| tracker.is_finished())
    }
640
    /// Drain the backfill fragment ids pending across all creating jobs.
    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        self.iter_mut_creating_job_tracker()
            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
            .collect()
    }
646
    /// Drain per-job commit-time information: jobs whose backfill just
    /// finished (their status flips to `Created`), table ids whose logs may be
    /// truncated, and jobs whose CDC backfill completed.
    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        let mut finished_cdc_table_backfill = vec![];
        for (job_id, job) in &mut self.jobs {
            if let CreateStreamingJobStatus::Creating(tracker) = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    // Move the tracker out by flipping the status to `Created`.
                    let CreateStreamingJobStatus::Creating(tracker) =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
                && tracker.take_pre_completed()
            {
                finished_cdc_table_backfill.push(*job_id);
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
            finished_cdc_table_backfill,
        }
    }
676
    /// Subscriber ids of the job owning `fragment_id`. Panics if the fragment
    /// is unknown to this database.
    pub fn fragment_subscribers(
        &self,
        fragment_id: FragmentId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs[&job_id].subscribers.keys().copied()
    }
684
    /// Subscriber ids of the given job. Panics if the job does not exist.
    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }
688
689 pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
690 self.jobs
691 .iter()
692 .filter_map(|(job_id, info)| {
693 info.subscribers
694 .values()
695 .filter_map(|subscriber| match subscriber {
696 SubscriberType::Subscription(retention) => Some(*retention),
697 SubscriberType::SnapshotBackfill => None,
698 })
699 .max()
700 .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
701 })
702 .collect()
703 }
704
    /// Register a subscriber on a job. Panics if the job does not exist or the
    /// subscriber id is already registered.
    pub fn register_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
        subscriber: SubscriberType,
    ) {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .try_insert(subscriber_id, subscriber)
            .expect("non duplicate");
    }
718
    /// Remove a subscriber from a job, returning it if it was registered.
    /// Panics if the job does not exist.
    pub fn unregister_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
    ) -> Option<SubscriberType> {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .remove(&subscriber_id)
    }
730
731 fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
732 let job_id = self.fragment_location[&fragment_id];
733 let fragment = self
734 .jobs
735 .get_mut(&job_id)
736 .expect("should exist")
737 .fragment_infos
738 .get_mut(&fragment_id)
739 .expect("should exist");
740 (fragment, job_id)
741 }
742
    /// Construct an empty database info without touching the shared map.
    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            jobs: Default::default(),
            fragment_location: Default::default(),
            shared_actor_infos,
        }
    }
751
    /// Construct an empty database info, clearing any stale entries for this
    /// database from the shared map first.
    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }
757
758 pub fn recover(
759 database_id: DatabaseId,
760 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
761 shared_actor_infos: SharedActorInfos,
762 ) -> Self {
763 let mut info = Self::empty_inner(database_id, shared_actor_infos);
764 for job in jobs {
765 info.add_existing(job);
766 }
767 info
768 }
769
    /// Whether the database currently tracks no jobs at all.
    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }
773
774 pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
775 let InflightStreamingJobInfo {
776 job_id,
777 fragment_infos,
778 subscribers,
779 status,
780 cdc_table_backfill_tracker,
781 } = job;
782 self.jobs
783 .try_insert(
784 job.job_id,
785 InflightStreamingJobInfo {
786 job_id,
787 subscribers,
788 fragment_infos: Default::default(), status,
790 cdc_table_backfill_tracker,
791 },
792 )
793 .expect("non-duplicate");
794 let post_apply_changes =
795 self.apply_add(fragment_infos.into_iter().map(|(fragment_id, info)| {
796 (
797 fragment_id,
798 CommandFragmentChanges::NewFragment {
799 job_id: job.job_id,
800 info,
801 is_existing: true,
802 },
803 )
804 }));
805 self.post_apply(post_apply_changes);
806 }
807
    /// Pre-barrier application of a command's fragment changes. Optionally
    /// registers a brand-new job (in `Init` status) first, then delegates to
    /// `apply_add`; the returned deferred changes must be handed to
    /// `post_apply` once the barrier is collected.
    pub(crate) fn pre_apply(
        &mut self,
        new_job: Option<(JobId, Option<CdcTableBackfillTracker>)>,
        fragment_changes: HashMap<FragmentId, CommandFragmentChanges>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        if let Some((job_id, cdc_table_backfill_tracker)) = new_job {
            self.jobs
                .try_insert(
                    job_id,
                    InflightStreamingJobInfo {
                        job_id,
                        fragment_infos: Default::default(),
                        subscribers: Default::default(), status: CreateStreamingJobStatus::Init,
                        cdc_table_backfill_tracker,
                    },
                )
                .expect("non-duplicate");
        }
        self.apply_add(fragment_changes.into_iter())
    }
831
832 fn apply_add(
833 &mut self,
834 fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
835 ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
836 let mut post_apply = HashMap::new();
837 {
838 let shared_infos = self.shared_actor_infos.clone();
839 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
840 for (fragment_id, change) in fragment_changes {
841 match change {
842 CommandFragmentChanges::NewFragment {
843 job_id,
844 info,
845 is_existing,
846 } => {
847 let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
848 if !is_existing {
849 shared_actor_writer.upsert([(&info, job_id)]);
850 }
851 fragment_infos
852 .fragment_infos
853 .try_insert(fragment_id, info)
854 .expect("non duplicate");
855 self.fragment_location
856 .try_insert(fragment_id, job_id)
857 .expect("non duplicate");
858 }
859 CommandFragmentChanges::Reschedule {
860 new_actors,
861 actor_update_vnode_bitmap,
862 to_remove,
863 actor_splits,
864 } => {
865 let (info, _) = self.fragment_mut(fragment_id);
866 let actors = &mut info.actors;
867 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
868 actors
869 .get_mut(&actor_id)
870 .expect("should exist")
871 .vnode_bitmap = Some(new_vnodes);
872 }
873 for (actor_id, actor) in new_actors {
874 actors
875 .try_insert(actor_id as _, actor)
876 .expect("non-duplicate");
877 }
878 for (actor_id, splits) in actor_splits {
879 actors.get_mut(&actor_id).expect("should exist").splits = splits;
880 }
881
882 post_apply.insert(
883 fragment_id,
884 PostApplyFragmentChanges::Reschedule { to_remove },
885 );
886
887 }
889 CommandFragmentChanges::RemoveFragment => {
890 post_apply.insert(fragment_id, PostApplyFragmentChanges::RemoveFragment);
891 }
892 CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
893 let mut remaining_fragment_ids: HashSet<_> =
894 replace_map.keys().cloned().collect();
895 let (info, _) = self.fragment_mut(fragment_id);
896 visit_stream_node_mut(&mut info.nodes, |node| {
897 if let NodeBody::Merge(m) = node
898 && let Some(new_upstream_fragment_id) =
899 replace_map.get(&m.upstream_fragment_id)
900 {
901 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
902 if cfg!(debug_assertions) {
903 panic!(
904 "duplicate upstream fragment: {:?} {:?}",
905 m, replace_map
906 );
907 } else {
908 warn!(?m, ?replace_map, "duplicate upstream fragment");
909 }
910 }
911 m.upstream_fragment_id = *new_upstream_fragment_id;
912 }
913 });
914 if cfg!(debug_assertions) {
915 assert!(
916 remaining_fragment_ids.is_empty(),
917 "non-existing fragment to replace: {:?} {:?} {:?}",
918 remaining_fragment_ids,
919 info.nodes,
920 replace_map
921 );
922 } else {
923 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
924 }
925 }
926 CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
927 let (info, _) = self.fragment_mut(fragment_id);
928 let mut injected = false;
929 visit_stream_node_mut(&mut info.nodes, |node| {
930 if let NodeBody::UpstreamSinkUnion(u) = node {
931 if cfg!(debug_assertions) {
932 let current_upstream_fragment_ids = u
933 .init_upstreams
934 .iter()
935 .map(|upstream| upstream.upstream_fragment_id)
936 .collect::<HashSet<_>>();
937 if current_upstream_fragment_ids
938 .contains(&new_upstream_info.upstream_fragment_id)
939 {
940 panic!(
941 "duplicate upstream fragment: {:?} {:?}",
942 u, new_upstream_info
943 );
944 }
945 }
946 u.init_upstreams.push(new_upstream_info.clone());
947 injected = true;
948 }
949 });
950 assert!(injected, "should inject upstream into UpstreamSinkUnion");
951 }
952 CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
953 let (info, _) = self.fragment_mut(fragment_id);
954 let mut removed = false;
955 visit_stream_node_mut(&mut info.nodes, |node| {
956 if let NodeBody::UpstreamSinkUnion(u) = node {
957 if cfg!(debug_assertions) {
958 let current_upstream_fragment_ids = u
959 .init_upstreams
960 .iter()
961 .map(|upstream| upstream.upstream_fragment_id)
962 .collect::<HashSet<FragmentId>>();
963 for drop_fragment_id in &drop_upstream_fragment_ids {
964 if !current_upstream_fragment_ids.contains(drop_fragment_id)
965 {
966 panic!(
967 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
968 u, drop_upstream_fragment_ids, drop_fragment_id
969 );
970 }
971 }
972 }
973 u.init_upstreams.retain(|upstream| {
974 !drop_upstream_fragment_ids
975 .contains(&upstream.upstream_fragment_id)
976 });
977 removed = true;
978 }
979 });
980 assert!(removed, "should remove upstream from UpstreamSinkUnion");
981 }
982 CommandFragmentChanges::SplitAssignment { actor_splits } => {
983 let (info, job_id) = self.fragment_mut(fragment_id);
984 let actors = &mut info.actors;
985 for (actor_id, splits) in actor_splits {
986 actors.get_mut(&actor_id).expect("should exist").splits = splits;
987 }
988 shared_actor_writer.upsert([(&*info, job_id)]);
989 }
990 }
991 }
992 shared_actor_writer.finish();
993 }
994 post_apply
995 }
996
    /// Build fragment-edge information for the given command, if it creates or
    /// replaces streaming jobs; returns `None` for commands that add no edges.
    ///
    /// Collects (a) the ids of existing fragments gaining new downstreams and
    /// (b) the command's brand-new fragments, then registers all relations and
    /// upstream replacements on a [`FragmentEdgeBuilder`].
    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        let (info, replace_job, new_upstream_sink) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                // These commands never introduce new fragment edges.
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit { .. }
                | Command::Throttle(_)
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::Refresh { .. }
                | Command::ListFinish { .. }
                | Command::LoadFinish { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    // Sink-into-table additionally wires the sink to the
                    // target table's fragment.
                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
                        new_upstream_sink,
                    ) = job_type
                    {
                        Some(new_upstream_sink)
                    } else {
                        None
                    };
                    (Some(info), None, new_upstream_sink)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
            },
        };
        // Existing fragments the new/replacement job connects to. For a
        // replace job, skip upstreams that are themselves part of the newly
        // created job's fragments.
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|info| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|info| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        // Fragments introduced by the command itself (new job fragments,
        // replacement fragments, and auto-refresh-schema sink fragments).
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|info| {
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
            })
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .new_fragments
                    .new_fragment_info(&replace_job.init_split_assignment)
                    .chain(
                        replace_job
                            .auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().map(|sink| {
                                    (sink.new_fragment.fragment_id, sink.new_fragment_info())
                                })
                            }),
                    )
            }))
            .collect_vec();
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| self.fragment(fragment_id))
                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
            control_stream_manager,
        );
        if let Some(info) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        if let Some(replace_job) = replace_job {
            // Rewire downstreams of replaced fragments to the new upstreams.
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }
1124
    /// Apply the deferred half of fragment changes after the barrier has been
    /// collected: physically drop rescheduled-away actors and removed
    /// fragments, keeping the shared actor map in sync.
    pub(crate) fn post_apply(
        &mut self,
        fragment_changes: HashMap<FragmentId, PostApplyFragmentChanges>,
    ) {
        // Clone the handle so `self` can still be mutably borrowed below.
        let inner = self.shared_actor_infos.clone();
        let mut shared_actor_writer = inner.start_writer(self.database_id);
        {
            for (fragment_id, changes) in fragment_changes {
                match changes {
                    PostApplyFragmentChanges::Reschedule { to_remove } => {
                        let job_id = self.fragment_location[&fragment_id];
                        let info = self
                            .jobs
                            .get_mut(&job_id)
                            .expect("should exist")
                            .fragment_infos
                            .get_mut(&fragment_id)
                            .expect("should exist");
                        for actor_id in to_remove {
                            assert!(info.actors.remove(&actor_id).is_some());
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                    PostApplyFragmentChanges::RemoveFragment => {
                        let job_id = self
                            .fragment_location
                            .remove(&fragment_id)
                            .expect("should exist");
                        let job = self.jobs.get_mut(&job_id).expect("should exist");
                        let fragment = job
                            .fragment_infos
                            .remove(&fragment_id)
                            .expect("should exist");
                        shared_actor_writer.remove(&fragment);
                        // Drop the job entirely once its last fragment is gone.
                        if job.fragment_infos.is_empty() {
                            self.jobs.remove(&job_id).expect("should exist");
                        }
                    }
                }
            }
        }
        shared_actor_writer.finish();
    }
1170}
1171
1172impl InflightFragmentInfo {
1173 pub(crate) fn actor_ids_to_collect(
1175 infos: impl IntoIterator<Item = &Self>,
1176 ) -> HashMap<WorkerId, HashSet<ActorId>> {
1177 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1178 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1179 assert!(
1180 ret.entry(actor.worker_id)
1181 .or_default()
1182 .insert(*actor_id as _)
1183 )
1184 }
1185 ret
1186 }
1187
1188 pub fn existing_table_ids<'a>(
1189 infos: impl IntoIterator<Item = &'a Self> + 'a,
1190 ) -> impl Iterator<Item = TableId> + 'a {
1191 infos
1192 .into_iter()
1193 .flat_map(|info| info.state_table_ids.iter().cloned())
1194 }
1195
1196 pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
1197 infos.into_iter().any(|fragment| {
1198 fragment
1199 .actors
1200 .values()
1201 .any(|actor| (actor.worker_id) == worker_id)
1202 })
1203 }
1204}
1205
impl InflightDatabaseInfo {
    /// Whether any actor of this database runs on `worker_id`.
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    /// All state table ids owned by this database's fragments.
    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}