use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier_mutation::PbMutation;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{
    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::utils::NodeToCollect;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::stream::{GlobalActorIdGen, fill_snapshot_backfill_epoch};

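/// Per-database state of the barrier worker that tracks epoch progression and
/// the pause state across consecutive barriers.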
pub(in crate::barrier) struct BarrierWorkerState {
    /// The epoch of the most recently injected barrier, which will become the
    /// `prev_epoch` of the next barrier.
    in_flight_prev_epoch: TracedEpoch,

    /// Epochs of the barriers injected since the last checkpoint barrier,
    /// drained into `BarrierKind::Checkpoint` when the next checkpoint barrier
    /// is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the streaming graph is paused (toggled by `Command::Pause` and
    /// `Command::Resume`).
    is_paused: bool,
}

impl BarrierWorkerState {
    pub(super) fn new() -> Self {
        Self {
            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
            pending_non_checkpoint_barriers: vec![],
            is_paused: false,
        }
    }

    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
        Self {
            in_flight_prev_epoch,
            pending_non_checkpoint_barriers: vec![],
            is_paused,
        }
    }

    pub fn is_paused(&self) -> bool {
        self.is_paused
    }

    fn set_is_paused(&mut self, is_paused: bool) {
        if self.is_paused != is_paused {
            tracing::info!(
                currently_paused = self.is_paused,
                newly_paused = is_paused,
                "update paused state"
            );
            self.is_paused = is_paused;
        }
    }

    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
        &self.in_flight_prev_epoch
    }

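    /// Returns the `BarrierInfo` for the next barrier to inject and advances
    /// `in_flight_prev_epoch` to `curr_epoch`. For a checkpoint barrier, the
    /// returned `BarrierKind::Checkpoint` carries all epochs accumulated since
    /// the previous checkpoint barrier.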
    pub fn next_barrier_info(
        &mut self,
        is_checkpoint: bool,
        curr_epoch: TracedEpoch,
    ) -> BarrierInfo {
        assert!(
            self.in_flight_prev_epoch.value() < curr_epoch.value(),
            "curr epoch regress: {} >= {}",
            self.in_flight_prev_epoch.value(),
            curr_epoch.value()
        );
        let prev_epoch = self.in_flight_prev_epoch.clone();
        self.in_flight_prev_epoch = curr_epoch.clone();
        self.pending_non_checkpoint_barriers
            .push(prev_epoch.value().0);
        let kind = if is_checkpoint {
            let epochs = take(&mut self.pending_non_checkpoint_barriers);
            BarrierKind::Checkpoint(epochs)
        } else {
            BarrierKind::Barrier
        };
        BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        }
    }
}

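/// The result of applying a `Command` to an in-flight barrier, consumed by the
/// caller to track barrier collection and commit.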
pub(super) struct ApplyCommandInfo {
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    pub table_ids_to_commit: HashSet<TableId>,
    pub jobs_to_wait: HashSet<JobId>,
    pub node_to_collect: NodeToCollect,
    pub command: Option<Command>,
}

impl DatabaseCheckpointControl {
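    /// Applies `command` to the barrier described by `barrier_info`: updates the
    /// inflight database info, assembles the barrier mutation, and injects the
    /// barrier to the compute nodes that host actors of this database.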
    pub(super) fn apply_command(
        &mut self,
        mut command: Option<Command>,
        barrier_info: &BarrierInfo,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<ApplyCommandInfo> {
        let mut edges = self
            .database_info
            .build_edge(command.as_ref(), &*control_stream_manager);

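        // For a newly created streaming job, fill in the snapshot backfill epochs
        // of its fragments before the plan is sent out. A snapshot-backfill job
        // is additionally tracked by a dedicated `CreatingStreamingJobControl`
        // until it catches up with the upstream.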
        if let &mut Some(Command::CreateStreamingJob {
            ref mut job_type,
            ref mut info,
            ref cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    assert!(!self.state.is_paused());
                    let snapshot_epoch = barrier_info.prev_epoch();
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(snapshot_epoch),
                            None,
                            "must not be set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        snapshot_epoch,
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.database_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.fragment_infos_with_job_id());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        let post_apply_changes = if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = command
        {
            // Snapshot-backfill jobs are tracked separately and do not change
            // the existing fragment graph yet.
            None
        } else if let Some((new_job, fragment_changes)) =
            command.as_ref().and_then(Command::fragment_changes)
        {
            Some(self.database_info.pre_apply(new_job, fragment_changes))
        } else {
            None
        };

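        // Register subscribers introduced by this command: either an explicit
        // `CREATE SUBSCRIPTION` on an upstream materialized view, or the implicit
        // subscription a snapshot-backfill job holds on each upstream table.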
        match &command {
            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(*retention_second),
                );
            }
            Some(Command::CreateStreamingJob {
                info,
                job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
                ..
            }) => {
                for upstream_mv_table_id in snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .keys()
                {
                    self.database_info.register_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        info.streaming_job.id().as_subscriber_id(),
                        SubscriberType::SnapshotBackfill,
                    );
                }
            }
            _ => {}
        };

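        // Collect the tables to commit at this barrier and the actors expected
        // to collect it, based on the current inflight fragment info.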
        let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
        let mut actors_to_create = command.as_ref().and_then(|command| {
            command.actors_to_create(&self.database_info, &mut edges, control_stream_manager)
        });
        let mut node_actors =
            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());

        if let Some(post_apply_changes) = post_apply_changes {
            self.database_info.post_apply(post_apply_changes);
        }

        let prev_is_paused = self.state.is_paused();
        let curr_is_paused = match command {
            Some(Command::Pause) => true,
            Some(Command::Resume) => false,
            _ => prev_is_paused,
        };
        self.state.set_is_paused(curr_is_paused);

        let mutation = if let Some(c) = &command {
            c.to_mutation(
                prev_is_paused,
                &mut edges,
                control_stream_manager,
                &mut self.database_info,
            )?
        } else {
            None
        };

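        // If the command itself yields no mutation, check whether any creating
        // snapshot-backfill job has caught up and should start consuming the
        // upstream directly on this checkpoint barrier.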
        let mut finished_snapshot_backfill_jobs = HashSet::new();
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(control_stream_manager, barrier_info)?;
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

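                // Merging a finished snapshot-backfill job re-creates its actors
                // under freshly allocated global actor ids, rebuilds the edges
                // around them, and emits an update mutation that rewires the
                // upstream dispatchers and drops the job's temporary
                // snapshot-backfill subscriptions.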
                if !finished_snapshot_backfill_job_info.is_empty() {
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

                        // Re-create the job's actors under freshly allocated global
                        // actor ids, keeping a mapping from old to new ids.
                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            control_stream_manager.env.actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
                        // Rebuild the edges between the upstream fragments and the
                        // re-created fragments of the merged job.
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values()),
                            control_stream_manager,
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    [],
                                )
                            }),
                        );
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
                                |upstream_fragment_id| {
                                    let new_actor_dispatchers = edges
                                        .dispatchers
                                        .remove(upstream_fragment_id)
                                        .expect("should exist");
                                    new_actor_dispatchers.into_iter().flat_map(
                                        |(upstream_actor_id, dispatchers)| {
                                            dispatchers.into_iter().map(move |dispatcher| {
                                                PbDispatcherUpdate {
                                                    actor_id: upstream_actor_id,
                                                    dispatcher_id: dispatcher.dispatcher_id,
                                                    hash_mapping: dispatcher.hash_mapping,
                                                    removed_downstream_actor_id: dispatcher
                                                        .downstream_actor_id
                                                        .iter()
                                                        .map(|new_downstream_actor_id| {
                                                            actor_mapping
                                                                .iter()
                                                                .find_map(
                                                                    |(old_actor_id, new_actor_id)| {
                                                                        (new_downstream_actor_id
                                                                            == new_actor_id)
                                                                            .then_some(*old_actor_id)
                                                                    },
                                                                )
                                                                .expect("should exist")
                                                        })
                                                        .collect(),
                                                    added_downstream_actor_id: dispatcher
                                                        .downstream_actor_id,
                                                }
                                            })
                                        },
                                    )
                                },
                            ),
                        );
                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
                        for (worker_id, worker_actors) in new_actors_to_create {
                            node_actors.entry(worker_id).or_default().extend(
                                worker_actors.values().flat_map(|(_, actors, _)| {
                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
                                }),
                            );
                            actors_to_create
                                .entry(worker_id)
                                .or_default()
                                .extend(worker_actors);
                        }
                        self.database_info.add_existing(InflightStreamingJobInfo {
                            job_id,
                            fragment_infos: new_fragment_info,
                            subscribers: Default::default(),
                            status: CreateStreamingJobStatus::Created,
                            cdc_table_backfill_tracker: None,
                        });
                    }

                    Some(PbMutation::Update(PbUpdateMutation {
                        dispatcher_update,
                        merge_update: vec![],
                        actor_vnode_bitmap_update: Default::default(),
                        dropped_actors: vec![],
                        actor_splits,
                        actor_new_dispatchers: Default::default(),
                        actor_cdc_table_snapshot_splits: None,
                        sink_schema_change: Default::default(),
                        subscriptions_to_drop,
                    }))
                } else {
                    // Otherwise, start backfill for any fragments pending on it.
                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
                    if fragment_ids.is_empty() {
                        None
                    } else {
                        Some(PbMutation::StartFragmentBackfill(
                            PbStartFragmentBackfillMutation { fragment_ids },
                        ))
                    }
                }
            }
        };

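        // Unregister the subscriber of a dropped subscription, tolerating (but
        // logging) the case where it is already gone.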
        #[expect(clippy::collapsible_if)]
        if let Some(Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id,
        }) = command
        {
            if self
                .database_info
                .unregister_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                )
                .is_none()
            {
                warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
            }
        }

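        // Forward the barrier to every creating job that has not just been
        // merged into the upstream graph above.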
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if !finished_snapshot_backfill_jobs.contains(job_id) {
                creating_job.on_new_upstream_barrier(control_stream_manager, barrier_info)?;
            }
        }

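        // Finally, inject the barrier carrying the assembled mutation to all
        // nodes hosting actors of this database.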
        let node_to_collect = control_stream_manager.inject_barrier(
            self.database_id,
            None,
            mutation,
            barrier_info,
            &node_actors,
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
            actors_to_create,
        )?;

        Ok(ApplyCommandInfo {
            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
            table_ids_to_commit,
            jobs_to_wait: finished_snapshot_backfill_jobs,
            node_to_collect,
            command,
        })
    }
}