risingwave_stream/task/
barrier_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::{pending, poll_fn};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

use anyhow::anyhow;
use await_tree::InstrumentAwait;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCreateMviewProgress, PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tonic::{Code, Status};
use tracing::warn;

use self::managed_state::ManagedBarrierState;
use crate::error::{IntoUnexpectedExit, StreamError, StreamResult};
use crate::task::{ActorId, AtomicU64Ref, PartialGraphId, StreamEnvironment, UpDownActorIds};

mod managed_state;
mod progress;
#[cfg(test)]
mod tests;

pub use progress::CreateMviewProgressReporter;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
    StreamingControlStreamResponse, streaming_control_stream_response,
};

use crate::executor::exchange::permit::{Receiver, channel_from_config};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{Barrier, BarrierInner, StreamExecutorError};
use crate::task::barrier_manager::managed_state::{
    DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
    ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
};
use crate::task::barrier_manager::progress::BackfillState;

/// If enabled, all actors will be grouped in the same tracing span within one epoch.
/// Note that this option will significantly increase the overhead of tracing.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;

/// The collected result of some barrier on the current compute node. Will be reported to the meta service.
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// The result returned from `sync` of `StateStore`.
    pub sync_result: Option<SyncResult>,

    /// The updated creation progress of materialized view after this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
}

pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}

impl ControlStreamHandle {
    fn empty() -> Self {
        Self { pair: None }
    }

    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Note: `TonicStatusWrapper` provides a better error report.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send the `Shutdown` message to the control stream and wait for the stream to be closed
    /// by the meta service.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
                // been acknowledged by the meta service for a more precise error report.
                //
                // This is because the meta service will reset the control stream manager and
                // drop the connection to us upon recovery. As a result, the receiver part of
                // this sender will also be dropped, causing the stream to close.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id: database_id.database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("failed to send response; control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset. ignore response");
        }
    }

    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e)) // wrap the status to provide better error report
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
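        // If the control stream is not connected, or has just been reset above,
        // wait forever so that this branch of the caller's `select!` never fires.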
        pending().await
    }
}

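/// Events reported to the barrier worker by the actors running on this node.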
pub(super) enum LocalBarrierEvent {
    ReportActorCollected {
        actor_id: ActorId,
        epoch: EpochPair,
    },
    ReportCreateProgress {
        epoch: EpochPair,
        actor: ActorId,
        state: BackfillState,
    },
    RegisterBarrierSender {
        actor_id: ActorId,
        barrier_sender: mpsc::UnboundedSender<Barrier>,
    },
    RegisterLocalUpstreamOutput {
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        tx: permit::Sender,
    },
}

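/// Operations sent from the local stream manager to drive the [`LocalBarrierWorker`].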
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}

pub(crate) struct StreamActorManager {
    pub(super) env: StreamEnvironment,
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Manages the await-trees of all actors.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime for the streaming actors.
    pub(super) runtime: BackgroundShutdownRuntime,
}

pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
            writeln!(
                f,
                "database {} status: {} managed_barrier_state:\n{}",
                database_id.database_id,
                status,
                managed_barrier_state
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default()
            )?;
        }
        Ok(())
    }
}

/// [`LocalBarrierWorker`] manages barrier control flow, used by local stream manager.
/// Specifically, [`LocalBarrierWorker`] serves barrier injection from the meta server, sends the
/// barriers to and collects them from all actors, and finally reports the progress.
pub(super) struct LocalBarrierWorker {
    /// Current barrier collection state.
    pub(super) state: ManagedBarrierState,

    /// Per-database futures that will finish in ascending order of epoch.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    pub(super) term_id: String,
}

impl LocalBarrierWorker {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let state = ManagedBarrierState::new(
            actor_manager.clone(),
            initial_partial_graphs,
            term_id.clone(),
        );
        Self {
            state,
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

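    /// Wait for the next completed epoch across all databases. Each database's
    /// `FuturesOrdered` yields its epochs in ascending order, while the databases
    /// themselves are polled in arbitrary map order.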
    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

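    /// The main event loop of the worker: a biased `select!` over (1) events from the
    /// managed barrier state, (2) completed epochs, (3) operations from the local
    /// stream manager, and (4) requests from the meta control stream.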
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // TODO: we may report this as a database failure instead of resetting the stream
                            // once the HummockUploader supports partial recovery. Currently the HummockUploader
                            // enters the `Err` state and stops working until a global recovery clears the uploader.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

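    /// Dispatch a request received from the meta control stream. On failure, return the
    /// ID of the affected database together with the error, so that the caller can
    /// suspend just that database.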
    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = DatabaseId::new(req.database_id);
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    DatabaseId::new(req.database_id),
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(
                    DatabaseId::new(req.database_id),
                    PartialGraphId::new(req.partial_graph_id),
                );
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                let err = if self.term_id != term_id {
                    warn!(
                        ?ids,
                        term_id,
                        current_term_id = self.term_id,
                        "take receiver on unmatched term_id"
                    );
                    anyhow!(
                        "take receiver {:?} on unmatched term_id {} to current term_id {}",
                        ids,
                        term_id,
                        self.term_id
                    )
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}

mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

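    /// A future that resolves to `(PartialGraphId, Barrier, result)` once the epoch is
    /// completed. Defined as an `impl Trait` type alias because the concrete type is an
    /// `Either` of an await-tree-instrumented future and a plain one, which cannot be
    /// named directly.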
    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                }),
            )
        });
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}

use await_epoch_completed_future::*;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_storage::{StateStoreImpl, dispatch_state_store};

use crate::executor::exchange::permit;

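/// Sync all data of `prev_epoch` for the given tables to the state store, returning a
/// boxed future that resolves to the sync result and records the sync latency metric.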
fn sync_epoch(
    state_store: &StateStoreImpl,
    streaming_metrics: &StreamingMetrics,
    prev_epoch: u64,
    table_ids: HashSet<TableId>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
    let timer = streaming_metrics.barrier_sync_latency.start_timer();

    let state_store = state_store.clone();
    let future = async move {
        dispatch_state_store!(state_store, hummock, {
            hummock.sync(vec![(prev_epoch, table_ids)]).await
        })
    };

    future
        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
        .inspect_ok(move |_| {
            timer.observe_duration();
        })
        .map_err(move |e| {
            tracing::error!(
                prev_epoch,
                error = %e.as_report(),
                "Failed to sync state store",
            );
            e.into()
        })
        .boxed()
}

impl LocalBarrierWorker {
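    /// Pop the collected barrier from the database state and start completing it: for a
    /// checkpoint barrier, kick off a state store sync; then queue an ordered future
    /// that resolves once the epoch is fully completed.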
    fn complete_barrier(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) {
        let Some(database_state) = self
            .state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .state_for_request()
        else {
            return;
        };
        let (barrier, table_ids, create_mview_progress) =
            database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);

        let complete_barrier_future = match &barrier.kind {
            BarrierKind::Unspecified => unreachable!(),
            BarrierKind::Initial => {
                tracing::info!(
                    epoch = prev_epoch,
                    "ignore sealing data for the first barrier"
                );
                None
            }
            BarrierKind::Barrier => None,
            BarrierKind::Checkpoint => Some(sync_epoch(
                &self.actor_manager.env.state_store(),
                &self.actor_manager.streaming_metrics,
                prev_epoch,
                table_ids.expect("should be Some on BarrierKind::Checkpoint"),
            )),
        };

        self.await_epoch_completed_futures
            .entry(database_id)
            .or_default()
            .push_back(instrument_complete_barrier_future(
                partial_graph_id,
                complete_barrier_future,
                barrier,
                self.actor_manager.await_tree_reg.as_ref(),
                create_mview_progress,
            ));
    }

    fn on_epoch_completed(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        epoch: u64,
        result: BarrierCompleteResult,
    ) {
        let BarrierCompleteResult {
            create_mview_progress,
            sync_result,
        } = result;

        let (synced_sstables, table_watermarks, old_value_ssts) = sync_result
            .map(|sync_result| {
                (
                    sync_result.uncommitted_ssts,
                    sync_result.table_watermarks,
                    sync_result.old_value_ssts,
                )
            })
            .unwrap_or_default();

        let result = streaming_control_stream_response::Response::CompleteBarrier(
            BarrierCompleteResponse {
                request_id: "todo".to_owned(),
                partial_graph_id: partial_graph_id.into(),
                epoch,
                status: None,
                create_mview_progress,
                synced_sstables: synced_sstables
                    .into_iter()
                    .map(
                        |LocalSstableInfo {
                             sst_info,
                             table_stats,
                             created_at,
                         }| PbLocalSstableInfo {
                            sst: Some(sst_info.into()),
                            table_stats_map: to_prost_table_stats_map(table_stats),
                            created_at,
                        },
                    )
                    .collect_vec(),
                worker_id: self.actor_manager.env.worker_id(),
                table_watermarks: table_watermarks
                    .into_iter()
                    .map(|(key, value)| (key.table_id, value.into()))
                    .collect(),
                old_value_sstables: old_value_ssts
                    .into_iter()
                    .map(|sst| sst.sst_info.into())
                    .collect(),
                database_id: database_id.database_id,
            },
        );

        self.control_stream_handle.send_response(result);
    }

    /// Broadcast a barrier to all senders. In managed mode, save a receiver which will get
    /// notified when this barrier is finished.
    ///
    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
    /// the root cause of the failure. The caller should then call `try_find_root_failure`
    /// to find the root cause.
    fn send_barrier(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        debug!(
            target: "events::stream::barrier::manager::send",
            "send barrier {:?}, actor_ids_to_collect = {:?}",
            barrier,
            request.actor_ids_to_collect
        );

        let database_status = self
            .state
            .databases
            .get_mut(&DatabaseId::new(request.database_id))
            .expect("should exist");
        if let Some(state) = database_status.state_for_request() {
            state.transform_to_issued(barrier, request)?;
        }
        Ok(())
    }

    fn remove_partial_graphs(
        &mut self,
        database_id: DatabaseId,
        partial_graph_ids: impl Iterator<Item = PartialGraphId>,
    ) {
        let Some(database_status) = self.state.databases.get_mut(&database_id) else {
            warn!(
                database_id = database_id.database_id,
                "database to remove partial graphs from does not exist"
            );
            return;
        };
        let Some(database_state) = database_status.state_for_request() else {
            warn!(
                database_id = database_id.database_id,
                "ignore remove partial graph request on err database",
            );
            return;
        };
        for partial_graph_id in partial_graph_ids {
            if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
                assert!(
                    graph.is_empty(),
                    "non-empty graph to be removed: {}",
                    &graph
                );
            } else {
                warn!(
                    partial_graph_id = partial_graph_id.0,
                    "no partial graph to remove"
                );
            }
        }
    }

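    /// Create a new partial graph under the given database, initializing the database
    /// state on first use. Any exchange requests buffered while the database was absent
    /// are replayed into the newly created state.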
    fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
        let status = match self.state.databases.entry(database_id) {
            Entry::Occupied(entry) => {
                let status = entry.into_mut();
                if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
                    let mut database = DatabaseManagedBarrierState::new(
                        database_id,
                        self.term_id.clone(),
                        self.actor_manager.clone(),
                        vec![],
                    );
                    for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
                    {
                        database.new_actor_remote_output_request(
                            actor_id,
                            upstream_actor_id,
                            result_sender,
                        );
                    }
                    *status = DatabaseStatus::Running(database);
                }

                status
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
                    database_id,
                    self.term_id.clone(),
                    self.actor_manager.clone(),
                    vec![],
                )))
            }
        };
        if let Some(state) = status.state_for_request() {
            assert!(
                state
                    .graph_states
                    .insert(
                        partial_graph_id,
                        PartialGraphManagedBarrierState::new(&self.actor_manager)
                    )
                    .is_none()
            );
        }
    }

    fn reset_database(&mut self, req: ResetDatabaseRequest) {
        let database_id = DatabaseId::new(req.database_id);
        if let Some(database_status) = self.state.databases.get_mut(&database_id) {
            database_status.start_reset(
                database_id,
                self.await_epoch_completed_futures.remove(&database_id),
                req.reset_request_id,
            );
        } else {
            self.ack_database_reset(database_id, None, req.reset_request_id);
        }
    }

    fn ack_database_reset(
        &mut self,
        database_id: DatabaseId,
        reset_output: Option<ResetDatabaseOutput>,
        reset_request_id: u32,
    ) {
        info!(
            database_id = database_id.database_id,
            "database reset successfully"
        );
        if let Some(reset_database) = self.state.databases.remove(&database_id) {
            match reset_database {
                DatabaseStatus::Resetting(_) => {}
                _ => {
                    unreachable!("must be resetting previously")
                }
            }
        }
        self.await_epoch_completed_futures.remove(&database_id);
        self.control_stream_handle.ack_reset_database(
            database_id,
            reset_output.and_then(|output| output.root_err),
            reset_request_id,
        );
    }

    /// When some other failure occurs (like failing to send a barrier), the error is reported
    /// using this function. A message notifying about the error will be sent on the control
    /// stream, and the global barrier worker will later reset and rerun the database.
    fn on_database_failure(
        &mut self,
        database_id: DatabaseId,
        failed_actor: Option<ActorId>,
        err: StreamError,
        message: impl Into<String>,
    ) {
        let message = message.into();
        error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
        let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
        self.state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .suspend(failed_actor, err, completing_futures);
        self.control_stream_handle
            .send_response(Response::ReportDatabaseFailure(
                ReportDatabaseFailureResponse {
                    database_id: database_id.database_id,
                },
            ));
    }
}

impl DatabaseManagedBarrierState {
    /// Collect actor errors for a while and find the one that might be the root cause.
    ///
    /// Returns `None` if no actor error is received.
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        // fetch more actor errors within a timeout
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs.into_iter())
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }
}

#[derive(Clone)]
pub struct LocalBarrierManager {
    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
    pub(crate) database_id: DatabaseId,
    pub(crate) term_id: String,
    pub(crate) env: StreamEnvironment,
}

impl LocalBarrierWorker {
    /// Spawn a [`LocalBarrierWorker`] in managed mode.
    pub fn spawn(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_reg: Option<await_tree::Registry>,
        watermark_epoch: AtomicU64Ref,
        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
    ) -> JoinHandle<()> {
        let runtime = {
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
                builder.worker_threads(worker_threads_num);
            }
            builder
                .thread_name("rw-streaming")
                .enable_all()
                .build()
                .unwrap()
        };

        let actor_manager = Arc::new(StreamActorManager {
            env: env.clone(),
            streaming_metrics,
            watermark_epoch,
            await_tree_reg,
            runtime: runtime.into(),
        });
        let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
        tokio::spawn(worker.run(actor_op_rx))
    }
}

pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);

impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> EventSender<T> {
    pub(super) fn send_event(&self, event: T) {
        self.0.send(event).expect("should be able to send event")
    }

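    /// Send an event carrying a oneshot responder and await the response. Fails if the
    /// worker has been dropped or reset before responding.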
    pub(super) async fn send_and_await<RSP>(
        &self,
        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
    ) -> StreamResult<RSP> {
        let (tx, rx) = oneshot::channel();
        let event = make_event(tx);
        self.send_event(event);
        rx.await
            .map_err(|_| anyhow!("barrier manager may have been reset").into())
    }
}

pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}

impl LocalBarrierManager {
    pub(super) fn new(
        database_id: DatabaseId,
        term_id: String,
        env: StreamEnvironment,
    ) -> (
        Self,
        UnboundedReceiver<LocalBarrierEvent>,
        UnboundedReceiver<(ActorId, StreamError)>,
    ) {
        let (event_tx, event_rx) = unbounded_channel();
        let (err_tx, err_rx) = unbounded_channel();
        (
            Self {
                barrier_event_sender: event_tx,
                actor_failure_sender: err_tx,
                database_id,
                term_id,
                env,
            },
            event_rx,
            err_rx,
        )
    }

    fn send_event(&self, event: LocalBarrierEvent) {
        // ignore the error, because the current barrier manager may be a stale one
        let _ = self.barrier_event_sender.send(event);
    }

    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) gets a barrier, it should report
    /// and collect this barrier with its own `actor_id` using this function.
    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
        self.send_event(LocalBarrierEvent::ReportActorCollected {
            actor_id,
            epoch: barrier.epoch,
        })
    }

    /// When an actor exits unexpectedly, it should report this event using this function, so
    /// that meta will notice the actor's exit while collecting barriers.
    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
        let _ = self
            .actor_failure_sender
            .send((actor_id, err.into_unexpected_exit(actor_id)));
    }

    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
        let (tx, rx) = mpsc::unbounded_channel();
        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
            actor_id,
            barrier_sender: tx,
        });
        rx
    }

    pub fn register_local_upstream_output(
        &self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
    ) -> permit::Receiver {
        let (tx, rx) = channel_from_config(self.env.config());
        self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
            actor_id,
            upstream_actor_id,
            tx,
        });
        rx
    }
}
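
// A minimal sketch of the actor-side usage of `LocalBarrierManager` (illustrative
// only; `local_barrier_manager` and `actor_id` are assumed to come from the actor's
// build context):
//
//     let mut barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
//     while let Some(barrier) = barrier_rx.recv().await {
//         // ... yield the barrier downstream, then report that it has been collected:
//         local_barrier_manager.collect(actor_id, &barrier);
//     }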

/// A [`StreamError`] with a score, used to find the root cause of actor failures.
type ScoredStreamError = ScoredError<StreamError>;

impl StreamError {
    /// Score the given error based on hard-coded rules.
    fn with_score(self) -> ScoredStreamError {
        // Explicitly list all error kinds here to remind developers to update this function
        // when the error kinds change.

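        // Rough score bands: errors that are likely mere side effects of another failure
        // (e.g. `BarrierSend`, closed channels) score lowest within their band, and
        // executor errors are offset by 2000 so that they are preferred over
        // non-executor errors when picking the root cause.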
        fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
            use crate::executor::error::ErrorKind;
            match e.inner() {
                // `ChannelClosed` or `ExchangeChannelClosed` is likely to be caused by actor exit
                // and not the root cause.
                ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,

                // Normal errors.
                ErrorKind::Uncategorized(_)
                | ErrorKind::Storage(_)
                | ErrorKind::ArrayError(_)
                | ErrorKind::ExprError(_)
                | ErrorKind::SerdeError(_)
                | ErrorKind::SinkError(_, _)
                | ErrorKind::RpcError(_)
                | ErrorKind::AlignBarrier(_, _)
                | ErrorKind::ConnectorError(_)
                | ErrorKind::DmlError(_)
                | ErrorKind::NotImplemented(_) => 999,
            }
        }

        fn stream_error_score(e: &StreamError) -> i32 {
            use crate::error::ErrorKind;
            match e.inner() {
                // `UnexpectedExit` wraps the original error. Score on the inner error.
                ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),

                // `BarrierSend` is likely to be caused by actor exit and not the root cause.
                ErrorKind::BarrierSend { .. } => 1,

                // Executor errors first.
                ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),

                // Then other errors.
                ErrorKind::Uncategorized(_)
                | ErrorKind::Storage(_)
                | ErrorKind::Expression(_)
                | ErrorKind::Array(_)
                | ErrorKind::Secret(_) => 1000,
            }
        }

        let score = Score(stream_error_score(&self));
        ScoredStreamError { error: self, score }
    }
}

#[cfg(test)]
impl LocalBarrierManager {
    fn spawn_for_test() -> EventSender<LocalActorOperation> {
        use std::sync::atomic::AtomicU64;
        let (tx, rx) = unbounded_channel();
        let _join_handle = LocalBarrierWorker::spawn(
            StreamEnvironment::for_test(),
            Arc::new(StreamingMetrics::unused()),
            None,
            Arc::new(AtomicU64::new(0)),
            rx,
        );
        EventSender(tx)
    }
}

#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_manager::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    databases: vec![PbDatabaseInitialPartialGraph {
                        database_id: TEST_DATABASE_ID.database_id,
                        graphs: vec![PbInitialPartialGraph {
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            subscriptions: vec![],
                        }],
                    }],
                    term_id: "for_test".into(),
                },
            });

            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID.database_id,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                            subscriptions_to_add: vec![],
                            subscriptions_to_remove: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}