risingwave_stream/task/
barrier_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::Display;
17use std::future::{pending, poll_fn};
18use std::sync::Arc;
19use std::task::Poll;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use await_tree::InstrumentAwait;
24use futures::future::BoxFuture;
25use futures::stream::{BoxStream, FuturesOrdered};
26use futures::{FutureExt, StreamExt, TryFutureExt};
27use itertools::Itertools;
28use risingwave_common::error::tonic::extra::{Score, ScoredError};
29use risingwave_pb::stream_plan::barrier::BarrierKind;
30use risingwave_pb::stream_service::barrier_complete_response::{
31    PbCreateMviewProgress, PbLocalSstableInfo,
32};
33use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
34use risingwave_storage::store_impl::AsHummock;
35use thiserror_ext::AsReport;
36use tokio::select;
37use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
38use tokio::sync::{mpsc, oneshot};
39use tokio::task::JoinHandle;
40use tonic::{Code, Status};
use tracing::{debug, warn};
42
43use self::managed_state::ManagedBarrierState;
44use crate::error::{IntoUnexpectedExit, StreamError, StreamResult};
45use crate::task::{
46    ActorId, AtomicU64Ref, PartialGraphId, SharedContext, StreamEnvironment, UpDownActorIds,
47};
48
49mod managed_state;
50mod progress;
51#[cfg(test)]
52mod tests;
53
54pub use progress::CreateMviewProgressReporter;
55use risingwave_common::util::epoch::EpochPair;
56use risingwave_common::util::runtime::BackgroundShutdownRuntime;
57use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
58use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
59use risingwave_pb::stream_service::streaming_control_stream_request::{
60    DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
61};
62use risingwave_pb::stream_service::streaming_control_stream_response::{
63    InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
64};
65use risingwave_pb::stream_service::{
66    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
67    StreamingControlStreamResponse, streaming_control_stream_response,
68};
69
70use crate::executor::exchange::permit::Receiver;
71use crate::executor::monitor::StreamingMetrics;
72use crate::executor::{Barrier, BarrierInner, StreamExecutorError};
73use crate::task::barrier_manager::managed_state::{
74    DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
75    ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
76};
77use crate::task::barrier_manager::progress::BackfillState;
78
/// If enabled, all actors will be grouped in the same tracing span within one epoch.
/// Note that this option will significantly increase the overhead of tracing.
///
/// This is a compile-time switch; it is not configurable at runtime.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;
82
/// Collect result of some barrier on current compute node. Will be reported to the meta service.
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// The result returned from `sync` of `StateStore`.
    ///
    /// `None` when the barrier did not trigger a state-store sync
    /// (i.e. a non-checkpoint barrier).
    pub sync_result: Option<SyncResult>,

    /// The updated creation progress of materialized view after this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
}
92
/// Handle of the bidirectional control stream to the meta service.
///
/// The response sender and the request stream are stored as a single `Option` pair so
/// that both halves are dropped together when the stream is reset.
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}
100
impl ControlStreamHandle {
    /// Create a handle that is not connected to any control stream.
    fn empty() -> Self {
        Self { pair: None }
    }

    /// Create a connected handle from the response `sender` and the boxed `request_stream`.
    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    /// Whether a control stream is currently attached to this handle.
    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    /// Drop the current stream pair (if any) and send `err` as the final message so the
    /// peer observes both the reset and its cause.
    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Note: `TonicStatusWrapper` provides a better error report.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send `Shutdown` message to the control stream and wait for the stream to be closed
    /// by the meta service.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
                // been acknowledged by the meta service for more precise error report.
                //
                // This is because the meta service will reset the control stream manager and
                // drop the connection to us upon recovery. As a result, the receiver part of
                // this sender will also be dropped, causing the stream to close.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    /// Acknowledge a database reset to the meta service, carrying the root error (if any)
    /// and the id of the reset request being answered.
    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id: database_id.database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    /// Send a response over the control stream. If sending fails (receiver dropped),
    /// the handle resets itself to the disconnected state.
    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("fail to send response. control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset. ignore response");
        }
    }

    /// Receive the next request from the control stream.
    ///
    /// On a stream error or end-of-stream, the handle resets itself and this future then
    /// never resolves (`pending`), so a surrounding `select!` simply stops receiving
    /// requests from this arm.
    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e)) // wrap the status to provide better error report
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}
210
/// Events reported from local actors to the barrier management machinery.
pub(super) enum LocalBarrierEvent {
    /// An actor has collected (passed through) the barrier of `epoch`.
    ReportActorCollected {
        actor_id: ActorId,
        epoch: EpochPair,
    },
    /// Report the backfill progress of a creating materialized view at `epoch`.
    ReportCreateProgress {
        epoch: EpochPair,
        actor: ActorId,
        state: BackfillState,
    },
    /// Register the channel through which an actor receives injected barriers.
    RegisterBarrierSender {
        actor_id: ActorId,
        barrier_sender: mpsc::UnboundedSender<Barrier>,
    },
}
226
/// Operations sent to the [`LocalBarrierWorker`] event loop.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    /// Attach a newly established control stream; the worker resets its state first and
    /// then replies with an `Init` response.
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    /// Take the exchange receiver for the given up-down actor pair; rejected when
    /// `term_id` does not match the worker's current term.
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentSharedContext(oneshot::Sender<(Arc<SharedContext>, LocalBarrierManager)>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    /// Render a human-readable debug view of the worker state.
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    /// Gracefully shut down, notifying the meta service via the control stream.
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}
250
/// Node-level handles shared by all streaming actors (environment, metrics, runtime).
pub(crate) struct StreamActorManager {
    pub(super) env: StreamEnvironment,
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Manages the await-trees of all actors.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime for the streaming actors.
    pub(super) runtime: BackgroundShutdownRuntime,
}
264
/// Snapshot of the [`LocalBarrierWorker`] state, rendered for
/// [`LocalActorOperation::InspectState`].
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    /// Per-database status string and, for running databases, the managed barrier state.
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}
269
270impl Display for LocalBarrierWorkerDebugInfo<'_> {
271    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272        writeln!(
273            f,
274            "\nhas_control_stream_connected: {}",
275            self.has_control_stream_connected
276        )?;
277
278        for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
279            writeln!(
280                f,
281                "database {} status: {} managed_barrier_state:\n{}",
282                database_id.database_id,
283                status,
284                managed_barrier_state
285                    .as_ref()
286                    .map(ToString::to_string)
287                    .unwrap_or_default()
288            )?;
289        }
290        Ok(())
291    }
292}
293
/// [`LocalBarrierWorker`] manages barrier control flow, used by local stream manager.
/// Specifically, [`LocalBarrierWorker`] serves barrier injection from meta server, sends the
/// barriers to and collects them from all actors, and finally reports the progress.
pub(super) struct LocalBarrierWorker {
    /// Current barrier collection state.
    pub(super) state: ManagedBarrierState,

    /// Futures will be finished in the order of epoch in ascending order.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    /// Handle of the control stream to the meta service; may be disconnected.
    control_stream_handle: ControlStreamHandle,

    /// Shared node-level actor context.
    pub(super) actor_manager: Arc<StreamActorManager>,

    /// Identifier of the current term; requests carrying another term id are rejected.
    pub(super) term_id: String,
}
310
311impl LocalBarrierWorker {
312    pub(super) fn new(
313        actor_manager: Arc<StreamActorManager>,
314        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
315        term_id: String,
316    ) -> Self {
317        let state = ManagedBarrierState::new(
318            actor_manager.clone(),
319            initial_partial_graphs,
320            term_id.clone(),
321        );
322        Self {
323            state,
324            await_epoch_completed_futures: Default::default(),
325            control_stream_handle: ControlStreamHandle::empty(),
326            actor_manager,
327            term_id,
328        }
329    }
330
331    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
332        LocalBarrierWorkerDebugInfo {
333            managed_barrier_state: self
334                .state
335                .databases
336                .iter()
337                .map(|(database_id, status)| {
338                    (*database_id, {
339                        match status {
340                            DatabaseStatus::Running(state) => {
341                                ("running".to_owned(), Some(state.to_debug_info()))
342                            }
343                            DatabaseStatus::Suspended(state) => {
344                                (format!("suspended: {:?}", state.suspend_time), None)
345                            }
346                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
347                            DatabaseStatus::Unspecified => {
348                                unreachable!()
349                            }
350                        }
351                    })
352                })
353                .collect(),
354            has_control_stream_connected: self.control_stream_handle.connected(),
355        }
356    }
357
358    pub(crate) fn get_or_insert_database_shared_context<'a>(
359        current_shared_context: &'a mut HashMap<DatabaseId, Arc<SharedContext>>,
360        database_id: DatabaseId,
361        actor_manager: &StreamActorManager,
362        term_id: &String,
363    ) -> &'a Arc<SharedContext> {
364        current_shared_context
365            .entry(database_id)
366            .or_insert_with(|| {
367                Arc::new(SharedContext::new(
368                    database_id,
369                    &actor_manager.env,
370                    term_id.clone(),
371                ))
372            })
373    }
374
    /// Resolve once any database has a barrier-completion future finished, yielding the
    /// database, partial graph, the barrier, and the completion result.
    ///
    /// Within one database, results arrive in the order futures were pushed (ascending
    /// epoch), since `FuturesOrdered` preserves push order.
    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            // Poll every database's queue; the first ready result wins.
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            // Nothing ready; pending inner futures have registered their wakers above.
            Poll::Pending
        })
        .await
    }
395
    /// Main event loop of the barrier worker.
    ///
    /// Handles, in priority order (`biased`): internal barrier-state events, completed
    /// epoch futures, actor operations, and incoming control-stream requests. Exits
    /// when the actor-op channel is closed.
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                // Poll arms top-down so internal state events take priority over
                // new requests from the control stream.
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            // The barrier has been collected locally: start completing it.
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // TODO: may only report as database failure instead of reset the stream
                            // when the HummockUploader support partial recovery. Currently the HummockUploader
                            // enter `Err` state and stop working until a global recovery to clear the uploader.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            // `NewControlStream` and `Shutdown` need `.await`, so they are
                            // handled here instead of in `handle_actor_op`.
                            LocalActorOperation::NewControlStream { handle, init_request  } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                // Warn if any database still has running actors; the meta
                                // service will react to the shutdown accordingly.
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        // Channel closed: no more operations can arrive; stop the worker.
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }
480
    /// Dispatch a single request received from the control stream.
    ///
    /// Errors are returned together with the database they belong to, so the caller can
    /// report a database-level failure rather than failing the whole worker.
    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = DatabaseId::new(req.database_id);
                // `try` block groups the fallible steps so the error can be tagged with
                // `database_id` in one place below.
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.update_actor_info(database_id, req.broadcast_info.iter().cloned());
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    DatabaseId::new(req.database_id),
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(
                    DatabaseId::new(req.database_id),
                    PartialGraphId::new(req.partial_graph_id),
                );
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                // `Init` is consumed when the control stream is established
                // (see `LocalActorOperation::NewControlStream`), never here.
                unreachable!()
            }
        }
    }
519
    /// Handle a non-async actor operation.
    ///
    /// `NewControlStream` and `Shutdown` require `.await` and are handled in the `run`
    /// loop; reaching them here is a bug.
    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                // Reject requests that carry a term id different from the current one.
                let result = try {
                    if self.term_id != term_id {
                        warn!(
                            ?ids,
                            term_id,
                            current_term_id = self.term_id,
                            "take receiver on unmatched term_id"
                        );
                        Err(anyhow!(
                            "take receiver {:?} on unmatched term_id {} to current term_id {}",
                            ids,
                            term_id,
                            self.term_id
                        ))?;
                    }
                    LocalBarrierWorker::get_or_insert_database_shared_context(
                        &mut self.state.current_shared_context,
                        database_id,
                        &self.actor_manager,
                        &self.term_id,
                    )
                    .take_receiver(ids)?
                };
                let _ = result_sender.send(result);
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentSharedContext(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send((
                    database_state.current_shared_context.clone(),
                    database_state.local_barrier_manager.clone(),
                ));
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                // Drain all immediately-available control requests and state events
                // without awaiting, so tests can synchronize with the worker.
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
604}
605
mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    /// Opaque future type (type-alias-impl-trait) resolving to the completion result of
    /// one barrier, tagged with its partial graph and the barrier itself.
    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    /// Wrap an optional state-store sync future into an [`AwaitEpochCompletedFuture`],
    /// optionally registering it with the await-tree registry for debugging.
    ///
    /// When `complete_barrier_future` is `None`, the resulting future resolves
    /// immediately with `sync_result: None`.
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                }),
            )
        });
        // `left_future`/`right_future` unify the instrumented and plain variants into
        // the single concrete type required by the alias above.
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}
659
660use await_epoch_completed_future::*;
661use risingwave_common::catalog::{DatabaseId, TableId};
662use risingwave_storage::{StateStoreImpl, dispatch_state_store};
663
664fn sync_epoch(
665    state_store: &StateStoreImpl,
666    streaming_metrics: &StreamingMetrics,
667    prev_epoch: u64,
668    table_ids: HashSet<TableId>,
669) -> BoxFuture<'static, StreamResult<SyncResult>> {
670    let timer = streaming_metrics.barrier_sync_latency.start_timer();
671
672    let state_store = state_store.clone();
673    let future = async move {
674        dispatch_state_store!(state_store, hummock, {
675            hummock.sync(vec![(prev_epoch, table_ids)]).await
676        })
677    };
678
679    future
680        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
681        .inspect_ok(move |_| {
682            timer.observe_duration();
683        })
684        .map_err(move |e| {
685            tracing::error!(
686                prev_epoch,
687                error = %e.as_report(),
688                "Failed to sync state store",
689            );
690            e.into()
691        })
692        .boxed()
693}
694
695impl LocalBarrierWorker {
696    fn complete_barrier(
697        &mut self,
698        database_id: DatabaseId,
699        partial_graph_id: PartialGraphId,
700        prev_epoch: u64,
701    ) {
702        {
703            let Some(database_state) = self
704                .state
705                .databases
706                .get_mut(&database_id)
707                .expect("should exist")
708                .state_for_request()
709            else {
710                return;
711            };
712            let (barrier, table_ids, create_mview_progress) =
713                database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);
714
715            let complete_barrier_future = match &barrier.kind {
716                BarrierKind::Unspecified => unreachable!(),
717                BarrierKind::Initial => {
718                    tracing::info!(
719                        epoch = prev_epoch,
720                        "ignore sealing data for the first barrier"
721                    );
722                    tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
723                    None
724                }
725                BarrierKind::Barrier => None,
726                BarrierKind::Checkpoint => Some(sync_epoch(
727                    &self.actor_manager.env.state_store(),
728                    &self.actor_manager.streaming_metrics,
729                    prev_epoch,
730                    table_ids.expect("should be Some on BarrierKind::Checkpoint"),
731                )),
732            };
733
734            self.await_epoch_completed_futures
735                .entry(database_id)
736                .or_default()
737                .push_back({
738                    instrument_complete_barrier_future(
739                        partial_graph_id,
740                        complete_barrier_future,
741                        barrier,
742                        self.actor_manager.await_tree_reg.as_ref(),
743                        create_mview_progress,
744                    )
745                });
746        }
747    }
748
749    fn on_epoch_completed(
750        &mut self,
751        database_id: DatabaseId,
752        partial_graph_id: PartialGraphId,
753        epoch: u64,
754        result: BarrierCompleteResult,
755    ) {
756        let BarrierCompleteResult {
757            create_mview_progress,
758            sync_result,
759        } = result;
760
761        let (synced_sstables, table_watermarks, old_value_ssts) = sync_result
762            .map(|sync_result| {
763                (
764                    sync_result.uncommitted_ssts,
765                    sync_result.table_watermarks,
766                    sync_result.old_value_ssts,
767                )
768            })
769            .unwrap_or_default();
770
771        let result = {
772            {
773                streaming_control_stream_response::Response::CompleteBarrier(
774                    BarrierCompleteResponse {
775                        request_id: "todo".to_owned(),
776                        partial_graph_id: partial_graph_id.into(),
777                        epoch,
778                        status: None,
779                        create_mview_progress,
780                        synced_sstables: synced_sstables
781                            .into_iter()
782                            .map(
783                                |LocalSstableInfo {
784                                     sst_info,
785                                     table_stats,
786                                     created_at,
787                                 }| PbLocalSstableInfo {
788                                    sst: Some(sst_info.into()),
789                                    table_stats_map: to_prost_table_stats_map(table_stats),
790                                    created_at,
791                                },
792                            )
793                            .collect_vec(),
794                        worker_id: self.actor_manager.env.worker_id(),
795                        table_watermarks: table_watermarks
796                            .into_iter()
797                            .map(|(key, value)| (key.table_id, value.into()))
798                            .collect(),
799                        old_value_sstables: old_value_ssts
800                            .into_iter()
801                            .map(|sst| sst.sst_info.into())
802                            .collect(),
803                        database_id: database_id.database_id,
804                    },
805                )
806            }
807        };
808
809        self.control_stream_handle.send_response(result);
810    }
811
812    /// Broadcast a barrier to all senders. Save a receiver which will get notified when this
813    /// barrier is finished, in managed mode.
814    ///
815    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
816    /// the root cause of the failure. The caller should then call `try_find_root_failure`
817    /// to find the root cause.
818    fn send_barrier(
819        &mut self,
820        barrier: &Barrier,
821        request: InjectBarrierRequest,
822    ) -> StreamResult<()> {
823        debug!(
824            target: "events::stream::barrier::manager::send",
825            "send barrier {:?}, actor_ids_to_collect = {:?}",
826            barrier,
827            request.actor_ids_to_collect
828        );
829
830        let database_status = self
831            .state
832            .databases
833            .get_mut(&DatabaseId::new(request.database_id))
834            .expect("should exist");
835        if let Some(state) = database_status.state_for_request() {
836            state.transform_to_issued(barrier, request)?;
837        }
838        Ok(())
839    }
840
841    fn remove_partial_graphs(
842        &mut self,
843        database_id: DatabaseId,
844        partial_graph_ids: impl Iterator<Item = PartialGraphId>,
845    ) {
846        let Some(database_status) = self.state.databases.get_mut(&database_id) else {
847            warn!(
848                database_id = database_id.database_id,
849                "database to remove partial graph not exist"
850            );
851            return;
852        };
853        let Some(database_state) = database_status.state_for_request() else {
854            warn!(
855                database_id = database_id.database_id,
856                "ignore remove partial graph request on err database",
857            );
858            return;
859        };
860        for partial_graph_id in partial_graph_ids {
861            if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
862                assert!(
863                    graph.is_empty(),
864                    "non empty graph to be removed: {}",
865                    &graph
866                );
867            } else {
868                warn!(
869                    partial_graph_id = partial_graph_id.0,
870                    "no partial graph to remove"
871                );
872            }
873        }
874    }
875
876    fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
877        let status = self.state.databases.entry(database_id).or_insert_with(|| {
878            DatabaseStatus::Running(DatabaseManagedBarrierState::new(
879                database_id,
880                self.actor_manager.clone(),
881                LocalBarrierWorker::get_or_insert_database_shared_context(
882                    &mut self.state.current_shared_context,
883                    database_id,
884                    &self.actor_manager,
885                    &self.term_id,
886                )
887                .clone(),
888                vec![],
889            ))
890        });
891        if let Some(state) = status.state_for_request() {
892            assert!(
893                state
894                    .graph_states
895                    .insert(
896                        partial_graph_id,
897                        PartialGraphManagedBarrierState::new(&self.actor_manager)
898                    )
899                    .is_none()
900            );
901        }
902    }
903
904    fn reset_database(&mut self, req: ResetDatabaseRequest) {
905        let database_id = DatabaseId::new(req.database_id);
906        if let Some(database_status) = self.state.databases.get_mut(&database_id) {
907            database_status.start_reset(
908                database_id,
909                self.await_epoch_completed_futures.remove(&database_id),
910                req.reset_request_id,
911            );
912        } else {
913            self.ack_database_reset(database_id, None, req.reset_request_id);
914        }
915    }
916
917    fn ack_database_reset(
918        &mut self,
919        database_id: DatabaseId,
920        reset_output: Option<ResetDatabaseOutput>,
921        reset_request_id: u32,
922    ) {
923        info!(
924            database_id = database_id.database_id,
925            "database reset successfully"
926        );
927        if let Some(reset_database) = self.state.databases.remove(&database_id) {
928            match reset_database {
929                DatabaseStatus::Resetting(_) => {}
930                _ => {
931                    unreachable!("must be resetting previously")
932                }
933            }
934        }
935        self.state.current_shared_context.remove(&database_id);
936        self.await_epoch_completed_futures.remove(&database_id);
937        self.control_stream_handle.ack_reset_database(
938            database_id,
939            reset_output.and_then(|output| output.root_err),
940            reset_request_id,
941        );
942    }
943
944    /// When some other failure happens (like failed to send barrier), the error is reported using
945    /// this function. The control stream will be responded with a message to notify about the error,
946    /// and the global barrier worker will later reset and rerun the database.
947    fn on_database_failure(
948        &mut self,
949        database_id: DatabaseId,
950        failed_actor: Option<ActorId>,
951        err: StreamError,
952        message: impl Into<String>,
953    ) {
954        let message = message.into();
955        error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
956        let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
957        self.state
958            .databases
959            .get_mut(&database_id)
960            .expect("should exist")
961            .suspend(failed_actor, err, completing_futures);
962        self.control_stream_handle
963            .send_response(Response::ReportDatabaseFailure(
964                ReportDatabaseFailureResponse {
965                    database_id: database_id.database_id,
966                },
967            ));
968    }
969}
970
971impl DatabaseManagedBarrierState {
972    /// Collect actor errors for a while and find the one that might be the root cause.
973    ///
974    /// Returns `None` if there's no actor error received.
975    async fn try_find_root_actor_failure(
976        &mut self,
977        first_failure: Option<(Option<ActorId>, StreamError)>,
978    ) -> Option<ScoredStreamError> {
979        let mut later_errs = vec![];
980        // fetch more actor errors within a timeout
981        let _ = tokio::time::timeout(Duration::from_secs(3), async {
982            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
983            if let Some((Some(failed_actor), _)) = &first_failure {
984                uncollected_actors.remove(failed_actor);
985            }
986            while !uncollected_actors.is_empty()
987                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
988            {
989                uncollected_actors.remove(&actor_id);
990                later_errs.push(error);
991            }
992        })
993        .await;
994
995        first_failure
996            .into_iter()
997            .map(|(_, err)| err)
998            .chain(later_errs.into_iter())
999            .map(|e| e.with_score())
1000            .max_by_key(|e| e.score)
1001    }
1002}
1003
/// Cloneable handle through which actors report barrier events and failures
/// to the local barrier worker.
#[derive(Clone)]
pub struct LocalBarrierManager {
    // Carries barrier-related events (collect, register barrier sender, ...).
    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
    // Carries unexpected actor failures, tagged with the failing actor's id.
    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
}
1009
1010impl LocalBarrierWorker {
1011    /// Create a [`LocalBarrierWorker`] with managed mode.
1012    pub fn spawn(
1013        env: StreamEnvironment,
1014        streaming_metrics: Arc<StreamingMetrics>,
1015        await_tree_reg: Option<await_tree::Registry>,
1016        watermark_epoch: AtomicU64Ref,
1017        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
1018    ) -> JoinHandle<()> {
1019        let runtime = {
1020            let mut builder = tokio::runtime::Builder::new_multi_thread();
1021            if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
1022                builder.worker_threads(worker_threads_num);
1023            }
1024            builder
1025                .thread_name("rw-streaming")
1026                .enable_all()
1027                .build()
1028                .unwrap()
1029        };
1030
1031        let actor_manager = Arc::new(StreamActorManager {
1032            env: env.clone(),
1033            streaming_metrics,
1034            watermark_epoch,
1035            await_tree_reg,
1036            runtime: runtime.into(),
1037        });
1038        let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
1039        tokio::spawn(worker.run(actor_op_rx))
1040    }
1041}
1042
/// Typed wrapper around an [`UnboundedSender`] for sending events to a worker loop.
pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);
1044
// Manual `Clone` impl: `#[derive(Clone)]` would add a `T: Clone` bound, which is
// unnecessary since only the inner sender handle is cloned, never a `T`.
impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}
1050
1051impl<T> EventSender<T> {
1052    pub(super) fn send_event(&self, event: T) {
1053        self.0.send(event).expect("should be able to send event")
1054    }
1055
1056    pub(super) async fn send_and_await<RSP>(
1057        &self,
1058        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
1059    ) -> StreamResult<RSP> {
1060        let (tx, rx) = oneshot::channel();
1061        let event = make_event(tx);
1062        self.send_event(event);
1063        rx.await
1064            .map_err(|_| anyhow!("barrier manager maybe reset").into())
1065    }
1066}
1067
impl LocalBarrierManager {
    /// Create a manager together with the receiving halves of its two channels:
    /// one for barrier events and one for actor failures.
    pub(super) fn new() -> (
        Self,
        UnboundedReceiver<LocalBarrierEvent>,
        UnboundedReceiver<(ActorId, StreamError)>,
    ) {
        let (event_tx, event_rx) = unbounded_channel();
        let (err_tx, err_rx) = unbounded_channel();
        (
            Self {
                barrier_event_sender: event_tx,
                actor_failure_sender: err_tx,
            },
            event_rx,
            err_rx,
        )
    }

    fn send_event(&self, event: LocalBarrierEvent) {
        // ignore error, because the current barrier manager maybe a stale one
        let _ = self.barrier_event_sender.send(event);
    }

    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) gets a barrier, it should report
    /// and collect this barrier with its own `actor_id` using this function.
    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
        self.send_event(LocalBarrierEvent::ReportActorCollected {
            actor_id,
            epoch: barrier.epoch,
        })
    }

    /// When an actor exits unexpectedly, it should report this event using this function, so meta
    /// will notice the actor's exit while collecting.
    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
        // Wrap the error so the receiver knows which actor exited unexpectedly.
        let _ = self
            .actor_failure_sender
            .send((actor_id, err.into_unexpected_exit(actor_id)));
    }

    /// Register a barrier sender for `actor_id` and return the receiving half,
    /// on which the actor will receive the barriers issued to it.
    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
        let (tx, rx) = mpsc::unbounded_channel();
        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
            actor_id,
            barrier_sender: tx,
        });
        rx
    }
}
1117
/// A [`StreamError`] with a score, used to find the root cause of actor failures.
/// A higher score marks an error as more likely to be the root cause.
type ScoredStreamError = ScoredError<StreamError>;
1120
1121impl StreamError {
1122    /// Score the given error based on hard-coded rules.
1123    fn with_score(self) -> ScoredStreamError {
1124        // Explicitly list all error kinds here to notice developers to update this function when
1125        // there are changes in error kinds.
1126
1127        fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
1128            use crate::executor::error::ErrorKind;
1129            match e.inner() {
1130                // `ChannelClosed` or `ExchangeChannelClosed` is likely to be caused by actor exit
1131                // and not the root cause.
1132                ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,
1133
1134                // Normal errors.
1135                ErrorKind::Uncategorized(_)
1136                | ErrorKind::Storage(_)
1137                | ErrorKind::ArrayError(_)
1138                | ErrorKind::ExprError(_)
1139                | ErrorKind::SerdeError(_)
1140                | ErrorKind::SinkError(_, _)
1141                | ErrorKind::RpcError(_)
1142                | ErrorKind::AlignBarrier(_, _)
1143                | ErrorKind::ConnectorError(_)
1144                | ErrorKind::DmlError(_)
1145                | ErrorKind::NotImplemented(_) => 999,
1146            }
1147        }
1148
1149        fn stream_error_score(e: &StreamError) -> i32 {
1150            use crate::error::ErrorKind;
1151            match e.inner() {
1152                // `UnexpectedExit` wraps the original error. Score on the inner error.
1153                ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),
1154
1155                // `BarrierSend` is likely to be caused by actor exit and not the root cause.
1156                ErrorKind::BarrierSend { .. } => 1,
1157
1158                // Executor errors first.
1159                ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
1160
1161                // Then other errors.
1162                ErrorKind::Uncategorized(_)
1163                | ErrorKind::Storage(_)
1164                | ErrorKind::Expression(_)
1165                | ErrorKind::Array(_)
1166                | ErrorKind::Secret(_) => 1000,
1167            }
1168        }
1169
1170        let score = Score(stream_error_score(&self));
1171        ScoredStreamError { error: self, score }
1172    }
1173}
1174
#[cfg(test)]
impl LocalBarrierManager {
    /// Spawn a real barrier worker for tests and return its operation sender.
    fn spawn_for_test() -> EventSender<LocalActorOperation> {
        use std::sync::atomic::AtomicU64;
        let (op_tx, op_rx) = unbounded_channel();
        let _join_handle = LocalBarrierWorker::spawn(
            StreamEnvironment::for_test(),
            Arc::new(StreamingMetrics::unused()),
            None,
            Arc::new(AtomicU64::new(0)),
            op_rx,
        );
        EventSender(op_tx)
    }

    /// Create a stub manager whose events are drained and discarded by a
    /// background task, for tests that don't need a real worker.
    pub fn for_test() -> Self {
        let (event_tx, mut event_rx) = unbounded_channel();
        let (failure_tx, failure_rx) = unbounded_channel();
        let _join_handle = tokio::spawn(async move {
            // Keep the failure receiver alive so failure sends never observe
            // a closed channel.
            let _failure_rx = failure_rx;
            while event_rx.recv().await.is_some() {}
        });
        Self {
            barrier_event_sender: event_tx,
            actor_failure_sender: failure_tx,
        }
    }
}
1203
#[cfg(test)]
/// Utilities for driving a barrier worker end-to-end in unit tests: spins up a
/// worker, wires a fake control stream, and offers helpers to inject barriers.
pub(crate) mod barrier_test_utils {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_manager::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, SharedContext, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    /// Test harness bundling a spawned barrier worker with both ends of a
    /// simulated control stream.
    pub(crate) struct LocalBarrierTestEnv {
        // Shared context of the test database, fetched from the worker.
        pub shared_context: Arc<SharedContext>,
        // Manager handle for reporting barrier collection / failures.
        pub local_barrier_manager: LocalBarrierManager,
        // Sender for actor operations directed at the worker.
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        // Simulated meta->worker control stream requests.
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        // Simulated worker->meta control stream responses.
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        /// Spawn a worker, establish a control stream with a single test
        /// database/partial graph, and wait for the `Init` response before
        /// returning the assembled environment.
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            // Hand the worker a new control stream backed by our local channels.
            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    databases: vec![PbDatabaseInitialPartialGraph {
                        database_id: TEST_DATABASE_ID.database_id,
                        graphs: vec![PbInitialPartialGraph {
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            subscriptions: vec![],
                            actor_infos: vec![],
                        }],
                    }],
                    term_id: "for_test".into(),
                },
            });

            // The first response on the stream must be `Init`.
            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let (shared_context, local_barrier_manager) = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentSharedContext)
                .await
                .unwrap();

            Self {
                shared_context,
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        /// Send an `InjectBarrier` request for the test partial graph, to be
        /// collected by the given actors.
        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID.database_id,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            broadcast_info: vec![],
                            actors_to_build: vec![],
                            subscriptions_to_add: vec![],
                            subscriptions_to_remove: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        /// Wait until the worker has processed all previously sent events.
        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        /// Round-trip a `Flush` operation through the worker; returning implies
        /// every earlier event on the channel has been handled.
        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }
    }
}