1mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::metrics::LabelGuardedIntGauge;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
28use risingwave_meta_model::{CreateType, WorkerId};
29use risingwave_pb::ddl_service::DdlProgress;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::stream_plan::AddMutation;
32use risingwave_pb::stream_plan::barrier::PbBarrierKind;
33use risingwave_pb::stream_plan::barrier_mutation::Mutation;
34use risingwave_pb::stream_service::BarrierCompleteResponse;
35use status::CreatingStreamingJobStatus;
36use tracing::{debug, info};
37
38use crate::MetaResult;
39use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
40use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
41use crate::barrier::edge_builder::FragmentEdgeBuildResult;
42use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
43use crate::barrier::progress::CreateMviewProgressTracker;
44use crate::barrier::rpc::ControlStreamManager;
45use crate::barrier::{
46 BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
47};
48use crate::controller::fragment::InflightFragmentInfo;
49use crate::model::StreamJobActorsToCreate;
50use crate::rpc::metrics::GLOBAL_META_METRICS;
51use crate::stream::build_actor_connector_splits;
52
/// Barrier-side state of a streaming job that is being created with snapshot backfill.
///
/// The job runs in its own partial graph (registered through
/// `ControlStreamManager::add_partial_graph(database_id, Some(job_id))`), and the barriers
/// injected into it are tracked separately from the upstream database until the job is merged
/// back into upstream.
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    /// Database the job belongs to.
    database_id: DatabaseId,
    /// Id of the job being created; also used as the id of its partial graph.
    pub(super) job_id: TableId,
    /// Job definition, reported as the statement in DDL progress.
    definition: String,
    /// Create type reported in DDL progress (jobs restored via `recover` report `Background`).
    create_type: CreateType,
    /// Upstream tables read via snapshot backfill; the change-log epochs of these tables are
    /// used to rebuild the barriers to replay on recovery.
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Upstream epoch of the backfilled snapshot; lower bound of the pinned upstream log epoch.
    backfill_epoch: u64,

    /// Fragment infos of the creating job.
    job_info: InflightStreamingJobInfo,

    /// Tracks the barriers injected into the job's partial graph and their collection.
    barrier_control: CreatingStreamingJobBarrierControl,
    /// Current phase of the job: consuming snapshot, consuming log store, or finishing.
    status: CreatingStreamingJobStatus,

    /// Gauge of the difference (raw epoch values) between the upstream barrier's `prev_epoch`
    /// and the job's progress epoch.
    upstream_lag: LabelGuardedIntGauge,
}
69
70impl CreatingStreamingJobControl {
71 pub(super) fn new(
72 info: &CreateStreamingJobCommandInfo,
73 snapshot_backfill_upstream_tables: HashSet<TableId>,
74 backfill_epoch: u64,
75 version_stat: &HummockVersionStats,
76 control_stream_manager: &mut ControlStreamManager,
77 edges: &mut FragmentEdgeBuildResult,
78 ) -> MetaResult<Self> {
79 let job_id = info.stream_job_fragments.stream_job_id();
80 let database_id = DatabaseId::new(info.streaming_job.database_id());
81 debug!(
82 %job_id,
83 definition = info.definition,
84 "new creating job"
85 );
86 let job_info = InflightStreamingJobInfo {
87 job_id,
88 fragment_infos: info
89 .stream_job_fragments
90 .new_fragment_info(&info.init_split_assignment)
91 .collect(),
92 subscribers: Default::default(), };
94 let snapshot_backfill_actors = job_info.snapshot_backfill_actor_ids().collect();
95 let backfill_nodes_to_pause =
96 get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
97 .into_iter()
98 .collect();
99 let backfill_order_state = BackfillOrderState::new(
100 info.fragment_backfill_ordering.clone(),
101 &info.stream_job_fragments,
102 info.locality_fragment_state_table_mapping.clone(),
103 );
104 let create_mview_tracker = CreateMviewProgressTracker::recover(
105 [(
106 job_id,
107 (info.definition.clone(), &job_info, backfill_order_state),
108 )],
109 version_stat,
110 );
111
112 let actors_to_create =
113 edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
114 |(fragment_id, node, actors)| {
115 (
116 fragment_id,
117 node,
118 actors,
119 [], )
121 },
122 ));
123
124 let mut barrier_control =
125 CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);
126
127 let mut prev_epoch_fake_physical_time = 0;
128 let mut pending_non_checkpoint_barriers = vec![];
129
130 let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
131 &mut prev_epoch_fake_physical_time,
132 &mut pending_non_checkpoint_barriers,
133 PbBarrierKind::Checkpoint,
134 );
135
136 let added_actors = info.stream_job_fragments.actor_ids();
137 let actor_splits = info
138 .init_split_assignment
139 .values()
140 .flat_map(build_actor_connector_splits)
141 .collect();
142
143 let initial_mutation = Mutation::Add(AddMutation {
144 actor_dispatchers: Default::default(),
146 added_actors,
147 actor_splits,
148 pause: false,
150 subscriptions_to_add: Default::default(),
151 backfill_nodes_to_pause,
152 actor_cdc_table_snapshot_splits:
153 build_pb_actor_cdc_table_snapshot_splits_with_generation(
154 info.cdc_table_snapshot_split_assignment.clone(),
155 )
156 .into(),
157 new_upstream_sinks: Default::default(),
158 });
159
160 control_stream_manager.add_partial_graph(database_id, Some(job_id));
161 Self::inject_barrier(
162 database_id,
163 job_id,
164 control_stream_manager,
165 &mut barrier_control,
166 &job_info,
167 Some(&job_info),
168 initial_barrier_info,
169 Some(actors_to_create),
170 Some(initial_mutation),
171 )?;
172
173 assert!(pending_non_checkpoint_barriers.is_empty());
174
175 Ok(Self {
176 database_id,
177 definition: info.definition.clone(),
178 create_type: info.create_type.into(),
179 job_id,
180 snapshot_backfill_upstream_tables,
181 barrier_control,
182 backfill_epoch,
183 job_info,
184 status: CreatingStreamingJobStatus::ConsumingSnapshot {
185 prev_epoch_fake_physical_time,
186 pending_upstream_barriers: vec![],
187 version_stats: version_stat.clone(),
188 create_mview_tracker,
189 snapshot_backfill_actors,
190 backfill_epoch,
191 pending_non_checkpoint_barriers,
192 },
193 upstream_lag: GLOBAL_META_METRICS
194 .snapshot_backfill_lag
195 .with_guarded_label_values(&[&format!("{}", job_id)]),
196 })
197 }
198
199 fn resolve_upstream_log_epochs(
200 snapshot_backfill_upstream_tables: &HashSet<TableId>,
201 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
202 committed_epoch: u64,
203 upstream_curr_epoch: u64,
204 ) -> MetaResult<Vec<BarrierInfo>> {
205 let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
207 let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
208 loop {
209 let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
210 if *checkpoint_epoch < committed_epoch {
211 continue;
212 }
213 assert_eq!(*checkpoint_epoch, committed_epoch);
214 break;
215 }
216 let mut ret = vec![];
217 let mut prev_epoch = committed_epoch;
218 let mut pending_non_checkpoint_barriers = vec![];
219 for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
220 for (i, epoch) in non_checkpoint_epochs
221 .iter()
222 .chain([checkpoint_epoch])
223 .enumerate()
224 {
225 assert!(*epoch > prev_epoch);
226 pending_non_checkpoint_barriers.push(prev_epoch);
227 ret.push(BarrierInfo {
228 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
229 curr_epoch: TracedEpoch::new(Epoch(*epoch)),
230 kind: if i == 0 {
231 BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
232 } else {
233 BarrierKind::Barrier
234 },
235 });
236 prev_epoch = *epoch;
237 }
238 }
239 ret.push(BarrierInfo {
240 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
241 curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
242 kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
243 });
244 Ok(ret)
245 }
246
247 fn recover_consuming_snapshot(
248 job_id: TableId,
249 definition: &String,
250 snapshot_backfill_upstream_tables: &HashSet<TableId>,
251 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
252 backfill_epoch: u64,
253 committed_epoch: u64,
254 upstream_curr_epoch: u64,
255 job_info: &InflightStreamingJobInfo,
256 version_stat: &HummockVersionStats,
257 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
258 let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
259 let mut pending_non_checkpoint_barriers = vec![];
260 let create_mview_tracker = CreateMviewProgressTracker::recover(
261 [(job_id, (definition.clone(), job_info, Default::default()))],
262 version_stat,
263 );
264 let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
265 &mut prev_epoch_fake_physical_time,
266 &mut pending_non_checkpoint_barriers,
267 PbBarrierKind::Initial,
268 );
269 Ok((
270 CreatingStreamingJobStatus::ConsumingSnapshot {
271 prev_epoch_fake_physical_time,
272 pending_upstream_barriers: Self::resolve_upstream_log_epochs(
273 snapshot_backfill_upstream_tables,
274 upstream_table_log_epochs,
275 backfill_epoch,
276 upstream_curr_epoch,
277 )?,
278 version_stats: version_stat.clone(),
279 create_mview_tracker,
280 snapshot_backfill_actors: job_info.snapshot_backfill_actor_ids().collect(),
281 backfill_epoch,
282 pending_non_checkpoint_barriers,
283 },
284 barrier_info,
285 ))
286 }
287
288 fn recover_consuming_log_store(
289 snapshot_backfill_upstream_tables: &HashSet<TableId>,
290 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
291 committed_epoch: u64,
292 upstream_curr_epoch: u64,
293 job_info: &InflightStreamingJobInfo,
294 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
295 let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
296 snapshot_backfill_upstream_tables,
297 upstream_table_log_epochs,
298 committed_epoch,
299 upstream_curr_epoch,
300 )?;
301 let mut first_barrier = barriers_to_inject.remove(0);
302 assert!(first_barrier.kind.is_checkpoint());
303 first_barrier.kind = BarrierKind::Initial;
304 Ok((
305 CreatingStreamingJobStatus::ConsumingLogStore {
306 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
307 job_info.snapshot_backfill_actor_ids(),
308 barriers_to_inject
309 .last()
310 .map(|info| info.prev_epoch() - committed_epoch)
311 .unwrap_or(0),
312 ),
313 barriers_to_inject: Some(barriers_to_inject),
314 },
315 first_barrier,
316 ))
317 }
318
319 #[expect(clippy::too_many_arguments)]
320 pub(crate) fn recover(
321 database_id: DatabaseId,
322 job_id: TableId,
323 definition: String,
324 snapshot_backfill_upstream_tables: HashSet<TableId>,
325 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
326 backfill_epoch: u64,
327 committed_epoch: u64,
328 upstream_curr_epoch: u64,
329 graph_info: InflightStreamingJobInfo,
330 version_stat: &HummockVersionStats,
331 new_actors: StreamJobActorsToCreate,
332 initial_mutation: Mutation,
333 control_stream_manager: &mut ControlStreamManager,
334 ) -> MetaResult<Self> {
335 debug!(
336 %job_id,
337 definition,
338 "recovered creating job"
339 );
340 let mut barrier_control =
341 CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);
342
343 let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
344 Self::recover_consuming_snapshot(
345 job_id,
346 &definition,
347 &snapshot_backfill_upstream_tables,
348 upstream_table_log_epochs,
349 backfill_epoch,
350 committed_epoch,
351 upstream_curr_epoch,
352 &graph_info,
353 version_stat,
354 )?
355 } else {
356 Self::recover_consuming_log_store(
357 &snapshot_backfill_upstream_tables,
358 upstream_table_log_epochs,
359 committed_epoch,
360 upstream_curr_epoch,
361 &graph_info,
362 )?
363 };
364 control_stream_manager.add_partial_graph(database_id, Some(job_id));
365 Self::inject_barrier(
366 database_id,
367 job_id,
368 control_stream_manager,
369 &mut barrier_control,
370 &graph_info,
371 Some(&graph_info),
372 first_barrier_info,
373 Some(new_actors),
374 Some(initial_mutation),
375 )?;
376 Ok(Self {
377 database_id,
378 job_id,
379 definition,
380 create_type: CreateType::Background,
381 snapshot_backfill_upstream_tables,
382 backfill_epoch,
383 job_info: graph_info,
384 barrier_control,
385 status,
386 upstream_lag: GLOBAL_META_METRICS
387 .snapshot_backfill_lag
388 .with_guarded_label_values(&[&format!("{}", job_id)]),
389 })
390 }
391
392 pub(crate) fn is_empty(&self) -> bool {
393 self.barrier_control.is_empty()
394 }
395
396 pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
397 self.barrier_control.is_valid_after_worker_err(worker_id)
398 && (!self.status.is_finishing()
399 || InflightFragmentInfo::contains_worker(self.job_info.fragment_infos(), worker_id))
400 }
401
402 pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
403 let progress = match &self.status {
404 CreatingStreamingJobStatus::ConsumingSnapshot {
405 create_mview_tracker,
406 ..
407 } => {
408 if create_mview_tracker.has_pending_finished_jobs() {
409 "Snapshot finished".to_owned()
410 } else {
411 let progress = create_mview_tracker
412 .gen_ddl_progress()
413 .remove(&self.job_id.table_id)
414 .expect("should exist");
415 format!("Snapshot [{}]", progress.progress)
416 }
417 }
418 CreatingStreamingJobStatus::ConsumingLogStore {
419 log_store_progress_tracker,
420 ..
421 } => {
422 format!(
423 "LogStore [{}]",
424 log_store_progress_tracker.gen_ddl_progress()
425 )
426 }
427 CreatingStreamingJobStatus::Finishing(_) => {
428 format!(
429 "Finishing [epoch count: {}]",
430 self.barrier_control.inflight_barrier_count()
431 )
432 }
433 };
434 DdlProgress {
435 id: self.job_id.table_id as u64,
436 statement: self.definition.clone(),
437 create_type: self.create_type.as_str().to_owned(),
438 progress,
439 }
440 }
441
442 pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
443 if self.status.is_finishing() {
444 None
445 } else {
446 Some(max(
448 self.barrier_control.max_collected_epoch().unwrap_or(0),
449 self.backfill_epoch,
450 ))
451 }
452 }
453
454 fn inject_barrier(
455 database_id: DatabaseId,
456 table_id: TableId,
457 control_stream_manager: &mut ControlStreamManager,
458 barrier_control: &mut CreatingStreamingJobBarrierControl,
459 pre_applied_graph_info: &InflightStreamingJobInfo,
460 applied_graph_info: Option<&InflightStreamingJobInfo>,
461 barrier_info: BarrierInfo,
462 new_actors: Option<StreamJobActorsToCreate>,
463 mutation: Option<Mutation>,
464 ) -> MetaResult<()> {
465 let node_to_collect = control_stream_manager.inject_barrier(
466 database_id,
467 Some(table_id),
468 mutation,
469 &barrier_info,
470 pre_applied_graph_info.fragment_infos(),
471 applied_graph_info
472 .map(|graph_info| graph_info.fragment_infos())
473 .into_iter()
474 .flatten(),
475 new_actors,
476 )?;
477 barrier_control.enqueue_epoch(
478 barrier_info.prev_epoch(),
479 node_to_collect,
480 barrier_info.kind.clone(),
481 );
482 Ok(())
483 }
484
485 pub(super) fn on_new_command(
486 &mut self,
487 control_stream_manager: &mut ControlStreamManager,
488 command: Option<&Command>,
489 barrier_info: &BarrierInfo,
490 ) -> MetaResult<()> {
491 let table_id = self.job_id;
492 let start_consume_upstream =
493 if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
494 jobs_to_merge.contains_key(&table_id)
495 } else {
496 false
497 };
498 if start_consume_upstream {
499 info!(
500 table_id = self.job_id.table_id,
501 prev_epoch = barrier_info.prev_epoch(),
502 "start consuming upstream"
503 );
504 }
505 let progress_epoch =
506 if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
507 max(max_collected_epoch, self.backfill_epoch)
508 } else {
509 self.backfill_epoch
510 };
511 self.upstream_lag.set(
512 barrier_info
513 .prev_epoch
514 .value()
515 .0
516 .saturating_sub(progress_epoch) as _,
517 );
518 if start_consume_upstream {
519 self.status.start_consume_upstream(barrier_info);
520 Self::inject_barrier(
521 self.database_id,
522 self.job_id,
523 control_stream_manager,
524 &mut self.barrier_control,
525 &self.job_info,
526 None,
527 barrier_info.clone(),
528 None,
529 None,
530 )?;
531 } else {
532 for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
533 Self::inject_barrier(
534 self.database_id,
535 self.job_id,
536 control_stream_manager,
537 &mut self.barrier_control,
538 &self.job_info,
539 Some(&self.job_info),
540 barrier_to_inject,
541 None,
542 mutation,
543 )?;
544 }
545 }
546 Ok(())
547 }
548
549 pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
550 self.status.update_progress(&resp.create_mview_progress);
551 self.barrier_control.collect(resp);
552 self.should_merge_to_upstream().is_some()
553 }
554
555 pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
556 if let CreatingStreamingJobStatus::ConsumingLogStore {
557 log_store_progress_tracker,
558 barriers_to_inject,
559 } = &self.status
560 && barriers_to_inject.is_none()
561 && log_store_progress_tracker.is_finished()
562 {
563 Some(&self.job_info)
564 } else {
565 None
566 }
567 }
568}
569
570pub(super) enum CompleteJobType {
571 First,
573 Normal,
574 Finished,
576}
577
578impl CreatingStreamingJobControl {
579 pub(super) fn start_completing(
580 &mut self,
581 min_upstream_inflight_epoch: Option<u64>,
582 ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
583 let (finished_at_epoch, epoch_end_bound) = match &self.status {
584 CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
585 let epoch_end_bound = min_upstream_inflight_epoch
586 .map(|upstream_epoch| {
587 if upstream_epoch < *finish_at_epoch {
588 Excluded(upstream_epoch)
589 } else {
590 Unbounded
591 }
592 })
593 .unwrap_or(Unbounded);
594 (Some(*finish_at_epoch), epoch_end_bound)
595 }
596 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
597 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
598 None,
599 min_upstream_inflight_epoch
600 .map(Excluded)
601 .unwrap_or(Unbounded),
602 ),
603 };
604 self.barrier_control.start_completing(epoch_end_bound).map(
605 |(epoch, resps, is_first_commit)| {
606 let status = if let Some(finish_at_epoch) = finished_at_epoch {
607 assert!(!is_first_commit);
608 if epoch == finish_at_epoch {
609 self.barrier_control.ack_completed(epoch);
610 assert!(self.barrier_control.is_empty());
611 CompleteJobType::Finished
612 } else {
613 CompleteJobType::Normal
614 }
615 } else if is_first_commit {
616 CompleteJobType::First
617 } else {
618 CompleteJobType::Normal
619 };
620 (epoch, resps, status)
621 },
622 )
623 }
624
625 pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
626 self.barrier_control.ack_completed(completed_epoch);
627 }
628
629 pub(super) fn is_finished(&self) -> bool {
630 self.barrier_control.is_empty() && self.status.is_finishing()
631 }
632
633 pub fn is_consuming(&self) -> bool {
634 match &self.status {
635 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
636 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
637 CreatingStreamingJobStatus::Finishing(_) => false,
638 }
639 }
640
641 pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
642 self.job_info.existing_table_ids()
643 }
644
645 pub fn graph_info(&self) -> &InflightStreamingJobInfo {
646 &self.job_info
647 }
648}