1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
27use risingwave_connector::source::{SplitImpl, SplitMetaData};
28use risingwave_meta_model::WorkerId;
29use risingwave_meta_model::fragment::DistributionType;
30use risingwave_pb::ddl_service::DdlProgress;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::SubscriberId;
33use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
34use risingwave_pb::meta::subscribe_response::Operation;
35use risingwave_pb::source::PbCdcTableSnapshotSplits;
36use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
37use risingwave_pb::stream_plan::stream_node::NodeBody;
38use risingwave_pb::stream_service::BarrierCompleteResponse;
39use tracing::{info, warn};
40
41use crate::MetaResult;
42use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
43use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
44use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
45use crate::barrier::rpc::ControlStreamManager;
46use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
47use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
48use crate::controller::utils::rebuild_fragment_mapping;
49use crate::manager::NotificationManagerRef;
50use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
51
/// Point-in-time copy of a single actor's placement state, published through
/// [`SharedActorInfos`] for readers outside the barrier worker loop.
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    /// Worker node currently hosting this actor.
    pub worker_id: WorkerId,
    /// Vnode ownership bitmap of this actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to this actor.
    pub splits: Vec<SplitImpl>,
}
58
59impl From<&InflightActorInfo> for SharedActorInfo {
60 fn from(value: &InflightActorInfo) -> Self {
61 Self {
62 worker_id: value.worker_id,
63 vnode_bitmap: value.vnode_bitmap.clone(),
64 splits: value.splits.clone(),
65 }
66 }
67}
68
/// Point-in-time copy of a fragment's scheduling state (derived from
/// [`InflightFragmentInfo`]), published through [`SharedActorInfos`].
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    /// The streaming job this fragment belongs to.
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    /// Per-actor placement info, keyed by actor id.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}
78
79impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
80 fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
81 let (info, job_id) = pair;
82
83 let InflightFragmentInfo {
84 fragment_id,
85 distribution_type,
86 fragment_type_mask,
87 actors,
88 vnode_count,
89 ..
90 } = info;
91
92 Self {
93 fragment_id: *fragment_id,
94 job_id,
95 distribution_type: *distribution_type,
96 fragment_type_mask: *fragment_type_mask,
97 actors: actors
98 .iter()
99 .map(|(actor_id, actor)| (*actor_id, actor.into()))
100 .collect(),
101 vnode_count: *vnode_count,
102 }
103 }
104}
105
/// Lock-protected state behind [`SharedActorInfos`].
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    // database_id -> fragment_id -> shared fragment info
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
110
111impl SharedActorInfosInner {
112 pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
113 self.info
114 .values()
115 .find_map(|database| database.get(&fragment_id))
116 }
117
118 pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
119 self.info.values().flatten()
120 }
121}
122
/// Cloneable, lock-protected view of fragment/actor placement shared across
/// threads. Mutations go through [`SharedActorInfoWriter`], which batches and
/// emits fragment-mapping notifications.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    // Skipped in `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
130
131impl SharedActorInfos {
132 pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
133 self.inner.read()
134 }
135
136 pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
137 let core = self.inner.read();
138 core.iter_over_fragments()
139 .flat_map(|(_, fragment)| {
140 fragment
141 .actors
142 .iter()
143 .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
144 })
145 .collect()
146 }
147
148 pub fn migrate_splits_for_source_actors(
154 &self,
155 fragment_id: FragmentId,
156 prev_actor_ids: &[ActorId],
157 curr_actor_ids: &[ActorId],
158 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
159 let guard = self.read_guard();
160
161 let prev_splits = prev_actor_ids
162 .iter()
163 .flat_map(|actor_id| {
164 guard
166 .get_fragment(fragment_id)
167 .and_then(|info| info.actors.get(actor_id))
168 .map(|actor| actor.splits.clone())
169 .unwrap_or_default()
170 })
171 .map(|split| (split.id(), split))
172 .collect();
173
174 let empty_actor_splits = curr_actor_ids
175 .iter()
176 .map(|actor_id| (*actor_id, vec![]))
177 .collect();
178
179 let diff = crate::stream::source_manager::reassign_splits(
180 fragment_id,
181 empty_actor_splits,
182 &prev_splits,
183 std::default::Default::default(),
185 )
186 .unwrap_or_default();
187
188 Ok(diff)
189 }
190}
191
impl SharedActorInfos {
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    /// Drop all shared info of `database_id` and notify subscribers that the
    /// corresponding fragment mappings are deleted.
    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    /// Keep only the given databases; every other database's fragments are
    /// removed and a `Delete` mapping notification is sent for them.
    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    /// Reconcile the shared info of `database_id` with the authoritative
    /// `fragments` after recovery:
    /// - fragments present in both are overwritten (`Update` notification),
    /// - fragments only in the shared info are dropped (`Delete`),
    /// - fragments only in `fragments` are inserted (`Add`).
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        // The closure keeps (returns `false` for) fragments that still exist,
        // updating them in place; `extract_if` yields only the stale ones.
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        // Whatever was not matched above is new to the shared info: insert it.
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    /// Insert or overwrite the shared info of the given fragments and send the
    /// corresponding mapping notifications.
    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    /// Begin a batched mutation of this database's shared info. Notifications
    /// are accumulated on the writer and sent by [`SharedActorInfoWriter::finish`].
    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}
293
/// Write handle over [`SharedActorInfos`]: holds the write lock for the whole
/// batch of mutations and accumulates the mapping notifications to emit when
/// [`SharedActorInfoWriter::finish`] is called.
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    // Pending notifications, lazily allocated; `None` means "nothing to send".
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
302
303impl SharedActorInfoWriter<'_> {
304 pub(super) fn upsert(
305 &mut self,
306 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
307 ) {
308 let database = self.write_guard.info.entry(self.database_id).or_default();
309 for info @ (fragment, _) in infos {
310 match database.entry(fragment.fragment_id) {
311 Entry::Occupied(mut entry) => {
312 let info = info.into();
313 self.updated_fragment_mapping
314 .get_or_insert_default()
315 .push(rebuild_fragment_mapping(&info));
316 entry.insert(info);
317 }
318 Entry::Vacant(entry) => {
319 let info = info.into();
320 self.added_fragment_mapping
321 .get_or_insert_default()
322 .push(rebuild_fragment_mapping(&info));
323 entry.insert(info);
324 }
325 }
326 }
327 }
328
329 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
330 if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
331 && let Some(fragment) = database.remove(&info.fragment_id)
332 {
333 self.deleted_fragment_mapping
334 .get_or_insert_default()
335 .push(rebuild_fragment_mapping(&fragment));
336 }
337 }
338
339 pub(super) fn finish(self) {
340 if let Some(mapping) = self.added_fragment_mapping {
341 self.notification_manager
342 .notify_fragment_mapping(Operation::Add, mapping);
343 }
344 if let Some(mapping) = self.updated_fragment_mapping {
345 self.notification_manager
346 .notify_fragment_mapping(Operation::Update, mapping);
347 }
348 if let Some(mapping) = self.deleted_fragment_mapping {
349 self.notification_manager
350 .notify_fragment_mapping(Operation::Delete, mapping);
351 }
352 }
353}
354
/// Epoch pair and kind describing a single barrier to inject.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}
361
362impl BarrierInfo {
363 pub(super) fn prev_epoch(&self) -> u64 {
364 self.prev_epoch.value().0
365 }
366
367 pub(super) fn curr_epoch(&self) -> u64 {
368 self.curr_epoch.value().0
369 }
370}
371
/// Per-fragment change produced by a barrier command, applied to
/// [`InflightDatabaseInfo`] in two phases (`pre_apply` / `post_apply`).
#[derive(Debug)]
pub(super) enum CommandFragmentChanges {
    /// A brand-new fragment belonging to `job_id`.
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
    },
    /// Inject an extra upstream into the fragment's `UpstreamSinkUnion` node.
    AddNodeUpstream(PbUpstreamSinkInfo),
    /// Remove the given upstream fragments from the `UpstreamSinkUnion` node.
    DropNodeUpstream(Vec<FragmentId>),
    /// Rewire `Merge` nodes: map original upstream fragment id -> new one.
    ReplaceNodeUpstream(
        HashMap<FragmentId, FragmentId>,
    ),
    /// Actor-level reschedule: add/update/remove actors and their splits.
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        // Removed only in the post-apply phase.
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    /// Drop the whole fragment (post-apply phase).
    RemoveFragment,
    /// Overwrite the split assignment of the fragment's actors.
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}
395
/// Deferred part of a [`CommandFragmentChanges`], applied after the barrier is
/// collected (see `post_apply`).
pub(super) enum PostApplyFragmentChanges {
    /// Remove the listed actors from the fragment.
    Reschedule { to_remove: HashSet<ActorId> },
    /// Remove the fragment entirely.
    RemoveFragment,
}
400
/// Kind of subscriber attached to a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A subscription with its retention value (used for
    /// `max_subscription_retention`).
    Subscription(u64),
    /// A snapshot-backfill consumer; carries no retention.
    SnapshotBackfill,
}
406
/// Lifecycle of a creating streaming job, advanced in `apply_collected_command`
/// and `take_staging_commit_info`.
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    /// Created by command, first barrier not yet collected.
    Init,
    /// Backfill in progress, tracked by the embedded tracker.
    Creating(CreateMviewProgressTracker),
    /// Backfill finished.
    Created,
}
413
/// In-memory state of one streaming job tracked by the barrier manager.
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    /// All fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Registered subscribers (subscriptions / snapshot backfills).
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    /// Creation lifecycle status.
    pub status: CreateStreamingJobStatus,
    /// Present only for jobs with CDC table backfill.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
422
423impl InflightStreamingJobInfo {
424 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
425 self.fragment_infos.values()
426 }
427
428 pub fn snapshot_backfill_actor_ids(
429 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
430 ) -> impl Iterator<Item = ActorId> + '_ {
431 fragment_infos
432 .values()
433 .filter(|fragment| {
434 fragment
435 .fragment_type_mask
436 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
437 })
438 .flat_map(|fragment| fragment.actors.keys().copied())
439 }
440
441 pub fn tracking_progress_actor_ids(
442 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
443 ) -> Vec<(ActorId, BackfillUpstreamType)> {
444 StreamJobFragments::tracking_progress_actor_ids_impl(
445 fragment_infos
446 .values()
447 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
448 )
449 }
450}
451
/// Allow iterating a job's fragments directly via `&job`.
impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    // `impl Trait` in an associated type avoids naming the map-values iterator.
    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}
461
/// In-memory state of all streaming jobs of one database, maintained by the
/// barrier worker.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    database_id: DatabaseId,
    /// All jobs of this database, keyed by job id.
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    /// Reverse index: fragment id -> owning job id.
    fragment_location: HashMap<FragmentId, JobId>,
    /// Shared view updated alongside this struct's mutations.
    pub(super) shared_actor_infos: SharedActorInfos,
}
469
470impl InflightDatabaseInfo {
471 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
472 self.jobs.values().flat_map(|job| job.fragment_infos())
473 }
474
475 pub fn contains_job(&self, job_id: JobId) -> bool {
476 self.jobs.contains_key(&job_id)
477 }
478
479 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
480 let job_id = self.fragment_location[&fragment_id];
481 self.jobs
482 .get(&job_id)
483 .expect("should exist")
484 .fragment_infos
485 .get(&fragment_id)
486 .expect("should exist")
487 }
488
489 pub fn gen_ddl_progress(&self) -> impl Iterator<Item = (JobId, DdlProgress)> + '_ {
490 self.jobs
491 .iter()
492 .filter_map(|(job_id, job)| match &job.status {
493 CreateStreamingJobStatus::Init => None,
494 CreateStreamingJobStatus::Creating(tracker) => {
495 Some((*job_id, tracker.gen_ddl_progress()))
496 }
497 CreateStreamingJobStatus::Created => None,
498 })
499 }
500
501 pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
502 self.jobs.iter().filter_map(|(job_id, job)| {
503 job.cdc_table_backfill_tracker
504 .as_ref()
505 .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
506 })
507 }
508
509 pub(super) fn may_assign_fragment_cdc_backfill_splits(
510 &mut self,
511 fragment_id: FragmentId,
512 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
513 let job_id = self.fragment_location[&fragment_id];
514 let job = self.jobs.get_mut(&job_id).expect("should exist");
515 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
516 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
517 if cdc_scan_fragment_id != fragment_id {
518 return Ok(None);
519 }
520 let actors = job.fragment_infos[&cdc_scan_fragment_id]
521 .actors
522 .keys()
523 .copied()
524 .collect();
525 tracker.reassign_splits(actors).map(Some)
526 } else {
527 Ok(None)
528 }
529 }
530
531 pub(super) fn assign_cdc_backfill_splits(
532 &mut self,
533 job_id: JobId,
534 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
535 let job = self.jobs.get_mut(&job_id).expect("should exist");
536 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
537 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
538 let actors = job.fragment_infos[&cdc_scan_fragment_id]
539 .actors
540 .keys()
541 .copied()
542 .collect();
543 tracker.reassign_splits(actors).map(Some)
544 } else {
545 Ok(None)
546 }
547 }
548
    /// Apply the effects of a fully-collected barrier: move a newly created job
    /// from `Init` to `Creating` on its first barrier, and feed the per-actor
    /// progress reported in `resps` into the owning jobs' trackers.
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // First collected barrier: install a fresh progress
                        // tracker; any state other than `Init` is a bug.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::new(
                                info,
                                version_stats,
                            )),
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // No status transition for snapshot-backfill jobs here.
                }
            }
        }
        // Route mview backfill progress to the owning job's `Creating` tracker.
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let CreateStreamingJobStatus::Creating(tracker) =
                &mut self.jobs.get_mut(job_id).expect("should exist").status
            else {
                warn!("update the progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.apply_progress(progress, version_stats);
        }
        // Route CDC table backfill progress to the owning job's CDC tracker.
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
    }
615
616 fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
617 self.jobs.values().filter_map(|job| match &job.status {
618 CreateStreamingJobStatus::Init => None,
619 CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
620 CreateStreamingJobStatus::Created => None,
621 })
622 }
623
624 fn iter_mut_creating_job_tracker(
625 &mut self,
626 ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
627 self.jobs
628 .values_mut()
629 .filter_map(|job| match &mut job.status {
630 CreateStreamingJobStatus::Init => None,
631 CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
632 CreateStreamingJobStatus::Created => None,
633 })
634 }
635
636 pub(super) fn has_pending_finished_jobs(&self) -> bool {
637 self.iter_creating_job_tracker()
638 .any(|tracker| tracker.is_finished())
639 }
640
641 pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
642 self.iter_mut_creating_job_tracker()
643 .flat_map(|tracker| tracker.take_pending_backfill_nodes())
644 .collect()
645 }
646
    /// Collect per-commit info across all jobs: jobs whose backfill just
    /// finished (transitioning them to `Created`), tables whose staging data
    /// should be truncated, and jobs whose CDC table backfill completed.
    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        let mut finished_cdc_table_backfill = vec![];
        for (job_id, job) in &mut self.jobs {
            if let CreateStreamingJobStatus::Creating(tracker) = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    // Move the tracker out and mark the job as created.
                    let CreateStreamingJobStatus::Creating(tracker) =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
                && tracker.take_pre_completed()
            {
                finished_cdc_table_backfill.push(*job_id);
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
            finished_cdc_table_backfill,
        }
    }
676
677 pub fn fragment_subscribers(
678 &self,
679 fragment_id: FragmentId,
680 ) -> impl Iterator<Item = SubscriberId> + '_ {
681 let job_id = self.fragment_location[&fragment_id];
682 self.jobs[&job_id].subscribers.keys().copied()
683 }
684
685 pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
686 self.jobs[&job_id].subscribers.keys().copied()
687 }
688
689 pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
690 self.jobs
691 .iter()
692 .filter_map(|(job_id, info)| {
693 info.subscribers
694 .values()
695 .filter_map(|subscriber| match subscriber {
696 SubscriberType::Subscription(retention) => Some(*retention),
697 SubscriberType::SnapshotBackfill => None,
698 })
699 .max()
700 .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
701 })
702 .collect()
703 }
704
705 pub fn register_subscriber(
706 &mut self,
707 job_id: JobId,
708 subscriber_id: SubscriberId,
709 subscriber: SubscriberType,
710 ) {
711 self.jobs
712 .get_mut(&job_id)
713 .expect("should exist")
714 .subscribers
715 .try_insert(subscriber_id, subscriber)
716 .expect("non duplicate");
717 }
718
719 pub fn unregister_subscriber(
720 &mut self,
721 job_id: JobId,
722 subscriber_id: SubscriberId,
723 ) -> Option<SubscriberType> {
724 self.jobs
725 .get_mut(&job_id)
726 .expect("should exist")
727 .subscribers
728 .remove(&subscriber_id)
729 }
730
731 fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
732 let job_id = self.fragment_location[&fragment_id];
733 let fragment = self
734 .jobs
735 .get_mut(&job_id)
736 .expect("should exist")
737 .fragment_infos
738 .get_mut(&fragment_id)
739 .expect("should exist");
740 (fragment, job_id)
741 }
742
743 fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
744 Self {
745 database_id,
746 jobs: Default::default(),
747 fragment_location: Default::default(),
748 shared_actor_infos,
749 }
750 }
751
752 pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
753 shared_actor_infos.remove_database(database_id);
755 Self::empty_inner(database_id, shared_actor_infos)
756 }
757
758 pub fn recover(
759 database_id: DatabaseId,
760 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
761 shared_actor_infos: SharedActorInfos,
762 ) -> Self {
763 let mut info = Self::empty_inner(database_id, shared_actor_infos);
764 for job in jobs {
765 info.add_existing(job);
766 }
767 info
768 }
769
770 pub fn is_empty(&self) -> bool {
771 self.jobs.is_empty()
772 }
773
774 pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
775 let InflightStreamingJobInfo {
776 job_id,
777 fragment_infos,
778 subscribers,
779 status,
780 cdc_table_backfill_tracker,
781 } = job;
782 self.jobs
783 .try_insert(
784 job.job_id,
785 InflightStreamingJobInfo {
786 job_id,
787 subscribers,
788 fragment_infos: Default::default(), status,
790 cdc_table_backfill_tracker,
791 },
792 )
793 .expect("non-duplicate");
794 let post_apply_changes =
795 self.apply_add(fragment_infos.into_iter().map(|(fragment_id, info)| {
796 (
797 fragment_id,
798 CommandFragmentChanges::NewFragment {
799 job_id: job.job_id,
800 info,
801 },
802 )
803 }));
804 self.post_apply(post_apply_changes);
805 }
806
807 pub(crate) fn pre_apply(
810 &mut self,
811 new_job: Option<(JobId, Option<CdcTableBackfillTracker>)>,
812 fragment_changes: HashMap<FragmentId, CommandFragmentChanges>,
813 ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
814 if let Some((job_id, cdc_table_backfill_tracker)) = new_job {
815 self.jobs
816 .try_insert(
817 job_id,
818 InflightStreamingJobInfo {
819 job_id,
820 fragment_infos: Default::default(),
821 subscribers: Default::default(), status: CreateStreamingJobStatus::Init,
823 cdc_table_backfill_tracker,
824 },
825 )
826 .expect("non-duplicate");
827 }
828 self.apply_add(fragment_changes.into_iter())
829 }
830
831 fn apply_add(
832 &mut self,
833 fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
834 ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
835 let mut post_apply = HashMap::new();
836 {
837 let shared_infos = self.shared_actor_infos.clone();
838 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
839 for (fragment_id, change) in fragment_changes {
840 match change {
841 CommandFragmentChanges::NewFragment { job_id, info } => {
842 let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
843 shared_actor_writer.upsert([(&info, job_id)]);
844 fragment_infos
845 .fragment_infos
846 .try_insert(fragment_id, info)
847 .expect("non duplicate");
848 self.fragment_location
849 .try_insert(fragment_id, job_id)
850 .expect("non duplicate");
851 }
852 CommandFragmentChanges::Reschedule {
853 new_actors,
854 actor_update_vnode_bitmap,
855 to_remove,
856 actor_splits,
857 } => {
858 let (info, _) = self.fragment_mut(fragment_id);
859 let actors = &mut info.actors;
860 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
861 actors
862 .get_mut(&actor_id)
863 .expect("should exist")
864 .vnode_bitmap = Some(new_vnodes);
865 }
866 for (actor_id, actor) in new_actors {
867 actors
868 .try_insert(actor_id as _, actor)
869 .expect("non-duplicate");
870 }
871 for (actor_id, splits) in actor_splits {
872 actors.get_mut(&actor_id).expect("should exist").splits = splits;
873 }
874
875 post_apply.insert(
876 fragment_id,
877 PostApplyFragmentChanges::Reschedule { to_remove },
878 );
879
880 }
882 CommandFragmentChanges::RemoveFragment => {
883 post_apply.insert(fragment_id, PostApplyFragmentChanges::RemoveFragment);
884 }
885 CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
886 let mut remaining_fragment_ids: HashSet<_> =
887 replace_map.keys().cloned().collect();
888 let (info, _) = self.fragment_mut(fragment_id);
889 visit_stream_node_mut(&mut info.nodes, |node| {
890 if let NodeBody::Merge(m) = node
891 && let Some(new_upstream_fragment_id) =
892 replace_map.get(&m.upstream_fragment_id)
893 {
894 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
895 if cfg!(debug_assertions) {
896 panic!(
897 "duplicate upstream fragment: {:?} {:?}",
898 m, replace_map
899 );
900 } else {
901 warn!(?m, ?replace_map, "duplicate upstream fragment");
902 }
903 }
904 m.upstream_fragment_id = *new_upstream_fragment_id;
905 }
906 });
907 if cfg!(debug_assertions) {
908 assert!(
909 remaining_fragment_ids.is_empty(),
910 "non-existing fragment to replace: {:?} {:?} {:?}",
911 remaining_fragment_ids,
912 info.nodes,
913 replace_map
914 );
915 } else {
916 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
917 }
918 }
919 CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
920 let (info, _) = self.fragment_mut(fragment_id);
921 let mut injected = false;
922 visit_stream_node_mut(&mut info.nodes, |node| {
923 if let NodeBody::UpstreamSinkUnion(u) = node {
924 if cfg!(debug_assertions) {
925 let current_upstream_fragment_ids = u
926 .init_upstreams
927 .iter()
928 .map(|upstream| upstream.upstream_fragment_id)
929 .collect::<HashSet<_>>();
930 if current_upstream_fragment_ids
931 .contains(&new_upstream_info.upstream_fragment_id)
932 {
933 panic!(
934 "duplicate upstream fragment: {:?} {:?}",
935 u, new_upstream_info
936 );
937 }
938 }
939 u.init_upstreams.push(new_upstream_info.clone());
940 injected = true;
941 }
942 });
943 assert!(injected, "should inject upstream into UpstreamSinkUnion");
944 }
945 CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
946 let (info, _) = self.fragment_mut(fragment_id);
947 let mut removed = false;
948 visit_stream_node_mut(&mut info.nodes, |node| {
949 if let NodeBody::UpstreamSinkUnion(u) = node {
950 if cfg!(debug_assertions) {
951 let current_upstream_fragment_ids = u
952 .init_upstreams
953 .iter()
954 .map(|upstream| upstream.upstream_fragment_id)
955 .collect::<HashSet<FragmentId>>();
956 for drop_fragment_id in &drop_upstream_fragment_ids {
957 if !current_upstream_fragment_ids.contains(drop_fragment_id)
958 {
959 panic!(
960 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
961 u, drop_upstream_fragment_ids, drop_fragment_id
962 );
963 }
964 }
965 }
966 u.init_upstreams.retain(|upstream| {
967 !drop_upstream_fragment_ids
968 .contains(&upstream.upstream_fragment_id)
969 });
970 removed = true;
971 }
972 });
973 assert!(removed, "should remove upstream from UpstreamSinkUnion");
974 }
975 CommandFragmentChanges::SplitAssignment { actor_splits } => {
976 let (info, job_id) = self.fragment_mut(fragment_id);
977 let actors = &mut info.actors;
978 for (actor_id, splits) in actor_splits {
979 actors.get_mut(&actor_id).expect("should exist").splits = splits;
980 }
981 shared_actor_writer.upsert([(&*info, job_id)]);
982 }
983 }
984 }
985 shared_actor_writer.finish();
986 }
987 post_apply
988 }
989
    /// Build the fragment-edge information needed to inject the barrier of the
    /// given command, when the command introduces new fragments or rewires
    /// upstreams (`CreateStreamingJob` / `ReplaceStreamJob`); `None` otherwise.
    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        // Extract the creating-job info, the replace-job info, and (for
        // sink-into-table) the new upstream sink context from the command.
        let (info, replace_job, new_upstream_sink) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit { .. }
                | Command::Throttle(_)
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::Refresh { .. }
                | Command::ListFinish { .. }
                | Command::LoadFinish { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
                        new_upstream_sink,
                    ) = job_type
                    {
                        Some(new_upstream_sink)
                    } else {
                        None
                    };
                    (Some(info), None, new_upstream_sink)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
            },
        };
        // Existing fragments the new/replaced fragments connect to. For a
        // replace job, skip upstreams that belong to the create job's own new
        // fragments (they are not "existing" yet).
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|info| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|info| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        // Fragments newly created by this command (create job, replace job, and
        // any auto-refresh-schema sink replacements).
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|info| {
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
            })
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .new_fragments
                    .new_fragment_info(&replace_job.init_split_assignment)
                    .chain(
                        replace_job
                            .auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().map(|sink| {
                                    (sink.new_fragment.fragment_id, sink.new_fragment_info())
                                })
                            }),
                    )
            }))
            .collect_vec();
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| self.fragment(fragment_id))
                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
            control_stream_manager,
        );
        if let Some(info) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        if let Some(replace_job) = replace_job {
            // Redirect downstream merges from the original upstream fragments
            // to their replacements.
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }
1117
1118 pub(crate) fn post_apply(
1121 &mut self,
1122 fragment_changes: HashMap<FragmentId, PostApplyFragmentChanges>,
1123 ) {
1124 let inner = self.shared_actor_infos.clone();
1125 let mut shared_actor_writer = inner.start_writer(self.database_id);
1126 {
1127 for (fragment_id, changes) in fragment_changes {
1128 match changes {
1129 PostApplyFragmentChanges::Reschedule { to_remove } => {
1130 let job_id = self.fragment_location[&fragment_id];
1131 let info = self
1132 .jobs
1133 .get_mut(&job_id)
1134 .expect("should exist")
1135 .fragment_infos
1136 .get_mut(&fragment_id)
1137 .expect("should exist");
1138 for actor_id in to_remove {
1139 assert!(info.actors.remove(&actor_id).is_some());
1140 }
1141 shared_actor_writer.upsert([(&*info, job_id)]);
1142 }
1143 PostApplyFragmentChanges::RemoveFragment => {
1144 let job_id = self
1145 .fragment_location
1146 .remove(&fragment_id)
1147 .expect("should exist");
1148 let job = self.jobs.get_mut(&job_id).expect("should exist");
1149 let fragment = job
1150 .fragment_infos
1151 .remove(&fragment_id)
1152 .expect("should exist");
1153 shared_actor_writer.remove(&fragment);
1154 if job.fragment_infos.is_empty() {
1155 self.jobs.remove(&job_id).expect("should exist");
1156 }
1157 }
1158 }
1159 }
1160 }
1161 shared_actor_writer.finish();
1162 }
1163}
1164
1165impl InflightFragmentInfo {
1166 pub(crate) fn actor_ids_to_collect(
1168 infos: impl IntoIterator<Item = &Self>,
1169 ) -> HashMap<WorkerId, HashSet<ActorId>> {
1170 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1171 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1172 assert!(
1173 ret.entry(actor.worker_id)
1174 .or_default()
1175 .insert(*actor_id as _)
1176 )
1177 }
1178 ret
1179 }
1180
1181 pub fn existing_table_ids<'a>(
1182 infos: impl IntoIterator<Item = &'a Self> + 'a,
1183 ) -> impl Iterator<Item = TableId> + 'a {
1184 infos
1185 .into_iter()
1186 .flat_map(|info| info.state_table_ids.iter().cloned())
1187 }
1188
1189 pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
1190 infos.into_iter().any(|fragment| {
1191 fragment
1192 .actors
1193 .values()
1194 .any(|actor| (actor.worker_id) == worker_id)
1195 })
1196 }
1197}
1198
impl InflightDatabaseInfo {
    /// Whether any actor of this database runs on the given worker.
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    /// All state table ids across this database's fragments.
    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}