risingwave_meta/barrier/checkpoint/creating_job/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::ops::Bound::{Excluded, Unbounded};
21
22use barrier_control::CreatingStreamingJobBarrierControl;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::metrics::LabelGuardedIntGauge;
25use risingwave_meta_model::WorkerId;
26use risingwave_pb::ddl_service::DdlProgress;
27use risingwave_pb::hummock::HummockVersionStats;
28use risingwave_pb::stream_plan::AddMutation;
29use risingwave_pb::stream_plan::barrier_mutation::Mutation;
30use risingwave_pb::stream_service::BarrierCompleteResponse;
31use status::CreatingStreamingJobStatus;
32use tracing::info;
33
34use crate::MetaResult;
35use crate::barrier::edge_builder::FragmentEdgeBuildResult;
36use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
37use crate::barrier::progress::CreateMviewProgressTracker;
38use crate::barrier::rpc::ControlStreamManager;
39use crate::barrier::{Command, CreateStreamingJobCommandInfo};
40use crate::controller::fragment::InflightFragmentInfo;
41use crate::model::StreamJobActorsToCreate;
42use crate::rpc::metrics::GLOBAL_META_METRICS;
43use crate::stream::build_actor_connector_splits;
44
/// Per-job controller for a streaming job being created via snapshot backfill.
/// It owns the job's dedicated partial graph: it tracks the job's barrier
/// lifecycle separately from the upstream database graph until the job is
/// merged back into the upstream (see `should_merge_to_upstream`).
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    // Database the job belongs to; used to address its partial graph in the control stream.
    database_id: DatabaseId,
    pub(super) job_id: TableId,
    // Original job definition text, echoed back in `gen_ddl_progress`.
    definition: String,
    // Upstream tables that the snapshot backfill reads from.
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    // Epoch the backfill snapshot was taken at; lower bound for progress and
    // for the pinned upstream log epoch.
    backfill_epoch: u64,

    // Fragment topology of this job; passed along with every injected barrier.
    graph_info: InflightStreamingJobInfo,

    // Tracks injected/collected barriers of this job's partial graph.
    barrier_control: CreatingStreamingJobBarrierControl,
    // State machine: ConsumingSnapshot -> ConsumingLogStore -> Finishing.
    status: CreatingStreamingJobStatus,

    // Gauge labelled by table id, set to `upstream prev_epoch - progress epoch`
    // in `on_new_command`.
    upstream_lag: LabelGuardedIntGauge<1>,
}
60
impl CreatingStreamingJobControl {
    /// Build the controller for a newly issued snapshot-backfill job.
    ///
    /// Registers the job's partial graph with the control stream manager and
    /// injects the very first (fake, checkpoint) barrier carrying an `Add`
    /// mutation with the job's actors and split assignments. Starts in the
    /// `ConsumingSnapshot` status.
    ///
    /// Returns an error if injecting the initial barrier fails.
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = DatabaseId::new(info.streaming_job.database_id());
        info!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
        // Recover a fresh progress tracker seeded with only this job, so DDL
        // progress can be reported while the snapshot is consumed.
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (info.definition.clone(), &*info.stream_job_fragments),
            )],
            version_stat,
        );
        let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();

        // NOTE: same value as `job_id` above (both come from `stream_job_id()`).
        let table_id = info.stream_job_fragments.stream_job_id();
        let table_id_str = format!("{}", table_id.table_id);

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());

        let graph_info = InflightStreamingJobInfo {
            job_id: table_id,
            fragment_infos,
        };

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(table_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        // First barrier of the job; `true` presumably marks it as a checkpoint
        // barrier (the assert below relies on no non-checkpoint barrier being
        // left pending) — TODO confirm against `new_fake_barrier`.
        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            true,
        );

        let added_actors = info.stream_job_fragments.actor_ids();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        let initial_mutation = Mutation::Add(AddMutation {
            // for mutation of snapshot backfill job, we won't include changes to dispatchers of upstream actors.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // we assume that when handling snapshot backfill, the cluster must not be paused
            pause: false,
            subscriptions_to_add: Default::default(),
        });

        // The partial graph must exist before the first barrier is injected into it.
        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        // The initial barrier is a checkpoint barrier, so nothing should be pending.
        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            graph_info,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&table_id_str]),
        })
    }

    /// Whether this job can survive a failure on `worker_id`.
    ///
    /// True when the barrier control tolerates the failure AND either the job
    /// is not yet finishing, or the failed worker hosts at least one of the
    /// job's fragments. NOTE(review): the finishing-phase condition looks
    /// intentional but its rationale is not visible here — confirm against
    /// the recovery path before relying on it.
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && (!self.status.is_finishing()
                || InflightFragmentInfo::contains_worker(
                    self.graph_info.fragment_infos(),
                    worker_id,
                ))
    }

    /// Render a human-readable DDL progress report for this job, phrased
    /// according to the current status (snapshot / log store / finishing).
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.has_pending_finished_jobs() {
                    "Snapshot finished".to_owned()
                } else {
                    // The tracker was seeded with exactly this job in `new`,
                    // so the entry must exist.
                    let progress = create_mview_tracker
                        .gen_ddl_progress()
                        .remove(&self.job_id.table_id)
                        .expect("should exist");
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(_) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
        };
        DdlProgress {
            id: self.job_id.table_id as u64,
            statement: self.definition.clone(),
            progress,
        }
    }

    /// Upstream log epoch this job needs pinned, or `None` once the job is
    /// finishing (it no longer reads the upstream log). The pinned epoch is
    /// the max of the backfill epoch and the latest collected epoch.
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }

    /// Inject one barrier into the job's partial graph and enqueue its
    /// prev-epoch into `barrier_control` for collection.
    ///
    /// `pre_applied_graph_info` is the topology the barrier is sent to;
    /// `applied_graph_info` is the post-barrier topology (`None` drops the
    /// graph after this barrier, as on the finishing barrier). `new_actors`
    /// and `mutation` are only set for the initial barrier.
    fn inject_barrier(
        database_id: DatabaseId,
        table_id: TableId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        pre_applied_graph_info: &InflightStreamingJobInfo,
        applied_graph_info: Option<&InflightStreamingJobInfo>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(table_id),
            mutation,
            &barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info
                .map(|graph_info| graph_info.fragment_infos())
                .into_iter()
                .flatten(),
            new_actors,
            vec![],
            vec![],
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.is_checkpoint(),
        );
        Ok(())
    }

    /// React to a new upstream barrier (and its optional command).
    ///
    /// If the command merges this job back into upstream, transition the
    /// status to start consuming upstream and inject the barrier with the
    /// graph dropped afterwards (`applied_graph_info = None`). Otherwise let
    /// the status derive zero or more barriers to inject for this upstream
    /// epoch. Also updates the `upstream_lag` metric in either case.
    pub(super) fn on_new_command(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let table_id = self.job_id;
        let start_consume_upstream =
            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
                jobs_to_merge.contains_key(&table_id)
            } else {
                false
            };
        if start_consume_upstream {
            info!(
                table_id = self.job_id.table_id,
                prev_epoch = barrier_info.prev_epoch(),
                "start consuming upstream"
            );
        }
        // Progress epoch never falls below the backfill epoch.
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        if start_consume_upstream {
            self.status.start_consume_upstream(barrier_info);
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.graph_info,
                // No applied graph: the partial graph ends at this barrier.
                None,
                barrier_info.clone(),
                None,
                None,
            )?;
        } else {
            for barrier_to_inject in self.status.on_new_upstream_epoch(barrier_info) {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.graph_info,
                    Some(&self.graph_info),
                    barrier_to_inject,
                    None,
                    None,
                )?;
            }
        }
        Ok(())
    }

    /// Record a worker's barrier-complete response for `epoch`, updating the
    /// job's create-mview progress first. Returns whether the job has become
    /// ready to merge into upstream (see `should_merge_to_upstream`).
    pub(super) fn collect(
        &mut self,
        epoch: u64,
        worker_id: WorkerId,
        resp: BarrierCompleteResponse,
    ) -> MetaResult<bool> {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(epoch, worker_id, resp);
        Ok(self.should_merge_to_upstream().is_some())
    }

    /// `Some(graph_info)` when the job has fully caught up: it is consuming
    /// the log store, has no barriers left to inject, and the log store
    /// progress tracker reports finished. `None` otherwise.
    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.graph_info)
        } else {
            None
        }
    }
}
347
/// Classification of a completed epoch returned by
/// `CreatingStreamingJobControl::start_completing`, telling the caller how
/// the commit should be treated.
pub(super) enum CompleteJobType {
    /// The first barrier
    First,
    /// A barrier between the first and the last.
    Normal,
    /// The last barrier to complete
    Finished,
}
355
impl CreatingStreamingJobControl {
    /// Try to start completing the next collected epoch.
    ///
    /// `min_upstream_inflight_epoch` caps how far this job may complete ahead
    /// of the upstream graph: epochs at or beyond it are excluded, except
    /// while `Finishing`, where the cap is lifted (`Unbounded`) once the
    /// upstream epoch has reached the finish epoch.
    ///
    /// Returns `Some((epoch, responses, type))` when an epoch is ready:
    /// - `Finished` for the finish epoch itself (the epoch is eagerly acked
    ///   here and the barrier control must then be empty),
    /// - `First` for the job's first commit,
    /// - `Normal` otherwise.
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    // The finish epoch can never be the very first commit.
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        // Ack immediately: the finish epoch is the last one,
                        // so nothing may remain inflight afterwards.
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    /// Acknowledge that `completed_epoch` has been fully committed.
    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

    /// Whether the job has finished creating: it is in the finishing status
    /// and no barriers remain inflight.
    pub(super) fn is_finished(&self) -> bool {
        self.barrier_control.is_empty() && self.status.is_finishing()
    }

    /// The job's graph while it still runs as a separate partial graph;
    /// `None` once it is finishing (merged into upstream).
    pub fn inflight_graph_info(&self) -> Option<&InflightStreamingJobInfo> {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => Some(&self.graph_info),
            CreatingStreamingJobStatus::Finishing(_) => None,
        }
    }

    /// All state table ids owned by this job's fragments.
    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.graph_info.existing_table_ids()
    }
}