1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_meta_model::WorkerId;
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_pb::ddl_service::PbBackfillType;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::SubscriberId;
34use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
35use risingwave_pb::meta::subscribe_response::Operation;
36use risingwave_pb::source::PbCdcTableSnapshotSplits;
37use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
38use risingwave_pb::stream_plan::stream_node::NodeBody;
39use risingwave_pb::stream_service::BarrierCompleteResponse;
40use tracing::{info, warn};
41
42use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
43use crate::barrier::command::{
44 CreateStreamingJobCommandInfo, PostCollectCommand, ReplaceStreamJobPlan,
45};
46use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
47use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
48use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
49use crate::barrier::{
50 BackfillProgress, BarrierKind, CreateStreamingJobType, FragmentBackfillProgress, TracedEpoch,
51};
52use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
53use crate::controller::utils::rebuild_fragment_mapping;
54use crate::manager::NotificationManagerRef;
55use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
56use crate::stream::UpstreamSinkInfo;
57use crate::{MetaError, MetaResult};
58
/// Snapshot of a single actor's placement that is shared with readers outside
/// the barrier worker (e.g. split-assignment listing).
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    /// Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    /// Vnode ownership bitmap of the actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to the actor.
    pub splits: Vec<SplitImpl>,
}
65
66impl From<&InflightActorInfo> for SharedActorInfo {
67 fn from(value: &InflightActorInfo) -> Self {
68 Self {
69 worker_id: value.worker_id,
70 vnode_bitmap: value.vnode_bitmap.clone(),
71 splits: value.splits.clone(),
72 }
73 }
74}
75
/// Snapshot of one fragment (and all of its actors) that is shared with
/// readers outside the barrier worker.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    /// The streaming job this fragment belongs to.
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    /// Per-actor snapshots, keyed by actor id.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}
85
86impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
87 fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
88 let (info, job_id) = pair;
89
90 let InflightFragmentInfo {
91 fragment_id,
92 distribution_type,
93 fragment_type_mask,
94 actors,
95 vnode_count,
96 ..
97 } = info;
98
99 Self {
100 fragment_id: *fragment_id,
101 job_id,
102 distribution_type: *distribution_type,
103 fragment_type_mask: *fragment_type_mask,
104 actors: actors
105 .iter()
106 .map(|(actor_id, actor)| (*actor_id, actor.into()))
107 .collect(),
108 vnode_count: *vnode_count,
109 }
110 }
111}
112
/// Inner, lock-protected state of [`SharedActorInfos`].
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    // database id -> fragment id -> shared fragment snapshot
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
117
118impl SharedActorInfosInner {
119 pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
120 self.info
121 .values()
122 .find_map(|database| database.get(&fragment_id))
123 }
124
125 pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
126 self.info.values().flatten()
127 }
128}
129
/// Shared, lock-protected view of fragment/actor placement. The barrier worker
/// keeps it in sync; other components read it. Mapping changes are broadcast
/// through the notification manager.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    /// Used to broadcast fragment worker-slot mapping add/update/delete events.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
137
138impl SharedActorInfos {
139 pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
140 self.inner.read()
141 }
142
143 pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
144 let core = self.inner.read();
145 core.iter_over_fragments()
146 .flat_map(|(_, fragment)| {
147 fragment
148 .actors
149 .iter()
150 .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
151 })
152 .collect()
153 }
154
155 pub fn migrate_splits_for_source_actors(
161 &self,
162 fragment_id: FragmentId,
163 prev_actor_ids: &[ActorId],
164 curr_actor_ids: &[ActorId],
165 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
166 let guard = self.read_guard();
167
168 let prev_splits = prev_actor_ids
169 .iter()
170 .flat_map(|actor_id| {
171 guard
173 .get_fragment(fragment_id)
174 .and_then(|info| info.actors.get(actor_id))
175 .map(|actor| actor.splits.clone())
176 .unwrap_or_default()
177 })
178 .map(|split| (split.id(), split))
179 .collect();
180
181 let empty_actor_splits = curr_actor_ids
182 .iter()
183 .map(|actor_id| (*actor_id, vec![]))
184 .collect();
185
186 let diff = crate::stream::source_manager::reassign_splits(
187 fragment_id,
188 empty_actor_splits,
189 &prev_splits,
190 std::default::Default::default(),
192 )
193 .unwrap_or_default();
194
195 Ok(diff)
196 }
197}
198
impl SharedActorInfos {
    /// Create an empty shared-info store bound to the given notification manager.
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    /// Drop all fragment snapshots of a database and broadcast a `Delete`
    /// notification for their worker-slot mappings.
    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    /// Keep only the given databases; all other databases' fragments are
    /// removed and their mappings broadcast as deleted.
    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            // `extract_if` removes (and yields) entries whose database is not retained.
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    /// Reconcile the shared snapshots of a database with the recovered
    /// fragment set: recovered fragments overwrite stale entries (`Update`),
    /// fragments absent from recovery are dropped (`Delete`), and fragments
    /// seen only in recovery are inserted (`Add`). All notifications are
    /// batched and sent by `writer.finish()`.
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        // Index recovered fragments by id; entries are consumed as they are matched.
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        let mut writer = self.start_writer(database_id);
        // NOTE: `database` borrows `writer.write_guard` while the closure below
        // borrows the disjoint `*_fragment_mapping` fields of `writer`.
        let database = writer.write_guard.info.entry(database_id).or_default();
        // Keep (and refresh) entries that were recovered; extract the rest.
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        // Whatever was not matched above is brand new to the shared view.
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    /// Convenience wrapper: upsert the given fragments in one short-lived
    /// write transaction.
    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    /// Open a write transaction over the shared info for `database_id`. The
    /// returned writer holds the write lock until it is dropped/finished.
    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}
300
/// Write transaction over [`SharedActorInfos`]: mutations are applied under the
/// write lock while mapping notifications are staged and sent on `finish`.
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    /// Staged mappings for fragments newly added in this transaction.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Staged mappings for fragments whose snapshot was overwritten.
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Staged mappings for fragments removed in this transaction.
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
309
310impl SharedActorInfoWriter<'_> {
311 pub(super) fn upsert(
312 &mut self,
313 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
314 ) {
315 let database = self.write_guard.info.entry(self.database_id).or_default();
316 for info @ (fragment, _) in infos {
317 match database.entry(fragment.fragment_id) {
318 Entry::Occupied(mut entry) => {
319 let info = info.into();
320 self.updated_fragment_mapping
321 .get_or_insert_default()
322 .push(rebuild_fragment_mapping(&info));
323 entry.insert(info);
324 }
325 Entry::Vacant(entry) => {
326 let info = info.into();
327 self.added_fragment_mapping
328 .get_or_insert_default()
329 .push(rebuild_fragment_mapping(&info));
330 entry.insert(info);
331 }
332 }
333 }
334 }
335
336 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
337 if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
338 && let Some(fragment) = database.remove(&info.fragment_id)
339 {
340 self.deleted_fragment_mapping
341 .get_or_insert_default()
342 .push(rebuild_fragment_mapping(&fragment));
343 }
344 }
345
346 pub(super) fn finish(self) {
347 if let Some(mapping) = self.added_fragment_mapping {
348 self.notification_manager
349 .notify_fragment_mapping(Operation::Add, mapping);
350 }
351 if let Some(mapping) = self.updated_fragment_mapping {
352 self.notification_manager
353 .notify_fragment_mapping(Operation::Update, mapping);
354 }
355 if let Some(mapping) = self.deleted_fragment_mapping {
356 self.notification_manager
357 .notify_fragment_mapping(Operation::Delete, mapping);
358 }
359 }
360}
361
/// Epoch pair and kind describing a single barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// Epoch of the previous barrier.
    pub prev_epoch: TracedEpoch,
    /// Epoch carried by this barrier.
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}
368
369impl BarrierInfo {
370 pub(super) fn prev_epoch(&self) -> u64 {
371 self.prev_epoch.value().0
372 }
373
374 pub(super) fn curr_epoch(&self) -> u64 {
375 self.curr_epoch.value().0
376 }
377
378 pub(super) fn epoch(&self) -> EpochPair {
379 EpochPair {
380 curr: self.curr_epoch(),
381 prev: self.prev_epoch(),
382 }
383 }
384}
385
/// Kind of downstream that subscribes to a job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A user subscription carrying its retention in seconds.
    Subscription(u64),
    /// A snapshot-backfill job consuming the upstream during creation.
    SnapshotBackfill,
}
391
/// Creation lifecycle of a streaming job, as tracked by the barrier worker.
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    /// Job submitted but the first barrier has not been collected yet.
    Init,
    /// Backfill in progress, tracked by the progress tracker.
    Creating { tracker: CreateMviewProgressTracker },
    /// Creation finished.
    Created,
}
398
/// Per-job in-flight state tracked by the barrier worker.
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    /// All fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Registered subscribers (user subscriptions and snapshot backfills).
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    /// Creation lifecycle status.
    pub status: CreateStreamingJobStatus,
    /// Present only for jobs performing CDC table backfill.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
407
408impl InflightStreamingJobInfo {
409 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
410 self.fragment_infos.values()
411 }
412
413 pub fn snapshot_backfill_actor_ids(
414 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
415 ) -> impl Iterator<Item = ActorId> + '_ {
416 fragment_infos
417 .values()
418 .filter(|fragment| {
419 fragment
420 .fragment_type_mask
421 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
422 })
423 .flat_map(|fragment| fragment.actors.keys().copied())
424 }
425
426 pub fn tracking_progress_actor_ids(
427 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
428 ) -> Vec<(ActorId, BackfillUpstreamType)> {
429 StreamJobFragments::tracking_progress_actor_ids_impl(
430 fragment_infos
431 .values()
432 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
433 )
434 }
435}
436
impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    // `impl Trait` in an associated type keeps the concrete iterator opaque.
    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    /// Iterate over all fragment infos of the job.
    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}
446
/// All in-flight streaming state of one database, plus the shared snapshot
/// store that mirrors it for readers outside the barrier worker.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    pub(super) database_id: DatabaseId,
    /// Jobs keyed by job id.
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    /// Reverse index: fragment id -> owning job id.
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}
454
455impl InflightDatabaseInfo {
456 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
457 self.jobs.values().flat_map(|job| job.fragment_infos())
458 }
459
460 pub fn contains_job(&self, job_id: JobId) -> bool {
461 self.jobs.contains_key(&job_id)
462 }
463
464 pub(super) fn job_id_by_fragment(&self, fragment_id: FragmentId) -> Option<JobId> {
465 self.fragment_location.get(&fragment_id).copied()
466 }
467
468 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
469 let job_id = self.fragment_location[&fragment_id];
470 self.jobs
471 .get(&job_id)
472 .expect("should exist")
473 .fragment_infos
474 .get(&fragment_id)
475 .expect("should exist")
476 }
477
478 pub(super) fn backfill_fragment_ids_for_job(
479 &self,
480 job_id: JobId,
481 ) -> MetaResult<HashSet<FragmentId>> {
482 let job = self
483 .jobs
484 .get(&job_id)
485 .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
486 Ok(job
487 .fragment_infos
488 .iter()
489 .filter_map(|(fragment_id, fragment)| {
490 fragment
491 .fragment_type_mask
492 .contains_any([
493 FragmentTypeFlag::StreamScan,
494 FragmentTypeFlag::SourceScan,
495 FragmentTypeFlag::LocalityProvider,
496 ])
497 .then_some(*fragment_id)
498 })
499 .collect())
500 }
501
502 pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
503 let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
504 MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
505 })?;
506 let fragment = self
507 .jobs
508 .get(job_id)
509 .expect("should exist")
510 .fragment_infos
511 .get(&fragment_id)
512 .expect("should exist");
513 Ok(fragment.fragment_type_mask.contains_any([
514 FragmentTypeFlag::StreamScan,
515 FragmentTypeFlag::SourceScan,
516 FragmentTypeFlag::LocalityProvider,
517 ]))
518 }
519
520 pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
521 self.jobs
522 .iter()
523 .filter_map(|(job_id, job)| match &job.status {
524 CreateStreamingJobStatus::Init => None,
525 CreateStreamingJobStatus::Creating { tracker } => {
526 let progress = tracker.gen_backfill_progress();
527 Some((
528 *job_id,
529 BackfillProgress {
530 progress,
531 backfill_type: PbBackfillType::NormalBackfill,
532 },
533 ))
534 }
535 CreateStreamingJobStatus::Created => None,
536 })
537 }
538
539 pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
540 self.jobs.iter().filter_map(|(job_id, job)| {
541 job.cdc_table_backfill_tracker
542 .as_ref()
543 .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
544 })
545 }
546
547 pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
548 let mut result = Vec::new();
549 for job in self.jobs.values() {
550 let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
551 continue;
552 };
553 let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
554 result.extend(fragment_progress);
555 }
556 result
557 }
558
    /// If `fragment_id` is the CDC scan fragment of its job, (re)assign CDC
    /// snapshot splits across that fragment's current actors.
    ///
    /// Returns `Ok(None)` when the job has no CDC backfill tracker or when the
    /// fragment is not the tracked CDC scan fragment.
    ///
    /// Panics when the fragment is not tracked in this database.
    pub(super) fn may_assign_fragment_cdc_backfill_splits(
        &mut self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
        let job_id = self.fragment_location[&fragment_id];
        let job = self.jobs.get_mut(&job_id).expect("should exist");
        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
            // Only the designated CDC scan fragment gets splits assigned.
            if cdc_scan_fragment_id != fragment_id {
                return Ok(None);
            }
            let actors = job.fragment_infos[&cdc_scan_fragment_id]
                .actors
                .keys()
                .copied()
                .collect();
            tracker.reassign_splits(actors).map(Some)
        } else {
            Ok(None)
        }
    }
580
    /// (Re)assign CDC snapshot splits across the current actors of `job_id`'s
    /// CDC scan fragment.
    ///
    /// Returns `Ok(None)` when the job has no CDC backfill tracker.
    ///
    /// Panics when the job is not tracked in this database.
    pub(super) fn assign_cdc_backfill_splits(
        &mut self,
        job_id: JobId,
    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
        let job = self.jobs.get_mut(&job_id).expect("should exist");
        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
            let actors = job.fragment_infos[&cdc_scan_fragment_id]
                .actors
                .keys()
                .copied()
                .collect();
            tracker.reassign_splits(actors).map(Some)
        } else {
            Ok(None)
        }
    }
598
    /// Apply the effects of a fully-collected barrier: advance job creation
    /// status for `CreateStreamingJob` commands, refresh trackers after a
    /// `Reschedule`, and fold all per-actor progress reports from the barrier
    /// responses into the matching trackers.
    pub(super) fn apply_collected_command(
        &mut self,
        command: &PostCollectCommand,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // First barrier collected: Init -> Creating with a fresh tracker.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(info, version_stats),
                            },
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        // The job may have been cancelled between issue and collection.
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                // Snapshot backfill jobs are tracked elsewhere; nothing to do here.
                CreateStreamingJobType::SnapshotBackfill(_) => {}
            }
        }
        if let PostCollectCommand::Reschedule { reschedules, .. } = command {
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "Reschedule should not carry vnode bitmap updates when actors are rebuilt"
            );

            // Trackers of every job touched by the reschedule must be refreshed
            // against the (already updated) fragment infos.
            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        // Fold mview backfill progress reports into their jobs' trackers.
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
                CreateStreamingJobStatus::Init => {
                    continue;
                }
                CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
                CreateStreamingJobStatus::Created => {
                    // A trailing `done` report after completion is benign.
                    if !progress.done {
                        warn!("update the progress of an created streaming job: {progress:?}");
                    }
                    continue;
                }
            };
            tracker.apply_progress(progress, version_stats);
        }
        // Fold CDC table backfill split progress into the CDC trackers.
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        // Mark shared CDC sources finished once their offset has been updated.
        for cdc_offset_updated in resps
            .iter()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }
712
713 fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
714 self.jobs.values().filter_map(|job| match &job.status {
715 CreateStreamingJobStatus::Init => None,
716 CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
717 CreateStreamingJobStatus::Created => None,
718 })
719 }
720
721 fn iter_mut_creating_job_tracker(
722 &mut self,
723 ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
724 self.jobs
725 .values_mut()
726 .filter_map(|job| match &mut job.status {
727 CreateStreamingJobStatus::Init => None,
728 CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
729 CreateStreamingJobStatus::Created => None,
730 })
731 }
732
733 pub(super) fn has_pending_finished_jobs(&self) -> bool {
734 self.iter_creating_job_tracker()
735 .any(|tracker| tracker.is_finished())
736 }
737
738 pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
739 self.iter_mut_creating_job_tracker()
740 .flat_map(|tracker| tracker.take_pending_backfill_nodes())
741 .collect()
742 }
743
    /// Collect everything that must be committed with the next checkpoint:
    /// jobs whose backfill just finished (their status is flipped
    /// `Creating` -> `Created` and the tracker is consumed), table ids to
    /// truncate, and jobs whose CDC table backfill pre-completed.
    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        let mut finished_cdc_table_backfill = vec![];
        for (job_id, job) in &mut self.jobs {
            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    // Swap the status to `Created` to take ownership of the
                    // tracker; the pattern is guaranteed to match here.
                    let CreateStreamingJobStatus::Creating { tracker, .. } =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
                && tracker.take_pre_completed()
            {
                finished_cdc_table_backfill.push(*job_id);
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
            finished_cdc_table_backfill,
        }
    }
773
774 pub fn fragment_subscribers(
775 &self,
776 fragment_id: FragmentId,
777 ) -> impl Iterator<Item = SubscriberId> + '_ {
778 let job_id = self.fragment_location[&fragment_id];
779 self.jobs[&job_id].subscribers.keys().copied()
780 }
781
782 pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
783 self.jobs[&job_id].subscribers.keys().copied()
784 }
785
786 pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
787 self.jobs
788 .iter()
789 .filter_map(|(job_id, info)| {
790 info.subscribers
791 .values()
792 .filter_map(|subscriber| match subscriber {
793 SubscriberType::Subscription(retention) => Some(*retention),
794 SubscriberType::SnapshotBackfill => None,
795 })
796 .max()
797 .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
798 })
799 .collect()
800 }
801
802 pub fn register_subscriber(
803 &mut self,
804 job_id: JobId,
805 subscriber_id: SubscriberId,
806 subscriber: SubscriberType,
807 ) {
808 self.jobs
809 .get_mut(&job_id)
810 .expect("should exist")
811 .subscribers
812 .try_insert(subscriber_id, subscriber)
813 .expect("non duplicate");
814 }
815
816 pub fn unregister_subscriber(
817 &mut self,
818 job_id: JobId,
819 subscriber_id: SubscriberId,
820 ) -> Option<SubscriberType> {
821 self.jobs
822 .get_mut(&job_id)
823 .expect("should exist")
824 .subscribers
825 .remove(&subscriber_id)
826 }
827
828 pub fn update_subscription_retention(
829 &mut self,
830 job_id: JobId,
831 subscriber_id: SubscriberId,
832 retention_second: u64,
833 ) {
834 let job = self.jobs.get_mut(&job_id).expect("should exist");
835 match job.subscribers.get_mut(&subscriber_id) {
836 Some(SubscriberType::Subscription(current_retention)) => {
837 *current_retention = retention_second;
838 }
839 Some(SubscriberType::SnapshotBackfill) => {
840 warn!(
841 %job_id,
842 %subscriber_id,
843 "cannot update retention for snapshot backfill subscriber"
844 );
845 }
846 None => {
847 warn!(%job_id, %subscriber_id, "subscription subscriber not found");
848 }
849 }
850 }
851
852 fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
853 let job_id = self.fragment_location[&fragment_id];
854 let fragment = self
855 .jobs
856 .get_mut(&job_id)
857 .expect("should exist")
858 .fragment_infos
859 .get_mut(&fragment_id)
860 .expect("should exist");
861 (fragment, job_id)
862 }
863
864 fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
865 Self {
866 database_id,
867 jobs: Default::default(),
868 fragment_location: Default::default(),
869 shared_actor_infos,
870 }
871 }
872
    /// Build an empty database info, first clearing any stale shared snapshot
    /// for this database (which broadcasts `Delete` notifications).
    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }
878
879 pub fn recover(
880 database_id: DatabaseId,
881 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
882 shared_actor_infos: SharedActorInfos,
883 ) -> Self {
884 let mut info = Self::empty_inner(database_id, shared_actor_infos);
885 for job in jobs {
886 info.add_existing(job);
887 }
888 info
889 }
890
    /// Whether no streaming job is tracked for this database.
    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }
894
    /// Register a recovered job and all of its fragments.
    ///
    /// The job is inserted with an empty fragment map first; its fragments are
    /// then routed through `pre_apply_new_fragments` so the shared actor info
    /// (and its notifications) stay in sync.
    ///
    /// Panics if the job is already tracked.
    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
        let InflightStreamingJobInfo {
            job_id,
            fragment_infos,
            subscribers,
            status,
            cdc_table_backfill_tracker,
        } = job;
        self.jobs
            .try_insert(
                job_id,
                InflightStreamingJobInfo {
                    job_id,
                    subscribers,
                    // Fragments are inserted below via `pre_apply_new_fragments`.
                    fragment_infos: Default::default(),
                    status,
                    cdc_table_backfill_tracker,
                },
            )
            .expect("non-duplicate");
        self.pre_apply_new_fragments(
            fragment_infos
                .into_iter()
                .map(|(fragment_id, info)| (fragment_id, job_id, info)),
        );
    }
921
922 pub(crate) fn pre_apply_new_job(
924 &mut self,
925 job_id: JobId,
926 cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
927 ) {
928 {
929 self.jobs
930 .try_insert(
931 job_id,
932 InflightStreamingJobInfo {
933 job_id,
934 fragment_infos: Default::default(),
935 subscribers: Default::default(), status: CreateStreamingJobStatus::Init,
937 cdc_table_backfill_tracker,
938 },
939 )
940 .expect("non-duplicate");
941 }
942 }
943
944 pub(crate) fn pre_apply_new_fragments(
946 &mut self,
947 fragments: impl IntoIterator<Item = (FragmentId, JobId, InflightFragmentInfo)>,
948 ) {
949 {
950 let shared_infos = self.shared_actor_infos.clone();
951 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
952 for (fragment_id, job_id, info) in fragments {
953 {
954 {
955 let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
956 shared_actor_writer.upsert([(&info, job_id)]);
957 fragment_infos
958 .fragment_infos
959 .try_insert(fragment_id, info)
960 .expect("non duplicate");
961 self.fragment_location
962 .try_insert(fragment_id, job_id)
963 .expect("non duplicate");
964 }
965 }
966 }
967 shared_actor_writer.finish();
968 }
969 }
970
971 pub(crate) fn pre_apply_reschedule(
974 &mut self,
975 fragment_id: FragmentId,
976 new_actors: HashMap<ActorId, InflightActorInfo>,
977 actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
978 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
979 ) {
980 {
981 {
982 {
983 {
984 let (info, _) = self.fragment_mut(fragment_id);
985 let actors = &mut info.actors;
986 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
987 actors
988 .get_mut(&actor_id)
989 .expect("should exist")
990 .vnode_bitmap = Some(new_vnodes);
991 }
992 for (actor_id, actor) in new_actors {
993 actors
994 .try_insert(actor_id as _, actor)
995 .expect("non-duplicate");
996 }
997 for (actor_id, splits) in actor_splits {
998 actors.get_mut(&actor_id).expect("should exist").splits = splits;
999 }
1000 }
1002 }
1003 }
1004 }
1005 }
1006
1007 pub(crate) fn pre_apply_replace_node_upstream(
1009 &mut self,
1010 fragment_id: FragmentId,
1011 replace_map: &HashMap<FragmentId, FragmentId>,
1012 ) {
1013 {
1014 {
1015 {
1016 {
1017 let mut remaining_fragment_ids: HashSet<_> =
1018 replace_map.keys().cloned().collect();
1019 let (info, _) = self.fragment_mut(fragment_id);
1020 visit_stream_node_mut(&mut info.nodes, |node| {
1021 if let NodeBody::Merge(m) = node
1022 && let Some(new_upstream_fragment_id) =
1023 replace_map.get(&m.upstream_fragment_id)
1024 {
1025 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1026 if cfg!(debug_assertions) {
1027 panic!(
1028 "duplicate upstream fragment: {:?} {:?}",
1029 m, replace_map
1030 );
1031 } else {
1032 warn!(?m, ?replace_map, "duplicate upstream fragment");
1033 }
1034 }
1035 m.upstream_fragment_id = *new_upstream_fragment_id;
1036 }
1037 });
1038 if cfg!(debug_assertions) {
1039 assert!(
1040 remaining_fragment_ids.is_empty(),
1041 "non-existing fragment to replace: {:?} {:?} {:?}",
1042 remaining_fragment_ids,
1043 info.nodes,
1044 replace_map
1045 );
1046 } else {
1047 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1048 }
1049 }
1050 }
1051 }
1052 }
1053 }
1054
1055 pub(crate) fn pre_apply_add_node_upstream(
1057 &mut self,
1058 fragment_id: FragmentId,
1059 new_upstream_info: &PbUpstreamSinkInfo,
1060 ) {
1061 {
1062 {
1063 {
1064 {
1065 let (info, _) = self.fragment_mut(fragment_id);
1066 let mut injected = false;
1067 visit_stream_node_mut(&mut info.nodes, |node| {
1068 if let NodeBody::UpstreamSinkUnion(u) = node {
1069 if cfg!(debug_assertions) {
1070 let current_upstream_fragment_ids = u
1071 .init_upstreams
1072 .iter()
1073 .map(|upstream| upstream.upstream_fragment_id)
1074 .collect::<HashSet<_>>();
1075 if current_upstream_fragment_ids
1076 .contains(&new_upstream_info.upstream_fragment_id)
1077 {
1078 panic!(
1079 "duplicate upstream fragment: {:?} {:?}",
1080 u, new_upstream_info
1081 );
1082 }
1083 }
1084 u.init_upstreams.push(new_upstream_info.clone());
1085 injected = true;
1086 }
1087 });
1088 assert!(injected, "should inject upstream into UpstreamSinkUnion");
1089 }
1090 }
1091 }
1092 }
1093 }
1094
1095 pub(crate) fn pre_apply_drop_node_upstream(
1097 &mut self,
1098 fragment_id: FragmentId,
1099 drop_upstream_fragment_ids: &[FragmentId],
1100 ) {
1101 {
1102 {
1103 {
1104 {
1105 let (info, _) = self.fragment_mut(fragment_id);
1106 let mut removed = false;
1107 visit_stream_node_mut(&mut info.nodes, |node| {
1108 if let NodeBody::UpstreamSinkUnion(u) = node {
1109 if cfg!(debug_assertions) {
1110 let current_upstream_fragment_ids = u
1111 .init_upstreams
1112 .iter()
1113 .map(|upstream| upstream.upstream_fragment_id)
1114 .collect::<HashSet<FragmentId>>();
1115 for drop_fragment_id in drop_upstream_fragment_ids {
1116 if !current_upstream_fragment_ids.contains(drop_fragment_id)
1117 {
1118 panic!(
1119 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1120 u, drop_upstream_fragment_ids, drop_fragment_id
1121 );
1122 }
1123 }
1124 }
1125 u.init_upstreams.retain(|upstream| {
1126 !drop_upstream_fragment_ids
1127 .contains(&upstream.upstream_fragment_id)
1128 });
1129 removed = true;
1130 }
1131 });
1132 assert!(removed, "should remove upstream from UpstreamSinkUnion");
1133 }
1134 }
1135 }
1136 }
1137 }
1138
1139 pub(crate) fn pre_apply_split_assignments(
1141 &mut self,
1142 assignments: impl IntoIterator<Item = (FragmentId, HashMap<ActorId, Vec<SplitImpl>>)>,
1143 ) {
1144 {
1145 let shared_infos = self.shared_actor_infos.clone();
1146 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
1147 {
1148 {
1149 for (fragment_id, actor_splits) in assignments {
1150 let (info, job_id) = self.fragment_mut(fragment_id);
1151 let actors = &mut info.actors;
1152 for (actor_id, splits) in actor_splits {
1153 actors.get_mut(&actor_id).expect("should exist").splits = splits;
1154 }
1155 shared_actor_writer.upsert([(&*info, job_id)]);
1156 }
1157 }
1158 }
1159 shared_actor_writer.finish();
1160 }
1161 }
1162
    /// Collect all fragment infos and inter-fragment relations touched by a
    /// command and build them into a `FragmentEdgeBuildResult` via
    /// `FragmentEdgeBuilder`.
    ///
    /// - `info`: a streaming job being created, paired with a flag that routes
    ///   the job's fragments to a job-specific partial graph (judging from its
    ///   use with `to_partial_graph_id`, the flag marks snapshot backfill —
    ///   TODO confirm).
    /// - `replace_job`: an in-flight replace-stream-job plan, contributing both
    ///   new fragments and upstream replacements.
    /// - `new_upstream_sink`: a sink fragment newly connected to an existing
    ///   downstream fragment.
    pub(super) fn build_edge(
        &self,
        info: Option<(&CreateStreamingJobCommandInfo, bool)>,
        replace_job: Option<&ReplaceStreamJobPlan>,
        new_upstream_sink: Option<&UpstreamSinkInfo>,
        control_stream_manager: &ControlStreamManager,
    ) -> FragmentEdgeBuildResult {
        // Ids of fragments that already exist in this database and participate
        // in an edge of the command: upstreams of the new job, upstreams and
        // replaced upstreams of the replace job, and the downstream of a newly
        // attached sink.
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    // Skip upstreams that are themselves fragments of the job
                    // being created — those are new, not existing.
                    .filter(|fragment_id| {
                        info.map(|(info, _)| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(*fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        // Infos of fragments newly introduced by the command, each tagged with
        // the partial graph it belongs to.
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|(info, is_snapshot_backfill)| {
                // When the flag is set, the job id is encoded into the partial
                // graph id, isolating the job's fragments in their own graph.
                let partial_graph_id = to_partial_graph_id(
                    self.database_id,
                    is_snapshot_backfill.then_some(info.streaming_job.id()),
                );
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(move |(fragment_id, info)| (fragment_id, info, partial_graph_id))
            })
            .chain(
                replace_job
                    .into_iter()
                    .flat_map(|replace_job| {
                        replace_job
                            .new_fragments
                            .new_fragment_info(&replace_job.init_split_assignment)
                            // Sinks replaced for auto schema refresh also
                            // contribute new fragments.
                            .chain(
                                replace_job
                                    .auto_refresh_schema_sinks
                                    .as_ref()
                                    .into_iter()
                                    .flat_map(|sinks| {
                                        sinks.iter().map(|sink| {
                                            (
                                                sink.new_fragment.fragment_id,
                                                sink.new_fragment_info(),
                                            )
                                        })
                                    }),
                            )
                    })
                    // Replace-job fragments always live in the database-level
                    // (non-job-specific) partial graph.
                    .map(|(fragment_id, fragment)| {
                        (
                            fragment_id,
                            fragment,
                            to_partial_graph_id(self.database_id, None),
                        )
                    }),
            )
            .collect_vec();
        // Seed the builder with existing fragments (looked up from `self`) plus
        // the new fragment infos collected above.
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| {
                    (
                        self.fragment(fragment_id),
                        to_partial_graph_id(self.database_id, None),
                    )
                })
                .chain(
                    new_fragment_infos
                        .iter()
                        .map(|(_, info, partial_graph_id)| (info, *partial_graph_id)),
                ),
            control_stream_manager,
        );
        // Register the fragment relations contributed by each command part.
        if let Some((info, _)) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        // Finally, rewire upstreams that the replace job swaps out.
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        builder.build()
    }
1287
1288 pub(crate) fn post_apply_reschedules(
1290 &mut self,
1291 reschedules: impl IntoIterator<Item = (FragmentId, HashSet<ActorId>)>,
1292 ) {
1293 let inner = self.shared_actor_infos.clone();
1294 let mut shared_actor_writer = inner.start_writer(self.database_id);
1295 {
1296 {
1297 {
1298 for (fragment_id, to_remove) in reschedules {
1299 let job_id = self.fragment_location[&fragment_id];
1300 let info = self
1301 .jobs
1302 .get_mut(&job_id)
1303 .expect("should exist")
1304 .fragment_infos
1305 .get_mut(&fragment_id)
1306 .expect("should exist");
1307 for actor_id in to_remove {
1308 assert!(info.actors.remove(&actor_id).is_some());
1309 }
1310 shared_actor_writer.upsert([(&*info, job_id)]);
1311 }
1312 }
1313 }
1314 }
1315 shared_actor_writer.finish();
1316 }
1317
1318 pub(crate) fn post_apply_remove_fragments(
1320 &mut self,
1321 fragment_ids: impl IntoIterator<Item = FragmentId>,
1322 ) {
1323 let inner = self.shared_actor_infos.clone();
1324 let mut shared_actor_writer = inner.start_writer(self.database_id);
1325 {
1326 {
1327 {
1328 for fragment_id in fragment_ids {
1329 let job_id = self
1330 .fragment_location
1331 .remove(&fragment_id)
1332 .expect("should exist");
1333 let job = self.jobs.get_mut(&job_id).expect("should exist");
1334 let fragment = job
1335 .fragment_infos
1336 .remove(&fragment_id)
1337 .expect("should exist");
1338 shared_actor_writer.remove(&fragment);
1339 if job.fragment_infos.is_empty() {
1340 self.jobs.remove(&job_id).expect("should exist");
1341 }
1342 }
1343 }
1344 }
1345 }
1346 shared_actor_writer.finish();
1347 }
1348}
1349
1350impl InflightFragmentInfo {
1351 pub(crate) fn actor_ids_to_collect(
1353 infos: impl IntoIterator<Item = &Self>,
1354 ) -> HashMap<WorkerId, HashSet<ActorId>> {
1355 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1356 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1357 assert!(
1358 ret.entry(actor.worker_id)
1359 .or_default()
1360 .insert(*actor_id as _)
1361 )
1362 }
1363 ret
1364 }
1365
1366 pub fn existing_table_ids<'a>(
1367 infos: impl IntoIterator<Item = &'a Self> + 'a,
1368 ) -> impl Iterator<Item = TableId> + 'a {
1369 infos
1370 .into_iter()
1371 .flat_map(|info| info.state_table_ids.iter().cloned())
1372 }
1373
1374 pub fn workers<'a>(
1375 infos: impl IntoIterator<Item = &'a Self> + 'a,
1376 ) -> impl Iterator<Item = WorkerId> + 'a {
1377 infos
1378 .into_iter()
1379 .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1380 }
1381
1382 pub fn contains_worker<'a>(
1383 infos: impl IntoIterator<Item = &'a Self> + 'a,
1384 worker_id: WorkerId,
1385 ) -> bool {
1386 Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1387 }
1388}
1389
1390impl InflightDatabaseInfo {
1391 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
1392 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
1393 }
1394
1395 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
1396 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
1397 }
1398}