1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
21use risingwave_meta_model::WorkerId;
22use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
23use risingwave_pb::stream_plan::stream_node::NodeBody;
24use tracing::warn;
25
26use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
27use crate::barrier::rpc::ControlStreamManager;
28use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
29use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
30use crate::model::{ActorId, FragmentId, SubscriptionId};
31
32#[derive(Debug, Clone)]
33pub(super) struct BarrierInfo {
34 pub prev_epoch: TracedEpoch,
35 pub curr_epoch: TracedEpoch,
36 pub kind: BarrierKind,
37}
38
39impl BarrierInfo {
40 pub(super) fn prev_epoch(&self) -> u64 {
41 self.prev_epoch.value().0
42 }
43}
44
45#[derive(Debug, Clone)]
46pub(crate) enum CommandFragmentChanges {
47 NewFragment(TableId, InflightFragmentInfo),
48 ReplaceNodeUpstream(
49 HashMap<FragmentId, FragmentId>,
51 ),
52 Reschedule {
53 new_actors: HashMap<ActorId, InflightActorInfo>,
54 actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
55 to_remove: HashSet<ActorId>,
56 },
57 RemoveFragment,
58}
59
60#[derive(Default, Clone, Debug)]
61pub struct InflightSubscriptionInfo {
62 pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
64}
65
66#[derive(Clone, Debug)]
67pub struct InflightStreamingJobInfo {
68 pub job_id: TableId,
69 pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
70}
71
72impl InflightStreamingJobInfo {
73 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
74 self.fragment_infos.values()
75 }
76
77 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
78 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
79 }
80}
81
82impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
83 type Item = &'a InflightFragmentInfo;
84
85 type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
86
87 fn into_iter(self) -> Self::IntoIter {
88 self.fragment_infos()
89 }
90}
91
92#[derive(Clone, Debug)]
93pub struct InflightDatabaseInfo {
94 jobs: HashMap<TableId, InflightStreamingJobInfo>,
95 fragment_location: HashMap<FragmentId, TableId>,
96}
97
98impl InflightDatabaseInfo {
99 pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
100 self.jobs.values().flat_map(|job| job.fragment_infos())
101 }
102
103 pub fn contains_job(&self, job_id: TableId) -> bool {
104 self.jobs.contains_key(&job_id)
105 }
106
107 pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
108 let job_id = self.fragment_location[&fragment_id];
109 self.jobs
110 .get(&job_id)
111 .expect("should exist")
112 .fragment_infos
113 .get(&fragment_id)
114 .expect("should exist")
115 }
116
117 fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
118 let job_id = self.fragment_location[&fragment_id];
119 self.jobs
120 .get_mut(&job_id)
121 .expect("should exist")
122 .fragment_infos
123 .get_mut(&fragment_id)
124 .expect("should exist")
125 }
126}
127
128impl InflightDatabaseInfo {
129 pub fn empty() -> Self {
130 Self {
131 jobs: Default::default(),
132 fragment_location: Default::default(),
133 }
134 }
135
136 pub fn is_empty(&self) -> bool {
137 self.jobs.is_empty()
138 }
139
140 pub(crate) fn extend(&mut self, job: InflightStreamingJobInfo) {
141 self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
142 (
143 fragment_id,
144 CommandFragmentChanges::NewFragment(job.job_id, info),
145 )
146 }))
147 }
148
149 pub(crate) fn pre_apply(
152 &mut self,
153 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
154 ) {
155 self.apply_add(
156 fragment_changes
157 .iter()
158 .map(|(fragment_id, change)| (*fragment_id, change.clone())),
159 )
160 }
161
162 fn apply_add(
163 &mut self,
164 fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
165 ) {
166 {
167 for (fragment_id, change) in fragment_changes {
168 match change {
169 CommandFragmentChanges::NewFragment(job_id, info) => {
170 let fragment_infos =
171 self.jobs
172 .entry(job_id)
173 .or_insert_with(|| InflightStreamingJobInfo {
174 job_id,
175 fragment_infos: Default::default(),
176 });
177 fragment_infos
178 .fragment_infos
179 .try_insert(fragment_id, info)
180 .expect("non duplicate");
181 self.fragment_location
182 .try_insert(fragment_id, job_id)
183 .expect("non duplicate");
184 }
185 CommandFragmentChanges::Reschedule {
186 new_actors,
187 actor_update_vnode_bitmap,
188 ..
189 } => {
190 let info = self.fragment_mut(fragment_id);
191 let actors = &mut info.actors;
192 for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
193 actors
194 .get_mut(&actor_id)
195 .expect("should exist")
196 .vnode_bitmap = Some(new_vnodes);
197 }
198 for (actor_id, actor) in new_actors {
199 actors
200 .try_insert(actor_id as _, actor)
201 .expect("non-duplicate");
202 }
203 }
204 CommandFragmentChanges::RemoveFragment => {}
205 CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
206 let mut remaining_fragment_ids: HashSet<_> =
207 replace_map.keys().cloned().collect();
208 let info = self.fragment_mut(fragment_id);
209 visit_stream_node_mut(&mut info.nodes, |node| {
210 if let NodeBody::Merge(m) = node
211 && let Some(new_upstream_fragment_id) =
212 replace_map.get(&m.upstream_fragment_id)
213 {
214 if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
215 if cfg!(debug_assertions) {
216 panic!(
217 "duplicate upstream fragment: {:?} {:?}",
218 m, replace_map
219 );
220 } else {
221 warn!(?m, ?replace_map, "duplicate upstream fragment");
222 }
223 }
224 m.upstream_fragment_id = *new_upstream_fragment_id;
225 }
226 });
227 if cfg!(debug_assertions) {
228 assert!(
229 remaining_fragment_ids.is_empty(),
230 "non-existing fragment to replace: {:?} {:?} {:?}",
231 remaining_fragment_ids,
232 info.nodes,
233 replace_map
234 );
235 } else {
236 warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
237 }
238 }
239 }
240 }
241 }
242 }
243
244 pub(super) fn build_edge(
245 &self,
246 command: Option<&Command>,
247 control_stream_manager: &ControlStreamManager,
248 ) -> Option<FragmentEdgeBuildResult> {
249 let (info, replace_job) = match command {
250 None => {
251 return None;
252 }
253 Some(command) => match command {
254 Command::Flush
255 | Command::Pause
256 | Command::Resume
257 | Command::DropStreamingJobs { .. }
258 | Command::MergeSnapshotBackfillStreamingJobs(_)
259 | Command::RescheduleFragment { .. }
260 | Command::SourceChangeSplit(_)
261 | Command::Throttle(_)
262 | Command::CreateSubscription { .. }
263 | Command::DropSubscription { .. }
264 | Command::ConnectorPropsChange(_) => {
265 return None;
266 }
267 Command::CreateStreamingJob { info, job_type, .. } => {
268 let replace_job = match job_type {
269 CreateStreamingJobType::Normal
270 | CreateStreamingJobType::SnapshotBackfill(_) => None,
271 CreateStreamingJobType::SinkIntoTable(replace_job) => Some(replace_job),
272 };
273 (Some(info), replace_job)
274 }
275 Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job)),
276 },
277 };
278 let existing_fragment_ids = info
284 .into_iter()
285 .flat_map(|info| info.upstream_fragment_downstreams.keys())
286 .chain(replace_job.into_iter().flat_map(|replace_job| {
287 replace_job
288 .upstream_fragment_downstreams
289 .keys()
290 .filter(|fragment_id| {
291 info.map(|info| {
292 !info
293 .stream_job_fragments
294 .fragments
295 .contains_key(fragment_id)
296 })
297 .unwrap_or(true)
298 })
299 .chain(replace_job.replace_upstream.keys())
300 }))
301 .cloned();
302 let new_fragment_infos = info
303 .into_iter()
304 .flat_map(|info| info.stream_job_fragments.new_fragment_info())
305 .chain(
306 replace_job
307 .into_iter()
308 .flat_map(|replace_job| replace_job.new_fragments.new_fragment_info()),
309 )
310 .collect_vec();
311 let mut builder = FragmentEdgeBuilder::new(
312 existing_fragment_ids
313 .map(|fragment_id| self.fragment(fragment_id))
314 .chain(new_fragment_infos.iter().map(|(_, info)| info)),
315 control_stream_manager,
316 );
317 if let Some(info) = info {
318 builder.add_relations(&info.upstream_fragment_downstreams);
319 builder.add_relations(&info.stream_job_fragments.downstreams);
320 }
321 if let Some(replace_job) = replace_job {
322 builder.add_relations(&replace_job.upstream_fragment_downstreams);
323 builder.add_relations(&replace_job.new_fragments.downstreams);
324 }
325 if let Some(replace_job) = replace_job {
326 for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
327 for (original_upstream_fragment_id, new_upstream_fragment_id) in
328 fragment_replacement
329 {
330 builder.replace_upstream(
331 *fragment_id,
332 *original_upstream_fragment_id,
333 *new_upstream_fragment_id,
334 );
335 }
336 }
337 }
338 Some(builder.build())
339 }
340}
341
342impl InflightSubscriptionInfo {
343 pub fn pre_apply(&mut self, command: &Command) {
344 if let Command::CreateSubscription {
345 subscription_id,
346 upstream_mv_table_id,
347 retention_second,
348 } = command
349 {
350 if let Some(prev_retiontion) = self
351 .mv_depended_subscriptions
352 .entry(*upstream_mv_table_id)
353 .or_default()
354 .insert(*subscription_id, *retention_second)
355 {
356 warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retiontion, "add an existing subscription id");
357 }
358 }
359 }
360}
361
362impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
363 type Item = PbSubscriptionUpstreamInfo;
364
365 type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;
366
367 fn into_iter(self) -> Self::IntoIter {
368 self.mv_depended_subscriptions
369 .iter()
370 .flat_map(|(table_id, subscriptions)| {
371 subscriptions
372 .keys()
373 .map(|subscriber_id| PbSubscriptionUpstreamInfo {
374 subscriber_id: *subscriber_id,
375 upstream_mv_table_id: table_id.table_id,
376 })
377 })
378 }
379}
380
381impl InflightDatabaseInfo {
382 pub(crate) fn post_apply(
385 &mut self,
386 fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
387 ) {
388 {
389 for (fragment_id, changes) in fragment_changes {
390 match changes {
391 CommandFragmentChanges::NewFragment(_, _) => {}
392 CommandFragmentChanges::Reschedule { to_remove, .. } => {
393 let job_id = self.fragment_location[fragment_id];
394 let info = self
395 .jobs
396 .get_mut(&job_id)
397 .expect("should exist")
398 .fragment_infos
399 .get_mut(fragment_id)
400 .expect("should exist");
401 for actor_id in to_remove {
402 assert!(info.actors.remove(&(*actor_id as _)).is_some());
403 }
404 }
405 CommandFragmentChanges::RemoveFragment => {
406 let job_id = self
407 .fragment_location
408 .remove(fragment_id)
409 .expect("should exist");
410 let job = self.jobs.get_mut(&job_id).expect("should exist");
411 job.fragment_infos
412 .remove(fragment_id)
413 .expect("should exist");
414 if job.fragment_infos.is_empty() {
415 self.jobs.remove(&job_id).expect("should exist");
416 }
417 }
418 CommandFragmentChanges::ReplaceNodeUpstream(_) => {}
419 }
420 }
421 }
422 }
423}
424
425impl InflightSubscriptionInfo {
426 pub fn post_apply(&mut self, command: &Command) {
427 if let Command::DropSubscription {
428 subscription_id,
429 upstream_mv_table_id,
430 } = command
431 {
432 let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
433 Some(subscriptions) => {
434 let removed = subscriptions.remove(subscription_id).is_some();
435 if removed && subscriptions.is_empty() {
436 self.mv_depended_subscriptions.remove(upstream_mv_table_id);
437 }
438 removed
439 }
440 None => false,
441 };
442 if !removed {
443 warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
444 }
445 }
446 }
447}
448
449impl InflightFragmentInfo {
450 pub(crate) fn actor_ids_to_collect(
452 infos: impl IntoIterator<Item = &Self>,
453 ) -> HashMap<WorkerId, HashSet<ActorId>> {
454 let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
455 for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
456 assert!(
457 ret.entry(actor.worker_id)
458 .or_default()
459 .insert(*actor_id as _)
460 )
461 }
462 ret
463 }
464
465 pub fn existing_table_ids<'a>(
466 infos: impl IntoIterator<Item = &'a Self> + 'a,
467 ) -> impl Iterator<Item = TableId> + 'a {
468 infos
469 .into_iter()
470 .flat_map(|info| info.state_table_ids.iter().cloned())
471 }
472
473 pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
474 infos.into_iter().any(|fragment| {
475 fragment
476 .actors
477 .values()
478 .any(|actor| (actor.worker_id) == worker_id)
479 })
480 }
481
482 pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
483 infos
484 .into_iter()
485 .flat_map(|info| info.actors.values())
486 .map(|actor| actor.worker_id)
487 .collect()
488 }
489}
490
491impl InflightDatabaseInfo {
492 pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
493 InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
494 }
495
496 pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
497 InflightFragmentInfo::existing_table_ids(self.fragment_infos())
498 }
499}