risingwave_stream/task/barrier_worker/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::{pending, poll_fn};
use std::sync::Arc;
use std::task::Poll;

use anyhow::anyhow;
use await_tree::{InstrumentAwait, SpanExt};
use futures::future::{BoxFuture, join_all};
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource, PbLoadFinishedSource,
    PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tonic::{Code, Status};
use tracing::warn;

use self::managed_state::ManagedBarrierState;
use crate::error::{ScoredStreamError, StreamError, StreamResult};
#[cfg(test)]
use crate::task::LocalBarrierManager;
use crate::task::managed_state::BarrierToComplete;
use crate::task::{
    ActorId, AtomicU64Ref, PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
};

pub mod managed_state;
#[cfg(test)]
mod tests;

use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    InitRequest, Request, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
    StreamingControlStreamResponse, streaming_control_stream_response,
};

use crate::executor::Barrier;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::task::barrier_worker::managed_state::{
    DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
    ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
};

/// If enabled, all actors will be grouped in the same tracing span within one epoch.
/// Note that this option will significantly increase the overhead of tracing.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;

/// Collected results of a barrier on the current compute node.
/// Reported to the meta service in [`LocalBarrierWorker::on_epoch_completed`].
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// The result returned from `sync` of `StateStore`.
    pub sync_result: Option<SyncResult>,

    /// The updated creation progress of materialized views after this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    /// The source IDs that have finished listing data for refreshable batch sources.
    pub list_finished_source_ids: Vec<PbListFinishedSource>,

    /// The source IDs that have finished loading data for refreshable batch sources.
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,

    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,

    /// The table IDs that should be truncated.
    pub truncate_tables: Vec<TableId>,
    /// The table IDs that have finished refresh.
    pub refresh_finished_tables: Vec<TableId>,
}

/// Lives in [`crate::task::barrier_worker::LocalBarrierWorker`] and communicates with the
/// `ControlStreamManager` on the meta service.
/// Handles [`risingwave_pb::stream_service::streaming_control_stream_request::Request`].
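///
/// A minimal construction sketch (mirroring `barrier_test_utils` at the bottom of this file);
/// the channel names here are illustrative:
///
/// ```ignore
/// let (response_tx, response_rx) = tokio::sync::mpsc::unbounded_channel();
/// let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
/// let handle = ControlStreamHandle::new(
///     response_tx,
///     UnboundedReceiverStream::new(request_rx).boxed(),
/// );
/// assert!(handle.connected());
/// ```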
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}

impl ControlStreamHandle {
    fn empty() -> Self {
        Self { pair: None }
    }

    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Note: `TonicStatusWrapper` provides a better error report.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send the `Shutdown` message to the control stream and wait for the stream to be closed
    /// by the meta service.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
                // been acknowledged by the meta service, for a more precise error report.
                //
                // This is because the meta service will reset the control stream manager and
                // drop the connection to us upon recovery. As a result, the receiver part of
                // this sender will also be dropped, causing the stream to close.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("failed to send response, control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset, ignore response");
        }
    }

    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    // Wrap the status to provide a better error report.
                    anyhow!(TonicStatusWrapper::new(e))
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}

/// Sent from [`crate::task::stream_manager::LocalStreamManager`] to
/// [`crate::task::barrier_worker::LocalBarrierWorker::run`].
///
/// See [`crate::task`] for architecture overview.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}

pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
            writeln!(
                f,
                "database {} status: {} managed_barrier_state:\n{}",
                database_id,
                status,
                managed_barrier_state
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default()
            )?;
        }
        Ok(())
    }
}

/// [`LocalBarrierWorker`] manages barrier control flow. Specifically, it serves barrier
/// injection from the meta service, sends the barriers to and collects them from all actors,
/// and finally reports the progress.
///
/// Runs its event loop in [`Self::run`], handling events sent by [`crate::task::LocalStreamManager`].
///
/// See [`crate::task`] for architecture overview.
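///
/// The rough lifecycle of one barrier on this worker: an `InjectBarrier` request arrives on the
/// control stream and is dispatched in [`Self::send_barrier`]; once all actors have collected it,
/// [`Self::complete_barrier`] schedules an optional state store sync; finally, the outcome is
/// reported back to the meta service in [`Self::on_epoch_completed`].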
pub(super) struct LocalBarrierWorker {
    /// Current barrier collection state.
    pub(super) state: ManagedBarrierState,

    /// Per-database queues of completion futures, each finishing in ascending order of epoch.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    pub(super) term_id: String,
}

impl LocalBarrierWorker {
    pub(super) fn new(actor_manager: Arc<StreamActorManager>, term_id: String) -> Self {
        Self {
            state: Default::default(),
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

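    /// Await the next completed epoch across all databases. Each database keeps its own
    /// [`FuturesOrdered`] of completion futures, so completions within one database are
    /// yielded in ascending epoch order, while different databases are simply raced
    /// against each other.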
    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
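                // With `biased`, the arms below are polled in order: pending barrier-state
                // events and epoch completions are drained before new actor ops or
                // control-stream requests are accepted.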
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            // Push a completion future into `await_epoch_completed_futures`;
                            // its result is handled below in `next_completed_epoch`.
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // TODO: report this as a database failure instead of resetting the stream
                            // once the HummockUploader supports partial recovery. Currently the
                            // HummockUploader enters the `Err` state and stops working until a global
                            // recovery clears the uploader.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = req.database_id;
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    req.database_id,
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(req.database_id, PartialGraphId::new(req.partial_graph_id));
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                let err = if self.term_id != term_id {
                    warn!(
                        ?ids,
                        term_id,
                        current_term_id = self.term_id,
                        "take receiver on unmatched term_id"
                    );
                    anyhow!(
                        "take receiver {:?} on unmatched term_id {} to current term_id {}",
                        ids,
                        term_id,
                        self.term_id
                    )
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}

mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_common::id::TableId;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcTableBackfillProgress, PbCreateMviewProgress, PbListFinishedSource,
        PbLoadFinishedSource,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

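    /// Opaque future resolving to `(PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)`.
    /// Declared as a type-alias-impl-trait so that these futures can be stored in a
    /// `FuturesOrdered` without boxing; `#[define_opaque]` below marks the defining use.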
    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    #[define_opaque(AwaitEpochCompletedFuture)]
    #[expect(clippy::too_many_arguments)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                    truncate_tables,
                    refresh_finished_tables,
                }),
            )
        });
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}

use await_epoch_completed_future::*;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
use risingwave_storage::{StateStoreImpl, dispatch_state_store};

use crate::executor::exchange::permit;

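/// Sync the state store for `prev_epoch` of the given tables, returning a boxed future that
/// resolves to the [`SyncResult`] (uncommitted SSTs, table watermarks, etc.). Sync latency is
/// recorded in the `barrier_sync_latency` metric, and errors are logged before being converted
/// into a [`StreamError`].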
fn sync_epoch(
    state_store: &StateStoreImpl,
    streaming_metrics: &StreamingMetrics,
    prev_epoch: u64,
    table_ids: HashSet<TableId>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
    let timer = streaming_metrics.barrier_sync_latency.start_timer();

    let state_store = state_store.clone();
    let future = async move {
        dispatch_state_store!(state_store, hummock, {
            hummock.sync(vec![(prev_epoch, table_ids)]).await
        })
    };

    future
        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
        .inspect_ok(move |_| {
            timer.observe_duration();
        })
        .map_err(move |e| {
            tracing::error!(
                prev_epoch,
                error = %e.as_report(),
                "Failed to sync state store",
            );
            e.into()
        })
        .boxed()
}

impl LocalBarrierWorker {
    fn complete_barrier(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) {
        {
            let Some(database_state) = self
                .state
                .databases
                .get_mut(&database_id)
                .expect("should exist")
                .state_for_request()
            else {
                return;
            };
            let BarrierToComplete {
                barrier,
                table_ids,
                create_mview_progress,
                list_finished_source_ids,
                load_finished_source_ids,
                cdc_table_backfill_progress,
                truncate_tables,
                refresh_finished_tables,
            } = database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);

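            // Only checkpoint barriers sync data to the state store: the initial barrier has
            // nothing to seal, and non-checkpoint barriers are collected without syncing.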
            let complete_barrier_future = match &barrier.kind {
                BarrierKind::Unspecified => unreachable!(),
                BarrierKind::Initial => {
                    tracing::info!(
                        epoch = prev_epoch,
                        "ignore sealing data for the first barrier"
                    );
                    None
                }
                BarrierKind::Barrier => None,
                BarrierKind::Checkpoint => Some(sync_epoch(
                    &self.actor_manager.env.state_store(),
                    &self.actor_manager.streaming_metrics,
                    prev_epoch,
                    table_ids.expect("should be Some on BarrierKind::Checkpoint"),
                )),
            };

            self.await_epoch_completed_futures
                .entry(database_id)
                .or_default()
                .push_back({
                    instrument_complete_barrier_future(
                        partial_graph_id,
                        complete_barrier_future,
                        barrier,
                        self.actor_manager.await_tree_reg.as_ref(),
                        create_mview_progress,
                        list_finished_source_ids,
                        load_finished_source_ids,
                        cdc_table_backfill_progress,
                        truncate_tables,
                        refresh_finished_tables,
                    )
                });
        }
    }

    fn on_epoch_completed(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        epoch: u64,
        result: BarrierCompleteResult,
    ) {
        let BarrierCompleteResult {
            create_mview_progress,
            sync_result,
            list_finished_source_ids,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            truncate_tables,
            refresh_finished_tables,
        } = result;

        let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
            .map(|sync_result| {
                (
                    sync_result.uncommitted_ssts,
                    sync_result.table_watermarks,
                    sync_result.old_value_ssts,
                    sync_result.vector_index_adds,
                )
            })
            .unwrap_or_default();

        let result = streaming_control_stream_response::Response::CompleteBarrier(
            BarrierCompleteResponse {
                request_id: "todo".to_owned(),
                partial_graph_id: partial_graph_id.into(),
                epoch,
                status: None,
                create_mview_progress,
                synced_sstables: synced_sstables
                    .into_iter()
                    .map(
                        |LocalSstableInfo {
                             sst_info,
                             table_stats,
                             created_at,
                         }| PbLocalSstableInfo {
                            sst: Some(sst_info.into()),
                            table_stats_map: to_prost_table_stats_map(table_stats),
                            created_at,
                        },
                    )
                    .collect_vec(),
                worker_id: self.actor_manager.env.worker_id(),
                table_watermarks: table_watermarks
                    .into_iter()
                    .map(|(key, value)| (key, value.into()))
                    .collect(),
                old_value_sstables: old_value_ssts
                    .into_iter()
                    .map(|sst| sst.sst_info.into())
                    .collect(),
                database_id,
                list_finished_sources: list_finished_source_ids,
                load_finished_sources: load_finished_source_ids,
                vector_index_adds: vector_index_adds
                    .into_iter()
                    .map(|(table_id, adds)| {
                        (
                            table_id,
                            PbVectorIndexAdds {
                                adds: adds.into_iter().map(|add| add.into()).collect(),
                            },
                        )
                    })
                    .collect(),
                cdc_table_backfill_progress,
                truncate_tables,
                refresh_finished_tables,
            },
        );

        self.control_stream_handle.send_response(result);
    }

    /// Broadcast a barrier to all senders, and save a receiver which will be notified when
    /// this barrier is finished (in managed mode).
    ///
    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
    /// the root cause of the failure. The caller should then call `try_find_root_failure`
    /// to find the root cause.
    fn send_barrier(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        debug!(
            target: "events::stream::barrier::manager::send",
            "send barrier {:?}, actor_ids_to_collect = {:?}",
            barrier,
            request.actor_ids_to_collect
        );

        let database_status = self
            .state
            .databases
            .get_mut(&request.database_id)
            .expect("should exist");
        if let Some(state) = database_status.state_for_request() {
            state.transform_to_issued(barrier, request)?;
        }
        Ok(())
    }

    fn remove_partial_graphs(
        &mut self,
        database_id: DatabaseId,
        partial_graph_ids: impl Iterator<Item = PartialGraphId>,
    ) {
        let Some(database_status) = self.state.databases.get_mut(&database_id) else {
            warn!(
                %database_id,
                "database to remove partial graph not exist"
            );
            return;
        };
        let Some(database_state) = database_status.state_for_request() else {
            warn!(
                %database_id,
                "ignore remove partial graph request on err database",
            );
            return;
        };
        for partial_graph_id in partial_graph_ids {
            if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
                assert!(
                    graph.is_empty(),
                    "non empty graph to be removed: {}",
                    &graph
                );
            } else {
                warn!(
                    partial_graph_id = partial_graph_id.0,
                    "no partial graph to remove"
                );
            }
        }
    }

    fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
        let status = match self.state.databases.entry(database_id) {
            Entry::Occupied(entry) => {
                let status = entry.into_mut();
                if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
                    let mut database = DatabaseManagedBarrierState::new(
                        database_id,
                        self.term_id.clone(),
                        self.actor_manager.clone(),
                    );
                    for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
                    {
                        database.new_actor_remote_output_request(
                            actor_id,
                            upstream_actor_id,
                            result_sender,
                        );
                    }
                    *status = DatabaseStatus::Running(database);
                }

                status
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
                    database_id,
                    self.term_id.clone(),
                    self.actor_manager.clone(),
                )))
            }
        };
        if let Some(state) = status.state_for_request() {
            assert!(
                state
                    .graph_states
                    .insert(
                        partial_graph_id,
                        PartialGraphManagedBarrierState::new(&self.actor_manager)
                    )
                    .is_none()
            );
        }
    }

    fn reset_database(&mut self, req: ResetDatabaseRequest) {
        let database_id = req.database_id;
        if let Some(database_status) = self.state.databases.get_mut(&database_id) {
            database_status.start_reset(
                database_id,
                self.await_epoch_completed_futures.remove(&database_id),
                req.reset_request_id,
            );
        } else {
            self.ack_database_reset(database_id, None, req.reset_request_id);
        }
    }

    fn ack_database_reset(
        &mut self,
        database_id: DatabaseId,
        reset_output: Option<ResetDatabaseOutput>,
        reset_request_id: u32,
    ) {
        info!(
            %database_id,
            "database reset successfully"
        );
        if let Some(reset_database) = self.state.databases.remove(&database_id) {
            match reset_database {
                DatabaseStatus::Resetting(_) => {}
                _ => {
                    unreachable!("must be resetting previously")
                }
            }
        }
        self.await_epoch_completed_futures.remove(&database_id);
        self.control_stream_handle.ack_reset_database(
            database_id,
            reset_output.and_then(|output| output.root_err),
            reset_request_id,
        );
    }

    /// When a failure happens (e.g., an actor failure, or failing to inject a barrier), the error
    /// is reported through this function. A message is sent on the control stream to notify the
    /// meta service about the error, and the global barrier worker will later reset and rerun the
    /// database.
    fn on_database_failure(
        &mut self,
        database_id: DatabaseId,
        failed_actor: Option<ActorId>,
        err: StreamError,
        message: impl Into<String>,
    ) {
        let message = message.into();
        error!(%database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
        let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
        self.state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .suspend(failed_actor, err, completing_futures);
        self.control_stream_handle
            .send_response(Response::ReportDatabaseFailure(
                ReportDatabaseFailureResponse { database_id },
            ));
    }

    /// Force stop all actors on this worker, and then drop their resources.
    async fn reset(&mut self, init_request: InitRequest) {
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        *self = Self::new(self.actor_manager.clone(), init_request.term_id);
        self.actor_manager.env.client_pool().invalidate_all();
    }

    /// Create a [`LocalBarrierWorker`] with managed mode.
    pub fn spawn(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_reg: Option<await_tree::Registry>,
        watermark_epoch: AtomicU64Ref,
        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
    ) -> JoinHandle<()> {
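        // Actors run on a dedicated multi-thread runtime (named "rw-streaming") so that
        // streaming workloads are isolated from the main runtime; the worker thread count
        // can be overridden via `actor_runtime_worker_threads_num`.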
        let runtime = {
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            if let Some(worker_threads_num) = env.global_config().actor_runtime_worker_threads_num {
                builder.worker_threads(worker_threads_num);
            }
            builder
                .thread_name("rw-streaming")
                .enable_all()
                .build()
                .unwrap()
        };

        let actor_manager = Arc::new(StreamActorManager {
            env,
            streaming_metrics,
            watermark_epoch,
            await_tree_reg,
            runtime: runtime.into(),
        });
        let worker = LocalBarrierWorker::new(actor_manager, "uninitialized".into());
        tokio::spawn(worker.run(actor_op_rx))
    }
}

pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);

impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> EventSender<T> {
    pub(super) fn send_event(&self, event: T) {
        self.0.send(event).expect("should be able to send event")
    }

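    /// Send an event carrying a `oneshot::Sender` and await the response. A minimal usage
    /// sketch (hypothetical caller; see `LocalBarrierTestEnv` below for a real one):
    ///
    /// ```ignore
    /// let state = event_sender
    ///     .send_and_await(|tx| LocalActorOperation::InspectState { result_sender: tx })
    ///     .await?;
    /// ```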
    pub(super) async fn send_and_await<RSP>(
        &self,
        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
    ) -> StreamResult<RSP> {
        let (tx, rx) = oneshot::channel();
        let event = make_event(tx);
        self.send_event(event);
        rx.await
            .map_err(|_| anyhow!("barrier manager maybe reset").into())
    }
}

pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}

#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbCreatePartialGraphRequest,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, PbStreamingControlStreamRequest, StreamingControlStreamRequest,
        StreamingControlStreamResponse, streaming_control_stream_request,
        streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            request_tx
                .send(Ok(PbStreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            PbCreatePartialGraphRequest {
                                partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                                database_id: TEST_DATABASE_ID,
                            },
                        ),
                    ),
                }))
                .unwrap();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    term_id: "for_test".into(),
                },
            });

            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}