risingwave_stream/task/barrier_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::{pending, poll_fn};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

use anyhow::anyhow;
use await_tree::InstrumentAwait;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCreateMviewProgress, PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tonic::{Code, Status};
use tracing::warn;

use self::managed_state::ManagedBarrierState;
use crate::error::{IntoUnexpectedExit, StreamError, StreamResult};
use crate::task::{ActorId, AtomicU64Ref, PartialGraphId, StreamEnvironment, UpDownActorIds};

mod managed_state;
mod progress;
#[cfg(test)]
mod tests;

pub use progress::CreateMviewProgressReporter;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
    StreamingControlStreamResponse, streaming_control_stream_response,
};

use crate::executor::exchange::permit::{Receiver, channel_from_config};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{Barrier, BarrierInner, StreamExecutorError};
use crate::task::barrier_manager::managed_state::{
    DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
    ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
};
use crate::task::barrier_manager::progress::BackfillState;

/// If enabled, all actors will be grouped in the same tracing span within one epoch.
/// Note that this option will significantly increase the overhead of tracing.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;

/// Collected result of a barrier on the current compute node. Will be reported to the meta service.
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// The result returned from `sync` of `StateStore`.
    pub sync_result: Option<SyncResult>,

    /// The updated creation progress of materialized view after this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
}

pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}

impl ControlStreamHandle {
    fn empty() -> Self {
        Self { pair: None }
    }

    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Note: `TonicStatusWrapper` provides a better error report.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send `Shutdown` message to the control stream and wait for the stream to be closed
    /// by the meta service.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
                // been acknowledged by the meta service, for a more precise error report.
                //
                // This is because the meta service will reset the control stream manager and
                // drop the connection to us upon recovery. As a result, the receiver part of
                // this sender will also be dropped, causing the stream to close.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id: database_id.database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("failed to send response; control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset. ignore response");
        }
    }

    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e)) // wrap the status to provide better error report
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}
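
// A minimal sketch (illustrative only; it mirrors `barrier_test_utils` at the bottom of this
// file) of how a `ControlStreamHandle` is constructed from the two halves of the bidirectional
// control stream. The channel bindings here are assumptions for the example, not part of the API:
//
//     let (response_tx, _response_rx) = tokio::sync::mpsc::unbounded_channel();
//     let (_request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
//     let handle = ControlStreamHandle::new(
//         response_tx,
//         tokio_stream::wrappers::UnboundedReceiverStream::new(request_rx).boxed(),
//     );
//     assert!(handle.connected());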

pub(super) enum LocalBarrierEvent {
    ReportActorCollected {
        actor_id: ActorId,
        epoch: EpochPair,
    },
    ReportCreateProgress {
        epoch: EpochPair,
        actor: ActorId,
        state: BackfillState,
    },
    RegisterBarrierSender {
        actor_id: ActorId,
        barrier_sender: mpsc::UnboundedSender<Barrier>,
    },
    RegisterLocalUpstreamOutput {
        actor_id: ActorId,
        upstream_actor_id: ActorId,
        tx: permit::Sender,
    },
}

#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}

pub(crate) struct StreamActorManager {
    pub(super) env: StreamEnvironment,
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Manages the await-trees of all actors.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime for the streaming actors.
    pub(super) runtime: BackgroundShutdownRuntime,
}

pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
            writeln!(
                f,
                "database {} status: {} managed_barrier_state:\n{}",
                database_id.database_id,
                status,
                managed_barrier_state
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default()
            )?;
        }
        Ok(())
    }
}

/// [`LocalBarrierWorker`] manages the barrier control flow and is used by the local stream manager.
/// Specifically, [`LocalBarrierWorker`] serves barrier injection from the meta server, sends the
/// barriers to and collects them from all actors, and finally reports the progress.
pub(super) struct LocalBarrierWorker {
    /// Current barrier collection state.
    pub(super) state: ManagedBarrierState,

    /// Futures will complete in ascending order of epoch.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    pub(super) term_id: String,
}

impl LocalBarrierWorker {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let state = ManagedBarrierState::new(
            actor_manager.clone(),
            initial_partial_graphs,
            term_id.clone(),
        );
        Self {
            state,
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
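        // Poll each database's `FuturesOrdered` once per wakeup. Within a single database, the
        // completed futures are yielded in the order they were pushed, i.e. in ascending order
        // of epoch, as promised by the field's doc above.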
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
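                // With `biased`, the arms below are polled top-down on every iteration, so locally
                // collected barriers and actor errors are drained before completed epochs, actor
                // ops, and new control stream requests are handled.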
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // TODO: report this as a database failure instead of resetting the stream
                            // once the HummockUploader supports partial recovery. Currently the
                            // HummockUploader enters the `Err` state and stops working until a global
                            // recovery clears the uploader.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request  } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = DatabaseId::new(req.database_id);
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    DatabaseId::new(req.database_id),
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(
                    DatabaseId::new(req.database_id),
                    PartialGraphId::new(req.partial_graph_id),
                );
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                let err = if self.term_id != term_id {
                    {
                        warn!(
                            ?ids,
                            term_id,
                            current_term_id = self.term_id,
                            "take receiver on unmatched term_id"
                        );
                        anyhow!(
                            "take receiver {:?} on unmatched term_id {} to current term_id {}",
                            ids,
                            term_id,
                            self.term_id
                        )
                    }
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}

mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    #[define_opaque(AwaitEpochCompletedFuture)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                }),
            )
        });
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}
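
// The `left_future`/`right_future` combinators above unify two differently-typed futures into a
// single `futures::future::Either`, so that both branches of the `if` can return the same opaque
// `AwaitEpochCompletedFuture`. A standalone sketch of the pattern (illustrative only, unrelated
// to this module's concrete types):
//
//     use futures::FutureExt;
//     use futures::future::ready;
//     let instrumented = true;
//     let fut = if instrumented {
//         ready(1).left_future()
//     } else {
//         ready(1).right_future()
//     };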

use await_epoch_completed_future::*;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_storage::{StateStoreImpl, dispatch_state_store};

use crate::executor::exchange::permit;

fn sync_epoch(
    state_store: &StateStoreImpl,
    streaming_metrics: &StreamingMetrics,
    prev_epoch: u64,
    table_ids: HashSet<TableId>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
    let timer = streaming_metrics.barrier_sync_latency.start_timer();

    let state_store = state_store.clone();
    let future = async move {
        dispatch_state_store!(state_store, hummock, {
            hummock.sync(vec![(prev_epoch, table_ids)]).await
        })
    };

    future
        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
        .inspect_ok(move |_| {
            timer.observe_duration();
        })
        .map_err(move |e| {
            tracing::error!(
                prev_epoch,
                error = %e.as_report(),
                "Failed to sync state store",
            );
            e.into()
        })
        .boxed()
}
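
// How `sync_epoch` is driven (a sketch; see `complete_barrier` below): only a checkpoint barrier
// produces a sync future, and that future is pushed into `await_epoch_completed_futures` to be
// polled by `next_completed_epoch`, rather than being awaited inline in the worker loop. The
// bindings below are assumptions for illustration:
//
//     let future = sync_epoch(&state_store, &streaming_metrics, prev_epoch, table_ids);
//     futures_of_database.push_back(instrument_complete_barrier_future(
//         partial_graph_id, Some(future), barrier, await_tree_reg, create_mview_progress,
//     ));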

impl LocalBarrierWorker {
    fn complete_barrier(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) {
        {
            let Some(database_state) = self
                .state
                .databases
                .get_mut(&database_id)
                .expect("should exist")
                .state_for_request()
            else {
                return;
            };
            let (barrier, table_ids, create_mview_progress) =
                database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);

            let complete_barrier_future = match &barrier.kind {
                BarrierKind::Unspecified => unreachable!(),
                BarrierKind::Initial => {
                    tracing::info!(
                        epoch = prev_epoch,
                        "ignore sealing data for the first barrier"
                    );
                    None
                }
                BarrierKind::Barrier => None,
                BarrierKind::Checkpoint => Some(sync_epoch(
                    &self.actor_manager.env.state_store(),
                    &self.actor_manager.streaming_metrics,
                    prev_epoch,
                    table_ids.expect("should be Some on BarrierKind::Checkpoint"),
                )),
            };

            self.await_epoch_completed_futures
                .entry(database_id)
                .or_default()
                .push_back({
                    instrument_complete_barrier_future(
                        partial_graph_id,
                        complete_barrier_future,
                        barrier,
                        self.actor_manager.await_tree_reg.as_ref(),
                        create_mview_progress,
                    )
                });
        }
    }

    fn on_epoch_completed(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        epoch: u64,
        result: BarrierCompleteResult,
    ) {
        let BarrierCompleteResult {
            create_mview_progress,
            sync_result,
        } = result;

        let (synced_sstables, table_watermarks, old_value_ssts) = sync_result
            .map(|sync_result| {
                (
                    sync_result.uncommitted_ssts,
                    sync_result.table_watermarks,
                    sync_result.old_value_ssts,
                )
            })
            .unwrap_or_default();

        let result = {
            {
                streaming_control_stream_response::Response::CompleteBarrier(
                    BarrierCompleteResponse {
                        request_id: "todo".to_owned(),
                        partial_graph_id: partial_graph_id.into(),
                        epoch,
                        status: None,
                        create_mview_progress,
                        synced_sstables: synced_sstables
                            .into_iter()
                            .map(
                                |LocalSstableInfo {
                                     sst_info,
                                     table_stats,
                                     created_at,
                                 }| PbLocalSstableInfo {
                                    sst: Some(sst_info.into()),
                                    table_stats_map: to_prost_table_stats_map(table_stats),
                                    created_at,
                                },
                            )
                            .collect_vec(),
                        worker_id: self.actor_manager.env.worker_id(),
                        table_watermarks: table_watermarks
                            .into_iter()
                            .map(|(key, value)| (key.table_id, value.into()))
                            .collect(),
                        old_value_sstables: old_value_ssts
                            .into_iter()
                            .map(|sst| sst.sst_info.into())
                            .collect(),
                        database_id: database_id.database_id,
                    },
                )
            }
        };

        self.control_stream_handle.send_response(result);
    }

    /// Broadcast a barrier to all senders, and save a receiver that will be notified when the
    /// barrier is finished (in managed mode).
    ///
    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
    /// the root cause of the failure. The caller should then call `try_find_root_actor_failure`
    /// to find the root cause.
    fn send_barrier(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        debug!(
            target: "events::stream::barrier::manager::send",
            "send barrier {:?}, actor_ids_to_collect = {:?}",
            barrier,
            request.actor_ids_to_collect
        );

        let database_status = self
            .state
            .databases
            .get_mut(&DatabaseId::new(request.database_id))
            .expect("should exist");
        if let Some(state) = database_status.state_for_request() {
            state.transform_to_issued(barrier, request)?;
        }
        Ok(())
    }

    fn remove_partial_graphs(
        &mut self,
        database_id: DatabaseId,
        partial_graph_ids: impl Iterator<Item = PartialGraphId>,
    ) {
        let Some(database_status) = self.state.databases.get_mut(&database_id) else {
            warn!(
                database_id = database_id.database_id,
                "database to remove partial graph does not exist"
            );
            return;
        };
        let Some(database_state) = database_status.state_for_request() else {
            warn!(
                database_id = database_id.database_id,
                "ignore remove partial graph request on err database",
            );
            return;
        };
        for partial_graph_id in partial_graph_ids {
            if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
                assert!(
                    graph.is_empty(),
                    "non empty graph to be removed: {}",
                    &graph
                );
            } else {
                warn!(
                    partial_graph_id = partial_graph_id.0,
                    "no partial graph to remove"
                );
            }
        }
    }

    fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
        let status = match self.state.databases.entry(database_id) {
            Entry::Occupied(entry) => {
                let status = entry.into_mut();
                if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
                    let mut database = DatabaseManagedBarrierState::new(
                        database_id,
                        self.term_id.clone(),
                        self.actor_manager.clone(),
                        vec![],
                    );
                    for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
                    {
                        database.new_actor_remote_output_request(
                            actor_id,
                            upstream_actor_id,
                            result_sender,
                        );
                    }
                    *status = DatabaseStatus::Running(database);
                }

                status
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
                    database_id,
                    self.term_id.clone(),
                    self.actor_manager.clone(),
                    vec![],
                )))
            }
        };
        if let Some(state) = status.state_for_request() {
            assert!(
                state
                    .graph_states
                    .insert(
                        partial_graph_id,
                        PartialGraphManagedBarrierState::new(&self.actor_manager)
                    )
                    .is_none()
            );
        }
    }

    fn reset_database(&mut self, req: ResetDatabaseRequest) {
        let database_id = DatabaseId::new(req.database_id);
        if let Some(database_status) = self.state.databases.get_mut(&database_id) {
            database_status.start_reset(
                database_id,
                self.await_epoch_completed_futures.remove(&database_id),
                req.reset_request_id,
            );
        } else {
            self.ack_database_reset(database_id, None, req.reset_request_id);
        }
    }

    fn ack_database_reset(
        &mut self,
        database_id: DatabaseId,
        reset_output: Option<ResetDatabaseOutput>,
        reset_request_id: u32,
    ) {
        info!(
            database_id = database_id.database_id,
            "database reset successfully"
        );
        if let Some(reset_database) = self.state.databases.remove(&database_id) {
            match reset_database {
                DatabaseStatus::Resetting(_) => {}
                _ => {
                    unreachable!("must be resetting previously")
                }
            }
        }
        self.await_epoch_completed_futures.remove(&database_id);
        self.control_stream_handle.ack_reset_database(
            database_id,
            reset_output.and_then(|output| output.root_err),
            reset_request_id,
        );
    }

    /// When some other failure happens (e.g. failing to send a barrier), the error is reported via
    /// this function. A message is sent on the control stream to notify the meta service about the
    /// error, and the global barrier worker will later reset and rerun the database.
    fn on_database_failure(
        &mut self,
        database_id: DatabaseId,
        failed_actor: Option<ActorId>,
        err: StreamError,
        message: impl Into<String>,
    ) {
        let message = message.into();
        error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
        let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
        self.state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .suspend(failed_actor, err, completing_futures);
        self.control_stream_handle
            .send_response(Response::ReportDatabaseFailure(
                ReportDatabaseFailureResponse {
                    database_id: database_id.database_id,
                },
            ));
    }
}

impl DatabaseManagedBarrierState {
    /// Collect actor errors for a while and find the one that might be the root cause.
    ///
    /// Returns `None` if there's no actor error received.
    async fn try_find_root_actor_failure(
        &mut self,
        first_failure: Option<(Option<ActorId>, StreamError)>,
    ) -> Option<ScoredStreamError> {
        let mut later_errs = vec![];
        // fetch more actor errors within a timeout
        let _ = tokio::time::timeout(Duration::from_secs(3), async {
            let mut uncollected_actors: HashSet<_> = self.actor_states.keys().cloned().collect();
            if let Some((Some(failed_actor), _)) = &first_failure {
                uncollected_actors.remove(failed_actor);
            }
            while !uncollected_actors.is_empty()
                && let Some((actor_id, error)) = self.actor_failure_rx.recv().await
            {
                uncollected_actors.remove(&actor_id);
                later_errs.push(error);
            }
        })
        .await;

        first_failure
            .into_iter()
            .map(|(_, err)| err)
            .chain(later_errs)
            .map(|e| e.with_score())
            .max_by_key(|e| e.score)
    }
}

#[derive(Clone)]
pub struct LocalBarrierManager {
    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
    actor_failure_sender: UnboundedSender<(ActorId, StreamError)>,
    pub(crate) database_id: DatabaseId,
    pub(crate) term_id: String,
    pub(crate) env: StreamEnvironment,
}

impl LocalBarrierWorker {
    /// Spawn a [`LocalBarrierWorker`] in managed mode.
    pub fn spawn(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_reg: Option<await_tree::Registry>,
        watermark_epoch: AtomicU64Ref,
        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
    ) -> JoinHandle<()> {
        let runtime = {
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
                builder.worker_threads(worker_threads_num);
            }
            builder
                .thread_name("rw-streaming")
                .enable_all()
                .build()
                .unwrap()
        };

        let actor_manager = Arc::new(StreamActorManager {
            env: env.clone(),
            streaming_metrics,
            watermark_epoch,
            await_tree_reg,
            runtime: runtime.into(),
        });
        let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
        tokio::spawn(worker.run(actor_op_rx))
    }
}

pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);

impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> EventSender<T> {
    pub(super) fn send_event(&self, event: T) {
        self.0.send(event).expect("should be able to send event")
    }

    pub(super) async fn send_and_await<RSP>(
        &self,
        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
    ) -> StreamResult<RSP> {
        let (tx, rx) = oneshot::channel();
        let event = make_event(tx);
        self.send_event(event);
        rx.await
            .map_err(|_| anyhow!("barrier manager maybe reset").into())
    }
}
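
// A minimal usage sketch of the request-response pattern above (illustrative; it mirrors
// `flush_all_events_impl` in `barrier_test_utils`): the caller packs the provided
// `oneshot::Sender` into an event and awaits the reply.
//
//     let debug_info: String = actor_op_tx
//         .send_and_await(|result_sender| LocalActorOperation::InspectState { result_sender })
//         .await?;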

pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}

impl LocalBarrierManager {
    pub(super) fn new(
        database_id: DatabaseId,
        term_id: String,
        env: StreamEnvironment,
    ) -> (
        Self,
        UnboundedReceiver<LocalBarrierEvent>,
        UnboundedReceiver<(ActorId, StreamError)>,
    ) {
        let (event_tx, event_rx) = unbounded_channel();
        let (err_tx, err_rx) = unbounded_channel();
        (
            Self {
                barrier_event_sender: event_tx,
                actor_failure_sender: err_tx,
                database_id,
                term_id,
                env,
            },
            event_rx,
            err_rx,
        )
    }

    fn send_event(&self, event: LocalBarrierEvent) {
        // Ignore the error, because the current barrier manager may be a stale one.
        let _ = self.barrier_event_sender.send(event);
    }

    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) gets a barrier, it should report
    /// and collect this barrier with its own `actor_id` using this function.
    pub fn collect<M>(&self, actor_id: ActorId, barrier: &BarrierInner<M>) {
        self.send_event(LocalBarrierEvent::ReportActorCollected {
            actor_id,
            epoch: barrier.epoch,
        })
    }

    /// When an actor exits unexpectedly, it should report the failure using this function, so that
    /// the meta service will notice the actor's exit while collecting barriers.
    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
        let _ = self
            .actor_failure_sender
            .send((actor_id, err.into_unexpected_exit(actor_id)));
    }

    pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver<Barrier> {
        let (tx, rx) = mpsc::unbounded_channel();
        self.send_event(LocalBarrierEvent::RegisterBarrierSender {
            actor_id,
            barrier_sender: tx,
        });
        rx
    }

    pub fn register_local_upstream_output(
        &self,
        actor_id: ActorId,
        upstream_actor_id: ActorId,
    ) -> permit::Receiver {
        let (tx, rx) = channel_from_config(self.env.config());
        self.send_event(LocalBarrierEvent::RegisterLocalUpstreamOutput {
            actor_id,
            upstream_actor_id,
            tx,
        });
        rx
    }
}
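
// A sketch of the actor-side protocol (illustrative only; `manager`, `actor_id`, and the barrier
// processing are assumed bindings): an actor registers a barrier sender once, reports each barrier
// it has fully processed via `collect`, and reports failures instead of exiting silently.
//
//     let mut barrier_rx = manager.subscribe_barrier(actor_id);
//     while let Some(barrier) = barrier_rx.recv().await {
//         // ... process the barrier ...
//         manager.collect(actor_id, &barrier);
//     }
//     // On an unexpected error:
//     // manager.notify_failure(actor_id, err);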

/// A [`StreamError`] with a score, used to find the root cause of actor failures.
type ScoredStreamError = ScoredError<StreamError>;

impl StreamError {
    /// Score the given error based on hard-coded rules.
    fn with_score(self) -> ScoredStreamError {
        // Explicitly list all error kinds here to remind developers to update this function
        // when error kinds change.

        fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
            use crate::executor::error::ErrorKind;
            match e.inner() {
                // `ChannelClosed` or `ExchangeChannelClosed` is likely to be caused by actor exit
                // and not the root cause.
                ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,

                // Normal errors.
                ErrorKind::Uncategorized(_)
                | ErrorKind::Storage(_)
                | ErrorKind::ArrayError(_)
                | ErrorKind::ExprError(_)
                | ErrorKind::SerdeError(_)
                | ErrorKind::SinkError(_, _)
                | ErrorKind::RpcError(_)
                | ErrorKind::AlignBarrier(_, _)
                | ErrorKind::ConnectorError(_)
                | ErrorKind::DmlError(_)
                | ErrorKind::NotImplemented(_) => 999,
            }
        }

        fn stream_error_score(e: &StreamError) -> i32 {
            use crate::error::ErrorKind;
            match e.inner() {
                // `UnexpectedExit` wraps the original error. Score on the inner error.
                ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),

                // `BarrierSend` is likely to be caused by actor exit and not the root cause.
                ErrorKind::BarrierSend { .. } => 1,

                // Executor errors first.
                ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),

                // Then other errors.
                ErrorKind::Uncategorized(_)
                | ErrorKind::Storage(_)
                | ErrorKind::Expression(_)
                | ErrorKind::Array(_)
                | ErrorKind::Secret(_) => 1000,
            }
        }

        let score = Score(stream_error_score(&self));
        ScoredStreamError { error: self, score }
    }
}
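
// A worked example of the scoring above (the values follow directly from the two functions):
// a `BarrierSend` error scores 1; an executor `ExchangeChannelClosed` error scores
// 2000 + 1 = 2001; an executor `Storage` error scores 2000 + 999 = 2999. Given all three,
// `try_find_root_actor_failure` picks the storage error as the root cause via
// `max_by_key(|e| e.score)`.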

#[cfg(test)]
impl LocalBarrierManager {
    fn spawn_for_test() -> EventSender<LocalActorOperation> {
        use std::sync::atomic::AtomicU64;
        let (tx, rx) = unbounded_channel();
        let _join_handle = LocalBarrierWorker::spawn(
            StreamEnvironment::for_test(),
            Arc::new(StreamingMetrics::unused()),
            None,
            Arc::new(AtomicU64::new(0)),
            rx,
        );
        EventSender(tx)
    }
}

#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_manager::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    databases: vec![PbDatabaseInitialPartialGraph {
                        database_id: TEST_DATABASE_ID.database_id,
                        graphs: vec![PbInitialPartialGraph {
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            subscriptions: vec![],
                        }],
                    }],
                    term_id: "for_test".into(),
                },
            });

            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID.database_id,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                            subscriptions_to_add: vec![],
                            subscriptions_to_remove: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}