risingwave_meta/barrier/
info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
21use risingwave_meta_model::WorkerId;
22use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
23use risingwave_pb::stream_plan::stream_node::NodeBody;
24use tracing::warn;
25
26use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
27use crate::barrier::rpc::ControlStreamManager;
28use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
29use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
30use crate::model::{ActorId, FragmentId, SubscriptionId};
31
32#[derive(Debug, Clone)]
33pub(super) struct BarrierInfo {
34    pub prev_epoch: TracedEpoch,
35    pub curr_epoch: TracedEpoch,
36    pub kind: BarrierKind,
37}
38
39impl BarrierInfo {
40    pub(super) fn prev_epoch(&self) -> u64 {
41        self.prev_epoch.value().0
42    }
43}
44
/// A change to a single fragment carried by a barrier command.
///
/// [`InflightDatabaseInfo::pre_apply`] applies the additive part of a change (new fragments,
/// new actors, vnode bitmap updates, upstream rewrites). [`InflightDatabaseInfo::post_apply`]
/// applies the removals (dropped actors and dropped fragments).
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    /// A new fragment owned by the streaming job with the given id.
    NewFragment(TableId, InflightFragmentInfo),
    /// Rewrites `upstream_fragment_id` of the fragment's `Merge` nodes.
    ReplaceNodeUpstream(
        /// old `fragment_id` -> new `fragment_id`
        HashMap<FragmentId, FragmentId>,
    ),
    /// Actor-level rescheduling of an existing fragment.
    Reschedule {
        /// Actors added to the fragment (inserted by `pre_apply`).
        new_actors: HashMap<ActorId, InflightActorInfo>,
        /// Replacement vnode bitmaps for existing actors (applied by `pre_apply`).
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        /// Actors removed from the fragment (removed by `post_apply`).
        to_remove: HashSet<ActorId>,
    },
    /// The fragment is dropped (removed by `post_apply`).
    RemoveFragment,
}
59
/// Subscriptions tracked per upstream materialized-view table.
#[derive(Default, Clone, Debug)]
pub struct InflightSubscriptionInfo {
    /// `mv_table_id` => `subscription_id` => retention seconds
    pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
}
65
66#[derive(Clone, Debug)]
67pub struct InflightStreamingJobInfo {
68    pub job_id: TableId,
69    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
70}
71
72impl InflightStreamingJobInfo {
73    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
74        self.fragment_infos.values()
75    }
76
77    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
78        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
79    }
80}
81
82impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
83    type Item = &'a InflightFragmentInfo;
84
85    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
86
87    fn into_iter(self) -> Self::IntoIter {
88        self.fragment_infos()
89    }
90}
91
92#[derive(Clone, Debug)]
93pub struct InflightDatabaseInfo {
94    jobs: HashMap<TableId, InflightStreamingJobInfo>,
95    fragment_location: HashMap<FragmentId, TableId>,
96}
97
98impl InflightDatabaseInfo {
99    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
100        self.jobs.values().flat_map(|job| job.fragment_infos())
101    }
102
103    pub fn contains_job(&self, job_id: TableId) -> bool {
104        self.jobs.contains_key(&job_id)
105    }
106
107    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
108        let job_id = self.fragment_location[&fragment_id];
109        self.jobs
110            .get(&job_id)
111            .expect("should exist")
112            .fragment_infos
113            .get(&fragment_id)
114            .expect("should exist")
115    }
116
117    fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
118        let job_id = self.fragment_location[&fragment_id];
119        self.jobs
120            .get_mut(&job_id)
121            .expect("should exist")
122            .fragment_infos
123            .get_mut(&fragment_id)
124            .expect("should exist")
125    }
126}
127
128impl InflightDatabaseInfo {
129    pub fn empty() -> Self {
130        Self {
131            jobs: Default::default(),
132            fragment_location: Default::default(),
133        }
134    }
135
136    pub fn is_empty(&self) -> bool {
137        self.jobs.is_empty()
138    }
139
140    pub(crate) fn extend(&mut self, job: InflightStreamingJobInfo) {
141        self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
142            (
143                fragment_id,
144                CommandFragmentChanges::NewFragment(job.job_id, info),
145            )
146        }))
147    }
148
149    /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
150    /// the info correspondingly.
151    pub(crate) fn pre_apply(
152        &mut self,
153        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
154    ) {
155        self.apply_add(
156            fragment_changes
157                .iter()
158                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
159        )
160    }
161
162    fn apply_add(
163        &mut self,
164        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
165    ) {
166        {
167            for (fragment_id, change) in fragment_changes {
168                match change {
169                    CommandFragmentChanges::NewFragment(job_id, info) => {
170                        let fragment_infos =
171                            self.jobs
172                                .entry(job_id)
173                                .or_insert_with(|| InflightStreamingJobInfo {
174                                    job_id,
175                                    fragment_infos: Default::default(),
176                                });
177                        fragment_infos
178                            .fragment_infos
179                            .try_insert(fragment_id, info)
180                            .expect("non duplicate");
181                        self.fragment_location
182                            .try_insert(fragment_id, job_id)
183                            .expect("non duplicate");
184                    }
185                    CommandFragmentChanges::Reschedule {
186                        new_actors,
187                        actor_update_vnode_bitmap,
188                        ..
189                    } => {
190                        let info = self.fragment_mut(fragment_id);
191                        let actors = &mut info.actors;
192                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
193                            actors
194                                .get_mut(&actor_id)
195                                .expect("should exist")
196                                .vnode_bitmap = Some(new_vnodes);
197                        }
198                        for (actor_id, actor) in new_actors {
199                            actors
200                                .try_insert(actor_id as _, actor)
201                                .expect("non-duplicate");
202                        }
203                    }
204                    CommandFragmentChanges::RemoveFragment => {}
205                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
206                        let mut remaining_fragment_ids: HashSet<_> =
207                            replace_map.keys().cloned().collect();
208                        let info = self.fragment_mut(fragment_id);
209                        visit_stream_node_mut(&mut info.nodes, |node| {
210                            if let NodeBody::Merge(m) = node
211                                && let Some(new_upstream_fragment_id) =
212                                    replace_map.get(&m.upstream_fragment_id)
213                            {
214                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
215                                    if cfg!(debug_assertions) {
216                                        panic!(
217                                            "duplicate upstream fragment: {:?} {:?}",
218                                            m, replace_map
219                                        );
220                                    } else {
221                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
222                                    }
223                                }
224                                m.upstream_fragment_id = *new_upstream_fragment_id;
225                            }
226                        });
227                        if cfg!(debug_assertions) {
228                            assert!(
229                                remaining_fragment_ids.is_empty(),
230                                "non-existing fragment to replace: {:?} {:?} {:?}",
231                                remaining_fragment_ids,
232                                info.nodes,
233                                replace_map
234                            );
235                        } else {
236                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
237                        }
238                    }
239                }
240            }
241        }
242    }
243
244    pub(super) fn build_edge(
245        &self,
246        command: Option<&Command>,
247        control_stream_manager: &ControlStreamManager,
248    ) -> Option<FragmentEdgeBuildResult> {
249        let (info, replace_job) = match command {
250            None => {
251                return None;
252            }
253            Some(command) => match command {
254                Command::Flush
255                | Command::Pause
256                | Command::Resume
257                | Command::DropStreamingJobs { .. }
258                | Command::MergeSnapshotBackfillStreamingJobs(_)
259                | Command::RescheduleFragment { .. }
260                | Command::SourceChangeSplit(_)
261                | Command::Throttle(_)
262                | Command::CreateSubscription { .. }
263                | Command::DropSubscription { .. }
264                | Command::ConnectorPropsChange(_)
265                | Command::StartFragmentBackfill { .. } => {
266                    return None;
267                }
268                Command::CreateStreamingJob { info, job_type, .. } => {
269                    let replace_job = match job_type {
270                        CreateStreamingJobType::Normal
271                        | CreateStreamingJobType::SnapshotBackfill(_) => None,
272                        CreateStreamingJobType::SinkIntoTable(replace_job) => Some(replace_job),
273                    };
274                    (Some(info), replace_job)
275                }
276                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job)),
277            },
278        };
279        // `existing_fragment_ids` consists of
280        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
281        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
282        // if the upstream fragment previously exists
283        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments.
284        let existing_fragment_ids = info
285            .into_iter()
286            .flat_map(|info| info.upstream_fragment_downstreams.keys())
287            .chain(replace_job.into_iter().flat_map(|replace_job| {
288                replace_job
289                    .upstream_fragment_downstreams
290                    .keys()
291                    .filter(|fragment_id| {
292                        info.map(|info| {
293                            !info
294                                .stream_job_fragments
295                                .fragments
296                                .contains_key(fragment_id)
297                        })
298                        .unwrap_or(true)
299                    })
300                    .chain(replace_job.replace_upstream.keys())
301            }))
302            .cloned();
303        let new_fragment_infos = info
304            .into_iter()
305            .flat_map(|info| info.stream_job_fragments.new_fragment_info())
306            .chain(
307                replace_job
308                    .into_iter()
309                    .flat_map(|replace_job| replace_job.new_fragments.new_fragment_info()),
310            )
311            .collect_vec();
312        let mut builder = FragmentEdgeBuilder::new(
313            existing_fragment_ids
314                .map(|fragment_id| self.fragment(fragment_id))
315                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
316            control_stream_manager,
317        );
318        if let Some(info) = info {
319            builder.add_relations(&info.upstream_fragment_downstreams);
320            builder.add_relations(&info.stream_job_fragments.downstreams);
321        }
322        if let Some(replace_job) = replace_job {
323            builder.add_relations(&replace_job.upstream_fragment_downstreams);
324            builder.add_relations(&replace_job.new_fragments.downstreams);
325        }
326        if let Some(replace_job) = replace_job {
327            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
328                for (original_upstream_fragment_id, new_upstream_fragment_id) in
329                    fragment_replacement
330                {
331                    builder.replace_upstream(
332                        *fragment_id,
333                        *original_upstream_fragment_id,
334                        *new_upstream_fragment_id,
335                    );
336                }
337            }
338        }
339        Some(builder.build())
340    }
341}
342
343impl InflightSubscriptionInfo {
344    pub fn pre_apply(&mut self, command: &Command) {
345        if let Command::CreateSubscription {
346            subscription_id,
347            upstream_mv_table_id,
348            retention_second,
349        } = command
350        {
351            if let Some(prev_retiontion) = self
352                .mv_depended_subscriptions
353                .entry(*upstream_mv_table_id)
354                .or_default()
355                .insert(*subscription_id, *retention_second)
356            {
357                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retiontion, "add an existing subscription id");
358            }
359        }
360    }
361}
362
363impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
364    type Item = PbSubscriptionUpstreamInfo;
365
366    type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;
367
368    fn into_iter(self) -> Self::IntoIter {
369        self.mv_depended_subscriptions
370            .iter()
371            .flat_map(|(table_id, subscriptions)| {
372                subscriptions
373                    .keys()
374                    .map(|subscriber_id| PbSubscriptionUpstreamInfo {
375                        subscriber_id: *subscriber_id,
376                        upstream_mv_table_id: table_id.table_id,
377                    })
378            })
379    }
380}
381
382impl InflightDatabaseInfo {
383    /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
384    /// remove that from the snapshot correspondingly.
385    pub(crate) fn post_apply(
386        &mut self,
387        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
388    ) {
389        {
390            for (fragment_id, changes) in fragment_changes {
391                match changes {
392                    CommandFragmentChanges::NewFragment(_, _) => {}
393                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
394                        let job_id = self.fragment_location[fragment_id];
395                        let info = self
396                            .jobs
397                            .get_mut(&job_id)
398                            .expect("should exist")
399                            .fragment_infos
400                            .get_mut(fragment_id)
401                            .expect("should exist");
402                        for actor_id in to_remove {
403                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
404                        }
405                    }
406                    CommandFragmentChanges::RemoveFragment => {
407                        let job_id = self
408                            .fragment_location
409                            .remove(fragment_id)
410                            .expect("should exist");
411                        let job = self.jobs.get_mut(&job_id).expect("should exist");
412                        job.fragment_infos
413                            .remove(fragment_id)
414                            .expect("should exist");
415                        if job.fragment_infos.is_empty() {
416                            self.jobs.remove(&job_id).expect("should exist");
417                        }
418                    }
419                    CommandFragmentChanges::ReplaceNodeUpstream(_) => {}
420                }
421            }
422        }
423    }
424}
425
426impl InflightSubscriptionInfo {
427    pub fn post_apply(&mut self, command: &Command) {
428        if let Command::DropSubscription {
429            subscription_id,
430            upstream_mv_table_id,
431        } = command
432        {
433            let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
434                Some(subscriptions) => {
435                    let removed = subscriptions.remove(subscription_id).is_some();
436                    if removed && subscriptions.is_empty() {
437                        self.mv_depended_subscriptions.remove(upstream_mv_table_id);
438                    }
439                    removed
440                }
441                None => false,
442            };
443            if !removed {
444                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
445            }
446        }
447    }
448}
449
450impl InflightFragmentInfo {
451    /// Returns actor list to collect in the target worker node.
452    pub(crate) fn actor_ids_to_collect(
453        infos: impl IntoIterator<Item = &Self>,
454    ) -> HashMap<WorkerId, HashSet<ActorId>> {
455        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
456        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
457            assert!(
458                ret.entry(actor.worker_id)
459                    .or_default()
460                    .insert(*actor_id as _)
461            )
462        }
463        ret
464    }
465
466    pub fn existing_table_ids<'a>(
467        infos: impl IntoIterator<Item = &'a Self> + 'a,
468    ) -> impl Iterator<Item = TableId> + 'a {
469        infos
470            .into_iter()
471            .flat_map(|info| info.state_table_ids.iter().cloned())
472    }
473
474    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
475        infos.into_iter().any(|fragment| {
476            fragment
477                .actors
478                .values()
479                .any(|actor| (actor.worker_id) == worker_id)
480        })
481    }
482
483    pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
484        infos
485            .into_iter()
486            .flat_map(|info| info.actors.values())
487            .map(|actor| actor.worker_id)
488            .collect()
489    }
490}
491
492impl InflightDatabaseInfo {
493    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
494        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
495    }
496
497    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
498        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
499    }
500}