1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RawRwLock;
21use parking_lot::lock_api::RwLockReadGuard;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
25use risingwave_connector::source::{SplitImpl, SplitMetaData};
26use risingwave_meta_model::WorkerId;
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
29use risingwave_pb::meta::subscribe_response::Operation;
30use risingwave_pb::stream_plan::stream_node::NodeBody;
31use risingwave_pb::stream_plan::{PbSubscriptionUpstreamInfo, PbUpstreamSinkInfo};
32use tracing::warn;
33
34use crate::MetaResult;
35use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
36use crate::barrier::rpc::ControlStreamManager;
37use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
38use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
39use crate::controller::utils::rebuild_fragment_mapping;
40use crate::manager::NotificationManagerRef;
41use crate::model::{ActorId, FragmentId, SubscriptionId};
42
/// Per-fragment actor information stored in [`SharedActorInfos`].
///
/// It is produced from an [`InflightFragmentInfo`] by copying the fragment id
/// and distribution type and cloning the actor map.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    /// Id of the fragment this entry describes.
    pub fragment_id: FragmentId,
    /// Distribution type copied from the originating `InflightFragmentInfo`.
    pub distribution_type: DistributionType,
    /// Actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, InflightActorInfo>,
}
49
50impl From<&InflightFragmentInfo> for SharedFragmentInfo {
51 fn from(info: &InflightFragmentInfo) -> Self {
52 let InflightFragmentInfo {
53 fragment_id,
54 distribution_type,
55 actors,
56 ..
57 } = info;
58
59 Self {
60 fragment_id: *fragment_id,
61 distribution_type: *distribution_type,
62 actors: actors.clone(),
63 }
64 }
65}
66
/// Lock-protected content of [`SharedActorInfos`].
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    /// Shared fragment infos, keyed by database id and then by fragment id.
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
71
72impl SharedActorInfosInner {
73 pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
74 self.info
75 .values()
76 .find_map(|database| database.get(&fragment_id))
77 }
78
79 pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
80 self.info.values().flatten()
81 }
82}
83
/// Cloneable handle to the shared actor infos.
///
/// All clones point at the same lock-protected [`SharedActorInfosInner`]
/// through the `Arc`. The notification manager is used to publish
/// fragment-mapping changes when the stored infos are modified.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    // Left out of the `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
91
92impl SharedActorInfos {
93 pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
94 self.inner.read()
95 }
96
97 pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
98 let core = self.inner.read();
99 core.iter_over_fragments()
100 .flat_map(|(_, fragment)| {
101 fragment
102 .actors
103 .iter()
104 .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
105 })
106 .collect()
107 }
108
109 pub fn migrate_splits_for_source_actors(
115 &self,
116 fragment_id: FragmentId,
117 prev_actor_ids: &[ActorId],
118 curr_actor_ids: &[ActorId],
119 ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
120 let guard = self.read_guard();
121
122 let prev_splits = prev_actor_ids
123 .iter()
124 .flat_map(|actor_id| {
125 guard
127 .get_fragment(fragment_id)
128 .and_then(|info| info.actors.get(actor_id))
129 .map(|actor| actor.splits.clone())
130 .unwrap_or_default()
131 })
132 .map(|split| (split.id(), split))
133 .collect();
134
135 let empty_actor_splits = curr_actor_ids
136 .iter()
137 .map(|actor_id| (*actor_id, vec![]))
138 .collect();
139
140 let diff = crate::stream::source_manager::reassign_splits(
141 fragment_id,
142 empty_actor_splits,
143 &prev_splits,
144 std::default::Default::default(),
146 )
147 .unwrap_or_default();
148
149 Ok(diff)
150 }
151}
152
153impl SharedActorInfos {
154 pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
155 Self {
156 inner: Arc::new(Default::default()),
157 notification_manager,
158 }
159 }
160
161 pub(super) fn remove_database(&self, database_id: DatabaseId) {
162 if let Some(database) = self.inner.write().info.remove(&database_id) {
163 let mapping = database
164 .into_values()
165 .map(|fragment| rebuild_fragment_mapping(&fragment))
166 .collect_vec();
167 if !mapping.is_empty() {
168 self.notification_manager
169 .notify_fragment_mapping(Operation::Delete, mapping);
170 }
171 }
172 }
173
174 pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
175 let database_ids: HashSet<_> = database_ids.into_iter().collect();
176
177 let mut mapping = Vec::new();
178 for fragment in self
179 .inner
180 .write()
181 .info
182 .extract_if(|database_id, _| !database_ids.contains(database_id))
183 .flat_map(|(_, fragments)| fragments.into_values())
184 {
185 mapping.push(rebuild_fragment_mapping(&fragment));
186 }
187 if !mapping.is_empty() {
188 self.notification_manager
189 .notify_fragment_mapping(Operation::Delete, mapping);
190 }
191 }
192
193 pub(super) fn recover_database(
194 &self,
195 database_id: DatabaseId,
196 fragments: impl Iterator<Item = &InflightFragmentInfo>,
197 ) {
198 let mut remaining_fragments: HashMap<_, _> = fragments
199 .map(|fragment| (fragment.fragment_id, fragment))
200 .collect();
201 let mut writer = self.start_writer(database_id);
203 let database = writer.write_guard.info.entry(database_id).or_default();
204 for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
205 if let Some(info) = remaining_fragments.remove(fragment_id) {
206 let info = info.into();
207 writer
208 .updated_fragment_mapping
209 .get_or_insert_default()
210 .push(rebuild_fragment_mapping(&info));
211 *fragment_info = info;
212 false
213 } else {
214 true
215 }
216 }) {
217 writer
218 .deleted_fragment_mapping
219 .get_or_insert_default()
220 .push(rebuild_fragment_mapping(&fragment));
221 }
222 for (fragment_id, fragment) in remaining_fragments {
223 let info = fragment.into();
224 writer
225 .added_fragment_mapping
226 .get_or_insert_default()
227 .push(rebuild_fragment_mapping(&info));
228 database.insert(fragment_id, info);
229 }
230 writer.finish();
231 }
232
233 pub(super) fn upsert(
234 &self,
235 database_id: DatabaseId,
236 infos: impl IntoIterator<Item = &InflightFragmentInfo>,
237 ) {
238 let mut writer = self.start_writer(database_id);
239 writer.upsert(infos);
240 writer.finish();
241 }
242
243 pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
244 SharedActorInfoWriter {
245 database_id,
246 write_guard: self.inner.write(),
247 notification_manager: &self.notification_manager,
248 added_fragment_mapping: None,
249 updated_fragment_mapping: None,
250 deleted_fragment_mapping: None,
251 }
252 }
253}
254
/// Write handle over the shared actor infos, scoped to one database.
///
/// It keeps the write lock for its whole lifetime and accumulates the fragment
/// mappings that were added, updated or deleted through it. The accumulated
/// mappings are only sent out by `finish`.
pub(super) struct SharedActorInfoWriter<'a> {
    /// Database whose fragments `upsert` and `remove` operate on.
    database_id: DatabaseId,
    /// Write lock on the shared infos, held as long as the writer lives.
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    /// Receiver of the add/update/delete notifications sent by `finish`.
    notification_manager: &'a NotificationManagerRef,
    /// Mappings of newly inserted fragments; `None` until one is recorded.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of overwritten fragments; `None` until one is recorded.
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of removed fragments; `None` until one is recorded.
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
263
264impl SharedActorInfoWriter<'_> {
265 pub(super) fn upsert(&mut self, infos: impl IntoIterator<Item = &InflightFragmentInfo>) {
266 let database = self.write_guard.info.entry(self.database_id).or_default();
267 for info in infos {
268 match database.entry(info.fragment_id) {
269 Entry::Occupied(mut entry) => {
270 let info = info.into();
271 self.updated_fragment_mapping
272 .get_or_insert_default()
273 .push(rebuild_fragment_mapping(&info));
274 entry.insert(info);
275 }
276 Entry::Vacant(entry) => {
277 let info = info.into();
278 self.added_fragment_mapping
279 .get_or_insert_default()
280 .push(rebuild_fragment_mapping(&info));
281 entry.insert(info);
282 }
283 }
284 }
285 }
286
287 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
288 if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
289 && let Some(fragment) = database.remove(&info.fragment_id)
290 {
291 self.deleted_fragment_mapping
292 .get_or_insert_default()
293 .push(rebuild_fragment_mapping(&fragment));
294 }
295 }
296
297 pub(super) fn finish(self) {
298 if let Some(mapping) = self.added_fragment_mapping {
299 self.notification_manager
300 .notify_fragment_mapping(Operation::Add, mapping);
301 }
302 if let Some(mapping) = self.updated_fragment_mapping {
303 self.notification_manager
304 .notify_fragment_mapping(Operation::Update, mapping);
305 }
306 if let Some(mapping) = self.deleted_fragment_mapping {
307 self.notification_manager
308 .notify_fragment_mapping(Operation::Delete, mapping);
309 }
310 }
311}
312
/// Epochs and kind describing one barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// Previous epoch of the barrier.
    pub prev_epoch: TracedEpoch,
    /// Current epoch of the barrier.
    pub curr_epoch: TracedEpoch,
    /// Kind of the barrier.
    pub kind: BarrierKind,
}
319
320impl BarrierInfo {
321 pub(super) fn prev_epoch(&self) -> u64 {
322 self.prev_epoch.value().0
323 }
324}
325
/// A change that a command applies to a single fragment.
///
/// Changes are applied in two phases by `InflightDatabaseInfo`: `pre_apply`
/// handles additions and in-place modifications, `post_apply` handles
/// removals.
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    /// Registers a new fragment under `job_id`.
    NewFragment {
        job_id: TableId,
        info: InflightFragmentInfo,
        /// Set by `InflightDatabaseInfo::add_existing`. When `true`, the
        /// fragment is not upserted into the shared actor infos while it is
        /// being added.
        is_existing: bool,
    },
    /// Adds the given upstream to the `init_upstreams` of the fragment's
    /// `UpstreamSinkUnion` node.
    AddNodeUpstream(PbUpstreamSinkInfo),
    /// Removes the upstreams with the given fragment ids from the
    /// `init_upstreams` of the fragment's `UpstreamSinkUnion` node.
    DropNodeUpstream(Vec<FragmentId>),
    /// Rewrites the upstream fragment id of the fragment's `Merge` nodes.
    ReplaceNodeUpstream(
        // current upstream fragment id -> new upstream fragment id
        HashMap<FragmentId, FragmentId>,
    ),
    /// Reschedules the actors of the fragment.
    Reschedule {
        /// Actors to insert (applied in `pre_apply`).
        new_actors: HashMap<ActorId, InflightActorInfo>,
        /// New vnode bitmaps for existing actors (applied in `pre_apply`).
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        /// Actors to remove (applied in `post_apply`).
        to_remove: HashSet<ActorId>,
        /// New splits for the listed actors (applied in `pre_apply`).
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    /// Removes the fragment (applied in `post_apply`).
    RemoveFragment,
    /// Replaces the splits of the listed actors.
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}
353
/// Subscriptions currently registered on upstream materialized-view tables.
#[derive(Default, Clone, Debug)]
pub struct InflightSubscriptionInfo {
    /// Upstream mv table id -> subscription id -> the `retention_second`
    /// value given when the subscription was created.
    pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
}
359
/// The fragments of one streaming job.
#[derive(Clone, Debug)]
pub struct InflightStreamingJobInfo {
    /// Id of the streaming job.
    pub job_id: TableId,
    /// Fragments belonging to the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
}
365
366impl InflightStreamingJobInfo {
367 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
368 self.fragment_infos.values()
369 }
370
371 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
372 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
373 }
374}
375
376impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
377 type Item = &'a InflightFragmentInfo;
378
379 type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
380
381 fn into_iter(self) -> Self::IntoIter {
382 self.fragment_infos()
383 }
384}
385
/// The streaming jobs and fragments tracked for one database.
#[derive(Clone, Debug)]
pub struct InflightDatabaseInfo {
    /// Id of the database this info belongs to.
    database_id: DatabaseId,
    /// Tracked jobs, keyed by job id.
    jobs: HashMap<TableId, InflightStreamingJobInfo>,
    /// Index from fragment id to the id of the job owning the fragment.
    fragment_location: HashMap<FragmentId, TableId>,
    /// Handle to the shared actor infos that are updated alongside this info.
    pub(super) shared_actor_infos: SharedActorInfos,
}
393
394impl InflightDatabaseInfo {
395 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
396 self.jobs.values().flat_map(|job| job.fragment_infos())
397 }
398
399 pub fn contains_job(&self, job_id: TableId) -> bool {
400 self.jobs.contains_key(&job_id)
401 }
402
403 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
404 let job_id = self.fragment_location[&fragment_id];
405 self.jobs
406 .get(&job_id)
407 .expect("should exist")
408 .fragment_infos
409 .get(&fragment_id)
410 .expect("should exist")
411 }
412
413 fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
414 let job_id = self.fragment_location[&fragment_id];
415 self.jobs
416 .get_mut(&job_id)
417 .expect("should exist")
418 .fragment_infos
419 .get_mut(&fragment_id)
420 .expect("should exist")
421 }
422
423 fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
424 Self {
425 database_id,
426 jobs: Default::default(),
427 fragment_location: Default::default(),
428 shared_actor_infos,
429 }
430 }
431
432 pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
433 shared_actor_infos.remove_database(database_id);
435 Self::empty_inner(database_id, shared_actor_infos)
436 }
437
438 pub fn recover(
439 database_id: DatabaseId,
440 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
441 shared_actor_infos: SharedActorInfos,
442 ) -> Self {
443 let mut info = Self::empty_inner(database_id, shared_actor_infos);
444 for job in jobs {
445 info.add_existing(job);
446 }
447 info
448 }
449
450 pub fn is_empty(&self) -> bool {
451 self.jobs.is_empty()
452 }
453
454 pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
455 self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
456 (
457 fragment_id,
458 CommandFragmentChanges::NewFragment {
459 job_id: job.job_id,
460 info,
461 is_existing: true,
462 },
463 )
464 }))
465 }
466
467 pub(crate) fn pre_apply(
470 &mut self,
471 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
472 ) {
473 self.apply_add(
474 fragment_changes
475 .iter()
476 .map(|(fragment_id, change)| (*fragment_id, change.clone())),
477 )
478 }
479
480 fn apply_add(
481 &mut self,
482 fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
483 ) {
484 {
485 let shared_infos = self.shared_actor_infos.clone();
486 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
487 for (fragment_id, change) in fragment_changes {
488 match change {
489 CommandFragmentChanges::NewFragment {
490 job_id,
491 info,
492 is_existing,
493 } => {
494 let fragment_infos =
495 self.jobs
496 .entry(job_id)
497 .or_insert_with(|| InflightStreamingJobInfo {
498 job_id,
499 fragment_infos: Default::default(),
500 });
501 if !is_existing {
502 shared_actor_writer.upsert([&info]);
503 }
504 fragment_infos
505 .fragment_infos
506 .try_insert(fragment_id, info)
507 .expect("non duplicate");
508 self.fragment_location
509 .try_insert(fragment_id, job_id)
510 .expect("non duplicate");
511 }
512 CommandFragmentChanges::Reschedule {
513 new_actors,
514 actor_update_vnode_bitmap,
515 actor_splits,
516 ..
517 } => {
518 let info = self.fragment_mut(fragment_id);
519 let actors = &mut info.actors;
520 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
521 actors
522 .get_mut(&actor_id)
523 .expect("should exist")
524 .vnode_bitmap = Some(new_vnodes);
525 }
526 for (actor_id, actor) in new_actors {
527 actors
528 .try_insert(actor_id as _, actor)
529 .expect("non-duplicate");
530 }
531 for (actor_id, splits) in actor_splits {
532 actors.get_mut(&actor_id).expect("should exist").splits = splits;
533 }
534
535 }
537 CommandFragmentChanges::RemoveFragment => {}
538 CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
539 let mut remaining_fragment_ids: HashSet<_> =
540 replace_map.keys().cloned().collect();
541 let info = self.fragment_mut(fragment_id);
542 visit_stream_node_mut(&mut info.nodes, |node| {
543 if let NodeBody::Merge(m) = node
544 && let Some(new_upstream_fragment_id) =
545 replace_map.get(&m.upstream_fragment_id)
546 {
547 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
548 if cfg!(debug_assertions) {
549 panic!(
550 "duplicate upstream fragment: {:?} {:?}",
551 m, replace_map
552 );
553 } else {
554 warn!(?m, ?replace_map, "duplicate upstream fragment");
555 }
556 }
557 m.upstream_fragment_id = *new_upstream_fragment_id;
558 }
559 });
560 if cfg!(debug_assertions) {
561 assert!(
562 remaining_fragment_ids.is_empty(),
563 "non-existing fragment to replace: {:?} {:?} {:?}",
564 remaining_fragment_ids,
565 info.nodes,
566 replace_map
567 );
568 } else {
569 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
570 }
571 }
572 CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
573 let info = self.fragment_mut(fragment_id);
574 let mut injected = false;
575 visit_stream_node_mut(&mut info.nodes, |node| {
576 if let NodeBody::UpstreamSinkUnion(u) = node {
577 if cfg!(debug_assertions) {
578 let current_upstream_fragment_ids = u
579 .init_upstreams
580 .iter()
581 .map(|upstream| upstream.upstream_fragment_id)
582 .collect::<HashSet<_>>();
583 if current_upstream_fragment_ids
584 .contains(&new_upstream_info.upstream_fragment_id)
585 {
586 panic!(
587 "duplicate upstream fragment: {:?} {:?}",
588 u, new_upstream_info
589 );
590 }
591 }
592 u.init_upstreams.push(new_upstream_info.clone());
593 injected = true;
594 }
595 });
596 assert!(injected, "should inject upstream into UpstreamSinkUnion");
597 }
598 CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
599 let info = self.fragment_mut(fragment_id);
600 let mut removed = false;
601 visit_stream_node_mut(&mut info.nodes, |node| {
602 if let NodeBody::UpstreamSinkUnion(u) = node {
603 if cfg!(debug_assertions) {
604 let current_upstream_fragment_ids = u
605 .init_upstreams
606 .iter()
607 .map(|upstream| upstream.upstream_fragment_id)
608 .collect::<HashSet<_>>();
609 for drop_fragment_id in &drop_upstream_fragment_ids {
610 if !current_upstream_fragment_ids.contains(drop_fragment_id)
611 {
612 panic!(
613 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
614 u, drop_upstream_fragment_ids, drop_fragment_id
615 );
616 }
617 }
618 }
619 u.init_upstreams.retain(|upstream| {
620 !drop_upstream_fragment_ids
621 .contains(&upstream.upstream_fragment_id)
622 });
623 removed = true;
624 }
625 });
626 assert!(removed, "should remove upstream from UpstreamSinkUnion");
627 }
628 CommandFragmentChanges::SplitAssignment { actor_splits } => {
629 let info = self.fragment_mut(fragment_id);
630 let actors = &mut info.actors;
631 for (actor_id, splits) in actor_splits {
632 actors.get_mut(&actor_id).expect("should exist").splits = splits;
633 }
634
635 shared_actor_writer.upsert([&*info]);
636 }
637 }
638 }
639 shared_actor_writer.finish();
640 }
641 }
642
643 pub(super) fn build_edge(
644 &self,
645 command: Option<&Command>,
646 control_stream_manager: &ControlStreamManager,
647 ) -> Option<FragmentEdgeBuildResult> {
648 let (info, replace_job, new_upstream_sink) = match command {
649 None => {
650 return None;
651 }
652 Some(command) => match command {
653 Command::Flush
654 | Command::Pause
655 | Command::Resume
656 | Command::DropStreamingJobs { .. }
657 | Command::MergeSnapshotBackfillStreamingJobs(_)
658 | Command::RescheduleFragment { .. }
659 | Command::SourceChangeSplit { .. }
660 | Command::Throttle(_)
661 | Command::CreateSubscription { .. }
662 | Command::DropSubscription { .. }
663 | Command::ConnectorPropsChange(_)
664 | Command::StartFragmentBackfill { .. }
665 | Command::Refresh { .. }
666 | Command::LoadFinish { .. } => {
667 return None;
668 }
669 Command::CreateStreamingJob { info, job_type, .. } => {
670 let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
671 new_upstream_sink,
672 ) = job_type
673 {
674 Some(new_upstream_sink)
675 } else {
676 None
677 };
678 (Some(info), None, new_upstream_sink)
679 }
680 Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
681 },
682 };
683 let existing_fragment_ids = info
691 .into_iter()
692 .flat_map(|info| info.upstream_fragment_downstreams.keys())
693 .chain(replace_job.into_iter().flat_map(|replace_job| {
694 replace_job
695 .upstream_fragment_downstreams
696 .keys()
697 .filter(|fragment_id| {
698 info.map(|info| {
699 !info
700 .stream_job_fragments
701 .fragments
702 .contains_key(fragment_id)
703 })
704 .unwrap_or(true)
705 })
706 .chain(replace_job.replace_upstream.keys())
707 }))
708 .chain(
709 new_upstream_sink
710 .into_iter()
711 .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
712 )
713 .cloned();
714 let new_fragment_infos = info
715 .into_iter()
716 .flat_map(|info| {
717 info.stream_job_fragments
718 .new_fragment_info(&info.init_split_assignment)
719 })
720 .chain(replace_job.into_iter().flat_map(|replace_job| {
721 replace_job
722 .new_fragments
723 .new_fragment_info(&replace_job.init_split_assignment)
724 .chain(
725 replace_job
726 .auto_refresh_schema_sinks
727 .as_ref()
728 .into_iter()
729 .flat_map(|sinks| {
730 sinks.iter().map(|sink| {
731 (sink.new_fragment.fragment_id, sink.new_fragment_info())
732 })
733 }),
734 )
735 }))
736 .collect_vec();
737 let mut builder = FragmentEdgeBuilder::new(
738 existing_fragment_ids
739 .map(|fragment_id| self.fragment(fragment_id))
740 .chain(new_fragment_infos.iter().map(|(_, info)| info)),
741 control_stream_manager,
742 );
743 if let Some(info) = info {
744 builder.add_relations(&info.upstream_fragment_downstreams);
745 builder.add_relations(&info.stream_job_fragments.downstreams);
746 }
747 if let Some(replace_job) = replace_job {
748 builder.add_relations(&replace_job.upstream_fragment_downstreams);
749 builder.add_relations(&replace_job.new_fragments.downstreams);
750 }
751 if let Some(new_upstream_sink) = new_upstream_sink {
752 let sink_fragment_id = new_upstream_sink.sink_fragment_id;
753 let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
754 builder.add_edge(sink_fragment_id, new_sink_downstream);
755 }
756 if let Some(replace_job) = replace_job {
757 for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
758 for (original_upstream_fragment_id, new_upstream_fragment_id) in
759 fragment_replacement
760 {
761 builder.replace_upstream(
762 *fragment_id,
763 *original_upstream_fragment_id,
764 *new_upstream_fragment_id,
765 );
766 }
767 }
768 }
769 Some(builder.build())
770 }
771}
772
773impl InflightSubscriptionInfo {
774 pub fn pre_apply(&mut self, command: &Command) {
775 if let Command::CreateSubscription {
776 subscription_id,
777 upstream_mv_table_id,
778 retention_second,
779 } = command
780 && let Some(prev_retiontion) = self
781 .mv_depended_subscriptions
782 .entry(*upstream_mv_table_id)
783 .or_default()
784 .insert(*subscription_id, *retention_second)
785 {
786 warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retiontion, "add an existing subscription id");
787 }
788 }
789}
790
791impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
792 type Item = PbSubscriptionUpstreamInfo;
793
794 type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;
795
796 fn into_iter(self) -> Self::IntoIter {
797 self.mv_depended_subscriptions
798 .iter()
799 .flat_map(|(table_id, subscriptions)| {
800 subscriptions
801 .keys()
802 .map(|subscriber_id| PbSubscriptionUpstreamInfo {
803 subscriber_id: *subscriber_id,
804 upstream_mv_table_id: table_id.table_id,
805 })
806 })
807 }
808}
809
810impl InflightDatabaseInfo {
811 pub(crate) fn post_apply(
814 &mut self,
815 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
816 ) {
817 let inner = self.shared_actor_infos.clone();
818 let mut shared_actor_writer = inner.start_writer(self.database_id);
819 {
820 for (fragment_id, changes) in fragment_changes {
821 match changes {
822 CommandFragmentChanges::NewFragment { .. } => {}
823 CommandFragmentChanges::Reschedule { to_remove, .. } => {
824 let job_id = self.fragment_location[fragment_id];
825 let info = self
826 .jobs
827 .get_mut(&job_id)
828 .expect("should exist")
829 .fragment_infos
830 .get_mut(fragment_id)
831 .expect("should exist");
832 for actor_id in to_remove {
833 assert!(info.actors.remove(&(*actor_id as _)).is_some());
834 }
835 shared_actor_writer.upsert([&*info]);
836 }
837 CommandFragmentChanges::RemoveFragment => {
838 let job_id = self
839 .fragment_location
840 .remove(fragment_id)
841 .expect("should exist");
842 let job = self.jobs.get_mut(&job_id).expect("should exist");
843 let fragment = job
844 .fragment_infos
845 .remove(fragment_id)
846 .expect("should exist");
847 shared_actor_writer.remove(&fragment);
848 if job.fragment_infos.is_empty() {
849 self.jobs.remove(&job_id).expect("should exist");
850 }
851 }
852 CommandFragmentChanges::ReplaceNodeUpstream(_)
853 | CommandFragmentChanges::AddNodeUpstream(_)
854 | CommandFragmentChanges::DropNodeUpstream(_)
855 | CommandFragmentChanges::SplitAssignment { .. } => {}
856 }
857 }
858 }
859 shared_actor_writer.finish();
860 }
861}
862
863impl InflightSubscriptionInfo {
864 pub fn post_apply(&mut self, command: &Command) {
865 if let Command::DropSubscription {
866 subscription_id,
867 upstream_mv_table_id,
868 } = command
869 {
870 let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
871 Some(subscriptions) => {
872 let removed = subscriptions.remove(subscription_id).is_some();
873 if removed && subscriptions.is_empty() {
874 self.mv_depended_subscriptions.remove(upstream_mv_table_id);
875 }
876 removed
877 }
878 None => false,
879 };
880 if !removed {
881 warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
882 }
883 }
884 }
885}
886
887impl InflightFragmentInfo {
888 pub(crate) fn actor_ids_to_collect(
890 infos: impl IntoIterator<Item = &Self>,
891 ) -> HashMap<WorkerId, HashSet<ActorId>> {
892 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
893 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
894 assert!(
895 ret.entry(actor.worker_id)
896 .or_default()
897 .insert(*actor_id as _)
898 )
899 }
900 ret
901 }
902
903 pub fn existing_table_ids<'a>(
904 infos: impl IntoIterator<Item = &'a Self> + 'a,
905 ) -> impl Iterator<Item = TableId> + 'a {
906 infos
907 .into_iter()
908 .flat_map(|info| info.state_table_ids.iter().cloned())
909 }
910
911 pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
912 infos.into_iter().any(|fragment| {
913 fragment
914 .actors
915 .values()
916 .any(|actor| (actor.worker_id) == worker_id)
917 })
918 }
919
920 pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
921 infos
922 .into_iter()
923 .flat_map(|info| info.actors.values())
924 .map(|actor| actor.worker_id)
925 .collect()
926 }
927}
928
929impl InflightDatabaseInfo {
930 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
931 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
932 }
933
934 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
935 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
936 }
937}