mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{
    BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;

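/// Control state of a snapshot-backfill streaming job that is being created.
///
/// It owns the job's in-flight barrier bookkeeping and its creation status,
/// and drives the job from consuming the upstream snapshot, through replaying
/// the upstream log store, until it is merged back into the upstream graph.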
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: TableId,
    definition: String,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    backfill_epoch: u64,

    graph_info: InflightStreamingJobInfo,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
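    /// Sets up the control state for a newly issued snapshot-backfill job:
    /// recovers a progress tracker for it, registers a partial graph on the
    /// control stream, and injects the initial checkpoint barrier carrying an
    /// `Add` mutation with the new actors and their connector splits.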
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = DatabaseId::new(info.streaming_job.database_id());
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            info.fragment_backfill_ordering.clone(),
            &info.stream_job_fragments,
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (
                    info.definition.clone(),
                    &*info.stream_job_fragments,
                    backfill_order_state,
                ),
            )],
            version_stat,
        );
        let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());

        let graph_info = InflightStreamingJobInfo {
            job_id,
            fragment_infos,
        };

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        let initial_mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
        });

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            graph_info,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

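    /// Resolves the barriers that still have to be replayed from the upstream
    /// log store, starting right after `committed_epoch` and ending with a
    /// checkpoint barrier whose `curr_epoch` is `upstream_curr_epoch`.
    ///
    /// Only the log epochs of an arbitrary upstream table are consulted, which
    /// assumes the epoch sequences of all upstream tables are aligned.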
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
    ) -> MetaResult<Vec<BarrierInfo>> {
        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
        // Skip forward to the epoch group whose checkpoint epoch equals the
        // job's committed epoch.
        loop {
            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
            if *checkpoint_epoch < committed_epoch {
                continue;
            }
            assert_eq!(*checkpoint_epoch, committed_epoch);
            break;
        }
        let mut ret = vec![];
        let mut prev_epoch = committed_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                // The first barrier of each group has the previous group's
                // checkpoint epoch as its `prev_epoch`, and is marked as a
                // checkpoint that commits all the accumulated epochs.
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

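    /// Rebuilds the `ConsumingSnapshot` state for a job whose committed epoch
    /// is still earlier than `backfill_epoch`, i.e. the snapshot phase had not
    /// finished when recovery started. Returns the recovered status together
    /// with the initial barrier to inject.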
    fn recover_consuming_snapshot(
        job_id: TableId,
        definition: &str,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (
                    definition.to_owned(),
                    &stream_job_fragments,
                    Default::default(),
                ),
            )],
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );
        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    backfill_epoch,
                    upstream_curr_epoch,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

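    /// Rebuilds the `ConsumingLogStore` state for a job that has already
    /// finished its snapshot phase. The remaining upstream log epochs are
    /// resolved into barriers to re-inject, with the first one re-issued as an
    /// `Initial` barrier.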
    fn recover_consuming_log_store(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_curr_epoch,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;
        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    snapshot_backfill_actors.into_iter(),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
            },
            first_barrier,
        ))
    }

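    /// Recovers the control state of a creating job during global recovery,
    /// choosing between the snapshot and log-store phases based on how far the
    /// job had been committed, and re-injects the first barrier together with
    /// the actors to create.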
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: TableId,
        definition: String,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        graph_info: InflightStreamingJobInfo,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            definition,
            "recovered creating job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);

        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
            Self::recover_consuming_snapshot(
                job_id,
                &definition,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                backfill_epoch,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
        )?;
        Ok(Self {
            database_id,
            job_id,
            definition,
            snapshot_backfill_upstream_tables,
            backfill_epoch,
            graph_info,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

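    /// Returns whether this creating job can stay valid after an error on
    /// `worker_id`, based on its in-flight barriers and, once finishing,
    /// whether the worker hosts any of the job's fragments.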
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && (!self.status.is_finishing()
                || InflightFragmentInfo::contains_worker(
                    self.graph_info.fragment_infos(),
                    worker_id,
                ))
    }

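    /// Renders the current phase of the job as a `DdlProgress` entry: snapshot
    /// progress, log store progress, or the in-flight epoch count while
    /// finishing.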
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.has_pending_finished_jobs() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker
                        .gen_ddl_progress()
                        .remove(&self.job_id.table_id)
                        .expect("should exist");
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(_) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
        };
        DdlProgress {
            id: self.job_id.table_id as u64,
            statement: self.definition.clone(),
            progress,
        }
    }

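    /// Returns the upstream log epoch that must stay pinned for this job, i.e.
    /// the latest epoch it has collected (at least `backfill_epoch`), or
    /// `None` once the job is finishing and no longer reads the upstream log.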
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }

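    /// Injects a single barrier into the job's partial graph via the control
    /// stream and enqueues its prev epoch into `barrier_control` so that its
    /// collection can be tracked.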
    fn inject_barrier(
        database_id: DatabaseId,
        table_id: TableId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        pre_applied_graph_info: &InflightStreamingJobInfo,
        applied_graph_info: Option<&InflightStreamingJobInfo>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(table_id),
            mutation,
            &barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info
                .map(|graph_info| graph_info.fragment_infos())
                .into_iter()
                .flatten(),
            new_actors,
            vec![],
            vec![],
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }

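    /// Handles a new upstream barrier: updates the upstream-lag metric, then
    /// either forwards the barrier that switches the job to consuming the
    /// upstream directly (on a matching `MergeSnapshotBackfillStreamingJobs`
    /// command), or injects the barriers derived from the new upstream epoch
    /// in the current phase.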
    pub(super) fn on_new_command(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let table_id = self.job_id;
        let start_consume_upstream =
            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
                jobs_to_merge.contains_key(&table_id)
            } else {
                false
            };
        if start_consume_upstream {
            info!(
                table_id = self.job_id.table_id,
                prev_epoch = barrier_info.prev_epoch(),
                "start consuming upstream"
            );
        }
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        if start_consume_upstream {
            self.status.start_consume_upstream(barrier_info);
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.graph_info,
                None,
                barrier_info.clone(),
                None,
                None,
            )?;
        } else {
            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.graph_info,
                    Some(&self.graph_info),
                    barrier_to_inject,
                    None,
                    mutation,
                )?;
            }
        }
        Ok(())
    }

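    /// Collects a barrier-complete response from a worker and returns whether
    /// the job has become ready to be merged into the upstream graph.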
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream().is_some()
    }

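    /// The job is ready to merge into the upstream graph once it is consuming
    /// the log store, has no barriers left to inject, and the log store has
    /// been fully consumed.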
    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.graph_info)
        } else {
            None
        }
    }
}

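/// How a completed epoch of a creating job should be treated by the caller.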
pub(super) enum CompleteJobType {
    /// The first barrier of the creating job is being committed.
    First,
    /// An ordinary epoch between the first and the finishing one.
    Normal,
    /// The creating job is fully finished at this epoch.
    Finished,
}

impl CreatingStreamingJobControl {
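    /// Picks the next collected epoch of this job that may be committed,
    /// bounded by the smallest in-flight upstream epoch so that the job never
    /// commits ahead of its upstream. Returns the epoch, the collected
    /// responses, and how the commit should be treated.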
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

    pub(super) fn is_finished(&self) -> bool {
        self.barrier_control.is_empty() && self.status.is_finishing()
    }

    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(_) => false,
        }
    }

    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.graph_info.existing_table_ids()
    }
}