1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_meta_model::WorkerId;
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_pb::ddl_service::PbBackfillType;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::SubscriberId;
34use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
35use risingwave_pb::meta::subscribe_response::Operation;
36use risingwave_pb::source::PbCdcTableSnapshotSplits;
37use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
38use risingwave_pb::stream_plan::stream_node::NodeBody;
39use risingwave_pb::stream_service::BarrierCompleteResponse;
40use tracing::{info, warn};
41
42use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
43use crate::barrier::command::{
44 CreateStreamingJobCommandInfo, PostCollectCommand, ReplaceStreamJobPlan,
45};
46use crate::barrier::edge_builder::{
47 EdgeBuilderFragmentInfo, FragmentEdgeBuildResult, FragmentEdgeBuilder,
48};
49use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
50use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
51use crate::barrier::{
52 BackfillProgress, BarrierKind, CreateStreamingJobType, FragmentBackfillProgress, TracedEpoch,
53};
54use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
55use crate::controller::utils::rebuild_fragment_mapping;
56use crate::manager::NotificationManagerRef;
57use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamActor, StreamJobFragments};
58use crate::stream::UpstreamSinkInfo;
59use crate::{MetaError, MetaResult};
60
/// Per-actor scheduling info exposed outside the barrier manager.
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    /// Worker node this actor is scheduled on.
    pub worker_id: WorkerId,
    /// Vnode ownership of this actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to this actor.
    pub splits: Vec<SplitImpl>,
}
67
68impl From<&InflightActorInfo> for SharedActorInfo {
69 fn from(value: &InflightActorInfo) -> Self {
70 Self {
71 worker_id: value.worker_id,
72 vnode_bitmap: value.vnode_bitmap.clone(),
73 splits: value.splits.clone(),
74 }
75 }
76}
77
/// Snapshot of a fragment's scheduling state exposed outside the barrier manager.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    /// The streaming job this fragment belongs to.
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    /// Per-actor scheduling info, keyed by actor id.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}
87
88impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
89 fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
90 let (info, job_id) = pair;
91
92 let InflightFragmentInfo {
93 fragment_id,
94 distribution_type,
95 fragment_type_mask,
96 actors,
97 vnode_count,
98 ..
99 } = info;
100
101 Self {
102 fragment_id: *fragment_id,
103 job_id,
104 distribution_type: *distribution_type,
105 fragment_type_mask: *fragment_type_mask,
106 actors: actors
107 .iter()
108 .map(|(actor_id, actor)| (*actor_id, actor.into()))
109 .collect(),
110 vnode_count: *vnode_count,
111 }
112 }
113}
114
/// Inner state of [`SharedActorInfos`]: fragment snapshots grouped by database.
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    // database id -> (fragment id -> fragment snapshot)
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
119
120impl SharedActorInfosInner {
121 pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
122 self.info
123 .values()
124 .find_map(|database| database.get(&fragment_id))
125 }
126
127 pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
128 self.info.values().flatten()
129 }
130}
131
/// Shared, lock-protected view of actor/fragment scheduling state, plus the
/// notification manager used to broadcast fragment-mapping changes.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    // Excluded from `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
139
140impl SharedActorInfos {
141 pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
142 self.inner.read()
143 }
144
145 pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
146 let core = self.inner.read();
147 core.iter_over_fragments()
148 .flat_map(|(_, fragment)| {
149 fragment
150 .actors
151 .iter()
152 .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
153 })
154 .collect()
155 }
156
157 pub fn migrate_splits_for_source_actors(
163 &self,
164 fragment_id: FragmentId,
165 prev_actor_ids: &[ActorId],
166 curr_actor_ids: &[ActorId],
167 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
168 let guard = self.read_guard();
169
170 let prev_splits = prev_actor_ids
171 .iter()
172 .flat_map(|actor_id| {
173 guard
175 .get_fragment(fragment_id)
176 .and_then(|info| info.actors.get(actor_id))
177 .map(|actor| actor.splits.clone())
178 .unwrap_or_default()
179 })
180 .map(|split| (split.id(), split))
181 .collect();
182
183 let empty_actor_splits = curr_actor_ids
184 .iter()
185 .map(|actor_id| (*actor_id, vec![]))
186 .collect();
187
188 let diff = crate::stream::source_manager::reassign_splits(
189 fragment_id,
190 empty_actor_splits,
191 &prev_splits,
192 std::default::Default::default(),
194 )
195 .unwrap_or_default();
196
197 Ok(diff)
198 }
199}
200
impl SharedActorInfos {
    /// Creates an empty shared view bound to the given notification manager.
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    /// Drops all fragments of a database and broadcasts deletion of their
    /// worker-slot mappings (if any existed).
    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    /// Keeps only the given databases; fragments of every other database are
    /// removed and their mapping deletions broadcast in one batch.
    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            // Remove (and yield) every database not in the retain set.
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    /// Replaces the shared state of a database with the recovered fragments,
    /// emitting Update notifications for fragments that survived, Delete for
    /// fragments that are gone, and Add for newly appearing ones.
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        // Walk existing fragments: overwrite the ones that survived recovery
        // (keeping them, recording an Update) and extract the rest (recording
        // a Delete for each).
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        // Whatever is left was not present before recovery: add it.
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    /// Convenience wrapper: upsert the fragments and immediately publish the
    /// resulting mapping notifications.
    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    /// Starts a batched write session; notifications accumulate and are only
    /// sent when the writer's `finish` is called.
    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}
302
/// Batched writer over [`SharedActorInfosInner`]: accumulates mapping changes
/// under the write lock and broadcasts them on `finish`.
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    /// Write lock held for the whole session.
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    // Pending notifications, lazily initialized on the first change of each kind.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
311
312impl SharedActorInfoWriter<'_> {
313 pub(super) fn upsert(
314 &mut self,
315 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
316 ) {
317 let database = self.write_guard.info.entry(self.database_id).or_default();
318 for info @ (fragment, _) in infos {
319 match database.entry(fragment.fragment_id) {
320 Entry::Occupied(mut entry) => {
321 let info = info.into();
322 self.updated_fragment_mapping
323 .get_or_insert_default()
324 .push(rebuild_fragment_mapping(&info));
325 entry.insert(info);
326 }
327 Entry::Vacant(entry) => {
328 let info = info.into();
329 self.added_fragment_mapping
330 .get_or_insert_default()
331 .push(rebuild_fragment_mapping(&info));
332 entry.insert(info);
333 }
334 }
335 }
336 }
337
338 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
339 if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
340 && let Some(fragment) = database.remove(&info.fragment_id)
341 {
342 self.deleted_fragment_mapping
343 .get_or_insert_default()
344 .push(rebuild_fragment_mapping(&fragment));
345 }
346 }
347
348 pub(super) fn finish(self) {
349 if let Some(mapping) = self.added_fragment_mapping {
350 self.notification_manager
351 .notify_fragment_mapping(Operation::Add, mapping);
352 }
353 if let Some(mapping) = self.updated_fragment_mapping {
354 self.notification_manager
355 .notify_fragment_mapping(Operation::Update, mapping);
356 }
357 if let Some(mapping) = self.deleted_fragment_mapping {
358 self.notification_manager
359 .notify_fragment_mapping(Operation::Delete, mapping);
360 }
361 }
362}
363
/// Epoch information of an injected barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}
370
371impl BarrierInfo {
372 pub(super) fn prev_epoch(&self) -> u64 {
373 self.prev_epoch.value().0
374 }
375
376 pub(super) fn curr_epoch(&self) -> u64 {
377 self.curr_epoch.value().0
378 }
379
380 pub(super) fn epoch(&self) -> EpochPair {
381 EpochPair {
382 curr: self.curr_epoch(),
383 prev: self.prev_epoch(),
384 }
385 }
386}
387
/// The reason a subscriber holds on to a job's upstream data.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A subscription, carrying its retention in seconds.
    Subscription(u64),
    /// A downstream job performing snapshot backfill.
    SnapshotBackfill,
}
393
/// Lifecycle of a creating streaming job as observed by the barrier manager.
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    /// Registered, but no barrier collected yet.
    Init,
    /// First barrier collected; backfill progress is being tracked.
    Creating { tracker: CreateMviewProgressTracker },
    /// Creation finished.
    Created,
}
400
/// In-flight state of a single streaming job within a database.
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Active subscribers (subscriptions / snapshot backfills) of this job.
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    pub status: CreateStreamingJobStatus,
    /// Present only for jobs performing CDC table backfill.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
409
410impl InflightStreamingJobInfo {
411 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
412 self.fragment_infos.values()
413 }
414
415 pub fn snapshot_backfill_actor_ids(
416 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
417 ) -> impl Iterator<Item = ActorId> + '_ {
418 fragment_infos
419 .values()
420 .filter(|fragment| {
421 fragment
422 .fragment_type_mask
423 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
424 })
425 .flat_map(|fragment| fragment.actors.keys().copied())
426 }
427
428 pub fn tracking_progress_actor_ids(
429 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
430 ) -> Vec<(ActorId, BackfillUpstreamType)> {
431 StreamJobFragments::tracking_progress_actor_ids_impl(
432 fragment_infos
433 .values()
434 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
435 )
436 }
437}
438
impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    // Opaque iterator type (`impl Trait` in associated-type position, a
    // nightly feature this crate appears to rely on — confirm crate features).
    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    /// Iterates over the job's fragment infos.
    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}
448
/// In-flight state of all streaming jobs of one database.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    pub(super) database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    /// Reverse index: fragment id -> owning job id.
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}
456
457impl InflightDatabaseInfo {
458 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
459 self.jobs.values().flat_map(|job| job.fragment_infos())
460 }
461
462 pub fn contains_job(&self, job_id: JobId) -> bool {
463 self.jobs.contains_key(&job_id)
464 }
465
466 pub(super) fn job_id_by_fragment(&self, fragment_id: FragmentId) -> Option<JobId> {
467 self.fragment_location.get(&fragment_id).copied()
468 }
469
470 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
471 let job_id = self.fragment_location[&fragment_id];
472 self.jobs
473 .get(&job_id)
474 .expect("should exist")
475 .fragment_infos
476 .get(&fragment_id)
477 .expect("should exist")
478 }
479
480 pub(super) fn backfill_fragment_ids_for_job(
481 &self,
482 job_id: JobId,
483 ) -> MetaResult<HashSet<FragmentId>> {
484 let job = self
485 .jobs
486 .get(&job_id)
487 .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
488 Ok(job
489 .fragment_infos
490 .iter()
491 .filter_map(|(fragment_id, fragment)| {
492 fragment
493 .fragment_type_mask
494 .contains_any([
495 FragmentTypeFlag::StreamScan,
496 FragmentTypeFlag::SourceScan,
497 FragmentTypeFlag::LocalityProvider,
498 ])
499 .then_some(*fragment_id)
500 })
501 .collect())
502 }
503
504 pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
505 let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
506 MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
507 })?;
508 let fragment = self
509 .jobs
510 .get(job_id)
511 .expect("should exist")
512 .fragment_infos
513 .get(&fragment_id)
514 .expect("should exist");
515 Ok(fragment.fragment_type_mask.contains_any([
516 FragmentTypeFlag::StreamScan,
517 FragmentTypeFlag::SourceScan,
518 FragmentTypeFlag::LocalityProvider,
519 ]))
520 }
521
522 pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
523 self.jobs
524 .iter()
525 .filter_map(|(job_id, job)| match &job.status {
526 CreateStreamingJobStatus::Init => None,
527 CreateStreamingJobStatus::Creating { tracker } => {
528 let progress = tracker.gen_backfill_progress();
529 Some((
530 *job_id,
531 BackfillProgress {
532 progress,
533 backfill_type: PbBackfillType::NormalBackfill,
534 },
535 ))
536 }
537 CreateStreamingJobStatus::Created => None,
538 })
539 }
540
541 pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
542 self.jobs.iter().filter_map(|(job_id, job)| {
543 job.cdc_table_backfill_tracker
544 .as_ref()
545 .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
546 })
547 }
548
549 pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
550 let mut result = Vec::new();
551 for job in self.jobs.values() {
552 let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
553 continue;
554 };
555 let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
556 result.extend(fragment_progress);
557 }
558 result
559 }
560
561 pub(super) fn may_assign_fragment_cdc_backfill_splits(
562 &mut self,
563 fragment_id: FragmentId,
564 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
565 let job_id = self.fragment_location[&fragment_id];
566 let job = self.jobs.get_mut(&job_id).expect("should exist");
567 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
568 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
569 if cdc_scan_fragment_id != fragment_id {
570 return Ok(None);
571 }
572 let actors = job.fragment_infos[&cdc_scan_fragment_id]
573 .actors
574 .keys()
575 .copied()
576 .collect();
577 tracker.reassign_splits(actors).map(Some)
578 } else {
579 Ok(None)
580 }
581 }
582
583 pub(super) fn assign_cdc_backfill_splits(
584 &mut self,
585 job_id: JobId,
586 ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
587 let job = self.jobs.get_mut(&job_id).expect("should exist");
588 if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
589 let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
590 let actors = job.fragment_infos[&cdc_scan_fragment_id]
591 .actors
592 .keys()
593 .copied()
594 .collect();
595 tracker.reassign_splits(actors).map(Some)
596 } else {
597 Ok(None)
598 }
599 }
600
    /// Applies the effects of a collected barrier: initializes the progress
    /// tracker of a newly created job on its first barrier, refreshes trackers
    /// after a reschedule, and folds the backfill / CDC progress reported in
    /// the barrier-complete responses into the per-job state.
    pub(super) fn apply_collected_command(
        &mut self,
        command: &PostCollectCommand,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // Transition Init -> Creating exactly once, on the first
                        // collected barrier of the job.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(
                                    info,
                                    version_stats,
                                    &job_info.fragment_infos,
                                ),
                            },
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // NOTE(review): intentionally no-op for snapshot-backfill
                    // creation — presumably tracked elsewhere; confirm.
                }
            }
        }
        if let PostCollectCommand::Reschedule { reschedules, .. } = command {
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "Reschedule should not carry vnode bitmap updates when actors are rebuilt"
            );

            // Creating jobs touched by the reschedule must refresh their
            // trackers against the updated fragment infos.
            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        // Fold in mview backfill progress reported by compute nodes.
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
                CreateStreamingJobStatus::Init => {
                    continue;
                }
                CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
                CreateStreamingJobStatus::Created => {
                    // A trailing `done` report after completion is expected;
                    // anything else is suspicious.
                    if !progress.done {
                        warn!("update the progress of an created streaming job: {progress:?}");
                    }
                    continue;
                }
            };
            tracker.apply_progress(progress, version_stats);
        }
        // Fold in CDC table backfill split progress.
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        // Mark shared CDC sources as finished once their offset was updated.
        for cdc_offset_updated in resps
            .iter()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }
718
719 fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
720 self.jobs.values().filter_map(|job| match &job.status {
721 CreateStreamingJobStatus::Init => None,
722 CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
723 CreateStreamingJobStatus::Created => None,
724 })
725 }
726
727 fn iter_mut_creating_job_tracker(
728 &mut self,
729 ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
730 self.jobs
731 .values_mut()
732 .filter_map(|job| match &mut job.status {
733 CreateStreamingJobStatus::Init => None,
734 CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
735 CreateStreamingJobStatus::Created => None,
736 })
737 }
738
739 pub(super) fn has_pending_finished_jobs(&self) -> bool {
740 self.iter_creating_job_tracker()
741 .any(|tracker| tracker.is_finished())
742 }
743
744 pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
745 self.iter_mut_creating_job_tracker()
746 .flat_map(|tracker| tracker.take_pending_backfill_nodes())
747 .collect()
748 }
749
    /// Drains commit-time bookkeeping for the current barrier: jobs whose
    /// backfill just finished (transitioned to `Created`), table ids whose
    /// staging data should be truncated, and jobs whose CDC table backfill
    /// completed.
    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        let mut finished_cdc_table_backfill = vec![];
        for (job_id, job) in &mut self.jobs {
            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    // Transition Creating -> Created, moving the tracker out so
                    // it can be turned into a finished tracking job.
                    let CreateStreamingJobStatus::Creating { tracker, .. } =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
                && tracker.take_pre_completed()
            {
                finished_cdc_table_backfill.push(*job_id);
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
            finished_cdc_table_backfill,
        }
    }
779
780 pub fn fragment_subscribers(
781 &self,
782 fragment_id: FragmentId,
783 ) -> impl Iterator<Item = SubscriberId> + '_ {
784 let job_id = self.fragment_location[&fragment_id];
785 self.jobs[&job_id].subscribers.keys().copied()
786 }
787
788 pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
789 self.jobs[&job_id].subscribers.keys().copied()
790 }
791
792 pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
793 self.jobs
794 .iter()
795 .filter_map(|(job_id, info)| {
796 info.subscribers
797 .values()
798 .filter_map(|subscriber| match subscriber {
799 SubscriberType::Subscription(retention) => Some(*retention),
800 SubscriberType::SnapshotBackfill => None,
801 })
802 .max()
803 .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
804 })
805 .collect()
806 }
807
808 pub fn register_subscriber(
809 &mut self,
810 job_id: JobId,
811 subscriber_id: SubscriberId,
812 subscriber: SubscriberType,
813 ) {
814 self.jobs
815 .get_mut(&job_id)
816 .expect("should exist")
817 .subscribers
818 .try_insert(subscriber_id, subscriber)
819 .expect("non duplicate");
820 }
821
822 pub fn unregister_subscriber(
823 &mut self,
824 job_id: JobId,
825 subscriber_id: SubscriberId,
826 ) -> Option<SubscriberType> {
827 self.jobs
828 .get_mut(&job_id)
829 .expect("should exist")
830 .subscribers
831 .remove(&subscriber_id)
832 }
833
834 pub fn update_subscription_retention(
835 &mut self,
836 job_id: JobId,
837 subscriber_id: SubscriberId,
838 retention_second: u64,
839 ) {
840 let job = self.jobs.get_mut(&job_id).expect("should exist");
841 match job.subscribers.get_mut(&subscriber_id) {
842 Some(SubscriberType::Subscription(current_retention)) => {
843 *current_retention = retention_second;
844 }
845 Some(SubscriberType::SnapshotBackfill) => {
846 warn!(
847 %job_id,
848 %subscriber_id,
849 "cannot update retention for snapshot backfill subscriber"
850 );
851 }
852 None => {
853 warn!(%job_id, %subscriber_id, "subscription subscriber not found");
854 }
855 }
856 }
857
858 fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
859 let job_id = self.fragment_location[&fragment_id];
860 let fragment = self
861 .jobs
862 .get_mut(&job_id)
863 .expect("should exist")
864 .fragment_infos
865 .get_mut(&fragment_id)
866 .expect("should exist");
867 (fragment, job_id)
868 }
869
870 fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
871 Self {
872 database_id,
873 jobs: Default::default(),
874 fragment_location: Default::default(),
875 shared_actor_infos,
876 }
877 }
878
879 pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
880 shared_actor_infos.remove_database(database_id);
882 Self::empty_inner(database_id, shared_actor_infos)
883 }
884
885 pub fn recover(
886 database_id: DatabaseId,
887 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
888 shared_actor_infos: SharedActorInfos,
889 ) -> Self {
890 let mut info = Self::empty_inner(database_id, shared_actor_infos);
891 for job in jobs {
892 info.add_existing(job);
893 }
894 info
895 }
896
897 pub fn is_empty(&self) -> bool {
898 self.jobs.is_empty()
899 }
900
901 pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
902 let InflightStreamingJobInfo {
903 job_id,
904 fragment_infos,
905 subscribers,
906 status,
907 cdc_table_backfill_tracker,
908 } = job;
909 self.jobs
910 .try_insert(
911 job_id,
912 InflightStreamingJobInfo {
913 job_id,
914 subscribers,
915 fragment_infos: Default::default(), status,
917 cdc_table_backfill_tracker,
918 },
919 )
920 .expect("non-duplicate");
921 self.pre_apply_new_fragments(
922 fragment_infos
923 .into_iter()
924 .map(|(fragment_id, info)| (fragment_id, job_id, info)),
925 );
926 }
927
928 pub(crate) fn pre_apply_new_job(
930 &mut self,
931 job_id: JobId,
932 cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
933 ) {
934 {
935 self.jobs
936 .try_insert(
937 job_id,
938 InflightStreamingJobInfo {
939 job_id,
940 fragment_infos: Default::default(),
941 subscribers: Default::default(), status: CreateStreamingJobStatus::Init,
943 cdc_table_backfill_tracker,
944 },
945 )
946 .expect("non-duplicate");
947 }
948 }
949
950 pub(crate) fn pre_apply_new_fragments(
952 &mut self,
953 fragments: impl IntoIterator<Item = (FragmentId, JobId, InflightFragmentInfo)>,
954 ) {
955 {
956 let shared_infos = self.shared_actor_infos.clone();
957 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
958 for (fragment_id, job_id, info) in fragments {
959 {
960 {
961 let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
962 shared_actor_writer.upsert([(&info, job_id)]);
963 fragment_infos
964 .fragment_infos
965 .try_insert(fragment_id, info)
966 .expect("non duplicate");
967 self.fragment_location
968 .try_insert(fragment_id, job_id)
969 .expect("non duplicate");
970 }
971 }
972 }
973 shared_actor_writer.finish();
974 }
975 }
976
977 pub(crate) fn pre_apply_reschedule(
980 &mut self,
981 fragment_id: FragmentId,
982 new_actors: HashMap<ActorId, InflightActorInfo>,
983 actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
984 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
985 ) {
986 {
987 {
988 {
989 {
990 let (info, _) = self.fragment_mut(fragment_id);
991 let actors = &mut info.actors;
992 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
993 actors
994 .get_mut(&actor_id)
995 .expect("should exist")
996 .vnode_bitmap = Some(new_vnodes);
997 }
998 for (actor_id, actor) in new_actors {
999 actors
1000 .try_insert(actor_id as _, actor)
1001 .expect("non-duplicate");
1002 }
1003 for (actor_id, splits) in actor_splits {
1004 actors.get_mut(&actor_id).expect("should exist").splits = splits;
1005 }
1006 }
1008 }
1009 }
1010 }
1011 }
1012
1013 pub(crate) fn pre_apply_replace_node_upstream(
1015 &mut self,
1016 fragment_id: FragmentId,
1017 replace_map: &HashMap<FragmentId, FragmentId>,
1018 ) {
1019 {
1020 {
1021 {
1022 {
1023 let mut remaining_fragment_ids: HashSet<_> =
1024 replace_map.keys().cloned().collect();
1025 let (info, _) = self.fragment_mut(fragment_id);
1026 visit_stream_node_mut(&mut info.nodes, |node| {
1027 if let NodeBody::Merge(m) = node
1028 && let Some(new_upstream_fragment_id) =
1029 replace_map.get(&m.upstream_fragment_id)
1030 {
1031 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1032 if cfg!(debug_assertions) {
1033 panic!(
1034 "duplicate upstream fragment: {:?} {:?}",
1035 m, replace_map
1036 );
1037 } else {
1038 warn!(?m, ?replace_map, "duplicate upstream fragment");
1039 }
1040 }
1041 m.upstream_fragment_id = *new_upstream_fragment_id;
1042 }
1043 });
1044 if cfg!(debug_assertions) {
1045 assert!(
1046 remaining_fragment_ids.is_empty(),
1047 "non-existing fragment to replace: {:?} {:?} {:?}",
1048 remaining_fragment_ids,
1049 info.nodes,
1050 replace_map
1051 );
1052 } else {
1053 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1054 }
1055 }
1056 }
1057 }
1058 }
1059 }
1060
1061 pub(crate) fn pre_apply_add_node_upstream(
1063 &mut self,
1064 fragment_id: FragmentId,
1065 new_upstream_info: &PbUpstreamSinkInfo,
1066 ) {
1067 {
1068 {
1069 {
1070 {
1071 let (info, _) = self.fragment_mut(fragment_id);
1072 let mut injected = false;
1073 visit_stream_node_mut(&mut info.nodes, |node| {
1074 if let NodeBody::UpstreamSinkUnion(u) = node {
1075 if cfg!(debug_assertions) {
1076 let current_upstream_fragment_ids = u
1077 .init_upstreams
1078 .iter()
1079 .map(|upstream| upstream.upstream_fragment_id)
1080 .collect::<HashSet<_>>();
1081 if current_upstream_fragment_ids
1082 .contains(&new_upstream_info.upstream_fragment_id)
1083 {
1084 panic!(
1085 "duplicate upstream fragment: {:?} {:?}",
1086 u, new_upstream_info
1087 );
1088 }
1089 }
1090 u.init_upstreams.push(new_upstream_info.clone());
1091 injected = true;
1092 }
1093 });
1094 assert!(injected, "should inject upstream into UpstreamSinkUnion");
1095 }
1096 }
1097 }
1098 }
1099 }
1100
1101 pub(crate) fn pre_apply_drop_node_upstream(
1103 &mut self,
1104 fragment_id: FragmentId,
1105 drop_upstream_fragment_ids: &[FragmentId],
1106 ) {
1107 {
1108 {
1109 {
1110 {
1111 let (info, _) = self.fragment_mut(fragment_id);
1112 let mut removed = false;
1113 visit_stream_node_mut(&mut info.nodes, |node| {
1114 if let NodeBody::UpstreamSinkUnion(u) = node {
1115 if cfg!(debug_assertions) {
1116 let current_upstream_fragment_ids = u
1117 .init_upstreams
1118 .iter()
1119 .map(|upstream| upstream.upstream_fragment_id)
1120 .collect::<HashSet<FragmentId>>();
1121 for drop_fragment_id in drop_upstream_fragment_ids {
1122 if !current_upstream_fragment_ids.contains(drop_fragment_id)
1123 {
1124 panic!(
1125 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1126 u, drop_upstream_fragment_ids, drop_fragment_id
1127 );
1128 }
1129 }
1130 }
1131 u.init_upstreams.retain(|upstream| {
1132 !drop_upstream_fragment_ids
1133 .contains(&upstream.upstream_fragment_id)
1134 });
1135 removed = true;
1136 }
1137 });
1138 assert!(removed, "should remove upstream from UpstreamSinkUnion");
1139 }
1140 }
1141 }
1142 }
1143 }
1144
1145 pub(crate) fn pre_apply_split_assignments(
1147 &mut self,
1148 assignments: impl IntoIterator<Item = (FragmentId, HashMap<ActorId, Vec<SplitImpl>>)>,
1149 ) {
1150 {
1151 let shared_infos = self.shared_actor_infos.clone();
1152 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
1153 {
1154 {
1155 for (fragment_id, actor_splits) in assignments {
1156 let (info, job_id) = self.fragment_mut(fragment_id);
1157 let actors = &mut info.actors;
1158 for (actor_id, splits) in actor_splits {
1159 actors.get_mut(&actor_id).expect("should exist").splits = splits;
1160 }
1161 shared_actor_writer.upsert([(&*info, job_id)]);
1162 }
1163 }
1164 }
1165 shared_actor_writer.finish();
1166 }
1167 }
1168
    /// Build the actor-level edge wiring ([`FragmentEdgeBuildResult`]) for a barrier
    /// command that may simultaneously create a streaming job (`info`, with a flag for
    /// snapshot backfill), replace a streaming job (`replace_job`), and/or attach a new
    /// upstream sink (`new_upstream_sink`).
    ///
    /// `stream_actors` and `actor_location` describe actors of fragments that are not
    /// yet tracked in `self` (the newly created ones); existing fragments are looked up
    /// via `self.fragment(..)`.
    pub(super) fn build_edge(
        &self,
        info: Option<(&CreateStreamingJobCommandInfo, bool)>,
        replace_job: Option<&ReplaceStreamJobPlan>,
        new_upstream_sink: Option<&UpstreamSinkInfo>,
        control_stream_manager: &ControlStreamManager,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> FragmentEdgeBuildResult {
        // Fragments already tracked in `self` that participate in a new edge:
        // upstreams of the new job, upstreams of the replace job (minus the ones that
        // belong to the concurrently created job's own fragments), fragments whose
        // upstream is being replaced, and the downstream fragment of a new sink.
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        // Skip upstreams that are part of the new job itself: those are
                        // new fragments, handled below, not existing ones.
                        info.map(|(info, _)| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(*fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        // Fragments not yet tracked in `self`, paired with the partial graph id they
        // will run in: the new job's fragments run in a job-scoped partial graph only
        // when snapshot-backfilling; replace-job fragments (including auto-refresh-
        // schema sink fragments) use the database-scoped graph (`None` job id).
        let new_fragments = info
            .into_iter()
            .flat_map(|(info, is_snapshot_backfill)| {
                let partial_graph_id = to_partial_graph_id(
                    self.database_id,
                    is_snapshot_backfill.then_some(info.streaming_job.id()),
                );
                info.stream_job_fragments
                    .fragments
                    .values()
                    .map(move |fragment| (partial_graph_id, fragment))
            })
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .new_fragments
                    .fragments
                    .values()
                    .chain(
                        replace_job
                            .auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(move |sinks| sinks.iter().map(|sink| &sink.new_fragment)),
                    )
                    .map(|fragment| {
                        (
                            to_partial_graph_id(self.database_id, None),
                            fragment,
                        )
                    })
            }));

        // Seed the builder with edge-builder infos for both existing and new
        // fragments. NOTE(review): existing fragments are registered with the
        // database-scoped partial graph id — presumably tracked fragments never live
        // in a job-scoped graph at this point; confirm against `fragment(..)` usage.
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| {
                    (
                        fragment_id,
                        EdgeBuilderFragmentInfo::from_inflight(
                            self.fragment(fragment_id),
                            to_partial_graph_id(self.database_id, None),
                            control_stream_manager,
                        ),
                    )
                })
                .chain(new_fragments.map(|(partial_graph_id, fragment)| {
                    (
                        fragment.fragment_id,
                        EdgeBuilderFragmentInfo::from_fragment(
                            fragment,
                            stream_actors,
                            actor_location,
                            partial_graph_id,
                            control_stream_manager,
                        ),
                    )
                })),
        );
        // Register upstream->downstream relations contributed by each command part.
        if let Some((info, _)) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        // A newly attached sink contributes a single direct edge.
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        // Rewire merges whose upstream fragment is swapped out by the replace job.
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        builder.build()
    }
1298
1299 pub(crate) fn post_apply_reschedules(
1301 &mut self,
1302 reschedules: impl IntoIterator<Item = (FragmentId, HashSet<ActorId>)>,
1303 ) {
1304 let inner = self.shared_actor_infos.clone();
1305 let mut shared_actor_writer = inner.start_writer(self.database_id);
1306 {
1307 {
1308 {
1309 for (fragment_id, to_remove) in reschedules {
1310 let job_id = self.fragment_location[&fragment_id];
1311 let info = self
1312 .jobs
1313 .get_mut(&job_id)
1314 .expect("should exist")
1315 .fragment_infos
1316 .get_mut(&fragment_id)
1317 .expect("should exist");
1318 for actor_id in to_remove {
1319 assert!(info.actors.remove(&actor_id).is_some());
1320 }
1321 shared_actor_writer.upsert([(&*info, job_id)]);
1322 }
1323 }
1324 }
1325 }
1326 shared_actor_writer.finish();
1327 }
1328
1329 pub(crate) fn post_apply_remove_fragments(
1331 &mut self,
1332 fragment_ids: impl IntoIterator<Item = FragmentId>,
1333 ) {
1334 let inner = self.shared_actor_infos.clone();
1335 let mut shared_actor_writer = inner.start_writer(self.database_id);
1336 {
1337 {
1338 {
1339 for fragment_id in fragment_ids {
1340 let job_id = self
1341 .fragment_location
1342 .remove(&fragment_id)
1343 .expect("should exist");
1344 let job = self.jobs.get_mut(&job_id).expect("should exist");
1345 let fragment = job
1346 .fragment_infos
1347 .remove(&fragment_id)
1348 .expect("should exist");
1349 shared_actor_writer.remove(&fragment);
1350 if job.fragment_infos.is_empty() {
1351 self.jobs.remove(&job_id).expect("should exist");
1352 }
1353 }
1354 }
1355 }
1356 }
1357 shared_actor_writer.finish();
1358 }
1359}
1360
1361impl InflightFragmentInfo {
1362 pub(crate) fn actor_ids_to_collect(
1364 infos: impl IntoIterator<Item = &Self>,
1365 ) -> HashMap<WorkerId, HashSet<ActorId>> {
1366 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1367 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1368 assert!(
1369 ret.entry(actor.worker_id)
1370 .or_default()
1371 .insert(*actor_id as _)
1372 )
1373 }
1374 ret
1375 }
1376
1377 pub fn existing_table_ids<'a>(
1378 infos: impl IntoIterator<Item = &'a Self> + 'a,
1379 ) -> impl Iterator<Item = TableId> + 'a {
1380 infos
1381 .into_iter()
1382 .flat_map(|info| info.state_table_ids.iter().cloned())
1383 }
1384
1385 pub fn workers<'a>(
1386 infos: impl IntoIterator<Item = &'a Self> + 'a,
1387 ) -> impl Iterator<Item = WorkerId> + 'a {
1388 infos
1389 .into_iter()
1390 .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1391 }
1392
1393 pub fn contains_worker<'a>(
1394 infos: impl IntoIterator<Item = &'a Self> + 'a,
1395 worker_id: WorkerId,
1396 ) -> bool {
1397 Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1398 }
1399}
1400
1401impl InflightDatabaseInfo {
1402 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
1403 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
1404 }
1405
1406 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
1407 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
1408 }
1409}