risingwave_meta/barrier/checkpoint/state.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{
    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation, ThrottleMutation,
};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
use crate::barrier::command::PostCollectCommand;
use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
};
use crate::barrier::notifier::Notifier;
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::utils::NodeToCollect;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::stream::{GlobalActorIdGen, fill_snapshot_backfill_epoch};

/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The last sent `prev_epoch`
    ///
    /// There's no need to persist this field. On recovery, we will restore this from the latest
    /// committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch` of pending non-checkpoint barriers
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the cluster is paused.
    is_paused: bool,
}

impl BarrierWorkerState {
    pub(super) fn new() -> Self {
        Self {
            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
            pending_non_checkpoint_barriers: vec![],
            is_paused: false,
        }
    }

    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
        Self {
            in_flight_prev_epoch,
            pending_non_checkpoint_barriers: vec![],
            is_paused,
        }
    }

    pub fn is_paused(&self) -> bool {
        self.is_paused
    }

    fn set_is_paused(&mut self, is_paused: bool) {
        if self.is_paused != is_paused {
            tracing::info!(
                currently_paused = self.is_paused,
                newly_paused = is_paused,
                "update paused state"
            );
            self.is_paused = is_paused;
        }
    }

    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
        &self.in_flight_prev_epoch
    }

    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
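    ///
    /// Illustrative sketch of the checkpoint/non-checkpoint behaviour (not a doc-test of this
    /// crate; `next_epoch()` is a hypothetical helper that yields strictly increasing epochs):
    ///
    /// ```ignore
    /// let mut state = BarrierWorkerState::new();
    /// // A non-checkpoint barrier only accumulates its prev epoch.
    /// let b1 = state.next_barrier_info(false, next_epoch());
    /// assert!(matches!(b1.kind, BarrierKind::Barrier));
    /// // The next checkpoint barrier drains the accumulated prev epochs into its kind.
    /// let b2 = state.next_barrier_info(true, next_epoch());
    /// assert!(matches!(b2.kind, BarrierKind::Checkpoint(epochs) if epochs.len() == 2));
    /// ```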
    pub fn next_barrier_info(
        &mut self,
        is_checkpoint: bool,
        curr_epoch: TracedEpoch,
    ) -> BarrierInfo {
        assert!(
            self.in_flight_prev_epoch.value() < curr_epoch.value(),
            "curr epoch regress. {} >= {}",
            self.in_flight_prev_epoch.value(),
            curr_epoch.value()
        );
        let prev_epoch = self.in_flight_prev_epoch.clone();
        self.in_flight_prev_epoch = curr_epoch.clone();
        self.pending_non_checkpoint_barriers
            .push(prev_epoch.value().0);
        let kind = if is_checkpoint {
            let epochs = take(&mut self.pending_non_checkpoint_barriers);
            BarrierKind::Checkpoint(epochs)
        } else {
            BarrierKind::Barrier
        };
        BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        }
    }
}

pub(super) struct ApplyCommandInfo {
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    pub table_ids_to_commit: HashSet<TableId>,
    pub jobs_to_wait: HashSet<JobId>,
    pub node_to_collect: NodeToCollect,
    pub command: PostCollectCommand,
}

impl DatabaseCheckpointControl {
    /// Applies the given command and injects the corresponding barrier. The inflight info
    /// includes the actors newly added by the command; the dropped actors will be removed from
    /// the state after the info gets resolved.
    pub(super) fn apply_command(
        &mut self,
        mut command: Option<Command>,
        notifiers: &mut Vec<Notifier>,
        barrier_info: &BarrierInfo,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<ApplyCommandInfo> {
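        // Build fragment edge info for the command (if any) up front; it is consumed below when
        // collecting new actors to create and when generating the barrier mutation.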
        let mut edges = self
            .database_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        // For a newly created streaming job, fill in the snapshot backfill epochs of its
        // fragments; a snapshot backfill job additionally gets its own
        // `CreatingStreamingJobControl` registered below.
        if let &mut Some(Command::CreateStreamingJob {
            ref mut job_type,
            ref mut info,
            ref cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    assert!(!self.state.is_paused());
                    let snapshot_epoch = barrier_info.prev_epoch();
                    // set snapshot epoch of upstream table for snapshot backfill
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(snapshot_epoch),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        CreateSnapshotBackfillJobCommandInfo {
                            info: info.clone(),
                            snapshot_backfill_info: snapshot_backfill_info.clone(),
                            cross_db_snapshot_backfill_info: cross_db_snapshot_backfill_info
                                .clone(),
                        },
                        take(notifiers),
                        snapshot_backfill_upstream_tables,
                        snapshot_epoch,
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.database_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.fragment_infos_with_job_id());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        // Pre-apply the command's fragment changes here; the changes are applied to the
        // fragment infos via `post_apply` further below. Snapshot backfill jobs are tracked by
        // their own creating-job control above, so they are skipped.
        let post_apply_changes = if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = command
        {
            None
        } else if let Some((new_job, fragment_changes)) =
            command.as_ref().and_then(Command::fragment_changes)
        {
            Some(self.database_info.pre_apply(new_job, fragment_changes))
        } else {
            None
        };

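        // Register any subscribers introduced by the command: an explicit subscription, or the
        // implicit per-upstream-table subscribers of a snapshot backfill job.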
        match &command {
            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(*retention_second),
                );
            }
            Some(Command::CreateStreamingJob {
                info,
                job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
                ..
            }) => {
                for upstream_mv_table_id in snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .keys()
                {
                    self.database_info.register_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        info.streaming_job.id().as_subscriber_id(),
                        SubscriberType::SnapshotBackfill,
                    );
                }
            }
            _ => {}
        };

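        // Tables whose state will be committed at this barrier, and the per-node actor sets the
        // barrier must be collected from. Both may be extended below when snapshot backfill jobs
        // finish and merge back into the database graph.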
        let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
        let mut actors_to_create = command.as_ref().and_then(|command| {
            command.actors_to_create(&self.database_info, &mut edges, control_stream_manager)
        });
        let mut node_actors =
            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());

        if let Some(post_apply_changes) = post_apply_changes {
            self.database_info.post_apply(post_apply_changes);
        }

        let prev_is_paused = self.state.is_paused();
        let curr_is_paused = match command {
            Some(Command::Pause) => true,
            Some(Command::Resume) => false,
            _ => prev_is_paused,
        };
        self.state.set_is_paused(curr_is_paused);

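        // Note: the mutation below is generated against the pause state prior to this command
        // (`prev_is_paused`), not the state after applying it.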
        let mutation = if let Some(c) = &command {
            c.to_mutation(
                prev_is_paused,
                &mut edges,
                control_stream_manager,
                &mut self.database_info,
            )?
        } else {
            None
        };

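        // If the command produced no mutation, check whether any creating snapshot backfill job
        // is ready to merge back into the database graph at this checkpoint barrier; if so,
        // synthesize an `Update` mutation that re-creates its actors in the database partial
        // graph and rewires the upstream dispatchers.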
        let mut finished_snapshot_backfill_jobs = HashSet::new();
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(control_stream_manager, barrier_info)?;
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

                if !finished_snapshot_backfill_job_info.is_empty() {
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

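                        // The job's actors are re-created in the database partial graph under
                        // freshly allocated global actor ids; build the old -> new id mapping
                        // first so stream actors, fragment infos and dispatchers can be
                        // rewritten consistently.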
                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            control_stream_manager.env.actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        // mapping from old_actor_id to new_actor_id
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
                        // new actors belong to the database partial graph
                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values())
                                .map(|info| (info, partial_graph_id)),
                            control_stream_manager,
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    [], // no initial subscriber for backfilling job
                                )
                            }),
                        );
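                        // Rewrite the upstream dispatchers to point at the re-created actors;
                        // `removed_downstream_actor_id` lists the old actor ids being replaced,
                        // mapped back through `actor_mapping`.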
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
                                |upstream_fragment_id| {
                                    let new_actor_dispatchers = edges
                                        .dispatchers
                                        .remove(upstream_fragment_id)
                                        .expect("should exist");
                                    new_actor_dispatchers.into_iter().flat_map(
                                        |(upstream_actor_id, dispatchers)| {
                                            dispatchers.into_iter().map(move |dispatcher| {
                                                PbDispatcherUpdate {
                                                    actor_id: upstream_actor_id,
                                                    dispatcher_id: dispatcher.dispatcher_id,
                                                    hash_mapping: dispatcher.hash_mapping,
                                                    removed_downstream_actor_id: dispatcher
                                                        .downstream_actor_id
                                                        .iter()
                                                        .map(|new_downstream_actor_id| {
                                                            actor_mapping
                                                            .iter()
                                                            .find_map(
                                                                |(old_actor_id, new_actor_id)| {
                                                                    (new_downstream_actor_id
                                                                        == new_actor_id)
                                                                        .then_some(*old_actor_id)
                                                                },
                                                            )
                                                            .expect("should exist")
                                                        })
                                                        .collect(),
                                                    added_downstream_actor_id: dispatcher
                                                        .downstream_actor_id,
                                                }
                                            })
                                        },
                                    )
                                },
                            ),
                        );
                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
                        for (worker_id, worker_actors) in new_actors_to_create {
                            node_actors.entry(worker_id).or_default().extend(
                                worker_actors.values().flat_map(|(_, actors, _)| {
                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
                                }),
                            );
                            actors_to_create
                                .entry(worker_id)
                                .or_default()
                                .extend(worker_actors);
                        }
                        self.database_info.add_existing(InflightStreamingJobInfo {
                            job_id,
                            fragment_infos: new_fragment_info,
                            subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
                            status: CreateStreamingJobStatus::Created,
                            cdc_table_backfill_tracker: None, // no cdc table backfill for snapshot backfill
                        });
                    }

                    Some(PbMutation::Update(PbUpdateMutation {
                        dispatcher_update,
                        merge_update: vec![], // no upstream update on existing actors
                        actor_vnode_bitmap_update: Default::default(), /* no in place update vnode bitmap happened */
                        dropped_actors: vec![], /* no actors to drop in the partial graph of database */
                        actor_splits,
                        actor_new_dispatchers: Default::default(), // no new dispatcher
                        actor_cdc_table_snapshot_splits: None, /* no cdc table backfill in snapshot backfill */
                        sink_schema_change: Default::default(), /* no sink auto schema change happened here */
                        subscriptions_to_drop,
                    }))
                } else {
                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
                    if fragment_ids.is_empty() {
                        None
                    } else {
                        Some(PbMutation::StartFragmentBackfill(
                            PbStartFragmentBackfillMutation { fragment_ids },
                        ))
                    }
                }
            }
        };

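        // Dropping a subscription only needs to update the in-memory subscriber registry here;
        // a missing entry is logged as a warning rather than treated as an error.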
        #[expect(clippy::collapsible_if)]
        if let Some(Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id,
        }) = command
        {
            if self
                .database_info
                .unregister_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                )
                .is_none()
            {
                warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
            }
        }

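        // Forward the barrier to every snapshot backfill job that is still being created (i.e.
        // not merged above). A `Throttle` command targeting such a job is delivered through the
        // job's own partial graph as a throttle mutation.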
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if !finished_snapshot_backfill_jobs.contains(job_id) {
                let throttle_mutation = if let Some(Command::Throttle { jobs, config }) = &command
                    && jobs.contains(job_id)
                {
                    assert_eq!(
                        jobs.len(),
                        1,
                        "should not alter rate limit of snapshot backfill job with other jobs"
                    );
                    Some((
                        Mutation::Throttle(ThrottleMutation {
                            fragment_throttle: config
                                .iter()
                                .map(|(fragment_id, config)| (*fragment_id, *config))
                                .collect(),
                        }),
                        take(notifiers),
                    ))
                } else {
                    None
                };
                creating_job.on_new_upstream_barrier(
                    control_stream_manager,
                    barrier_info,
                    throttle_mutation,
                )?;
            }
        }

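        // Finally, inject the barrier into the database partial graph. Creating snapshot
        // backfill jobs have already received it above through their own partial graphs.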
        let node_to_collect = control_stream_manager.inject_barrier(
            self.database_id,
            None,
            mutation,
            barrier_info,
            &node_actors,
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
            actors_to_create,
        )?;

        Ok(ApplyCommandInfo {
            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
            table_ids_to_commit,
            jobs_to_wait: finished_snapshot_backfill_jobs,
            node_to_collect,
            command: command
                .map(Command::into_post_collect)
                .unwrap_or(PostCollectCommand::barrier()),
        })
    }
}