risingwave_stream/task/barrier_manager/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::catalog::DatabaseId;
20use risingwave_common::util::epoch::EpochPair;
21use tokio::sync::mpsc;
22use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
23
24use crate::error::{IntoUnexpectedExit, StreamError};
25use crate::executor::exchange::permit::{self, channel_from_config};
26use crate::executor::{Barrier, BarrierInner};
27use crate::task::barrier_manager::progress::BackfillState;
28use crate::task::cdc_progress::CdcTableBackfillState;
29use crate::task::{ActorId, StreamEnvironment};
30
31/// Events sent from actors via [`LocalBarrierManager`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState`].
32///
33/// See [`crate::task`] for architecture overview.
34pub(super) enum LocalBarrierEvent {
35    ReportActorCollected {
36        actor_id: ActorId,
37        epoch: EpochPair,
38    },
39    ReportCreateProgress {
40        epoch: EpochPair,
41        actor: ActorId,
42        state: BackfillState,
43    },
44    ReportSourceLoadFinished {
45        epoch: EpochPair,
46        actor_id: ActorId,
47        table_id: u32,
48        associated_source_id: u32,
49    },
50    RegisterBarrierSender {
51        actor_id: ActorId,
52        barrier_sender: mpsc::UnboundedSender<Barrier>,
53    },
54    RegisterLocalUpstreamOutput {
55        actor_id: ActorId,
56        upstream_actor_id: ActorId,
57        tx: permit::Sender,
58    },
59    ReportCdcTableBackfillProgress {
60        actor_id: ActorId,
61        epoch: EpochPair,
62        state: CdcTableBackfillState,
63    },
64}
65
66/// Can send [`LocalBarrierEvent`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
67///
68/// See [`crate::task`] for architecture overview.
69#[derive(Clone)]
70pub struct LocalBarrierManager {
71    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
72    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
73    pub(crate) database_id: DatabaseId,
74    pub(crate) term_id: String,
75    pub(crate) env: StreamEnvironment,
76}
77
78impl LocalBarrierManager {
79    pub(super) fn new(
80        database_id: DatabaseId,
81        term_id: String,
82        env: StreamEnvironment,
83    ) -> (
84        Self,
85        UnboundedReceiver<LocalBarrierEvent>,
86        UnboundedReceiver<(ActorId, StreamError)>,
87    ) {
88        let (event_tx, event_rx) = unbounded_channel();
89        let (err_tx, err_rx) = unbounded_channel();
90        (
91            Self {
92                barrier_event_sender: event_tx,
93                actor_failure_sender: err_tx,
94                database_id,
95                term_id,
96                env,
97            },
98            event_rx,
99            err_rx,
100        )
101    }
102
103    #[cfg(test)]
104    pub fn for_test() -> Self {
105        Self::new(
106            DatabaseId {
107                database_id: 114514,
108            },
109            "114514".to_owned(),
110            StreamEnvironment::for_test(),
111        )
112        .0
113    }
114
115    /// Event is handled by [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
116    fn send_event(&self, event: LocalBarrierEvent) {
117        // ignore error, because the current barrier manager maybe a stale one
118        let _ = self.barrier_event_sender.send(event);
119    }
120
121    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
122    /// and collect this barrier with its own `actor_id` using this function.
123    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
124        self.send_event(LocalBarrierEvent::ReportActorCollected {
125            actor_id,
126            epoch: barrier.epoch,
127        })
128    }
129
130    /// When a actor exit unexpectedly, it should report this event using this function, so meta
131    /// will notice actor's exit while collecting.
132    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
133        let _ = self
134            .actor_failure_sender
135            .send((actor_id, err.into_unexpected_exit(actor_id)));
136    }
137
138    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
139        let (tx, rx) = mpsc::unbounded_channel();
140        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
141            actor_id,
142            barrier_sender: tx,
143        });
144        rx
145    }
146
147    pub fn register_local_upstream_output(
148        &self,
149        actor_id: ActorId,
150        upstream_actor_id: ActorId,
151    ) -> permit::Receiver {
152        let (tx, rx) = channel_from_config(self.env.config());
153        self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
154            actor_id,
155            upstream_actor_id,
156            tx,
157        });
158        rx
159    }
160
161    pub fn report_source_load_finished(
162        &self,
163        epoch: EpochPair,
164        actor_id: ActorId,
165        table_id: u32,
166        associated_source_id: u32,
167    ) {
168        self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
169            epoch,
170            actor_id,
171            table_id,
172            associated_source_id,
173        });
174    }
175}
176
#[cfg(test)]
impl LocalBarrierManager {
    /// Test helper: spawns a `LocalBarrierWorker` on a test `StreamEnvironment`.
    ///
    /// Returns the sender used to feed it `LocalActorOperation`s. The worker is given
    /// unused streaming metrics, `None` for its optional argument, and a fresh
    /// `AtomicU64` starting at zero.
    pub(super) fn spawn_for_test()
    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
    {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;

        use crate::executor::monitor::StreamingMetrics;
        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};

        let (op_tx, op_rx) = unbounded_channel();

        // Arguments are built in the same order the worker's `spawn` takes them.
        let test_env = StreamEnvironment::for_test();
        let metrics = Arc::new(StreamingMetrics::unused());
        let atomic_arg = Arc::new(AtomicU64::new(0));

        // The returned handle is bound for the rest of this function but never awaited.
        let _join_handle = LocalBarrierWorker::spawn(test_env, metrics, None, atomic_arg, op_rx);

        EventSender(op_tx)
    }
}