risingwave_meta/barrier/checkpoint/creating_job/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::metrics::LabelGuardedIntGauge;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_meta_model::WorkerId;
28use risingwave_pb::ddl_service::DdlProgress;
29use risingwave_pb::hummock::HummockVersionStats;
30use risingwave_pb::stream_plan::AddMutation;
31use risingwave_pb::stream_plan::barrier::PbBarrierKind;
32use risingwave_pb::stream_plan::barrier_mutation::Mutation;
33use risingwave_pb::stream_service::BarrierCompleteResponse;
34use status::CreatingStreamingJobStatus;
35use tracing::{debug, info};
36
37use crate::MetaResult;
38use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
39use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
40use crate::barrier::edge_builder::FragmentEdgeBuildResult;
41use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
42use crate::barrier::progress::CreateMviewProgressTracker;
43use crate::barrier::rpc::ControlStreamManager;
44use crate::barrier::{
45    BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
46};
47use crate::controller::fragment::InflightFragmentInfo;
48use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
49use crate::rpc::metrics::GLOBAL_META_METRICS;
50use crate::stream::build_actor_connector_splits;
51
/// Per-job controller for a streaming job that is being created with snapshot backfill.
///
/// It bundles the job's identity, the in-flight graph information used when injecting
/// barriers, the barrier bookkeeping ([`CreatingStreamingJobBarrierControl`]) and the current
/// creation phase ([`CreatingStreamingJobStatus`]).
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    /// Database the job belongs to; forwarded on every barrier injection.
    database_id: DatabaseId,
    /// Id of the streaming job being created.
    pub(super) job_id: TableId,
    /// Definition of the job; reported as the `statement` of its [`DdlProgress`].
    definition: String,
    /// Upstream tables of the snapshot backfill.
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Lower bound used for the pinned upstream log epoch and for the lag metric.
    // NOTE(review): presumably the upstream epoch whose snapshot is backfilled — confirm.
    backfill_epoch: u64,

    /// Fragment information of the job, handed to the control stream manager on injection.
    graph_info: InflightStreamingJobInfo,

    /// Tracks the barriers injected for this job until they are collected and completed.
    barrier_control: CreatingStreamingJobBarrierControl,
    /// Current phase: consuming snapshot, consuming log store, or finishing.
    status: CreatingStreamingJobStatus,

    /// Gauge labelled with the job id; updated in `on_new_command` with the distance between
    /// the incoming upstream `prev_epoch` and the job's progress epoch.
    upstream_lag: LabelGuardedIntGauge,
}
67
68impl CreatingStreamingJobControl {
69    pub(super) fn new(
70        info: &CreateStreamingJobCommandInfo,
71        snapshot_backfill_upstream_tables: HashSet<TableId>,
72        backfill_epoch: u64,
73        version_stat: &HummockVersionStats,
74        control_stream_manager: &mut ControlStreamManager,
75        edges: &mut FragmentEdgeBuildResult,
76    ) -> MetaResult<Self> {
77        let job_id = info.stream_job_fragments.stream_job_id();
78        let database_id = DatabaseId::new(info.streaming_job.database_id());
79        debug!(
80            %job_id,
81            definition = info.definition,
82            "new creating job"
83        );
84        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
85        let backfill_nodes_to_pause =
86            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
87                .into_iter()
88                .collect();
89        let backfill_order_state = BackfillOrderState::new(
90            info.fragment_backfill_ordering.clone(),
91            &info.stream_job_fragments,
92        );
93        let create_mview_tracker = CreateMviewProgressTracker::recover(
94            [(
95                job_id,
96                (
97                    info.definition.clone(),
98                    &*info.stream_job_fragments,
99                    backfill_order_state,
100                ),
101            )],
102            version_stat,
103        );
104        let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();
105
106        let actors_to_create =
107            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());
108
109        let graph_info = InflightStreamingJobInfo {
110            job_id,
111            fragment_infos,
112        };
113
114        let mut barrier_control =
115            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);
116
117        let mut prev_epoch_fake_physical_time = 0;
118        let mut pending_non_checkpoint_barriers = vec![];
119
120        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
121            &mut prev_epoch_fake_physical_time,
122            &mut pending_non_checkpoint_barriers,
123            PbBarrierKind::Checkpoint,
124        );
125
126        let added_actors = info.stream_job_fragments.actor_ids();
127        let actor_splits = info
128            .init_split_assignment
129            .values()
130            .flat_map(build_actor_connector_splits)
131            .collect();
132
133        let initial_mutation = Mutation::Add(AddMutation {
134            // for mutation of snapshot backfill job, we won't include changes to dispatchers of upstream actors.
135            actor_dispatchers: Default::default(),
136            added_actors,
137            actor_splits,
138            // we assume that when handling snapshot backfill, the cluster must not be paused
139            pause: false,
140            subscriptions_to_add: Default::default(),
141            backfill_nodes_to_pause,
142        });
143
144        control_stream_manager.add_partial_graph(database_id, Some(job_id));
145        Self::inject_barrier(
146            database_id,
147            job_id,
148            control_stream_manager,
149            &mut barrier_control,
150            &graph_info,
151            Some(&graph_info),
152            initial_barrier_info,
153            Some(actors_to_create),
154            Some(initial_mutation),
155        )?;
156
157        assert!(pending_non_checkpoint_barriers.is_empty());
158
159        Ok(Self {
160            database_id,
161            definition: info.definition.clone(),
162            job_id,
163            snapshot_backfill_upstream_tables,
164            barrier_control,
165            backfill_epoch,
166            graph_info,
167            status: CreatingStreamingJobStatus::ConsumingSnapshot {
168                prev_epoch_fake_physical_time,
169                pending_upstream_barriers: vec![],
170                version_stats: version_stat.clone(),
171                create_mview_tracker,
172                snapshot_backfill_actors,
173                backfill_epoch,
174                pending_non_checkpoint_barriers,
175            },
176            upstream_lag: GLOBAL_META_METRICS
177                .snapshot_backfill_lag
178                .with_guarded_label_values(&[&format!("{}", job_id)]),
179        })
180    }
181
182    fn resolve_upstream_log_epochs(
183        snapshot_backfill_upstream_tables: &HashSet<TableId>,
184        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
185        committed_epoch: u64,
186        upstream_curr_epoch: u64,
187    ) -> MetaResult<Vec<BarrierInfo>> {
188        // TODO: add sanity check
189        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
190        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
191        loop {
192            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
193            if *checkpoint_epoch < committed_epoch {
194                continue;
195            }
196            assert_eq!(*checkpoint_epoch, committed_epoch);
197            break;
198        }
199        let mut ret = vec![];
200        let mut prev_epoch = committed_epoch;
201        let mut pending_non_checkpoint_barriers = vec![];
202        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
203            for (i, epoch) in non_checkpoint_epochs
204                .iter()
205                .chain([checkpoint_epoch])
206                .enumerate()
207            {
208                assert!(*epoch > prev_epoch);
209                pending_non_checkpoint_barriers.push(prev_epoch);
210                ret.push(BarrierInfo {
211                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
212                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
213                    kind: if i == 0 {
214                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
215                    } else {
216                        BarrierKind::Barrier
217                    },
218                });
219                prev_epoch = *epoch;
220            }
221        }
222        ret.push(BarrierInfo {
223            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
224            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
225            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
226        });
227        Ok(ret)
228    }
229
230    fn recover_consuming_snapshot(
231        job_id: TableId,
232        definition: &String,
233        snapshot_backfill_upstream_tables: &HashSet<TableId>,
234        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
235        backfill_epoch: u64,
236        committed_epoch: u64,
237        upstream_curr_epoch: u64,
238        stream_job_fragments: StreamJobFragments,
239        version_stat: &HummockVersionStats,
240    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
241        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
242        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
243        let mut pending_non_checkpoint_barriers = vec![];
244        let create_mview_tracker = CreateMviewProgressTracker::recover(
245            [(
246                job_id,
247                (
248                    definition.clone(),
249                    &stream_job_fragments,
250                    Default::default(),
251                ),
252            )],
253            version_stat,
254        );
255        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
256            &mut prev_epoch_fake_physical_time,
257            &mut pending_non_checkpoint_barriers,
258            PbBarrierKind::Initial,
259        );
260        Ok((
261            CreatingStreamingJobStatus::ConsumingSnapshot {
262                prev_epoch_fake_physical_time,
263                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
264                    snapshot_backfill_upstream_tables,
265                    upstream_table_log_epochs,
266                    backfill_epoch,
267                    upstream_curr_epoch,
268                )?,
269                version_stats: version_stat.clone(),
270                create_mview_tracker,
271                snapshot_backfill_actors,
272                backfill_epoch,
273                pending_non_checkpoint_barriers,
274            },
275            barrier_info,
276        ))
277    }
278
279    fn recover_consuming_log_store(
280        snapshot_backfill_upstream_tables: &HashSet<TableId>,
281        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
282        committed_epoch: u64,
283        upstream_curr_epoch: u64,
284        stream_job_fragments: StreamJobFragments,
285    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
286        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
287        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
288            snapshot_backfill_upstream_tables,
289            upstream_table_log_epochs,
290            committed_epoch,
291            upstream_curr_epoch,
292        )?;
293        let mut first_barrier = barriers_to_inject.remove(0);
294        assert!(first_barrier.kind.is_checkpoint());
295        first_barrier.kind = BarrierKind::Initial;
296        Ok((
297            CreatingStreamingJobStatus::ConsumingLogStore {
298                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
299                    snapshot_backfill_actors.into_iter(),
300                    barriers_to_inject
301                        .last()
302                        .map(|info| info.prev_epoch() - committed_epoch)
303                        .unwrap_or(0),
304                ),
305                barriers_to_inject: Some(barriers_to_inject),
306            },
307            first_barrier,
308        ))
309    }
310
311    #[expect(clippy::too_many_arguments)]
312    pub(crate) fn recover(
313        database_id: DatabaseId,
314        job_id: TableId,
315        definition: String,
316        snapshot_backfill_upstream_tables: HashSet<TableId>,
317        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
318        backfill_epoch: u64,
319        committed_epoch: u64,
320        upstream_curr_epoch: u64,
321        graph_info: InflightStreamingJobInfo,
322        stream_job_fragments: StreamJobFragments,
323        version_stat: &HummockVersionStats,
324        new_actors: StreamJobActorsToCreate,
325        initial_mutation: Mutation,
326        control_stream_manager: &mut ControlStreamManager,
327    ) -> MetaResult<Self> {
328        debug!(
329            %job_id,
330            definition,
331            "recovered creating job"
332        );
333        let mut barrier_control =
334            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);
335
336        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
337            Self::recover_consuming_snapshot(
338                job_id,
339                &definition,
340                &snapshot_backfill_upstream_tables,
341                upstream_table_log_epochs,
342                backfill_epoch,
343                committed_epoch,
344                upstream_curr_epoch,
345                stream_job_fragments,
346                version_stat,
347            )?
348        } else {
349            Self::recover_consuming_log_store(
350                &snapshot_backfill_upstream_tables,
351                upstream_table_log_epochs,
352                committed_epoch,
353                upstream_curr_epoch,
354                stream_job_fragments,
355            )?
356        };
357        control_stream_manager.add_partial_graph(database_id, Some(job_id));
358        Self::inject_barrier(
359            database_id,
360            job_id,
361            control_stream_manager,
362            &mut barrier_control,
363            &graph_info,
364            Some(&graph_info),
365            first_barrier_info,
366            Some(new_actors),
367            Some(initial_mutation),
368        )?;
369        Ok(Self {
370            database_id,
371            job_id,
372            definition,
373            snapshot_backfill_upstream_tables,
374            backfill_epoch,
375            graph_info,
376            barrier_control,
377            status,
378            upstream_lag: GLOBAL_META_METRICS
379                .snapshot_backfill_lag
380                .with_guarded_label_values(&[&format!("{}", job_id)]),
381        })
382    }
383
384    pub(crate) fn is_empty(&self) -> bool {
385        self.barrier_control.is_empty()
386    }
387
388    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
389        self.barrier_control.is_valid_after_worker_err(worker_id)
390            && (!self.status.is_finishing()
391                || InflightFragmentInfo::contains_worker(
392                    self.graph_info.fragment_infos(),
393                    worker_id,
394                ))
395    }
396
397    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
398        let progress = match &self.status {
399            CreatingStreamingJobStatus::ConsumingSnapshot {
400                create_mview_tracker,
401                ..
402            } => {
403                if create_mview_tracker.has_pending_finished_jobs() {
404                    "Snapshot finished".to_owned()
405                } else {
406                    let progress = create_mview_tracker
407                        .gen_ddl_progress()
408                        .remove(&self.job_id.table_id)
409                        .expect("should exist");
410                    format!("Snapshot [{}]", progress.progress)
411                }
412            }
413            CreatingStreamingJobStatus::ConsumingLogStore {
414                log_store_progress_tracker,
415                ..
416            } => {
417                format!(
418                    "LogStore [{}]",
419                    log_store_progress_tracker.gen_ddl_progress()
420                )
421            }
422            CreatingStreamingJobStatus::Finishing(_) => {
423                format!(
424                    "Finishing [epoch count: {}]",
425                    self.barrier_control.inflight_barrier_count()
426                )
427            }
428        };
429        DdlProgress {
430            id: self.job_id.table_id as u64,
431            statement: self.definition.clone(),
432            progress,
433        }
434    }
435
436    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
437        if self.status.is_finishing() {
438            None
439        } else {
440            // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed
441            Some(max(
442                self.barrier_control.max_collected_epoch().unwrap_or(0),
443                self.backfill_epoch,
444            ))
445        }
446    }
447
448    fn inject_barrier(
449        database_id: DatabaseId,
450        table_id: TableId,
451        control_stream_manager: &mut ControlStreamManager,
452        barrier_control: &mut CreatingStreamingJobBarrierControl,
453        pre_applied_graph_info: &InflightStreamingJobInfo,
454        applied_graph_info: Option<&InflightStreamingJobInfo>,
455        barrier_info: BarrierInfo,
456        new_actors: Option<StreamJobActorsToCreate>,
457        mutation: Option<Mutation>,
458    ) -> MetaResult<()> {
459        let node_to_collect = control_stream_manager.inject_barrier(
460            database_id,
461            Some(table_id),
462            mutation,
463            &barrier_info,
464            pre_applied_graph_info.fragment_infos(),
465            applied_graph_info
466                .map(|graph_info| graph_info.fragment_infos())
467                .into_iter()
468                .flatten(),
469            new_actors,
470            vec![],
471            vec![],
472        )?;
473        barrier_control.enqueue_epoch(
474            barrier_info.prev_epoch(),
475            node_to_collect,
476            barrier_info.kind.clone(),
477        );
478        Ok(())
479    }
480
481    pub(super) fn on_new_command(
482        &mut self,
483        control_stream_manager: &mut ControlStreamManager,
484        command: Option<&Command>,
485        barrier_info: &BarrierInfo,
486    ) -> MetaResult<()> {
487        let table_id = self.job_id;
488        let start_consume_upstream =
489            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
490                jobs_to_merge.contains_key(&table_id)
491            } else {
492                false
493            };
494        if start_consume_upstream {
495            info!(
496                table_id = self.job_id.table_id,
497                prev_epoch = barrier_info.prev_epoch(),
498                "start consuming upstream"
499            );
500        }
501        let progress_epoch =
502            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
503                max(max_collected_epoch, self.backfill_epoch)
504            } else {
505                self.backfill_epoch
506            };
507        self.upstream_lag.set(
508            barrier_info
509                .prev_epoch
510                .value()
511                .0
512                .saturating_sub(progress_epoch) as _,
513        );
514        if start_consume_upstream {
515            self.status.start_consume_upstream(barrier_info);
516            Self::inject_barrier(
517                self.database_id,
518                self.job_id,
519                control_stream_manager,
520                &mut self.barrier_control,
521                &self.graph_info,
522                None,
523                barrier_info.clone(),
524                None,
525                None,
526            )?;
527        } else {
528            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
529                Self::inject_barrier(
530                    self.database_id,
531                    self.job_id,
532                    control_stream_manager,
533                    &mut self.barrier_control,
534                    &self.graph_info,
535                    Some(&self.graph_info),
536                    barrier_to_inject,
537                    None,
538                    mutation,
539                )?;
540            }
541        }
542        Ok(())
543    }
544
545    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
546        self.status.update_progress(&resp.create_mview_progress);
547        self.barrier_control.collect(resp);
548        self.should_merge_to_upstream().is_some()
549    }
550
551    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
552        if let CreatingStreamingJobStatus::ConsumingLogStore {
553            log_store_progress_tracker,
554            barriers_to_inject,
555        } = &self.status
556            && barriers_to_inject.is_none()
557            && log_store_progress_tracker.is_finished()
558        {
559            Some(&self.graph_info)
560        } else {
561            None
562        }
563    }
564}
565
566pub(super) enum CompleteJobType {
567    /// The first barrier
568    First,
569    Normal,
570    /// The last barrier to complete
571    Finished,
572}
573
574impl CreatingStreamingJobControl {
575    pub(super) fn start_completing(
576        &mut self,
577        min_upstream_inflight_epoch: Option<u64>,
578    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
579        let (finished_at_epoch, epoch_end_bound) = match &self.status {
580            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
581                let epoch_end_bound = min_upstream_inflight_epoch
582                    .map(|upstream_epoch| {
583                        if upstream_epoch < *finish_at_epoch {
584                            Excluded(upstream_epoch)
585                        } else {
586                            Unbounded
587                        }
588                    })
589                    .unwrap_or(Unbounded);
590                (Some(*finish_at_epoch), epoch_end_bound)
591            }
592            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
593            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
594                None,
595                min_upstream_inflight_epoch
596                    .map(Excluded)
597                    .unwrap_or(Unbounded),
598            ),
599        };
600        self.barrier_control.start_completing(epoch_end_bound).map(
601            |(epoch, resps, is_first_commit)| {
602                let status = if let Some(finish_at_epoch) = finished_at_epoch {
603                    assert!(!is_first_commit);
604                    if epoch == finish_at_epoch {
605                        self.barrier_control.ack_completed(epoch);
606                        assert!(self.barrier_control.is_empty());
607                        CompleteJobType::Finished
608                    } else {
609                        CompleteJobType::Normal
610                    }
611                } else if is_first_commit {
612                    CompleteJobType::First
613                } else {
614                    CompleteJobType::Normal
615                };
616                (epoch, resps, status)
617            },
618        )
619    }
620
621    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
622        self.barrier_control.ack_completed(completed_epoch);
623    }
624
625    pub(super) fn is_finished(&self) -> bool {
626        self.barrier_control.is_empty() && self.status.is_finishing()
627    }
628
629    pub fn is_consuming(&self) -> bool {
630        match &self.status {
631            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
632            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
633            CreatingStreamingJobStatus::Finishing(_) => false,
634        }
635    }
636
637    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
638        self.graph_info.existing_table_ids()
639    }
640}