1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
23use risingwave_meta_model::WorkerId;
24use risingwave_meta_model::fragment::DistributionType;
25use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
26use risingwave_pb::meta::subscribe_response::Operation;
27use risingwave_pb::stream_plan::stream_node::NodeBody;
28use risingwave_pb::stream_plan::{PbSubscriptionUpstreamInfo, PbUpstreamSinkInfo};
29use tracing::warn;
30
31use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
32use crate::barrier::rpc::ControlStreamManager;
33use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
34use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
35use crate::controller::utils::rebuild_fragment_mapping;
36use crate::manager::NotificationManagerRef;
37use crate::model::{ActorId, FragmentId, SubscriptionId};
38
39#[derive(Debug, Clone)]
40pub struct SharedFragmentInfo {
41 pub fragment_id: FragmentId,
42 pub distribution_type: DistributionType,
43 pub actors: HashMap<ActorId, InflightActorInfo>,
44}
45
46impl From<&InflightFragmentInfo> for SharedFragmentInfo {
47 fn from(info: &InflightFragmentInfo) -> Self {
48 Self {
49 fragment_id: info.fragment_id,
50 distribution_type: info.distribution_type,
51 actors: info.actors.clone(),
52 }
53 }
54}
55
56type SharedActorInfosInner = HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>;
57
58#[derive(Clone, educe::Educe)]
59#[educe(Debug)]
60pub(crate) struct SharedActorInfos {
61 inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
62 #[educe(Debug(ignore))]
63 notification_manager: NotificationManagerRef,
64}
65
66impl SharedActorInfos {
67 pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
68 Self {
69 inner: Arc::new(Default::default()),
70 notification_manager,
71 }
72 }
73
74 pub(super) fn remove_database(&self, database_id: DatabaseId) {
75 if let Some(database) = self.inner.write().remove(&database_id) {
76 let mapping = database
77 .into_values()
78 .map(|fragment| rebuild_fragment_mapping(&fragment))
79 .collect_vec();
80 if !mapping.is_empty() {
81 self.notification_manager
82 .notify_fragment_mapping(Operation::Delete, mapping);
83 }
84 }
85 }
86
87 pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
88 let database_ids: HashSet<_> = database_ids.into_iter().collect();
89
90 let mut mapping = Vec::new();
91 for fragment in self
92 .inner
93 .write()
94 .extract_if(|database_id, _| !database_ids.contains(database_id))
95 .flat_map(|(_, fragments)| fragments.into_values())
96 {
97 mapping.push(rebuild_fragment_mapping(&fragment));
98 }
99 if !mapping.is_empty() {
100 self.notification_manager
101 .notify_fragment_mapping(Operation::Delete, mapping);
102 }
103 }
104
105 pub(super) fn recover_database(
106 &self,
107 database_id: DatabaseId,
108 fragments: impl Iterator<Item = &InflightFragmentInfo>,
109 ) {
110 let mut remaining_fragments: HashMap<_, _> = fragments
111 .map(|fragment| (fragment.fragment_id, fragment))
112 .collect();
113 let mut writer = self.start_writer(database_id);
115 let database = writer.write_guard.entry(database_id).or_default();
116 for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
117 if let Some(info) = remaining_fragments.remove(fragment_id) {
118 let info = info.into();
119 writer
120 .updated_fragment_mapping
121 .get_or_insert_default()
122 .push(rebuild_fragment_mapping(&info));
123 *fragment_info = info;
124 false
125 } else {
126 true
127 }
128 }) {
129 writer
130 .deleted_fragment_mapping
131 .get_or_insert_default()
132 .push(rebuild_fragment_mapping(&fragment));
133 }
134 for (fragment_id, fragment) in remaining_fragments {
135 let info = fragment.into();
136 writer
137 .added_fragment_mapping
138 .get_or_insert_default()
139 .push(rebuild_fragment_mapping(&info));
140 database.insert(fragment_id, info);
141 }
142 writer.finish();
143 }
144
145 pub(super) fn upsert(
146 &self,
147 database_id: DatabaseId,
148 infos: impl IntoIterator<Item = &InflightFragmentInfo>,
149 ) {
150 let mut writer = self.start_writer(database_id);
151 writer.upsert(infos);
152 writer.finish();
153 }
154
155 pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
156 SharedActorInfoWriter {
157 database_id,
158 write_guard: self.inner.write(),
159 notification_manager: &self.notification_manager,
160 added_fragment_mapping: None,
161 updated_fragment_mapping: None,
162 deleted_fragment_mapping: None,
163 }
164 }
165}
166
167pub(super) struct SharedActorInfoWriter<'a> {
168 database_id: DatabaseId,
169 write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
170 notification_manager: &'a NotificationManagerRef,
171 added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
172 updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
173 deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
174}
175
176impl SharedActorInfoWriter<'_> {
177 pub(super) fn upsert(&mut self, infos: impl IntoIterator<Item = &InflightFragmentInfo>) {
178 let database = self.write_guard.entry(self.database_id).or_default();
179 for info in infos {
180 match database.entry(info.fragment_id) {
181 Entry::Occupied(mut entry) => {
182 let info = info.into();
183 self.updated_fragment_mapping
184 .get_or_insert_default()
185 .push(rebuild_fragment_mapping(&info));
186 entry.insert(info);
187 }
188 Entry::Vacant(entry) => {
189 let info = info.into();
190 self.added_fragment_mapping
191 .get_or_insert_default()
192 .push(rebuild_fragment_mapping(&info));
193 entry.insert(info);
194 }
195 }
196 }
197 }
198
199 pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
200 if let Some(database) = self.write_guard.get_mut(&self.database_id)
201 && let Some(fragment) = database.remove(&info.fragment_id)
202 {
203 self.deleted_fragment_mapping
204 .get_or_insert_default()
205 .push(rebuild_fragment_mapping(&fragment));
206 }
207 }
208
209 pub(super) fn finish(self) {
210 if let Some(mapping) = self.added_fragment_mapping {
211 self.notification_manager
212 .notify_fragment_mapping(Operation::Add, mapping);
213 }
214 if let Some(mapping) = self.updated_fragment_mapping {
215 self.notification_manager
216 .notify_fragment_mapping(Operation::Update, mapping);
217 }
218 if let Some(mapping) = self.deleted_fragment_mapping {
219 self.notification_manager
220 .notify_fragment_mapping(Operation::Delete, mapping);
221 }
222 }
223}
224
225#[derive(Debug, Clone)]
226pub(super) struct BarrierInfo {
227 pub prev_epoch: TracedEpoch,
228 pub curr_epoch: TracedEpoch,
229 pub kind: BarrierKind,
230}
231
232impl BarrierInfo {
233 pub(super) fn prev_epoch(&self) -> u64 {
234 self.prev_epoch.value().0
235 }
236}
237
238#[derive(Debug, Clone)]
239pub(crate) enum CommandFragmentChanges {
240 NewFragment {
241 job_id: TableId,
242 info: InflightFragmentInfo,
243 is_existing: bool,
247 },
248 AddNodeUpstream(PbUpstreamSinkInfo),
249 DropNodeUpstream(Vec<FragmentId>),
250 ReplaceNodeUpstream(
251 HashMap<FragmentId, FragmentId>,
253 ),
254 Reschedule {
255 new_actors: HashMap<ActorId, InflightActorInfo>,
256 actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
257 to_remove: HashSet<ActorId>,
258 },
259 RemoveFragment,
260}
261
262#[derive(Default, Clone, Debug)]
263pub struct InflightSubscriptionInfo {
264 pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
266}
267
268#[derive(Clone, Debug)]
269pub struct InflightStreamingJobInfo {
270 pub job_id: TableId,
271 pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
272}
273
274impl InflightStreamingJobInfo {
275 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
276 self.fragment_infos.values()
277 }
278
279 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
280 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
281 }
282}
283
284impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
285 type Item = &'a InflightFragmentInfo;
286
287 type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
288
289 fn into_iter(self) -> Self::IntoIter {
290 self.fragment_infos()
291 }
292}
293
294#[derive(Clone, Debug)]
295pub struct InflightDatabaseInfo {
296 database_id: DatabaseId,
297 jobs: HashMap<TableId, InflightStreamingJobInfo>,
298 fragment_location: HashMap<FragmentId, TableId>,
299 pub(super) shared_actor_infos: SharedActorInfos,
300}
301
302impl InflightDatabaseInfo {
303 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
304 self.jobs.values().flat_map(|job| job.fragment_infos())
305 }
306
307 pub fn contains_job(&self, job_id: TableId) -> bool {
308 self.jobs.contains_key(&job_id)
309 }
310
311 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
312 let job_id = self.fragment_location[&fragment_id];
313 self.jobs
314 .get(&job_id)
315 .expect("should exist")
316 .fragment_infos
317 .get(&fragment_id)
318 .expect("should exist")
319 }
320
321 fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
322 let job_id = self.fragment_location[&fragment_id];
323 self.jobs
324 .get_mut(&job_id)
325 .expect("should exist")
326 .fragment_infos
327 .get_mut(&fragment_id)
328 .expect("should exist")
329 }
330
331 fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
332 Self {
333 database_id,
334 jobs: Default::default(),
335 fragment_location: Default::default(),
336 shared_actor_infos,
337 }
338 }
339
340 pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
341 shared_actor_infos.remove_database(database_id);
343 Self::empty_inner(database_id, shared_actor_infos)
344 }
345
346 pub fn recover(
347 database_id: DatabaseId,
348 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
349 shared_actor_infos: SharedActorInfos,
350 ) -> Self {
351 let mut info = Self::empty_inner(database_id, shared_actor_infos);
352 for job in jobs {
353 info.add_existing(job);
354 }
355 info
356 }
357
358 pub fn is_empty(&self) -> bool {
359 self.jobs.is_empty()
360 }
361
362 pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
363 self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
364 (
365 fragment_id,
366 CommandFragmentChanges::NewFragment {
367 job_id: job.job_id,
368 info,
369 is_existing: true,
370 },
371 )
372 }))
373 }
374
375 pub(crate) fn pre_apply(
378 &mut self,
379 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
380 ) {
381 self.apply_add(
382 fragment_changes
383 .iter()
384 .map(|(fragment_id, change)| (*fragment_id, change.clone())),
385 )
386 }
387
388 fn apply_add(
389 &mut self,
390 fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
391 ) {
392 {
393 let shared_infos = self.shared_actor_infos.clone();
394 let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
395 for (fragment_id, change) in fragment_changes {
396 match change {
397 CommandFragmentChanges::NewFragment {
398 job_id,
399 info,
400 is_existing,
401 } => {
402 let fragment_infos =
403 self.jobs
404 .entry(job_id)
405 .or_insert_with(|| InflightStreamingJobInfo {
406 job_id,
407 fragment_infos: Default::default(),
408 });
409 if !is_existing {
410 shared_actor_writer.upsert([&info]);
411 }
412 fragment_infos
413 .fragment_infos
414 .try_insert(fragment_id, info)
415 .expect("non duplicate");
416 self.fragment_location
417 .try_insert(fragment_id, job_id)
418 .expect("non duplicate");
419 }
420 CommandFragmentChanges::Reschedule {
421 new_actors,
422 actor_update_vnode_bitmap,
423 ..
424 } => {
425 let info = self.fragment_mut(fragment_id);
426 let actors = &mut info.actors;
427 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
428 actors
429 .get_mut(&actor_id)
430 .expect("should exist")
431 .vnode_bitmap = Some(new_vnodes);
432 }
433 for (actor_id, actor) in new_actors {
434 actors
435 .try_insert(actor_id as _, actor)
436 .expect("non-duplicate");
437 }
438 }
439 CommandFragmentChanges::RemoveFragment => {}
440 CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
441 let mut remaining_fragment_ids: HashSet<_> =
442 replace_map.keys().cloned().collect();
443 let info = self.fragment_mut(fragment_id);
444 visit_stream_node_mut(&mut info.nodes, |node| {
445 if let NodeBody::Merge(m) = node
446 && let Some(new_upstream_fragment_id) =
447 replace_map.get(&m.upstream_fragment_id)
448 {
449 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
450 if cfg!(debug_assertions) {
451 panic!(
452 "duplicate upstream fragment: {:?} {:?}",
453 m, replace_map
454 );
455 } else {
456 warn!(?m, ?replace_map, "duplicate upstream fragment");
457 }
458 }
459 m.upstream_fragment_id = *new_upstream_fragment_id;
460 }
461 });
462 if cfg!(debug_assertions) {
463 assert!(
464 remaining_fragment_ids.is_empty(),
465 "non-existing fragment to replace: {:?} {:?} {:?}",
466 remaining_fragment_ids,
467 info.nodes,
468 replace_map
469 );
470 } else {
471 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
472 }
473 }
474 CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
475 let info = self.fragment_mut(fragment_id);
476 let mut injected = false;
477 visit_stream_node_mut(&mut info.nodes, |node| {
478 if let NodeBody::UpstreamSinkUnion(u) = node {
479 if cfg!(debug_assertions) {
480 let current_upstream_fragment_ids = u
481 .init_upstreams
482 .iter()
483 .map(|upstream| upstream.upstream_fragment_id)
484 .collect::<HashSet<_>>();
485 if current_upstream_fragment_ids
486 .contains(&new_upstream_info.upstream_fragment_id)
487 {
488 panic!(
489 "duplicate upstream fragment: {:?} {:?}",
490 u, new_upstream_info
491 );
492 }
493 }
494 u.init_upstreams.push(new_upstream_info.clone());
495 injected = true;
496 }
497 });
498 assert!(injected, "should inject upstream into UpstreamSinkUnion");
499 }
500 CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
501 let info = self.fragment_mut(fragment_id);
502 let mut removed = false;
503 visit_stream_node_mut(&mut info.nodes, |node| {
504 if let NodeBody::UpstreamSinkUnion(u) = node {
505 if cfg!(debug_assertions) {
506 let current_upstream_fragment_ids = u
507 .init_upstreams
508 .iter()
509 .map(|upstream| upstream.upstream_fragment_id)
510 .collect::<HashSet<_>>();
511 for drop_fragment_id in &drop_upstream_fragment_ids {
512 if !current_upstream_fragment_ids.contains(drop_fragment_id)
513 {
514 panic!(
515 "non-existing upstream fragment to drop: {:?} {:?} {:?}",
516 u, drop_upstream_fragment_ids, drop_fragment_id
517 );
518 }
519 }
520 }
521 u.init_upstreams.retain(|upstream| {
522 !drop_upstream_fragment_ids
523 .contains(&upstream.upstream_fragment_id)
524 });
525 removed = true;
526 }
527 });
528 assert!(removed, "should remove upstream from UpstreamSinkUnion");
529 }
530 }
531 }
532 shared_actor_writer.finish();
533 }
534 }
535
536 pub(super) fn build_edge(
537 &self,
538 command: Option<&Command>,
539 control_stream_manager: &ControlStreamManager,
540 ) -> Option<FragmentEdgeBuildResult> {
541 let (info, replace_job, new_upstream_sink) = match command {
542 None => {
543 return None;
544 }
545 Some(command) => match command {
546 Command::Flush
547 | Command::Pause
548 | Command::Resume
549 | Command::DropStreamingJobs { .. }
550 | Command::MergeSnapshotBackfillStreamingJobs(_)
551 | Command::RescheduleFragment { .. }
552 | Command::SourceChangeSplit(_)
553 | Command::Throttle(_)
554 | Command::CreateSubscription { .. }
555 | Command::DropSubscription { .. }
556 | Command::ConnectorPropsChange(_)
557 | Command::StartFragmentBackfill { .. }
558 | Command::Refresh { .. }
559 | Command::LoadFinish { .. } => {
560 return None;
561 }
562 Command::CreateStreamingJob { info, job_type, .. } => {
563 let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
564 new_upstream_sink,
565 ) = job_type
566 {
567 Some(new_upstream_sink)
568 } else {
569 None
570 };
571 (Some(info), None, new_upstream_sink)
572 }
573 Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
574 },
575 };
576 let existing_fragment_ids = info
584 .into_iter()
585 .flat_map(|info| info.upstream_fragment_downstreams.keys())
586 .chain(replace_job.into_iter().flat_map(|replace_job| {
587 replace_job
588 .upstream_fragment_downstreams
589 .keys()
590 .filter(|fragment_id| {
591 info.map(|info| {
592 !info
593 .stream_job_fragments
594 .fragments
595 .contains_key(fragment_id)
596 })
597 .unwrap_or(true)
598 })
599 .chain(replace_job.replace_upstream.keys())
600 }))
601 .chain(
602 new_upstream_sink
603 .into_iter()
604 .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
605 )
606 .cloned();
607 let new_fragment_infos = info
608 .into_iter()
609 .flat_map(|info| info.stream_job_fragments.new_fragment_info())
610 .chain(replace_job.into_iter().flat_map(|replace_job| {
611 replace_job.new_fragments.new_fragment_info().chain(
612 replace_job
613 .auto_refresh_schema_sinks
614 .as_ref()
615 .into_iter()
616 .flat_map(|sinks| {
617 sinks.iter().map(|sink| {
618 (sink.new_fragment.fragment_id, sink.new_fragment_info())
619 })
620 }),
621 )
622 }))
623 .collect_vec();
624 let mut builder = FragmentEdgeBuilder::new(
625 existing_fragment_ids
626 .map(|fragment_id| self.fragment(fragment_id))
627 .chain(new_fragment_infos.iter().map(|(_, info)| info)),
628 control_stream_manager,
629 );
630 if let Some(info) = info {
631 builder.add_relations(&info.upstream_fragment_downstreams);
632 builder.add_relations(&info.stream_job_fragments.downstreams);
633 }
634 if let Some(replace_job) = replace_job {
635 builder.add_relations(&replace_job.upstream_fragment_downstreams);
636 builder.add_relations(&replace_job.new_fragments.downstreams);
637 }
638 if let Some(new_upstream_sink) = new_upstream_sink {
639 let sink_fragment_id = new_upstream_sink.sink_fragment_id;
640 let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
641 builder.add_edge(sink_fragment_id, new_sink_downstream);
642 }
643 if let Some(replace_job) = replace_job {
644 for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
645 for (original_upstream_fragment_id, new_upstream_fragment_id) in
646 fragment_replacement
647 {
648 builder.replace_upstream(
649 *fragment_id,
650 *original_upstream_fragment_id,
651 *new_upstream_fragment_id,
652 );
653 }
654 }
655 }
656 Some(builder.build())
657 }
658}
659
660impl InflightSubscriptionInfo {
661 pub fn pre_apply(&mut self, command: &Command) {
662 if let Command::CreateSubscription {
663 subscription_id,
664 upstream_mv_table_id,
665 retention_second,
666 } = command
667 && let Some(prev_retiontion) = self
668 .mv_depended_subscriptions
669 .entry(*upstream_mv_table_id)
670 .or_default()
671 .insert(*subscription_id, *retention_second)
672 {
673 warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retiontion, "add an existing subscription id");
674 }
675 }
676}
677
678impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
679 type Item = PbSubscriptionUpstreamInfo;
680
681 type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;
682
683 fn into_iter(self) -> Self::IntoIter {
684 self.mv_depended_subscriptions
685 .iter()
686 .flat_map(|(table_id, subscriptions)| {
687 subscriptions
688 .keys()
689 .map(|subscriber_id| PbSubscriptionUpstreamInfo {
690 subscriber_id: *subscriber_id,
691 upstream_mv_table_id: table_id.table_id,
692 })
693 })
694 }
695}
696
697impl InflightDatabaseInfo {
698 pub(crate) fn post_apply(
701 &mut self,
702 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
703 ) {
704 let inner = self.shared_actor_infos.clone();
705 let mut shared_actor_writer = inner.start_writer(self.database_id);
706 {
707 for (fragment_id, changes) in fragment_changes {
708 match changes {
709 CommandFragmentChanges::NewFragment { .. } => {}
710 CommandFragmentChanges::Reschedule { to_remove, .. } => {
711 let job_id = self.fragment_location[fragment_id];
712 let info = self
713 .jobs
714 .get_mut(&job_id)
715 .expect("should exist")
716 .fragment_infos
717 .get_mut(fragment_id)
718 .expect("should exist");
719 for actor_id in to_remove {
720 assert!(info.actors.remove(&(*actor_id as _)).is_some());
721 }
722 shared_actor_writer.upsert([&*info]);
723 }
724 CommandFragmentChanges::RemoveFragment => {
725 let job_id = self
726 .fragment_location
727 .remove(fragment_id)
728 .expect("should exist");
729 let job = self.jobs.get_mut(&job_id).expect("should exist");
730 let fragment = job
731 .fragment_infos
732 .remove(fragment_id)
733 .expect("should exist");
734 shared_actor_writer.remove(&fragment);
735 if job.fragment_infos.is_empty() {
736 self.jobs.remove(&job_id).expect("should exist");
737 }
738 }
739 CommandFragmentChanges::ReplaceNodeUpstream(_)
740 | CommandFragmentChanges::AddNodeUpstream(_)
741 | CommandFragmentChanges::DropNodeUpstream(_) => {}
742 }
743 }
744 }
745 shared_actor_writer.finish();
746 }
747}
748
749impl InflightSubscriptionInfo {
750 pub fn post_apply(&mut self, command: &Command) {
751 if let Command::DropSubscription {
752 subscription_id,
753 upstream_mv_table_id,
754 } = command
755 {
756 let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
757 Some(subscriptions) => {
758 let removed = subscriptions.remove(subscription_id).is_some();
759 if removed && subscriptions.is_empty() {
760 self.mv_depended_subscriptions.remove(upstream_mv_table_id);
761 }
762 removed
763 }
764 None => false,
765 };
766 if !removed {
767 warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
768 }
769 }
770 }
771}
772
773impl InflightFragmentInfo {
774 pub(crate) fn actor_ids_to_collect(
776 infos: impl IntoIterator<Item = &Self>,
777 ) -> HashMap<WorkerId, HashSet<ActorId>> {
778 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
779 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
780 assert!(
781 ret.entry(actor.worker_id)
782 .or_default()
783 .insert(*actor_id as _)
784 )
785 }
786 ret
787 }
788
789 pub fn existing_table_ids<'a>(
790 infos: impl IntoIterator<Item = &'a Self> + 'a,
791 ) -> impl Iterator<Item = TableId> + 'a {
792 infos
793 .into_iter()
794 .flat_map(|info| info.state_table_ids.iter().cloned())
795 }
796
797 pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
798 infos.into_iter().any(|fragment| {
799 fragment
800 .actors
801 .values()
802 .any(|actor| (actor.worker_id) == worker_id)
803 })
804 }
805
806 pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
807 infos
808 .into_iter()
809 .flat_map(|info| info.actors.values())
810 .map(|actor| actor.worker_id)
811 .collect()
812 }
813}
814
815impl InflightDatabaseInfo {
816 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
817 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
818 }
819
820 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
821 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
822 }
823}