mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
use risingwave_meta_model::{CreateType, WorkerId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{
    BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;
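
/// Per-job control state for a streaming job that is still being created through snapshot
/// backfill. It owns the job's own partial graph (`graph_info`), the barrier bookkeeping for
/// that graph (`barrier_control`), and the creation status machine (`status`), which moves
/// from consuming the upstream snapshot, to consuming the log store, to finishing.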
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: TableId,
    definition: String,
    create_type: CreateType,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    backfill_epoch: u64,

    graph_info: InflightStreamingJobInfo,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
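    /// Builds the control state for a newly issued creating job: recovers a progress tracker
    /// for it, registers a dedicated partial graph with the control stream manager, and
    /// injects the initial (fake) checkpoint barrier carrying the [`AddMutation`] that
    /// creates the job's actors.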
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = DatabaseId::new(info.streaming_job.database_id());
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            info.fragment_backfill_ordering.clone(),
            &info.stream_job_fragments,
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (
                    info.definition.clone(),
                    &*info.stream_job_fragments,
                    backfill_order_state,
                ),
            )],
            version_stat,
        );
        let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());

        let graph_info = InflightStreamingJobInfo {
            job_id,
            fragment_infos,
        };

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        let initial_mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    info.cdc_table_snapshot_split_assignment.clone(),
                )
                .into(),
            new_upstream_sinks: Default::default(),
        });

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            create_type: info.create_type.into(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            graph_info,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }
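
    /// Reconstructs the barriers that still have to be replayed from the upstream log.
    /// Starting from `committed_epoch`, this walks one upstream table's log epochs (the
    /// upstream tables are assumed to share the same epochs) and emits one [`BarrierInfo`]
    /// per epoch, ending with a checkpoint barrier up to `upstream_curr_epoch`.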
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
    ) -> MetaResult<Vec<BarrierInfo>> {
        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
        loop {
            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
            if *checkpoint_epoch < committed_epoch {
                continue;
            }
            assert_eq!(*checkpoint_epoch, committed_epoch);
            break;
        }
        let mut ret = vec![];
        let mut prev_epoch = committed_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }
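
    /// Rebuilds the `ConsumingSnapshot` status during recovery, for a job whose committed
    /// epoch has not yet reached `backfill_epoch`. Returns the recovered status together with
    /// the initial barrier to inject.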
    fn recover_consuming_snapshot(
        job_id: TableId,
        definition: &String,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (
                    definition.clone(),
                    &stream_job_fragments,
                    Default::default(),
                ),
            )],
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );
        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    backfill_epoch,
                    upstream_curr_epoch,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }
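
    /// Rebuilds the `ConsumingLogStore` status during recovery, for a job that has finished
    /// its snapshot but still has upstream log epochs to replay. The first replayed barrier is
    /// re-marked as [`BarrierKind::Initial`] and returned separately for injection.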
    fn recover_consuming_log_store(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_curr_epoch,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;
        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    snapshot_backfill_actors.into_iter(),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
            },
            first_barrier,
        ))
    }
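
    /// Recovers the control state of a creating job after a meta restart: picks the snapshot
    /// or log-store phase based on the committed epoch, re-registers the partial graph, and
    /// re-injects the first barrier together with the job's actors and initial mutation.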
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: TableId,
        definition: String,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        graph_info: InflightStreamingJobInfo,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            definition,
            "recovered creating job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);

        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
            Self::recover_consuming_snapshot(
                job_id,
                &definition,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                backfill_epoch,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
        )?;
        Ok(Self {
            database_id,
            job_id,
            definition,
            create_type: CreateType::Background,
            snapshot_backfill_upstream_tables,
            backfill_epoch,
            graph_info,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && (!self.status.is_finishing()
                || InflightFragmentInfo::contains_worker(
                    self.graph_info.fragment_infos(),
                    worker_id,
                ))
    }
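
    /// Renders the job's progress for [`DdlProgress`], as a `Snapshot [..]`, `LogStore [..]`,
    /// or `Finishing [..]` string depending on the current status.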
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.has_pending_finished_jobs() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker
                        .gen_ddl_progress()
                        .remove(&self.job_id.table_id)
                        .expect("should exist");
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(_) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
        };
        DdlProgress {
            id: self.job_id.table_id as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }
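
    /// The epoch up to which the upstream log should stay pinned for this job: the maximum of
    /// the backfill epoch and the latest collected epoch, or `None` once the job is finishing.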
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }
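
    /// Injects one barrier into the job's partial graph through the control stream manager and
    /// enqueues its prev epoch into `barrier_control` so that the collected responses can be
    /// tracked.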
    fn inject_barrier(
        database_id: DatabaseId,
        table_id: TableId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        pre_applied_graph_info: &InflightStreamingJobInfo,
        applied_graph_info: Option<&InflightStreamingJobInfo>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(table_id),
            mutation,
            &barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info
                .map(|graph_info| graph_info.fragment_infos())
                .into_iter()
                .flatten(),
            new_actors,
            vec![],
            vec![],
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }
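
    /// Handles a new upstream barrier for this job. When the command merges this job back into
    /// the upstream graph, the status switches to consuming upstream and the barrier is
    /// forwarded as-is; otherwise the status decides which barriers, if any, to inject. The
    /// upstream lag gauge is updated either way.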
    pub(super) fn on_new_command(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let table_id = self.job_id;
        let start_consume_upstream =
            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
                jobs_to_merge.contains_key(&table_id)
            } else {
                false
            };
        if start_consume_upstream {
            info!(
                table_id = self.job_id.table_id,
                prev_epoch = barrier_info.prev_epoch(),
                "start consuming upstream"
            );
        }
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        if start_consume_upstream {
            self.status.start_consume_upstream(barrier_info);
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.graph_info,
                None,
                barrier_info.clone(),
                None,
                None,
            )?;
        } else {
            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.graph_info,
                    Some(&self.graph_info),
                    barrier_to_inject,
                    None,
                    mutation,
                )?;
            }
        }
        Ok(())
    }
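
    /// Collects a barrier-complete response from a worker, updates the backfill progress, and
    /// returns whether the job is now ready to be merged back into the upstream graph.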
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream().is_some()
    }
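
    /// Returns the job's graph info once it has fully caught up in the log-store phase, i.e.
    /// there are no more barriers to inject and the log store progress tracker reports
    /// finished, signalling that the job can be merged back into the upstream graph.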
    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.graph_info)
        } else {
            None
        }
    }
}
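
/// How a completed epoch of a creating job is treated by the checkpoint logic: `First` for the
/// job's first committed barrier, `Finished` for the barrier at which the job finishes, and
/// `Normal` for everything in between.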
pub(super) enum CompleteJobType {
    First,
    Normal,
    Finished,
}

impl CreatingStreamingJobControl {
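    /// Picks the next collected epoch of this job to complete, bounded by the minimum inflight
    /// upstream epoch. Returns the epoch, its collected responses, and whether this is the
    /// job's first, a normal, or its final commit.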
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

    pub(super) fn is_finished(&self) -> bool {
        self.barrier_control.is_empty() && self.status.is_finishing()
    }

    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(_) => false,
        }
    }

    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.graph_info.existing_table_ids()
    }

    pub fn graph_info(&self) -> &InflightStreamingJobInfo {
        &self.graph_info
    }
}