risingwave_meta/barrier/checkpoint/state.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};
use std::mem::take;

use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{
    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation, ThrottleMutation,
};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
use crate::barrier::command::PostCollectCommand;
use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
};
use crate::barrier::notifier::Notifier;
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::utils::NodeToCollect;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::stream::{GlobalActorIdGen, fill_snapshot_backfill_epoch};

/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The last sent `prev_epoch`.
    ///
    /// There's no need to persist this field. On recovery, we will restore it from the
    /// latest committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch`s of pending non-checkpoint barriers
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the cluster is paused.
    is_paused: bool,
}

impl BarrierWorkerState {
    pub(super) fn new() -> Self {
        Self {
            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
            pending_non_checkpoint_barriers: vec![],
            is_paused: false,
        }
    }

    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
        Self {
            in_flight_prev_epoch,
            pending_non_checkpoint_barriers: vec![],
            is_paused,
        }
    }

    pub fn is_paused(&self) -> bool {
        self.is_paused
    }

    fn set_is_paused(&mut self, is_paused: bool) {
        if self.is_paused != is_paused {
            tracing::info!(
                currently_paused = self.is_paused,
                newly_paused = is_paused,
                "update paused state"
            );
            self.is_paused = is_paused;
        }
    }

    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
        &self.in_flight_prev_epoch
    }

    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
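    ///
    /// Non-checkpoint barriers accumulate their `prev_epoch`s in
    /// `pending_non_checkpoint_barriers`; when a checkpoint barrier is issued,
    /// the accumulated epochs are drained into `BarrierKind::Checkpoint` so
    /// that they can be committed together.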
    pub fn next_barrier_info(
        &mut self,
        is_checkpoint: bool,
        curr_epoch: TracedEpoch,
    ) -> BarrierInfo {
        assert!(
            self.in_flight_prev_epoch.value() < curr_epoch.value(),
            "curr epoch regress: prev {} >= curr {}",
            self.in_flight_prev_epoch.value(),
            curr_epoch.value()
        );
        let prev_epoch = self.in_flight_prev_epoch.clone();
        self.in_flight_prev_epoch = curr_epoch.clone();
        self.pending_non_checkpoint_barriers
            .push(prev_epoch.value().0);
        let kind = if is_checkpoint {
            let epochs = take(&mut self.pending_non_checkpoint_barriers);
            BarrierKind::Checkpoint(epochs)
        } else {
            BarrierKind::Barrier
        };
        BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        }
    }
}
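
#[cfg(test)]
mod tests {
    // A minimal sketch (an editorial addition, not part of the original module)
    // illustrating how `next_barrier_info` drains the pending non-checkpoint
    // epochs when a checkpoint barrier is issued.
    use super::*;

    #[test]
    fn pending_epochs_drained_on_checkpoint() {
        let mut state = BarrierWorkerState::new();
        let e0 = state.in_flight_prev_epoch().value();

        // A non-checkpoint barrier queues its `prev_epoch` and carries no epochs.
        let info = state.next_barrier_info(false, TracedEpoch::new(Epoch(e0.0 + 1)));
        assert!(matches!(info.kind, BarrierKind::Barrier));

        // The next checkpoint barrier drains both queued `prev_epoch`s.
        let e1 = state.in_flight_prev_epoch().value();
        let info = state.next_barrier_info(true, TracedEpoch::new(Epoch(e1.0 + 1)));
        assert!(matches!(info.kind, BarrierKind::Checkpoint(ref epochs) if epochs.len() == 2));
    }
}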
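/// Aggregated output of [`DatabaseCheckpointControl::apply_command`]: the tables
/// to commit at this barrier, the snapshot backfill jobs that just finished and
/// must be waited on, the nodes the barrier will be collected from, and the
/// command to run after collection.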
pub(super) struct ApplyCommandInfo {
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    pub table_ids_to_commit: HashSet<TableId>,
    pub jobs_to_wait: HashSet<JobId>,
    pub node_to_collect: NodeToCollect,
    pub command: PostCollectCommand,
}

impl DatabaseCheckpointControl {
    /// Applies the given command to the database state and injects the next barrier.
    /// The resulting inflight info includes the actors newly added by the command;
    /// dropped actors are removed from the state once the post-apply changes are resolved.
    pub(super) fn apply_command(
        &mut self,
        mut command: Option<Command>,
        notifiers: &mut Vec<Notifier>,
        barrier_info: &BarrierInfo,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<ApplyCommandInfo> {
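        // An unresolved `RescheduleIntent` must have been turned into a concrete
        // plan before reaching here: fail fast in debug builds, and bail with an
        // error in release builds.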
        debug_assert!(
            !matches!(
                command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent must be resolved before apply"
        );
        if matches!(
            command,
            Some(Command::RescheduleIntent {
                reschedule_plan: None,
                ..
            })
        ) {
            bail!("reschedule intent must be resolved before apply");
        }
        let mut edges = self
            .database_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        // Insert newly added snapshot backfill job
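        //
        // For a snapshot backfill job, the current `prev_epoch` is pinned as the
        // snapshot epoch of all upstream tables, and the job is tracked by a
        // dedicated `CreatingStreamingJobControl` until it is ready to merge
        // back into the upstream graph.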
        if let &mut Some(Command::CreateStreamingJob {
            ref mut job_type,
            ref mut info,
            ref cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    assert!(!self.state.is_paused());
                    let snapshot_epoch = barrier_info.prev_epoch();
                    // Set the snapshot epoch of each upstream table for snapshot backfill.
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(snapshot_epoch),
                            None,
                            "must not be set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        CreateSnapshotBackfillJobCommandInfo {
                            info: info.clone(),
                            snapshot_backfill_info: snapshot_backfill_info.clone(),
                            cross_db_snapshot_backfill_info: cross_db_snapshot_backfill_info
                                .clone(),
                        },
                        take(notifiers),
                        snapshot_backfill_upstream_tables,
                        snapshot_epoch,
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.database_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.fragment_infos_with_job_id());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        // Pre-apply fragment changes now, except for snapshot backfill jobs, whose
        // fragment infos are tracked separately until they merge back into the
        // upstream graph.
        let post_apply_changes = if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = command
        {
            None
        } else if let Some((new_job, fragment_changes)) =
            command.as_ref().and_then(Command::fragment_changes)
        {
            Some(self.database_info.pre_apply(new_job, fragment_changes))
        } else {
            None
        };

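        // Register subscribers introduced by this command: either an explicit
        // `CreateSubscription`, or the implicit per-upstream-table subscriber
        // that a snapshot backfill job holds while it reads the upstream
        // snapshot.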
        match &command {
            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(*retention_second),
                );
            }
            Some(Command::CreateStreamingJob {
                info,
                job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
                ..
            }) => {
                for upstream_mv_table_id in snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .keys()
                {
                    self.database_info.register_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        info.streaming_job.id().as_subscriber_id(),
                        SubscriberType::SnapshotBackfill,
                    );
                }
            }
            _ => {}
        };

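        // Collect the tables to commit and the actors to collect the barrier
        // from while the pre-command info is still in place; `post_apply` below
        // removes the fragments dropped by the command.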
        let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
        let mut actors_to_create = command.as_ref().and_then(|command| {
            command.actors_to_create(&self.database_info, &mut edges, control_stream_manager)
        });
        let mut node_actors =
            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());

        if let Some(post_apply_changes) = post_apply_changes {
            self.database_info.post_apply(post_apply_changes);
        }

        let prev_is_paused = self.state.is_paused();
        let curr_is_paused = match command {
            Some(Command::Pause) => true,
            Some(Command::Resume) => false,
            _ => prev_is_paused,
        };
        self.state.set_is_paused(curr_is_paused);

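        // Note that the mutation is generated against the pause state *before*
        // this command, so a `Resume` command observes the paused state it is
        // resuming from.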
        let mutation = if let Some(c) = &command {
            c.to_mutation(
                prev_is_paused,
                &mut edges,
                control_stream_manager,
                &mut self.database_info,
            )?
        } else {
            None
        };

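        // When the command carries no mutation of its own, a checkpoint barrier
        // may instead carry the mutation that merges finished snapshot backfill
        // jobs back into the database partial graph: their actors are re-created
        // under new global actor ids, and the upstream dispatchers are updated
        // to point at them.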
        let mut finished_snapshot_backfill_jobs = HashSet::new();
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(control_stream_manager, barrier_info)?;
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

                if !finished_snapshot_backfill_job_info.is_empty() {
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

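                        // Re-create the job's actors under freshly allocated
                        // global actor ids so that they can join the database
                        // partial graph without colliding with the ids used
                        // during backfill.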
                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            control_stream_manager.env.actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        // mapping from old_actor_id to new_actor_id
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
                        // new actors belong to the database partial graph
                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values())
                                .map(|info| (info, partial_graph_id)),
                            control_stream_manager,
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    [], // no initial subscriber for backfilling job
                                )
                            }),
                        );
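                        // Re-point the existing upstream dispatchers at the new
                        // actor ids; `removed_downstream_actor_id` carries the
                        // old (backfill-time) ids, recovered through
                        // `actor_mapping`.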
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
                                |upstream_fragment_id| {
                                    let new_actor_dispatchers = edges
                                        .dispatchers
                                        .remove(upstream_fragment_id)
                                        .expect("should exist");
                                    new_actor_dispatchers.into_iter().flat_map(
                                        |(upstream_actor_id, dispatchers)| {
                                            dispatchers.into_iter().map(move |dispatcher| {
                                                PbDispatcherUpdate {
                                                    actor_id: upstream_actor_id,
                                                    dispatcher_id: dispatcher.dispatcher_id,
                                                    hash_mapping: dispatcher.hash_mapping,
                                                    removed_downstream_actor_id: dispatcher
                                                        .downstream_actor_id
                                                        .iter()
                                                        .map(|new_downstream_actor_id| {
                                                            actor_mapping
                                                                .iter()
                                                                .find_map(
                                                                    |(old_actor_id, new_actor_id)| {
                                                                        (new_downstream_actor_id
                                                                            == new_actor_id)
                                                                            .then_some(*old_actor_id)
                                                                    },
                                                                )
                                                                .expect("should exist")
                                                        })
                                                        .collect(),
                                                    added_downstream_actor_id: dispatcher
                                                        .downstream_actor_id,
                                                }
                                            })
                                        },
                                    )
                                },
                            ),
                        );
                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
                        for (worker_id, worker_actors) in new_actors_to_create {
                            node_actors.entry(worker_id).or_default().extend(
                                worker_actors.values().flat_map(|(_, actors, _)| {
                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
                                }),
                            );
                            actors_to_create
                                .entry(worker_id)
                                .or_default()
                                .extend(worker_actors);
                        }
                        self.database_info.add_existing(InflightStreamingJobInfo {
                            job_id,
                            fragment_infos: new_fragment_info,
                            subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
                            status: CreateStreamingJobStatus::Created,
                            cdc_table_backfill_tracker: None, // no cdc table backfill for snapshot backfill
                        });
                    }

                    Some(PbMutation::Update(PbUpdateMutation {
                        dispatcher_update,
                        merge_update: vec![], // no upstream update on existing actors
                        actor_vnode_bitmap_update: Default::default(), /* no in-place vnode bitmap update happened */
                        dropped_actors: vec![], /* no actors to drop in the partial graph of the database */
                        actor_splits,
                        actor_new_dispatchers: Default::default(), // no new dispatcher
                        actor_cdc_table_snapshot_splits: None, /* no cdc table backfill in snapshot backfill */
                        sink_schema_change: Default::default(), /* no sink auto schema change happened here */
                        subscriptions_to_drop,
                    }))
                } else {
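                    // No finished backfill job to merge: if any fragments are
                    // pending to start backfill, emit a `StartFragmentBackfill`
                    // mutation for them instead.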
                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
                    if fragment_ids.is_empty() {
                        None
                    } else {
                        Some(PbMutation::StartFragmentBackfill(
                            PbStartFragmentBackfillMutation { fragment_ids },
                        ))
                    }
                }
            }
        };

        #[expect(clippy::collapsible_if)]
        if let Some(Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id,
        }) = command
        {
            if self
                .database_info
                .unregister_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                )
                .is_none()
            {
                warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
            }
        }

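        // Forward the barrier to every in-progress snapshot backfill job except
        // those merged above. A `Throttle` command that targets such a job is
        // delivered here, through the job's own partial graph, rather than
        // through the database partial graph.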
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if !finished_snapshot_backfill_jobs.contains(job_id) {
                let throttle_mutation = if let Some(Command::Throttle { jobs, config }) = &command
                    && jobs.contains(job_id)
                {
                    assert_eq!(
                        jobs.len(),
                        1,
                        "should not alter rate limit of snapshot backfill job with other jobs"
                    );
                    Some((
                        Mutation::Throttle(ThrottleMutation {
                            fragment_throttle: config
                                .iter()
                                .map(|(fragment_id, config)| (*fragment_id, *config))
                                .collect(),
                        }),
                        take(notifiers),
                    ))
                } else {
                    None
                };
                creating_job.on_new_upstream_barrier(
                    control_stream_manager,
                    barrier_info,
                    throttle_mutation,
                )?;
            }
        }

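        // Finally, inject the barrier into the database partial graph itself.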
        let node_to_collect = control_stream_manager.inject_barrier(
            self.database_id,
            None,
            mutation,
            barrier_info,
            &node_actors,
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
            actors_to_create,
        )?;

        Ok(ApplyCommandInfo {
            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
            table_ids_to_commit,
            jobs_to_wait: finished_snapshot_backfill_jobs,
            node_to_collect,
            command: command
                .map(Command::into_post_collect)
                .unwrap_or(PostCollectCommand::barrier()),
        })
    }
}
607}