use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RawRwLock;
use parking_lot::lock_api::RwLockReadGuard;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::WorkerId;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
use risingwave_pb::meta::subscribe_response::Operation;
use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{info, warn};

use crate::MetaResult;
use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::rebuild_fragment_mapping;
use crate::manager::NotificationManagerRef;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};

#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    pub actors: HashMap<ActorId, InflightActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}

impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
        let (info, job_id) = pair;

        let InflightFragmentInfo {
            fragment_id,
            distribution_type,
            fragment_type_mask,
            actors,
            vnode_count,
            ..
        } = info;

        Self {
            fragment_id: *fragment_id,
            job_id,
            distribution_type: *distribution_type,
            fragment_type_mask: *fragment_type_mask,
            actors: actors.clone(),
            vnode_count: *vnode_count,
        }
    }
}

#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}

impl SharedActorInfosInner {
    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
        self.info
            .values()
            .find_map(|database| database.get(&fragment_id))
    }

    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
        self.info.values().flatten()
    }
}
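
// Illustrative read-path sketch (hypothetical values, not production code): callers reach
// this inner map through `SharedActorInfos::read_guard`, and `get_fragment` looks a fragment
// up without knowing which database owns it, since it scans across all databases.
//
//     let inner = SharedActorInfosInner::default();
//     assert!(inner.get_fragment(42).is_none());
//     assert_eq!(inner.iter_over_fragments().count(), 0);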

#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}

impl SharedActorInfos {
    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
        self.inner.read()
    }

    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
        let core = self.inner.read();
        core.iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
            })
            .collect()
    }

    pub fn migrate_splits_for_source_actors(
        &self,
        fragment_id: FragmentId,
        prev_actor_ids: &[ActorId],
        curr_actor_ids: &[ActorId],
    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
        let guard = self.read_guard();

        let prev_splits = prev_actor_ids
            .iter()
            .flat_map(|actor_id| {
                guard
                    .get_fragment(fragment_id)
                    .and_then(|info| info.actors.get(actor_id))
                    .map(|actor| actor.splits.clone())
                    .unwrap_or_default()
            })
            .map(|split| (split.id(), split))
            .collect();

        let empty_actor_splits = curr_actor_ids
            .iter()
            .map(|actor_id| (*actor_id, vec![]))
            .collect();

        let diff = crate::stream::source_manager::reassign_splits(
            fragment_id,
            empty_actor_splits,
            &prev_splits,
            std::default::Default::default(),
        )
        .unwrap_or_default();

        Ok(diff)
    }
}

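// Illustrative sketch (hypothetical ids, not taken from production code): during a
// rescheduling, the splits currently held by the old source actors are collected from the
// shared info and redistributed over the new actors of the same fragment.
//
//     let assignment = shared_actor_infos.migrate_splits_for_source_actors(
//         source_fragment_id,
//         &[1, 2],    // actor ids before the reschedule
//         &[3, 4, 5], // actor ids after the reschedule
//     )?;
//     // `assignment` maps each new actor id to the splits it should read from now on.
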
impl SharedActorInfos {
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_info = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}
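
// Illustrative write-path sketch (hypothetical caller): mutations go through a
// `SharedActorInfoWriter` so that all fragment-mapping notifications produced by one batch
// of changes are flushed together when the writer is finished.
//
//     let mut writer = shared_actor_infos.start_writer(database_id);
//     writer.upsert([(&fragment_info, job_id)]);
//     writer.remove(&dropped_fragment_info);
//     writer.finish(); // sends the accumulated Add/Update/Delete notifications, if any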

pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}

impl SharedActorInfoWriter<'_> {
    pub(super) fn upsert(
        &mut self,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let database = self.write_guard.info.entry(self.database_id).or_default();
        for info @ (fragment, _) in infos {
            match database.entry(fragment.fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info = info.into();
                    self.updated_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
                Entry::Vacant(entry) => {
                    let info = info.into();
                    self.added_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
            }
        }
    }

    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
            && let Some(fragment) = database.remove(&info.fragment_id)
        {
            self.deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
    }

    pub(super) fn finish(self) {
        if let Some(mapping) = self.added_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Add, mapping);
        }
        if let Some(mapping) = self.updated_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Update, mapping);
        }
        if let Some(mapping) = self.deleted_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }
}
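
// Illustrative sketch of the resulting notifications (hypothetical fragments): whether a
// fragment id is already tracked in the per-database map decides which batch the change
// lands in, and `finish` emits one notification per non-empty batch.
//
//     let mut writer = shared_actor_infos.start_writer(database_id);
//     writer.upsert([(&brand_new_fragment, job_id)]);      // vacant entry   -> Operation::Add
//     writer.upsert([(&already_known_fragment, job_id)]);  // occupied entry -> Operation::Update
//     writer.finish();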

#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}

impl BarrierInfo {
    pub(super) fn prev_epoch(&self) -> u64 {
        self.prev_epoch.value().0
    }
}

#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
        is_existing: bool,
    },
    AddNodeUpstream(PbUpstreamSinkInfo),
    DropNodeUpstream(Vec<FragmentId>),
    ReplaceNodeUpstream(HashMap<FragmentId, FragmentId>),
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    RemoveFragment,
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}
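
// Illustrative sketch (hypothetical values): a scale-out of one fragment is expressed as a
// `Reschedule` change; `pre_apply` adds the new actors and updated vnode bitmaps, while
// `post_apply` later drops the actors listed in `to_remove`.
//
//     let change = CommandFragmentChanges::Reschedule {
//         new_actors: new_actor_infos,            // actors added by the reschedule
//         actor_update_vnode_bitmap: new_bitmaps, // surviving actors with new vnode bitmaps
//         to_remove: HashSet::from([old_actor_id]),
//         actor_splits: HashMap::new(),
//     };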

#[derive(Clone, Debug)]
pub enum SubscriberType {
    Subscription(u64),
    SnapshotBackfill,
}

#[derive(Debug, Clone)]
pub(super) enum CreateStreamingJobStatus {
    Init,
    Creating(CreateMviewProgressTracker),
    Created,
}

#[derive(Debug, Clone)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub subscribers: HashMap<u32, SubscriberType>,
    pub status: CreateStreamingJobStatus,
}

impl InflightStreamingJobInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.fragment_infos.values()
    }

    pub fn snapshot_backfill_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> impl Iterator<Item = ActorId> + '_ {
        fragment_infos
            .values()
            .filter(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
            })
            .flat_map(|fragment| fragment.actors.keys().copied())
    }

    pub fn tracking_progress_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        )
    }
}

impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}
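
// Illustrative sketch (hypothetical `job_info`): a `&InflightStreamingJobInfo` iterates over
// its fragments directly, and the snapshot-backfill helper narrows that down to actors of
// fragments flagged with `SnapshotBackfillStreamScan`.
//
//     for fragment in &job_info {
//         // fragment.fragment_id, fragment.actors, ...
//     }
//     let backfill_actors: Vec<ActorId> =
//         InflightStreamingJobInfo::snapshot_backfill_actor_ids(&job_info.fragment_infos)
//             .collect();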

#[derive(Debug, Clone)]
pub struct InflightDatabaseInfo {
    database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}

impl InflightDatabaseInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.jobs.values().flat_map(|job| job.fragment_infos())
    }

    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }

    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs
            .get(&job_id)
            .expect("should exist")
            .fragment_infos
            .get(&fragment_id)
            .expect("should exist")
    }

    pub fn gen_ddl_progress(&self) -> impl Iterator<Item = (JobId, DdlProgress)> + '_ {
        self.jobs
            .iter()
            .filter_map(|(job_id, job)| match &job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating(tracker) => {
                    Some((*job_id, tracker.gen_ddl_progress()))
                }
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        resps: impl Iterator<Item = &BarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) {
        if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::new(
                                info,
                                version_stats,
                            )),
                        ) else {
                            unreachable!("should be `Init` before the first barrier is collected")
                        };
                    } else {
                        info!(%job_id, "newly created job was cancelled before the first barrier was collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {}
            }
        }
        for progress in resps.flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which could have been cancelled"
                );
                continue;
            };
            let CreateStreamingJobStatus::Creating(tracker) =
                &mut self.jobs.get_mut(job_id).expect("should exist").status
            else {
                warn!("update the progress of an already-created streaming job: {progress:?}");
                continue;
            };
            tracker.apply_progress(progress, version_stats);
        }
    }
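
    // Status transitions driven by `apply_collected_command` and `take_staging_commit_info`
    // (sketch of the state machine implemented above and below):
    //
    //     Init     --(first barrier of the job collected)--> Creating(tracker)
    //     Creating --(tracker reports finished)-----------> Created
    //
    // Progress reports for a job that is no longer in `Creating` (e.g. cancelled or already
    // finished) are logged and skipped rather than applied.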

    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
        self.jobs.values().filter_map(|job| match &job.status {
            CreateStreamingJobStatus::Init => None,
            CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
            CreateStreamingJobStatus::Created => None,
        })
    }

    fn iter_mut_creating_job_tracker(
        &mut self,
    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
        self.jobs
            .values_mut()
            .filter_map(|job| match &mut job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        self.iter_creating_job_tracker()
            .any(|tracker| tracker.is_finished())
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        self.iter_mut_creating_job_tracker()
            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
            .collect()
    }

    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        for job in self.jobs.values_mut() {
            if let CreateStreamingJobStatus::Creating(tracker) = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    let CreateStreamingJobStatus::Creating(tracker) =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
        }
    }

    pub fn fragment_subscribers(&self, fragment_id: FragmentId) -> impl Iterator<Item = u32> + '_ {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = u32> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
        self.jobs
            .iter()
            .filter_map(|(job_id, info)| {
                info.subscribers
                    .values()
                    .filter_map(|subscriber| match subscriber {
                        SubscriberType::Subscription(retention) => Some(*retention),
                        SubscriberType::SnapshotBackfill => None,
                    })
                    .max()
                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
            })
            .collect()
    }
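
    // Worked example for `max_subscription_retention` (hypothetical values):
    //
    //     subscribers of job J: Subscription(3600), Subscription(86400), SnapshotBackfill
    //     => the result contains (J.as_mv_table_id(), 86400)
    //
    // `SnapshotBackfill` entries never contribute a retention value, and jobs whose
    // subscribers are all snapshot backfills are omitted from the returned map.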

    pub fn register_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: u32,
        subscriber: SubscriberType,
    ) {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .try_insert(subscriber_id, subscriber)
            .expect("non duplicate");
    }

    pub fn unregister_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: u32,
    ) -> Option<SubscriberType> {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .remove(&subscriber_id)
    }

    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
        let job_id = self.fragment_location[&fragment_id];
        let fragment = self
            .jobs
            .get_mut(&job_id)
            .expect("should exist")
            .fragment_infos
            .get_mut(&fragment_id)
            .expect("should exist");
        (fragment, job_id)
    }

    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            jobs: Default::default(),
            fragment_location: Default::default(),
            shared_actor_infos,
        }
    }

    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }

    pub fn recover(
        database_id: DatabaseId,
        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
        shared_actor_infos: SharedActorInfos,
    ) -> Self {
        let mut info = Self::empty_inner(database_id, shared_actor_infos);
        for job in jobs {
            info.add_existing(job);
        }
        info
    }

    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }

    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
        let InflightStreamingJobInfo {
            job_id,
            fragment_infos,
            subscribers,
            status,
        } = job;
        self.jobs
            .try_insert(
                job.job_id,
                InflightStreamingJobInfo {
                    job_id,
                    subscribers,
                    fragment_infos: Default::default(),
                    status,
                },
            )
            .expect("non-duplicate");
        self.apply_add(fragment_infos.into_iter().map(|(fragment_id, info)| {
            (
                fragment_id,
                CommandFragmentChanges::NewFragment {
                    job_id: job.job_id,
                    info,
                    is_existing: true,
                },
            )
        }))
    }

    pub(crate) fn pre_apply(
        &mut self,
        new_job_id: Option<JobId>,
        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
    ) {
        if let Some(job_id) = new_job_id {
            self.jobs
                .try_insert(
                    job_id,
                    InflightStreamingJobInfo {
                        job_id,
                        fragment_infos: Default::default(),
                        subscribers: Default::default(),
                        status: CreateStreamingJobStatus::Init,
                    },
                )
                .expect("non-duplicate");
        }
        self.apply_add(
            fragment_changes
                .iter()
                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
        )
    }

    fn apply_add(
        &mut self,
        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
    ) {
        {
            let shared_infos = self.shared_actor_infos.clone();
            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
            for (fragment_id, change) in fragment_changes {
                match change {
                    CommandFragmentChanges::NewFragment {
                        job_id,
                        info,
                        is_existing,
                    } => {
                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
                        if !is_existing {
                            shared_actor_writer.upsert([(&info, job_id)]);
                        }
                        fragment_infos
                            .fragment_infos
                            .try_insert(fragment_id, info)
                            .expect("non duplicate");
                        self.fragment_location
                            .try_insert(fragment_id, job_id)
                            .expect("non duplicate");
                    }
                    CommandFragmentChanges::Reschedule {
                        new_actors,
                        actor_update_vnode_bitmap,
                        actor_splits,
                        ..
                    } => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
                            actors
                                .get_mut(&actor_id)
                                .expect("should exist")
                                .vnode_bitmap = Some(new_vnodes);
                        }
                        for (actor_id, actor) in new_actors {
                            actors
                                .try_insert(actor_id as _, actor)
                                .expect("non-duplicate");
                        }
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }
                    }
                    CommandFragmentChanges::RemoveFragment => {}
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
                        let mut remaining_fragment_ids: HashSet<_> =
                            replace_map.keys().cloned().collect();
                        let (info, _) = self.fragment_mut(fragment_id);
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::Merge(m) = node
                                && let Some(new_upstream_fragment_id) =
                                    replace_map.get(&m.upstream_fragment_id)
                            {
                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
                                    if cfg!(debug_assertions) {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            m, replace_map
                                        );
                                    } else {
                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
                                    }
                                }
                                m.upstream_fragment_id = *new_upstream_fragment_id;
                            }
                        });
                        if cfg!(debug_assertions) {
                            assert!(
                                remaining_fragment_ids.is_empty(),
                                "non-existing fragment to replace: {:?} {:?} {:?}",
                                remaining_fragment_ids,
                                info.nodes,
                                replace_map
                            );
                        } else if !remaining_fragment_ids.is_empty() {
                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
                        }
                    }
                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut injected = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<_>>();
                                    if current_upstream_fragment_ids
                                        .contains(&new_upstream_info.upstream_fragment_id)
                                    {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            u, new_upstream_info
                                        );
                                    }
                                }
                                u.init_upstreams.push(new_upstream_info.clone());
                                injected = true;
                            }
                        });
                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut removed = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<FragmentId>>();
                                    for drop_fragment_id in &drop_upstream_fragment_ids {
                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
                                        {
                                            panic!(
                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
                                                u, drop_upstream_fragment_ids, drop_fragment_id
                                            );
                                        }
                                    }
                                }
                                u.init_upstreams.retain(|upstream| {
                                    !drop_upstream_fragment_ids
                                        .contains(&upstream.upstream_fragment_id)
                                });
                                removed = true;
                            }
                        });
                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
                        let (info, job_id) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                }
            }
            shared_actor_writer.finish();
        }
    }

    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        let (info, replace_job, new_upstream_sink) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::MergeSnapshotBackfillStreamingJobs(_)
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit { .. }
                | Command::Throttle(_)
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::StartFragmentBackfill { .. }
                | Command::Refresh { .. }
                | Command::ListFinish { .. }
                | Command::LoadFinish { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
                        new_upstream_sink,
                    ) = job_type
                    {
                        Some(new_upstream_sink)
                    } else {
                        None
                    };
                    (Some(info), None, new_upstream_sink)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
            },
        };
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|info| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|info| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|info| {
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
            })
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .new_fragments
                    .new_fragment_info(&replace_job.init_split_assignment)
                    .chain(
                        replace_job
                            .auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().map(|sink| {
                                    (sink.new_fragment.fragment_id, sink.new_fragment_info())
                                })
                            }),
                    )
            }))
            .collect_vec();
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| self.fragment(fragment_id))
                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
            control_stream_manager,
        );
        if let Some(info) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }

    pub(crate) fn post_apply(
        &mut self,
        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
    ) {
        let inner = self.shared_actor_infos.clone();
        let mut shared_actor_writer = inner.start_writer(self.database_id);
        {
            for (fragment_id, changes) in fragment_changes {
                match changes {
                    CommandFragmentChanges::NewFragment { .. } => {}
                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
                        let job_id = self.fragment_location[fragment_id];
                        let info = self
                            .jobs
                            .get_mut(&job_id)
                            .expect("should exist")
                            .fragment_infos
                            .get_mut(fragment_id)
                            .expect("should exist");
                        for actor_id in to_remove {
                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                    CommandFragmentChanges::RemoveFragment => {
                        let job_id = self
                            .fragment_location
                            .remove(fragment_id)
                            .expect("should exist");
                        let job = self.jobs.get_mut(&job_id).expect("should exist");
                        let fragment = job
                            .fragment_infos
                            .remove(fragment_id)
                            .expect("should exist");
                        shared_actor_writer.remove(&fragment);
                        if job.fragment_infos.is_empty() {
                            self.jobs.remove(&job_id).expect("should exist");
                        }
                    }
                    CommandFragmentChanges::ReplaceNodeUpstream(_)
                    | CommandFragmentChanges::AddNodeUpstream(_)
                    | CommandFragmentChanges::DropNodeUpstream(_)
                    | CommandFragmentChanges::SplitAssignment { .. } => {}
                }
            }
        }
        shared_actor_writer.finish();
    }
}

impl InflightFragmentInfo {
    pub(crate) fn actor_ids_to_collect(
        infos: impl IntoIterator<Item = &Self>,
    ) -> HashMap<WorkerId, HashSet<ActorId>> {
        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
            assert!(
                ret.entry(actor.worker_id)
                    .or_default()
                    .insert(*actor_id as _)
            )
        }
        ret
    }

    pub fn existing_table_ids<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = TableId> + 'a {
        infos
            .into_iter()
            .flat_map(|info| info.state_table_ids.iter().cloned())
    }

    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
        infos.into_iter().any(|fragment| {
            fragment
                .actors
                .values()
                .any(|actor| (actor.worker_id) == worker_id)
        })
    }
}
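
// Illustrative sketch (hypothetical actors): `actor_ids_to_collect` groups actor ids by the
// worker hosting them, which is the shape the barrier injection/collection RPC expects.
//
//     actors: {1 -> worker 10, 2 -> worker 10, 3 -> worker 11}
//     actor_ids_to_collect(fragments) == {10: {1, 2}, 11: {3}}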

impl InflightDatabaseInfo {
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}