risingwave_stream/task/barrier_worker/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::{pending, poll_fn};
use std::sync::Arc;
use std::task::Poll;

use anyhow::anyhow;
use await_tree::{InstrumentAwait, SpanExt};
use futures::future::{BoxFuture, join_all};
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource, PbLoadFinishedSource,
    PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tonic::{Code, Status};
use tracing::{debug, error, info, warn};

use self::managed_state::ManagedBarrierState;
use crate::error::{ScoredStreamError, StreamError, StreamResult};
#[cfg(test)]
use crate::task::LocalBarrierManager;
use crate::task::managed_state::BarrierToComplete;
use crate::task::{
    ActorId, AtomicU64Ref, CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY, ConfigOverrideCache,
    PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
};
pub mod managed_state;
#[cfg(test)]
mod tests;

use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    InitRequest, Request, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
    StreamingControlStreamResponse, streaming_control_stream_response,
};

use crate::executor::Barrier;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::task::barrier_worker::managed_state::{
    DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
    ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
};

/// If enabled, all actors will be grouped in the same tracing span within one epoch.
/// Note that this option will significantly increase the overhead of tracing.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;

/// Collected result of a barrier on the current compute node. Will be reported to the meta service in [`LocalBarrierWorker::on_epoch_completed`].
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// The result returned from `sync` of `StateStore`.
    pub sync_result: Option<SyncResult>,

    /// The updated creation progress of materialized views after this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    /// The source IDs that have finished listing data for refreshable batch sources.
    pub list_finished_source_ids: Vec<PbListFinishedSource>,

    /// The source IDs that have finished loading data for refreshable batch sources.
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,

    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,

    /// The table IDs that should be truncated.
    pub truncate_tables: Vec<TableId>,
    /// The table IDs that have finished refresh.
    pub refresh_finished_tables: Vec<TableId>,
}

/// Lives in [`crate::task::barrier_worker::LocalBarrierWorker`] and communicates
/// with the `ControlStreamManager` on the meta node.
/// Handles [`risingwave_pb::stream_service::streaming_control_stream_request::Request`].
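///
/// A minimal wiring sketch, mirroring `barrier_test_utils` at the bottom of this
/// file (the channel names here are illustrative, not part of the API):
///
/// ```ignore
/// let (response_tx, _response_rx) = tokio::sync::mpsc::unbounded_channel();
/// let (_request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
/// let handle = ControlStreamHandle::new(
///     response_tx,
///     tokio_stream::wrappers::UnboundedReceiverStream::new(request_rx).boxed(),
/// );
/// assert!(handle.connected());
/// ```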
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}

impl ControlStreamHandle {
    fn empty() -> Self {
        Self { pair: None }
    }

    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Note: `TonicStatusWrapper` provides a better error report.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send `Shutdown` message to the control stream and wait for the stream to be closed
    /// by the meta service.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
                // been acknowledged by the meta service for more precise error report.
                //
                // This is because the meta service will reset the control stream manager and
                // drop the connection to us upon recovery. As a result, the receiver part of
                // this sender will also be dropped, causing the stream to close.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("failed to send response; control stream has been reset");
            }
        } else {
            debug!(?response, "control stream has been reset, ignore response");
        }
    }

    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e)) // wrap the status to provide better error report
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}

/// Sent from [`crate::task::stream_manager::LocalStreamManager`] to [`crate::task::barrier_worker::LocalBarrierWorker::run`].
///
/// See [`crate::task`] for architecture overview.
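///
/// A hedged usage sketch (the `actor_op_tx` event sender is illustrative; see
/// [`EventSender`] at the bottom of this file):
///
/// ```ignore
/// // Gracefully shut down the worker and wait for acknowledgement.
/// let (tx, rx) = tokio::sync::oneshot::channel();
/// actor_op_tx.send_event(LocalActorOperation::Shutdown { result_sender: tx });
/// rx.await.unwrap();
/// ```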
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}

pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
            writeln!(
                f,
                "database {} status: {} managed_barrier_state:\n{}",
                database_id,
                status,
                managed_barrier_state
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default()
            )?;
        }
        Ok(())
    }
}

/// [`LocalBarrierWorker`] manages barrier control flow.
/// Specifically, [`LocalBarrierWorker`] serves barrier injection from the meta server, sends the
/// barriers to and collects them from all actors, and finally reports the progress.
///
/// Runs the event loop in [`Self::run`]. Handles events sent by [`crate::task::LocalStreamManager`].
///
/// See [`crate::task`] for architecture overview.
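///
/// A minimal spawn sketch, assuming `env`, `metrics` and `watermark_epoch` are
/// already constructed (see [`Self::spawn`] below):
///
/// ```ignore
/// let (actor_op_tx, actor_op_rx) = tokio::sync::mpsc::unbounded_channel();
/// let join_handle = LocalBarrierWorker::spawn(
///     env,
///     metrics,
///     None, // optional await-tree registry
///     watermark_epoch,
///     actor_op_rx,
/// );
/// ```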
pub(super) struct LocalBarrierWorker {
    /// Current barrier collection state.
    pub(super) state: ManagedBarrierState,

    /// Per-database futures that complete in ascending order of epoch.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    pub(super) term_id: String,
}

impl LocalBarrierWorker {
    pub(super) fn new(actor_manager: Arc<StreamActorManager>, term_id: String) -> Self {
        Self {
            state: Default::default(),
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            // `complete_barrier` pushes a future into `await_epoch_completed_futures`;
                            // its result is handled below in the `next_completed_epoch` arm.
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError {
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // TODO: report a database failure instead of resetting the stream, once the
                            // HummockUploader supports partial recovery. Currently the HummockUploader
                            // enters the `Err` state and stops working until a global recovery clears the uploader.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

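    /// Dispatch a control stream [`Request`] from the meta service. A failure to
    /// inject a barrier is returned together with the database it belongs to, so
    /// that the caller can suspend only that database.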
    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = req.database_id;
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    req.database_id,
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(req.database_id, PartialGraphId::new(req.partial_graph_id));
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                let err = if self.term_id != term_id {
                    warn!(
                        ?ids,
                        term_id,
                        current_term_id = self.term_id,
                        "take receiver on unmatched term_id"
                    );
                    anyhow!(
                        "take receiver {:?} with term_id {} not matching current term_id {}",
                        ids,
                        term_id,
                        self.term_id
                    )
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}

mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_common::id::TableId;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource,
        PbLoadFinishedSource,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    #[define_opaque(AwaitEpochCompletedFuture)]
    #[expect(clippy::too_many_arguments)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                    truncate_tables,
                    refresh_finished_tables,
                }),
            )
        });
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}

use await_epoch_completed_future::*;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
use risingwave_storage::{StateStoreImpl, dispatch_state_store};

use crate::executor::exchange::permit;

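/// Sync the data of `prev_epoch` for the given tables to the state store,
/// returning a boxed future that resolves to the [`SyncResult`]. Sync latency
/// is recorded in `streaming_metrics`, and failures are logged before being
/// propagated to the caller.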
fn sync_epoch(
    state_store: &StateStoreImpl,
    streaming_metrics: &StreamingMetrics,
    prev_epoch: u64,
    table_ids: HashSet<TableId>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
    let timer = streaming_metrics.barrier_sync_latency.start_timer();

    let state_store = state_store.clone();
    let future = async move {
        dispatch_state_store!(state_store, hummock, {
            hummock.sync(vec![(prev_epoch, table_ids)]).await
        })
    };

    future
        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
        .inspect_ok(move |_| {
            timer.observe_duration();
        })
        .map_err(move |e| {
            tracing::error!(
                prev_epoch,
                error = %e.as_report(),
                "Failed to sync state store",
            );
            e.into()
        })
        .boxed()
}

impl LocalBarrierWorker {
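    /// Called when all actors have collected the barrier of `prev_epoch` for the
    /// given partial graph. Pops the pending completion state and enqueues a
    /// future that syncs data to the state store (for checkpoint barriers only);
    /// its result is later handled in [`Self::on_epoch_completed`].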
    fn complete_barrier(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) {
        {
            let Some(database_state) = self
                .state
                .databases
                .get_mut(&database_id)
                .expect("should exist")
                .state_for_request()
            else {
                return;
            };
            let BarrierToComplete {
                barrier,
                table_ids,
                create_mview_progress,
                list_finished_source_ids,
                load_finished_source_ids,
                cdc_table_backfill_progress,
                truncate_tables,
                refresh_finished_tables,
            } = database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);

            let complete_barrier_future = match &barrier.kind {
                BarrierKind::Unspecified => unreachable!(),
                BarrierKind::Initial => {
                    tracing::info!(
                        epoch = prev_epoch,
                        "ignore syncing data for the first barrier"
                    );
                    None
                }
                BarrierKind::Barrier => None,
                BarrierKind::Checkpoint => Some(sync_epoch(
                    &self.actor_manager.env.state_store(),
                    &self.actor_manager.streaming_metrics,
                    prev_epoch,
                    table_ids.expect("should be Some on BarrierKind::Checkpoint"),
                )),
            };

            self.await_epoch_completed_futures
                .entry(database_id)
                .or_default()
                .push_back({
                    instrument_complete_barrier_future(
                        partial_graph_id,
                        complete_barrier_future,
                        barrier,
                        self.actor_manager.await_tree_reg.as_ref(),
                        create_mview_progress,
                        list_finished_source_ids,
                        load_finished_source_ids,
                        cdc_table_backfill_progress,
                        truncate_tables,
                        refresh_finished_tables,
                    )
                });
        }
    }

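    /// Report a completed epoch to the meta service with a
    /// [`BarrierCompleteResponse`], carrying the synced SSTs, table watermarks,
    /// vector index additions, and the progress reported by actors.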
    fn on_epoch_completed(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        epoch: u64,
        result: BarrierCompleteResult,
    ) {
        let BarrierCompleteResult {
            create_mview_progress,
            sync_result,
            list_finished_source_ids,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            truncate_tables,
            refresh_finished_tables,
        } = result;

        let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
            .map(|sync_result| {
                (
                    sync_result.uncommitted_ssts,
                    sync_result.table_watermarks,
                    sync_result.old_value_ssts,
                    sync_result.vector_index_adds,
                )
            })
            .unwrap_or_default();

        let result = streaming_control_stream_response::Response::CompleteBarrier(
            BarrierCompleteResponse {
                request_id: "todo".to_owned(),
                partial_graph_id: partial_graph_id.into(),
                epoch,
                status: None,
                create_mview_progress,
                synced_sstables: synced_sstables
                    .into_iter()
                    .map(
                        |LocalSstableInfo {
                             sst_info,
                             table_stats,
                             created_at,
                         }| PbLocalSstableInfo {
                            sst: Some(sst_info.into()),
                            table_stats_map: to_prost_table_stats_map(table_stats),
                            created_at,
                        },
                    )
                    .collect_vec(),
                worker_id: self.actor_manager.env.worker_id(),
                table_watermarks: table_watermarks
                    .into_iter()
                    .map(|(key, value)| (key, value.into()))
                    .collect(),
                old_value_sstables: old_value_ssts
                    .into_iter()
                    .map(|sst| sst.sst_info.into())
                    .collect(),
                database_id,
                list_finished_sources: list_finished_source_ids,
                load_finished_sources: load_finished_source_ids,
                vector_index_adds: vector_index_adds
                    .into_iter()
                    .map(|(table_id, adds)| {
                        (
                            table_id,
                            PbVectorIndexAdds {
                                adds: adds.into_iter().map(|add| add.into()).collect(),
                            },
                        )
                    })
                    .collect(),
                cdc_table_backfill_progress,
                truncate_tables,
                refresh_finished_tables,
            },
        );

        self.control_stream_handle.send_response(result);
    }

    /// Broadcast a barrier to all senders, and save a receiver that will be
    /// notified when this barrier is finished, in managed mode.
    ///
    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
    /// the root cause of the failure. The caller should then call `try_find_root_failure`
    /// to find the root cause.
    fn send_barrier(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        debug!(
            target: "events::stream::barrier::manager::send",
            "send barrier {:?}, actor_ids_to_collect = {:?}",
            barrier,
            request.actor_ids_to_collect
        );

        let database_status = self
            .state
            .databases
            .get_mut(&request.database_id)
            .expect("should exist");
        if let Some(state) = database_status.state_for_request() {
            state.transform_to_issued(barrier, request)?;
        }
        Ok(())
    }

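    /// Remove the given partial graphs from the database state. Every removed
    /// graph is asserted to be empty, i.e. to have no remaining actors or
    /// inflight barriers.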
    fn remove_partial_graphs(
        &mut self,
        database_id: DatabaseId,
        partial_graph_ids: impl Iterator<Item = PartialGraphId>,
    ) {
        let Some(database_status) = self.state.databases.get_mut(&database_id) else {
            warn!(
                %database_id,
                "database to remove partial graphs from does not exist"
            );
            return;
        };
        let Some(database_state) = database_status.state_for_request() else {
            warn!(
                %database_id,
                "ignore remove partial graph request on failed database",
            );
            return;
        };
        for partial_graph_id in partial_graph_ids {
            if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
                assert!(
                    graph.is_empty(),
                    "non-empty graph to be removed: {}",
                    &graph
                );
            } else {
                warn!(
                    partial_graph_id = partial_graph_id.0,
                    "no partial graph to remove"
                );
            }
        }
    }

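    /// Register an empty partial graph under the database, creating the database
    /// state on demand. Exchange requests that arrived before the database was
    /// created are replayed into the newly created state.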
    fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
        let status = match self.state.databases.entry(database_id) {
            Entry::Occupied(entry) => {
                let status = entry.into_mut();
                if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
                    let mut database = DatabaseManagedBarrierState::new(
                        database_id,
                        self.term_id.clone(),
                        self.actor_manager.clone(),
                    );
                    for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
                    {
                        database.new_actor_remote_output_request(
                            actor_id,
                            upstream_actor_id,
                            result_sender,
                        );
                    }
                    *status = DatabaseStatus::Running(database);
                }

                status
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
                    database_id,
                    self.term_id.clone(),
                    self.actor_manager.clone(),
                )))
            }
        };
        if let Some(state) = status.state_for_request() {
            assert!(
                state
                    .graph_states
                    .insert(
                        partial_graph_id,
                        PartialGraphManagedBarrierState::new(&self.actor_manager)
                    )
                    .is_none()
            );
        }
    }

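    /// Start resetting the database. If the database does not exist, the reset is
    /// acknowledged immediately; otherwise it is acknowledged when the
    /// [`ManagedBarrierStateEvent::DatabaseReset`] event is emitted and handled in
    /// [`Self::run`].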
    fn reset_database(&mut self, req: ResetDatabaseRequest) {
        let database_id = req.database_id;
        if let Some(database_status) = self.state.databases.get_mut(&database_id) {
            database_status.start_reset(
                database_id,
                self.await_epoch_completed_futures.remove(&database_id),
                req.reset_request_id,
            );
        } else {
            self.ack_database_reset(database_id, None, req.reset_request_id);
        }
    }

    fn ack_database_reset(
        &mut self,
        database_id: DatabaseId,
        reset_output: Option<ResetDatabaseOutput>,
        reset_request_id: u32,
    ) {
        info!(
            %database_id,
            "database reset successfully"
        );
        if let Some(reset_database) = self.state.databases.remove(&database_id) {
            match reset_database {
                DatabaseStatus::Resetting(_) => {}
                _ => {
                    unreachable!("must have been resetting previously")
                }
            }
        }
        self.await_epoch_completed_futures.remove(&database_id);
        self.control_stream_handle.ack_reset_database(
            database_id,
            reset_output.and_then(|output| output.root_err),
            reset_request_id,
        );
    }

    /// When some other failure happens (like failing to send a barrier), the error is reported via
    /// this function. A message is sent over the control stream to notify the meta service about
    /// the error, and the global barrier worker will later reset and rerun the database.
    fn on_database_failure(
        &mut self,
        database_id: DatabaseId,
        failed_actor: Option<ActorId>,
        err: StreamError,
        message: impl Into<String>,
    ) {
        let message = message.into();
        error!(%database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
        let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
        self.state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .suspend(failed_actor, err, completing_futures);
        self.control_stream_handle
            .send_response(Response::ReportDatabaseFailure(
                ReportDatabaseFailureResponse { database_id },
            ));
    }

    /// Force stop all actors on this worker, and then drop their resources.
    async fn reset(&mut self, init_request: InitRequest) {
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        *self = Self::new(self.actor_manager.clone(), init_request.term_id);
        self.actor_manager.env.client_pool().invalidate_all();
    }

    /// Create a [`LocalBarrierWorker`] in managed mode and run its event loop on a
    /// dedicated streaming runtime.
    pub fn spawn(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_reg: Option<await_tree::Registry>,
        watermark_epoch: AtomicU64Ref,
        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
    ) -> JoinHandle<()> {
        let runtime = {
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            if let Some(worker_threads_num) = env.global_config().actor_runtime_worker_threads_num {
                builder.worker_threads(worker_threads_num);
            }
            builder
                .thread_name("rw-streaming")
                .enable_all()
                .build()
                .unwrap()
        };

        let actor_manager = Arc::new(StreamActorManager {
            env,
            streaming_metrics,
            watermark_epoch,
            await_tree_reg,
            runtime: runtime.into(),
            config_override_cache: ConfigOverrideCache::new(CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY),
        });
        let worker = LocalBarrierWorker::new(actor_manager, "uninitialized".into());
        tokio::spawn(worker.run(actor_op_rx))
    }
}

pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);

impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> EventSender<T> {
    pub(super) fn send_event(&self, event: T) {
        self.0.send(event).expect("should be able to send event")
    }

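    /// Send an event carrying a [`oneshot::Sender`] and await the response on the
    /// paired receiver. Fails if the worker has been reset and the event dropped.
    /// A hedged usage sketch:
    ///
    /// ```ignore
    /// let state: String = event_sender
    ///     .send_and_await(|tx| LocalActorOperation::InspectState { result_sender: tx })
    ///     .await?;
    /// ```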
    pub(super) async fn send_and_await<RSP>(
        &self,
        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
    ) -> StreamResult<RSP> {
        let (tx, rx) = oneshot::channel();
        let event = make_event(tx);
        self.send_event(event);
        rx.await
            .map_err(|_| anyhow!("barrier manager may have been reset").into())
    }
}

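/// A request for an actor to create a new output, either to a downstream actor
/// on the same worker (`Local`) or on behalf of a remote downstream actor via
/// the exchange service (`Remote`). Both variants carry the permit-based channel
/// sender for the new output.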
pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}

#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbCreatePartialGraphRequest,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, PbStreamingControlStreamRequest, StreamingControlStreamRequest,
        StreamingControlStreamResponse, streaming_control_stream_request,
        streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            request_tx
                .send(Ok(PbStreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            PbCreatePartialGraphRequest {
                                partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                                database_id: TEST_DATABASE_ID,
                            },
                        ),
                    ),
                }))
                .unwrap();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    term_id: "for_test".into(),
                },
            });

            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}