risingwave_meta/barrier/
info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
21use risingwave_meta_model::WorkerId;
22use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
23use risingwave_pb::stream_plan::stream_node::NodeBody;
24use tracing::warn;
25
26use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
27use crate::barrier::rpc::ControlStreamManager;
28use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
29use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
30use crate::model::{ActorId, FragmentId, SubscriptionId};
31
/// Epochs and kind that describe a single barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// Previous epoch of the barrier; its raw value is exposed by [`BarrierInfo::prev_epoch`].
    pub prev_epoch: TracedEpoch,
    /// Current epoch of the barrier.
    pub curr_epoch: TracedEpoch,
    /// Kind of the barrier.
    pub kind: BarrierKind,
}
38
39impl BarrierInfo {
40    pub(super) fn prev_epoch(&self) -> u64 {
41        self.prev_epoch.value().0
42    }
43}
44
/// Change that a barrier command applies to a single fragment.
///
/// Changes are keyed by fragment id and applied in two phases by
/// `InflightDatabaseInfo::pre_apply` (additions) and `InflightDatabaseInfo::post_apply`
/// (removals).
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    /// A new fragment, together with the id of the job it is registered under.
    /// Added in the pre-apply phase.
    NewFragment(TableId, InflightFragmentInfo),
    /// Rewrites the upstream fragment id of the fragment's `Merge` nodes in the pre-apply phase.
    ReplaceNodeUpstream(
        /// old `fragment_id` -> new `fragment_id`
        HashMap<FragmentId, FragmentId>,
    ),
    /// Changes the actor set of an existing fragment.
    Reschedule {
        /// Actors inserted into the fragment in the pre-apply phase.
        new_actors: HashMap<ActorId, InflightActorInfo>,
        /// New vnode bitmaps for existing actors, set in the pre-apply phase.
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        /// Actors removed from the fragment in the post-apply phase.
        to_remove: HashSet<ActorId>,
    },
    /// Removes the fragment in the post-apply phase.
    RemoveFragment,
}
59
/// Subscriptions currently tracked, grouped by the materialized view table they depend on.
#[derive(Default, Clone, Debug)]
pub struct InflightSubscriptionInfo {
    /// `mv_table_id` => `subscription_id` => retention seconds
    pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
}
65
/// Fragments of a single streaming job that are currently tracked.
#[derive(Clone, Debug)]
pub struct InflightStreamingJobInfo {
    /// Id of the streaming job.
    pub job_id: TableId,
    /// `fragment_id` => info of that fragment
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
}
71
72impl InflightStreamingJobInfo {
73    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
74        self.fragment_infos.values()
75    }
76
77    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
78        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
79    }
80}
81
82impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
83    type Item = &'a InflightFragmentInfo;
84
85    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
86
87    fn into_iter(self) -> Self::IntoIter {
88        self.fragment_infos()
89    }
90}
91
/// Streaming jobs and fragments currently tracked for one database.
#[derive(Clone, Debug)]
pub struct InflightDatabaseInfo {
    /// `job_id` => info of that job
    jobs: HashMap<TableId, InflightStreamingJobInfo>,
    /// `fragment_id` => id of the job that owns the fragment
    fragment_location: HashMap<FragmentId, TableId>,
}
97
98impl InflightDatabaseInfo {
99    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
100        self.jobs.values().flat_map(|job| job.fragment_infos())
101    }
102
103    pub fn contains_job(&self, job_id: TableId) -> bool {
104        self.jobs.contains_key(&job_id)
105    }
106
107    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
108        let job_id = self.fragment_location[&fragment_id];
109        self.jobs
110            .get(&job_id)
111            .expect("should exist")
112            .fragment_infos
113            .get(&fragment_id)
114            .expect("should exist")
115    }
116
117    fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
118        let job_id = self.fragment_location[&fragment_id];
119        self.jobs
120            .get_mut(&job_id)
121            .expect("should exist")
122            .fragment_infos
123            .get_mut(&fragment_id)
124            .expect("should exist")
125    }
126}
127
128impl InflightDatabaseInfo {
129    pub fn empty() -> Self {
130        Self {
131            jobs: Default::default(),
132            fragment_location: Default::default(),
133        }
134    }
135
136    pub fn is_empty(&self) -> bool {
137        self.jobs.is_empty()
138    }
139
140    pub(crate) fn extend(&mut self, job: InflightStreamingJobInfo) {
141        self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
142            (
143                fragment_id,
144                CommandFragmentChanges::NewFragment(job.job_id, info),
145            )
146        }))
147    }
148
149    /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
150    /// the info correspondingly.
151    pub(crate) fn pre_apply(
152        &mut self,
153        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
154    ) {
155        self.apply_add(
156            fragment_changes
157                .iter()
158                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
159        )
160    }
161
162    fn apply_add(
163        &mut self,
164        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
165    ) {
166        {
167            for (fragment_id, change) in fragment_changes {
168                match change {
169                    CommandFragmentChanges::NewFragment(job_id, info) => {
170                        let fragment_infos =
171                            self.jobs
172                                .entry(job_id)
173                                .or_insert_with(|| InflightStreamingJobInfo {
174                                    job_id,
175                                    fragment_infos: Default::default(),
176                                });
177                        fragment_infos
178                            .fragment_infos
179                            .try_insert(fragment_id, info)
180                            .expect("non duplicate");
181                        self.fragment_location
182                            .try_insert(fragment_id, job_id)
183                            .expect("non duplicate");
184                    }
185                    CommandFragmentChanges::Reschedule {
186                        new_actors,
187                        actor_update_vnode_bitmap,
188                        ..
189                    } => {
190                        let info = self.fragment_mut(fragment_id);
191                        let actors = &mut info.actors;
192                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
193                            actors
194                                .get_mut(&actor_id)
195                                .expect("should exist")
196                                .vnode_bitmap = Some(new_vnodes);
197                        }
198                        for (actor_id, actor) in new_actors {
199                            actors
200                                .try_insert(actor_id as _, actor)
201                                .expect("non-duplicate");
202                        }
203                    }
204                    CommandFragmentChanges::RemoveFragment => {}
205                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
206                        let mut remaining_fragment_ids: HashSet<_> =
207                            replace_map.keys().cloned().collect();
208                        let info = self.fragment_mut(fragment_id);
209                        visit_stream_node_mut(&mut info.nodes, |node| {
210                            if let NodeBody::Merge(m) = node
211                                && let Some(new_upstream_fragment_id) =
212                                    replace_map.get(&m.upstream_fragment_id)
213                            {
214                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
215                                    if cfg!(debug_assertions) {
216                                        panic!(
217                                            "duplicate upstream fragment: {:?} {:?}",
218                                            m, replace_map
219                                        );
220                                    } else {
221                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
222                                    }
223                                }
224                                m.upstream_fragment_id = *new_upstream_fragment_id;
225                            }
226                        });
227                        if cfg!(debug_assertions) {
228                            assert!(
229                                remaining_fragment_ids.is_empty(),
230                                "non-existing fragment to replace: {:?} {:?} {:?}",
231                                remaining_fragment_ids,
232                                info.nodes,
233                                replace_map
234                            );
235                        } else {
236                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
237                        }
238                    }
239                }
240            }
241        }
242    }
243
    /// Builds the fragment edges needed by `command`.
    ///
    /// Returns `None` when there is no command or when the command is neither
    /// `Command::CreateStreamingJob` nor `Command::ReplaceStreamJob`. Otherwise a
    /// [`FragmentEdgeBuilder`] is fed with the already tracked fragments involved, the newly
    /// created fragments, their relations and the upstream replacements, and the built result
    /// is returned.
    ///
    /// # Panics
    ///
    /// Panics if one of the existing fragments referenced by the command is not tracked
    /// (see [`Self::fragment`]).
    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        // `info` is the job being created (if any); `replace_job` is the replacement plan (if any).
        let (info, replace_job) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                // None of these commands yields a job to create or replace, so no edges are built.
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::MergeSnapshotBackfillStreamingJobs(_)
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit(_)
                | Command::Throttle(_)
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_) => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    // Only sink-into-table creation carries a replace plan alongside the new job.
                    let replace_job = match job_type {
                        CreateStreamingJobType::Normal
                        | CreateStreamingJobType::SnapshotBackfill(_) => None,
                        CreateStreamingJobType::SinkIntoTable(replace_job) => Some(replace_job),
                    };
                    (Some(info), replace_job)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job)),
            },
        };
        // `existing_fragment_ids` consists of
        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id`s of the
        //    upstream fragments of the newly created job;
        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id`s of
        //    the upstream fragments of `replace_job`, kept only if the upstream fragment existed
        //    previously (i.e. it is not one of the fragments of the job being created);
        //  - keys of `replace_upstream`, which are the `fragment_id`s of the downstream fragments
        //    that will update their upstream fragments.
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|info| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|info| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .cloned();
        // Collected eagerly so that the builder below can borrow the new fragment infos.
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|info| info.stream_job_fragments.new_fragment_info())
            .chain(
                replace_job
                    .into_iter()
                    .flat_map(|replace_job| replace_job.new_fragments.new_fragment_info()),
            )
            .collect_vec();
        // The builder sees the tracked fragments first, then the fragments about to be created.
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| self.fragment(fragment_id))
                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
            control_stream_manager,
        );
        if let Some(info) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        // Upstream replacements are applied after all relations have been added.
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }
340}
341
342impl InflightSubscriptionInfo {
343    pub fn pre_apply(&mut self, command: &Command) {
344        if let Command::CreateSubscription {
345            subscription_id,
346            upstream_mv_table_id,
347            retention_second,
348        } = command
349        {
350            if let Some(prev_retiontion) = self
351                .mv_depended_subscriptions
352                .entry(*upstream_mv_table_id)
353                .or_default()
354                .insert(*subscription_id, *retention_second)
355            {
356                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retiontion, "add an existing subscription id");
357            }
358        }
359    }
360}
361
362impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
363    type Item = PbSubscriptionUpstreamInfo;
364
365    type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;
366
367    fn into_iter(self) -> Self::IntoIter {
368        self.mv_depended_subscriptions
369            .iter()
370            .flat_map(|(table_id, subscriptions)| {
371                subscriptions
372                    .keys()
373                    .map(|subscriber_id| PbSubscriptionUpstreamInfo {
374                        subscriber_id: *subscriber_id,
375                        upstream_mv_table_id: table_id.table_id,
376                    })
377            })
378    }
379}
380
381impl InflightDatabaseInfo {
382    /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
383    /// remove that from the snapshot correspondingly.
384    pub(crate) fn post_apply(
385        &mut self,
386        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
387    ) {
388        {
389            for (fragment_id, changes) in fragment_changes {
390                match changes {
391                    CommandFragmentChanges::NewFragment(_, _) => {}
392                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
393                        let job_id = self.fragment_location[fragment_id];
394                        let info = self
395                            .jobs
396                            .get_mut(&job_id)
397                            .expect("should exist")
398                            .fragment_infos
399                            .get_mut(fragment_id)
400                            .expect("should exist");
401                        for actor_id in to_remove {
402                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
403                        }
404                    }
405                    CommandFragmentChanges::RemoveFragment => {
406                        let job_id = self
407                            .fragment_location
408                            .remove(fragment_id)
409                            .expect("should exist");
410                        let job = self.jobs.get_mut(&job_id).expect("should exist");
411                        job.fragment_infos
412                            .remove(fragment_id)
413                            .expect("should exist");
414                        if job.fragment_infos.is_empty() {
415                            self.jobs.remove(&job_id).expect("should exist");
416                        }
417                    }
418                    CommandFragmentChanges::ReplaceNodeUpstream(_) => {}
419                }
420            }
421        }
422    }
423}
424
425impl InflightSubscriptionInfo {
426    pub fn post_apply(&mut self, command: &Command) {
427        if let Command::DropSubscription {
428            subscription_id,
429            upstream_mv_table_id,
430        } = command
431        {
432            let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
433                Some(subscriptions) => {
434                    let removed = subscriptions.remove(subscription_id).is_some();
435                    if removed && subscriptions.is_empty() {
436                        self.mv_depended_subscriptions.remove(upstream_mv_table_id);
437                    }
438                    removed
439                }
440                None => false,
441            };
442            if !removed {
443                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
444            }
445        }
446    }
447}
448
449impl InflightFragmentInfo {
450    /// Returns actor list to collect in the target worker node.
451    pub(crate) fn actor_ids_to_collect(
452        infos: impl IntoIterator<Item = &Self>,
453    ) -> HashMap<WorkerId, HashSet<ActorId>> {
454        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
455        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
456            assert!(
457                ret.entry(actor.worker_id)
458                    .or_default()
459                    .insert(*actor_id as _)
460            )
461        }
462        ret
463    }
464
465    pub fn existing_table_ids<'a>(
466        infos: impl IntoIterator<Item = &'a Self> + 'a,
467    ) -> impl Iterator<Item = TableId> + 'a {
468        infos
469            .into_iter()
470            .flat_map(|info| info.state_table_ids.iter().cloned())
471    }
472
473    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
474        infos.into_iter().any(|fragment| {
475            fragment
476                .actors
477                .values()
478                .any(|actor| (actor.worker_id) == worker_id)
479        })
480    }
481
482    pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
483        infos
484            .into_iter()
485            .flat_map(|info| info.actors.values())
486            .map(|actor| actor.worker_id)
487            .collect()
488    }
489}
490
491impl InflightDatabaseInfo {
492    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
493        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
494    }
495
496    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
497        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
498    }
499}