risingwave_stream/task/barrier_worker/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::{pending, poll_fn};
use std::sync::Arc;
use std::task::Poll;

use anyhow::anyhow;
use await_tree::{InstrumentAwait, SpanExt};
use futures::future::{BoxFuture, join_all};
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCreateMviewProgress, PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tonic::{Code, Status};
use tracing::{debug, error, info, warn};

use self::managed_state::ManagedBarrierState;
use crate::error::{ScoredStreamError, StreamError, StreamResult};
#[cfg(test)]
use crate::task::LocalBarrierManager;
use crate::task::{
    ActorId, AtomicU64Ref, PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
};

pub mod managed_state;
#[cfg(test)]
mod tests;

use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
    StreamingControlStreamResponse, streaming_control_stream_response,
};

use crate::executor::Barrier;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::task::barrier_worker::managed_state::{
    DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
    ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
};

/// If enabled, all actors will be grouped in the same tracing span within one epoch.
/// Note that this option will significantly increase the overhead of tracing.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;

/// Result of collecting a barrier on the current compute node. Reported to the meta service
/// in [`LocalBarrierWorker::next_completed_epoch`].
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// The result returned from `sync` of `StateStore`.
    pub sync_result: Option<SyncResult>,

    /// The updated creation progress of materialized view after this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
}

/// Lives in [`crate::task::barrier_worker::LocalBarrierWorker`] and communicates with the
/// `ControlStreamManager` on the meta node.
/// Handles [`risingwave_pb::stream_service::streaming_control_stream_request::Request`].
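///
/// A minimal construction sketch, mirroring `barrier_test_utils` at the bottom of this file
/// (the channel names are illustrative):
///
/// ```ignore
/// let (response_tx, response_rx) = tokio::sync::mpsc::unbounded_channel();
/// let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
/// let handle = ControlStreamHandle::new(
///     response_tx,
///     UnboundedReceiverStream::new(request_rx).boxed(),
/// );
/// assert!(handle.connected());
/// ```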
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}

impl ControlStreamHandle {
    fn empty() -> Self {
        Self { pair: None }
    }

    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Note: `TonicStatusWrapper` provides a better error report.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send `Shutdown` message to the control stream and wait for the stream to be closed
    /// by the meta service.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
                // been acknowledged by the meta service, for a more precise error report.
                //
                // This is because the meta service will reset the control stream manager and
                // drop the connection to us upon recovery. As a result, the receiver part of
                // this sender will also be dropped, causing the stream to close.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id: database_id.database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("failed to send response; control stream has been reset");
            }
        } else {
            debug!(?response, "control stream has been reset, ignore response");
        }
    }

    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e)) // wrap the status to provide a better error report
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}

/// Sent from [`crate::task::stream_manager::LocalStreamManager`] to [`crate::task::barrier_worker::LocalBarrierWorker::run`].
///
/// See [`crate::task`] for architecture overview.
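///
/// A dispatch sketch via [`EventSender`], assuming an `actor_op_tx: EventSender<LocalActorOperation>`
/// (the pattern used by `barrier_test_utils` at the bottom of this file):
///
/// ```ignore
/// let state = actor_op_tx
///     .send_and_await(|tx| LocalActorOperation::InspectState { result_sender: tx })
///     .await?;
/// println!("{state}");
/// ```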
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}

pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
            writeln!(
                f,
                "database {} status: {} managed_barrier_state:\n{}",
                database_id.database_id,
                status,
                managed_barrier_state
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default()
            )?;
        }
        Ok(())
    }
}

/// [`LocalBarrierWorker`] manages barrier control flow.
/// Specifically, [`LocalBarrierWorker`] handles barrier injection requests from the meta
/// service, sends the barriers to and collects them from all actors, and finally reports the
/// progress.
///
/// Runs an event loop in [`Self::run`]. Handles events sent by [`crate::task::LocalStreamManager`].
///
/// See [`crate::task`] for architecture overview.
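///
/// A spawn sketch (the channel wiring is illustrative; see [`Self::spawn`] for the actual
/// entry point):
///
/// ```ignore
/// let (actor_op_tx, actor_op_rx) = tokio::sync::mpsc::unbounded_channel();
/// let join_handle = LocalBarrierWorker::spawn(env, metrics, None, watermark_epoch, actor_op_rx);
/// // Events sent on `actor_op_tx` are handled by the worker's event loop in `run`.
/// ```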
pub(super) struct LocalBarrierWorker {
    /// Current barrier collection state.
    pub(super) state: ManagedBarrierState,

    /// Per database, futures complete in ascending epoch order.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    pub(super) term_id: String,
}

impl LocalBarrierWorker {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let state = ManagedBarrierState::new(
            actor_manager.clone(),
            initial_partial_graphs,
            term_id.clone(),
        );
        Self {
            state,
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("received exchange request".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

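    /// Poll every database's `FuturesOrdered` and return the next completed barrier, if any.
    /// Within one database, completion futures resolve in ascending epoch order.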
    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // TODO: report this as a database failure instead of resetting the stream
                            // once the HummockUploader supports partial recovery. Currently the
                            // HummockUploader enters the `Err` state and stops working until a global
                            // recovery clears the uploader.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non-empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = DatabaseId::new(req.database_id);
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    DatabaseId::new(req.database_id),
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(
                    DatabaseId::new(req.database_id),
                    PartialGraphId::new(req.partial_graph_id),
                );
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                let err = if self.term_id != term_id {
                    warn!(
                        ?ids,
                        term_id,
                        current_term_id = self.term_id,
                        "take receiver on unmatched term_id"
                    );
                    anyhow!(
                        "take receiver {:?} on term_id {} not matching current term_id {}",
                        ids,
                        term_id,
                        self.term_id
                    )
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}

mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    #[define_opaque(AwaitEpochCompletedFuture)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                }),
            )
        });
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}

use await_epoch_completed_future::*;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_storage::{StateStoreImpl, dispatch_state_store};

use crate::executor::exchange::permit;

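/// Sync the state store for `prev_epoch` and the given tables, returning a boxed future that
/// yields the [`SyncResult`] (uncommitted SSTs, table watermarks, old-value SSTs) to be
/// reported back to the meta service.
///
/// A call sketch (mirroring how `complete_barrier` below drives it for a checkpoint barrier):
///
/// ```ignore
/// let future = sync_epoch(&env.state_store(), &metrics, barrier.epoch.prev, table_ids);
/// let sync_result = future.await?;
/// ```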
fn sync_epoch(
    state_store: &StateStoreImpl,
    streaming_metrics: &StreamingMetrics,
    prev_epoch: u64,
    table_ids: HashSet<TableId>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
    let timer = streaming_metrics.barrier_sync_latency.start_timer();

    let state_store = state_store.clone();
    let future = async move {
        dispatch_state_store!(state_store, hummock, {
            hummock.sync(vec![(prev_epoch, table_ids)]).await
        })
    };

    future
        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
        .inspect_ok(move |_| {
            timer.observe_duration();
        })
        .map_err(move |e| {
            tracing::error!(
                prev_epoch,
                error = %e.as_report(),
                "Failed to sync state store",
            );
            e.into()
        })
        .boxed()
}

impl LocalBarrierWorker {
    fn complete_barrier(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) {
        let Some(database_state) = self
            .state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .state_for_request()
        else {
            return;
        };
        let (barrier, table_ids, create_mview_progress) =
            database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);

        let complete_barrier_future = match &barrier.kind {
            BarrierKind::Unspecified => unreachable!(),
            BarrierKind::Initial => {
                tracing::info!(epoch = prev_epoch, "ignore syncing data for the first barrier");
                None
            }
            BarrierKind::Barrier => None,
            BarrierKind::Checkpoint => Some(sync_epoch(
                &self.actor_manager.env.state_store(),
                &self.actor_manager.streaming_metrics,
                prev_epoch,
                table_ids.expect("should be Some on BarrierKind::Checkpoint"),
            )),
        };

        self.await_epoch_completed_futures
            .entry(database_id)
            .or_default()
            .push_back(instrument_complete_barrier_future(
                partial_graph_id,
                complete_barrier_future,
                barrier,
                self.actor_manager.await_tree_reg.as_ref(),
                create_mview_progress,
            ));
    }

    fn on_epoch_completed(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        epoch: u64,
        result: BarrierCompleteResult,
    ) {
        let BarrierCompleteResult {
            create_mview_progress,
            sync_result,
        } = result;

        let (synced_sstables, table_watermarks, old_value_ssts) = sync_result
            .map(|sync_result| {
                (
                    sync_result.uncommitted_ssts,
                    sync_result.table_watermarks,
                    sync_result.old_value_ssts,
                )
            })
            .unwrap_or_default();

        let result = streaming_control_stream_response::Response::CompleteBarrier(
            BarrierCompleteResponse {
                request_id: "todo".to_owned(),
                partial_graph_id: partial_graph_id.into(),
                epoch,
                status: None,
                create_mview_progress,
                synced_sstables: synced_sstables
                    .into_iter()
                    .map(
                        |LocalSstableInfo {
                             sst_info,
                             table_stats,
                             created_at,
                         }| PbLocalSstableInfo {
                            sst: Some(sst_info.into()),
                            table_stats_map: to_prost_table_stats_map(table_stats),
                            created_at,
                        },
                    )
                    .collect_vec(),
                worker_id: self.actor_manager.env.worker_id(),
                table_watermarks: table_watermarks
                    .into_iter()
                    .map(|(key, value)| (key.table_id, value.into()))
                    .collect(),
                old_value_sstables: old_value_ssts
                    .into_iter()
                    .map(|sst| sst.sst_info.into())
                    .collect(),
                database_id: database_id.database_id,
            },
        );

        self.control_stream_handle.send_response(result);
    }

    /// Broadcast a barrier to all senders, and save a receiver that will be notified when the
    /// barrier is finished (in managed mode).
    ///
    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
    /// the root cause of the failure. The caller should then call `try_find_root_failure`
    /// to find the root cause.
    fn send_barrier(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        debug!(
            target: "events::stream::barrier::manager::send",
            "send barrier {:?}, actor_ids_to_collect = {:?}",
            barrier,
            request.actor_ids_to_collect
        );

        let database_status = self
            .state
            .databases
            .get_mut(&DatabaseId::new(request.database_id))
            .expect("should exist");
        if let Some(state) = database_status.state_for_request() {
            state.transform_to_issued(barrier, request)?;
        }
        Ok(())
    }

    fn remove_partial_graphs(
        &mut self,
        database_id: DatabaseId,
        partial_graph_ids: impl Iterator<Item = PartialGraphId>,
    ) {
        let Some(database_status) = self.state.databases.get_mut(&database_id) else {
            warn!(
                database_id = database_id.database_id,
                "database to remove partial graphs from does not exist"
            );
            return;
        };
        let Some(database_state) = database_status.state_for_request() else {
            warn!(
                database_id = database_id.database_id,
                "ignore remove partial graph request on errored database",
            );
            return;
        };
        for partial_graph_id in partial_graph_ids {
            if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
                assert!(
                    graph.is_empty(),
                    "non-empty graph to be removed: {}",
                    &graph
                );
            } else {
                warn!(
                    partial_graph_id = partial_graph_id.0,
                    "no partial graph to remove"
                );
            }
        }
    }

    fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
        let status = match self.state.databases.entry(database_id) {
            Entry::Occupied(entry) => {
                let status = entry.into_mut();
                if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
                    let mut database = DatabaseManagedBarrierState::new(
                        database_id,
                        self.term_id.clone(),
                        self.actor_manager.clone(),
                        vec![],
                    );
                    for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
                    {
                        database.new_actor_remote_output_request(
                            actor_id,
                            upstream_actor_id,
                            result_sender,
                        );
                    }
                    *status = DatabaseStatus::Running(database);
                }

                status
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
                    database_id,
                    self.term_id.clone(),
                    self.actor_manager.clone(),
                    vec![],
                )))
            }
        };
        if let Some(state) = status.state_for_request() {
            assert!(
                state
                    .graph_states
                    .insert(
                        partial_graph_id,
                        PartialGraphManagedBarrierState::new(&self.actor_manager)
                    )
                    .is_none()
            );
        }
    }

    fn reset_database(&mut self, req: ResetDatabaseRequest) {
        let database_id = DatabaseId::new(req.database_id);
        if let Some(database_status) = self.state.databases.get_mut(&database_id) {
            database_status.start_reset(
                database_id,
                self.await_epoch_completed_futures.remove(&database_id),
                req.reset_request_id,
            );
        } else {
            self.ack_database_reset(database_id, None, req.reset_request_id);
        }
    }

    fn ack_database_reset(
        &mut self,
        database_id: DatabaseId,
        reset_output: Option<ResetDatabaseOutput>,
        reset_request_id: u32,
    ) {
        info!(
            database_id = database_id.database_id,
            "database reset successfully"
        );
        if let Some(reset_database) = self.state.databases.remove(&database_id) {
            match reset_database {
                DatabaseStatus::Resetting(_) => {}
                _ => {
                    unreachable!("must be resetting previously")
                }
            }
        }
        self.await_epoch_completed_futures.remove(&database_id);
        self.control_stream_handle.ack_reset_database(
            database_id,
            reset_output.and_then(|output| output.root_err),
            reset_request_id,
        );
    }

    /// When some other failure happens (like failing to send a barrier), the error is reported
    /// using this function. A message notifying of the error is sent on the control stream, and
    /// the global barrier worker will later reset and rerun the database.
    fn on_database_failure(
        &mut self,
        database_id: DatabaseId,
        failed_actor: Option<ActorId>,
        err: StreamError,
        message: impl Into<String>,
    ) {
        let message = message.into();
        error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
        let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
        self.state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .suspend(failed_actor, err, completing_futures);
        self.control_stream_handle
            .send_response(Response::ReportDatabaseFailure(
                ReportDatabaseFailureResponse {
                    database_id: database_id.database_id,
                },
            ));
    }

    /// Force stop all actors on this worker, and then drop their resources.
    async fn reset(&mut self, init_request: InitRequest) {
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await;
        }
        self.actor_manager.env.dml_manager_ref().clear();
        *self = Self::new(
            self.actor_manager.clone(),
            init_request.databases,
            init_request.term_id,
        );
        self.actor_manager.env.client_pool().invalidate_all();
    }

    /// Spawn a [`LocalBarrierWorker`] in managed mode and return its join handle.
    /// Also creates the dedicated `rw-streaming` runtime on which actors will run.
    pub fn spawn(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_reg: Option<await_tree::Registry>,
        watermark_epoch: AtomicU64Ref,
        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
    ) -> JoinHandle<()> {
        let runtime = {
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
                builder.worker_threads(worker_threads_num);
            }
            builder
                .thread_name("rw-streaming")
                .enable_all()
                .build()
                .unwrap()
        };

        let actor_manager = Arc::new(StreamActorManager {
            env: env.clone(),
            streaming_metrics,
            watermark_epoch,
            await_tree_reg,
            runtime: runtime.into(),
        });
        let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
        tokio::spawn(worker.run(actor_op_rx))
    }
}

pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);

impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> EventSender<T> {
    pub(super) fn send_event(&self, event: T) {
        self.0.send(event).expect("should be able to send event")
    }

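    /// Send an event that carries a `oneshot` sender, and await the response on the
    /// corresponding receiver.
    ///
    /// A usage sketch (the pattern used by `barrier_test_utils` below):
    ///
    /// ```ignore
    /// let requests = actor_op_tx
    ///     .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
    ///     .await?;
    /// ```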
    pub(super) async fn send_and_await<RSP>(
        &self,
        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
    ) -> StreamResult<RSP> {
        let (tx, rx) = oneshot::channel();
        let event = make_event(tx);
        self.send_event(event);
        rx.await
            .map_err(|_| anyhow!("barrier manager may have been reset").into())
    }
}

pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}

#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    databases: vec![PbDatabaseInitialPartialGraph {
                        database_id: TEST_DATABASE_ID.database_id,
                        graphs: vec![PbInitialPartialGraph {
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            subscriptions: vec![],
                        }],
                    }],
                    term_id: "for_test".into(),
                },
            });

            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID.database_id,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                            subscriptions_to_add: vec![],
                            subscriptions_to_remove: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}