1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18
19use risingwave_common::catalog::TableId;
20use risingwave_common::id::JobId;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_pb::hummock::HummockVersionStats;
23use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
24use risingwave_pb::stream_plan::barrier_mutation::PbMutation;
25use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
26use risingwave_pb::stream_plan::{
27 PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
28};
29use tracing::warn;
30
31use crate::MetaResult;
32use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
33use crate::barrier::edge_builder::FragmentEdgeBuilder;
34use crate::barrier::info::{
35 BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
36};
37use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
38use crate::barrier::utils::NodeToCollect;
39use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
40use crate::controller::fragment::InflightFragmentInfo;
41use crate::stream::{GlobalActorIdGen, fill_snapshot_backfill_epoch};
42
/// Barrier-allocation state: the epoch the next barrier will start from, the epochs of
/// non-checkpoint barriers issued since the last checkpoint barrier, and the paused flag.
pub(in crate::barrier) struct BarrierWorkerState {
    /// Epoch used as `prev_epoch` by the next barrier produced in `next_barrier_info`; it is
    /// replaced by that barrier's `curr_epoch` each time a barrier is produced.
    in_flight_prev_epoch: TracedEpoch,

    /// `prev_epoch` values (raw `u64`) of barriers issued since the last checkpoint barrier.
    /// A checkpoint barrier appends its own `prev_epoch` and then drains the whole list into
    /// `BarrierKind::Checkpoint`.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Current paused flag; changed only through `set_is_paused` (e.g. on `Pause` / `Resume`
    /// commands).
    is_paused: bool,
}
57
58impl BarrierWorkerState {
59 pub(super) fn new() -> Self {
60 Self {
61 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
62 pending_non_checkpoint_barriers: vec![],
63 is_paused: false,
64 }
65 }
66
67 pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
68 Self {
69 in_flight_prev_epoch,
70 pending_non_checkpoint_barriers: vec![],
71 is_paused,
72 }
73 }
74
75 pub fn is_paused(&self) -> bool {
76 self.is_paused
77 }
78
79 fn set_is_paused(&mut self, is_paused: bool) {
80 if self.is_paused != is_paused {
81 tracing::info!(
82 currently_paused = self.is_paused,
83 newly_paused = is_paused,
84 "update paused state"
85 );
86 self.is_paused = is_paused;
87 }
88 }
89
90 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
91 &self.in_flight_prev_epoch
92 }
93
94 pub fn next_barrier_info(
96 &mut self,
97 is_checkpoint: bool,
98 curr_epoch: TracedEpoch,
99 ) -> BarrierInfo {
100 assert!(
101 self.in_flight_prev_epoch.value() < curr_epoch.value(),
102 "curr epoch regress. {} > {}",
103 self.in_flight_prev_epoch.value(),
104 curr_epoch.value()
105 );
106 let prev_epoch = self.in_flight_prev_epoch.clone();
107 self.in_flight_prev_epoch = curr_epoch.clone();
108 self.pending_non_checkpoint_barriers
109 .push(prev_epoch.value().0);
110 let kind = if is_checkpoint {
111 let epochs = take(&mut self.pending_non_checkpoint_barriers);
112 BarrierKind::Checkpoint(epochs)
113 } else {
114 BarrierKind::Barrier
115 };
116 BarrierInfo {
117 prev_epoch,
118 curr_epoch,
119 kind,
120 }
121 }
122}
123
/// Result of `DatabaseCheckpointControl::apply_command` for one injected barrier.
pub(super) struct ApplyCommandInfo {
    /// Value of `database_info.max_subscription_retention()` after the command was applied.
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    /// State table ids to commit for this barrier: the database's existing table ids plus the
    /// state tables of snapshot-backfill jobs merged into the upstream graph on this barrier.
    pub table_ids_to_commit: HashSet<TableId>,
    /// Ids of snapshot-backfill jobs that started consuming upstream on this barrier.
    pub jobs_to_wait: HashSet<JobId>,
    /// Value returned by `ControlStreamManager::inject_barrier` for this barrier.
    pub node_to_collect: NodeToCollect,
    /// The command passed to `apply_command`, returned back after being applied (snapshot
    /// backfill epochs may have been filled into it).
    pub command: Option<Command>,
}
131
132impl DatabaseCheckpointControl {
133 pub(super) fn apply_command(
136 &mut self,
137 mut command: Option<Command>,
138 barrier_info: &BarrierInfo,
139 control_stream_manager: &mut ControlStreamManager,
140 hummock_version_stats: &HummockVersionStats,
141 ) -> MetaResult<ApplyCommandInfo> {
142 let mut edges = self
143 .database_info
144 .build_edge(command.as_ref(), &*control_stream_manager);
145
146 if let &mut Some(Command::CreateStreamingJob {
148 ref mut job_type,
149 ref mut info,
150 ref cross_db_snapshot_backfill_info,
151 }) = &mut command
152 {
153 match job_type {
154 CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
155 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
156 fill_snapshot_backfill_epoch(
157 &mut fragment.nodes,
158 None,
159 cross_db_snapshot_backfill_info,
160 )?;
161 }
162 }
163 CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
164 assert!(!self.state.is_paused());
165 let snapshot_epoch = barrier_info.prev_epoch();
166 for snapshot_backfill_epoch in snapshot_backfill_info
168 .upstream_mv_table_id_to_backfill_epoch
169 .values_mut()
170 {
171 assert_eq!(
172 snapshot_backfill_epoch.replace(snapshot_epoch),
173 None,
174 "must not set previously"
175 );
176 }
177 for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
178 fill_snapshot_backfill_epoch(
179 &mut fragment.nodes,
180 Some(snapshot_backfill_info),
181 cross_db_snapshot_backfill_info,
182 )?;
183 }
184 let job_id = info.stream_job_fragments.stream_job_id();
185 let snapshot_backfill_upstream_tables = snapshot_backfill_info
186 .upstream_mv_table_id_to_backfill_epoch
187 .keys()
188 .cloned()
189 .collect();
190
191 let job = CreatingStreamingJobControl::new(
192 info,
193 snapshot_backfill_upstream_tables,
194 snapshot_epoch,
195 hummock_version_stats,
196 control_stream_manager,
197 edges.as_mut().expect("should exist"),
198 )?;
199
200 self.database_info
201 .shared_actor_infos
202 .upsert(self.database_id, job.fragment_infos_with_job_id());
203
204 self.creating_streaming_job_controls.insert(job_id, job);
205 }
206 }
207 }
208
209 let post_apply_changes = if let Some(Command::CreateStreamingJob {
211 job_type: CreateStreamingJobType::SnapshotBackfill(_),
212 ..
213 }) = command
214 {
215 None
216 } else if let Some((new_job, fragment_changes)) =
217 command.as_ref().and_then(Command::fragment_changes)
218 {
219 Some(self.database_info.pre_apply(new_job, fragment_changes))
220 } else {
221 None
222 };
223
224 match &command {
225 Some(Command::CreateSubscription {
226 subscription_id,
227 upstream_mv_table_id,
228 retention_second,
229 }) => {
230 self.database_info.register_subscriber(
231 upstream_mv_table_id.as_job_id(),
232 subscription_id.as_subscriber_id(),
233 SubscriberType::Subscription(*retention_second),
234 );
235 }
236 Some(Command::CreateStreamingJob {
237 info,
238 job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
239 ..
240 }) => {
241 for upstream_mv_table_id in snapshot_backfill_info
242 .upstream_mv_table_id_to_backfill_epoch
243 .keys()
244 {
245 self.database_info.register_subscriber(
246 upstream_mv_table_id.as_job_id(),
247 info.streaming_job.id().as_subscriber_id(),
248 SubscriberType::SnapshotBackfill,
249 );
250 }
251 }
252 _ => {}
253 };
254
255 let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
256 let mut actors_to_create = command.as_ref().and_then(|command| {
257 command.actors_to_create(&self.database_info, &mut edges, control_stream_manager)
258 });
259 let mut node_actors =
260 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
261
262 if let Some(post_apply_changes) = post_apply_changes {
263 self.database_info.post_apply(post_apply_changes);
264 }
265
266 let prev_is_paused = self.state.is_paused();
267 let curr_is_paused = match command {
268 Some(Command::Pause) => true,
269 Some(Command::Resume) => false,
270 _ => prev_is_paused,
271 };
272 self.state.set_is_paused(curr_is_paused);
273
274 let mutation = if let Some(c) = &command {
275 c.to_mutation(
276 prev_is_paused,
277 &mut edges,
278 control_stream_manager,
279 &mut self.database_info,
280 )?
281 } else {
282 None
283 };
284
285 let mut finished_snapshot_backfill_jobs = HashSet::new();
286 let mutation = match mutation {
287 Some(mutation) => Some(mutation),
288 None => {
289 let mut finished_snapshot_backfill_job_info = HashMap::new();
290 if barrier_info.kind.is_checkpoint() {
291 for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
292 if creating_job.should_merge_to_upstream() {
293 let info = creating_job
294 .start_consume_upstream(control_stream_manager, barrier_info)?;
295 finished_snapshot_backfill_job_info
296 .try_insert(job_id, info)
297 .expect("non-duplicated");
298 }
299 }
300 }
301
302 if !finished_snapshot_backfill_job_info.is_empty() {
303 let actors_to_create = actors_to_create.get_or_insert_default();
304 let mut subscriptions_to_drop = vec![];
305 let mut dispatcher_update = vec![];
306 let mut actor_splits = HashMap::new();
307 for (job_id, info) in finished_snapshot_backfill_job_info {
308 finished_snapshot_backfill_jobs.insert(job_id);
309 subscriptions_to_drop.extend(
310 info.snapshot_backfill_upstream_tables.iter().map(
311 |upstream_table_id| PbSubscriptionUpstreamInfo {
312 subscriber_id: job_id.as_subscriber_id(),
313 upstream_mv_table_id: *upstream_table_id,
314 },
315 ),
316 );
317 for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
318 assert_matches!(
319 self.database_info.unregister_subscriber(
320 upstream_mv_table_id.as_job_id(),
321 job_id.as_subscriber_id()
322 ),
323 Some(SubscriberType::SnapshotBackfill)
324 );
325 }
326
327 table_ids_to_commit.extend(
328 info.fragment_infos
329 .values()
330 .flat_map(|fragment| fragment.state_table_ids.iter())
331 .copied(),
332 );
333
334 let actor_len = info
335 .fragment_infos
336 .values()
337 .map(|fragment| fragment.actors.len() as u64)
338 .sum();
339 let id_gen = GlobalActorIdGen::new(
340 control_stream_manager.env.actor_id_generator(),
341 actor_len,
342 );
343 let mut next_local_actor_id = 0;
344 let actor_mapping: HashMap<_, _> = info
346 .fragment_infos
347 .values()
348 .flat_map(|fragment| fragment.actors.keys())
349 .map(|old_actor_id| {
350 let new_actor_id = id_gen.to_global_id(next_local_actor_id);
351 next_local_actor_id += 1;
352 (*old_actor_id, new_actor_id.as_global_id())
353 })
354 .collect();
355 let actor_mapping = &actor_mapping;
356 let new_stream_actors: HashMap<_, _> = info
357 .stream_actors
358 .into_iter()
359 .map(|(old_actor_id, mut actor)| {
360 let new_actor_id = actor_mapping[&old_actor_id];
361 actor.actor_id = new_actor_id;
362 (new_actor_id, actor)
363 })
364 .collect();
365 let new_fragment_info: HashMap<_, _> = info
366 .fragment_infos
367 .into_iter()
368 .map(|(fragment_id, mut fragment)| {
369 let actors = take(&mut fragment.actors);
370 fragment.actors = actors
371 .into_iter()
372 .map(|(old_actor_id, actor)| {
373 let new_actor_id = actor_mapping[&old_actor_id];
374 (new_actor_id, actor)
375 })
376 .collect();
377 (fragment_id, fragment)
378 })
379 .collect();
380 actor_splits.extend(
381 new_fragment_info
382 .values()
383 .flat_map(|fragment| &fragment.actors)
384 .map(|(actor_id, actor)| {
385 (
386 *actor_id,
387 ConnectorSplits {
388 splits: actor
389 .splits
390 .iter()
391 .map(ConnectorSplit::from)
392 .collect(),
393 },
394 )
395 }),
396 );
397 let partial_graph_id = to_partial_graph_id(None);
399 let mut edge_builder = FragmentEdgeBuilder::new(
400 info.upstream_fragment_downstreams
401 .keys()
402 .map(|upstream_fragment_id| {
403 self.database_info.fragment(*upstream_fragment_id)
404 })
405 .chain(new_fragment_info.values())
406 .map(|info| (info, partial_graph_id)),
407 control_stream_manager,
408 );
409 edge_builder.add_relations(&info.upstream_fragment_downstreams);
410 edge_builder.add_relations(&info.downstreams);
411 let mut edges = edge_builder.build();
412 let new_actors_to_create = edges.collect_actors_to_create(
413 new_fragment_info.values().map(|fragment| {
414 (
415 fragment.fragment_id,
416 &fragment.nodes,
417 fragment.actors.iter().map(|(actor_id, actor)| {
418 (&new_stream_actors[actor_id], actor.worker_id)
419 }),
420 [], )
422 }),
423 );
424 dispatcher_update.extend(
425 info.upstream_fragment_downstreams.keys().flat_map(
426 |upstream_fragment_id| {
427 let new_actor_dispatchers = edges
428 .dispatchers
429 .remove(upstream_fragment_id)
430 .expect("should exist");
431 new_actor_dispatchers.into_iter().flat_map(
432 |(upstream_actor_id, dispatchers)| {
433 dispatchers.into_iter().map(move |dispatcher| {
434 PbDispatcherUpdate {
435 actor_id: upstream_actor_id,
436 dispatcher_id: dispatcher.dispatcher_id,
437 hash_mapping: dispatcher.hash_mapping,
438 removed_downstream_actor_id: dispatcher
439 .downstream_actor_id
440 .iter()
441 .map(|new_downstream_actor_id| {
442 actor_mapping
443 .iter()
444 .find_map(
445 |(old_actor_id, new_actor_id)| {
446 (new_downstream_actor_id
447 == new_actor_id)
448 .then_some(*old_actor_id)
449 },
450 )
451 .expect("should exist")
452 })
453 .collect(),
454 added_downstream_actor_id: dispatcher
455 .downstream_actor_id,
456 }
457 })
458 },
459 )
460 },
461 ),
462 );
463 assert!(edges.is_empty(), "remaining edges: {:?}", edges);
464 for (worker_id, worker_actors) in new_actors_to_create {
465 node_actors.entry(worker_id).or_default().extend(
466 worker_actors.values().flat_map(|(_, actors, _)| {
467 actors.iter().map(|(actor, _, _)| actor.actor_id)
468 }),
469 );
470 actors_to_create
471 .entry(worker_id)
472 .or_default()
473 .extend(worker_actors);
474 }
475 self.database_info.add_existing(InflightStreamingJobInfo {
476 job_id,
477 fragment_infos: new_fragment_info,
478 subscribers: Default::default(), status: CreateStreamingJobStatus::Created,
480 cdc_table_backfill_tracker: None, });
482 }
483
484 Some(PbMutation::Update(PbUpdateMutation {
485 dispatcher_update,
486 merge_update: vec![], actor_vnode_bitmap_update: Default::default(), dropped_actors: vec![], actor_splits,
490 actor_new_dispatchers: Default::default(), actor_cdc_table_snapshot_splits: None, sink_schema_change: Default::default(), subscriptions_to_drop,
494 }))
495 } else {
496 let fragment_ids = self.database_info.take_pending_backfill_nodes();
497 if fragment_ids.is_empty() {
498 None
499 } else {
500 Some(PbMutation::StartFragmentBackfill(
501 PbStartFragmentBackfillMutation { fragment_ids },
502 ))
503 }
504 }
505 }
506 };
507
508 #[expect(clippy::collapsible_if)]
509 if let Some(Command::DropSubscription {
510 subscription_id,
511 upstream_mv_table_id,
512 }) = command
513 {
514 if self
515 .database_info
516 .unregister_subscriber(
517 upstream_mv_table_id.as_job_id(),
518 subscription_id.as_subscriber_id(),
519 )
520 .is_none()
521 {
522 warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
523 }
524 }
525
526 for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
527 if !finished_snapshot_backfill_jobs.contains(job_id) {
528 creating_job.on_new_upstream_barrier(control_stream_manager, barrier_info)?;
529 }
530 }
531
532 let node_to_collect = control_stream_manager.inject_barrier(
533 self.database_id,
534 None,
535 mutation,
536 barrier_info,
537 &node_actors,
538 InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
539 InflightFragmentInfo::workers(self.database_info.fragment_infos()),
540 actors_to_create,
541 )?;
542
543 Ok(ApplyCommandInfo {
544 mv_subscription_max_retention: self.database_info.max_subscription_retention(),
545 table_ids_to_commit,
546 jobs_to_wait: finished_snapshot_backfill_jobs,
547 node_to_collect,
548 command,
549 })
550 }
551}