1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
21use risingwave_meta_model::WorkerId;
22use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
23use risingwave_pb::stream_plan::stream_node::NodeBody;
24use tracing::warn;
25
26use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
27use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
28use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
29use crate::model::{ActorId, FragmentId, SubscriptionId};
30
/// Epoch pair and kind attached to a single barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// Traced epoch exposed numerically through [`BarrierInfo::prev_epoch`].
    /// NOTE(review): presumably the epoch that this barrier closes -- confirm against callers.
    pub prev_epoch: TracedEpoch,
    /// NOTE(review): presumably the epoch that this barrier opens -- confirm against callers.
    pub curr_epoch: TracedEpoch,
    /// Kind of the barrier.
    pub kind: BarrierKind,
}
37
38impl BarrierInfo {
39 pub(super) fn prev_epoch(&self) -> u64 {
40 self.prev_epoch.value().0
41 }
42}
43
/// A change applied to one fragment of [`InflightDatabaseInfo`].
///
/// Changes are keyed by fragment id in the maps handed to
/// `InflightDatabaseInfo::pre_apply` and `InflightDatabaseInfo::post_apply`.
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    /// Register a new fragment. The `TableId` is the id of the job the fragment is stored
    /// under. Handled when changes are pre-applied; ignored by `post_apply`.
    NewFragment(TableId, InflightFragmentInfo),
    /// Rewrite the upstream of `Merge` nodes in the fragment's stream node tree.
    /// The map goes from the current upstream fragment id to the replacement upstream
    /// fragment id. Handled when changes are pre-applied; ignored by `post_apply`.
    ReplaceNodeUpstream(
        HashMap<FragmentId, FragmentId>,
    ),
    /// Change the set of actors of an existing fragment.
    Reschedule {
        /// Actors inserted into the fragment when changes are pre-applied.
        new_actors: HashMap<ActorId, InflightActorInfo>,
        /// New vnode bitmaps for already-existing actors, set when changes are pre-applied.
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        /// Actors removed from the fragment by `post_apply`.
        to_remove: HashSet<ActorId>,
    },
    /// Drop the fragment. A no-op when changes are pre-applied; the fragment (and its job,
    /// once it has no fragments left) is removed by `post_apply`.
    RemoveFragment,
}
58
/// Tracks the subscriptions that are currently registered on upstream tables.
#[derive(Default, Clone, Debug)]
pub struct InflightSubscriptionInfo {
    /// Upstream table id -> (subscription id -> the `retention_second` value given by the
    /// `CreateSubscription` command that registered it).
    pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
}
64
/// The in-flight fragments that belong to a single streaming job.
#[derive(Clone, Debug)]
pub struct InflightStreamingJobInfo {
    /// Id of the streaming job.
    pub job_id: TableId,
    /// Fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
}
70
71impl InflightStreamingJobInfo {
72 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
73 self.fragment_infos.values()
74 }
75
76 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
77 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
78 }
79}
80
81impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
82 type Item = &'a InflightFragmentInfo;
83
84 type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
85
86 fn into_iter(self) -> Self::IntoIter {
87 self.fragment_infos()
88 }
89}
90
/// In-flight streaming jobs and fragments, indexed both by job and by fragment.
#[derive(Clone, Debug)]
pub struct InflightDatabaseInfo {
    /// Job id -> in-flight info of that job.
    jobs: HashMap<TableId, InflightStreamingJobInfo>,
    /// Fragment id -> id of the job (key in `jobs`) that holds the fragment.
    fragment_location: HashMap<FragmentId, TableId>,
}
96
97impl InflightDatabaseInfo {
98 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
99 self.jobs.values().flat_map(|job| job.fragment_infos())
100 }
101
102 pub fn job_ids(&self) -> impl Iterator<Item = TableId> + '_ {
103 self.jobs.keys().cloned()
104 }
105
106 pub fn contains_job(&self, job_id: TableId) -> bool {
107 self.jobs.contains_key(&job_id)
108 }
109
110 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
111 let job_id = self.fragment_location[&fragment_id];
112 self.jobs
113 .get(&job_id)
114 .expect("should exist")
115 .fragment_infos
116 .get(&fragment_id)
117 .expect("should exist")
118 }
119
120 fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
121 let job_id = self.fragment_location[&fragment_id];
122 self.jobs
123 .get_mut(&job_id)
124 .expect("should exist")
125 .fragment_infos
126 .get_mut(&fragment_id)
127 .expect("should exist")
128 }
129}
130
131impl InflightDatabaseInfo {
132 pub fn empty() -> Self {
133 Self {
134 jobs: Default::default(),
135 fragment_location: Default::default(),
136 }
137 }
138
139 pub fn new<I: Iterator<Item = (FragmentId, InflightFragmentInfo)>>(
141 fragment_infos: impl Iterator<Item = (TableId, I)>,
142 ) -> Self {
143 let mut fragment_location = HashMap::new();
144 let mut jobs = HashMap::new();
145
146 for (job_id, job_fragment_info) in fragment_infos {
147 let job_fragment_info: HashMap<_, _> = job_fragment_info.collect();
148 assert!(!job_fragment_info.is_empty());
149 for fragment_id in job_fragment_info.keys() {
150 fragment_location
151 .try_insert(*fragment_id, job_id)
152 .expect("no duplicate");
153 }
154 jobs.insert(
155 job_id,
156 InflightStreamingJobInfo {
157 job_id,
158 fragment_infos: job_fragment_info,
159 },
160 );
161 }
162 assert!(!jobs.is_empty());
163 Self {
164 jobs,
165 fragment_location,
166 }
167 }
168
    /// Returns `true` when no job is tracked.
    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }
172
173 pub(crate) fn extend(&mut self, job: InflightStreamingJobInfo) {
174 self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
175 (
176 fragment_id,
177 CommandFragmentChanges::NewFragment(job.job_id, info),
178 )
179 }))
180 }
181
182 pub(crate) fn pre_apply(
185 &mut self,
186 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
187 ) {
188 self.apply_add(
189 fragment_changes
190 .iter()
191 .map(|(fragment_id, change)| (*fragment_id, change.clone())),
192 )
193 }
194
195 fn apply_add(
196 &mut self,
197 fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
198 ) {
199 {
200 for (fragment_id, change) in fragment_changes {
201 match change {
202 CommandFragmentChanges::NewFragment(job_id, info) => {
203 let fragment_infos =
204 self.jobs
205 .entry(job_id)
206 .or_insert_with(|| InflightStreamingJobInfo {
207 job_id,
208 fragment_infos: Default::default(),
209 });
210 fragment_infos
211 .fragment_infos
212 .try_insert(fragment_id, info)
213 .expect("non duplicate");
214 self.fragment_location
215 .try_insert(fragment_id, job_id)
216 .expect("non duplicate");
217 }
218 CommandFragmentChanges::Reschedule {
219 new_actors,
220 actor_update_vnode_bitmap,
221 ..
222 } => {
223 let info = self.fragment_mut(fragment_id);
224 let actors = &mut info.actors;
225 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
226 actors
227 .get_mut(&actor_id)
228 .expect("should exist")
229 .vnode_bitmap = Some(new_vnodes);
230 }
231 for (actor_id, actor) in new_actors {
232 actors
233 .try_insert(actor_id as _, actor)
234 .expect("non-duplicate");
235 }
236 }
237 CommandFragmentChanges::RemoveFragment => {}
238 CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
239 let mut remaining_fragment_ids: HashSet<_> =
240 replace_map.keys().cloned().collect();
241 let info = self.fragment_mut(fragment_id);
242 visit_stream_node_mut(&mut info.nodes, |node| {
243 if let NodeBody::Merge(m) = node
244 && let Some(new_upstream_fragment_id) =
245 replace_map.get(&m.upstream_fragment_id)
246 {
247 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
248 if cfg!(debug_assertions) {
249 panic!(
250 "duplicate upstream fragment: {:?} {:?}",
251 m, replace_map
252 );
253 } else {
254 warn!(?m, ?replace_map, "duplicate upstream fragment");
255 }
256 }
257 m.upstream_fragment_id = *new_upstream_fragment_id;
258 }
259 });
260 if cfg!(debug_assertions) {
261 assert!(
262 remaining_fragment_ids.is_empty(),
263 "non-existing fragment to replace: {:?} {:?} {:?}",
264 remaining_fragment_ids,
265 info.nodes,
266 replace_map
267 );
268 } else {
269 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
270 }
271 }
272 }
273 }
274 }
275 }
276
277 pub(super) fn build_edge(&self, command: Option<&Command>) -> Option<FragmentEdgeBuildResult> {
278 let (info, replace_job) = match command {
279 None => {
280 return None;
281 }
282 Some(command) => match command {
283 Command::Flush
284 | Command::Pause
285 | Command::Resume
286 | Command::DropStreamingJobs { .. }
287 | Command::MergeSnapshotBackfillStreamingJobs(_)
288 | Command::RescheduleFragment { .. }
289 | Command::SourceChangeSplit(_)
290 | Command::Throttle(_)
291 | Command::CreateSubscription { .. }
292 | Command::DropSubscription { .. } => {
293 return None;
294 }
295 Command::CreateStreamingJob { info, job_type, .. } => {
296 let replace_job = match job_type {
297 CreateStreamingJobType::Normal
298 | CreateStreamingJobType::SnapshotBackfill(_) => None,
299 CreateStreamingJobType::SinkIntoTable(replace_job) => Some(replace_job),
300 };
301 (Some(info), replace_job)
302 }
303 Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job)),
304 },
305 };
306 let existing_fragment_ids = info
312 .into_iter()
313 .flat_map(|info| info.upstream_fragment_downstreams.keys())
314 .chain(replace_job.into_iter().flat_map(|replace_job| {
315 replace_job
316 .upstream_fragment_downstreams
317 .keys()
318 .filter(|fragment_id| {
319 info.map(|info| {
320 !info
321 .stream_job_fragments
322 .fragments
323 .contains_key(fragment_id)
324 })
325 .unwrap_or(true)
326 })
327 .chain(replace_job.replace_upstream.keys())
328 }))
329 .cloned();
330 let new_fragment_infos = info
331 .into_iter()
332 .flat_map(|info| info.stream_job_fragments.new_fragment_info())
333 .chain(
334 replace_job
335 .into_iter()
336 .flat_map(|replace_job| replace_job.new_fragments.new_fragment_info()),
337 )
338 .collect_vec();
339 let mut builder = FragmentEdgeBuilder::new(
340 existing_fragment_ids
341 .map(|fragment_id| self.fragment(fragment_id))
342 .chain(new_fragment_infos.iter().map(|(_, info)| info)),
343 );
344 if let Some(info) = info {
345 builder.add_relations(&info.upstream_fragment_downstreams);
346 builder.add_relations(&info.stream_job_fragments.downstreams);
347 }
348 if let Some(replace_job) = replace_job {
349 builder.add_relations(&replace_job.upstream_fragment_downstreams);
350 builder.add_relations(&replace_job.new_fragments.downstreams);
351 }
352 if let Some(replace_job) = replace_job {
353 for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
354 for (original_upstream_fragment_id, new_upstream_fragment_id) in
355 fragment_replacement
356 {
357 builder.replace_upstream(
358 *fragment_id,
359 *original_upstream_fragment_id,
360 *new_upstream_fragment_id,
361 );
362 }
363 }
364 }
365 Some(builder.build())
366 }
367}
368
369impl InflightSubscriptionInfo {
370 pub fn pre_apply(&mut self, command: &Command) {
371 if let Command::CreateSubscription {
372 subscription_id,
373 upstream_mv_table_id,
374 retention_second,
375 } = command
376 {
377 if let Some(prev_retiontion) = self
378 .mv_depended_subscriptions
379 .entry(*upstream_mv_table_id)
380 .or_default()
381 .insert(*subscription_id, *retention_second)
382 {
383 warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retiontion, "add an existing subscription id");
384 }
385 }
386 }
387}
388
389impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
390 type Item = PbSubscriptionUpstreamInfo;
391
392 type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;
393
394 fn into_iter(self) -> Self::IntoIter {
395 self.mv_depended_subscriptions
396 .iter()
397 .flat_map(|(table_id, subscriptions)| {
398 subscriptions
399 .keys()
400 .map(|subscriber_id| PbSubscriptionUpstreamInfo {
401 subscriber_id: *subscriber_id,
402 upstream_mv_table_id: table_id.table_id,
403 })
404 })
405 }
406}
407
408impl InflightDatabaseInfo {
409 pub(crate) fn post_apply(
412 &mut self,
413 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
414 ) {
415 {
416 for (fragment_id, changes) in fragment_changes {
417 match changes {
418 CommandFragmentChanges::NewFragment(_, _) => {}
419 CommandFragmentChanges::Reschedule { to_remove, .. } => {
420 let job_id = self.fragment_location[fragment_id];
421 let info = self
422 .jobs
423 .get_mut(&job_id)
424 .expect("should exist")
425 .fragment_infos
426 .get_mut(fragment_id)
427 .expect("should exist");
428 for actor_id in to_remove {
429 assert!(info.actors.remove(&(*actor_id as _)).is_some());
430 }
431 }
432 CommandFragmentChanges::RemoveFragment => {
433 let job_id = self
434 .fragment_location
435 .remove(fragment_id)
436 .expect("should exist");
437 let job = self.jobs.get_mut(&job_id).expect("should exist");
438 job.fragment_infos
439 .remove(fragment_id)
440 .expect("should exist");
441 if job.fragment_infos.is_empty() {
442 self.jobs.remove(&job_id).expect("should exist");
443 }
444 }
445 CommandFragmentChanges::ReplaceNodeUpstream(_) => {}
446 }
447 }
448 }
449 }
450}
451
452impl InflightSubscriptionInfo {
453 pub fn post_apply(&mut self, command: &Command) {
454 if let Command::DropSubscription {
455 subscription_id,
456 upstream_mv_table_id,
457 } = command
458 {
459 let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
460 Some(subscriptions) => {
461 let removed = subscriptions.remove(subscription_id).is_some();
462 if removed && subscriptions.is_empty() {
463 self.mv_depended_subscriptions.remove(upstream_mv_table_id);
464 }
465 removed
466 }
467 None => false,
468 };
469 if !removed {
470 warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
471 }
472 }
473 }
474}
475
476impl InflightFragmentInfo {
477 pub(crate) fn actor_ids_to_collect(
479 infos: impl IntoIterator<Item = &Self>,
480 ) -> HashMap<WorkerId, HashSet<ActorId>> {
481 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
482 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
483 assert!(
484 ret.entry(actor.worker_id)
485 .or_default()
486 .insert(*actor_id as _)
487 )
488 }
489 ret
490 }
491
492 pub fn existing_table_ids<'a>(
493 infos: impl IntoIterator<Item = &'a Self> + 'a,
494 ) -> impl Iterator<Item = TableId> + 'a {
495 infos
496 .into_iter()
497 .flat_map(|info| info.state_table_ids.iter().cloned())
498 }
499
500 pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
501 infos.into_iter().any(|fragment| {
502 fragment
503 .actors
504 .values()
505 .any(|actor| (actor.worker_id) == worker_id)
506 })
507 }
508}
509
510impl InflightDatabaseInfo {
511 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
512 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
513 }
514
515 pub(crate) fn workers(&self) -> HashSet<WorkerId> {
516 self.fragment_infos()
517 .flat_map(|info| info.actors.values())
518 .map(|actor| actor.worker_id)
519 .collect()
520 }
521
522 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
523 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
524 }
525}