1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_meta_model::WorkerId;
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_pb::ddl_service::PbBackfillType;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::SubscriberId;
34use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
35use risingwave_pb::meta::subscribe_response::Operation;
36use risingwave_pb::source::PbCdcTableSnapshotSplits;
37use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
38use risingwave_pb::stream_plan::stream_node::NodeBody;
39use risingwave_pb::stream_service::BarrierCompleteResponse;
40use tracing::{info, warn};
41
42use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
43use crate::barrier::command::{
44 CreateStreamingJobCommandInfo, PostCollectCommand, ReplaceStreamJobPlan,
45};
46use crate::barrier::edge_builder::{
47 EdgeBuilderFragmentInfo, FragmentEdgeBuildResult, FragmentEdgeBuilder,
48};
49use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
50use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
51use crate::barrier::{
52 BackfillProgress, BarrierKind, CreateStreamingJobType, FragmentBackfillProgress, TracedEpoch,
53};
54use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
55use crate::controller::utils::rebuild_fragment_mapping;
56use crate::manager::NotificationManagerRef;
57use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamActor, StreamJobFragments};
58use crate::stream::UpstreamSinkInfo;
59use crate::{MetaError, MetaResult};
60
/// Scheduling-relevant snapshot of a single actor, published outside the
/// barrier worker via [`SharedActorInfos`].
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    /// Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to the actor.
    pub splits: Vec<SplitImpl>,
}
67
68impl From<&InflightActorInfo> for SharedActorInfo {
69 fn from(value: &InflightActorInfo) -> Self {
70 Self {
71 worker_id: value.worker_id,
72 vnode_bitmap: value.vnode_bitmap.clone(),
73 splits: value.splits.clone(),
74 }
75 }
76}
77
/// Snapshot of a fragment and its actors, as stored in the shared info map.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    /// The streaming job this fragment belongs to.
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    /// Per-actor scheduling info, keyed by actor id.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}
87
88impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
89 fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
90 let (info, job_id) = pair;
91
92 let InflightFragmentInfo {
93 fragment_id,
94 distribution_type,
95 fragment_type_mask,
96 actors,
97 vnode_count,
98 ..
99 } = info;
100
101 Self {
102 fragment_id: *fragment_id,
103 job_id,
104 distribution_type: *distribution_type,
105 fragment_type_mask: *fragment_type_mask,
106 actors: actors
107 .iter()
108 .map(|(actor_id, actor)| (*actor_id, actor.into()))
109 .collect(),
110 vnode_count: *vnode_count,
111 }
112 }
113}
114
/// Inner state of [`SharedActorInfos`]: fragment snapshots grouped by database.
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    // database id -> fragment id -> fragment snapshot
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
119
120impl SharedActorInfosInner {
121 pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
122 self.info
123 .values()
124 .find_map(|database| database.get(&fragment_id))
125 }
126
127 pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
128 self.info.values().flatten()
129 }
130}
131
/// Shared, lock-protected view of fragment/actor placement, kept in sync with
/// the in-flight barrier state. Mutations are broadcast to subscribers through
/// the notification manager as fragment worker-slot mappings.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
139
140impl SharedActorInfos {
141 pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
142 self.inner.read()
143 }
144
145 pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
146 let core = self.inner.read();
147 core.iter_over_fragments()
148 .flat_map(|(_, fragment)| {
149 fragment
150 .actors
151 .iter()
152 .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
153 })
154 .collect()
155 }
156
157 pub fn migrate_splits_for_source_actors(
163 &self,
164 fragment_id: FragmentId,
165 prev_actor_ids: &[ActorId],
166 curr_actor_ids: &[ActorId],
167 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
168 let guard = self.read_guard();
169
170 let prev_splits = prev_actor_ids
171 .iter()
172 .flat_map(|actor_id| {
173 guard
175 .get_fragment(fragment_id)
176 .and_then(|info| info.actors.get(actor_id))
177 .map(|actor| actor.splits.clone())
178 .unwrap_or_default()
179 })
180 .map(|split| (split.id(), split))
181 .collect();
182
183 let empty_actor_splits = curr_actor_ids
184 .iter()
185 .map(|actor_id| (*actor_id, vec![]))
186 .collect();
187
188 let diff = crate::stream::source_manager::reassign_splits(
189 fragment_id,
190 empty_actor_splits,
191 &prev_splits,
192 std::default::Default::default(),
194 )
195 .unwrap_or_default();
196
197 Ok(diff)
198 }
199}
200
impl SharedActorInfos {
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    /// Drop all fragment snapshots of a database and broadcast the deletion of
    /// their worker-slot mappings (if there were any).
    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    /// Keep only the given databases; snapshots of every other database are
    /// removed and their mapping deletions broadcast.
    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            // Remove (and yield) every database not in the retain set.
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    /// Reconcile the snapshots of `database_id` with the fragments recovered
    /// for it: fragments present on both sides are refreshed (Update),
    /// fragments absent from recovery are dropped (Delete), and new ones are
    /// inserted (Add). All notifications are batched via the writer.
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        // The closure refreshes fragments seen in recovery in place (returning
        // `false` to keep them) and lets extract_if yield the rest for deletion.
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        // Whatever is left was not known before: record as additions.
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    /// Insert or refresh fragment snapshots and immediately broadcast the
    /// resulting mapping changes.
    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    /// Begin a batched update session; mapping notifications accumulate in the
    /// returned writer and are sent when it is `finish`ed.
    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}
302
/// Batched mutator for [`SharedActorInfos`]: holds the write lock and
/// accumulates per-operation mapping notifications until `finish` is called.
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    // Mappings to notify with Operation::Add / Update / Delete respectively;
    // `None` means nothing to send for that operation.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
311
312impl SharedActorInfoWriter<'_> {
313 pub(super) fn upsert(
314 &mut self,
315 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
316 ) {
317 let database = self.write_guard.info.entry(self.database_id).or_default();
318 for info @ (fragment, _) in infos {
319 match database.entry(fragment.fragment_id) {
320 Entry::Occupied(mut entry) => {
321 let info = info.into();
322 self.updated_fragment_mapping
323 .get_or_insert_default()
324 .push(rebuild_fragment_mapping(&info));
325 entry.insert(info);
326 }
327 Entry::Vacant(entry) => {
328 let info = info.into();
329 self.added_fragment_mapping
330 .get_or_insert_default()
331 .push(rebuild_fragment_mapping(&info));
332 entry.insert(info);
333 }
334 }
335 }
336 }
337
338 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
339 if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
340 && let Some(fragment) = database.remove(&info.fragment_id)
341 {
342 self.deleted_fragment_mapping
343 .get_or_insert_default()
344 .push(rebuild_fragment_mapping(&fragment));
345 }
346 }
347
348 pub(super) fn finish(self) {
349 if let Some(mapping) = self.added_fragment_mapping {
350 self.notification_manager
351 .notify_fragment_mapping(Operation::Add, mapping);
352 }
353 if let Some(mapping) = self.updated_fragment_mapping {
354 self.notification_manager
355 .notify_fragment_mapping(Operation::Update, mapping);
356 }
357 if let Some(mapping) = self.deleted_fragment_mapping {
358 self.notification_manager
359 .notify_fragment_mapping(Operation::Delete, mapping);
360 }
361 }
362}
363
/// Epoch information carried by a single barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// The previous epoch of the barrier.
    pub prev_epoch: TracedEpoch,
    /// The current epoch of the barrier.
    pub curr_epoch: TracedEpoch,
    /// Kind of the barrier (see [`BarrierKind`]).
    pub kind: BarrierKind,
}
370
371impl BarrierInfo {
372 pub(super) fn prev_epoch(&self) -> u64 {
373 self.prev_epoch.value().0
374 }
375
376 pub(super) fn curr_epoch(&self) -> u64 {
377 self.curr_epoch.value().0
378 }
379
380 pub(super) fn epoch(&self) -> EpochPair {
381 EpochPair {
382 curr: self.curr_epoch(),
383 prev: self.prev_epoch(),
384 }
385 }
386}
387
/// The kind of subscriber registered on a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A user subscription; the payload is its retention in seconds.
    Subscription(u64),
    /// A downstream snapshot-backfill job subscribing during its creation.
    SnapshotBackfill,
}
393
/// Creation lifecycle of a streaming job, from the barrier manager's view.
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    /// Registered, but the first barrier has not been collected yet.
    Init,
    /// Creating (backfilling); progress is tracked until finished.
    Creating { tracker: CreateMviewProgressTracker },
    /// Creation finished; no further progress tracking.
    Created,
}
400
/// In-flight state of one streaming job within a database.
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    /// All fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Registered subscribers (user subscriptions and snapshot-backfill jobs).
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    /// Creation lifecycle status of the job.
    pub status: CreateStreamingJobStatus,
    /// Present when the job performs CDC table backfill.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
409
410impl InflightStreamingJobInfo {
411 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
412 self.fragment_infos.values()
413 }
414
415 pub fn snapshot_backfill_actor_ids(
416 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
417 ) -> impl Iterator<Item = ActorId> + '_ {
418 fragment_infos
419 .values()
420 .filter(|fragment| {
421 fragment
422 .fragment_type_mask
423 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
424 })
425 .flat_map(|fragment| fragment.actors.keys().copied())
426 }
427
428 pub fn tracking_progress_actor_ids(
429 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
430 ) -> Vec<(ActorId, BackfillUpstreamType)> {
431 StreamJobFragments::tracking_progress_actor_ids_impl(
432 fragment_infos
433 .values()
434 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
435 )
436 }
437}
438
/// Allows iterating a job's fragments directly via `&job_info`.
impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    // Names the iterator without boxing (requires the `impl Trait` in
    // associated type feature, presumably enabled crate-wide — confirm).
    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}
448
/// In-flight barrier state of a single database: all of its streaming jobs,
/// a reverse index from fragment to owning job, and a handle to the shared
/// actor-info map that is kept in sync on every mutation.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    pub(super) database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    /// Reverse index: fragment id -> owning job id.
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}
456
457impl InflightDatabaseInfo {
458 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
459 self.jobs.values().flat_map(|job| job.fragment_infos())
460 }
461
    /// Whether the given job is currently tracked in this database.
    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }
465
    /// Resolve the owning job of `fragment_id`, if the fragment is known.
    pub(super) fn job_id_by_fragment(&self, fragment_id: FragmentId) -> Option<JobId> {
        self.fragment_location.get(&fragment_id).copied()
    }
469
470 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
471 let job_id = self.fragment_location[&fragment_id];
472 self.jobs
473 .get(&job_id)
474 .expect("should exist")
475 .fragment_infos
476 .get(&fragment_id)
477 .expect("should exist")
478 }
479
480 pub(super) fn backfill_fragment_ids_for_job(
481 &self,
482 job_id: JobId,
483 ) -> MetaResult<HashSet<FragmentId>> {
484 let job = self
485 .jobs
486 .get(&job_id)
487 .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
488 Ok(job
489 .fragment_infos
490 .iter()
491 .filter_map(|(fragment_id, fragment)| {
492 fragment
493 .fragment_type_mask
494 .contains_any([
495 FragmentTypeFlag::StreamScan,
496 FragmentTypeFlag::SourceScan,
497 FragmentTypeFlag::LocalityProvider,
498 ])
499 .then_some(*fragment_id)
500 })
501 .collect())
502 }
503
504 pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
505 let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
506 MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
507 })?;
508 let fragment = self
509 .jobs
510 .get(job_id)
511 .expect("should exist")
512 .fragment_infos
513 .get(&fragment_id)
514 .expect("should exist");
515 Ok(fragment.fragment_type_mask.contains_any([
516 FragmentTypeFlag::StreamScan,
517 FragmentTypeFlag::SourceScan,
518 FragmentTypeFlag::LocalityProvider,
519 ]))
520 }
521
522 pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
523 self.jobs
524 .iter()
525 .filter_map(|(job_id, job)| match &job.status {
526 CreateStreamingJobStatus::Init => None,
527 CreateStreamingJobStatus::Creating { tracker } => {
528 let progress = tracker.gen_backfill_progress();
529 Some((
530 *job_id,
531 BackfillProgress {
532 progress,
533 backfill_type: PbBackfillType::NormalBackfill,
534 },
535 ))
536 }
537 CreateStreamingJobStatus::Created => None,
538 })
539 }
540
541 pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
542 self.jobs.iter().filter_map(|(job_id, job)| {
543 job.cdc_table_backfill_tracker
544 .as_ref()
545 .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
546 })
547 }
548
549 pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
550 let mut result = Vec::new();
551 for job in self.jobs.values() {
552 let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
553 continue;
554 };
555 let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
556 result.extend(fragment_progress);
557 }
558 result
559 }
560
561 pub(super) fn may_assign_fragment_cdc_backfill_splits(
562 &mut self,
563 fragment_id: FragmentId,
564 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
565 let job_id = self.fragment_location[&fragment_id];
566 let job = self.jobs.get_mut(&job_id).expect("should exist");
567 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
568 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
569 if cdc_scan_fragment_id != fragment_id {
570 return Ok(None);
571 }
572 let actors = job.fragment_infos[&cdc_scan_fragment_id]
573 .actors
574 .keys()
575 .copied()
576 .collect();
577 tracker.reassign_splits(actors).map(Some)
578 } else {
579 Ok(None)
580 }
581 }
582
583 pub(super) fn assign_cdc_backfill_splits(
584 &mut self,
585 job_id: JobId,
586 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
587 let job = self.jobs.get_mut(&job_id).expect("should exist");
588 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
589 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
590 let actors = job.fragment_infos[&cdc_scan_fragment_id]
591 .actors
592 .keys()
593 .copied()
594 .collect();
595 tracker.reassign_splits(actors).map(Some)
596 } else {
597 Ok(None)
598 }
599 }
600
    /// Apply the effects of a collected barrier command and the per-worker
    /// collect responses to the in-flight job states:
    ///
    /// - `CreateStreamingJob` (normal / sink-into-table): move the job from
    ///   `Init` to `Creating` with a freshly built progress tracker.
    /// - `Reschedule`: refresh the trackers of affected creating jobs.
    /// - Progress reports in `resps` are forwarded to the matching trackers.
    pub(super) fn apply_collected_command(
        &mut self,
        command: &PostCollectCommand,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // The first collected barrier flips Init -> Creating;
                        // any other prior status is a logic error.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(
                                    info,
                                    version_stats,
                                    &job_info.fragment_infos,
                                ),
                            },
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // Intentionally no status transition for snapshot backfill here.
                }
            }
        }
        if let PostCollectCommand::Reschedule { reschedules, .. } = command {
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "Reschedule should not carry vnode bitmap updates when actors are rebuilt"
            );

            // Refresh the tracker of every creating job that owns a rescheduled
            // fragment.
            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        // Forward mview backfill progress reports to their job trackers.
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
                CreateStreamingJobStatus::Init => {
                    continue;
                }
                CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
                CreateStreamingJobStatus::Created => {
                    // A trailing `done` report after completion is benign.
                    if !progress.done {
                        warn!("update the progress of an created streaming job: {progress:?}");
                    }
                    continue;
                }
            };
            tracker.apply_progress(progress, version_stats);
        }
        // Forward CDC table backfill split progress to the CDC trackers.
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        // Mark shared CDC sources finished when an offset update is reported.
        for cdc_offset_updated in resps
            .iter()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }
718
719 fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
720 self.jobs.values().filter_map(|job| match &job.status {
721 CreateStreamingJobStatus::Init => None,
722 CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
723 CreateStreamingJobStatus::Created => None,
724 })
725 }
726
727 fn iter_mut_creating_job_tracker(
728 &mut self,
729 ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
730 self.jobs
731 .values_mut()
732 .filter_map(|job| match &mut job.status {
733 CreateStreamingJobStatus::Init => None,
734 CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
735 CreateStreamingJobStatus::Created => None,
736 })
737 }
738
739 pub(super) fn has_pending_finished_jobs(&self) -> bool {
740 self.iter_creating_job_tracker()
741 .any(|tracker| tracker.is_finished())
742 }
743
744 pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
745 self.iter_mut_creating_job_tracker()
746 .flat_map(|tracker| tracker.take_pending_backfill_nodes())
747 .collect()
748 }
749
    /// Drain commit-time information accumulated since the last barrier: jobs
    /// whose backfill just finished (flipping them to `Created`), table ids to
    /// truncate, and jobs whose CDC table backfill pre-completed.
    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        let mut finished_cdc_table_backfill = vec![];
        for (job_id, job) in &mut self.jobs {
            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    // Swap the status to `Created`, taking ownership of the
                    // tracker so it can yield the finished tracking job.
                    let CreateStreamingJobStatus::Creating { tracker, .. } =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
                && tracker.take_pre_completed()
            {
                finished_cdc_table_backfill.push(*job_id);
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
            finished_cdc_table_backfill,
        }
    }
779
780 pub fn fragment_subscribers(
781 &self,
782 fragment_id: FragmentId,
783 ) -> impl Iterator<Item = SubscriberId> + '_ {
784 let job_id = self.fragment_location[&fragment_id];
785 self.jobs[&job_id].subscribers.keys().copied()
786 }
787
    /// Subscriber ids registered on the given job.
    ///
    /// # Panics
    /// Panics if the job is unknown.
    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }
791
792 pub fn max_subscription_retention(&self) -> impl Iterator<Item = (TableId, u64)> + '_ {
793 self.jobs.iter().filter_map(|(job_id, info)| {
794 info.subscribers
795 .values()
796 .filter_map(|subscriber| match subscriber {
797 SubscriberType::Subscription(retention) => Some(*retention),
798 SubscriberType::SnapshotBackfill => None,
799 })
800 .max()
801 .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
802 })
803 }
804
805 pub fn register_subscriber(
806 &mut self,
807 job_id: JobId,
808 subscriber_id: SubscriberId,
809 subscriber: SubscriberType,
810 ) {
811 self.jobs
812 .get_mut(&job_id)
813 .expect("should exist")
814 .subscribers
815 .try_insert(subscriber_id, subscriber)
816 .expect("non duplicate");
817 }
818
819 pub fn unregister_subscriber(
820 &mut self,
821 job_id: JobId,
822 subscriber_id: SubscriberId,
823 ) -> Option<SubscriberType> {
824 self.jobs
825 .get_mut(&job_id)
826 .expect("should exist")
827 .subscribers
828 .remove(&subscriber_id)
829 }
830
831 pub fn update_subscription_retention(
832 &mut self,
833 job_id: JobId,
834 subscriber_id: SubscriberId,
835 retention_second: u64,
836 ) {
837 let job = self.jobs.get_mut(&job_id).expect("should exist");
838 match job.subscribers.get_mut(&subscriber_id) {
839 Some(SubscriberType::Subscription(current_retention)) => {
840 *current_retention = retention_second;
841 }
842 Some(SubscriberType::SnapshotBackfill) => {
843 warn!(
844 %job_id,
845 %subscriber_id,
846 "cannot update retention for snapshot backfill subscriber"
847 );
848 }
849 None => {
850 warn!(%job_id, %subscriber_id, "subscription subscriber not found");
851 }
852 }
853 }
854
855 fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
856 let job_id = self.fragment_location[&fragment_id];
857 let fragment = self
858 .jobs
859 .get_mut(&job_id)
860 .expect("should exist")
861 .fragment_infos
862 .get_mut(&fragment_id)
863 .expect("should exist");
864 (fragment, job_id)
865 }
866
867 fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
868 Self {
869 database_id,
870 jobs: Default::default(),
871 fragment_location: Default::default(),
872 shared_actor_infos,
873 }
874 }
875
    /// Construct an empty info for `database_id`, first purging any stale
    /// shared fragment state recorded for that database (which broadcasts
    /// mapping deletions).
    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }
881
882 pub fn recover(
883 database_id: DatabaseId,
884 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
885 shared_actor_infos: SharedActorInfos,
886 ) -> Self {
887 let mut info = Self::empty_inner(database_id, shared_actor_infos);
888 for job in jobs {
889 info.add_existing(job);
890 }
891 info
892 }
893
    /// Whether this database currently tracks no jobs.
    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }
897
898 pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
899 let InflightStreamingJobInfo {
900 job_id,
901 fragment_infos,
902 subscribers,
903 status,
904 cdc_table_backfill_tracker,
905 } = job;
906 self.jobs
907 .try_insert(
908 job_id,
909 InflightStreamingJobInfo {
910 job_id,
911 subscribers,
912 fragment_infos: Default::default(), status,
914 cdc_table_backfill_tracker,
915 },
916 )
917 .expect("non-duplicate");
918 self.pre_apply_new_fragments(
919 fragment_infos
920 .into_iter()
921 .map(|(fragment_id, info)| (fragment_id, job_id, info)),
922 );
923 }
924
925 pub(crate) fn pre_apply_new_job(
927 &mut self,
928 job_id: JobId,
929 cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
930 ) {
931 {
932 self.jobs
933 .try_insert(
934 job_id,
935 InflightStreamingJobInfo {
936 job_id,
937 fragment_infos: Default::default(),
938 subscribers: Default::default(), status: CreateStreamingJobStatus::Init,
940 cdc_table_backfill_tracker,
941 },
942 )
943 .expect("non-duplicate");
944 }
945 }
946
947 pub(crate) fn pre_apply_new_fragments(
949 &mut self,
950 fragments: impl IntoIterator<Item = (FragmentId, JobId, InflightFragmentInfo)>,
951 ) {
952 {
953 let shared_infos = self.shared_actor_infos.clone();
954 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
955 for (fragment_id, job_id, info) in fragments {
956 {
957 {
958 let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
959 shared_actor_writer.upsert([(&info, job_id)]);
960 fragment_infos
961 .fragment_infos
962 .try_insert(fragment_id, info)
963 .expect("non duplicate");
964 self.fragment_location
965 .try_insert(fragment_id, job_id)
966 .expect("non duplicate");
967 }
968 }
969 }
970 shared_actor_writer.finish();
971 }
972 }
973
974 pub(crate) fn pre_apply_reschedule(
977 &mut self,
978 fragment_id: FragmentId,
979 new_actors: HashMap<ActorId, InflightActorInfo>,
980 actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
981 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
982 ) {
983 {
984 {
985 {
986 {
987 let (info, _) = self.fragment_mut(fragment_id);
988 let actors = &mut info.actors;
989 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
990 actors
991 .get_mut(&actor_id)
992 .expect("should exist")
993 .vnode_bitmap = Some(new_vnodes);
994 }
995 for (actor_id, actor) in new_actors {
996 actors
997 .try_insert(actor_id as _, actor)
998 .expect("non-duplicate");
999 }
1000 for (actor_id, splits) in actor_splits {
1001 actors.get_mut(&actor_id).expect("should exist").splits = splits;
1002 }
1003 }
1005 }
1006 }
1007 }
1008 }
1009
1010 pub(crate) fn pre_apply_replace_node_upstream(
1012 &mut self,
1013 fragment_id: FragmentId,
1014 replace_map: &HashMap<FragmentId, FragmentId>,
1015 ) {
1016 {
1017 {
1018 {
1019 {
1020 let mut remaining_fragment_ids: HashSet<_> =
1021 replace_map.keys().cloned().collect();
1022 let (info, _) = self.fragment_mut(fragment_id);
1023 visit_stream_node_mut(&mut info.nodes, |node| {
1024 if let NodeBody::Merge(m) = node
1025 && let Some(new_upstream_fragment_id) =
1026 replace_map.get(&m.upstream_fragment_id)
1027 {
1028 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1029 if cfg!(debug_assertions) {
1030 panic!(
1031 "duplicate upstream fragment: {:?} {:?}",
1032 m, replace_map
1033 );
1034 } else {
1035 warn!(?m, ?replace_map, "duplicate upstream fragment");
1036 }
1037 }
1038 m.upstream_fragment_id = *new_upstream_fragment_id;
1039 }
1040 });
1041 if cfg!(debug_assertions) {
1042 assert!(
1043 remaining_fragment_ids.is_empty(),
1044 "non-existing fragment to replace: {:?} {:?} {:?}",
1045 remaining_fragment_ids,
1046 info.nodes,
1047 replace_map
1048 );
1049 } else {
1050 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1051 }
1052 }
1053 }
1054 }
1055 }
1056 }
1057
1058 pub(crate) fn pre_apply_add_node_upstream(
1060 &mut self,
1061 fragment_id: FragmentId,
1062 new_upstream_info: &PbUpstreamSinkInfo,
1063 ) {
1064 {
1065 {
1066 {
1067 {
1068 let (info, _) = self.fragment_mut(fragment_id);
1069 let mut injected = false;
1070 visit_stream_node_mut(&mut info.nodes, |node| {
1071 if let NodeBody::UpstreamSinkUnion(u) = node {
1072 if cfg!(debug_assertions) {
1073 let current_upstream_fragment_ids = u
1074 .init_upstreams
1075 .iter()
1076 .map(|upstream| upstream.upstream_fragment_id)
1077 .collect::<HashSet<_>>();
1078 if current_upstream_fragment_ids
1079 .contains(&new_upstream_info.upstream_fragment_id)
1080 {
1081 panic!(
1082 "duplicate upstream fragment: {:?} {:?}",
1083 u, new_upstream_info
1084 );
1085 }
1086 }
1087 u.init_upstreams.push(new_upstream_info.clone());
1088 injected = true;
1089 }
1090 });
1091 assert!(injected, "should inject upstream into UpstreamSinkUnion");
1092 }
1093 }
1094 }
1095 }
1096 }
1097
1098 pub(crate) fn pre_apply_drop_node_upstream(
1100 &mut self,
1101 fragment_id: FragmentId,
1102 drop_upstream_fragment_ids: &[FragmentId],
1103 ) {
1104 {
1105 {
1106 {
1107 {
1108 let (info, _) = self.fragment_mut(fragment_id);
1109 let mut removed = false;
1110 visit_stream_node_mut(&mut info.nodes, |node| {
1111 if let NodeBody::UpstreamSinkUnion(u) = node {
1112 if cfg!(debug_assertions) {
1113 let current_upstream_fragment_ids = u
1114 .init_upstreams
1115 .iter()
1116 .map(|upstream| upstream.upstream_fragment_id)
1117 .collect::<HashSet<FragmentId>>();
1118 for drop_fragment_id in drop_upstream_fragment_ids {
1119 if !current_upstream_fragment_ids.contains(drop_fragment_id)
1120 {
1121 panic!(
1122 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1123 u, drop_upstream_fragment_ids, drop_fragment_id
1124 );
1125 }
1126 }
1127 }
1128 u.init_upstreams.retain(|upstream| {
1129 !drop_upstream_fragment_ids
1130 .contains(&upstream.upstream_fragment_id)
1131 });
1132 removed = true;
1133 }
1134 });
1135 assert!(removed, "should remove upstream from UpstreamSinkUnion");
1136 }
1137 }
1138 }
1139 }
1140 }
1141
1142 pub(crate) fn pre_apply_split_assignments(
1144 &mut self,
1145 assignments: impl IntoIterator<Item = (FragmentId, HashMap<ActorId, Vec<SplitImpl>>)>,
1146 ) {
1147 {
1148 let shared_infos = self.shared_actor_infos.clone();
1149 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
1150 {
1151 {
1152 for (fragment_id, actor_splits) in assignments {
1153 let (info, job_id) = self.fragment_mut(fragment_id);
1154 let actors = &mut info.actors;
1155 for (actor_id, splits) in actor_splits {
1156 actors.get_mut(&actor_id).expect("should exist").splits = splits;
1157 }
1158 shared_actor_writer.upsert([(&*info, job_id)]);
1159 }
1160 }
1161 }
1162 shared_actor_writer.finish();
1163 }
1164 }
1165
    /// Build the fragment edge information needed by a single barrier that creates a
    /// streaming job, replaces a job, and/or attaches a new upstream sink downstream.
    ///
    /// Inputs (all optional, any combination may be present):
    /// - `info`: a streaming job being created; the `bool` flags snapshot backfill,
    ///   which routes the job's fragments into a job-specific partial graph.
    /// - `replace_job`: a replace-job plan contributing new fragments and upstream
    ///   rewirings.
    /// - `new_upstream_sink`: a sink fragment gaining a new downstream edge.
    ///
    /// Fragments already running are looked up from `self`; fragments that do not exist
    /// yet are resolved via `stream_actors` / `actor_location`.
    pub(super) fn build_edge(
        &self,
        info: Option<(&CreateStreamingJobCommandInfo, bool)>,
        replace_job: Option<&ReplaceStreamJobPlan>,
        new_upstream_sink: Option<&UpstreamSinkInfo>,
        control_stream_manager: &ControlStreamManager,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> FragmentEdgeBuildResult {
        // Ids of fragments that already exist in `self` and participate in a new edge:
        // upstreams of the new job, upstreams of the replace plan (minus fragments that
        // belong to the new job itself), fragments whose upstream is being replaced,
        // and the downstream fragment of a newly-attached sink.
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        // Skip fragments created by the new job in this same barrier;
                        // they are handled as `new_fragments` below.
                        info.map(|(info, _)| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(*fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        // Fragments that do not exist yet, paired with the partial graph they join.
        let new_fragments = info
            .into_iter()
            .flat_map(|(info, is_snapshot_backfill)| {
                // Snapshot-backfill jobs run in their own partial graph keyed by job id;
                // otherwise the database-level graph (`None`) is used.
                let partial_graph_id = to_partial_graph_id(
                    self.database_id,
                    is_snapshot_backfill.then_some(info.streaming_job.id()),
                );
                info.stream_job_fragments
                    .fragments
                    .values()
                    .map(move |fragment| (partial_graph_id, fragment))
            })
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                // Replace-job fragments (plus any auto-refresh-schema sink fragments)
                // always live in the database-level partial graph.
                replace_job
                    .new_fragments
                    .fragments
                    .values()
                    .chain(
                        replace_job
                            .auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(move |sinks| sinks.iter().map(|sink| &sink.new_fragment)),
                    )
                    .map(|fragment| {
                        (
                            to_partial_graph_id(self.database_id, None),
                            fragment,
                        )
                    })
            }));

        // Seed the builder with per-fragment info: inflight state for existing
        // fragments, command/actor info for the new ones.
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| {
                    (
                        fragment_id,
                        EdgeBuilderFragmentInfo::from_inflight(
                            self.fragment(fragment_id),
                            to_partial_graph_id(self.database_id, None),
                            control_stream_manager,
                        ),
                    )
                })
                .chain(new_fragments.map(|(partial_graph_id, fragment)| {
                    (
                        fragment.fragment_id,
                        EdgeBuilderFragmentInfo::from_fragment(
                            fragment,
                            stream_actors,
                            actor_location,
                            partial_graph_id,
                            control_stream_manager,
                        ),
                    )
                })),
        );
        // Register upstream -> downstream relations for the new job ...
        if let Some((info, _)) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        // ... and for the replace-job plan.
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        // Attach the extra edge from the sink fragment to its new downstream.
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        // Rewire fragments of the replace plan to point at their new upstreams.
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        builder.build()
    }
1295
1296 pub(crate) fn post_apply_reschedules(
1298 &mut self,
1299 reschedules: impl IntoIterator<Item = (FragmentId, HashSet<ActorId>)>,
1300 ) {
1301 let inner = self.shared_actor_infos.clone();
1302 let mut shared_actor_writer = inner.start_writer(self.database_id);
1303 {
1304 {
1305 {
1306 for (fragment_id, to_remove) in reschedules {
1307 let job_id = self.fragment_location[&fragment_id];
1308 let info = self
1309 .jobs
1310 .get_mut(&job_id)
1311 .expect("should exist")
1312 .fragment_infos
1313 .get_mut(&fragment_id)
1314 .expect("should exist");
1315 for actor_id in to_remove {
1316 assert!(info.actors.remove(&actor_id).is_some());
1317 }
1318 shared_actor_writer.upsert([(&*info, job_id)]);
1319 }
1320 }
1321 }
1322 }
1323 shared_actor_writer.finish();
1324 }
1325
1326 pub(crate) fn post_apply_remove_fragments(
1328 &mut self,
1329 fragment_ids: impl IntoIterator<Item = FragmentId>,
1330 ) {
1331 let inner = self.shared_actor_infos.clone();
1332 let mut shared_actor_writer = inner.start_writer(self.database_id);
1333 {
1334 {
1335 {
1336 for fragment_id in fragment_ids {
1337 let job_id = self
1338 .fragment_location
1339 .remove(&fragment_id)
1340 .expect("should exist");
1341 let job = self.jobs.get_mut(&job_id).expect("should exist");
1342 let fragment = job
1343 .fragment_infos
1344 .remove(&fragment_id)
1345 .expect("should exist");
1346 shared_actor_writer.remove(&fragment);
1347 if job.fragment_infos.is_empty() {
1348 self.jobs.remove(&job_id).expect("should exist");
1349 }
1350 }
1351 }
1352 }
1353 }
1354 shared_actor_writer.finish();
1355 }
1356}
1357
1358impl InflightFragmentInfo {
1359 pub(crate) fn actor_ids_to_collect(
1361 infos: impl IntoIterator<Item = &Self>,
1362 ) -> HashMap<WorkerId, HashSet<ActorId>> {
1363 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1364 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1365 assert!(
1366 ret.entry(actor.worker_id)
1367 .or_default()
1368 .insert(*actor_id as _)
1369 )
1370 }
1371 ret
1372 }
1373
1374 pub fn existing_table_ids<'a>(
1375 infos: impl IntoIterator<Item = &'a Self> + 'a,
1376 ) -> impl Iterator<Item = TableId> + 'a {
1377 infos
1378 .into_iter()
1379 .flat_map(|info| info.state_table_ids.iter().cloned())
1380 }
1381
1382 pub fn workers<'a>(
1383 infos: impl IntoIterator<Item = &'a Self> + 'a,
1384 ) -> impl Iterator<Item = WorkerId> + 'a {
1385 infos
1386 .into_iter()
1387 .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1388 }
1389
1390 pub fn contains_worker<'a>(
1391 infos: impl IntoIterator<Item = &'a Self> + 'a,
1392 worker_id: WorkerId,
1393 ) -> bool {
1394 Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1395 }
1396}
1397
1398impl InflightDatabaseInfo {
1399 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
1400 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
1401 }
1402
1403 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
1404 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
1405 }
1406}