risingwave_meta/barrier/
info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
21use risingwave_meta_model::WorkerId;
22use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
23use risingwave_pb::stream_plan::stream_node::NodeBody;
24use tracing::warn;
25
26use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
27use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
28use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
29use crate::model::{ActorId, FragmentId, SubscriptionId};
30
/// The epoch pair and kind that describe a single barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// Previous epoch of the barrier; its raw value is exposed by [`BarrierInfo::prev_epoch`].
    pub prev_epoch: TracedEpoch,
    /// Current epoch of the barrier.
    // NOTE(review): presumably the epoch that starts with this barrier — confirm against the
    // barrier scheduling code.
    pub curr_epoch: TracedEpoch,
    /// Kind of the barrier, see [`BarrierKind`].
    pub kind: BarrierKind,
}
37
38impl BarrierInfo {
39    pub(super) fn prev_epoch(&self) -> u64 {
40        self.prev_epoch.value().0
41    }
42}
43
/// A change to a single fragment that is carried by a barrier command.
///
/// The changes are applied to [`InflightDatabaseInfo`] in two phases: additive effects in
/// `pre_apply` (new fragments, new actors, vnode bitmap updates, upstream replacement) and
/// removals in `post_apply` (dropped actors and dropped fragments).
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    /// A new fragment (second field) that belongs to the job identified by the first field.
    NewFragment(TableId, InflightFragmentInfo),
    /// Redirect the `Merge` nodes of the fragment to new upstream fragments.
    ReplaceNodeUpstream(
        /// old `fragment_id` -> new `fragment_id`
        HashMap<FragmentId, FragmentId>,
    ),
    /// Actor-level changes to an existing fragment.
    Reschedule {
        /// Actors to insert into the fragment, keyed by actor id. Applied in `pre_apply`.
        new_actors: HashMap<ActorId, InflightActorInfo>,
        /// New vnode bitmaps for actors that already exist in the fragment. Applied in
        /// `pre_apply`.
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        /// Actors to drop from the fragment. Applied in `post_apply`.
        to_remove: HashSet<ActorId>,
    },
    /// The fragment is dropped. Applied in `post_apply`.
    RemoveFragment,
}
58
/// Subscriptions that are currently tracked, grouped by the table they depend on.
///
/// Entries are added by `pre_apply` on `Command::CreateSubscription` and removed by
/// `post_apply` on `Command::DropSubscription`.
#[derive(Default, Clone, Debug)]
pub struct InflightSubscriptionInfo {
    /// `mv_table_id` => `subscription_id` => retention seconds
    pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
}
64
/// The in-flight fragments of a single streaming job.
#[derive(Clone, Debug)]
pub struct InflightStreamingJobInfo {
    /// Id of the streaming job that owns the fragments below.
    pub job_id: TableId,
    /// Fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
}
70
71impl InflightStreamingJobInfo {
72    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
73        self.fragment_infos.values()
74    }
75
76    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
77        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
78    }
79}
80
81impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
82    type Item = &'a InflightFragmentInfo;
83
84    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
85
86    fn into_iter(self) -> Self::IntoIter {
87        self.fragment_infos()
88    }
89}
90
/// The in-flight streaming jobs of a database together with a fragment lookup index.
#[derive(Clone, Debug)]
pub struct InflightDatabaseInfo {
    /// In-flight jobs, keyed by job id.
    jobs: HashMap<TableId, InflightStreamingJobInfo>,
    /// Reverse index: fragment id => id of the job in `jobs` that owns the fragment.
    fragment_location: HashMap<FragmentId, TableId>,
}
96
97impl InflightDatabaseInfo {
98    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
99        self.jobs.values().flat_map(|job| job.fragment_infos())
100    }
101
102    pub fn job_ids(&self) -> impl Iterator<Item = TableId> + '_ {
103        self.jobs.keys().cloned()
104    }
105
106    pub fn contains_job(&self, job_id: TableId) -> bool {
107        self.jobs.contains_key(&job_id)
108    }
109
110    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
111        let job_id = self.fragment_location[&fragment_id];
112        self.jobs
113            .get(&job_id)
114            .expect("should exist")
115            .fragment_infos
116            .get(&fragment_id)
117            .expect("should exist")
118    }
119
120    fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
121        let job_id = self.fragment_location[&fragment_id];
122        self.jobs
123            .get_mut(&job_id)
124            .expect("should exist")
125            .fragment_infos
126            .get_mut(&fragment_id)
127            .expect("should exist")
128    }
129}
130
131impl InflightDatabaseInfo {
132    pub fn empty() -> Self {
133        Self {
134            jobs: Default::default(),
135            fragment_location: Default::default(),
136        }
137    }
138
139    /// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors.
140    pub fn new<I: Iterator<Item = (FragmentId, InflightFragmentInfo)>>(
141        fragment_infos: impl Iterator<Item = (TableId, I)>,
142    ) -> Self {
143        let mut fragment_location = HashMap::new();
144        let mut jobs = HashMap::new();
145
146        for (job_id, job_fragment_info) in fragment_infos {
147            let job_fragment_info: HashMap<_, _> = job_fragment_info.collect();
148            assert!(!job_fragment_info.is_empty());
149            for fragment_id in job_fragment_info.keys() {
150                fragment_location
151                    .try_insert(*fragment_id, job_id)
152                    .expect("no duplicate");
153            }
154            jobs.insert(
155                job_id,
156                InflightStreamingJobInfo {
157                    job_id,
158                    fragment_infos: job_fragment_info,
159                },
160            );
161        }
162        assert!(!jobs.is_empty());
163        Self {
164            jobs,
165            fragment_location,
166        }
167    }
168
169    pub fn is_empty(&self) -> bool {
170        self.jobs.is_empty()
171    }
172
173    pub(crate) fn extend(&mut self, job: InflightStreamingJobInfo) {
174        self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
175            (
176                fragment_id,
177                CommandFragmentChanges::NewFragment(job.job_id, info),
178            )
179        }))
180    }
181
182    /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
183    /// the info correspondingly.
184    pub(crate) fn pre_apply(
185        &mut self,
186        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
187    ) {
188        self.apply_add(
189            fragment_changes
190                .iter()
191                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
192        )
193    }
194
195    fn apply_add(
196        &mut self,
197        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
198    ) {
199        {
200            for (fragment_id, change) in fragment_changes {
201                match change {
202                    CommandFragmentChanges::NewFragment(job_id, info) => {
203                        let fragment_infos =
204                            self.jobs
205                                .entry(job_id)
206                                .or_insert_with(|| InflightStreamingJobInfo {
207                                    job_id,
208                                    fragment_infos: Default::default(),
209                                });
210                        fragment_infos
211                            .fragment_infos
212                            .try_insert(fragment_id, info)
213                            .expect("non duplicate");
214                        self.fragment_location
215                            .try_insert(fragment_id, job_id)
216                            .expect("non duplicate");
217                    }
218                    CommandFragmentChanges::Reschedule {
219                        new_actors,
220                        actor_update_vnode_bitmap,
221                        ..
222                    } => {
223                        let info = self.fragment_mut(fragment_id);
224                        let actors = &mut info.actors;
225                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
226                            actors
227                                .get_mut(&actor_id)
228                                .expect("should exist")
229                                .vnode_bitmap = Some(new_vnodes);
230                        }
231                        for (actor_id, actor) in new_actors {
232                            actors
233                                .try_insert(actor_id as _, actor)
234                                .expect("non-duplicate");
235                        }
236                    }
237                    CommandFragmentChanges::RemoveFragment => {}
238                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
239                        let mut remaining_fragment_ids: HashSet<_> =
240                            replace_map.keys().cloned().collect();
241                        let info = self.fragment_mut(fragment_id);
242                        visit_stream_node_mut(&mut info.nodes, |node| {
243                            if let NodeBody::Merge(m) = node
244                                && let Some(new_upstream_fragment_id) =
245                                    replace_map.get(&m.upstream_fragment_id)
246                            {
247                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
248                                    if cfg!(debug_assertions) {
249                                        panic!(
250                                            "duplicate upstream fragment: {:?} {:?}",
251                                            m, replace_map
252                                        );
253                                    } else {
254                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
255                                    }
256                                }
257                                m.upstream_fragment_id = *new_upstream_fragment_id;
258                            }
259                        });
260                        if cfg!(debug_assertions) {
261                            assert!(
262                                remaining_fragment_ids.is_empty(),
263                                "non-existing fragment to replace: {:?} {:?} {:?}",
264                                remaining_fragment_ids,
265                                info.nodes,
266                                replace_map
267                            );
268                        } else {
269                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
270                        }
271                    }
272                }
273            }
274        }
275    }
276
277    pub(super) fn build_edge(&self, command: Option<&Command>) -> Option<FragmentEdgeBuildResult> {
278        let (info, replace_job) = match command {
279            None => {
280                return None;
281            }
282            Some(command) => match command {
283                Command::Flush
284                | Command::Pause
285                | Command::Resume
286                | Command::DropStreamingJobs { .. }
287                | Command::MergeSnapshotBackfillStreamingJobs(_)
288                | Command::RescheduleFragment { .. }
289                | Command::SourceChangeSplit(_)
290                | Command::Throttle(_)
291                | Command::CreateSubscription { .. }
292                | Command::DropSubscription { .. } => {
293                    return None;
294                }
295                Command::CreateStreamingJob { info, job_type, .. } => {
296                    let replace_job = match job_type {
297                        CreateStreamingJobType::Normal
298                        | CreateStreamingJobType::SnapshotBackfill(_) => None,
299                        CreateStreamingJobType::SinkIntoTable(replace_job) => Some(replace_job),
300                    };
301                    (Some(info), replace_job)
302                }
303                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job)),
304            },
305        };
306        // `existing_fragment_ids` consists of
307        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
308        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
309        // if the upstream fragment previously exists
310        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments.
311        let existing_fragment_ids = info
312            .into_iter()
313            .flat_map(|info| info.upstream_fragment_downstreams.keys())
314            .chain(replace_job.into_iter().flat_map(|replace_job| {
315                replace_job
316                    .upstream_fragment_downstreams
317                    .keys()
318                    .filter(|fragment_id| {
319                        info.map(|info| {
320                            !info
321                                .stream_job_fragments
322                                .fragments
323                                .contains_key(fragment_id)
324                        })
325                        .unwrap_or(true)
326                    })
327                    .chain(replace_job.replace_upstream.keys())
328            }))
329            .cloned();
330        let new_fragment_infos = info
331            .into_iter()
332            .flat_map(|info| info.stream_job_fragments.new_fragment_info())
333            .chain(
334                replace_job
335                    .into_iter()
336                    .flat_map(|replace_job| replace_job.new_fragments.new_fragment_info()),
337            )
338            .collect_vec();
339        let mut builder = FragmentEdgeBuilder::new(
340            existing_fragment_ids
341                .map(|fragment_id| self.fragment(fragment_id))
342                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
343        );
344        if let Some(info) = info {
345            builder.add_relations(&info.upstream_fragment_downstreams);
346            builder.add_relations(&info.stream_job_fragments.downstreams);
347        }
348        if let Some(replace_job) = replace_job {
349            builder.add_relations(&replace_job.upstream_fragment_downstreams);
350            builder.add_relations(&replace_job.new_fragments.downstreams);
351        }
352        if let Some(replace_job) = replace_job {
353            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
354                for (original_upstream_fragment_id, new_upstream_fragment_id) in
355                    fragment_replacement
356                {
357                    builder.replace_upstream(
358                        *fragment_id,
359                        *original_upstream_fragment_id,
360                        *new_upstream_fragment_id,
361                    );
362                }
363            }
364        }
365        Some(builder.build())
366    }
367}
368
369impl InflightSubscriptionInfo {
370    pub fn pre_apply(&mut self, command: &Command) {
371        if let Command::CreateSubscription {
372            subscription_id,
373            upstream_mv_table_id,
374            retention_second,
375        } = command
376        {
377            if let Some(prev_retiontion) = self
378                .mv_depended_subscriptions
379                .entry(*upstream_mv_table_id)
380                .or_default()
381                .insert(*subscription_id, *retention_second)
382            {
383                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retiontion, "add an existing subscription id");
384            }
385        }
386    }
387}
388
389impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
390    type Item = PbSubscriptionUpstreamInfo;
391
392    type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;
393
394    fn into_iter(self) -> Self::IntoIter {
395        self.mv_depended_subscriptions
396            .iter()
397            .flat_map(|(table_id, subscriptions)| {
398                subscriptions
399                    .keys()
400                    .map(|subscriber_id| PbSubscriptionUpstreamInfo {
401                        subscriber_id: *subscriber_id,
402                        upstream_mv_table_id: table_id.table_id,
403                    })
404            })
405    }
406}
407
408impl InflightDatabaseInfo {
409    /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
410    /// remove that from the snapshot correspondingly.
411    pub(crate) fn post_apply(
412        &mut self,
413        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
414    ) {
415        {
416            for (fragment_id, changes) in fragment_changes {
417                match changes {
418                    CommandFragmentChanges::NewFragment(_, _) => {}
419                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
420                        let job_id = self.fragment_location[fragment_id];
421                        let info = self
422                            .jobs
423                            .get_mut(&job_id)
424                            .expect("should exist")
425                            .fragment_infos
426                            .get_mut(fragment_id)
427                            .expect("should exist");
428                        for actor_id in to_remove {
429                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
430                        }
431                    }
432                    CommandFragmentChanges::RemoveFragment => {
433                        let job_id = self
434                            .fragment_location
435                            .remove(fragment_id)
436                            .expect("should exist");
437                        let job = self.jobs.get_mut(&job_id).expect("should exist");
438                        job.fragment_infos
439                            .remove(fragment_id)
440                            .expect("should exist");
441                        if job.fragment_infos.is_empty() {
442                            self.jobs.remove(&job_id).expect("should exist");
443                        }
444                    }
445                    CommandFragmentChanges::ReplaceNodeUpstream(_) => {}
446                }
447            }
448        }
449    }
450}
451
452impl InflightSubscriptionInfo {
453    pub fn post_apply(&mut self, command: &Command) {
454        if let Command::DropSubscription {
455            subscription_id,
456            upstream_mv_table_id,
457        } = command
458        {
459            let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
460                Some(subscriptions) => {
461                    let removed = subscriptions.remove(subscription_id).is_some();
462                    if removed && subscriptions.is_empty() {
463                        self.mv_depended_subscriptions.remove(upstream_mv_table_id);
464                    }
465                    removed
466                }
467                None => false,
468            };
469            if !removed {
470                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
471            }
472        }
473    }
474}
475
476impl InflightFragmentInfo {
477    /// Returns actor list to collect in the target worker node.
478    pub(crate) fn actor_ids_to_collect(
479        infos: impl IntoIterator<Item = &Self>,
480    ) -> HashMap<WorkerId, HashSet<ActorId>> {
481        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
482        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
483            assert!(
484                ret.entry(actor.worker_id)
485                    .or_default()
486                    .insert(*actor_id as _)
487            )
488        }
489        ret
490    }
491
492    pub fn existing_table_ids<'a>(
493        infos: impl IntoIterator<Item = &'a Self> + 'a,
494    ) -> impl Iterator<Item = TableId> + 'a {
495        infos
496            .into_iter()
497            .flat_map(|info| info.state_table_ids.iter().cloned())
498    }
499
500    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
501        infos.into_iter().any(|fragment| {
502            fragment
503                .actors
504                .values()
505                .any(|actor| (actor.worker_id) == worker_id)
506        })
507    }
508}
509
510impl InflightDatabaseInfo {
511    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
512        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
513    }
514
515    pub(crate) fn workers(&self) -> HashSet<WorkerId> {
516        self.fragment_infos()
517            .flat_map(|info| info.actors.values())
518            .map(|actor| actor.worker_id)
519            .collect()
520    }
521
522    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
523        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
524    }
525}