use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RawRwLock;
use parking_lot::lock_api::RwLockReadGuard;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::WorkerId;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_pb::ddl_service::PbBackfillType;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SubscriberId;
use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
use risingwave_pb::meta::subscribe_response::Operation;
use risingwave_pb::source::PbCdcTableSnapshotSplits;
use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{info, warn};

use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
use crate::barrier::command::PostCollectCommand;
use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::{
    BackfillProgress, BarrierKind, Command, CreateStreamingJobType, FragmentBackfillProgress,
    TracedEpoch,
};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::rebuild_fragment_mapping;
use crate::manager::NotificationManagerRef;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
use crate::{MetaError, MetaResult};

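/// Per-actor information exposed through [`SharedActorInfos`]: the worker the
/// actor is scheduled on, its vnode bitmap (for hash-distributed fragments),
/// and the source splits currently assigned to it.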
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

impl From<&InflightActorInfo> for SharedActorInfo {
    fn from(value: &InflightActorInfo) -> Self {
        Self {
            worker_id: value.worker_id,
            vnode_bitmap: value.vnode_bitmap.clone(),
            splits: value.splits.clone(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}

impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
        let (info, job_id) = pair;

        let InflightFragmentInfo {
            fragment_id,
            distribution_type,
            fragment_type_mask,
            actors,
            vnode_count,
            ..
        } = info;

        Self {
            fragment_id: *fragment_id,
            job_id,
            distribution_type: *distribution_type,
            fragment_type_mask: *fragment_type_mask,
            actors: actors
                .iter()
                .map(|(actor_id, actor)| (*actor_id, actor.into()))
                .collect(),
            vnode_count: *vnode_count,
        }
    }
}

#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}

impl SharedActorInfosInner {
    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
        self.info
            .values()
            .find_map(|database| database.get(&fragment_id))
    }

    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
        self.info.values().flatten()
    }
}

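/// Actor/fragment info shared outside of the barrier worker, guarded by an
/// `Arc<RwLock<_>>`. Reads go through [`SharedActorInfos::read_guard`] or the
/// helpers below; all writes go through [`SharedActorInfoWriter`], which
/// batches [`PbFragmentWorkerSlotMapping`] notifications and sends them via
/// the `NotificationManager` when the writer finishes.
///
/// A minimal read-path sketch, assuming a `shared: SharedActorInfos` and a
/// `fragment_id` in scope:
///
/// ```ignore
/// let assignments = shared.list_assignments(); // ActorId -> assigned splits
/// let guard = shared.read_guard();
/// let fragment = guard.get_fragment(fragment_id); // Option<&SharedFragmentInfo>
/// ```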
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}

impl SharedActorInfos {
    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
        self.inner.read()
    }

    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
        let core = self.inner.read();
        core.iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
            })
            .collect()
    }

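    /// Migrates splits from a previous generation of source actors to the
    /// current one: collects the splits previously owned by `prev_actor_ids`
    /// for `fragment_id` and redistributes them over `curr_actor_ids` via
    /// `reassign_splits`, starting every current actor from an empty
    /// assignment.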
    pub fn migrate_splits_for_source_actors(
        &self,
        fragment_id: FragmentId,
        prev_actor_ids: &[ActorId],
        curr_actor_ids: &[ActorId],
    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
        let guard = self.read_guard();

        let prev_splits = prev_actor_ids
            .iter()
            .flat_map(|actor_id| {
                guard
                    .get_fragment(fragment_id)
                    .and_then(|info| info.actors.get(actor_id))
                    .map(|actor| actor.splits.clone())
                    .unwrap_or_default()
            })
            .map(|split| (split.id(), split))
            .collect();

        let empty_actor_splits = curr_actor_ids
            .iter()
            .map(|actor_id| (*actor_id, vec![]))
            .collect();

        let diff = crate::stream::source_manager::reassign_splits(
            fragment_id,
            empty_actor_splits,
            &prev_splits,
            Default::default(),
        )
        .unwrap_or_default();

        Ok(diff)
    }
}

impl SharedActorInfos {
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

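    /// Reconciles the shared info of `database_id` with the fragments that
    /// survived recovery: fragments present on both sides are updated,
    /// fragments absent from `fragments` are deleted, and the remaining new
    /// ones are inserted. Each case records the matching mapping change, which
    /// is sent out on [`SharedActorInfoWriter::finish`].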
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}

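/// A write transaction over [`SharedActorInfosInner`]. It holds the write lock
/// for its whole lifetime and accumulates added/updated/deleted fragment
/// mappings, which are flushed as `Add`/`Update`/`Delete` notifications by
/// [`SharedActorInfoWriter::finish`].
///
/// A minimal usage sketch (`shared`, `database_id`, `fragment` and `job_id`
/// are assumed to be in scope):
///
/// ```ignore
/// let mut writer = shared.start_writer(database_id);
/// writer.upsert([(&fragment, job_id)]); // records an Add or Update mapping
/// writer.finish(); // releases the lock and sends the batched notifications
/// ```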
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}

impl SharedActorInfoWriter<'_> {
    pub(super) fn upsert(
        &mut self,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let database = self.write_guard.info.entry(self.database_id).or_default();
        for info @ (fragment, _) in infos {
            match database.entry(fragment.fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info = info.into();
                    self.updated_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
                Entry::Vacant(entry) => {
                    let info = info.into();
                    self.added_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
            }
        }
    }

    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
            && let Some(fragment) = database.remove(&info.fragment_id)
        {
            self.deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
    }

    pub(super) fn finish(self) {
        if let Some(mapping) = self.added_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Add, mapping);
        }
        if let Some(mapping) = self.updated_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Update, mapping);
        }
        if let Some(mapping) = self.deleted_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }
}

#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}

impl BarrierInfo {
    pub(super) fn prev_epoch(&self) -> u64 {
        self.prev_epoch.value().0
    }

    pub(super) fn curr_epoch(&self) -> u64 {
        self.curr_epoch.value().0
    }
}

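/// Per-fragment mutations derived from a barrier command. The additive parts
/// are applied in [`InflightDatabaseInfo::pre_apply`]; the destructive parts
/// (actor and fragment removal) are deferred as [`PostApplyFragmentChanges`]
/// and handed to [`InflightDatabaseInfo::post_apply`] once the barrier has
/// been collected.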
#[derive(Debug)]
pub(super) enum CommandFragmentChanges {
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
    },
    AddNodeUpstream(PbUpstreamSinkInfo),
    DropNodeUpstream(Vec<FragmentId>),
    ReplaceNodeUpstream(HashMap<FragmentId, FragmentId>),
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    RemoveFragment,
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}

pub(super) enum PostApplyFragmentChanges {
    Reschedule { to_remove: HashSet<ActorId> },
    RemoveFragment,
}

#[derive(Clone, Debug)]
pub enum SubscriberType {
    Subscription(u64),
    SnapshotBackfill,
}

#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    Init,
    Creating { tracker: CreateMviewProgressTracker },
    Created,
}

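/// In-flight state of a single streaming job: its fragments, the subscribers
/// (subscriptions and snapshot-backfill jobs) attached to it, its creation
/// status, and the CDC table backfill tracker if the job backfills a CDC
/// table.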
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    pub status: CreateStreamingJobStatus,
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}

impl InflightStreamingJobInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.fragment_infos.values()
    }

    pub fn snapshot_backfill_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> impl Iterator<Item = ActorId> + '_ {
        fragment_infos
            .values()
            .filter(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
            })
            .flat_map(|fragment| fragment.actors.keys().copied())
    }

    pub fn tracking_progress_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        )
    }
}

impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}

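/// In-flight state of all streaming jobs in one database, plus
/// `fragment_location`, a reverse index from fragment to owning job. Mutations
/// are mirrored into [`SharedActorInfos`] so that readers outside the barrier
/// worker observe consistent actor locations and split assignments.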
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    pub(super) database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}

impl InflightDatabaseInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.jobs.values().flat_map(|job| job.fragment_infos())
    }

    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }

    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs
            .get(&job_id)
            .expect("should exist")
            .fragment_infos
            .get(&fragment_id)
            .expect("should exist")
    }

    pub(super) fn backfill_fragment_ids_for_job(
        &self,
        job_id: JobId,
    ) -> MetaResult<HashSet<FragmentId>> {
        let job = self
            .jobs
            .get(&job_id)
            .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
        Ok(job
            .fragment_infos
            .iter()
            .filter_map(|(fragment_id, fragment)| {
                fragment
                    .fragment_type_mask
                    .contains_any([
                        FragmentTypeFlag::StreamScan,
                        FragmentTypeFlag::SourceScan,
                        FragmentTypeFlag::LocalityProvider,
                    ])
                    .then_some(*fragment_id)
            })
            .collect())
    }

    pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
        let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
            MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
        })?;
        let fragment = self
            .jobs
            .get(job_id)
            .expect("should exist")
            .fragment_infos
            .get(&fragment_id)
            .expect("should exist");
        Ok(fragment.fragment_type_mask.contains_any([
            FragmentTypeFlag::StreamScan,
            FragmentTypeFlag::SourceScan,
            FragmentTypeFlag::LocalityProvider,
        ]))
    }

    pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
        self.jobs
            .iter()
            .filter_map(|(job_id, job)| match &job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating { tracker } => {
                    let progress = tracker.gen_backfill_progress();
                    Some((
                        *job_id,
                        BackfillProgress {
                            progress,
                            backfill_type: PbBackfillType::NormalBackfill,
                        },
                    ))
                }
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
        self.jobs.iter().filter_map(|(job_id, job)| {
            job.cdc_table_backfill_tracker
                .as_ref()
                .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
        })
    }

    pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut result = Vec::new();
        for job in self.jobs.values() {
            let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
                continue;
            };
            let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
            result.extend(fragment_progress);
        }
        result
    }

    pub(super) fn may_assign_fragment_cdc_backfill_splits(
        &mut self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
        let job_id = self.fragment_location[&fragment_id];
        let job = self.jobs.get_mut(&job_id).expect("should exist");
        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
            if cdc_scan_fragment_id != fragment_id {
                return Ok(None);
            }
            let actors = job.fragment_infos[&cdc_scan_fragment_id]
                .actors
                .keys()
                .copied()
                .collect();
            tracker.reassign_splits(actors).map(Some)
        } else {
            Ok(None)
        }
    }

    pub(super) fn assign_cdc_backfill_splits(
        &mut self,
        job_id: JobId,
    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
        let job = self.jobs.get_mut(&job_id).expect("should exist");
        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
            let actors = job.fragment_infos[&cdc_scan_fragment_id]
                .actors
                .keys()
                .copied()
                .collect();
            tracker.reassign_splits(actors).map(Some)
        } else {
            Ok(None)
        }
    }

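    /// Applies the effects of a collected barrier to the in-flight state:
    /// moves newly created jobs from `Init` to `Creating` (with a fresh
    /// progress tracker), refreshes trackers of jobs affected by a reschedule,
    /// and folds the backfill / CDC progress reported in the
    /// `BarrierCompleteResponse`s into the corresponding trackers.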
    pub(super) fn apply_collected_command(
        &mut self,
        command: &PostCollectCommand,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(info, version_stats),
                            },
                        ) else {
                            unreachable!("should be init before collecting the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly created job was cancelled before the first barrier was collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {}
            }
        }
        if let PostCollectCommand::RescheduleFragment { reschedules, .. } = command {
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "RescheduleFragment should not carry vnode bitmap updates when actors are rebuilt"
            );

            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which could have been cancelled"
                );
                continue;
            };
            let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
                CreateStreamingJobStatus::Init => {
                    continue;
                }
                CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
                CreateStreamingJobStatus::Created => {
                    if !progress.done {
                        warn!(
                            "update the progress of an already created streaming job: {progress:?}"
                        );
                    }
                    continue;
                }
            };
            tracker.apply_progress(progress, version_stats);
        }
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of a non-existent creating streaming job: {progress:?}, which could have been cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of an already created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        for cdc_offset_updated in resps
            .iter()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }

    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
        self.jobs.values().filter_map(|job| match &job.status {
            CreateStreamingJobStatus::Init => None,
            CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
            CreateStreamingJobStatus::Created => None,
        })
    }

    fn iter_mut_creating_job_tracker(
        &mut self,
    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
        self.jobs
            .values_mut()
            .filter_map(|job| match &mut job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        self.iter_creating_job_tracker()
            .any(|tracker| tracker.is_finished())
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        self.iter_mut_creating_job_tracker()
            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
            .collect()
    }

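    /// Drains the commit-time bookkeeping accumulated since the last barrier:
    /// jobs whose trackers report completion are flipped to `Created` and
    /// returned as finished, table ids pending truncation are collected, and
    /// jobs whose CDC table backfill has pre-completed are reported as well.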
    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        let mut finished_cdc_table_backfill = vec![];
        for (job_id, job) in &mut self.jobs {
            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    let CreateStreamingJobStatus::Creating { tracker, .. } =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
                && tracker.take_pre_completed()
            {
                finished_cdc_table_backfill.push(*job_id);
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
            finished_cdc_table_backfill,
        }
    }

    pub fn fragment_subscribers(
        &self,
        fragment_id: FragmentId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }

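    /// For each job that has at least one `Subscription` subscriber, returns
    /// the maximum retention value among those subscriptions, keyed by the
    /// job's materialized-view table id. `SnapshotBackfill` subscribers do not
    /// contribute.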
    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
        self.jobs
            .iter()
            .filter_map(|(job_id, info)| {
                info.subscribers
                    .values()
                    .filter_map(|subscriber| match subscriber {
                        SubscriberType::Subscription(retention) => Some(*retention),
                        SubscriberType::SnapshotBackfill => None,
                    })
                    .max()
                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
            })
            .collect()
    }

    pub fn register_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
        subscriber: SubscriberType,
    ) {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .try_insert(subscriber_id, subscriber)
            .expect("non duplicate");
    }

    pub fn unregister_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
    ) -> Option<SubscriberType> {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .remove(&subscriber_id)
    }

    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
        let job_id = self.fragment_location[&fragment_id];
        let fragment = self
            .jobs
            .get_mut(&job_id)
            .expect("should exist")
            .fragment_infos
            .get_mut(&fragment_id)
            .expect("should exist");
        (fragment, job_id)
    }

    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            jobs: Default::default(),
            fragment_location: Default::default(),
            shared_actor_infos,
        }
    }

    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }

    pub fn recover(
        database_id: DatabaseId,
        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
        shared_actor_infos: SharedActorInfos,
    ) -> Self {
        let mut info = Self::empty_inner(database_id, shared_actor_infos);
        for job in jobs {
            info.add_existing(job);
        }
        info
    }

    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }

    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
        let InflightStreamingJobInfo {
            job_id,
            fragment_infos,
            subscribers,
            status,
            cdc_table_backfill_tracker,
        } = job;
        self.jobs
            .try_insert(
                job_id,
                InflightStreamingJobInfo {
                    job_id,
                    subscribers,
                    fragment_infos: Default::default(),
                    status,
                    cdc_table_backfill_tracker,
                },
            )
            .expect("non-duplicate");
        let post_apply_changes =
            self.apply_add(fragment_infos.into_iter().map(|(fragment_id, info)| {
                (
                    fragment_id,
                    CommandFragmentChanges::NewFragment { job_id, info },
                )
            }));
        self.post_apply(post_apply_changes);
    }

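    /// First half of the two-phase change protocol. Registers a newly created
    /// job (if any) and applies the additive parts of `fragment_changes`; the
    /// removals are returned as [`PostApplyFragmentChanges`] to be handed back
    /// to [`Self::post_apply`] after the barrier is collected.
    ///
    /// A minimal sketch of the intended call pattern (`db_info` and `changes`
    /// are assumed to be in scope):
    ///
    /// ```ignore
    /// let post_apply = db_info.pre_apply(None, changes); // before injecting the barrier
    /// // ... inject the barrier and wait for it to be collected ...
    /// db_info.post_apply(post_apply); // then drop removed actors/fragments
    /// ```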
    pub(crate) fn pre_apply(
        &mut self,
        new_job: Option<(JobId, Option<CdcTableBackfillTracker>)>,
        fragment_changes: HashMap<FragmentId, CommandFragmentChanges>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        if let Some((job_id, cdc_table_backfill_tracker)) = new_job {
            self.jobs
                .try_insert(
                    job_id,
                    InflightStreamingJobInfo {
                        job_id,
                        fragment_infos: Default::default(),
                        subscribers: Default::default(),
                        status: CreateStreamingJobStatus::Init,
                        cdc_table_backfill_tracker,
                    },
                )
                .expect("non-duplicate");
        }
        self.apply_add(fragment_changes.into_iter())
    }

    fn apply_add(
        &mut self,
        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        let mut post_apply = HashMap::new();
        {
            let shared_infos = self.shared_actor_infos.clone();
            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
            for (fragment_id, change) in fragment_changes {
                match change {
                    CommandFragmentChanges::NewFragment { job_id, info } => {
                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
                        shared_actor_writer.upsert([(&info, job_id)]);
                        fragment_infos
                            .fragment_infos
                            .try_insert(fragment_id, info)
                            .expect("non duplicate");
                        self.fragment_location
                            .try_insert(fragment_id, job_id)
                            .expect("non duplicate");
                    }
                    CommandFragmentChanges::Reschedule {
                        new_actors,
                        actor_update_vnode_bitmap,
                        to_remove,
                        actor_splits,
                    } => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
                            actors
                                .get_mut(&actor_id)
                                .expect("should exist")
                                .vnode_bitmap = Some(new_vnodes);
                        }
                        for (actor_id, actor) in new_actors {
                            actors
                                .try_insert(actor_id as _, actor)
                                .expect("non-duplicate");
                        }
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }

                        post_apply.insert(
                            fragment_id,
                            PostApplyFragmentChanges::Reschedule { to_remove },
                        );
                    }
                    CommandFragmentChanges::RemoveFragment => {
                        post_apply.insert(fragment_id, PostApplyFragmentChanges::RemoveFragment);
                    }
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
                        let mut remaining_fragment_ids: HashSet<_> =
                            replace_map.keys().cloned().collect();
                        let (info, _) = self.fragment_mut(fragment_id);
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::Merge(m) = node
                                && let Some(new_upstream_fragment_id) =
                                    replace_map.get(&m.upstream_fragment_id)
                            {
                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
                                    if cfg!(debug_assertions) {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            m, replace_map
                                        );
                                    } else {
                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
                                    }
                                }
                                m.upstream_fragment_id = *new_upstream_fragment_id;
                            }
                        });
                        if cfg!(debug_assertions) {
                            assert!(
                                remaining_fragment_ids.is_empty(),
                                "non-existing fragment to replace: {:?} {:?} {:?}",
                                remaining_fragment_ids,
                                info.nodes,
                                replace_map
                            );
                        } else if !remaining_fragment_ids.is_empty() {
                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
                        }
                    }
                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut injected = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<_>>();
                                    if current_upstream_fragment_ids
                                        .contains(&new_upstream_info.upstream_fragment_id)
                                    {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            u, new_upstream_info
                                        );
                                    }
                                }
                                u.init_upstreams.push(new_upstream_info.clone());
                                injected = true;
                            }
                        });
                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut removed = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<FragmentId>>();
                                    for drop_fragment_id in &drop_upstream_fragment_ids {
                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
                                        {
                                            panic!(
                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
                                                u, drop_upstream_fragment_ids, drop_fragment_id
                                            );
                                        }
                                    }
                                }
                                u.init_upstreams.retain(|upstream| {
                                    !drop_upstream_fragment_ids
                                        .contains(&upstream.upstream_fragment_id)
                                });
                                removed = true;
                            }
                        });
                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
                        let (info, job_id) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                }
            }
            shared_actor_writer.finish();
        }
        post_apply
    }

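    /// Builds the fragment edges required by `command`. Commands that do not
    /// change the fragment graph return `None` early; for the rest, the
    /// existing upstream/downstream fragments touched by the command and the
    /// command's new fragments are fed into a [`FragmentEdgeBuilder`], along
    /// with any upstream replacements, and the build result is returned.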
    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        let (info, replace_job, new_upstream_sink) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit { .. }
                | Command::Throttle { .. }
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::Refresh { .. }
                | Command::ListFinish { .. }
                | Command::LoadFinish { .. }
                | Command::ResumeBackfill { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
                        new_upstream_sink,
                    ) = job_type
                    {
                        Some(new_upstream_sink)
                    } else {
                        None
                    };
                    let is_snapshot_backfill =
                        matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_));
                    (Some((info, is_snapshot_backfill)), None, new_upstream_sink)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
                Command::ResetSource { .. } => (None, None, None),
            },
        };
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|(info, _)| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(*fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|(info, is_snapshot_backfill)| {
                let partial_graph_id = to_partial_graph_id(
                    self.database_id,
                    is_snapshot_backfill.then_some(info.streaming_job.id()),
                );
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(move |(fragment_id, info)| (fragment_id, info, partial_graph_id))
            })
            .chain(
                replace_job
                    .into_iter()
                    .flat_map(|replace_job| {
                        replace_job
                            .new_fragments
                            .new_fragment_info(&replace_job.init_split_assignment)
                            .chain(
                                replace_job
                                    .auto_refresh_schema_sinks
                                    .as_ref()
                                    .into_iter()
                                    .flat_map(|sinks| {
                                        sinks.iter().map(|sink| {
                                            (
                                                sink.new_fragment.fragment_id,
                                                sink.new_fragment_info(),
                                            )
                                        })
                                    }),
                            )
                    })
                    .map(|(fragment_id, fragment)| {
                        (
                            fragment_id,
                            fragment,
                            to_partial_graph_id(self.database_id, None),
                        )
                    }),
            )
            .collect_vec();
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| {
                    (
                        self.fragment(fragment_id),
                        to_partial_graph_id(self.database_id, None),
                    )
                })
                .chain(
                    new_fragment_infos
                        .iter()
                        .map(|(_, info, partial_graph_id)| (info, *partial_graph_id)),
                ),
            control_stream_manager,
        );
        if let Some((info, _)) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }

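    /// Second half of the two-phase change protocol (see [`Self::pre_apply`]):
    /// drops the actors removed by a reschedule and removes dropped fragments
    /// (and the jobs that become empty), mirroring every change into
    /// [`SharedActorInfos`].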
    pub(crate) fn post_apply(
        &mut self,
        fragment_changes: HashMap<FragmentId, PostApplyFragmentChanges>,
    ) {
        let inner = self.shared_actor_infos.clone();
        let mut shared_actor_writer = inner.start_writer(self.database_id);
        {
            for (fragment_id, changes) in fragment_changes {
                match changes {
                    PostApplyFragmentChanges::Reschedule { to_remove } => {
                        let job_id = self.fragment_location[&fragment_id];
                        let info = self
                            .jobs
                            .get_mut(&job_id)
                            .expect("should exist")
                            .fragment_infos
                            .get_mut(&fragment_id)
                            .expect("should exist");
                        for actor_id in to_remove {
                            assert!(info.actors.remove(&actor_id).is_some());
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                    PostApplyFragmentChanges::RemoveFragment => {
                        let job_id = self
                            .fragment_location
                            .remove(&fragment_id)
                            .expect("should exist");
                        let job = self.jobs.get_mut(&job_id).expect("should exist");
                        let fragment = job
                            .fragment_infos
                            .remove(&fragment_id)
                            .expect("should exist");
                        shared_actor_writer.remove(&fragment);
                        if job.fragment_infos.is_empty() {
                            self.jobs.remove(&job_id).expect("should exist");
                        }
                    }
                }
            }
        }
        shared_actor_writer.finish();
    }
}

impl InflightFragmentInfo {
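    /// Groups the actors of the given fragments by the worker they run on,
    /// yielding the set of actor ids each worker must collect the barrier
    /// from. Panics if the same actor appears twice.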
    pub(crate) fn actor_ids_to_collect(
        infos: impl IntoIterator<Item = &Self>,
    ) -> HashMap<WorkerId, HashSet<ActorId>> {
        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
            assert!(
                ret.entry(actor.worker_id)
                    .or_default()
                    .insert(*actor_id as _)
            )
        }
        ret
    }

    pub fn existing_table_ids<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = TableId> + 'a {
        infos
            .into_iter()
            .flat_map(|info| info.state_table_ids.iter().cloned())
    }

    pub fn workers<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = WorkerId> + 'a {
        infos
            .into_iter()
            .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
    }

    pub fn contains_worker<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
        worker_id: WorkerId,
    ) -> bool {
        Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
    }
}

impl InflightDatabaseInfo {
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}