use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::{pending, poll_fn};
use std::sync::Arc;
use std::task::Poll;

use anyhow::anyhow;
use await_tree::{InstrumentAwait, SpanExt};
use futures::future::{BoxFuture, join_all};
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryFutureExt};
use itertools::Itertools;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbCdcTableBackfillProgress, PbCreateMviewProgress, PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tonic::{Code, Status};
use tracing::warn;

use self::managed_state::ManagedBarrierState;
use crate::error::{ScoredStreamError, StreamError, StreamResult};
#[cfg(test)]
use crate::task::LocalBarrierManager;
use crate::task::{
    ActorId, AtomicU64Ref, PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
};
pub mod managed_state;
#[cfg(test)]
mod tests;

use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::streaming_control_stream_response::{
    InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
    StreamingControlStreamResponse, streaming_control_stream_response,
};

use crate::executor::Barrier;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::task::barrier_worker::managed_state::{
    DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
    ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
};

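/// Flag controlling barrier aggregation. Currently hard-coded to `false`, i.e. disabled.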
pub const ENABLE_BARRIER_AGGREGATION: bool = false;

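/// Collected result of a completed barrier on this worker, reported back to the meta
/// service as part of [`BarrierCompleteResponse`].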
#[derive(Debug)]
pub struct BarrierCompleteResult {
    pub sync_result: Option<SyncResult>,

    pub create_mview_progress: Vec<PbCreateMviewProgress>,

    pub load_finished_source_ids: Vec<u32>,

    pub cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
}

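/// Handle to the bidirectional control stream with the meta service: the sender side for
/// [`StreamingControlStreamResponse`]s and the stream of incoming
/// [`StreamingControlStreamRequest`]s. `pair` is `None` when the stream is not connected
/// or has been reset.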
pub(super) struct ControlStreamHandle {
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}

impl ControlStreamHandle {
    fn empty() -> Self {
        Self { pair: None }
    }

    pub(super) fn new(
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    ) -> Self {
        Self {
            pair: Some((sender, request_stream)),
        }
    }

    pub(super) fn connected(&self) -> bool {
        self.pair.is_some()
    }

    fn reset_stream_with_err(&mut self, err: Status) {
        if let Some((sender, _)) = self.pair.take() {
            let err = TonicStatusWrapper::new(err);
            warn!(error = %err.as_report(), "control stream reset with error");

            let err = err.into_inner();
            if sender.send(Err(err)).is_err() {
                warn!("failed to notify reset of control stream");
            }
        }
    }

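    /// Send a `Shutdown` response to the meta service and then wait for it to close the
    /// control stream from its side.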
    async fn shutdown_stream(&mut self) {
        if let Some((sender, _)) = self.pair.take() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(streaming_control_stream_response::Response::Shutdown(
                        ShutdownResponse::default(),
                    )),
                }))
                .is_err()
            {
                warn!("failed to notify shutdown of control stream");
            } else {
                tracing::info!("waiting for meta service to close control stream...");

                sender.closed().await;
            }
        } else {
            debug!("control stream has been reset, ignore shutdown");
        }
    }

    pub(super) fn ack_reset_database(
        &mut self,
        database_id: DatabaseId,
        root_err: Option<ScoredStreamError>,
        reset_request_id: u32,
    ) {
        self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
            database_id: database_id.database_id,
            root_err: root_err.map(|err| PbScoredError {
                err_msg: err.error.to_report_string(),
                score: err.score.0,
            }),
            reset_request_id,
        }));
    }

    fn send_response(&mut self, response: streaming_control_stream_response::Response) {
        if let Some((sender, _)) = self.pair.as_ref() {
            if sender
                .send(Ok(StreamingControlStreamResponse {
                    response: Some(response),
                }))
                .is_err()
            {
                self.pair = None;
                warn!("fail to send response. control stream reset");
            }
        } else {
            debug!(?response, "control stream has been reset. ignore response");
        }
    }

    async fn next_request(&mut self) -> StreamingControlStreamRequest {
        if let Some((_, stream)) = &mut self.pair {
            match stream.next().await {
                Some(Ok(request)) => {
                    return request;
                }
                Some(Err(e)) => self.reset_stream_with_err(
                    anyhow!(TonicStatusWrapper::new(e))
                        .context("failed to get request")
                        .to_status_unnamed(Code::Internal),
                ),
                None => self.reset_stream_with_err(Status::internal("end of stream")),
            }
        }
        pending().await
    }
}

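/// Operations sent to the [`LocalBarrierWorker`] event loop, either from the local stream
/// manager (control stream, exchange requests, shutdown, state inspection) or from tests.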
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}

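/// Snapshot of the worker state rendered for [`LocalActorOperation::InspectState`].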
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
            writeln!(
                f,
                "database {} status: {} managed_barrier_state:\n{}",
                database_id.database_id,
                status,
                managed_barrier_state
                    .as_ref()
                    .map(ToString::to_string)
                    .unwrap_or_default()
            )?;
        }
        Ok(())
    }
}

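/// Worker that drives barrier handling on this compute node. It owns the per-database
/// managed barrier state, the control stream to the meta service and the futures awaiting
/// epoch completion, and processes all of them in a single event loop (see [`Self::run`]).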
pub(super) struct LocalBarrierWorker {
    pub(super) state: ManagedBarrierState,

    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    control_stream_handle: ControlStreamHandle,

    pub(super) actor_manager: Arc<StreamActorManager>,

    pub(super) term_id: String,
}

impl LocalBarrierWorker {
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let state = ManagedBarrierState::new(
            actor_manager.clone(),
            initial_partial_graphs,
            term_id.clone(),
        );
        Self {
            state,
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

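    /// Poll the per-database completion futures and return the next completed epoch,
    /// together with its database, partial graph, barrier and completion result.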
    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

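    /// Main event loop of the worker. Handles, in priority order: events from the managed
    /// barrier state (barrier collected, actor error, database reset), completed epochs,
    /// local actor operations, and requests from the control stream.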
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError {
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                        DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = DatabaseId::new(req.database_id);
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    DatabaseId::new(req.database_id),
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(
                    DatabaseId::new(req.database_id),
                    PartialGraphId::new(req.partial_graph_id),
                );
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

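    /// Handle local actor operations other than `NewControlStream` and `Shutdown`, which are
    /// handled directly in [`Self::run`].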
    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                let err = if self.term_id != term_id {
                    {
                        warn!(
                            ?ids,
                            term_id,
                            current_term_id = self.term_id,
                            "take receiver on unmatched term_id"
                        );
                        anyhow!(
                            "take receiver {:?} on unmatched term_id {} to current term_id {}",
                            ids,
                            term_id,
                            self.term_id
                        )
                    }
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}

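/// Helper for building the future that completes a barrier: it couples the optional state
/// store sync future with the collected progress reports, and optionally instruments the
/// whole thing in the await-tree registry.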
mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::{
        PbCdcTableBackfillProgress, PbCreateMviewProgress,
    };

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    #[define_opaque(AwaitEpochCompletedFuture)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
        load_finished_source_ids: Vec<u32>,
        cdc_table_backfill_progress: Vec<PbCdcTableBackfillProgress>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                    load_finished_source_ids,
                    cdc_table_backfill_progress,
                }),
            )
        });
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}

use await_epoch_completed_future::*;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_pb::hummock::vector_index_delta::PbVectorIndexAdds;
use risingwave_storage::{StateStoreImpl, dispatch_state_store};

use crate::executor::exchange::permit;

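/// Build a boxed future that syncs the data of `prev_epoch` for the given tables to the
/// state store, recording the sync latency and logging on failure.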
fn sync_epoch(
    state_store: &StateStoreImpl,
    streaming_metrics: &StreamingMetrics,
    prev_epoch: u64,
    table_ids: HashSet<TableId>,
) -> BoxFuture<'static, StreamResult<SyncResult>> {
    let timer = streaming_metrics.barrier_sync_latency.start_timer();

    let state_store = state_store.clone();
    let future = async move {
        dispatch_state_store!(state_store, hummock, {
            hummock.sync(vec![(prev_epoch, table_ids)]).await
        })
    };

    future
        .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
        .inspect_ok(move |_| {
            timer.observe_duration();
        })
        .map_err(move |e| {
            tracing::error!(
                prev_epoch,
                error = %e.as_report(),
                "Failed to sync state store",
            );
            e.into()
        })
        .boxed()
}

impl LocalBarrierWorker {
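    /// Called when a barrier has been collected for `prev_epoch`: pop it from the database
    /// state and enqueue the future that (for checkpoint barriers) syncs its data to the
    /// state store and yields the [`BarrierCompleteResult`].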
    fn complete_barrier(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        prev_epoch: u64,
    ) {
        {
            let Some(database_state) = self
                .state
                .databases
                .get_mut(&database_id)
                .expect("should exist")
                .state_for_request()
            else {
                return;
            };
            let (
                barrier,
                table_ids,
                create_mview_progress,
                load_finished_source_ids,
                cdc_table_backfill_progress,
            ) = database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);

            let complete_barrier_future = match &barrier.kind {
                BarrierKind::Unspecified => unreachable!(),
                BarrierKind::Initial => {
                    tracing::info!(
                        epoch = prev_epoch,
                        "ignore sealing data for the first barrier"
                    );
                    None
                }
                BarrierKind::Barrier => None,
                BarrierKind::Checkpoint => Some(sync_epoch(
                    &self.actor_manager.env.state_store(),
                    &self.actor_manager.streaming_metrics,
                    prev_epoch,
                    table_ids.expect("should be Some on BarrierKind::Checkpoint"),
                )),
            };

            self.await_epoch_completed_futures
                .entry(database_id)
                .or_default()
                .push_back({
                    instrument_complete_barrier_future(
                        partial_graph_id,
                        complete_barrier_future,
                        barrier,
                        self.actor_manager.await_tree_reg.as_ref(),
                        create_mview_progress,
                        load_finished_source_ids,
                        cdc_table_backfill_progress,
                    )
                });
        }
    }

    fn on_epoch_completed(
        &mut self,
        database_id: DatabaseId,
        partial_graph_id: PartialGraphId,
        epoch: u64,
        result: BarrierCompleteResult,
    ) {
        let BarrierCompleteResult {
            create_mview_progress,
            sync_result,
            load_finished_source_ids,
            cdc_table_backfill_progress,
        } = result;

        let (synced_sstables, table_watermarks, old_value_ssts, vector_index_adds) = sync_result
            .map(|sync_result| {
                (
                    sync_result.uncommitted_ssts,
                    sync_result.table_watermarks,
                    sync_result.old_value_ssts,
                    sync_result.vector_index_adds,
                )
            })
            .unwrap_or_default();

        let result = {
            {
                streaming_control_stream_response::Response::CompleteBarrier(
                    BarrierCompleteResponse {
                        request_id: "todo".to_owned(),
                        partial_graph_id: partial_graph_id.into(),
                        epoch,
                        status: None,
                        create_mview_progress,
                        synced_sstables: synced_sstables
                            .into_iter()
                            .map(
                                |LocalSstableInfo {
                                     sst_info,
                                     table_stats,
                                     created_at,
                                 }| PbLocalSstableInfo {
                                    sst: Some(sst_info.into()),
                                    table_stats_map: to_prost_table_stats_map(table_stats),
                                    created_at,
                                },
                            )
                            .collect_vec(),
                        worker_id: self.actor_manager.env.worker_id(),
                        table_watermarks: table_watermarks
                            .into_iter()
                            .map(|(key, value)| (key.table_id, value.into()))
                            .collect(),
                        old_value_sstables: old_value_ssts
                            .into_iter()
                            .map(|sst| sst.sst_info.into())
                            .collect(),
                        database_id: database_id.database_id,
                        load_finished_source_ids,
                        vector_index_adds: vector_index_adds
                            .into_iter()
                            .map(|(table_id, adds)| {
                                (
                                    table_id.table_id,
                                    PbVectorIndexAdds {
                                        adds: adds.into_iter().map(|add| add.into()).collect(),
                                    },
                                )
                            })
                            .collect(),
                        cdc_table_backfill_progress,
                    },
                )
            }
        };

        self.control_stream_handle.send_response(result);
    }

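    /// Handle an `InjectBarrier` request: issue the barrier to the target database state so
    /// that it is sent to, and later collected from, the actors listed in `request`.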
    fn send_barrier(
        &mut self,
        barrier: &Barrier,
        request: InjectBarrierRequest,
    ) -> StreamResult<()> {
        debug!(
            target: "events::stream::barrier::manager::send",
            "send barrier {:?}, actor_ids_to_collect = {:?}",
            barrier,
            request.actor_ids_to_collect
        );

        let database_status = self
            .state
            .databases
            .get_mut(&DatabaseId::new(request.database_id))
            .expect("should exist");
        if let Some(state) = database_status.state_for_request() {
            state.transform_to_issued(barrier, request)?;
        }
        Ok(())
    }

    fn remove_partial_graphs(
        &mut self,
        database_id: DatabaseId,
        partial_graph_ids: impl Iterator<Item = PartialGraphId>,
    ) {
        let Some(database_status) = self.state.databases.get_mut(&database_id) else {
            warn!(
                database_id = database_id.database_id,
                "database to remove partial graph not exist"
            );
            return;
        };
        let Some(database_state) = database_status.state_for_request() else {
            warn!(
                database_id = database_id.database_id,
                "ignore remove partial graph request on err database",
            );
            return;
        };
        for partial_graph_id in partial_graph_ids {
            if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
                assert!(
                    graph.is_empty(),
                    "non empty graph to be removed: {}",
                    &graph
                );
            } else {
                warn!(
                    partial_graph_id = partial_graph_id.0,
                    "no partial graph to remove"
                );
            }
        }
    }

    fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
        let status = match self.state.databases.entry(database_id) {
            Entry::Occupied(entry) => {
                let status = entry.into_mut();
                if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
                    let mut database = DatabaseManagedBarrierState::new(
                        database_id,
                        self.term_id.clone(),
                        self.actor_manager.clone(),
                        vec![],
                    );
                    for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
                    {
                        database.new_actor_remote_output_request(
                            actor_id,
                            upstream_actor_id,
                            result_sender,
                        );
                    }
                    *status = DatabaseStatus::Running(database);
                }

                status
            }
            Entry::Vacant(entry) => {
                entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
                    database_id,
                    self.term_id.clone(),
                    self.actor_manager.clone(),
                    vec![],
                )))
            }
        };
        if let Some(state) = status.state_for_request() {
            assert!(
                state
                    .graph_states
                    .insert(
                        partial_graph_id,
                        PartialGraphManagedBarrierState::new(&self.actor_manager)
                    )
                    .is_none()
            );
        }
    }

    fn reset_database(&mut self, req: ResetDatabaseRequest) {
        let database_id = DatabaseId::new(req.database_id);
        if let Some(database_status) = self.state.databases.get_mut(&database_id) {
            database_status.start_reset(
                database_id,
                self.await_epoch_completed_futures.remove(&database_id),
                req.reset_request_id,
            );
        } else {
            self.ack_database_reset(database_id, None, req.reset_request_id);
        }
    }

    fn ack_database_reset(
        &mut self,
        database_id: DatabaseId,
        reset_output: Option<ResetDatabaseOutput>,
        reset_request_id: u32,
    ) {
        info!(
            database_id = database_id.database_id,
            "database reset successfully"
        );
        if let Some(reset_database) = self.state.databases.remove(&database_id) {
            match reset_database {
                DatabaseStatus::Resetting(_) => {}
                _ => {
                    unreachable!("must be resetting previously")
                }
            }
        }
        self.await_epoch_completed_futures.remove(&database_id);
        self.control_stream_handle.ack_reset_database(
            database_id,
            reset_output.and_then(|output| output.root_err),
            reset_request_id,
        );
    }

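    /// Suspend the database after an error (optionally attributed to a failed actor) and
    /// report the failure to the meta service via the control stream.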
    fn on_database_failure(
        &mut self,
        database_id: DatabaseId,
        failed_actor: Option<ActorId>,
        err: StreamError,
        message: impl Into<String>,
    ) {
        let message = message.into();
        error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
        let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
        self.state
            .databases
            .get_mut(&database_id)
            .expect("should exist")
            .suspend(failed_actor, err, completing_futures);
        self.control_stream_handle
            .send_response(Response::ReportDatabaseFailure(
                ReportDatabaseFailureResponse {
                    database_id: database_id.database_id,
                },
            ));
    }

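    /// Reset the whole worker: abort all databases, clear the await-tree registry, the
    /// shared buffer of the state store and the DML manager, then rebuild the worker state
    /// from `init_request`.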
    async fn reset(&mut self, init_request: InitRequest) {
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        *self = Self::new(
            self.actor_manager.clone(),
            init_request.databases,
            init_request.term_id,
        );
        self.actor_manager.env.client_pool().invalidate_all();
    }
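
    /// Spawn the barrier worker on a dedicated multi-threaded runtime shared by all
    /// streaming actors, and return the join handle of its event loop.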
    pub fn spawn(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_reg: Option<await_tree::Registry>,
        watermark_epoch: AtomicU64Ref,
        actor_op_rx: UnboundedReceiver<LocalActorOperation>,
    ) -> JoinHandle<()> {
        let runtime = {
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
                builder.worker_threads(worker_threads_num);
            }
            builder
                .thread_name("rw-streaming")
                .enable_all()
                .build()
                .unwrap()
        };

        let actor_manager = Arc::new(StreamActorManager {
            env: env.clone(),
            streaming_metrics,
            watermark_epoch,
            await_tree_reg,
            runtime: runtime.into(),
        });
        let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
        tokio::spawn(worker.run(actor_op_rx))
    }
}

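/// Thin wrapper over an [`UnboundedSender`] used to send events to the barrier worker.
/// [`Self::send_event`] panics if the worker has stopped, while [`Self::send_and_await`]
/// supports request-response style calls through a oneshot channel.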
pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);

impl<T> Clone for EventSender<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T> EventSender<T> {
    pub(super) fn send_event(&self, event: T) {
        self.0.send(event).expect("should be able to send event")
    }

    pub(super) async fn send_and_await<RSP>(
        &self,
        make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
    ) -> StreamResult<RSP> {
        let (tx, rx) = oneshot::channel();
        let event = make_event(tx);
        self.send_event(event);
        rx.await
            .map_err(|_| anyhow!("barrier manager maybe reset").into())
    }
}

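/// Request from an actor for a new output channel to a downstream actor, carrying the
/// permit-based sender for either a local or a remote exchange.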
pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}

#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(
                    response_tx,
                    UnboundedReceiverStream::new(request_rx).boxed(),
                ),
                init_request: InitRequest {
                    databases: vec![PbDatabaseInitialPartialGraph {
                        database_id: TEST_DATABASE_ID.database_id,
                        graphs: vec![PbInitialPartialGraph {
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            subscriptions: vec![],
                        }],
                    }],
                    term_id: "for_test".into(),
                },
            });

            assert_matches!(
                response_rx.recv().await.unwrap().unwrap().response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            self.request_tx
                .send(Ok(StreamingControlStreamRequest {
                    request: Some(streaming_control_stream_request::Request::InjectBarrier(
                        InjectBarrierRequest {
                            request_id: "".to_owned(),
                            barrier: Some(barrier.to_protobuf()),
                            database_id: TEST_DATABASE_ID.database_id,
                            actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                            table_ids_to_sync: vec![],
                            partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                            actors_to_build: vec![],
                            subscriptions_to_add: vec![],
                            subscriptions_to_remove: vec![],
                        },
                    )),
                }))
                .unwrap();
        }

        pub(crate) async fn flush_all_events(&self) {
            Self::flush_all_events_impl(&self.actor_op_tx).await
        }

        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (tx, rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(tx));
            rx.await.unwrap()
        }

        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            self.actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await
                .unwrap()
        }
    }
}