risingwave_stream/task/barrier_worker/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::{pending, poll_fn};
use std::sync::Arc;
use std::task::Poll;

use anyhow::anyhow;
use await_tree::{InstrumentAwait, SpanExt};
use futures::future::{BoxFuture, join_all};
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcTableBackfillProgress, PbCreateMviewProgress, PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tonic::{Code, Status};
use tracing::{debug, error, info, warn};

use self::managed_state::ManagedBarrierState;
use crate::error::{ScoredStreamError, StreamError, StreamResult};
#[cfg(test)]
use crate::task::LocalBarrierManager;
use crate::task::managed_state::BarrierToComplete;
use crate::task::{
    ActorId, AtomicU64Ref, PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
};

pub mod managed_state;
#[cfg(test)]
mod tests;

use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
    StreamingControlStreamResponse, streaming_control_stream_response,
};

use crate::executor::Barrier;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::task::barrier_worker::managed_state::{
    DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
    ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
};

/// If enabled, all actors will be grouped in the same tracing span within one epoch.
/// Note that this option will significantly increase the overhead of tracing.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;

/// Collected results of a barrier on the current compute node. Reported to the meta
/// service in [`LocalBarrierWorker::on_epoch_completed`].
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// The result returned from `sync` of `StateStore`.
    pub sync_result: Option<SyncResult>,

    /// The updated creation progress of materialized views after this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    /// The source IDs that have finished loading data for refreshable batch sources.
    pub load_finished_source_ids: Vec<u32>,

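    /// The CDC table backfill progress after this barrier.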
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,

    /// The table IDs that should be truncated.
    pub truncate_tables: Vec<u32>,
    /// The table IDs that have finished refresh.
    pub refresh_finished_tables: Vec<u32>,
}

/// Lives in [`crate::task::barrier_worker::LocalBarrierWorker`] and communicates with
/// the `ControlStreamManager` in meta.
/// Handles [`risingwave_pb::stream_service::streaming_control_stream_request::Request`].
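///
/// A minimal wiring sketch, mirroring the test setup in `barrier_test_utils` at the
/// bottom of this module (the channel names are illustrative):
///
/// ```ignore
/// use futures::StreamExt;
/// use tokio_stream::wrappers::UnboundedReceiverStream;
///
/// let (response_tx, _response_rx) = tokio::sync::mpsc::unbounded_channel();
/// let (_request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
/// let handle = ControlStreamHandle::new(
///     response_tx,
///     UnboundedReceiverStream::new(request_rx).boxed(),
/// );
/// assert!(handle.connected());
/// ```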
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}

impl ControlStreamHandle {
    fn empty() -> Self {
        Self { pair: None }
    }

    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Note: `TonicStatusWrapper` provides a better error report.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send the `Shutdown` message to the control stream and wait for the stream to be
    /// closed by the meta service.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
                // been acknowledged by the meta service, for a more precise error report.
                //
                // This is because the meta service will reset the control stream manager and
                // drop the connection to us upon recovery. As a result, the receiver part of
                // this sender will also be dropped, causing the stream to close.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id: database_id.database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("failed to send response; control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset, ignore response");
        }
    }

    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    // Wrap the status to provide a better error report.
                    anyhow!(TonicStatusWrapper::new(e))
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
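        // No active control stream: `pending()` never resolves, so this arm simply
        // parks until the worker installs a new control stream.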
        pending().await
    }
}

/// Sent from [`crate::task::stream_manager::LocalStreamManager`] to [`crate::task::barrier_worker::LocalBarrierWorker::run`].
///
/// See [`crate::task`] for an architecture overview.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}

pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
            writeln!(
                f,
                "database {} status: {} managed_barrier_state:\n{}",
                database_id.database_id,
                status,
                managed_barrier_state
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default()
            )?;
        }
        Ok(())
    }
}

/// [`LocalBarrierWorker`] manages barrier control flow. Specifically, it serves barrier
/// injection from the meta server, sends the barriers to and collects them from all
/// actors, and finally reports the progress.
///
/// Runs the event loop in [`Self::run`]. Handles events sent by [`crate::task::LocalStreamManager`].
///
/// See [`crate::task`] for an architecture overview.
pub(super) struct LocalBarrierWorker {
    /// Current barrier collection state.
    pub(super) state: ManagedBarrierState,

    /// Per-database queues of barrier completion futures. Within each database, the
    /// futures finish in ascending epoch order.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    pub(super) term_id: String,
}

impl LocalBarrierWorker {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let state = ManagedBarrierState::new(
            actor_manager.clone(),
            initial_partial_graphs,
            term_id.clone(),
        );
        Self {
            state,
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
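        // Poll every database's queue. `FuturesOrdered` yields results in push order,
        // i.e. ascending epoch order per database, while the databases themselves are
        // polled in arbitrary `HashMap` iteration order.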
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

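    /// The main event loop. The `biased` select prioritizes collected barriers and actor
    /// errors first, then completed epochs, then actor ops, and finally new control
    /// stream requests.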
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            // Push a completion future to `await_epoch_completed_futures`;
                            // its result is handled below in `next_completed_epoch`.
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // TODO: report this as a database failure instead of resetting the stream
                            // once the HummockUploader supports partial recovery. Currently the HummockUploader
                            // enters an `Err` state and stops working until a global recovery clears the uploader.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = DatabaseId::new(req.database_id);
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    DatabaseId::new(req.database_id),
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(
                    DatabaseId::new(req.database_id),
                    PartialGraphId::new(req.partial_graph_id),
                );
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                let err = if self.term_id != term_id {
                    warn!(
                        ?ids,
                        term_id,
                        current_term_id = self.term_id,
                        "take receiver on unmatched term_id"
                    );
                    anyhow!(
                        "take receiver {:?} on term_id {} unmatched to current term_id {}",
                        ids,
                        term_id,
                        self.term_id
                    )
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}

mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcTableBackfillProgress, PbCreateMviewProgress,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    #[define_opaque(AwaitEpochCompletedFuture)]
    #[expect(clippy::too_many_arguments)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        load_finished_source_ids: Vec<u32>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<u32>,
        refresh_finished_tables: Vec<u32>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                    truncate_tables,
                    refresh_finished_tables,
                }),
            )
        });
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}

use await_epoch_completed_future::*;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
use risingwave_storage::{StateStoreImpl, dispatch_state_store};

use crate::executor::exchange::permit;

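/// Sync `prev_epoch` for the given `table_ids` on the state store, returning a boxed
/// future that resolves to the [`SyncResult`] (uncommitted SSTs, table watermarks,
/// old-value SSTs, vector index adds) to be reported to the meta service.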
fn sync_epoch(
    state_store: &StateStoreImpl,
    streaming_metrics: &StreamingMetrics,
    prev_epoch: u64,
    table_ids: HashSet<TableId>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
    let timer = streaming_metrics.barrier_sync_latency.start_timer();

    let state_store = state_store.clone();
    let future = async move {
        dispatch_state_store!(state_store, hummock, {
            hummock.sync(vec![(prev_epoch, table_ids)]).await
        })
    };

    future
        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
        .inspect_ok(move |_| {
            timer.observe_duration();
        })
        .map_err(move |e| {
            tracing::error!(
                prev_epoch,
                error = %e.as_report(),
                "Failed to sync state store",
            );
            e.into()
        })
        .boxed()
}

impl LocalBarrierWorker {
    fn complete_barrier(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) {
        {
            let Some(database_state) = self
                .state
                .databases
                .get_mut(&database_id)
                .expect("should exist")
                .state_for_request()
            else {
                return;
            };
            let BarrierToComplete {
                barrier,
                table_ids,
                create_mview_progress,
                load_finished_source_ids,
                cdc_table_backfill_progress,
                truncate_tables,
                refresh_finished_tables,
            } = database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);

            let complete_barrier_future = match &barrier.kind {
                BarrierKind::Unspecified => unreachable!(),
                BarrierKind::Initial => {
                    tracing::info!(
                        epoch = prev_epoch,
                        "ignore syncing data for the first barrier"
                    );
                    None
                }
                BarrierKind::Barrier => None,
                BarrierKind::Checkpoint => Some(sync_epoch(
                    &self.actor_manager.env.state_store(),
                    &self.actor_manager.streaming_metrics,
                    prev_epoch,
                    table_ids.expect("should be Some on BarrierKind::Checkpoint"),
                )),
            };

            self.await_epoch_completed_futures
                .entry(database_id)
                .or_default()
                .push_back({
                    instrument_complete_barrier_future(
                        partial_graph_id,
                        complete_barrier_future,
                        barrier,
                        self.actor_manager.await_tree_reg.as_ref(),
                        create_mview_progress,
                        load_finished_source_ids,
                        cdc_table_backfill_progress,
                        truncate_tables,
                        refresh_finished_tables,
                    )
                });
        }
    }

    fn on_epoch_completed(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        epoch: u64,
        result: BarrierCompleteResult,
    ) {
        let BarrierCompleteResult {
            create_mview_progress,
            sync_result,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            truncate_tables,
            refresh_finished_tables,
        } = result;

        let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
            .map(|sync_result| {
                (
                    sync_result.uncommitted_ssts,
                    sync_result.table_watermarks,
                    sync_result.old_value_ssts,
                    sync_result.vector_index_adds,
                )
            })
            .unwrap_or_default();

        let result = streaming_control_stream_response::Response::CompleteBarrier(
            BarrierCompleteResponse {
                request_id: "todo".to_owned(),
                partial_graph_id: partial_graph_id.into(),
                epoch,
                status: None,
                create_mview_progress,
                synced_sstables: synced_sstables
                    .into_iter()
                    .map(
                        |LocalSstableInfo {
                             sst_info,
                             table_stats,
                             created_at,
                         }| PbLocalSstableInfo {
                            sst: Some(sst_info.into()),
                            table_stats_map: to_prost_table_stats_map(table_stats),
                            created_at,
                        },
                    )
                    .collect_vec(),
                worker_id: self.actor_manager.env.worker_id(),
                table_watermarks: table_watermarks
                    .into_iter()
                    .map(|(key, value)| (key.table_id, value.into()))
                    .collect(),
                old_value_sstables: old_value_ssts
                    .into_iter()
                    .map(|sst| sst.sst_info.into())
                    .collect(),
                database_id: database_id.database_id,
                load_finished_source_ids,
                vector_index_adds: vector_index_adds
                    .into_iter()
                    .map(|(table_id, adds)| {
                        (
                            table_id.table_id,
                            PbVectorIndexAdds {
                                adds: adds.into_iter().map(|add| add.into()).collect(),
                            },
                        )
                    })
                    .collect(),
                cdc_table_backfill_progress,
                truncate_tables,
                refresh_finished_tables,
            },
        );

        self.control_stream_handle.send_response(result);
    }

    /// Broadcast a barrier to all senders, and save a receiver that will be notified
    /// when this barrier is finished (in managed mode).
    ///
    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
    /// the root cause of the failure. The caller should then call `try_find_root_failure`
    /// to find the root cause.
    fn send_barrier(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        debug!(
            target: "events::stream::barrier::manager::send",
            "send barrier {:?}, actor_ids_to_collect = {:?}",
            barrier,
            request.actor_ids_to_collect
        );

        let database_status = self
            .state
            .databases
            .get_mut(&DatabaseId::new(request.database_id))
            .expect("should exist");
        if let Some(state) = database_status.state_for_request() {
            state.transform_to_issued(barrier, request)?;
        }
        Ok(())
    }

    fn remove_partial_graphs(
        &mut self,
        database_id: DatabaseId,
        partial_graph_ids: impl Iterator<Item = PartialGraphId>,
    ) {
        let Some(database_status) = self.state.databases.get_mut(&database_id) else {
            warn!(
                database_id = database_id.database_id,
                "database to remove partial graphs from does not exist"
            );
            return;
        };
        let Some(database_state) = database_status.state_for_request() else {
            warn!(
                database_id = database_id.database_id,
                "ignore remove partial graph request on database in error state",
            );
            return;
        };
        for partial_graph_id in partial_graph_ids {
            if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
                assert!(
                    graph.is_empty(),
                    "non-empty graph to be removed: {}",
                    &graph
                );
            } else {
                warn!(
                    partial_graph_id = partial_graph_id.0,
                    "no partial graph to remove"
                );
            }
        }
    }

    fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
        let status = match self.state.databases.entry(database_id) {
            Entry::Occupied(entry) => {
                let status = entry.into_mut();
                if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
                    let mut database = DatabaseManagedBarrierState::new(
                        database_id,
                        self.term_id.clone(),
                        self.actor_manager.clone(),
                        vec![],
                    );
                    for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
                    {
                        database.new_actor_remote_output_request(
                            actor_id,
                            upstream_actor_id,
                            result_sender,
                        );
                    }
                    *status = DatabaseStatus::Running(database);
                }

                status
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
                    database_id,
                    self.term_id.clone(),
                    self.actor_manager.clone(),
                    vec![],
                )))
            }
        };
        if let Some(state) = status.state_for_request() {
            assert!(
                state
                    .graph_states
                    .insert(
                        partial_graph_id,
                        PartialGraphManagedBarrierState::new(&self.actor_manager)
                    )
                    .is_none()
            );
        }
    }

    fn reset_database(&mut self, req: ResetDatabaseRequest) {
        let database_id = DatabaseId::new(req.database_id);
        if let Some(database_status) = self.state.databases.get_mut(&database_id) {
            database_status.start_reset(
                database_id,
                self.await_epoch_completed_futures.remove(&database_id),
                req.reset_request_id,
            );
        } else {
            self.ack_database_reset(database_id, None, req.reset_request_id);
        }
    }

    fn ack_database_reset(
        &mut self,
        database_id: DatabaseId,
        reset_output: Option<ResetDatabaseOutput>,
        reset_request_id: u32,
    ) {
        info!(
            database_id = database_id.database_id,
            "database reset successfully"
        );
        if let Some(reset_database) = self.state.databases.remove(&database_id) {
            match reset_database {
                DatabaseStatus::Resetting(_) => {}
                _ => {
                    unreachable!("must have been resetting previously")
                }
            }
        }
        self.await_epoch_completed_futures.remove(&database_id);
        self.control_stream_handle.ack_reset_database(
            database_id,
            reset_output.and_then(|output| output.root_err),
            reset_request_id,
        );
    }

    /// When some other failure happens (like failing to send a barrier), the error is reported via
    /// this function. A message is sent on the control stream to notify the meta service about the
    /// error, and the global barrier worker will later reset and rerun the database.
    fn on_database_failure(
        &mut self,
        database_id: DatabaseId,
        failed_actor: Option<ActorId>,
        err: StreamError,
        message: impl Into<String>,
    ) {
        let message = message.into();
        error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
        let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
        self.state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .suspend(failed_actor, err, completing_futures);
        self.control_stream_handle
            .send_response(Response::ReportDatabaseFailure(
                ReportDatabaseFailureResponse {
                    database_id: database_id.database_id,
                },
            ));
    }

    /// Force stop all actors on this worker, and then drop their resources.
    async fn reset(&mut self, init_request: InitRequest) {
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        *self = Self::new(
            self.actor_manager.clone(),
            init_request.databases,
            init_request.term_id,
        );
        self.actor_manager.env.client_pool().invalidate_all();
    }

    /// Spawn a [`LocalBarrierWorker`] in managed mode. Actors are spawned on a dedicated
    /// `rw-streaming` runtime created here.
    pub fn spawn(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_reg: Option<await_tree::Registry>,
        watermark_epoch: AtomicU64Ref,
        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
    ) -> JoinHandle<()> {
        let runtime = {
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
                builder.worker_threads(worker_threads_num);
            }
            builder
                .thread_name("rw-streaming")
                .enable_all()
                .build()
                .unwrap()
        };

        let actor_manager = Arc::new(StreamActorManager {
            env: env.clone(),
            streaming_metrics,
            watermark_epoch,
            await_tree_reg,
            runtime: runtime.into(),
        });
        let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
        tokio::spawn(worker.run(actor_op_rx))
    }
}

pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);

impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> EventSender<T> {
    pub(super) fn send_event(&self, event: T) {
        self.0.send(event).expect("should be able to send event")
    }

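    /// Send an event carrying a `oneshot::Sender` and await the response on the
    /// corresponding receiver.
    ///
    /// A minimal usage sketch, using the `InspectState` operation defined above:
    ///
    /// ```ignore
    /// let debug_info: String = actor_op_tx
    ///     .send_and_await(|tx| LocalActorOperation::InspectState { result_sender: tx })
    ///     .await?;
    /// ```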
    pub(super) async fn send_and_await<RSP>(
        &self,
        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
    ) -> StreamResult<RSP> {
        let (tx, rx) = oneshot::channel();
        let event = make_event(tx);
        self.send_event(event);
        rx.await
            .map_err(|_| anyhow!("barrier manager maybe reset").into())
    }
}

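/// A request for a new output channel from an upstream actor, distinguishing whether the
/// requesting downstream actor is on the same worker (`Local`) or a remote one (`Remote`).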
pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}

#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    databases: vec![PbDatabaseInitialPartialGraph {
                        database_id: TEST_DATABASE_ID.database_id,
                        graphs: vec![PbInitialPartialGraph {
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            subscriptions: vec![],
                        }],
                    }],
                    term_id: "for_test".into(),
                },
            });

            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID.database_id,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                            subscriptions_to_add: vec![],
                            subscriptions_to_remove: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}