1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RawRwLock;
21use parking_lot::lock_api::RwLockReadGuard;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
24use risingwave_common::id::JobId;
25use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
26use risingwave_connector::source::{SplitImpl, SplitMetaData};
27use risingwave_meta_model::WorkerId;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
30use risingwave_pb::meta::subscribe_response::Operation;
31use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
32use risingwave_pb::stream_plan::stream_node::NodeBody;
33use tracing::warn;
34
35use crate::MetaResult;
36use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
37use crate::barrier::rpc::ControlStreamManager;
38use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
39use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
40use crate::controller::utils::rebuild_fragment_mapping;
41use crate::manager::NotificationManagerRef;
42use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
43
/// Per-fragment information stored in [`SharedActorInfosInner`], built from an
/// [`InflightFragmentInfo`] together with the id of the job it was supplied with.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    /// Id of the job passed alongside the fragment info when this value was created.
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    /// Actors of the fragment keyed by actor id, cloned from the source fragment info.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}
53
54impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
55 fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
56 let (info, job_id) = pair;
57
58 let InflightFragmentInfo {
59 fragment_id,
60 distribution_type,
61 fragment_type_mask,
62 actors,
63 vnode_count,
64 ..
65 } = info;
66
67 Self {
68 fragment_id: *fragment_id,
69 job_id,
70 distribution_type: *distribution_type,
71 fragment_type_mask: *fragment_type_mask,
72 actors: actors.clone(),
73 vnode_count: *vnode_count,
74 }
75 }
76}
77
/// State guarded by the lock inside [`SharedActorInfos`].
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    /// Fragment infos keyed first by database id, then by fragment id.
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
82
83impl SharedActorInfosInner {
84 pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
85 self.info
86 .values()
87 .find_map(|database| database.get(&fragment_id))
88 }
89
90 pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
91 self.info.values().flatten()
92 }
93}
94
/// Clonable handle to the fragment infos shared behind a read-write lock.
///
/// Clones share the same underlying state through the `Arc`. Mutating methods on this type
/// report the resulting fragment mapping changes through the notification manager.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    // Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
102
103impl SharedActorInfos {
104 pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
105 self.inner.read()
106 }
107
108 pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
109 let core = self.inner.read();
110 core.iter_over_fragments()
111 .flat_map(|(_, fragment)| {
112 fragment
113 .actors
114 .iter()
115 .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
116 })
117 .collect()
118 }
119
120 pub fn migrate_splits_for_source_actors(
126 &self,
127 fragment_id: FragmentId,
128 prev_actor_ids: &[ActorId],
129 curr_actor_ids: &[ActorId],
130 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
131 let guard = self.read_guard();
132
133 let prev_splits = prev_actor_ids
134 .iter()
135 .flat_map(|actor_id| {
136 guard
138 .get_fragment(fragment_id)
139 .and_then(|info| info.actors.get(actor_id))
140 .map(|actor| actor.splits.clone())
141 .unwrap_or_default()
142 })
143 .map(|split| (split.id(), split))
144 .collect();
145
146 let empty_actor_splits = curr_actor_ids
147 .iter()
148 .map(|actor_id| (*actor_id, vec![]))
149 .collect();
150
151 let diff = crate::stream::source_manager::reassign_splits(
152 fragment_id,
153 empty_actor_splits,
154 &prev_splits,
155 std::default::Default::default(),
157 )
158 .unwrap_or_default();
159
160 Ok(diff)
161 }
162}
163
164impl SharedActorInfos {
165 pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
166 Self {
167 inner: Arc::new(Default::default()),
168 notification_manager,
169 }
170 }
171
172 pub(super) fn remove_database(&self, database_id: DatabaseId) {
173 if let Some(database) = self.inner.write().info.remove(&database_id) {
174 let mapping = database
175 .into_values()
176 .map(|fragment| rebuild_fragment_mapping(&fragment))
177 .collect_vec();
178 if !mapping.is_empty() {
179 self.notification_manager
180 .notify_fragment_mapping(Operation::Delete, mapping);
181 }
182 }
183 }
184
185 pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
186 let database_ids: HashSet<_> = database_ids.into_iter().collect();
187
188 let mut mapping = Vec::new();
189 for fragment in self
190 .inner
191 .write()
192 .info
193 .extract_if(|database_id, _| !database_ids.contains(database_id))
194 .flat_map(|(_, fragments)| fragments.into_values())
195 {
196 mapping.push(rebuild_fragment_mapping(&fragment));
197 }
198 if !mapping.is_empty() {
199 self.notification_manager
200 .notify_fragment_mapping(Operation::Delete, mapping);
201 }
202 }
203
204 pub(super) fn recover_database(
205 &self,
206 database_id: DatabaseId,
207 fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
208 ) {
209 let mut remaining_fragments: HashMap<_, _> = fragments
210 .map(|info @ (fragment, _)| (fragment.fragment_id, info))
211 .collect();
212 let mut writer = self.start_writer(database_id);
214 let database = writer.write_guard.info.entry(database_id).or_default();
215 for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
216 if let Some(info) = remaining_fragments.remove(fragment_id) {
217 let info = info.into();
218 writer
219 .updated_fragment_mapping
220 .get_or_insert_default()
221 .push(rebuild_fragment_mapping(&info));
222 *fragment_info = info;
223 false
224 } else {
225 true
226 }
227 }) {
228 writer
229 .deleted_fragment_mapping
230 .get_or_insert_default()
231 .push(rebuild_fragment_mapping(&fragment));
232 }
233 for (fragment_id, info) in remaining_fragments {
234 let info = info.into();
235 writer
236 .added_fragment_mapping
237 .get_or_insert_default()
238 .push(rebuild_fragment_mapping(&info));
239 database.insert(fragment_id, info);
240 }
241 writer.finish();
242 }
243
244 pub(super) fn upsert(
245 &self,
246 database_id: DatabaseId,
247 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
248 ) {
249 let mut writer = self.start_writer(database_id);
250 writer.upsert(infos);
251 writer.finish();
252 }
253
254 pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
255 SharedActorInfoWriter {
256 database_id,
257 write_guard: self.inner.write(),
258 notification_manager: &self.notification_manager,
259 added_fragment_mapping: None,
260 updated_fragment_mapping: None,
261 deleted_fragment_mapping: None,
262 }
263 }
264}
265
/// Batched writer over the shared infos of one database.
///
/// It holds the write lock for its whole lifetime and accumulates fragment mappings that are
/// sent as notifications by `finish`.
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    // Each list stays `None` until the first mapping of that kind is recorded.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
274
275impl SharedActorInfoWriter<'_> {
276 pub(super) fn upsert(
277 &mut self,
278 infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
279 ) {
280 let database = self.write_guard.info.entry(self.database_id).or_default();
281 for info @ (fragment, _) in infos {
282 match database.entry(fragment.fragment_id) {
283 Entry::Occupied(mut entry) => {
284 let info = info.into();
285 self.updated_fragment_mapping
286 .get_or_insert_default()
287 .push(rebuild_fragment_mapping(&info));
288 entry.insert(info);
289 }
290 Entry::Vacant(entry) => {
291 let info = info.into();
292 self.added_fragment_mapping
293 .get_or_insert_default()
294 .push(rebuild_fragment_mapping(&info));
295 entry.insert(info);
296 }
297 }
298 }
299 }
300
301 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
302 if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
303 && let Some(fragment) = database.remove(&info.fragment_id)
304 {
305 self.deleted_fragment_mapping
306 .get_or_insert_default()
307 .push(rebuild_fragment_mapping(&fragment));
308 }
309 }
310
311 pub(super) fn finish(self) {
312 if let Some(mapping) = self.added_fragment_mapping {
313 self.notification_manager
314 .notify_fragment_mapping(Operation::Add, mapping);
315 }
316 if let Some(mapping) = self.updated_fragment_mapping {
317 self.notification_manager
318 .notify_fragment_mapping(Operation::Update, mapping);
319 }
320 if let Some(mapping) = self.deleted_fragment_mapping {
321 self.notification_manager
322 .notify_fragment_mapping(Operation::Delete, mapping);
323 }
324 }
325}
326
/// Epoch pair and kind describing a barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}
333
334impl BarrierInfo {
335 pub(super) fn prev_epoch(&self) -> u64 {
336 self.prev_epoch.value().0
337 }
338}
339
/// A change to apply to one fragment of an `InflightDatabaseInfo`.
///
/// Changes are applied in two passes: `InflightDatabaseInfo::apply_add` handles everything
/// except removals, and `InflightDatabaseInfo::post_apply` removes rescheduled-away actors and
/// dropped fragments.
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    /// Registers a fragment under `job_id`.
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
        /// When `true`, `apply_add` does not publish the fragment to the shared actor infos.
        is_existing: bool,
    },
    /// Appends the given upstream to the fragment's `UpstreamSinkUnion` node.
    AddNodeUpstream(PbUpstreamSinkInfo),
    /// Removes the upstreams with the given fragment ids from the fragment's
    /// `UpstreamSinkUnion` node.
    DropNodeUpstream(Vec<FragmentId>),
    /// Rewrites the upstream fragment ids of the fragment's `Merge` nodes.
    ReplaceNodeUpstream(
        /// Old upstream fragment id -> new upstream fragment id.
        HashMap<FragmentId, FragmentId>,
    ),
    /// Changes the set of actors of the fragment.
    Reschedule {
        /// Actors inserted by `apply_add`.
        new_actors: HashMap<ActorId, InflightActorInfo>,
        /// New vnode bitmap for actors that already exist.
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        /// Actors removed by `post_apply`.
        to_remove: HashSet<ActorId>,
        /// Splits assigned to actors by `apply_add`.
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    /// Removes the fragment; handled by `post_apply`.
    RemoveFragment,
    /// Replaces the splits of the listed actors.
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}
367
/// Kind of subscriber registered on a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A subscription; the payload is its retention, as consumed by
    /// `InflightDatabaseInfo::max_subscription_retention`.
    Subscription(u64),
    /// A snapshot-backfill subscriber; it carries no retention.
    SnapshotBackfill,
}
373
/// In-flight state of one streaming job: its fragments and its subscribers.
#[derive(Clone, Debug)]
pub struct InflightStreamingJobInfo {
    pub job_id: JobId,
    /// Fragments of the job keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Subscribers of the job keyed by subscriber id.
    pub subscribers: HashMap<u32, SubscriberType>,
}
380
381impl InflightStreamingJobInfo {
382 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
383 self.fragment_infos.values()
384 }
385
386 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
387 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
388 }
389
390 pub fn snapshot_backfill_actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
391 self.fragment_infos
392 .values()
393 .filter(|fragment| {
394 fragment
395 .fragment_type_mask
396 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
397 })
398 .flat_map(|fragment| fragment.actors.keys().copied())
399 }
400
401 pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
402 StreamJobFragments::tracking_progress_actor_ids_impl(
403 self.fragment_infos
404 .values()
405 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
406 )
407 }
408}
409
410impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
411 type Item = &'a InflightFragmentInfo;
412
413 type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
414
415 fn into_iter(self) -> Self::IntoIter {
416 self.fragment_infos()
417 }
418}
419
/// In-flight state of all streaming jobs of one database.
#[derive(Clone, Debug)]
pub struct InflightDatabaseInfo {
    database_id: DatabaseId,
    /// Jobs of the database keyed by job id.
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    /// Index from fragment id to the job that owns the fragment.
    fragment_location: HashMap<FragmentId, JobId>,
    /// Handle to the shared infos that fragment changes are published to.
    pub(super) shared_actor_infos: SharedActorInfos,
}
427
428impl InflightDatabaseInfo {
429 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
430 self.jobs.values().flat_map(|job| job.fragment_infos())
431 }
432
    /// Returns whether a job with `job_id` is registered in this database.
    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }
436
437 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
438 let job_id = self.fragment_location[&fragment_id];
439 self.jobs
440 .get(&job_id)
441 .expect("should exist")
442 .fragment_infos
443 .get(&fragment_id)
444 .expect("should exist")
445 }
446
447 pub fn fragment_subscribers(&self, fragment_id: FragmentId) -> impl Iterator<Item = u32> + '_ {
448 let job_id = self.fragment_location[&fragment_id];
449 self.jobs[&job_id].subscribers.keys().copied()
450 }
451
452 pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = u32> + '_ {
453 self.jobs[&job_id].subscribers.keys().copied()
454 }
455
456 pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
457 self.jobs
458 .iter()
459 .filter_map(|(job_id, info)| {
460 info.subscribers
461 .values()
462 .filter_map(|subscriber| match subscriber {
463 SubscriberType::Subscription(retention) => Some(*retention),
464 SubscriberType::SnapshotBackfill => None,
465 })
466 .max()
467 .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
468 })
469 .collect()
470 }
471
472 pub fn register_subscriber(
473 &mut self,
474 job_id: JobId,
475 subscriber_id: u32,
476 subscriber: SubscriberType,
477 ) {
478 self.jobs
479 .get_mut(&job_id)
480 .expect("should exist")
481 .subscribers
482 .try_insert(subscriber_id, subscriber)
483 .expect("non duplicate");
484 }
485
486 pub fn unregister_subscriber(
487 &mut self,
488 job_id: JobId,
489 subscriber_id: u32,
490 ) -> Option<SubscriberType> {
491 self.jobs
492 .get_mut(&job_id)
493 .expect("should exist")
494 .subscribers
495 .remove(&subscriber_id)
496 }
497
498 fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
499 let job_id = self.fragment_location[&fragment_id];
500 let fragment = self
501 .jobs
502 .get_mut(&job_id)
503 .expect("should exist")
504 .fragment_infos
505 .get_mut(&fragment_id)
506 .expect("should exist");
507 (fragment, job_id)
508 }
509
510 fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
511 Self {
512 database_id,
513 jobs: Default::default(),
514 fragment_location: Default::default(),
515 shared_actor_infos,
516 }
517 }
518
519 pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
520 shared_actor_infos.remove_database(database_id);
522 Self::empty_inner(database_id, shared_actor_infos)
523 }
524
525 pub fn recover(
526 database_id: DatabaseId,
527 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
528 shared_actor_infos: SharedActorInfos,
529 ) -> Self {
530 let mut info = Self::empty_inner(database_id, shared_actor_infos);
531 for job in jobs {
532 info.add_existing(job);
533 }
534 info
535 }
536
    /// Returns whether this database has no registered jobs.
    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }
540
541 pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
542 self.jobs
543 .try_insert(
544 job.job_id,
545 InflightStreamingJobInfo {
546 job_id: job.job_id,
547 subscribers: job.subscribers,
548 fragment_infos: Default::default(), },
550 )
551 .expect("non-duplicate");
552 self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
553 (
554 fragment_id,
555 CommandFragmentChanges::NewFragment {
556 job_id: job.job_id,
557 info,
558 is_existing: true,
559 },
560 )
561 }))
562 }
563
564 pub(crate) fn pre_apply(
567 &mut self,
568 new_job_id: Option<JobId>,
569 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
570 ) {
571 if let Some(job_id) = new_job_id {
572 self.jobs
573 .try_insert(
574 job_id,
575 InflightStreamingJobInfo {
576 job_id,
577 fragment_infos: Default::default(),
578 subscribers: Default::default(), },
580 )
581 .expect("non-duplicate");
582 }
583 self.apply_add(
584 fragment_changes
585 .iter()
586 .map(|(fragment_id, change)| (*fragment_id, change.clone())),
587 )
588 }
589
590 fn apply_add(
591 &mut self,
592 fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
593 ) {
594 {
595 let shared_infos = self.shared_actor_infos.clone();
596 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
597 for (fragment_id, change) in fragment_changes {
598 match change {
599 CommandFragmentChanges::NewFragment {
600 job_id,
601 info,
602 is_existing,
603 } => {
604 let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
605 if !is_existing {
606 shared_actor_writer.upsert([(&info, job_id)]);
607 }
608 fragment_infos
609 .fragment_infos
610 .try_insert(fragment_id, info)
611 .expect("non duplicate");
612 self.fragment_location
613 .try_insert(fragment_id, job_id)
614 .expect("non duplicate");
615 }
616 CommandFragmentChanges::Reschedule {
617 new_actors,
618 actor_update_vnode_bitmap,
619 actor_splits,
620 ..
621 } => {
622 let (info, _) = self.fragment_mut(fragment_id);
623 let actors = &mut info.actors;
624 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
625 actors
626 .get_mut(&actor_id)
627 .expect("should exist")
628 .vnode_bitmap = Some(new_vnodes);
629 }
630 for (actor_id, actor) in new_actors {
631 actors
632 .try_insert(actor_id as _, actor)
633 .expect("non-duplicate");
634 }
635 for (actor_id, splits) in actor_splits {
636 actors.get_mut(&actor_id).expect("should exist").splits = splits;
637 }
638
639 }
641 CommandFragmentChanges::RemoveFragment => {}
642 CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
643 let mut remaining_fragment_ids: HashSet<_> =
644 replace_map.keys().cloned().collect();
645 let (info, _) = self.fragment_mut(fragment_id);
646 visit_stream_node_mut(&mut info.nodes, |node| {
647 if let NodeBody::Merge(m) = node
648 && let Some(new_upstream_fragment_id) =
649 replace_map.get(&m.upstream_fragment_id)
650 {
651 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
652 if cfg!(debug_assertions) {
653 panic!(
654 "duplicate upstream fragment: {:?} {:?}",
655 m, replace_map
656 );
657 } else {
658 warn!(?m, ?replace_map, "duplicate upstream fragment");
659 }
660 }
661 m.upstream_fragment_id = *new_upstream_fragment_id;
662 }
663 });
664 if cfg!(debug_assertions) {
665 assert!(
666 remaining_fragment_ids.is_empty(),
667 "non-existing fragment to replace: {:?} {:?} {:?}",
668 remaining_fragment_ids,
669 info.nodes,
670 replace_map
671 );
672 } else {
673 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
674 }
675 }
676 CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
677 let (info, _) = self.fragment_mut(fragment_id);
678 let mut injected = false;
679 visit_stream_node_mut(&mut info.nodes, |node| {
680 if let NodeBody::UpstreamSinkUnion(u) = node {
681 if cfg!(debug_assertions) {
682 let current_upstream_fragment_ids = u
683 .init_upstreams
684 .iter()
685 .map(|upstream| upstream.upstream_fragment_id)
686 .collect::<HashSet<_>>();
687 if current_upstream_fragment_ids
688 .contains(&new_upstream_info.upstream_fragment_id)
689 {
690 panic!(
691 "duplicate upstream fragment: {:?} {:?}",
692 u, new_upstream_info
693 );
694 }
695 }
696 u.init_upstreams.push(new_upstream_info.clone());
697 injected = true;
698 }
699 });
700 assert!(injected, "should inject upstream into UpstreamSinkUnion");
701 }
702 CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
703 let (info, _) = self.fragment_mut(fragment_id);
704 let mut removed = false;
705 visit_stream_node_mut(&mut info.nodes, |node| {
706 if let NodeBody::UpstreamSinkUnion(u) = node {
707 if cfg!(debug_assertions) {
708 let current_upstream_fragment_ids = u
709 .init_upstreams
710 .iter()
711 .map(|upstream| upstream.upstream_fragment_id)
712 .collect::<HashSet<FragmentId>>();
713 for drop_fragment_id in &drop_upstream_fragment_ids {
714 if !current_upstream_fragment_ids.contains(drop_fragment_id)
715 {
716 panic!(
717 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
718 u, drop_upstream_fragment_ids, drop_fragment_id
719 );
720 }
721 }
722 }
723 u.init_upstreams.retain(|upstream| {
724 !drop_upstream_fragment_ids
725 .contains(&upstream.upstream_fragment_id)
726 });
727 removed = true;
728 }
729 });
730 assert!(removed, "should remove upstream from UpstreamSinkUnion");
731 }
732 CommandFragmentChanges::SplitAssignment { actor_splits } => {
733 let (info, job_id) = self.fragment_mut(fragment_id);
734 let actors = &mut info.actors;
735 for (actor_id, splits) in actor_splits {
736 actors.get_mut(&actor_id).expect("should exist").splits = splits;
737 }
738 shared_actor_writer.upsert([(&*info, job_id)]);
739 }
740 }
741 }
742 shared_actor_writer.finish();
743 }
744 }
745
746 pub(super) fn build_edge(
747 &self,
748 command: Option<&Command>,
749 control_stream_manager: &ControlStreamManager,
750 ) -> Option<FragmentEdgeBuildResult> {
751 let (info, replace_job, new_upstream_sink) = match command {
752 None => {
753 return None;
754 }
755 Some(command) => match command {
756 Command::Flush
757 | Command::Pause
758 | Command::Resume
759 | Command::DropStreamingJobs { .. }
760 | Command::MergeSnapshotBackfillStreamingJobs(_)
761 | Command::RescheduleFragment { .. }
762 | Command::SourceChangeSplit { .. }
763 | Command::Throttle(_)
764 | Command::CreateSubscription { .. }
765 | Command::DropSubscription { .. }
766 | Command::ConnectorPropsChange(_)
767 | Command::StartFragmentBackfill { .. }
768 | Command::Refresh { .. }
769 | Command::ListFinish { .. }
770 | Command::LoadFinish { .. } => {
771 return None;
772 }
773 Command::CreateStreamingJob { info, job_type, .. } => {
774 let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
775 new_upstream_sink,
776 ) = job_type
777 {
778 Some(new_upstream_sink)
779 } else {
780 None
781 };
782 (Some(info), None, new_upstream_sink)
783 }
784 Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
785 },
786 };
787 let existing_fragment_ids = info
795 .into_iter()
796 .flat_map(|info| info.upstream_fragment_downstreams.keys())
797 .chain(replace_job.into_iter().flat_map(|replace_job| {
798 replace_job
799 .upstream_fragment_downstreams
800 .keys()
801 .filter(|fragment_id| {
802 info.map(|info| {
803 !info
804 .stream_job_fragments
805 .fragments
806 .contains_key(fragment_id)
807 })
808 .unwrap_or(true)
809 })
810 .chain(replace_job.replace_upstream.keys())
811 }))
812 .chain(
813 new_upstream_sink
814 .into_iter()
815 .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
816 )
817 .cloned();
818 let new_fragment_infos = info
819 .into_iter()
820 .flat_map(|info| {
821 info.stream_job_fragments
822 .new_fragment_info(&info.init_split_assignment)
823 })
824 .chain(replace_job.into_iter().flat_map(|replace_job| {
825 replace_job
826 .new_fragments
827 .new_fragment_info(&replace_job.init_split_assignment)
828 .chain(
829 replace_job
830 .auto_refresh_schema_sinks
831 .as_ref()
832 .into_iter()
833 .flat_map(|sinks| {
834 sinks.iter().map(|sink| {
835 (sink.new_fragment.fragment_id, sink.new_fragment_info())
836 })
837 }),
838 )
839 }))
840 .collect_vec();
841 let mut builder = FragmentEdgeBuilder::new(
842 existing_fragment_ids
843 .map(|fragment_id| self.fragment(fragment_id))
844 .chain(new_fragment_infos.iter().map(|(_, info)| info)),
845 control_stream_manager,
846 );
847 if let Some(info) = info {
848 builder.add_relations(&info.upstream_fragment_downstreams);
849 builder.add_relations(&info.stream_job_fragments.downstreams);
850 }
851 if let Some(replace_job) = replace_job {
852 builder.add_relations(&replace_job.upstream_fragment_downstreams);
853 builder.add_relations(&replace_job.new_fragments.downstreams);
854 }
855 if let Some(new_upstream_sink) = new_upstream_sink {
856 let sink_fragment_id = new_upstream_sink.sink_fragment_id;
857 let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
858 builder.add_edge(sink_fragment_id, new_sink_downstream);
859 }
860 if let Some(replace_job) = replace_job {
861 for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
862 for (original_upstream_fragment_id, new_upstream_fragment_id) in
863 fragment_replacement
864 {
865 builder.replace_upstream(
866 *fragment_id,
867 *original_upstream_fragment_id,
868 *new_upstream_fragment_id,
869 );
870 }
871 }
872 }
873 Some(builder.build())
874 }
875
876 pub(crate) fn post_apply(
879 &mut self,
880 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
881 ) {
882 let inner = self.shared_actor_infos.clone();
883 let mut shared_actor_writer = inner.start_writer(self.database_id);
884 {
885 for (fragment_id, changes) in fragment_changes {
886 match changes {
887 CommandFragmentChanges::NewFragment { .. } => {}
888 CommandFragmentChanges::Reschedule { to_remove, .. } => {
889 let job_id = self.fragment_location[fragment_id];
890 let info = self
891 .jobs
892 .get_mut(&job_id)
893 .expect("should exist")
894 .fragment_infos
895 .get_mut(fragment_id)
896 .expect("should exist");
897 for actor_id in to_remove {
898 assert!(info.actors.remove(&(*actor_id as _)).is_some());
899 }
900 shared_actor_writer.upsert([(&*info, job_id)]);
901 }
902 CommandFragmentChanges::RemoveFragment => {
903 let job_id = self
904 .fragment_location
905 .remove(fragment_id)
906 .expect("should exist");
907 let job = self.jobs.get_mut(&job_id).expect("should exist");
908 let fragment = job
909 .fragment_infos
910 .remove(fragment_id)
911 .expect("should exist");
912 shared_actor_writer.remove(&fragment);
913 if job.fragment_infos.is_empty() {
914 self.jobs.remove(&job_id).expect("should exist");
915 }
916 }
917 CommandFragmentChanges::ReplaceNodeUpstream(_)
918 | CommandFragmentChanges::AddNodeUpstream(_)
919 | CommandFragmentChanges::DropNodeUpstream(_)
920 | CommandFragmentChanges::SplitAssignment { .. } => {}
921 }
922 }
923 }
924 shared_actor_writer.finish();
925 }
926}
927
928impl InflightFragmentInfo {
929 pub(crate) fn actor_ids_to_collect(
931 infos: impl IntoIterator<Item = &Self>,
932 ) -> HashMap<WorkerId, HashSet<ActorId>> {
933 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
934 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
935 assert!(
936 ret.entry(actor.worker_id)
937 .or_default()
938 .insert(*actor_id as _)
939 )
940 }
941 ret
942 }
943
944 pub fn existing_table_ids<'a>(
945 infos: impl IntoIterator<Item = &'a Self> + 'a,
946 ) -> impl Iterator<Item = TableId> + 'a {
947 infos
948 .into_iter()
949 .flat_map(|info| info.state_table_ids.iter().cloned())
950 }
951
952 pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
953 infos.into_iter().any(|fragment| {
954 fragment
955 .actors
956 .values()
957 .any(|actor| (actor.worker_id) == worker_id)
958 })
959 }
960}
961
962impl InflightDatabaseInfo {
963 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
964 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
965 }
966
967 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
968 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
969 }
970}