risingwave_stream/task/barrier_manager/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::catalog::DatabaseId;
20use risingwave_common::util::epoch::EpochPair;
21use tokio::sync::mpsc;
22use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
23
24use crate::error::{IntoUnexpectedExit, StreamError};
25use crate::executor::exchange::permit::{self, channel_from_config};
26use crate::executor::{Barrier, BarrierInner};
27use crate::task::barrier_manager::progress::BackfillState;
28use crate::task::cdc_progress::CdcTableBackfillState;
29use crate::task::{ActorId, StreamEnvironment};
30
31/// Events sent from actors via [`LocalBarrierManager`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState`].
32///
33/// See [`crate::task`] for architecture overview.
34pub(super) enum LocalBarrierEvent {
35    ReportActorCollected {
36        actor_id: ActorId,
37        epoch: EpochPair,
38    },
39    ReportCreateProgress {
40        epoch: EpochPair,
41        actor: ActorId,
42        state: BackfillState,
43    },
44    ReportSourceListFinished {
45        epoch: EpochPair,
46        actor_id: ActorId,
47        table_id: u32,
48        associated_source_id: u32,
49    },
50    ReportSourceLoadFinished {
51        epoch: EpochPair,
52        actor_id: ActorId,
53        table_id: u32,
54        associated_source_id: u32,
55    },
56    RefreshFinished {
57        epoch: EpochPair,
58        actor_id: ActorId,
59        table_id: u32,
60        staging_table_id: u32,
61    },
62    RegisterBarrierSender {
63        actor_id: ActorId,
64        barrier_sender: mpsc::UnboundedSender<Barrier>,
65    },
66    RegisterLocalUpstreamOutput {
67        actor_id: ActorId,
68        upstream_actor_id: ActorId,
69        tx: permit::Sender,
70    },
71    ReportCdcTableBackfillProgress {
72        actor_id: ActorId,
73        epoch: EpochPair,
74        state: CdcTableBackfillState,
75    },
76}
77
78/// Can send [`LocalBarrierEvent`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
79///
80/// See [`crate::task`] for architecture overview.
81#[derive(Clone)]
82pub struct LocalBarrierManager {
83    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
84    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
85    pub(crate) database_id: DatabaseId,
86    pub(crate) term_id: String,
87    pub(crate) env: StreamEnvironment,
88}
89
90impl LocalBarrierManager {
91    pub(super) fn new(
92        database_id: DatabaseId,
93        term_id: String,
94        env: StreamEnvironment,
95    ) -> (
96        Self,
97        UnboundedReceiver<LocalBarrierEvent>,
98        UnboundedReceiver<(ActorId, StreamError)>,
99    ) {
100        let (event_tx, event_rx) = unbounded_channel();
101        let (err_tx, err_rx) = unbounded_channel();
102        (
103            Self {
104                barrier_event_sender: event_tx,
105                actor_failure_sender: err_tx,
106                database_id,
107                term_id,
108                env,
109            },
110            event_rx,
111            err_rx,
112        )
113    }
114
115    pub fn for_test() -> Self {
116        Self::new(
117            DatabaseId {
118                database_id: 114514,
119            },
120            "114514".to_owned(),
121            StreamEnvironment::for_test(),
122        )
123        .0
124    }
125
126    /// Event is handled by [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
127    fn send_event(&self, event: LocalBarrierEvent) {
128        // ignore error, because the current barrier manager maybe a stale one
129        let _ = self.barrier_event_sender.send(event);
130    }
131
132    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
133    /// and collect this barrier with its own `actor_id` using this function.
134    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
135        self.send_event(LocalBarrierEvent::ReportActorCollected {
136            actor_id,
137            epoch: barrier.epoch,
138        })
139    }
140
141    /// When a actor exit unexpectedly, it should report this event using this function, so meta
142    /// will notice actor's exit while collecting.
143    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
144        let _ = self
145            .actor_failure_sender
146            .send((actor_id, err.into_unexpected_exit(actor_id)));
147    }
148
149    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
150        let (tx, rx) = mpsc::unbounded_channel();
151        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
152            actor_id,
153            barrier_sender: tx,
154        });
155        rx
156    }
157
158    pub fn register_local_upstream_output(
159        &self,
160        actor_id: ActorId,
161        upstream_actor_id: ActorId,
162    ) -> permit::Receiver {
163        let (tx, rx) = channel_from_config(self.env.config());
164        self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
165            actor_id,
166            upstream_actor_id,
167            tx,
168        });
169        rx
170    }
171
172    pub fn report_source_list_finished(
173        &self,
174        epoch: EpochPair,
175        actor_id: ActorId,
176        table_id: u32,
177        associated_source_id: u32,
178    ) {
179        self.send_event(LocalBarrierEvent::ReportSourceListFinished {
180            epoch,
181            actor_id,
182            table_id,
183            associated_source_id,
184        });
185    }
186
187    pub fn report_source_load_finished(
188        &self,
189        epoch: EpochPair,
190        actor_id: ActorId,
191        table_id: u32,
192        associated_source_id: u32,
193    ) {
194        self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
195            epoch,
196            actor_id,
197            table_id,
198            associated_source_id,
199        });
200    }
201
202    pub fn report_refresh_finished(
203        &self,
204        epoch: EpochPair,
205        actor_id: ActorId,
206        table_id: u32,
207        staging_table_id: u32,
208    ) {
209        self.send_event(LocalBarrierEvent::RefreshFinished {
210            epoch,
211            actor_id,
212            table_id,
213            staging_table_id,
214        });
215    }
216}
217
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawns a `LocalBarrierWorker` on a test `StreamEnvironment` with unused streaming
    /// metrics, and returns the sender that feeds it
    /// [`crate::task::barrier_worker::LocalActorOperation`]s.
    ///
    /// The value returned by `LocalBarrierWorker::spawn` is bound locally and not handed
    /// back to the caller.
    pub(super) fn spawn_for_test()
    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
    {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;

        use crate::executor::monitor::StreamingMetrics;
        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};

        let (operation_tx, operation_rx) = unbounded_channel();

        // Arguments are built in the same order the original call evaluated them.
        let env = StreamEnvironment::for_test();
        let metrics = Arc::new(StreamingMetrics::unused());
        let atomic_u64 = Arc::new(AtomicU64::new(0));

        let _worker_handle = LocalBarrierWorker::spawn(env, metrics, None, atomic_u64, operation_rx);

        EventSender(operation_tx)
    }
}
239}