1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RawRwLock;
21use parking_lot::lock_api::RwLockReadGuard;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
24use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
25use risingwave_connector::source::{SplitImpl, SplitMetaData};
26use risingwave_meta_model::fragment::DistributionType;
27use risingwave_meta_model::{ObjectId, WorkerId};
28use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
29use risingwave_pb::meta::subscribe_response::Operation;
30use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
31use risingwave_pb::stream_plan::stream_node::NodeBody;
32use tracing::warn;
33
34use crate::MetaResult;
35use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
36use crate::barrier::rpc::ControlStreamManager;
37use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
38use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
39use crate::controller::utils::rebuild_fragment_mapping;
40use crate::manager::NotificationManagerRef;
41use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
42
43#[derive(Debug, Clone)]
44pub struct SharedFragmentInfo {
45 pub fragment_id: FragmentId,
46 pub job_id: ObjectId,
47 pub distribution_type: DistributionType,
48 pub actors: HashMap<ActorId, InflightActorInfo>,
49 pub vnode_count: usize,
50 pub fragment_type_mask: FragmentTypeMask,
51}
52
53impl From<(&InflightFragmentInfo, TableId)> for SharedFragmentInfo {
54 fn from(pair: (&InflightFragmentInfo, TableId)) -> Self {
55 let (info, job_id) = pair;
56
57 let InflightFragmentInfo {
58 fragment_id,
59 distribution_type,
60 fragment_type_mask,
61 actors,
62 vnode_count,
63 ..
64 } = info;
65
66 Self {
67 fragment_id: *fragment_id,
68 job_id: job_id.table_id() as _,
69 distribution_type: *distribution_type,
70 fragment_type_mask: *fragment_type_mask,
71 actors: actors.clone(),
72 vnode_count: *vnode_count,
73 }
74 }
75}
76
77#[derive(Default, Debug)]
78pub struct SharedActorInfosInner {
79 info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
80}
81
82impl SharedActorInfosInner {
83 pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
84 self.info
85 .values()
86 .find_map(|database| database.get(&fragment_id))
87 }
88
89 pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
90 self.info.values().flatten()
91 }
92}
93
94#[derive(Clone, educe::Educe)]
95#[educe(Debug)]
96pub struct SharedActorInfos {
97 inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
98 #[educe(Debug(ignore))]
99 notification_manager: NotificationManagerRef,
100}
101
102impl SharedActorInfos {
103 pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
104 self.inner.read()
105 }
106
107 pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
108 let core = self.inner.read();
109 core.iter_over_fragments()
110 .flat_map(|(_, fragment)| {
111 fragment
112 .actors
113 .iter()
114 .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
115 })
116 .collect()
117 }
118
119 pub fn migrate_splits_for_source_actors(
125 &self,
126 fragment_id: FragmentId,
127 prev_actor_ids: &[ActorId],
128 curr_actor_ids: &[ActorId],
129 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
130 let guard = self.read_guard();
131
132 let prev_splits = prev_actor_ids
133 .iter()
134 .flat_map(|actor_id| {
135 guard
137 .get_fragment(fragment_id)
138 .and_then(|info| info.actors.get(actor_id))
139 .map(|actor| actor.splits.clone())
140 .unwrap_or_default()
141 })
142 .map(|split| (split.id(), split))
143 .collect();
144
145 let empty_actor_splits = curr_actor_ids
146 .iter()
147 .map(|actor_id| (*actor_id, vec![]))
148 .collect();
149
150 let diff = crate::stream::source_manager::reassign_splits(
151 fragment_id,
152 empty_actor_splits,
153 &prev_splits,
154 std::default::Default::default(),
156 )
157 .unwrap_or_default();
158
159 Ok(diff)
160 }
161}
162
163impl SharedActorInfos {
164 pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
165 Self {
166 inner: Arc::new(Default::default()),
167 notification_manager,
168 }
169 }
170
171 pub(super) fn remove_database(&self, database_id: DatabaseId) {
172 if let Some(database) = self.inner.write().info.remove(&database_id) {
173 let mapping = database
174 .into_values()
175 .map(|fragment| rebuild_fragment_mapping(&fragment))
176 .collect_vec();
177 if !mapping.is_empty() {
178 self.notification_manager
179 .notify_fragment_mapping(Operation::Delete, mapping);
180 }
181 }
182 }
183
184 pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
185 let database_ids: HashSet<_> = database_ids.into_iter().collect();
186
187 let mut mapping = Vec::new();
188 for fragment in self
189 .inner
190 .write()
191 .info
192 .extract_if(|database_id, _| !database_ids.contains(database_id))
193 .flat_map(|(_, fragments)| fragments.into_values())
194 {
195 mapping.push(rebuild_fragment_mapping(&fragment));
196 }
197 if !mapping.is_empty() {
198 self.notification_manager
199 .notify_fragment_mapping(Operation::Delete, mapping);
200 }
201 }
202
203 pub(super) fn recover_database(
204 &self,
205 database_id: DatabaseId,
206 fragments: impl Iterator<Item = (&InflightFragmentInfo, TableId)>,
207 ) {
208 let mut remaining_fragments: HashMap<_, _> = fragments
209 .map(|info @ (fragment, _)| (fragment.fragment_id, info))
210 .collect();
211 let mut writer = self.start_writer(database_id);
213 let database = writer.write_guard.info.entry(database_id).or_default();
214 for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
215 if let Some(info) = remaining_fragments.remove(fragment_id) {
216 let info = info.into();
217 writer
218 .updated_fragment_mapping
219 .get_or_insert_default()
220 .push(rebuild_fragment_mapping(&info));
221 *fragment_info = info;
222 false
223 } else {
224 true
225 }
226 }) {
227 writer
228 .deleted_fragment_mapping
229 .get_or_insert_default()
230 .push(rebuild_fragment_mapping(&fragment));
231 }
232 for (fragment_id, info) in remaining_fragments {
233 let info = info.into();
234 writer
235 .added_fragment_mapping
236 .get_or_insert_default()
237 .push(rebuild_fragment_mapping(&info));
238 database.insert(fragment_id, info);
239 }
240 writer.finish();
241 }
242
243 pub(super) fn upsert(
244 &self,
245 database_id: DatabaseId,
246 infos: impl IntoIterator<Item = (&InflightFragmentInfo, TableId)>,
247 ) {
248 let mut writer = self.start_writer(database_id);
249 writer.upsert(infos);
250 writer.finish();
251 }
252
253 pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
254 SharedActorInfoWriter {
255 database_id,
256 write_guard: self.inner.write(),
257 notification_manager: &self.notification_manager,
258 added_fragment_mapping: None,
259 updated_fragment_mapping: None,
260 deleted_fragment_mapping: None,
261 }
262 }
263}
264
265pub(super) struct SharedActorInfoWriter<'a> {
266 database_id: DatabaseId,
267 write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
268 notification_manager: &'a NotificationManagerRef,
269 added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
270 updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
271 deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
272}
273
274impl SharedActorInfoWriter<'_> {
275 pub(super) fn upsert(
276 &mut self,
277 infos: impl IntoIterator<Item = (&InflightFragmentInfo, TableId)>,
278 ) {
279 let database = self.write_guard.info.entry(self.database_id).or_default();
280 for info @ (fragment, _) in infos {
281 match database.entry(fragment.fragment_id) {
282 Entry::Occupied(mut entry) => {
283 let info = info.into();
284 self.updated_fragment_mapping
285 .get_or_insert_default()
286 .push(rebuild_fragment_mapping(&info));
287 entry.insert(info);
288 }
289 Entry::Vacant(entry) => {
290 let info = info.into();
291 self.added_fragment_mapping
292 .get_or_insert_default()
293 .push(rebuild_fragment_mapping(&info));
294 entry.insert(info);
295 }
296 }
297 }
298 }
299
300 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
301 if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
302 && let Some(fragment) = database.remove(&info.fragment_id)
303 {
304 self.deleted_fragment_mapping
305 .get_or_insert_default()
306 .push(rebuild_fragment_mapping(&fragment));
307 }
308 }
309
310 pub(super) fn finish(self) {
311 if let Some(mapping) = self.added_fragment_mapping {
312 self.notification_manager
313 .notify_fragment_mapping(Operation::Add, mapping);
314 }
315 if let Some(mapping) = self.updated_fragment_mapping {
316 self.notification_manager
317 .notify_fragment_mapping(Operation::Update, mapping);
318 }
319 if let Some(mapping) = self.deleted_fragment_mapping {
320 self.notification_manager
321 .notify_fragment_mapping(Operation::Delete, mapping);
322 }
323 }
324}
325
326#[derive(Debug, Clone)]
327pub(super) struct BarrierInfo {
328 pub prev_epoch: TracedEpoch,
329 pub curr_epoch: TracedEpoch,
330 pub kind: BarrierKind,
331}
332
333impl BarrierInfo {
334 pub(super) fn prev_epoch(&self) -> u64 {
335 self.prev_epoch.value().0
336 }
337}
338
339#[derive(Debug, Clone)]
340pub(crate) enum CommandFragmentChanges {
341 NewFragment {
342 job_id: TableId,
343 info: InflightFragmentInfo,
344 is_existing: bool,
348 },
349 AddNodeUpstream(PbUpstreamSinkInfo),
350 DropNodeUpstream(Vec<FragmentId>),
351 ReplaceNodeUpstream(
352 HashMap<FragmentId, FragmentId>,
354 ),
355 Reschedule {
356 new_actors: HashMap<ActorId, InflightActorInfo>,
357 actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
358 to_remove: HashSet<ActorId>,
359 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
360 },
361 RemoveFragment,
362 SplitAssignment {
363 actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
364 },
365}
366
/// Kind of subscriber registered on a streaming job.
#[derive(Debug, Clone)]
pub enum SubscriberType {
    /// A subscription; the payload is its retention value.
    Subscription(u64),
    /// A snapshot-backfill subscriber; it carries no retention.
    SnapshotBackfill,
}
372
373#[derive(Clone, Debug)]
374pub struct InflightStreamingJobInfo {
375 pub job_id: TableId,
376 pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
377 pub subscribers: HashMap<u32, SubscriberType>,
378}
379
380impl InflightStreamingJobInfo {
381 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
382 self.fragment_infos.values()
383 }
384
385 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
386 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
387 }
388
389 pub fn snapshot_backfill_actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
390 self.fragment_infos
391 .values()
392 .filter(|fragment| {
393 fragment
394 .fragment_type_mask
395 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
396 })
397 .flat_map(|fragment| fragment.actors.keys().copied())
398 }
399
400 pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
401 StreamJobFragments::tracking_progress_actor_ids_impl(
402 self.fragment_infos
403 .values()
404 .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
405 )
406 }
407}
408
409impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
410 type Item = &'a InflightFragmentInfo;
411
412 type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
413
414 fn into_iter(self) -> Self::IntoIter {
415 self.fragment_infos()
416 }
417}
418
419#[derive(Clone, Debug)]
420pub struct InflightDatabaseInfo {
421 database_id: DatabaseId,
422 jobs: HashMap<TableId, InflightStreamingJobInfo>,
423 fragment_location: HashMap<FragmentId, TableId>,
424 pub(super) shared_actor_infos: SharedActorInfos,
425}
426
427impl InflightDatabaseInfo {
428 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
429 self.jobs.values().flat_map(|job| job.fragment_infos())
430 }
431
432 pub fn contains_job(&self, job_id: TableId) -> bool {
433 self.jobs.contains_key(&job_id)
434 }
435
436 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
437 let job_id = self.fragment_location[&fragment_id];
438 self.jobs
439 .get(&job_id)
440 .expect("should exist")
441 .fragment_infos
442 .get(&fragment_id)
443 .expect("should exist")
444 }
445
446 pub fn fragment_subscribers(&self, fragment_id: FragmentId) -> impl Iterator<Item = u32> + '_ {
447 let job_id = self.fragment_location[&fragment_id];
448 self.jobs[&job_id].subscribers.keys().copied()
449 }
450
451 pub fn job_subscribers(&self, job_id: TableId) -> impl Iterator<Item = u32> + '_ {
452 self.jobs[&job_id].subscribers.keys().copied()
453 }
454
455 pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
456 self.jobs
457 .iter()
458 .filter_map(|(job_id, info)| {
459 info.subscribers
460 .values()
461 .filter_map(|subscriber| match subscriber {
462 SubscriberType::Subscription(retention) => Some(*retention),
463 SubscriberType::SnapshotBackfill => None,
464 })
465 .max()
466 .map(|max_subscription| (*job_id, max_subscription))
467 })
468 .collect()
469 }
470
471 pub fn register_subscriber(
472 &mut self,
473 job_id: TableId,
474 subscriber_id: u32,
475 subscriber: SubscriberType,
476 ) {
477 self.jobs
478 .get_mut(&job_id)
479 .expect("should exist")
480 .subscribers
481 .try_insert(subscriber_id, subscriber)
482 .expect("non duplicate");
483 }
484
485 pub fn unregister_subscriber(
486 &mut self,
487 job_id: TableId,
488 subscriber_id: u32,
489 ) -> Option<SubscriberType> {
490 self.jobs
491 .get_mut(&job_id)
492 .expect("should exist")
493 .subscribers
494 .remove(&subscriber_id)
495 }
496
497 fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, TableId) {
498 let job_id = self.fragment_location[&fragment_id];
499 let fragment = self
500 .jobs
501 .get_mut(&job_id)
502 .expect("should exist")
503 .fragment_infos
504 .get_mut(&fragment_id)
505 .expect("should exist");
506 (fragment, job_id)
507 }
508
509 fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
510 Self {
511 database_id,
512 jobs: Default::default(),
513 fragment_location: Default::default(),
514 shared_actor_infos,
515 }
516 }
517
518 pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
519 shared_actor_infos.remove_database(database_id);
521 Self::empty_inner(database_id, shared_actor_infos)
522 }
523
524 pub fn recover(
525 database_id: DatabaseId,
526 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
527 shared_actor_infos: SharedActorInfos,
528 ) -> Self {
529 let mut info = Self::empty_inner(database_id, shared_actor_infos);
530 for job in jobs {
531 info.add_existing(job);
532 }
533 info
534 }
535
536 pub fn is_empty(&self) -> bool {
537 self.jobs.is_empty()
538 }
539
540 pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
541 self.jobs
542 .try_insert(
543 job.job_id,
544 InflightStreamingJobInfo {
545 job_id: job.job_id,
546 subscribers: job.subscribers,
547 fragment_infos: Default::default(), },
549 )
550 .expect("non-duplicate");
551 self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
552 (
553 fragment_id,
554 CommandFragmentChanges::NewFragment {
555 job_id: job.job_id,
556 info,
557 is_existing: true,
558 },
559 )
560 }))
561 }
562
563 pub(crate) fn pre_apply(
566 &mut self,
567 new_job_id: Option<TableId>,
568 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
569 ) {
570 if let Some(job_id) = new_job_id {
571 self.jobs
572 .try_insert(
573 job_id,
574 InflightStreamingJobInfo {
575 job_id,
576 fragment_infos: Default::default(),
577 subscribers: Default::default(), },
579 )
580 .expect("non-duplicate");
581 }
582 self.apply_add(
583 fragment_changes
584 .iter()
585 .map(|(fragment_id, change)| (*fragment_id, change.clone())),
586 )
587 }
588
589 fn apply_add(
590 &mut self,
591 fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
592 ) {
593 {
594 let shared_infos = self.shared_actor_infos.clone();
595 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
596 for (fragment_id, change) in fragment_changes {
597 match change {
598 CommandFragmentChanges::NewFragment {
599 job_id,
600 info,
601 is_existing,
602 } => {
603 let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
604 if !is_existing {
605 shared_actor_writer.upsert([(&info, job_id)]);
606 }
607 fragment_infos
608 .fragment_infos
609 .try_insert(fragment_id, info)
610 .expect("non duplicate");
611 self.fragment_location
612 .try_insert(fragment_id, job_id)
613 .expect("non duplicate");
614 }
615 CommandFragmentChanges::Reschedule {
616 new_actors,
617 actor_update_vnode_bitmap,
618 actor_splits,
619 ..
620 } => {
621 let (info, _) = self.fragment_mut(fragment_id);
622 let actors = &mut info.actors;
623 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
624 actors
625 .get_mut(&actor_id)
626 .expect("should exist")
627 .vnode_bitmap = Some(new_vnodes);
628 }
629 for (actor_id, actor) in new_actors {
630 actors
631 .try_insert(actor_id as _, actor)
632 .expect("non-duplicate");
633 }
634 for (actor_id, splits) in actor_splits {
635 actors.get_mut(&actor_id).expect("should exist").splits = splits;
636 }
637
638 }
640 CommandFragmentChanges::RemoveFragment => {}
641 CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
642 let mut remaining_fragment_ids: HashSet<_> =
643 replace_map.keys().cloned().collect();
644 let (info, _) = self.fragment_mut(fragment_id);
645 visit_stream_node_mut(&mut info.nodes, |node| {
646 if let NodeBody::Merge(m) = node
647 && let Some(new_upstream_fragment_id) =
648 replace_map.get(&m.upstream_fragment_id)
649 {
650 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
651 if cfg!(debug_assertions) {
652 panic!(
653 "duplicate upstream fragment: {:?} {:?}",
654 m, replace_map
655 );
656 } else {
657 warn!(?m, ?replace_map, "duplicate upstream fragment");
658 }
659 }
660 m.upstream_fragment_id = *new_upstream_fragment_id;
661 }
662 });
663 if cfg!(debug_assertions) {
664 assert!(
665 remaining_fragment_ids.is_empty(),
666 "non-existing fragment to replace: {:?} {:?} {:?}",
667 remaining_fragment_ids,
668 info.nodes,
669 replace_map
670 );
671 } else {
672 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
673 }
674 }
675 CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
676 let (info, _) = self.fragment_mut(fragment_id);
677 let mut injected = false;
678 visit_stream_node_mut(&mut info.nodes, |node| {
679 if let NodeBody::UpstreamSinkUnion(u) = node {
680 if cfg!(debug_assertions) {
681 let current_upstream_fragment_ids = u
682 .init_upstreams
683 .iter()
684 .map(|upstream| upstream.upstream_fragment_id)
685 .collect::<HashSet<_>>();
686 if current_upstream_fragment_ids
687 .contains(&new_upstream_info.upstream_fragment_id)
688 {
689 panic!(
690 "duplicate upstream fragment: {:?} {:?}",
691 u, new_upstream_info
692 );
693 }
694 }
695 u.init_upstreams.push(new_upstream_info.clone());
696 injected = true;
697 }
698 });
699 assert!(injected, "should inject upstream into UpstreamSinkUnion");
700 }
701 CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
702 let (info, _) = self.fragment_mut(fragment_id);
703 let mut removed = false;
704 visit_stream_node_mut(&mut info.nodes, |node| {
705 if let NodeBody::UpstreamSinkUnion(u) = node {
706 if cfg!(debug_assertions) {
707 let current_upstream_fragment_ids = u
708 .init_upstreams
709 .iter()
710 .map(|upstream| upstream.upstream_fragment_id)
711 .collect::<HashSet<_>>();
712 for drop_fragment_id in &drop_upstream_fragment_ids {
713 if !current_upstream_fragment_ids.contains(drop_fragment_id)
714 {
715 panic!(
716 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
717 u, drop_upstream_fragment_ids, drop_fragment_id
718 );
719 }
720 }
721 }
722 u.init_upstreams.retain(|upstream| {
723 !drop_upstream_fragment_ids
724 .contains(&upstream.upstream_fragment_id)
725 });
726 removed = true;
727 }
728 });
729 assert!(removed, "should remove upstream from UpstreamSinkUnion");
730 }
731 CommandFragmentChanges::SplitAssignment { actor_splits } => {
732 let (info, job_id) = self.fragment_mut(fragment_id);
733 let actors = &mut info.actors;
734 for (actor_id, splits) in actor_splits {
735 actors.get_mut(&actor_id).expect("should exist").splits = splits;
736 }
737 shared_actor_writer.upsert([(&*info, job_id)]);
738 }
739 }
740 }
741 shared_actor_writer.finish();
742 }
743 }
744
745 pub(super) fn build_edge(
746 &self,
747 command: Option<&Command>,
748 control_stream_manager: &ControlStreamManager,
749 ) -> Option<FragmentEdgeBuildResult> {
750 let (info, replace_job, new_upstream_sink) = match command {
751 None => {
752 return None;
753 }
754 Some(command) => match command {
755 Command::Flush
756 | Command::Pause
757 | Command::Resume
758 | Command::DropStreamingJobs { .. }
759 | Command::MergeSnapshotBackfillStreamingJobs(_)
760 | Command::RescheduleFragment { .. }
761 | Command::SourceChangeSplit { .. }
762 | Command::Throttle(_)
763 | Command::CreateSubscription { .. }
764 | Command::DropSubscription { .. }
765 | Command::ConnectorPropsChange(_)
766 | Command::StartFragmentBackfill { .. }
767 | Command::Refresh { .. }
768 | Command::ListFinish { .. }
769 | Command::LoadFinish { .. } => {
770 return None;
771 }
772 Command::CreateStreamingJob { info, job_type, .. } => {
773 let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
774 new_upstream_sink,
775 ) = job_type
776 {
777 Some(new_upstream_sink)
778 } else {
779 None
780 };
781 (Some(info), None, new_upstream_sink)
782 }
783 Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
784 },
785 };
786 let existing_fragment_ids = info
794 .into_iter()
795 .flat_map(|info| info.upstream_fragment_downstreams.keys())
796 .chain(replace_job.into_iter().flat_map(|replace_job| {
797 replace_job
798 .upstream_fragment_downstreams
799 .keys()
800 .filter(|fragment_id| {
801 info.map(|info| {
802 !info
803 .stream_job_fragments
804 .fragments
805 .contains_key(fragment_id)
806 })
807 .unwrap_or(true)
808 })
809 .chain(replace_job.replace_upstream.keys())
810 }))
811 .chain(
812 new_upstream_sink
813 .into_iter()
814 .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
815 )
816 .cloned();
817 let new_fragment_infos = info
818 .into_iter()
819 .flat_map(|info| {
820 info.stream_job_fragments
821 .new_fragment_info(&info.init_split_assignment)
822 })
823 .chain(replace_job.into_iter().flat_map(|replace_job| {
824 replace_job
825 .new_fragments
826 .new_fragment_info(&replace_job.init_split_assignment)
827 .chain(
828 replace_job
829 .auto_refresh_schema_sinks
830 .as_ref()
831 .into_iter()
832 .flat_map(|sinks| {
833 sinks.iter().map(|sink| {
834 (sink.new_fragment.fragment_id, sink.new_fragment_info())
835 })
836 }),
837 )
838 }))
839 .collect_vec();
840 let mut builder = FragmentEdgeBuilder::new(
841 existing_fragment_ids
842 .map(|fragment_id| self.fragment(fragment_id))
843 .chain(new_fragment_infos.iter().map(|(_, info)| info)),
844 control_stream_manager,
845 );
846 if let Some(info) = info {
847 builder.add_relations(&info.upstream_fragment_downstreams);
848 builder.add_relations(&info.stream_job_fragments.downstreams);
849 }
850 if let Some(replace_job) = replace_job {
851 builder.add_relations(&replace_job.upstream_fragment_downstreams);
852 builder.add_relations(&replace_job.new_fragments.downstreams);
853 }
854 if let Some(new_upstream_sink) = new_upstream_sink {
855 let sink_fragment_id = new_upstream_sink.sink_fragment_id;
856 let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
857 builder.add_edge(sink_fragment_id, new_sink_downstream);
858 }
859 if let Some(replace_job) = replace_job {
860 for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
861 for (original_upstream_fragment_id, new_upstream_fragment_id) in
862 fragment_replacement
863 {
864 builder.replace_upstream(
865 *fragment_id,
866 *original_upstream_fragment_id,
867 *new_upstream_fragment_id,
868 );
869 }
870 }
871 }
872 Some(builder.build())
873 }
874
875 pub(crate) fn post_apply(
878 &mut self,
879 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
880 ) {
881 let inner = self.shared_actor_infos.clone();
882 let mut shared_actor_writer = inner.start_writer(self.database_id);
883 {
884 for (fragment_id, changes) in fragment_changes {
885 match changes {
886 CommandFragmentChanges::NewFragment { .. } => {}
887 CommandFragmentChanges::Reschedule { to_remove, .. } => {
888 let job_id = self.fragment_location[fragment_id];
889 let info = self
890 .jobs
891 .get_mut(&job_id)
892 .expect("should exist")
893 .fragment_infos
894 .get_mut(fragment_id)
895 .expect("should exist");
896 for actor_id in to_remove {
897 assert!(info.actors.remove(&(*actor_id as _)).is_some());
898 }
899 shared_actor_writer.upsert([(&*info, job_id)]);
900 }
901 CommandFragmentChanges::RemoveFragment => {
902 let job_id = self
903 .fragment_location
904 .remove(fragment_id)
905 .expect("should exist");
906 let job = self.jobs.get_mut(&job_id).expect("should exist");
907 let fragment = job
908 .fragment_infos
909 .remove(fragment_id)
910 .expect("should exist");
911 shared_actor_writer.remove(&fragment);
912 if job.fragment_infos.is_empty() {
913 self.jobs.remove(&job_id).expect("should exist");
914 }
915 }
916 CommandFragmentChanges::ReplaceNodeUpstream(_)
917 | CommandFragmentChanges::AddNodeUpstream(_)
918 | CommandFragmentChanges::DropNodeUpstream(_)
919 | CommandFragmentChanges::SplitAssignment { .. } => {}
920 }
921 }
922 }
923 shared_actor_writer.finish();
924 }
925}
926
927impl InflightFragmentInfo {
928 pub(crate) fn actor_ids_to_collect(
930 infos: impl IntoIterator<Item = &Self>,
931 ) -> HashMap<WorkerId, HashSet<ActorId>> {
932 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
933 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
934 assert!(
935 ret.entry(actor.worker_id)
936 .or_default()
937 .insert(*actor_id as _)
938 )
939 }
940 ret
941 }
942
943 pub fn existing_table_ids<'a>(
944 infos: impl IntoIterator<Item = &'a Self> + 'a,
945 ) -> impl Iterator<Item = TableId> + 'a {
946 infos
947 .into_iter()
948 .flat_map(|info| info.state_table_ids.iter().cloned())
949 }
950
951 pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
952 infos.into_iter().any(|fragment| {
953 fragment
954 .actors
955 .values()
956 .any(|actor| (actor.worker_id) == worker_id)
957 })
958 }
959
960 pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
961 infos
962 .into_iter()
963 .flat_map(|info| info.actors.values())
964 .map(|actor| actor.worker_id)
965 .collect()
966 }
967}
968
969impl InflightDatabaseInfo {
970 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
971 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
972 }
973
974 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
975 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
976 }
977}