risingwave_stream/task/barrier_manager/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::catalog::DatabaseId;
20use risingwave_common::id::{SourceId, TableId};
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_pb::id::FragmentId;
23use tokio::sync::mpsc;
24use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
25
26use crate::error::{IntoUnexpectedExit, StreamError};
27use crate::executor::exchange::permit::{self, channel_from_config};
28use crate::executor::{Barrier, BarrierInner};
29use crate::task::barrier_manager::progress::BackfillState;
30use crate::task::cdc_progress::CdcTableBackfillState;
31use crate::task::{ActorId, StreamEnvironment};
32
33/// Events sent from actors via [`LocalBarrierManager`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState`].
34///
35/// See [`crate::task`] for architecture overview.
36pub(super) enum LocalBarrierEvent {
37    ReportActorCollected {
38        actor_id: ActorId,
39        epoch: EpochPair,
40    },
41    ReportCreateProgress {
42        epoch: EpochPair,
43        fragment_id: FragmentId,
44        actor: ActorId,
45        state: BackfillState,
46    },
47    ReportSourceListFinished {
48        epoch: EpochPair,
49        actor_id: ActorId,
50        table_id: TableId,
51        associated_source_id: SourceId,
52    },
53    ReportSourceLoadFinished {
54        epoch: EpochPair,
55        actor_id: ActorId,
56        table_id: TableId,
57        associated_source_id: SourceId,
58    },
59    RefreshFinished {
60        epoch: EpochPair,
61        actor_id: ActorId,
62        table_id: TableId,
63        staging_table_id: TableId,
64    },
65    RegisterBarrierSender {
66        actor_id: ActorId,
67        barrier_sender: mpsc::UnboundedSender<Barrier>,
68    },
69    RegisterLocalUpstreamOutput {
70        actor_id: ActorId,
71        upstream_actor_id: ActorId,
72        tx: permit::Sender,
73    },
74    ReportCdcTableBackfillProgress {
75        actor_id: ActorId,
76        epoch: EpochPair,
77        state: CdcTableBackfillState,
78    },
79}
80
81/// Can send [`LocalBarrierEvent`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
82///
83/// See [`crate::task`] for architecture overview.
84#[derive(Clone)]
85pub struct LocalBarrierManager {
86    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
87    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
88    pub(crate) database_id: DatabaseId,
89    pub(crate) term_id: String,
90    pub(crate) env: StreamEnvironment,
91}
92
93impl LocalBarrierManager {
94    pub(super) fn new(
95        database_id: DatabaseId,
96        term_id: String,
97        env: StreamEnvironment,
98    ) -> (
99        Self,
100        UnboundedReceiver<LocalBarrierEvent>,
101        UnboundedReceiver<(ActorId, StreamError)>,
102    ) {
103        let (event_tx, event_rx) = unbounded_channel();
104        let (err_tx, err_rx) = unbounded_channel();
105        (
106            Self {
107                barrier_event_sender: event_tx,
108                actor_failure_sender: err_tx,
109                database_id,
110                term_id,
111                env,
112            },
113            event_rx,
114            err_rx,
115        )
116    }
117
118    pub fn for_test() -> Self {
119        Self::new(
120            114514.into(),
121            "114514".to_owned(),
122            StreamEnvironment::for_test(),
123        )
124        .0
125    }
126
127    /// Event is handled by [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
128    fn send_event(&self, event: LocalBarrierEvent) {
129        // ignore error, because the current barrier manager maybe a stale one
130        let _ = self.barrier_event_sender.send(event);
131    }
132
133    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
134    /// and collect this barrier with its own `actor_id` using this function.
135    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
136        self.send_event(LocalBarrierEvent::ReportActorCollected {
137            actor_id,
138            epoch: barrier.epoch,
139        })
140    }
141
142    /// When a actor exit unexpectedly, it should report this event using this function, so meta
143    /// will notice actor's exit while collecting.
144    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
145        let _ = self
146            .actor_failure_sender
147            .send((actor_id, err.into_unexpected_exit(actor_id)));
148    }
149
150    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
151        let (tx, rx) = mpsc::unbounded_channel();
152        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
153            actor_id,
154            barrier_sender: tx,
155        });
156        rx
157    }
158
159    pub fn register_local_upstream_output(
160        &self,
161        actor_id: ActorId,
162        upstream_actor_id: ActorId,
163    ) -> permit::Receiver {
164        let (tx, rx) = channel_from_config(self.env.global_config());
165        self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
166            actor_id,
167            upstream_actor_id,
168            tx,
169        });
170        rx
171    }
172
173    pub fn report_source_list_finished(
174        &self,
175        epoch: EpochPair,
176        actor_id: ActorId,
177        table_id: TableId,
178        associated_source_id: SourceId,
179    ) {
180        self.send_event(LocalBarrierEvent::ReportSourceListFinished {
181            epoch,
182            actor_id,
183            table_id,
184            associated_source_id,
185        });
186    }
187
188    pub fn report_source_load_finished(
189        &self,
190        epoch: EpochPair,
191        actor_id: ActorId,
192        table_id: TableId,
193        associated_source_id: SourceId,
194    ) {
195        self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
196            epoch,
197            actor_id,
198            table_id,
199            associated_source_id,
200        });
201    }
202
203    pub fn report_refresh_finished(
204        &self,
205        epoch: EpochPair,
206        actor_id: ActorId,
207        table_id: TableId,
208        staging_table_id: TableId,
209    ) {
210        self.send_event(LocalBarrierEvent::RefreshFinished {
211            epoch,
212            actor_id,
213            table_id,
214            staging_table_id,
215        });
216    }
217}
218
219#[cfg(test)]
220impl LocalBarrierManager {
221    pub(super) fn spawn_for_test()
222    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
223    {
224        use std::sync::Arc;
225        use std::sync::atomic::AtomicU64;
226
227        use crate::executor::monitor::StreamingMetrics;
228        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};
229
230        let (tx, rx) = unbounded_channel();
231        let _join_handle = LocalBarrierWorker::spawn(
232            StreamEnvironment::for_test(),
233            Arc::new(StreamingMetrics::unused()),
234            None,
235            Arc::new(AtomicU64::new(0)),
236            rx,
237        );
238        EventSender(tx)
239    }
240}