risingwave_stream/task/barrier_manager/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cdc_progress;
16pub mod progress;
17
18pub use progress::CreateMviewProgressReporter;
19use risingwave_common::catalog::DatabaseId;
20use risingwave_common::util::epoch::EpochPair;
21use tokio::sync::mpsc;
22use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
23
24use crate::error::{IntoUnexpectedExit, StreamError};
25use crate::executor::exchange::permit::{self, channel_from_config};
26use crate::executor::{Barrier, BarrierInner};
27use crate::task::barrier_manager::progress::BackfillState;
28use crate::task::cdc_progress::CdcTableBackfillState;
29use crate::task::{ActorId, StreamEnvironment};
30
31/// Events sent from actors via [`LocalBarrierManager`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState`].
32///
33/// See [`crate::task`] for architecture overview.
34pub(super) enum LocalBarrierEvent {
35    ReportActorCollected {
36        actor_id: ActorId,
37        epoch: EpochPair,
38    },
39    ReportCreateProgress {
40        epoch: EpochPair,
41        actor: ActorId,
42        state: BackfillState,
43    },
44    ReportSourceLoadFinished {
45        epoch: EpochPair,
46        actor_id: ActorId,
47        table_id: u32,
48        associated_source_id: u32,
49    },
50    RefreshFinished {
51        epoch: EpochPair,
52        actor_id: ActorId,
53        table_id: u32,
54        staging_table_id: u32,
55    },
56    RegisterBarrierSender {
57        actor_id: ActorId,
58        barrier_sender: mpsc::UnboundedSender<Barrier>,
59    },
60    RegisterLocalUpstreamOutput {
61        actor_id: ActorId,
62        upstream_actor_id: ActorId,
63        tx: permit::Sender,
64    },
65    ReportCdcTableBackfillProgress {
66        actor_id: ActorId,
67        epoch: EpochPair,
68        state: CdcTableBackfillState,
69    },
70}
71
/// Can send [`LocalBarrierEvent`] to [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
///
/// Cloning the manager clones both channel senders, so every clone feeds the same
/// pair of receivers that were handed out when the manager was created.
///
/// See [`crate::task`] for architecture overview.
#[derive(Clone)]
pub struct LocalBarrierManager {
    /// Sending half of the unbounded channel carrying [`LocalBarrierEvent`]s.
    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
    /// Sending half of the unbounded channel carrying actor failures, as pairs of
    /// the failing actor's id and its error.
    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
    /// Database this manager was created for.
    pub(crate) database_id: DatabaseId,
    /// Term identifier supplied at construction.
    // NOTE(review): presumably distinguishes successive incarnations of the barrier
    // manager for the same database — confirm against the barrier worker.
    pub(crate) term_id: String,
    /// Streaming environment; its config is used to size local permit channels.
    pub(crate) env: StreamEnvironment,
}
83
84impl LocalBarrierManager {
85    pub(super) fn new(
86        database_id: DatabaseId,
87        term_id: String,
88        env: StreamEnvironment,
89    ) -> (
90        Self,
91        UnboundedReceiver<LocalBarrierEvent>,
92        UnboundedReceiver<(ActorId, StreamError)>,
93    ) {
94        let (event_tx, event_rx) = unbounded_channel();
95        let (err_tx, err_rx) = unbounded_channel();
96        (
97            Self {
98                barrier_event_sender: event_tx,
99                actor_failure_sender: err_tx,
100                database_id,
101                term_id,
102                env,
103            },
104            event_rx,
105            err_rx,
106        )
107    }
108
109    pub fn for_test() -> Self {
110        Self::new(
111            DatabaseId {
112                database_id: 114514,
113            },
114            "114514".to_owned(),
115            StreamEnvironment::for_test(),
116        )
117        .0
118    }
119
120    /// Event is handled by [`super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`]
121    fn send_event(&self, event: LocalBarrierEvent) {
122        // ignore error, because the current barrier manager maybe a stale one
123        let _ = self.barrier_event_sender.send(event);
124    }
125
126    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report
127    /// and collect this barrier with its own `actor_id` using this function.
128    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
129        self.send_event(LocalBarrierEvent::ReportActorCollected {
130            actor_id,
131            epoch: barrier.epoch,
132        })
133    }
134
135    /// When a actor exit unexpectedly, it should report this event using this function, so meta
136    /// will notice actor's exit while collecting.
137    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
138        let _ = self
139            .actor_failure_sender
140            .send((actor_id, err.into_unexpected_exit(actor_id)));
141    }
142
143    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
144        let (tx, rx) = mpsc::unbounded_channel();
145        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
146            actor_id,
147            barrier_sender: tx,
148        });
149        rx
150    }
151
152    pub fn register_local_upstream_output(
153        &self,
154        actor_id: ActorId,
155        upstream_actor_id: ActorId,
156    ) -> permit::Receiver {
157        let (tx, rx) = channel_from_config(self.env.config());
158        self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
159            actor_id,
160            upstream_actor_id,
161            tx,
162        });
163        rx
164    }
165
166    pub fn report_source_load_finished(
167        &self,
168        epoch: EpochPair,
169        actor_id: ActorId,
170        table_id: u32,
171        associated_source_id: u32,
172    ) {
173        self.send_event(LocalBarrierEvent::ReportSourceLoadFinished {
174            epoch,
175            actor_id,
176            table_id,
177            associated_source_id,
178        });
179    }
180
181    pub fn report_refresh_finished(
182        &self,
183        epoch: EpochPair,
184        actor_id: ActorId,
185        table_id: u32,
186        staging_table_id: u32,
187    ) {
188        self.send_event(LocalBarrierEvent::RefreshFinished {
189            epoch,
190            actor_id,
191            table_id,
192            staging_table_id,
193        });
194    }
195}
196
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawns a [`crate::task::barrier_worker::LocalBarrierWorker`] on a test environment
    /// with unused metrics, and returns the sender feeding the worker's operation channel.
    ///
    /// The handle returned by `spawn` is kept only until this function returns.
    pub(super) fn spawn_for_test()
    -> crate::task::barrier_worker::EventSender<crate::task::barrier_worker::LocalActorOperation>
    {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;

        use crate::executor::monitor::StreamingMetrics;
        use crate::task::barrier_worker::{EventSender, LocalBarrierWorker};

        let (actor_op_tx, actor_op_rx) = unbounded_channel();

        // Arguments are built in the same order in which the call would evaluate them.
        let env = StreamEnvironment::for_test();
        let metrics = Arc::new(StreamingMetrics::unused());
        let shared_counter = Arc::new(AtomicU64::new(0));
        let _worker_handle =
            LocalBarrierWorker::spawn(env, metrics, None, shared_counter, actor_op_rx);

        EventSender(actor_op_tx)
    }
}