use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RawRwLock;
use parking_lot::lock_api::RwLockReadGuard;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::WorkerId;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SubscriberId;
use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
use risingwave_pb::meta::subscribe_response::Operation;
use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{info, warn};

use crate::MetaResult;
use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::rebuild_fragment_mapping;
use crate::manager::NotificationManagerRef;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};

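/// Snapshot of a single actor's state (worker placement, vnode bitmap and assigned source
/// splits) as published in [`SharedActorInfos`].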
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

impl From<&InflightActorInfo> for SharedActorInfo {
    fn from(value: &InflightActorInfo) -> Self {
        Self {
            worker_id: value.worker_id,
            vnode_bitmap: value.vnode_bitmap.clone(),
            splits: value.splits.clone(),
        }
    }
}

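/// Snapshot of a fragment and its actors as published in [`SharedActorInfos`], together with the
/// id of the streaming job that owns it.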
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}

impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
        let (info, job_id) = pair;

        let InflightFragmentInfo {
            fragment_id,
            distribution_type,
            fragment_type_mask,
            actors,
            vnode_count,
            ..
        } = info;

        Self {
            fragment_id: *fragment_id,
            job_id,
            distribution_type: *distribution_type,
            fragment_type_mask: *fragment_type_mask,
            actors: actors
                .iter()
                .map(|(actor_id, actor)| (*actor_id, actor.into()))
                .collect(),
            vnode_count: *vnode_count,
        }
    }
}

#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}

impl SharedActorInfosInner {
    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
        self.info
            .values()
            .find_map(|database| database.get(&fragment_id))
    }

    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
        self.info.values().flatten()
    }
}

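/// Lock-protected, externally readable view of all in-flight fragments, grouped by database.
/// The barrier worker updates it through [`SharedActorInfoWriter`], and every batch of updates
/// also emits fragment worker-slot mapping notifications via the notification manager.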
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}

impl SharedActorInfos {
    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
        self.inner.read()
    }

    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
        let core = self.inner.read();
        core.iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
            })
            .collect()
    }

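    /// Reassigns the source splits currently held by `prev_actor_ids` of the given fragment
    /// across `curr_actor_ids`, returning the new per-actor split assignment.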
    pub fn migrate_splits_for_source_actors(
        &self,
        fragment_id: FragmentId,
        prev_actor_ids: &[ActorId],
        curr_actor_ids: &[ActorId],
    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
        let guard = self.read_guard();

        let prev_splits = prev_actor_ids
            .iter()
            .flat_map(|actor_id| {
                guard
                    .get_fragment(fragment_id)
                    .and_then(|info| info.actors.get(actor_id))
                    .map(|actor| actor.splits.clone())
                    .unwrap_or_default()
            })
            .map(|split| (split.id(), split))
            .collect();

        let empty_actor_splits = curr_actor_ids
            .iter()
            .map(|actor_id| (*actor_id, vec![]))
            .collect();

        let diff = crate::stream::source_manager::reassign_splits(
            fragment_id,
            empty_actor_splits,
            &prev_splits,
            std::default::Default::default(),
        )
        .unwrap_or_default();

        Ok(diff)
    }
}

impl SharedActorInfos {
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

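    /// Rebuilds the shared info of a database from the recovered fragments: existing entries are
    /// updated in place, entries that no longer exist are removed, newly seen fragments are
    /// inserted, and the corresponding mapping notifications are emitted when the writer
    /// finishes.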
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}

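/// Write handle over [`SharedActorInfos`]: holds the write lock for a batch of updates and
/// accumulates added/updated/deleted fragment mappings, which are published as notifications
/// when `finish` is called.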
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}

impl SharedActorInfoWriter<'_> {
    pub(super) fn upsert(
        &mut self,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let database = self.write_guard.info.entry(self.database_id).or_default();
        for info @ (fragment, _) in infos {
            match database.entry(fragment.fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info = info.into();
                    self.updated_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
                Entry::Vacant(entry) => {
                    let info = info.into();
                    self.added_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
            }
        }
    }

    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
            && let Some(fragment) = database.remove(&info.fragment_id)
        {
            self.deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
    }

    pub(super) fn finish(self) {
        if let Some(mapping) = self.added_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Add, mapping);
        }
        if let Some(mapping) = self.updated_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Update, mapping);
        }
        if let Some(mapping) = self.deleted_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }
}

#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}

impl BarrierInfo {
    pub(super) fn prev_epoch(&self) -> u64 {
        self.prev_epoch.value().0
    }
}

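/// Per-fragment change derived from a [`Command`]. The additive and in-place parts are applied by
/// `pre_apply`, which returns the deferred removals as [`PostApplyFragmentChanges`] for
/// `post_apply`.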
#[derive(Debug)]
pub(super) enum CommandFragmentChanges {
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
        is_existing: bool,
    },
    AddNodeUpstream(PbUpstreamSinkInfo),
    DropNodeUpstream(Vec<FragmentId>),
    ReplaceNodeUpstream(HashMap<FragmentId, FragmentId>),
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    RemoveFragment,
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}

pub(super) enum PostApplyFragmentChanges {
    Reschedule { to_remove: HashSet<ActorId> },
    RemoveFragment,
}

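/// A subscriber of a streaming job: either a user-created subscription (carrying its retention
/// value) or an in-progress snapshot-backfill job built on top of this job.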
#[derive(Clone, Debug)]
pub enum SubscriberType {
    Subscription(u64),
    SnapshotBackfill,
}

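/// Creation status of a streaming job: `Init` until its first barrier is collected, `Creating`
/// while backfill progress is being tracked, and `Created` once backfill has finished.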
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    Init,
    Creating(CreateMviewProgressTracker),
    Created,
}

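/// In-flight state of a single streaming job: its fragments, its subscribers and its creation
/// status.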
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    pub status: CreateStreamingJobStatus,
}

impl InflightStreamingJobInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.fragment_infos.values()
    }

    pub fn snapshot_backfill_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> impl Iterator<Item = ActorId> + '_ {
        fragment_infos
            .values()
            .filter(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
            })
            .flat_map(|fragment| fragment.actors.keys().copied())
    }

    pub fn tracking_progress_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        )
    }
}

impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}

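/// In-flight state of all streaming jobs in one database. `fragment_location` maps every
/// fragment to its owning job, and `shared_actor_infos` is the externally readable mirror of
/// this state.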
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}

impl InflightDatabaseInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.jobs.values().flat_map(|job| job.fragment_infos())
    }

    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }

    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs
            .get(&job_id)
            .expect("should exist")
            .fragment_infos
            .get(&fragment_id)
            .expect("should exist")
    }

    pub fn gen_ddl_progress(&self) -> impl Iterator<Item = (JobId, DdlProgress)> + '_ {
        self.jobs
            .iter()
            .filter_map(|(job_id, job)| match &job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating(tracker) => {
                    Some((*job_id, tracker.gen_ddl_progress()))
                }
                CreateStreamingJobStatus::Created => None,
            })
    }

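    /// Applies the effect of a collected command barrier: switches a newly created job from
    /// `Init` to `Creating` when its first barrier is collected, and feeds the backfill progress
    /// reported in the barrier-complete responses into the corresponding trackers.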
    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        resps: impl Iterator<Item = &BarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) {
        if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::new(
                                info,
                                version_stats,
                            )),
                        ) else {
                            unreachable!("should be init before collecting the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly created job was cancelled before its first barrier was collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {}
            }
        }
        for progress in resps.flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which may have been cancelled"
                );
                continue;
            };
            let CreateStreamingJobStatus::Creating(tracker) =
                &mut self.jobs.get_mut(job_id).expect("should exist").status
            else {
                warn!("update the progress of an already-created streaming job: {progress:?}");
                continue;
            };
            tracker.apply_progress(progress, version_stats);
        }
    }

    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
        self.jobs.values().filter_map(|job| match &job.status {
            CreateStreamingJobStatus::Init => None,
            CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
            CreateStreamingJobStatus::Created => None,
        })
    }

    fn iter_mut_creating_job_tracker(
        &mut self,
    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
        self.jobs
            .values_mut()
            .filter_map(|job| match &mut job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        self.iter_creating_job_tracker()
            .any(|tracker| tracker.is_finished())
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        self.iter_mut_creating_job_tracker()
            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
            .collect()
    }

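    /// Drains the staging commit info of all creating jobs: collects the table ids to truncate
    /// and moves jobs whose backfill has finished from `Creating` to `Created`, returning them as
    /// finished jobs.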
    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        for job in self.jobs.values_mut() {
            if let CreateStreamingJobStatus::Creating(tracker) = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    let CreateStreamingJobStatus::Creating(tracker) =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
        }
    }

    pub fn fragment_subscribers(
        &self,
        fragment_id: FragmentId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
        self.jobs
            .iter()
            .filter_map(|(job_id, info)| {
                info.subscribers
                    .values()
                    .filter_map(|subscriber| match subscriber {
                        SubscriberType::Subscription(retention) => Some(*retention),
                        SubscriberType::SnapshotBackfill => None,
                    })
                    .max()
                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
            })
            .collect()
    }

    pub fn register_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
        subscriber: SubscriberType,
    ) {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .try_insert(subscriber_id, subscriber)
            .expect("non duplicate");
    }

    pub fn unregister_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
    ) -> Option<SubscriberType> {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .remove(&subscriber_id)
    }

    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
        let job_id = self.fragment_location[&fragment_id];
        let fragment = self
            .jobs
            .get_mut(&job_id)
            .expect("should exist")
            .fragment_infos
            .get_mut(&fragment_id)
            .expect("should exist");
        (fragment, job_id)
    }

    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            jobs: Default::default(),
            fragment_location: Default::default(),
            shared_actor_infos,
        }
    }

    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }

    pub fn recover(
        database_id: DatabaseId,
        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
        shared_actor_infos: SharedActorInfos,
    ) -> Self {
        let mut info = Self::empty_inner(database_id, shared_actor_infos);
        for job in jobs {
            info.add_existing(job);
        }
        info
    }

    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }

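    /// Registers a job that already exists (e.g. during recovery). Its fragments are added with
    /// `is_existing: true`, so they are not re-published to [`SharedActorInfos`] here.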
    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
        let InflightStreamingJobInfo {
            job_id,
            fragment_infos,
            subscribers,
            status,
        } = job;
        self.jobs
            .try_insert(
                job_id,
                InflightStreamingJobInfo {
                    job_id,
                    subscribers,
                    fragment_infos: Default::default(),
                    status,
                },
            )
            .expect("non-duplicate");
        let post_apply_changes =
            self.apply_add(fragment_infos.into_iter().map(|(fragment_id, info)| {
                (
                    fragment_id,
                    CommandFragmentChanges::NewFragment {
                        job_id,
                        info,
                        is_existing: true,
                    },
                )
            }));
        self.post_apply(post_apply_changes);
    }

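    /// First phase of applying a command's fragment changes: registers the newly created job (if
    /// any) and applies the additive changes, returning the deferred removals for `post_apply`.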
    pub(crate) fn pre_apply(
        &mut self,
        new_job_id: Option<JobId>,
        fragment_changes: HashMap<FragmentId, CommandFragmentChanges>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        if let Some(job_id) = new_job_id {
            self.jobs
                .try_insert(
                    job_id,
                    InflightStreamingJobInfo {
                        job_id,
                        fragment_infos: Default::default(),
                        subscribers: Default::default(),
                        status: CreateStreamingJobStatus::Init,
                    },
                )
                .expect("non-duplicate");
        }
        self.apply_add(fragment_changes.into_iter())
    }

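    /// Applies the additive and in-place parts of each fragment change (new fragments, upstream
    /// edits, vnode bitmap and split updates) and records the removals to be applied later. New
    /// fragments (unless marked as already existing) and split reassignments are also published
    /// to [`SharedActorInfos`].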
    fn apply_add(
        &mut self,
        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        let mut post_apply = HashMap::new();
        {
            let shared_infos = self.shared_actor_infos.clone();
            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
            for (fragment_id, change) in fragment_changes {
                match change {
                    CommandFragmentChanges::NewFragment {
                        job_id,
                        info,
                        is_existing,
                    } => {
                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
                        if !is_existing {
                            shared_actor_writer.upsert([(&info, job_id)]);
                        }
                        fragment_infos
                            .fragment_infos
                            .try_insert(fragment_id, info)
                            .expect("non-duplicate");
                        self.fragment_location
                            .try_insert(fragment_id, job_id)
                            .expect("non-duplicate");
                    }
                    CommandFragmentChanges::Reschedule {
                        new_actors,
                        actor_update_vnode_bitmap,
                        to_remove,
                        actor_splits,
                    } => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
                            actors
                                .get_mut(&actor_id)
                                .expect("should exist")
                                .vnode_bitmap = Some(new_vnodes);
                        }
                        for (actor_id, actor) in new_actors {
                            actors
                                .try_insert(actor_id as _, actor)
                                .expect("non-duplicate");
                        }
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }

                        post_apply.insert(
                            fragment_id,
                            PostApplyFragmentChanges::Reschedule { to_remove },
                        );
                    }
                    CommandFragmentChanges::RemoveFragment => {
                        post_apply.insert(fragment_id, PostApplyFragmentChanges::RemoveFragment);
                    }
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
                        let mut remaining_fragment_ids: HashSet<_> =
                            replace_map.keys().cloned().collect();
                        let (info, _) = self.fragment_mut(fragment_id);
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::Merge(m) = node
                                && let Some(new_upstream_fragment_id) =
                                    replace_map.get(&m.upstream_fragment_id)
                            {
                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
                                    if cfg!(debug_assertions) {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            m, replace_map
                                        );
                                    } else {
                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
                                    }
                                }
                                m.upstream_fragment_id = *new_upstream_fragment_id;
                            }
                        });
                        if cfg!(debug_assertions) {
                            assert!(
                                remaining_fragment_ids.is_empty(),
                                "non-existing fragment to replace: {:?} {:?} {:?}",
                                remaining_fragment_ids,
                                info.nodes,
                                replace_map
                            );
                        } else if !remaining_fragment_ids.is_empty() {
                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
                        }
                    }
                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut injected = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<_>>();
                                    if current_upstream_fragment_ids
                                        .contains(&new_upstream_info.upstream_fragment_id)
                                    {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            u, new_upstream_info
                                        );
                                    }
                                }
                                u.init_upstreams.push(new_upstream_info.clone());
                                injected = true;
                            }
                        });
                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut removed = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<FragmentId>>();
                                    for drop_fragment_id in &drop_upstream_fragment_ids {
                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
                                        {
                                            panic!(
                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
                                                u, drop_upstream_fragment_ids, drop_fragment_id
                                            );
                                        }
                                    }
                                }
                                u.init_upstreams.retain(|upstream| {
                                    !drop_upstream_fragment_ids
                                        .contains(&upstream.upstream_fragment_id)
                                });
                                removed = true;
                            }
                        });
                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
                        let (info, job_id) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                }
            }
            shared_actor_writer.finish();
        }
        post_apply
    }

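    /// Builds the new fragment edges required by the given command. Only `CreateStreamingJob` and
    /// `ReplaceStreamJob` introduce new edges; all other commands return `None`.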
    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        let (info, replace_job, new_upstream_sink) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::MergeSnapshotBackfillStreamingJobs(_)
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit { .. }
                | Command::Throttle(_)
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::StartFragmentBackfill { .. }
                | Command::Refresh { .. }
                | Command::ListFinish { .. }
                | Command::LoadFinish { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
                        new_upstream_sink,
                    ) = job_type
                    {
                        Some(new_upstream_sink)
                    } else {
                        None
                    };
                    (Some(info), None, new_upstream_sink)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
            },
        };
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|info| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|info| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|info| {
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
            })
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .new_fragments
                    .new_fragment_info(&replace_job.init_split_assignment)
                    .chain(
                        replace_job
                            .auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().map(|sink| {
                                    (sink.new_fragment.fragment_id, sink.new_fragment_info())
                                })
                            }),
                    )
            }))
            .collect_vec();
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| self.fragment(fragment_id))
                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
            control_stream_manager,
        );
        if let Some(info) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }

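    /// Second phase of applying a command's fragment changes: removes the actors and fragments
    /// scheduled for removal by `pre_apply`, drops jobs that have no fragment left, and publishes
    /// the resulting updates to [`SharedActorInfos`].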
    pub(crate) fn post_apply(
        &mut self,
        fragment_changes: HashMap<FragmentId, PostApplyFragmentChanges>,
    ) {
        let inner = self.shared_actor_infos.clone();
        let mut shared_actor_writer = inner.start_writer(self.database_id);
        {
            for (fragment_id, changes) in fragment_changes {
                match changes {
                    PostApplyFragmentChanges::Reschedule { to_remove } => {
                        let job_id = self.fragment_location[&fragment_id];
                        let info = self
                            .jobs
                            .get_mut(&job_id)
                            .expect("should exist")
                            .fragment_infos
                            .get_mut(&fragment_id)
                            .expect("should exist");
                        for actor_id in to_remove {
                            assert!(info.actors.remove(&actor_id).is_some());
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                    PostApplyFragmentChanges::RemoveFragment => {
                        let job_id = self
                            .fragment_location
                            .remove(&fragment_id)
                            .expect("should exist");
                        let job = self.jobs.get_mut(&job_id).expect("should exist");
                        let fragment = job
                            .fragment_infos
                            .remove(&fragment_id)
                            .expect("should exist");
                        shared_actor_writer.remove(&fragment);
                        if job.fragment_infos.is_empty() {
                            self.jobs.remove(&job_id).expect("should exist");
                        }
                    }
                }
            }
        }
        shared_actor_writer.finish();
    }
}

impl InflightFragmentInfo {
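    /// Groups the actors of the given fragments by the worker node they run on, i.e. the set of
    /// actors each worker is expected to collect a barrier from.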
    pub(crate) fn actor_ids_to_collect(
        infos: impl IntoIterator<Item = &Self>,
    ) -> HashMap<WorkerId, HashSet<ActorId>> {
        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
            assert!(
                ret.entry(actor.worker_id)
                    .or_default()
                    .insert(*actor_id as _)
            )
        }
        ret
    }

    pub fn existing_table_ids<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = TableId> + 'a {
        infos
            .into_iter()
            .flat_map(|info| info.state_table_ids.iter().cloned())
    }

    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
        infos.into_iter().any(|fragment| {
            fragment
                .actors
                .values()
                .any(|actor| actor.worker_id == worker_id)
        })
    }
}

impl InflightDatabaseInfo {
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}