risingwave_stream/task/barrier_worker/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::{pending, poll_fn};
use std::sync::Arc;
use std::task::Poll;

use anyhow::anyhow;
use await_tree::{InstrumentAwait, SpanExt};
use futures::future::{BoxFuture, join, join_all};
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcSourceOffsetUpdated, PbCdcTableBackfillProgress, PbCreateMviewProgress,
    PbListFinishedSource, PbLoadFinishedSource, PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::{select, spawn};
use tonic::{Code, Status};
use tracing::{debug, error, info, warn};

use self::managed_state::ManagedBarrierState;
use crate::error::{ScoredStreamError, StreamError, StreamResult};
#[cfg(test)]
use crate::task::LocalBarrierManager;
use crate::task::managed_state::{BarrierToComplete, ResetPartialGraphOutput};
use crate::task::{
    ActorId, AtomicU64Ref, CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY, ConfigOverrideCache,
    PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
};
pub mod managed_state;
#[cfg(test)]
mod tests;

use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    InitRequest, Request, ResetPartialGraphsRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, ReportPartialGraphFailureResponse, ResetPartialGraphResponse, Response,
    ShutdownResponse,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
    StreamingControlStreamResponse, streaming_control_stream_response,
};

use crate::executor::Barrier;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::task::barrier_worker::managed_state::{
    ManagedBarrierStateDebugInfo, ManagedBarrierStateEvent, PartialGraphState, PartialGraphStatus,
};

/// If enabled, all actors will be grouped in the same tracing span within one epoch.
/// Note that this option will significantly increase the overhead of tracing.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;

/// The collected result of a barrier on the current compute node. Reported to the meta service in [`LocalBarrierWorker::on_epoch_completed`].
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// The result returned from `sync` of `StateStore`.
    pub sync_result: Option<SyncResult>,

    /// The updated creation progress of materialized view after this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    /// The source IDs that have finished listing data for refreshable batch sources.
    pub list_finished_source_ids: Vec<PbListFinishedSource>,

    /// The source IDs that have finished loading data for refreshable batch sources.
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,

    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,

    /// CDC sources that have updated their offset at least once.
    pub cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,

    /// The table IDs that should be truncated.
    pub truncate_tables: Vec<TableId>,
    /// The table IDs that have finished refresh.
    pub refresh_finished_tables: Vec<TableId>,
}

/// Lives in [`crate::task::barrier_worker::LocalBarrierWorker`] and communicates
/// with the `ControlStreamManager` in the meta service.
/// Handles [`risingwave_pb::stream_service::streaming_control_stream_request::Request`].
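///
/// A rough sketch of the message flow (the meta-side `ControlStreamManager` is not
/// defined in this crate):
///
/// ```text
/// meta ControlStreamManager --(StreamingControlStreamRequest)--> next_request()
/// send_response() --(StreamingControlStreamResponse)--> meta ControlStreamManager
/// ```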
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}

impl ControlStreamHandle {
    fn empty() -> Self {
        Self { pair: None }
    }

    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Note: `TonicStatusWrapper` provides a better error report.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send a `Shutdown` message to the control stream and wait for the stream to be
    /// closed by the meta service.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Wait for the stream to be closed, to ensure that the `Shutdown` message
                // has been acknowledged by the meta service for a more precise error report.
                //
                // This is because the meta service will reset the control stream manager and
                // drop the connection to us upon recovery. As a result, the receiver part of
                // this sender will also be dropped, causing the stream to close.
                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    pub(super) fn ack_reset_partial_graph(
        &mut self,
        partial_graph_id: PartialGraphId,
        root_err: Option<ScoredStreamError>,
    ) {
        self.send_response(Response::ResetPartialGraph(ResetPartialGraphResponse {
            partial_graph_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
        }));
    }

    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
201                warn!("fail to send response. control stream reset");
202            }
203        } else {
204            debug!(?response, "control stream has been reset. ignore response");
        }
    }

    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e)) // wrap the status to provide a better error report
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
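        // The control stream is disconnected: park forever. `next_request` is awaited
        // inside the worker's `select!` loop, so a reset stream simply stops yielding
        // requests instead of busy-looping or returning an error.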
        pending().await
    }
}

pub(super) enum TakeReceiverRequest {
    Remote(oneshot::Sender<StreamResult<Receiver>>),
    Local(permit::Sender),
}

/// Sent from [`crate::task::stream_manager::LocalStreamManager`] to [`crate::task::barrier_worker::LocalBarrierWorker::run`].
///
/// See [`crate::task`] for architecture overview.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    TakeReceiver {
        partial_graph_id: PartialGraphId,
        term_id: String,
        ids: UpDownActorIds,
        request: TakeReceiverRequest,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}

pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state:
        HashMap<PartialGraphId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        for (partial_graph_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
            writeln!(
                f,
                "partial graph {} status: {} managed_barrier_state:\n{}",
                partial_graph_id,
                status,
                managed_barrier_state
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default()
            )?;
        }
        Ok(())
    }
}

/// [`LocalBarrierWorker`] manages barrier control flow.
/// Specifically, [`LocalBarrierWorker`] serves barrier injection from the meta server, sends the
/// barriers to and collects them from all actors, and finally reports the progress.
///
/// Runs the event loop in [`Self::run`]. Handles events sent by [`crate::task::LocalStreamManager`].
///
/// See [`crate::task`] for architecture overview.
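///
/// An abridged view of the `select!` arms in [`Self::run`], in `biased` priority
/// order (see the method body for the authoritative version):
///
/// ```text
/// 1. state.next_event()              => barrier collected / actor error / graphs reset
/// 2. next_completed_epoch(..)        => report CompleteBarrier, or reset stream on error
/// 3. actor_op_rx.recv()              => new control stream / shutdown / exchange requests
/// 4. control_stream_handle.next_request() => inject barrier, create/remove/reset graphs
/// ```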
pub(super) struct LocalBarrierWorker {
    /// Current barrier collection state.
    pub(super) state: ManagedBarrierState,

    /// Per-graph queues of completion futures. Within each queue, futures complete in
    /// ascending epoch order.
    await_epoch_completed_futures:
        HashMap<PartialGraphId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    pub(super) term_id: String,
}

impl LocalBarrierWorker {
    pub(super) fn new(actor_manager: Arc<StreamActorManager>, term_id: String) -> Self {
        Self {
            state: Default::default(),
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .partial_graphs
                .iter()
                .map(|(partial_graph_id, status)| {
                    (*partial_graph_id, {
                        match status {
                            PartialGraphStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            PartialGraphStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            PartialGraphStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            PartialGraphStatus::Resetting => ("resetting".to_owned(), None),
                            PartialGraphStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

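    /// Poll the completion futures of all partial graphs and return the first finished
    /// `(partial_graph_id, barrier, result)`. Within one graph, the `FuturesOrdered`
    /// yields results in epoch order; across graphs, completion order is unspecified.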
    async fn next_completed_epoch(
        futures: &mut HashMap<PartialGraphId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>) {
        poll_fn(|cx| {
            for (partial_graph_id, futures) in &mut *futures {
                if let Poll::Ready(Some((barrier, result))) = futures.poll_next_unpin(cx) {
                    return Poll::Ready((*partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                event = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            // Push a completion future into `await_epoch_completed_futures`;
                            // its result is handled below via `next_completed_epoch`.
                            self.complete_barrier(partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            partial_graph_id,
                            actor_id,
                            err,
                        } => {
                            self.on_partial_graph_failure(partial_graph_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::PartialGraphsReset(output) => {
                            for (partial_graph_id, output) in output {
                                self.ack_partial_graph_reset(partial_graph_id, Some(output));
                            }
                        }
                        ManagedBarrierStateEvent::RegisterLocalUpstreamOutput{
                            actor_id,
                            upstream_actor_id,
                            upstream_partial_graph_id,
                            tx
                        } => {
                            self.handle_actor_op(LocalActorOperation::TakeReceiver {
                                partial_graph_id: upstream_partial_graph_id,
                                term_id: self.term_id.clone(),
                                ids: (upstream_actor_id, actor_id),
                                request: TakeReceiverRequest::Local(tx),
                            });
                        }
                    }
                }
                (partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // TODO: we may report this merely as a partial graph failure instead of
                            // resetting the stream once the HummockUploader supports partial recovery.
                            // Currently the HummockUploader enters the `Err` state and stops working
                            // until a global recovery clears the uploader.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {:?} {:?}", partial_graph_id, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.partial_graphs.values().any(|graph| {
                                    match graph {
                                        PartialGraphStatus::Running(graph) => {
                                            !graph.actor_states.is_empty()
                                        }
                                        PartialGraphStatus::Suspended(_) | PartialGraphStatus::Resetting |
                                            PartialGraphStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        PartialGraphStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    } else {
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((partial_graph_id, err)) = result {
                        self.on_partial_graph_failure(partial_graph_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (PartialGraphId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let partial_graph_id = req.partial_graph_id;
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (partial_graph_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(req.partial_graph_ids);
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(req.partial_graph_id);
                Ok(())
            }
            Request::ResetPartialGraphs(req) => {
                self.reset_partial_graphs(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                partial_graph_id,
                term_id,
                ids,
                request,
            } => {
                let err = if self.term_id != term_id {
                    warn!(
                        ?ids,
                        term_id,
                        current_term_id = self.term_id,
                        "take receiver on unmatched term_id"
                    );
                    anyhow!(
                        "take receiver {:?} on unmatched term_id {} to current term_id {}",
                        ids,
                        term_id,
                        self.term_id
                    )
                } else {
                    match self.state.partial_graphs.entry(partial_graph_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, request));
                                return;
                            }
                            PartialGraphStatus::Running(graph) => {
                                let (upstream_actor_id, actor_id) = ids;
                                graph.new_actor_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    request,
                                );
                                return;
                            }
                            PartialGraphStatus::Suspended(_) => {
                                anyhow!("partial graph suspended")
                            }
                            PartialGraphStatus::Resetting => {
                                anyhow!("partial graph resetting")
                            }
                            PartialGraphStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(PartialGraphStatus::ReceivedExchangeRequest(vec![(
                                ids, request,
                            )]));
                            return;
                        }
                    }
                };
                if let TakeReceiverRequest::Remote(result_sender) = request {
                    let _ = result_sender.send(Err(err.into()));
                }
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let partial_graph_status = self
                    .state
                    .partial_graphs
                    .get(&crate::task::TEST_PARTIAL_GRAPH_ID)
                    .unwrap();
                let partial_graph_state = risingwave_common::must_match!(partial_graph_status, PartialGraphStatus::Running(database_state) => database_state);
                let _ = sender.send(partial_graph_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let partial_graph_status = self
                    .state
                    .partial_graphs
                    .get_mut(&crate::task::TEST_PARTIAL_GRAPH_ID)
                    .unwrap();

                let partial_graph_state = risingwave_common::must_match!(partial_graph_status, PartialGraphStatus::Running(database_state) => database_state);
                assert!(!partial_graph_state.actor_states.contains_key(&actor_id));
                let requests = partial_graph_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some(event) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            barrier,
                            partial_graph_id,
                        } => {
                            self.complete_barrier(partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::PartialGraphsReset { .. }
                        | ManagedBarrierStateEvent::RegisterLocalUpstreamOutput { .. } => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}

mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_common::id::TableId;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcSourceOffsetUpdated, PbCdcTableBackfillProgress, PbCreateMviewProgress,
        PbListFinishedSource, PbLoadFinishedSource,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, await_tree_key};

    pub(super) type AwaitEpochCompletedFuture =
        impl Future<Output = (Barrier, StreamResult<BarrierCompleteResult>)> + 'static;

    #[define_opaque(AwaitEpochCompletedFuture)]
    #[expect(clippy::too_many_arguments)]
    pub(super) fn instrument_complete_barrier_future(
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                    cdc_source_offset_updated,
                    truncate_tables,
                    refresh_finished_tables,
                }),
            )
        });
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}

use await_epoch_completed_future::*;
use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
use risingwave_storage::{StateStoreImpl, dispatch_state_store};

use crate::executor::exchange::permit;

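/// Sync the state store for `prev_epoch` and the given tables, returning a boxed
/// future so the potentially slow sync can be driven through
/// `await_epoch_completed_futures` instead of blocking the worker's event loop.
/// Observes `barrier_sync_latency` on success and logs the error on failure.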
fn sync_epoch(
    state_store: &StateStoreImpl,
    streaming_metrics: &StreamingMetrics,
    prev_epoch: u64,
    table_ids: HashSet<TableId>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
    let timer = streaming_metrics.barrier_sync_latency.start_timer();

    let state_store = state_store.clone();
    let future = async move {
        dispatch_state_store!(state_store, hummock, {
            hummock.sync(vec![(prev_epoch, table_ids)]).await
        })
    };

    future
        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
        .inspect_ok(move |_| {
            timer.observe_duration();
        })
        .map_err(move |e| {
            tracing::error!(
                prev_epoch,
                error = %e.as_report(),
                "Failed to sync state store",
            );
            e.into()
        })
        .boxed()
}

impl LocalBarrierWorker {
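    /// Called once a barrier has been collected from all actors of the partial graph:
    /// pops the pending data for `prev_epoch` and enqueues a completion future into
    /// `await_epoch_completed_futures`. Only checkpoint barriers trigger a state store
    /// sync; other barrier kinds complete immediately.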
    fn complete_barrier(&mut self, partial_graph_id: PartialGraphId, prev_epoch: u64) {
        {
            let Some(graph_state) = self
                .state
                .partial_graphs
                .get_mut(&partial_graph_id)
                .expect("should exist")
                .state_for_request()
            else {
                return;
            };
            let BarrierToComplete {
                barrier,
                table_ids,
                create_mview_progress,
                list_finished_source_ids,
                load_finished_source_ids,
                cdc_table_backfill_progress,
                cdc_source_offset_updated,
                truncate_tables,
                refresh_finished_tables,
            } = graph_state.pop_barrier_to_complete(prev_epoch);

            let complete_barrier_future = match &barrier.kind {
                BarrierKind::Unspecified => unreachable!(),
                BarrierKind::Initial => {
                    tracing::info!(
                        epoch = prev_epoch,
                        "ignore sealing data for the first barrier"
                    );
                    None
                }
                BarrierKind::Barrier => None,
                BarrierKind::Checkpoint => Some(sync_epoch(
                    &self.actor_manager.env.state_store(),
                    &self.actor_manager.streaming_metrics,
                    prev_epoch,
                    table_ids.expect("should be Some on BarrierKind::Checkpoint"),
                )),
            };

            self.await_epoch_completed_futures
                .entry(partial_graph_id)
                .or_default()
                .push_back({
                    instrument_complete_barrier_future(
                        complete_barrier_future,
                        barrier,
                        self.actor_manager.await_tree_reg.as_ref(),
                        create_mview_progress,
                        list_finished_source_ids,
                        load_finished_source_ids,
                        cdc_table_backfill_progress,
                        cdc_source_offset_updated,
                        truncate_tables,
                        refresh_finished_tables,
                    )
                });
        }
    }

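    /// Report a completed epoch to the meta service: unpack the sync result and send
    /// a `CompleteBarrier` response over the control stream.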
    fn on_epoch_completed(
        &mut self,
        partial_graph_id: PartialGraphId,
        epoch: u64,
        result: BarrierCompleteResult,
    ) {
        let BarrierCompleteResult {
            create_mview_progress,
            sync_result,
            list_finished_source_ids,
            load_finished_source_ids,
            cdc_table_backfill_progress,
            cdc_source_offset_updated,
            truncate_tables,
            refresh_finished_tables,
        } = result;

        let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
            .map(|sync_result| {
                (
                    sync_result.uncommitted_ssts,
                    sync_result.table_watermarks,
                    sync_result.old_value_ssts,
                    sync_result.vector_index_adds,
                )
            })
            .unwrap_or_default();

        let result = streaming_control_stream_response::Response::CompleteBarrier(
            BarrierCompleteResponse {
                request_id: "todo".to_owned(),
                partial_graph_id,
                epoch,
                status: None,
                create_mview_progress,
                synced_sstables: synced_sstables
                    .into_iter()
                    .map(
                        |LocalSstableInfo {
                             sst_info,
                             table_stats,
                             created_at,
                         }| PbLocalSstableInfo {
                            sst: Some(sst_info.into()),
                            table_stats_map: to_prost_table_stats_map(table_stats),
                            created_at,
                        },
                    )
                    .collect_vec(),
                worker_id: self.actor_manager.env.worker_id(),
                table_watermarks: table_watermarks
                    .into_iter()
                    .map(|(key, value)| (key, value.into()))
                    .collect(),
                old_value_sstables: old_value_ssts
                    .into_iter()
                    .map(|sst| sst.sst_info.into())
                    .collect(),
                list_finished_sources: list_finished_source_ids,
                load_finished_sources: load_finished_source_ids,
                cdc_source_offset_updated,
                vector_index_adds: vector_index_adds
                    .into_iter()
                    .map(|(table_id, adds)| {
                        (
                            table_id,
                            PbVectorIndexAdds {
                                adds: adds.into_iter().map(|add| add.into()).collect(),
                            },
                        )
                    })
                    .collect(),
                cdc_table_backfill_progress,
                truncate_tables,
                refresh_finished_tables,
            },
        );

        self.control_stream_handle.send_response(result);
    }

    /// Broadcast a barrier to all senders. Save a receiver which will get notified when this
    /// barrier is finished, in managed mode.
    ///
    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
    /// the root cause of the failure. The caller should then call `try_find_root_failure`
    /// to find the root cause.
    fn send_barrier(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        debug!(
            target: "events::stream::barrier::manager::send",
            "send barrier {:?}, actor_ids_to_collect = {:?}",
            barrier,
            request.actor_ids_to_collect
        );

        let status = self
            .state
            .partial_graphs
            .get_mut(&request.partial_graph_id)
            .expect("should exist");
        if let Some(state) = status.state_for_request() {
            state.transform_to_issued(barrier, request)?;
        }
        Ok(())
    }

    fn remove_partial_graphs(
        &mut self,
        partial_graph_ids: impl IntoIterator<Item = PartialGraphId>,
    ) {
        for partial_graph_id in partial_graph_ids {
            if let Some(mut graph) = self.state.partial_graphs.remove(&partial_graph_id) {
                if let Some(graph) = graph.state_for_request() {
                    assert!(
                        graph.graph_state.is_empty(),
                        "non empty graph to be removed: {}",
                        &graph.graph_state
                    );
                }
            } else {
                warn!(
                    partial_graph_id = %partial_graph_id,
                    "no partial graph to remove"
                );
            }
        }
    }

    fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
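        // Exchange requests may arrive before `CreatePartialGraph` (see the
        // `TakeReceiver` handling in `handle_actor_op`); such requests are buffered
        // under `ReceivedExchangeRequest` and replayed into the new graph here.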
        match self.state.partial_graphs.entry(partial_graph_id) {
            Entry::Occupied(entry) => {
                let status = entry.into_mut();
                if let PartialGraphStatus::ReceivedExchangeRequest(pending_requests) = status {
                    let mut graph = PartialGraphState::new(
                        partial_graph_id,
                        self.term_id.clone(),
                        self.actor_manager.clone(),
                    );
                    for ((upstream_actor_id, actor_id), request) in pending_requests.drain(..) {
                        graph.new_actor_output_request(actor_id, upstream_actor_id, request);
                    }
                    *status = PartialGraphStatus::Running(graph);
                } else {
                    panic!("duplicated partial graph: {}", partial_graph_id);
                }

                status
            }
            Entry::Vacant(entry) => {
                entry.insert(PartialGraphStatus::Running(PartialGraphState::new(
                    partial_graph_id,
                    self.term_id.clone(),
                    self.actor_manager.clone(),
                )))
            }
        };
    }

    fn reset_partial_graphs(&mut self, req: ResetPartialGraphsRequest) {
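        // Start resetting each requested graph and collect the table ids whose data
        // must be cleared. The waiting happens in a spawned task; its outputs are
        // delivered back as `ManagedBarrierStateEvent::PartialGraphsReset` and acked
        // in the `run` loop. Graphs that no longer exist are acked right away.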
        let mut table_ids_to_clear = HashSet::new();
        let mut reset_futures = HashMap::new();
        for partial_graph_id in req.partial_graph_ids {
            if let Some(status) = self.state.partial_graphs.get_mut(&partial_graph_id) {
                let reset_future = status.start_reset(
                    partial_graph_id,
                    self.await_epoch_completed_futures.remove(&partial_graph_id),
                    &mut table_ids_to_clear,
                );
                reset_futures.insert(partial_graph_id, reset_future);
            } else {
                self.ack_partial_graph_reset(partial_graph_id, None);
            }
        }
        if reset_futures.is_empty() {
            assert!(table_ids_to_clear.is_empty());
            return;
        }
        let state_store = self.actor_manager.env.state_store();
        self.state.resetting_graphs.push(spawn(async move {
            let outputs =
                join_all(
                    reset_futures
                        .into_iter()
                        .map(|(partial_graph_id, future)| async move {
                            (partial_graph_id, future.await)
                        }),
                )
                .await;
            if !table_ids_to_clear.is_empty()
                && let Some(hummock) = state_store.as_hummock()
            {
                hummock.clear_tables(table_ids_to_clear).await;
            }
            outputs
        }));
    }

    fn ack_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_output: Option<ResetPartialGraphOutput>,
    ) {
        info!(
            %partial_graph_id,
            "partial graph reset successfully"
        );
        assert!(!self.state.partial_graphs.contains_key(&partial_graph_id));
        self.await_epoch_completed_futures.remove(&partial_graph_id);
        self.control_stream_handle.ack_reset_partial_graph(
            partial_graph_id,
            reset_output.and_then(|output| output.root_err),
        );
    }

    /// When some other failure happens (like failing to send a barrier), the error is reported
    /// via this function. A message is sent on the control stream to notify the meta service
    /// about the error, and the global barrier worker will later reset and rerun the partial graph.
    fn on_partial_graph_failure(
        &mut self,
        partial_graph_id: PartialGraphId,
        failed_actor: Option<ActorId>,
        err: StreamError,
        message: impl Into<String>,
    ) {
        let message = message.into();
        error!(%partial_graph_id, ?failed_actor, message, err = ?err.as_report(), "suspend partial graph on error");
        let completing_futures = self.await_epoch_completed_futures.remove(&partial_graph_id);
        self.state
            .partial_graphs
            .get_mut(&partial_graph_id)
            .expect("should exist")
            .suspend(failed_actor, err, completing_futures);
        self.control_stream_handle
            .send_response(Response::ReportPartialGraphFailure(
                ReportPartialGraphFailureResponse { partial_graph_id },
            ));
    }

    /// Force stop all actors on this worker, and then drop their resources.
    async fn reset(&mut self, init_request: InitRequest) {
        join(
            join_all(
                self.state
                    .partial_graphs
                    .values_mut()
                    .map(|graph| graph.abort()),
            ),
            async {
                while let Some(join_result) = self.state.resetting_graphs.next().await {
                    join_result.expect("failed to join reset partial graphs handle");
                }
            },
        )
        .await;
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        *self = Self::new(self.actor_manager.clone(), init_request.term_id);
        self.actor_manager.env.client_pool().invalidate_all();
    }

    /// Spawn a [`LocalBarrierWorker`] in managed mode on a dedicated actor runtime.
    pub fn spawn(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_reg: Option<await_tree::Registry>,
        watermark_epoch: AtomicU64Ref,
        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
    ) -> JoinHandle<()> {
        let runtime = {
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            if let Some(worker_threads_num) = env.global_config().actor_runtime_worker_threads_num {
                builder.worker_threads(worker_threads_num);
            }
            builder
                .thread_name("rw-streaming")
                .enable_all()
                .build()
                .unwrap()
        };

        let actor_manager = Arc::new(StreamActorManager {
            env,
            streaming_metrics,
            watermark_epoch,
            await_tree_reg,
            runtime: runtime.into(),
            config_override_cache: ConfigOverrideCache::new(CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY),
        });
        let worker = LocalBarrierWorker::new(actor_manager, "uninitialized".into());
        tokio::spawn(worker.run(actor_op_rx))
    }
}

pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);

impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> EventSender<T> {
    pub(super) fn send_event(&self, event: T) {
        self.0.send(event).expect("should be able to send event")
    }

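    /// Send an event carrying a `oneshot::Sender` for the response and await that
    /// response. A minimal usage sketch with an operation from this module:
    ///
    /// ```ignore
    /// let state: String = actor_op_tx
    ///     .send_and_await(|tx| LocalActorOperation::InspectState { result_sender: tx })
    ///     .await?;
    /// ```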
    pub(super) async fn send_and_await<RSP>(
        &self,
        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
    ) -> StreamResult<RSP> {
        let (tx, rx) = oneshot::channel();
        let event = make_event(tx);
        self.send_event(event);
        rx.await
            .map_err(|_| anyhow!("barrier manager maybe reset").into())
    }
}

pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}

#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbCreatePartialGraphRequest,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, PbStreamingControlStreamRequest, StreamingControlStreamRequest,
        StreamingControlStreamResponse, streaming_control_stream_request,
        streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{ActorId, LocalBarrierManager, NewOutputRequest, TEST_PARTIAL_GRAPH_ID};

    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            request_tx
                .send(Ok(PbStreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            PbCreatePartialGraphRequest {
                                partial_graph_id: TEST_PARTIAL_GRAPH_ID,
                            },
                        ),
                    ),
                }))
                .unwrap();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    term_id: "for_test".into(),
                },
            });

            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID,
                            actors_to_build: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}