risingwave_stream/task/barrier_worker/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::{pending, poll_fn};
use std::sync::Arc;
use std::task::Poll;

use anyhow::anyhow;
use await_tree::{InstrumentAwait, SpanExt};
use futures::future::{BoxFuture, join_all};
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcTableBackfillProgress, PbCreateMviewProgress, PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tonic::{Code, Status};
use tracing::warn;

use self::managed_state::ManagedBarrierState;
use crate::error::{ScoredStreamError, StreamError, StreamResult};
#[cfg(test)]
use crate::task::LocalBarrierManager;
use crate::task::{
    ActorId, AtomicU64Ref, PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
};
pub mod managed_state;
#[cfg(test)]
mod tests;

use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
    StreamingControlStreamResponse, streaming_control_stream_response,
};

use crate::executor::Barrier;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::task::barrier_worker::managed_state::{
    DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
    ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
};

/// If enabled, all actors will be grouped in the same tracing span within one epoch.
/// Note that this option will significantly increase the overhead of tracing.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;

/// Collected result of a barrier on the current compute node. Reported to the meta
/// service in [`LocalBarrierWorker::on_epoch_completed`].
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// The result returned from `sync` of `StateStore`.
    pub sync_result: Option<SyncResult>,

    /// The updated creation progress of materialized views after this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    /// The source IDs that have finished loading data, for refreshable batch sources.
    pub load_finished_source_ids: Vec<u32>,

    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
}

/// Lives in [`crate::task::barrier_worker::LocalBarrierWorker`] and communicates with
/// the `ControlStreamManager` on the meta node.
/// Handles [`risingwave_pb::stream_service::streaming_control_stream_request::Request`].
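///
/// A minimal sketch of how a handle is wired up, mirroring `LocalBarrierTestEnv::for_test`
/// below (in production the sender/stream pair comes from the meta service's RPC layer):
///
/// ```ignore
/// let (response_tx, response_rx) = tokio::sync::mpsc::unbounded_channel();
/// let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
/// let handle = ControlStreamHandle::new(
///     response_tx,
///     UnboundedReceiverStream::new(request_rx).boxed(),
/// );
/// assert!(handle.connected());
/// ```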
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}

impl ControlStreamHandle {
    fn empty() -> Self {
        Self { pair: None }
    }

    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Note: `TonicStatusWrapper` provides a better error report.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send a `Shutdown` message to the control stream and wait for the stream to be closed
    /// by the meta service.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
                // been acknowledged by the meta service, for a more precise error report.
                //
                // This is because the meta service will reset the control stream manager and
                // drop the connection to us upon recovery. As a result, the receiver part of
                // this sender will also be dropped, causing the stream to close.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id: database_id.database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("failed to send response, control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset. ignore response");
        }
    }

    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e)) // wrap the status to provide a better error report
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}

/// Sent from [`crate::task::stream_manager::LocalStreamManager`] to [`crate::task::barrier_worker::LocalBarrierWorker::run`].
///
/// See [`crate::task`] for an architecture overview.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}

pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
            writeln!(
                f,
                "database {} status: {} managed_barrier_state:\n{}",
                database_id.database_id,
                status,
                managed_barrier_state
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default()
            )?;
        }
        Ok(())
    }
}

/// [`LocalBarrierWorker`] manages barrier control flow.
/// Specifically, [`LocalBarrierWorker`] serves barrier injection from the meta server, sends the
/// barriers to and collects them from all actors, and finally reports the progress.
///
/// Runs its event loop in [`Self::run`]. Handles events sent by [`crate::task::LocalStreamManager`].
///
/// See [`crate::task`] for an architecture overview.
pub(super) struct LocalBarrierWorker {
    /// Current barrier collection state.
    pub(super) state: ManagedBarrierState,

    /// In-flight barrier completion futures. Within each database, futures finish in
    /// ascending epoch order.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    pub(super) term_id: String,
}

impl LocalBarrierWorker {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let state = ManagedBarrierState::new(
            actor_manager.clone(),
            initial_partial_graphs,
            term_id.clone(),
        );
        Self {
            state,
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

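    /// Poll every database's `FuturesOrdered` set and return the first completed
    /// barrier. Within one database, completion futures finish in the order they were
    /// queued, i.e., in ascending epoch order.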
    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

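    /// The main event loop. With `biased`, the select arms are polled in the listed
    /// order: managed-state events first, then epoch completions, then actor ops from
    /// the local stream manager, and finally new requests from the control stream.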
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            // Queue a completion future into `await_epoch_completed_futures`;
                            // its result is handled below in `next_completed_epoch`.
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // TODO: report this as a database failure instead of resetting the stream,
                            // once the HummockUploader supports partial recovery. Currently the
                            // HummockUploader enters an `Err` state and stops working until a global
                            // recovery clears the uploader.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request  } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

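    /// Dispatch a single request from the control stream. On failure, the error is
    /// returned together with the database it belongs to, so that the caller can
    /// suspend only that database.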
    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = DatabaseId::new(req.database_id);
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    DatabaseId::new(req.database_id),
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(
                    DatabaseId::new(req.database_id),
                    PartialGraphId::new(req.partial_graph_id),
                );
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                let err = if self.term_id != term_id {
                    warn!(
                        ?ids,
                        term_id,
                        current_term_id = self.term_id,
                        "take receiver on unmatched term_id"
                    );
                    anyhow!(
                        "take receiver {:?} on unmatched term_id {} to current term_id {}",
                        ids,
                        term_id,
                        self.term_id
                    )
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}

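/// The completion future below is an opaque `impl Future` type (a type-alias impl
/// trait) so that `FuturesOrdered<AwaitEpochCompletedFuture>` can name the concrete
/// future type without boxing it.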
mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcTableBackfillProgress, PbCreateMviewProgress,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    #[define_opaque(AwaitEpochCompletedFuture)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        load_finished_source_ids: Vec<u32>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                }),
            )
        });
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}

use await_epoch_completed_future::*;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
use risingwave_storage::{StateStoreImpl, dispatch_state_store};

use crate::executor::exchange::permit;

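/// Build the future that syncs the data of `prev_epoch` for the given tables to the
/// state store, recording sync latency in `streaming_metrics`. The returned future is
/// later driven as part of an [`AwaitEpochCompletedFuture`] for a checkpoint barrier.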
fn sync_epoch(
    state_store: &StateStoreImpl,
    streaming_metrics: &StreamingMetrics,
    prev_epoch: u64,
    table_ids: HashSet<TableId>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
    let timer = streaming_metrics.barrier_sync_latency.start_timer();

    let state_store = state_store.clone();
    let future = async move {
        dispatch_state_store!(state_store, hummock, {
            hummock.sync(vec![(prev_epoch, table_ids)]).await
        })
    };

    future
        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
        .inspect_ok(move |_| {
            timer.observe_duration();
        })
        .map_err(move |e| {
            tracing::error!(
                prev_epoch,
                error = %e.as_report(),
                "Failed to sync state store",
            );
            e.into()
        })
        .boxed()
}

impl LocalBarrierWorker {
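    /// Called when a barrier has been collected from all actors of a partial graph:
    /// pop it from the managed state, start a state-store sync for checkpoint barriers,
    /// and queue a future that will report the completed epoch to the meta service.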
    fn complete_barrier(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) {
        let Some(database_state) = self
            .state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .state_for_request()
        else {
            return;
        };
        let (
            barrier,
            table_ids,
            create_mview_progress,
            load_finished_source_ids,
            cdc_table_backfill_progress,
        ) = database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);

        let complete_barrier_future = match &barrier.kind {
            BarrierKind::Unspecified => unreachable!(),
            BarrierKind::Initial => {
                tracing::info!(prev_epoch, "ignore syncing data for the first barrier");
                None
            }
            BarrierKind::Barrier => None,
            BarrierKind::Checkpoint => Some(sync_epoch(
                &self.actor_manager.env.state_store(),
                &self.actor_manager.streaming_metrics,
                prev_epoch,
                table_ids.expect("should be Some on BarrierKind::Checkpoint"),
            )),
        };

        self.await_epoch_completed_futures
            .entry(database_id)
            .or_default()
            .push_back(instrument_complete_barrier_future(
                partial_graph_id,
                complete_barrier_future,
                barrier,
                self.actor_manager.await_tree_reg.as_ref(),
                create_mview_progress,
                load_finished_source_ids,
                cdc_table_backfill_progress,
            ));
    }

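    /// Convert a collected [`BarrierCompleteResult`] into a `BarrierCompleteResponse`
    /// (synced SSTs, table watermarks, vector index adds, backfill progress, etc.) and
    /// send it back on the control stream.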
    fn on_epoch_completed(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        epoch: u64,
        result: BarrierCompleteResult,
    ) {
        let BarrierCompleteResult {
            create_mview_progress,
            sync_result,
            load_finished_source_ids,
            cdc_table_backfill_progress,
        } = result;

        let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
            .map(|sync_result| {
                (
                    sync_result.uncommitted_ssts,
                    sync_result.table_watermarks,
                    sync_result.old_value_ssts,
                    sync_result.vector_index_adds,
                )
            })
            .unwrap_or_default();

        let result = streaming_control_stream_response::Response::CompleteBarrier(
            BarrierCompleteResponse {
                request_id: "todo".to_owned(),
                partial_graph_id: partial_graph_id.into(),
                epoch,
                status: None,
                create_mview_progress,
                synced_sstables: synced_sstables
                    .into_iter()
                    .map(
                        |LocalSstableInfo {
                             sst_info,
                             table_stats,
                             created_at,
                         }| PbLocalSstableInfo {
                            sst: Some(sst_info.into()),
                            table_stats_map: to_prost_table_stats_map(table_stats),
                            created_at,
                        },
                    )
                    .collect_vec(),
                worker_id: self.actor_manager.env.worker_id(),
                table_watermarks: table_watermarks
                    .into_iter()
                    .map(|(key, value)| (key.table_id, value.into()))
                    .collect(),
                old_value_sstables: old_value_ssts
                    .into_iter()
                    .map(|sst| sst.sst_info.into())
                    .collect(),
                database_id: database_id.database_id,
                load_finished_source_ids,
                vector_index_adds: vector_index_adds
                    .into_iter()
                    .map(|(table_id, adds)| {
                        (
                            table_id.table_id,
                            PbVectorIndexAdds {
                                adds: adds.into_iter().map(|add| add.into()).collect(),
                            },
                        )
                    })
                    .collect(),
                cdc_table_backfill_progress,
            },
        );

        self.control_stream_handle.send_response(result);
    }

    /// Broadcast a barrier to all senders. In managed mode, also save a receiver that
    /// gets notified when this barrier is finished.
    ///
    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
    /// the root cause of the failure. The caller should then call `try_find_root_failure`
    /// to find the root cause.
    fn send_barrier(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        debug!(
            target: "events::stream::barrier::manager::send",
            "send barrier {:?}, actor_ids_to_collect = {:?}",
            barrier,
            request.actor_ids_to_collect
        );

        let database_status = self
            .state
            .databases
            .get_mut(&DatabaseId::new(request.database_id))
            .expect("should exist");
        if let Some(state) = database_status.state_for_request() {
            state.transform_to_issued(barrier, request)?;
        }
        Ok(())
    }

    fn remove_partial_graphs(
        &mut self,
        database_id: DatabaseId,
        partial_graph_ids: impl Iterator<Item = PartialGraphId>,
    ) {
        let Some(database_status) = self.state.databases.get_mut(&database_id) else {
            warn!(
                database_id = database_id.database_id,
                "database to remove partial graphs from does not exist"
            );
            return;
        };
        let Some(database_state) = database_status.state_for_request() else {
            warn!(
                database_id = database_id.database_id,
                "ignore remove partial graph request on errored database",
            );
            return;
        };
        for partial_graph_id in partial_graph_ids {
            if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
                assert!(
                    graph.is_empty(),
                    "non-empty graph to be removed: {}",
                    &graph
                );
            } else {
                warn!(
                    partial_graph_id = partial_graph_id.0,
                    "no partial graph to remove"
                );
            }
        }
    }

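    /// Register a new partial graph under the database's managed state. If the database
    /// entry so far only holds buffered exchange requests, upgrade it to `Running` first
    /// and replay those requests.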
    fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
        let status = match self.state.databases.entry(database_id) {
            Entry::Occupied(entry) => {
                let status = entry.into_mut();
                if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
                    let mut database = DatabaseManagedBarrierState::new(
                        database_id,
                        self.term_id.clone(),
                        self.actor_manager.clone(),
                        vec![],
                    );
                    for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
                    {
                        database.new_actor_remote_output_request(
                            actor_id,
                            upstream_actor_id,
                            result_sender,
                        );
                    }
                    *status = DatabaseStatus::Running(database);
                }

                status
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
                    database_id,
                    self.term_id.clone(),
                    self.actor_manager.clone(),
                    vec![],
                )))
            }
        };
        if let Some(state) = status.state_for_request() {
            assert!(
                state
                    .graph_states
                    .insert(
                        partial_graph_id,
                        PartialGraphManagedBarrierState::new(&self.actor_manager)
                    )
                    .is_none()
            );
        }
    }

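    /// Start resetting a database upon request from the meta service, carrying over any
    /// in-flight completion futures. If the database is unknown on this worker,
    /// acknowledge the reset immediately.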
    fn reset_database(&mut self, req: ResetDatabaseRequest) {
        let database_id = DatabaseId::new(req.database_id);
        if let Some(database_status) = self.state.databases.get_mut(&database_id) {
            database_status.start_reset(
                database_id,
                self.await_epoch_completed_futures.remove(&database_id),
                req.reset_request_id,
            );
        } else {
            self.ack_database_reset(database_id, None, req.reset_request_id);
        }
    }

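    /// Remove the database's state and report the reset result, including the root
    /// error if any, back to the meta service.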
    fn ack_database_reset(
        &mut self,
        database_id: DatabaseId,
        reset_output: Option<ResetDatabaseOutput>,
        reset_request_id: u32,
    ) {
        info!(
            database_id = database_id.database_id,
            "database reset successfully"
        );
        if let Some(reset_database) = self.state.databases.remove(&database_id) {
            match reset_database {
                DatabaseStatus::Resetting(_) => {}
                _ => {
                    unreachable!("must be resetting previously")
                }
            }
        }
        self.await_epoch_completed_futures.remove(&database_id);
        self.control_stream_handle.ack_reset_database(
            database_id,
            reset_output.and_then(|output| output.root_err),
            reset_request_id,
        );
    }

    /// When some other failure happens (like failing to send a barrier), the error is
    /// reported using this function. A response is sent on the control stream to notify
    /// the meta service of the failure, and the global barrier worker will later reset
    /// and rerun the database.
    fn on_database_failure(
        &mut self,
        database_id: DatabaseId,
        failed_actor: Option<ActorId>,
        err: StreamError,
        message: impl Into<String>,
    ) {
        let message = message.into();
        error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
        let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
        self.state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .suspend(failed_actor, err, completing_futures);
        self.control_stream_handle
            .send_response(Response::ReportDatabaseFailure(
                ReportDatabaseFailureResponse {
                    database_id: database_id.database_id,
                },
            ));
    }

    /// Force stop all actors on this worker, and then drop their resources.
    async fn reset(&mut self, init_request: InitRequest) {
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        *self = Self::new(
            self.actor_manager.clone(),
            init_request.databases,
            init_request.term_id,
        );
        self.actor_manager.env.client_pool().invalidate_all();
    }

    /// Spawn a [`LocalBarrierWorker`] in managed mode.
    pub fn spawn(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_reg: Option<await_tree::Registry>,
        watermark_epoch: AtomicU64Ref,
        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
    ) -> JoinHandle<()> {
        let runtime = {
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
                builder.worker_threads(worker_threads_num);
            }
            builder
                .thread_name("rw-streaming")
                .enable_all()
                .build()
                .unwrap()
        };

        let actor_manager = Arc::new(StreamActorManager {
            env: env.clone(),
            streaming_metrics,
            watermark_epoch,
            await_tree_reg,
            runtime: runtime.into(),
        });
        let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
        tokio::spawn(worker.run(actor_op_rx))
    }
}

pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);

impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> EventSender<T> {
    pub(super) fn send_event(&self, event: T) {
        self.0.send(event).expect("should be able to send event")
    }

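    /// Send an event carrying a oneshot sender and await the response on the paired
    /// receiver. A minimal usage sketch, mirroring how `LocalBarrierTestEnv` below
    /// fetches the barrier manager (`InspectState` is used analogously):
    ///
    /// ```ignore
    /// let debug_info: String = actor_op_tx
    ///     .send_and_await(|tx| LocalActorOperation::InspectState { result_sender: tx })
    ///     .await?;
    /// ```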
    pub(super) async fn send_and_await<RSP>(
        &self,
        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
    ) -> StreamResult<RSP> {
        let (tx, rx) = oneshot::channel();
        let event = make_event(tx);
        self.send_event(event);
        rx.await
            .map_err(|_| anyhow!("barrier manager may have been reset").into())
    }
}

pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}

#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    databases: vec![PbDatabaseInitialPartialGraph {
                        database_id: TEST_DATABASE_ID.database_id,
                        graphs: vec![PbInitialPartialGraph {
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            subscriptions: vec![],
                        }],
                    }],
                    term_id: "for_test".into(),
                },
            });

            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID.database_id,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                            subscriptions_to_add: vec![],
                            subscriptions_to_remove: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}