risingwave_stream/task/barrier_worker/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Display;
18use std::future::{pending, poll_fn};
19use std::sync::Arc;
20use std::task::Poll;
21
22use anyhow::anyhow;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::future::{BoxFuture, join, join_all};
25use futures::stream::{BoxStream, FuturesOrdered};
26use futures::{FutureExt, StreamExt, TryFutureExt};
27use itertools::Itertools;
28use risingwave_pb::stream_plan::barrier::BarrierKind;
29use risingwave_pb::stream_service::barrier_complete_response::{
30    PbCdcSourceOffsetUpdated, PbCdcTableBackfillProgress, PbCreateMviewProgress,
31    PbListFinishedSource, PbLoadFinishedSource, PbLocalSstableInfo,
32};
33use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
34use risingwave_storage::store_impl::AsHummock;
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot;
38use tokio::task::JoinHandle;
39use tokio::{select, spawn};
40use tonic::{Code, Status};
41use tracing::warn;
42
43use self::managed_state::ManagedBarrierState;
44use crate::error::{ScoredStreamError, StreamError, StreamResult};
45#[cfg(test)]
46use crate::task::LocalBarrierManager;
47use crate::task::managed_state::{BarrierToComplete, ResetPartialGraphOutput};
48use crate::task::{
49    ActorId, AtomicU64Ref, CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY, ConfigOverrideCache, FragmentId,
50    PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
51};
52pub mod managed_state;
53#[cfg(test)]
54mod tests;
55
56use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
57use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
58use risingwave_pb::stream_service::streaming_control_stream_request::{
59    InitRequest, Request, ResetPartialGraphsRequest,
60};
61use risingwave_pb::stream_service::streaming_control_stream_response::{
62    InitResponse, ReportPartialGraphFailureResponse, ResetPartialGraphResponse, Response,
63    ShutdownResponse,
64};
65use risingwave_pb::stream_service::{
66    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
67    StreamingControlStreamResponse, streaming_control_stream_response,
68};
69
70use crate::executor::Barrier;
71use crate::executor::exchange::permit::Receiver;
72use crate::executor::monitor::StreamingMetrics;
73use crate::task::barrier_worker::managed_state::{
74    ManagedBarrierStateDebugInfo, ManagedBarrierStateEvent, PartialGraphState, PartialGraphStatus,
75};
76
/// If enabled, all actors will be grouped in the same tracing span within one epoch.
/// Note that this option will significantly increase the overhead of tracing.
// NOTE(review): hard-coded off; intended as a compile-time debugging switch.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;
80
/// Collect result of some barrier on current compute node. Will be reported to the meta service in [`LocalBarrierWorker::on_epoch_completed`].
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// The result returned from `sync` of `StateStore`.
    pub sync_result: Option<SyncResult>,

    /// The updated creation progress of materialized view after this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    /// The source IDs that have finished listing data for refreshable batch sources.
    pub list_finished_source_ids: Vec<PbListFinishedSource>,

    /// The source IDs that have finished loading data for refreshable batch sources.
    pub load_finished_source_ids: Vec<PbLoadFinishedSource>,

    /// Backfill progress of CDC tables reported at this barrier.
    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,

    /// CDC sources that have updated their offset at least once.
    pub cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,

    /// The table IDs that should be truncated.
    pub truncate_tables: Vec<TableId>,
    /// The table IDs that have finished refresh.
    pub refresh_finished_tables: Vec<TableId>,
}
106
/// Lives in [`crate::task::barrier_worker::LocalBarrierWorker`],
/// Communicates with `ControlStreamManager` in meta.
/// Handles [`risingwave_pb::stream_service::streaming_control_stream_request::Request`].
pub(super) struct ControlStreamHandle {
    /// The two halves of the bidirectional control stream: a sender for pushing
    /// responses back to meta, and a stream of incoming requests.
    /// `None` when no stream is attached (initial state, or after a reset).
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}
117
impl ControlStreamHandle {
    /// Creates a handle that is not connected to any control stream.
    fn empty() -> Self {
        Self { pair: None }
    }

    /// Wraps an established control stream: `sender` pushes responses to meta,
    /// `request_stream` yields incoming requests from meta.
    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    /// Whether a control stream is currently attached.
    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    /// Detaches the current stream (if any) and notifies the meta side with `err`.
    /// After this call, [`Self::connected`] returns `false` until a new stream is attached.
    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            // Note: `TonicStatusWrapper` provides a better error report.
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

    /// Send `Shutdown` message to the control stream and wait for the stream to be closed
    /// by the meta service.
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                // Wait for the stream to be closed, to ensure that the `Shutdown` message has
                // been acknowledged by the meta service for more precise error report.
                //
                // This is because the meta service will reset the control stream manager and
                // drop the connection to us upon recovery. As a result, the receiver part of
                // this sender will also be dropped, causing the stream to close.
                sender.closed().await;
            }
        } else {
            // NOTE(review): `debug!` is used unqualified here (only `warn` is imported
            // at the top) — confirm `tracing::debug` is brought into scope elsewhere.
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    /// Acknowledges to meta that the given partial graph has been reset, carrying
    /// the scored root error (if any) that triggered the reset.
    pub(super) fn ack_reset_partial_graph(
        &mut self,
        partial_graph_id: PartialGraphId,
        root_err: Option<ScoredStreamError>,
    ) {
        self.send_response(Response::ResetPartialGraph(ResetPartialGraphResponse {
            partial_graph_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
        }));
    }

    /// Sends a response to meta. If sending fails, the stream is considered reset
    /// and the pair is dropped; if no stream is attached, the response is ignored.
    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("fail to send response. control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset. ignore response");
        }
    }

    /// Awaits the next request from meta. On stream error or end-of-stream, the
    /// handle resets itself and this future never resolves, so the caller's
    /// `select!` will proceed with its other branches.
    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e)) // wrap the status to provide better error report
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}
225
/// A request to take the receiver end of an exchange channel for an
/// (upstream, downstream) actor pair.
pub(super) enum TakeReceiverRequest {
    /// From a remote downstream actor; the result (or error) is reported back
    /// through `result_sender`.
    Remote {
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
        upstream_fragment_id: FragmentId,
    },
    /// From a local downstream actor, carrying the permit-based sender half directly.
    /// Failures are not reported back (there is no result channel).
    Local(permit::Sender),
}
233
/// Sent from [`crate::task::stream_manager::LocalStreamManager`] to [`crate::task::barrier_worker::LocalBarrierWorker::run`].
///
/// See [`crate::task`] for architecture overview.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    /// Attach a newly established control stream, resetting any previous one
    /// and re-initializing the worker from `init_request`.
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    /// Take the exchange receiver for the given actor pair within a partial graph.
    /// `term_id` must match the worker's current term, otherwise the request fails.
    TakeReceiver {
        partial_graph_id: PartialGraphId,
        term_id: String,
        ids: UpDownActorIds,
        request: TakeReceiverRequest,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    /// Dump the worker's internal state as a human-readable string.
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    /// Gracefully shut down: notify meta via the control stream, then signal
    /// completion through `result_sender`.
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}
262
/// Snapshot of [`LocalBarrierWorker`] internals for debugging, rendered via `Display`.
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    /// Per partial graph: a status label plus, when running, the managed barrier state.
    managed_barrier_state:
        HashMap<PartialGraphId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    /// Whether a control stream to meta is currently attached.
    has_control_stream_connected: bool,
}
268
269impl Display for LocalBarrierWorkerDebugInfo<'_> {
270    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271        writeln!(
272            f,
273            "\nhas_control_stream_connected: {}",
274            self.has_control_stream_connected
275        )?;
276
277        for (partial_graph_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
278            writeln!(
279                f,
280                "partial graph {} status: {} managed_barrier_state:\n{}",
281                partial_graph_id,
282                status,
283                managed_barrier_state
284                    .as_ref()
285                    .map(ToString::to_string)
286                    .unwrap_or_default()
287            )?;
288        }
289        Ok(())
290    }
291}
292
/// [`LocalBarrierWorker`] manages barrier control flow.
/// Specifically, [`LocalBarrierWorker`] serves barrier injection from meta server, sends the
/// barriers to and collects them from all actors, and finally reports the progress.
///
/// Runs event loop in [`Self::run`]. Handles events sent by [`crate::task::LocalStreamManager`].
///
/// See [`crate::task`] for architecture overview.
pub(super) struct LocalBarrierWorker {
    /// Current barrier collection state.
    pub(super) state: ManagedBarrierState,

    /// Futures will be finished in the order of epoch in ascending order.
    await_epoch_completed_futures:
        HashMap<PartialGraphId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    /// Handle to the control stream with meta; may be unconnected.
    control_stream_handle: ControlStreamHandle,

    /// Shared [`StreamActorManager`] used across partial graphs.
    pub(super) actor_manager: Arc<StreamActorManager>,

    /// Term of the current control stream; exchange requests carrying a
    /// different term are rejected in `handle_actor_op`.
    pub(super) term_id: String,
}
314
impl LocalBarrierWorker {
    /// Creates a worker with empty state and no control stream attached.
    pub(super) fn new(actor_manager: Arc<StreamActorManager>, term_id: String) -> Self {
        Self {
            state: Default::default(),
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    /// Snapshots the status of every partial graph plus control-stream
    /// connectivity, for [`LocalActorOperation::InspectState`].
    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .partial_graphs
                .iter()
                .map(|(partial_graph_id, status)| {
                    (*partial_graph_id, {
                        match status {
                            PartialGraphStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            PartialGraphStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            PartialGraphStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            PartialGraphStatus::Resetting => ("resetting".to_owned(), None),
                            PartialGraphStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

    /// Polls every per-graph `FuturesOrdered` and resolves with the first completed
    /// barrier. Within a single partial graph, completions arrive in ascending epoch
    /// order; across graphs, the scan follows hash-map iteration order.
    async fn next_completed_epoch(
        futures: &mut HashMap<PartialGraphId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>) {
        poll_fn(|cx| {
            for (partial_graph_id, futures) in &mut *futures {
                if let Poll::Ready(Some((barrier, result))) = futures.poll_next_unpin(cx) {
                    return Poll::Ready((*partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

    /// The main event loop. With `biased` selection, branches are polled in order:
    /// internal state events first, then completed epochs, then actor operations,
    /// and finally control-stream requests. Exits when the actor-op channel closes.
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                event = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            // update await_epoch_completed_futures
                            // handled below in next_completed_epoch
                            self.complete_barrier(partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            partial_graph_id,
                            actor_id,
                            err,
                        } => {
                            self.on_partial_graph_failure(partial_graph_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::PartialGraphsReset(output) => {
                            for (partial_graph_id, output) in output {
                                self.ack_partial_graph_reset(partial_graph_id, Some(output));
                            }
                        }
                        ManagedBarrierStateEvent::RegisterLocalUpstreamOutput{
                            actor_id,
                            upstream_actor_id,
                            upstream_partial_graph_id,
                            tx
                        } => {
                            // Local exchange registration reuses the TakeReceiver path
                            // with the worker's own (current) term id.
                            self.handle_actor_op(LocalActorOperation::TakeReceiver {
                                partial_graph_id: upstream_partial_graph_id,
                                term_id: self.term_id.clone(),
                                ids: (upstream_actor_id, actor_id),
                                request: TakeReceiverRequest::Local(tx),
                            });
                        }
                    }
                }
                (partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // TODO: may only report as partial graph failure instead of reset the stream
                            // when the HummockUploader support partial recovery. Currently the HummockUploader
                            // enter `Err` state and stop working until a global recovery to clear the uploader.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {:?} {:?}", partial_graph_id, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request  } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                // Warn if any partial graph still has running actors.
                                if self.state.partial_graphs.values().any(|graph| {
                                    match graph {
                                        PartialGraphStatus::Running(graph) => {
                                            !graph.actor_states.is_empty()
                                        }
                                        PartialGraphStatus::Suspended(_) | PartialGraphStatus::Resetting |
                                            PartialGraphStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        PartialGraphStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        // Channel closed: the stream manager is gone, stop the loop.
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((partial_graph_id, err)) = result {
                        self.on_partial_graph_failure(partial_graph_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    /// Dispatches a request from the meta control stream. An `Err` carries the id of
    /// the partial graph whose barrier injection failed; `Init` is handled elsewhere
    /// and must not reach this point.
    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (PartialGraphId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let partial_graph_id = req.partial_graph_id;
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (partial_graph_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(req.partial_graph_ids);
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(req.partial_graph_id);
                Ok(())
            }
            Request::ResetPartialGraphs(req) => {
                self.reset_partial_graphs(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    /// Handles operations that can be processed synchronously. `NewControlStream`
    /// and `Shutdown` require awaiting and are handled in [`Self::run`] instead.
    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                partial_graph_id,
                term_id,
                ids,
                request,
            } => {
                // Build an error on rejection; the non-error paths `return` early.
                let err = if self.term_id != term_id {
                    {
                        warn!(
                            ?ids,
                            term_id,
                            current_term_id = self.term_id,
                            "take receiver on unmatched term_id"
                        );
                        anyhow!(
                            "take receiver {:?} on unmatched term_id {} to current term_id {}",
                            ids,
                            term_id,
                            self.term_id
                        )
                    }
                } else {
                    match self.state.partial_graphs.entry(partial_graph_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            PartialGraphStatus::ReceivedExchangeRequest(pending_requests) => {
                                // Graph not running yet: queue the request for later.
                                pending_requests.push((ids, request));
                                return;
                            }
                            PartialGraphStatus::Running(graph) => {
                                let (upstream_actor_id, actor_id) = ids;
                                graph.new_actor_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    request,
                                );
                                return;
                            }
                            PartialGraphStatus::Suspended(_) => {
                                anyhow!("partial graph suspended")
                            }
                            PartialGraphStatus::Resetting => {
                                anyhow!("partial graph resetting")
                            }
                            PartialGraphStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            // First exchange request for this graph: create the pending queue.
                            entry.insert(PartialGraphStatus::ReceivedExchangeRequest(vec![(
                                ids, request,
                            )]));
                            return;
                        }
                    }
                };
                // Only remote requests carry a result channel to report the error;
                // local requests are silently dropped on failure.
                if let TakeReceiverRequest::Remote { result_sender, .. } = request {
                    let _ = result_sender.send(Err(err.into()));
                }
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let partial_graph_status = self
                    .state
                    .partial_graphs
                    .get(&crate::task::TEST_PARTIAL_GRAPH_ID)
                    .unwrap();
                let partial_graph_state = risingwave_common::must_match!(partial_graph_status, PartialGraphStatus::Running(database_state) => database_state);
                let _ = sender.send(partial_graph_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let partial_graph_status = self
                    .state
                    .partial_graphs
                    .get_mut(&crate::task::TEST_PARTIAL_GRAPH_ID)
                    .unwrap();

                let partial_graph_state = risingwave_common::must_match!(partial_graph_status, PartialGraphStatus::Running(database_state) => database_state);
                assert!(!partial_graph_state.actor_states.contains_key(&actor_id));
                let requests = partial_graph_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                // Drain all immediately-ready control requests and state events
                // without awaiting, then ack through `sender`.
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some(event) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            barrier,
                            partial_graph_id,
                        } => {
                            self.complete_barrier(partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::PartialGraphsReset { .. }
                        | ManagedBarrierStateEvent::RegisterLocalUpstreamOutput { .. } => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}
629
mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_common::id::TableId;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcSourceOffsetUpdated, PbCdcTableBackfillProgress, PbCreateMviewProgress,
        PbListFinishedSource, PbLoadFinishedSource,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, await_tree_key};

    /// Opaque future resolving to the barrier and its (possibly failed)
    /// completion result.
    pub(super) type AwaitEpochCompletedFuture =
        impl Future<Output = (Barrier, StreamResult<BarrierCompleteResult>)> + 'static;

    /// Combines the optional state-store sync future with all progress payloads
    /// into an [`AwaitEpochCompletedFuture`], optionally registering it in the
    /// await-tree registry keyed by the barrier's previous epoch.
    #[define_opaque(AwaitEpochCompletedFuture)]
    #[expect(clippy::too_many_arguments)]
    pub(super) fn instrument_complete_barrier_future(
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        list_finished_source_ids: Vec<PbListFinishedSource>,
        load_finished_source_ids: Vec<PbLoadFinishedSource>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
        cdc_source_offset_updated: Vec<PbCdcSourceOffsetUpdated>,
        truncate_tables: Vec<TableId>,
        refresh_finished_tables: Vec<TableId>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        // `None` sync future means there is nothing to sync for this barrier.
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    list_finished_source_ids,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                    cdc_source_offset_updated,
                    truncate_tables,
                    refresh_finished_tables,
                }),
            )
        });
        // Register in the await tree only when a registry is provided; both arms
        // unify into the same opaque future type via `Either`.
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}
699
700use await_epoch_completed_future::*;
701use risingwave_common::catalog::TableId;
702use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
703use risingwave_storage::{StateStoreImpl, dispatch_state_store};
704
705use crate::executor::exchange::permit;
706
707fn sync_epoch(
708    state_store: &StateStoreImpl,
709    streaming_metrics: &StreamingMetrics,
710    prev_epoch: u64,
711    table_ids: HashSet<TableId>,
712) -> BoxFuture<'static, StreamResult<SyncResult>> {
713    let timer = streaming_metrics.barrier_sync_latency.start_timer();
714
715    let state_store = state_store.clone();
716    let future = async move {
717        dispatch_state_store!(state_store, hummock, {
718            hummock.sync(vec![(prev_epoch, table_ids)]).await
719        })
720    };
721
722    future
723        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
724        .inspect_ok(move |_| {
725            timer.observe_duration();
726        })
727        .map_err(move |e| {
728            tracing::error!(
729                prev_epoch,
730                error = %e.as_report(),
731                "Failed to sync state store",
732            );
733            e.into()
734        })
735        .boxed()
736}
737
738impl LocalBarrierWorker {
739    fn complete_barrier(&mut self, partial_graph_id: PartialGraphId, prev_epoch: u64) {
740        {
741            let Some(graph_state) = self
742                .state
743                .partial_graphs
744                .get_mut(&partial_graph_id)
745                .expect("should exist")
746                .state_for_request()
747            else {
748                return;
749            };
750            let BarrierToComplete {
751                barrier,
752                table_ids,
753                create_mview_progress,
754                list_finished_source_ids,
755                load_finished_source_ids,
756                cdc_table_backfill_progress,
757                cdc_source_offset_updated,
758                truncate_tables,
759                refresh_finished_tables,
760            } = graph_state.pop_barrier_to_complete(prev_epoch);
761
762            let complete_barrier_future = match &barrier.kind {
763                BarrierKind::Unspecified => unreachable!(),
764                BarrierKind::Initial => {
765                    tracing::info!(
766                        epoch = prev_epoch,
767                        "ignore sealing data for the first barrier"
768                    );
769                    tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
770                    None
771                }
772                BarrierKind::Barrier => None,
773                BarrierKind::Checkpoint => Some(sync_epoch(
774                    &self.actor_manager.env.state_store(),
775                    &self.actor_manager.streaming_metrics,
776                    prev_epoch,
777                    table_ids.expect("should be Some on BarrierKind::Checkpoint"),
778                )),
779            };
780
781            self.await_epoch_completed_futures
782                .entry(partial_graph_id)
783                .or_default()
784                .push_back({
785                    instrument_complete_barrier_future(
786                        complete_barrier_future,
787                        barrier,
788                        self.actor_manager.await_tree_reg.as_ref(),
789                        create_mview_progress,
790                        list_finished_source_ids,
791                        load_finished_source_ids,
792                        cdc_table_backfill_progress,
793                        cdc_source_offset_updated,
794                        truncate_tables,
795                        refresh_finished_tables,
796                    )
797                });
798        }
799    }
800
801    fn on_epoch_completed(
802        &mut self,
803        partial_graph_id: PartialGraphId,
804        epoch: u64,
805        result: BarrierCompleteResult,
806    ) {
807        let BarrierCompleteResult {
808            create_mview_progress,
809            sync_result,
810            list_finished_source_ids,
811            load_finished_source_ids,
812            cdc_table_backfill_progress,
813            cdc_source_offset_updated,
814            truncate_tables,
815            refresh_finished_tables,
816        } = result;
817
818        let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
819            .map(|sync_result| {
820                (
821                    sync_result.uncommitted_ssts,
822                    sync_result.table_watermarks,
823                    sync_result.old_value_ssts,
824                    sync_result.vector_index_adds,
825                )
826            })
827            .unwrap_or_default();
828
829        let result = {
830            {
831                streaming_control_stream_response::Response::CompleteBarrier(
832                    BarrierCompleteResponse {
833                        request_id: "todo".to_owned(),
834                        partial_graph_id,
835                        epoch,
836                        status: None,
837                        create_mview_progress,
838                        synced_sstables: synced_sstables
839                            .into_iter()
840                            .map(
841                                |LocalSstableInfo {
842                                     sst_info,
843                                     table_stats,
844                                     created_at,
845                                 }| PbLocalSstableInfo {
846                                    sst: Some(sst_info.into()),
847                                    table_stats_map: to_prost_table_stats_map(table_stats),
848                                    created_at,
849                                },
850                            )
851                            .collect_vec(),
852                        worker_id: self.actor_manager.env.worker_id(),
853                        table_watermarks: table_watermarks
854                            .into_iter()
855                            .map(|(key, value)| (key, value.into()))
856                            .collect(),
857                        old_value_sstables: old_value_ssts
858                            .into_iter()
859                            .map(|sst| sst.sst_info.into())
860                            .collect(),
861                        list_finished_sources: list_finished_source_ids,
862                        load_finished_sources: load_finished_source_ids,
863                        cdc_source_offset_updated,
864                        vector_index_adds: vector_index_adds
865                            .into_iter()
866                            .map(|(table_id, adds)| {
867                                (
868                                    table_id,
869                                    PbVectorIndexAdds {
870                                        adds: adds.into_iter().map(|add| add.into()).collect(),
871                                    },
872                                )
873                            })
874                            .collect(),
875                        cdc_table_backfill_progress,
876                        truncate_tables,
877                        refresh_finished_tables,
878                    },
879                )
880            }
881        };
882
883        self.control_stream_handle.send_response(result);
884    }
885
886    /// Broadcast a barrier to all senders. Save a receiver which will get notified when this
887    /// barrier is finished, in managed mode.
888    ///
889    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
890    /// the root cause of the failure. The caller should then call `try_find_root_failure`
891    /// to find the root cause.
892    fn send_barrier(
893        &mut self,
894        barrier: &Barrier,
895        request: InjectBarrierRequest,
896    ) -> StreamResult<()> {
897        debug!(
898            target: "events::stream::barrier::manager::send",
899            "send barrier {:?}, actor_ids_to_collect = {:?}",
900            barrier,
901            request.actor_ids_to_collect
902        );
903
904        let status = self
905            .state
906            .partial_graphs
907            .get_mut(&request.partial_graph_id)
908            .expect("should exist");
909        if let Some(state) = status.state_for_request() {
910            state.transform_to_issued(barrier, request)?;
911        }
912        Ok(())
913    }
914
915    fn remove_partial_graphs(
916        &mut self,
917        partial_graph_ids: impl IntoIterator<Item = PartialGraphId>,
918    ) {
919        for partial_graph_id in partial_graph_ids {
920            if let Some(mut graph) = self.state.partial_graphs.remove(&partial_graph_id) {
921                if let Some(graph) = graph.state_for_request() {
922                    assert!(
923                        graph.graph_state.is_empty(),
924                        "non empty graph to be removed: {}",
925                        &graph.graph_state
926                    );
927                }
928            } else {
929                warn!(
930                    partial_graph_id = %partial_graph_id,
931                    "no partial graph to remove"
932                );
933            }
934        }
935    }
936
937    fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
938        match self.state.partial_graphs.entry(partial_graph_id) {
939            Entry::Occupied(entry) => {
940                let status = entry.into_mut();
941                if let PartialGraphStatus::ReceivedExchangeRequest(pending_requests) = status {
942                    let mut graph = PartialGraphState::new(
943                        partial_graph_id,
944                        self.term_id.clone(),
945                        self.actor_manager.clone(),
946                    );
947                    for ((upstream_actor_id, actor_id), request) in pending_requests.drain(..) {
948                        graph.new_actor_output_request(actor_id, upstream_actor_id, request);
949                    }
950                    *status = PartialGraphStatus::Running(graph);
951                } else {
952                    panic!("duplicated partial graph: {}", partial_graph_id);
953                }
954
955                status
956            }
957            Entry::Vacant(entry) => {
958                entry.insert(PartialGraphStatus::Running(PartialGraphState::new(
959                    partial_graph_id,
960                    self.term_id.clone(),
961                    self.actor_manager.clone(),
962                )))
963            }
964        };
965    }
966
967    fn reset_partial_graphs(&mut self, req: ResetPartialGraphsRequest) {
968        let mut table_ids_to_clear = HashSet::new();
969        let mut reset_futures = HashMap::new();
970        for partial_graph_id in req.partial_graph_ids {
971            if let Some(status) = self.state.partial_graphs.get_mut(&partial_graph_id) {
972                let reset_future = status.start_reset(
973                    partial_graph_id,
974                    self.await_epoch_completed_futures.remove(&partial_graph_id),
975                    &mut table_ids_to_clear,
976                );
977                reset_futures.insert(partial_graph_id, reset_future);
978            } else {
979                self.ack_partial_graph_reset(partial_graph_id, None);
980            }
981        }
982        if reset_futures.is_empty() {
983            assert!(table_ids_to_clear.is_empty());
984            return;
985        }
986        let state_store = self.actor_manager.env.state_store();
987        self.state.resetting_graphs.push(spawn(async move {
988            let outputs =
989                join_all(
990                    reset_futures
991                        .into_iter()
992                        .map(|(partial_graph_id, future)| async move {
993                            (partial_graph_id, future.await)
994                        }),
995                )
996                .await;
997            if !table_ids_to_clear.is_empty()
998                && let Some(hummock) = state_store.as_hummock()
999            {
1000                hummock.clear_tables(table_ids_to_clear).await;
1001            }
1002            outputs
1003        }));
1004    }
1005
1006    fn ack_partial_graph_reset(
1007        &mut self,
1008        partial_graph_id: PartialGraphId,
1009        reset_output: Option<ResetPartialGraphOutput>,
1010    ) {
1011        info!(
1012            %partial_graph_id,
1013            "partial graph reset successfully"
1014        );
1015        assert!(!self.state.partial_graphs.contains_key(&partial_graph_id));
1016        self.await_epoch_completed_futures.remove(&partial_graph_id);
1017        self.control_stream_handle.ack_reset_partial_graph(
1018            partial_graph_id,
1019            reset_output.and_then(|output| output.root_err),
1020        );
1021    }
1022
1023    /// When some other failure happens (like failed to send barrier), the error is reported using
1024    /// this function. The control stream will be responded with a message to notify about the error,
1025    /// and the global barrier worker will later reset and rerun the partial graph.
1026    fn on_partial_graph_failure(
1027        &mut self,
1028        partial_graph_id: PartialGraphId,
1029        failed_actor: Option<ActorId>,
1030        err: StreamError,
1031        message: impl Into<String>,
1032    ) {
1033        let message = message.into();
1034        error!(%partial_graph_id, ?failed_actor, message, err = ?err.as_report(), "suspend partial graph on error");
1035        let completing_futures = self.await_epoch_completed_futures.remove(&partial_graph_id);
1036        self.state
1037            .partial_graphs
1038            .get_mut(&partial_graph_id)
1039            .expect("should exist")
1040            .suspend(failed_actor, err, completing_futures);
1041        self.control_stream_handle
1042            .send_response(Response::ReportPartialGraphFailure(
1043                ReportPartialGraphFailureResponse { partial_graph_id },
1044            ));
1045    }
1046
1047    /// Force stop all actors on this worker, and then drop their resources.
1048    async fn reset(&mut self, init_request: InitRequest) {
1049        join(
1050            join_all(
1051                self.state
1052                    .partial_graphs
1053                    .values_mut()
1054                    .map(|graph| graph.abort()),
1055            ),
1056            async {
1057                while let Some(join_result) = self.state.resetting_graphs.next().await {
1058                    join_result.expect("failed to join reset partial graphs handle");
1059                }
1060            },
1061        )
1062        .await;
1063        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
1064            m.clear();
1065        }
1066
1067        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
1068            hummock
1069                .clear_shared_buffer()
1070                .instrument_await("store_clear_shared_buffer".verbose())
1071                .await
1072        }
1073        self.actor_manager.env.dml_manager_ref().clear();
1074        *self = Self::new(self.actor_manager.clone(), init_request.term_id);
1075        self.actor_manager.env.client_pool().invalidate_all();
1076    }
1077
1078    /// Create a [`LocalBarrierWorker`] with managed mode.
1079    pub fn spawn(
1080        env: StreamEnvironment,
1081        streaming_metrics: Arc<StreamingMetrics>,
1082        await_tree_reg: Option<await_tree::Registry>,
1083        watermark_epoch: AtomicU64Ref,
1084        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
1085    ) -> JoinHandle<()> {
1086        let runtime = {
1087            let mut builder = tokio::runtime::Builder::new_multi_thread();
1088            if let Some(worker_threads_num) = env.global_config().actor_runtime_worker_threads_num {
1089                builder.worker_threads(worker_threads_num);
1090            }
1091            builder
1092                .thread_name("rw-streaming")
1093                .enable_all()
1094                .build()
1095                .unwrap()
1096        };
1097
1098        let actor_manager = Arc::new(StreamActorManager {
1099            env,
1100            streaming_metrics,
1101            watermark_epoch,
1102            await_tree_reg,
1103            runtime: runtime.into(),
1104            config_override_cache: ConfigOverrideCache::new(CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY),
1105        });
1106        let worker = LocalBarrierWorker::new(actor_manager, "uninitialized".into());
1107        tokio::spawn(worker.run(actor_op_rx))
1108    }
1109}
1110
/// Thin wrapper around an unbounded channel sender used to deliver events of type `T`
/// (e.g. [`LocalActorOperation`]) to the barrier worker's event loop.
pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);
1112
1113impl<T> Clone for EventSender<T> {
1114    fn clone(&self) -> Self {
1115        Self(self.0.clone())
1116    }
1117}
1118
1119impl<T> EventSender<T> {
1120    pub(super) fn send_event(&self, event: T) {
1121        self.0.send(event).expect("should be able to send event")
1122    }
1123
1124    pub(super) async fn send_and_await<RSP>(
1125        &self,
1126        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
1127    ) -> StreamResult<RSP> {
1128        let (tx, rx) = oneshot::channel();
1129        let event = make_event(tx);
1130        self.send_event(event);
1131        rx.await
1132            .map_err(|_| anyhow!("barrier manager maybe reset").into())
1133    }
1134}
1135
/// A request for an actor to set up a new output channel to a downstream actor.
/// Both variants carry an in-process `permit::Sender`; the variant records whether the
/// request originated from a local or a remote exchange.
pub(crate) enum NewOutputRequest {
    // Request originating from a local exchange.
    Local(permit::Sender),
    // Request originating from a remote exchange — presumably the channel is then bridged
    // over gRPC by the exchange service; confirm against the exchange implementation.
    Remote(permit::Sender),
}
1140
#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbCreatePartialGraphRequest,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, PbStreamingControlStreamRequest, StreamingControlStreamRequest,
        StreamingControlStreamResponse, streaming_control_stream_request,
        streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{ActorId, LocalBarrierManager, NewOutputRequest, TEST_PARTIAL_GRAPH_ID};

    /// Test harness wiring a barrier worker to an in-memory control stream, so tests can
    /// play the meta-service side: send control requests and observe the worker's responses.
    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        /// Sender of actor operations into the worker's event loop.
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        /// Test-side end for injecting control-stream requests (the "meta" side).
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        /// Test-side end for receiving the worker's control-stream responses.
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        /// Spawn a worker for test, open a fresh control stream, create the test partial
        /// graph, and wait for the `Init` response before handing back the environment.
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            // Queue the partial-graph creation before handing the stream to the worker,
            // so it is processed right after stream initialization.
            request_tx
                .send(Ok(PbStreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            PbCreatePartialGraphRequest {
                                partial_graph_id: TEST_PARTIAL_GRAPH_ID,
                            },
                        ),
                    ),
                }))
                .unwrap();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    term_id: "for_test".into(),
                },
            });

            // The first response on a new control stream must be `Init`.
            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        /// Inject `barrier` into the test partial graph, to be collected from the given actors.
        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID,
                            actors_to_build: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        /// Wait until every event sent to the worker so far has been processed.
        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        /// Same as [`Self::flush_all_events`], usable before the env is fully constructed.
        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            // A `Flush` round-trip guarantees all previously queued events were handled,
            // since the worker processes events in order.
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        /// Take the pending output-channel requests addressed to `actor_id`, paired with
        /// the requesting upstream actor ids.
        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}