1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Display;
18use std::future::{pending, poll_fn};
19use std::sync::Arc;
20use std::task::Poll;
21
22use anyhow::anyhow;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::future::{BoxFuture, join_all};
25use futures::stream::{BoxStream, FuturesOrdered};
26use futures::{FutureExt, StreamExt, TryFutureExt};
27use itertools::Itertools;
28use risingwave_pb::stream_plan::barrier::BarrierKind;
29use risingwave_pb::stream_service::barrier_complete_response::{
30 PbCreateMviewProgress, PbLocalSstableInfo,
31};
32use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
33use risingwave_storage::store_impl::AsHummock;
34use thiserror_ext::AsReport;
35use tokio::select;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot;
38use tokio::task::JoinHandle;
39use tonic::{Code, Status};
40use tracing::warn;
41
42use self::managed_state::ManagedBarrierState;
43use crate::error::{ScoredStreamError, StreamError, StreamResult};
44#[cfg(test)]
45use crate::task::LocalBarrierManager;
46use crate::task::{
47 ActorId, AtomicU64Ref, PartialGraphId, StreamActorManager, StreamEnvironment, UpDownActorIds,
48};
49
50pub mod managed_state;
51#[cfg(test)]
52mod tests;
53
54use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
55use risingwave_hummock_sdk::{LocalSstableInfo, SyncResult};
56use risingwave_pb::stream_service::streaming_control_stream_request::{
57 DatabaseInitialPartialGraph, InitRequest, Request, ResetDatabaseRequest,
58};
59use risingwave_pb::stream_service::streaming_control_stream_response::{
60 InitResponse, ReportDatabaseFailureResponse, ResetDatabaseResponse, Response, ShutdownResponse,
61};
62use risingwave_pb::stream_service::{
63 BarrierCompleteResponse, InjectBarrierRequest, PbScoredError, StreamingControlStreamRequest,
64 StreamingControlStreamResponse, streaming_control_stream_response,
65};
66
67use crate::executor::Barrier;
68use crate::executor::exchange::permit::Receiver;
69use crate::executor::monitor::StreamingMetrics;
70use crate::task::barrier_worker::managed_state::{
71 DatabaseManagedBarrierState, DatabaseStatus, ManagedBarrierStateDebugInfo,
72 ManagedBarrierStateEvent, PartialGraphManagedBarrierState, ResetDatabaseOutput,
73};
74
/// Compile-time switch for barrier aggregation; currently hard-coded to `false`.
///
/// NOTE(review): nothing in this file reads the flag — confirm where it is consulted.
pub const ENABLE_BARRIER_AGGREGATION: bool = false;
78
/// Outcome of completing one collected barrier.
///
/// Produced by the futures queued in `LocalBarrierWorker::await_epoch_completed_futures`
/// and consumed by `LocalBarrierWorker::on_epoch_completed`.
#[derive(Debug)]
pub struct BarrierCompleteResult {
    /// Result of syncing the state store for the barrier's epoch.
    ///
    /// `None` when no sync was started. `complete_barrier` only syncs on checkpoint barriers.
    pub sync_result: Option<SyncResult>,

    /// Create-materialized-view progress reported to meta together with this barrier.
    pub create_mview_progress: Vec<PbCreateMviewProgress>,
}
88
/// Handle to the bidirectional control stream with the meta service.
///
/// `pair` is `None` when no stream is attached, or after the stream has been reset or shut
/// down. In that state, responses are dropped and `next_request` never resolves.
pub(super) struct ControlStreamHandle {
    /// Sender for responses to meta, paired with the stream of requests coming from meta.
    #[expect(clippy::type_complexity)]
    pair: Option<(
        UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
    )>,
}
99
100impl ControlStreamHandle {
101 fn empty() -> Self {
102 Self { pair: None }
103 }
104
105 pub(super) fn new(
106 sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
107 request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
108 ) -> Self {
109 Self {
110 pair: Some((sender, request_stream)),
111 }
112 }
113
114 pub(super) fn connected(&self) -> bool {
115 self.pair.is_some()
116 }
117
118 fn reset_stream_with_err(&mut self, err: Status) {
119 if let Some((sender, _)) = self.pair.take() {
120 let err = TonicStatusWrapper::new(err);
122 warn!(error = %err.as_report(), "control stream reset with error");
123
124 let err = err.into_inner();
125 if sender.send(Err(err)).is_err() {
126 warn!("failed to notify reset of control stream");
127 }
128 }
129 }
130
131 async fn shutdown_stream(&mut self) {
134 if let Some((sender, _)) = self.pair.take() {
135 if sender
136 .send(Ok(StreamingControlStreamResponse {
137 response: Some(streaming_control_stream_response::Response::Shutdown(
138 ShutdownResponse::default(),
139 )),
140 }))
141 .is_err()
142 {
143 warn!("failed to notify shutdown of control stream");
144 } else {
145 tracing::info!("waiting for meta service to close control stream...");
146
147 sender.closed().await;
154 }
155 } else {
156 debug!("control stream has been reset, ignore shutdown");
157 }
158 }
159
160 pub(super) fn ack_reset_database(
161 &mut self,
162 database_id: DatabaseId,
163 root_err: Option<ScoredStreamError>,
164 reset_request_id: u32,
165 ) {
166 self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
167 database_id: database_id.database_id,
168 root_err: root_err.map(|err| PbScoredError {
169 err_msg: err.error.to_report_string(),
170 score: err.score.0,
171 }),
172 reset_request_id,
173 }));
174 }
175
176 fn send_response(&mut self, response: streaming_control_stream_response::Response) {
177 if let Some((sender, _)) = self.pair.as_ref() {
178 if sender
179 .send(Ok(StreamingControlStreamResponse {
180 response: Some(response),
181 }))
182 .is_err()
183 {
184 self.pair = None;
185 warn!("fail to send response. control stream reset");
186 }
187 } else {
188 debug!(?response, "control stream has been reset. ignore response");
189 }
190 }
191
192 async fn next_request(&mut self) -> StreamingControlStreamRequest {
193 if let Some((_, stream)) = &mut self.pair {
194 match stream.next().await {
195 Some(Ok(request)) => {
196 return request;
197 }
198 Some(Err(e)) => self.reset_stream_with_err(
199 anyhow!(TonicStatusWrapper::new(e)) .context("failed to get request")
201 .to_status_unnamed(Code::Internal),
202 ),
203 None => self.reset_stream_with_err(Status::internal("end of stream")),
204 }
205 }
206 pending().await
207 }
208}
209
/// Operations delivered to the `LocalBarrierWorker` event loop through its actor-op channel.
///
/// `NewControlStream` and `Shutdown` are handled directly in `LocalBarrierWorker::run`
/// because they need to `.await`. All other variants go through `handle_actor_op`.
#[derive(strum_macros::Display)]
pub(super) enum LocalActorOperation {
    /// Replace the current control stream with `handle` and rebuild local state from
    /// `init_request`.
    NewControlStream {
        handle: ControlStreamHandle,
        init_request: InitRequest,
    },
    /// Request the exchange receiver for the `(upstream, downstream)` actor pair `ids` of
    /// `database_id`. Rejected when `term_id` differs from the worker's current term.
    TakeReceiver {
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
        result_sender: oneshot::Sender<StreamResult<Receiver>>,
    },
    /// Test-only: fetch the `LocalBarrierManager` of the test database.
    #[cfg(test)]
    GetCurrentLocalBarrierManager(oneshot::Sender<LocalBarrierManager>),
    /// Test-only: take the pending new-output requests buffered for an actor.
    #[cfg(test)]
    TakePendingNewOutputRequest(ActorId, oneshot::Sender<Vec<(ActorId, NewOutputRequest)>>),
    /// Test-only: drain all immediately-ready control requests and barrier events.
    #[cfg(test)]
    Flush(oneshot::Sender<()>),
    /// Render the worker's debug state as a string.
    InspectState {
        result_sender: oneshot::Sender<String>,
    },
    /// Shut down the control stream. `result_sender` is signalled afterwards.
    Shutdown {
        result_sender: oneshot::Sender<()>,
    },
}
238
/// Snapshot of `LocalBarrierWorker` state, rendered for `LocalActorOperation::InspectState`.
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
    /// Per database: a status label, plus the detailed barrier state for running databases
    /// only.
    managed_barrier_state: HashMap<DatabaseId, (String, Option<ManagedBarrierStateDebugInfo<'a>>)>,
    /// Whether a control stream to meta is currently attached.
    has_control_stream_connected: bool,
}
243
244impl Display for LocalBarrierWorkerDebugInfo<'_> {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 writeln!(
247 f,
248 "\nhas_control_stream_connected: {}",
249 self.has_control_stream_connected
250 )?;
251
252 for (database_id, (status, managed_barrier_state)) in &self.managed_barrier_state {
253 writeln!(
254 f,
255 "database {} status: {} managed_barrier_state:\n{}",
256 database_id.database_id,
257 status,
258 managed_barrier_state
259 .as_ref()
260 .map(ToString::to_string)
261 .unwrap_or_default()
262 )?;
263 }
264 Ok(())
265 }
266}
267
/// Event loop that drives barrier injection, collection and completion for every database
/// on this worker, and exchanges requests/responses with meta over the control stream.
pub(super) struct LocalBarrierWorker {
    /// Barrier state of every database known to this worker.
    pub(super) state: ManagedBarrierState,

    /// Per-database queue of in-flight barrier-completion futures (see `complete_barrier`).
    ///
    /// `FuturesOrdered` yields results in push order.
    await_epoch_completed_futures: HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,

    /// Control stream to meta. It stays empty until a `NewControlStream` operation arrives.
    control_stream_handle: ControlStreamHandle,

    /// Shared environment, metrics, await-tree registry and actor runtime.
    pub(super) actor_manager: Arc<StreamActorManager>,

    /// Term id of the current control-stream session; `TakeReceiver` requests must match it.
    pub(super) term_id: String,
}
288
impl LocalBarrierWorker {
    /// Creates a worker with no control stream attached.
    ///
    /// The barrier state is initialized from `initial_partial_graphs` under `term_id`.
    pub(super) fn new(
        actor_manager: Arc<StreamActorManager>,
        initial_partial_graphs: Vec<DatabaseInitialPartialGraph>,
        term_id: String,
    ) -> Self {
        let state = ManagedBarrierState::new(
            actor_manager.clone(),
            initial_partial_graphs,
            term_id.clone(),
        );
        Self {
            state,
            await_epoch_completed_futures: Default::default(),
            control_stream_handle: ControlStreamHandle::empty(),
            actor_manager,
            term_id,
        }
    }

    /// Builds a debug snapshot of the worker.
    ///
    /// The snapshot holds a status label per database (plus the detailed state for running
    /// databases) and whether the control stream is connected.
    fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> {
        LocalBarrierWorkerDebugInfo {
            managed_barrier_state: self
                .state
                .databases
                .iter()
                .map(|(database_id, status)| {
                    (*database_id, {
                        match status {
                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                ("ReceivedExchangeRequest".to_owned(), None)
                            }
                            DatabaseStatus::Running(state) => {
                                ("running".to_owned(), Some(state.to_debug_info()))
                            }
                            DatabaseStatus::Suspended(state) => {
                                (format!("suspended: {:?}", state.suspend_time), None)
                            }
                            DatabaseStatus::Resetting(_) => ("resetting".to_owned(), None),
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        }
                    })
                })
                .collect(),
            has_control_stream_connected: self.control_stream_handle.connected(),
        }
    }

    /// Resolves with the next finished barrier-completion future across all databases.
    ///
    /// Databases are polled in `HashMap` iteration order. Within one database, results come
    /// out in push order. Empty queues (`Ready(None)`) are skipped, and the future stays
    /// pending if no queue has a ready item. `run` re-creates this future on every loop
    /// iteration, so futures pushed by `complete_barrier` are polled on the next iteration.
    async fn next_completed_epoch(
        futures: &mut HashMap<DatabaseId, FuturesOrdered<AwaitEpochCompletedFuture>>,
    ) -> (
        DatabaseId,
        PartialGraphId,
        Barrier,
        StreamResult<BarrierCompleteResult>,
    ) {
        poll_fn(|cx| {
            for (database_id, futures) in &mut *futures {
                if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
                    futures.poll_next_unpin(cx)
                {
                    return Poll::Ready((*database_id, partial_graph_id, barrier, result));
                }
            }
            Poll::Pending
        })
        .await
    }

    /// Main event loop.
    ///
    /// Each iteration `select!`s, in `biased` priority order, over:
    /// 1. events from the managed barrier state (barrier collected, actor error, database reset);
    /// 2. finished barrier-completion futures, whose results are reported to meta;
    /// 3. operations from `actor_op_rx` (the loop exits once this channel is closed);
    /// 4. requests arriving on the meta control stream.
    async fn run(mut self, mut actor_op_rx: UnboundedReceiver<LocalActorOperation>) {
        loop {
            select! {
                biased;
                (database_id, event) = self.state.next_event() => {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected{
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(database_id, partial_graph_id, barrier.epoch.prev);
                        }
                        ManagedBarrierStateEvent::ActorError{
                            actor_id,
                            err,
                        } => {
                            self.on_database_failure(database_id, Some(actor_id), err, "recv actor failure");
                        }
                        ManagedBarrierStateEvent::DatabaseReset(output, reset_request_id) => {
                            self.ack_database_reset(database_id, Some(output), reset_request_id);
                        }
                    }
                }
                (database_id, partial_graph_id, barrier, result) = Self::next_completed_epoch(&mut self.await_epoch_completed_futures) => {
                    match result {
                        Ok(result) => {
                            self.on_epoch_completed(database_id, partial_graph_id, barrier.epoch.prev, result);
                        }
                        Err(err) => {
                            // A failed state-store sync fails the whole control stream,
                            // not just this database.
                            self.control_stream_handle.reset_stream_with_err(Status::internal(format!("failed to complete epoch: {} {} {:?} {:?}", database_id, partial_graph_id.0, barrier.epoch, err.as_report())));
                        }
                    }
                },
                actor_op = actor_op_rx.recv() => {
                    if let Some(actor_op) = actor_op {
                        match actor_op {
                            LocalActorOperation::NewControlStream { handle, init_request } => {
                                // Fail the old stream first, then rebuild all local state from
                                // the init request, and only then acknowledge on the new stream.
                                self.control_stream_handle.reset_stream_with_err(Status::internal("control stream has been reset to a new one"));
                                self.reset(init_request).await;
                                self.control_stream_handle = handle;
                                self.control_stream_handle.send_response(streaming_control_stream_response::Response::Init(InitResponse {}));
                            }
                            LocalActorOperation::Shutdown { result_sender } => {
                                // Running actors are only reported here, not stopped.
                                if self.state.databases.values().any(|database| {
                                    match database {
                                        DatabaseStatus::Running(database) => {
                                            !database.actor_states.is_empty()
                                        }
                                        DatabaseStatus::Suspended(_) | DatabaseStatus::Resetting(_) |
                                            DatabaseStatus::ReceivedExchangeRequest(_) => {
                                            false
                                        }
                                        DatabaseStatus::Unspecified => {
                                            unreachable!()
                                        }
                                    }
                                }) {
                                    tracing::warn!(
                                        "shutdown with running actors, scaling or migration will be triggered"
                                    );
                                }
                                self.control_stream_handle.shutdown_stream().await;
                                let _ = result_sender.send(());
                            }
                            actor_op => {
                                self.handle_actor_op(actor_op);
                            }
                        }
                    }
                    else {
                        // All senders dropped: stop the worker.
                        break;
                    }
                },
                request = self.control_stream_handle.next_request() => {
                    let result = self.handle_streaming_control_request(request.request.expect("non empty"));
                    if let Err((database_id, err)) = result {
                        self.on_database_failure(database_id, None, err, "failed to inject barrier");
                    }
                },
            }
        }
    }

    /// Dispatches one control-stream request from meta.
    ///
    /// Only `InjectBarrier` can fail. Its error is returned together with the database id so
    /// that the caller can fail that database alone. `Init` is not expected here; it is
    /// treated as unreachable (presumably it is consumed when the stream is set up via
    /// `NewControlStream` — confirm).
    fn handle_streaming_control_request(
        &mut self,
        request: Request,
    ) -> Result<(), (DatabaseId, StreamError)> {
        match request {
            Request::InjectBarrier(req) => {
                let database_id = DatabaseId::new(req.database_id);
                let result: StreamResult<()> = try {
                    let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?;
                    self.send_barrier(&barrier, req)?;
                };
                result.map_err(|e| (database_id, e))?;
                Ok(())
            }
            Request::RemovePartialGraph(req) => {
                self.remove_partial_graphs(
                    DatabaseId::new(req.database_id),
                    req.partial_graph_ids.into_iter().map(PartialGraphId::new),
                );
                Ok(())
            }
            Request::CreatePartialGraph(req) => {
                self.add_partial_graph(
                    DatabaseId::new(req.database_id),
                    PartialGraphId::new(req.partial_graph_id),
                );
                Ok(())
            }
            Request::ResetDatabase(req) => {
                self.reset_database(req);
                Ok(())
            }
            Request::Init(_) => {
                unreachable!()
            }
        }
    }

    /// Handles the synchronous actor operations.
    ///
    /// `NewControlStream` and `Shutdown` must not reach this function; `run` handles them.
    fn handle_actor_op(&mut self, actor_op: LocalActorOperation) {
        match actor_op {
            LocalActorOperation::NewControlStream { .. } | LocalActorOperation::Shutdown { .. } => {
                unreachable!("event {actor_op} should be handled separately in async context")
            }
            LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            } => {
                // Every branch that accepts the request returns early. Only rejections fall
                // through and reply with `err`.
                let err = if self.term_id != term_id {
                    {
                        warn!(
                            ?ids,
                            term_id,
                            current_term_id = self.term_id,
                            "take receiver on unmatched term_id"
                        );
                        anyhow!(
                            "take receiver {:?} on unmatched term_id {} to current term_id {}",
                            ids,
                            term_id,
                            self.term_id
                        )
                    }
                } else {
                    match self.state.databases.entry(database_id) {
                        Entry::Occupied(mut entry) => match entry.get_mut() {
                            // Database not started yet: keep buffering the request.
                            DatabaseStatus::ReceivedExchangeRequest(pending_requests) => {
                                pending_requests.push((ids, result_sender));
                                return;
                            }
                            DatabaseStatus::Running(database) => {
                                let (upstream_actor_id, actor_id) = ids;
                                database.new_actor_remote_output_request(
                                    actor_id,
                                    upstream_actor_id,
                                    result_sender,
                                );
                                return;
                            }
                            DatabaseStatus::Suspended(_) => {
                                anyhow!("database suspended")
                            }
                            DatabaseStatus::Resetting(_) => {
                                anyhow!("database resetting")
                            }
                            DatabaseStatus::Unspecified => {
                                unreachable!()
                            }
                        },
                        // First request for an unknown database: start buffering until
                        // `add_partial_graph` promotes the database to running.
                        Entry::Vacant(entry) => {
                            entry.insert(DatabaseStatus::ReceivedExchangeRequest(vec![(
                                ids,
                                result_sender,
                            )]));
                            return;
                        }
                    }
                };
                let _ = result_sender.send(Err(err.into()));
            }
            #[cfg(test)]
            LocalActorOperation::GetCurrentLocalBarrierManager(sender) => {
                let database_status = self
                    .state
                    .databases
                    .get(&crate::task::TEST_DATABASE_ID)
                    .unwrap();
                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                let _ = sender.send(database_state.local_barrier_manager.clone());
            }
            #[cfg(test)]
            LocalActorOperation::TakePendingNewOutputRequest(actor_id, sender) => {
                let database_status = self
                    .state
                    .databases
                    .get_mut(&crate::task::TEST_DATABASE_ID)
                    .unwrap();

                let database_state = risingwave_common::must_match!(database_status, DatabaseStatus::Running(database_state) => database_state);
                assert!(!database_state.actor_states.contains_key(&actor_id));
                let requests = database_state
                    .actor_pending_new_output_requests
                    .remove(&actor_id)
                    .unwrap();
                let _ = sender.send(requests);
            }
            #[cfg(test)]
            LocalActorOperation::Flush(sender) => {
                use futures::FutureExt;
                // Drain everything that is ready right now, without awaiting: first the
                // pending control requests, then the collected barriers.
                while let Some(request) = self.control_stream_handle.next_request().now_or_never() {
                    self.handle_streaming_control_request(
                        request.request.expect("should not be empty"),
                    )
                    .unwrap();
                }
                while let Some((database_id, event)) = self.state.next_event().now_or_never() {
                    match event {
                        ManagedBarrierStateEvent::BarrierCollected {
                            partial_graph_id,
                            barrier,
                        } => {
                            self.complete_barrier(
                                database_id,
                                partial_graph_id,
                                barrier.epoch.prev,
                            );
                        }
                        ManagedBarrierStateEvent::ActorError { .. }
                        | ManagedBarrierStateEvent::DatabaseReset(..) => {
                            unreachable!()
                        }
                    }
                }
                sender.send(()).unwrap()
            }
            LocalActorOperation::InspectState { result_sender } => {
                let debug_info = self.to_debug_info();
                let _ = result_sender.send(debug_info.to_string());
            }
        }
    }
}
609
/// Home of the opaque `AwaitEpochCompletedFuture` type and its single defining function.
///
/// NOTE(review): presumably kept in its own module to confine the defining scope of the
/// `impl Trait` type alias — confirm.
mod await_epoch_completed_future {
    use std::future::Future;

    use futures::FutureExt;
    use futures::future::BoxFuture;
    use risingwave_hummock_sdk::SyncResult;
    use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;

    use crate::error::StreamResult;
    use crate::executor::Barrier;
    use crate::task::{BarrierCompleteResult, PartialGraphId, await_tree_key};

    /// Future that completes a barrier.
    ///
    /// It yields the partial graph id, the barrier itself, and the completion result.
    /// A failed sync is carried inside the result; it does not abort the future.
    pub(super) type AwaitEpochCompletedFuture = impl Future<Output = (PartialGraphId, Barrier, StreamResult<BarrierCompleteResult>)>
        + 'static;

    /// Builds the completion future for a collected barrier.
    ///
    /// Awaits `complete_barrier_future` when present (the state-store sync). When absent, it
    /// resolves to `Ok(None)` immediately. The result is then paired with `partial_graph_id`,
    /// `barrier` and `create_mview_progress`.
    ///
    /// If `barrier_await_tree_reg` is given, the future is registered under
    /// `BarrierAwait { prev_epoch }` with the span `SyncEpoch(<prev_epoch>)`.
    #[define_opaque(AwaitEpochCompletedFuture)]
    pub(super) fn instrument_complete_barrier_future(
        partial_graph_id: PartialGraphId,
        complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
        barrier: Barrier,
        barrier_await_tree_reg: Option<&await_tree::Registry>,
        create_mview_progress: Vec<PbCreateMviewProgress>,
    ) -> AwaitEpochCompletedFuture {
        let prev_epoch = barrier.epoch.prev;
        let future = async move {
            if let Some(future) = complete_barrier_future {
                let result = future.await;
                result.map(Some)
            } else {
                Ok(None)
            }
        }
        .map(move |result| {
            (
                partial_graph_id,
                barrier,
                result.map(|sync_result| BarrierCompleteResult {
                    sync_result,
                    create_mview_progress,
                }),
            )
        });
        // `left_future`/`right_future` unify the two branches into one concrete type, as
        // the opaque alias requires.
        if let Some(reg) = barrier_await_tree_reg {
            reg.register(
                await_tree_key::BarrierAwait { prev_epoch },
                format!("SyncEpoch({})", prev_epoch),
            )
            .instrument(future)
            .left_future()
        } else {
            future.right_future()
        }
    }
}
664
665use await_epoch_completed_future::*;
666use risingwave_common::catalog::{DatabaseId, TableId};
667use risingwave_storage::{StateStoreImpl, dispatch_state_store};
668
669use crate::executor::exchange::permit;
670
671fn sync_epoch(
672 state_store: &StateStoreImpl,
673 streaming_metrics: &StreamingMetrics,
674 prev_epoch: u64,
675 table_ids: HashSet<TableId>,
676) -> BoxFuture<'static, StreamResult<SyncResult>> {
677 let timer = streaming_metrics.barrier_sync_latency.start_timer();
678
679 let state_store = state_store.clone();
680 let future = async move {
681 dispatch_state_store!(state_store, hummock, {
682 hummock.sync(vec![(prev_epoch, table_ids)]).await
683 })
684 };
685
686 future
687 .instrument_await(await_tree::span!("sync_epoch (epoch {})", prev_epoch))
688 .inspect_ok(move |_| {
689 timer.observe_duration();
690 })
691 .map_err(move |e| {
692 tracing::error!(
693 prev_epoch,
694 error = %e.as_report(),
695 "Failed to sync state store",
696 );
697 e.into()
698 })
699 .boxed()
700}
701
702impl LocalBarrierWorker {
703 fn complete_barrier(
704 &mut self,
705 database_id: DatabaseId,
706 partial_graph_id: PartialGraphId,
707 prev_epoch: u64,
708 ) {
709 {
710 let Some(database_state) = self
711 .state
712 .databases
713 .get_mut(&database_id)
714 .expect("should exist")
715 .state_for_request()
716 else {
717 return;
718 };
719 let (barrier, table_ids, create_mview_progress) =
720 database_state.pop_barrier_to_complete(partial_graph_id, prev_epoch);
721
722 let complete_barrier_future = match &barrier.kind {
723 BarrierKind::Unspecified => unreachable!(),
724 BarrierKind::Initial => {
725 tracing::info!(
726 epoch = prev_epoch,
727 "ignore sealing data for the first barrier"
728 );
729 tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
730 None
731 }
732 BarrierKind::Barrier => None,
733 BarrierKind::Checkpoint => Some(sync_epoch(
734 &self.actor_manager.env.state_store(),
735 &self.actor_manager.streaming_metrics,
736 prev_epoch,
737 table_ids.expect("should be Some on BarrierKind::Checkpoint"),
738 )),
739 };
740
741 self.await_epoch_completed_futures
742 .entry(database_id)
743 .or_default()
744 .push_back({
745 instrument_complete_barrier_future(
746 partial_graph_id,
747 complete_barrier_future,
748 barrier,
749 self.actor_manager.await_tree_reg.as_ref(),
750 create_mview_progress,
751 )
752 });
753 }
754 }
755
756 fn on_epoch_completed(
757 &mut self,
758 database_id: DatabaseId,
759 partial_graph_id: PartialGraphId,
760 epoch: u64,
761 result: BarrierCompleteResult,
762 ) {
763 let BarrierCompleteResult {
764 create_mview_progress,
765 sync_result,
766 } = result;
767
768 let (synced_sstables, table_watermarks, old_value_ssts) = sync_result
769 .map(|sync_result| {
770 (
771 sync_result.uncommitted_ssts,
772 sync_result.table_watermarks,
773 sync_result.old_value_ssts,
774 )
775 })
776 .unwrap_or_default();
777
778 let result = {
779 {
780 streaming_control_stream_response::Response::CompleteBarrier(
781 BarrierCompleteResponse {
782 request_id: "todo".to_owned(),
783 partial_graph_id: partial_graph_id.into(),
784 epoch,
785 status: None,
786 create_mview_progress,
787 synced_sstables: synced_sstables
788 .into_iter()
789 .map(
790 |LocalSstableInfo {
791 sst_info,
792 table_stats,
793 created_at,
794 }| PbLocalSstableInfo {
795 sst: Some(sst_info.into()),
796 table_stats_map: to_prost_table_stats_map(table_stats),
797 created_at,
798 },
799 )
800 .collect_vec(),
801 worker_id: self.actor_manager.env.worker_id(),
802 table_watermarks: table_watermarks
803 .into_iter()
804 .map(|(key, value)| (key.table_id, value.into()))
805 .collect(),
806 old_value_sstables: old_value_ssts
807 .into_iter()
808 .map(|sst| sst.sst_info.into())
809 .collect(),
810 database_id: database_id.database_id,
811 },
812 )
813 }
814 };
815
816 self.control_stream_handle.send_response(result);
817 }
818
819 fn send_barrier(
826 &mut self,
827 barrier: &Barrier,
828 request: InjectBarrierRequest,
829 ) -> StreamResult<()> {
830 debug!(
831 target: "events::stream::barrier::manager::send",
832 "send barrier {:?}, actor_ids_to_collect = {:?}",
833 barrier,
834 request.actor_ids_to_collect
835 );
836
837 let database_status = self
838 .state
839 .databases
840 .get_mut(&DatabaseId::new(request.database_id))
841 .expect("should exist");
842 if let Some(state) = database_status.state_for_request() {
843 state.transform_to_issued(barrier, request)?;
844 }
845 Ok(())
846 }
847
848 fn remove_partial_graphs(
849 &mut self,
850 database_id: DatabaseId,
851 partial_graph_ids: impl Iterator<Item = PartialGraphId>,
852 ) {
853 let Some(database_status) = self.state.databases.get_mut(&database_id) else {
854 warn!(
855 database_id = database_id.database_id,
856 "database to remove partial graph not exist"
857 );
858 return;
859 };
860 let Some(database_state) = database_status.state_for_request() else {
861 warn!(
862 database_id = database_id.database_id,
863 "ignore remove partial graph request on err database",
864 );
865 return;
866 };
867 for partial_graph_id in partial_graph_ids {
868 if let Some(graph) = database_state.graph_states.remove(&partial_graph_id) {
869 assert!(
870 graph.is_empty(),
871 "non empty graph to be removed: {}",
872 &graph
873 );
874 } else {
875 warn!(
876 partial_graph_id = partial_graph_id.0,
877 "no partial graph to remove"
878 );
879 }
880 }
881 }
882
883 fn add_partial_graph(&mut self, database_id: DatabaseId, partial_graph_id: PartialGraphId) {
884 let status = match self.state.databases.entry(database_id) {
885 Entry::Occupied(entry) => {
886 let status = entry.into_mut();
887 if let DatabaseStatus::ReceivedExchangeRequest(pending_requests) = status {
888 let mut database = DatabaseManagedBarrierState::new(
889 database_id,
890 self.term_id.clone(),
891 self.actor_manager.clone(),
892 vec![],
893 );
894 for ((upstream_actor_id, actor_id), result_sender) in pending_requests.drain(..)
895 {
896 database.new_actor_remote_output_request(
897 actor_id,
898 upstream_actor_id,
899 result_sender,
900 );
901 }
902 *status = DatabaseStatus::Running(database);
903 }
904
905 status
906 }
907 Entry::Vacant(entry) => {
908 entry.insert(DatabaseStatus::Running(DatabaseManagedBarrierState::new(
909 database_id,
910 self.term_id.clone(),
911 self.actor_manager.clone(),
912 vec![],
913 )))
914 }
915 };
916 if let Some(state) = status.state_for_request() {
917 assert!(
918 state
919 .graph_states
920 .insert(
921 partial_graph_id,
922 PartialGraphManagedBarrierState::new(&self.actor_manager)
923 )
924 .is_none()
925 );
926 }
927 }
928
929 fn reset_database(&mut self, req: ResetDatabaseRequest) {
930 let database_id = DatabaseId::new(req.database_id);
931 if let Some(database_status) = self.state.databases.get_mut(&database_id) {
932 database_status.start_reset(
933 database_id,
934 self.await_epoch_completed_futures.remove(&database_id),
935 req.reset_request_id,
936 );
937 } else {
938 self.ack_database_reset(database_id, None, req.reset_request_id);
939 }
940 }
941
942 fn ack_database_reset(
943 &mut self,
944 database_id: DatabaseId,
945 reset_output: Option<ResetDatabaseOutput>,
946 reset_request_id: u32,
947 ) {
948 info!(
949 database_id = database_id.database_id,
950 "database reset successfully"
951 );
952 if let Some(reset_database) = self.state.databases.remove(&database_id) {
953 match reset_database {
954 DatabaseStatus::Resetting(_) => {}
955 _ => {
956 unreachable!("must be resetting previously")
957 }
958 }
959 }
960 self.await_epoch_completed_futures.remove(&database_id);
961 self.control_stream_handle.ack_reset_database(
962 database_id,
963 reset_output.and_then(|output| output.root_err),
964 reset_request_id,
965 );
966 }
967
968 fn on_database_failure(
972 &mut self,
973 database_id: DatabaseId,
974 failed_actor: Option<ActorId>,
975 err: StreamError,
976 message: impl Into<String>,
977 ) {
978 let message = message.into();
979 error!(database_id = database_id.database_id, ?failed_actor, message, err = ?err.as_report(), "suspend database on error");
980 let completing_futures = self.await_epoch_completed_futures.remove(&database_id);
981 self.state
982 .databases
983 .get_mut(&database_id)
984 .expect("should exist")
985 .suspend(failed_actor, err, completing_futures);
986 self.control_stream_handle
987 .send_response(Response::ReportDatabaseFailure(
988 ReportDatabaseFailureResponse {
989 database_id: database_id.database_id,
990 },
991 ));
992 }
993
994 async fn reset(&mut self, init_request: InitRequest) {
996 join_all(
997 self.state
998 .databases
999 .values_mut()
1000 .map(|database| database.abort()),
1001 )
1002 .await;
1003 if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
1004 m.clear();
1005 }
1006
1007 if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
1008 hummock
1009 .clear_shared_buffer()
1010 .instrument_await("store_clear_shared_buffer".verbose())
1011 .await
1012 }
1013 self.actor_manager.env.dml_manager_ref().clear();
1014 *self = Self::new(
1015 self.actor_manager.clone(),
1016 init_request.databases,
1017 init_request.term_id,
1018 );
1019 self.actor_manager.env.client_pool().invalidate_all();
1020 }
1021
1022 pub fn spawn(
1024 env: StreamEnvironment,
1025 streaming_metrics: Arc<StreamingMetrics>,
1026 await_tree_reg: Option<await_tree::Registry>,
1027 watermark_epoch: AtomicU64Ref,
1028 actor_op_rx: UnboundedReceiver<LocalActorOperation>,
1029 ) -> JoinHandle<()> {
1030 let runtime = {
1031 let mut builder = tokio::runtime::Builder::new_multi_thread();
1032 if let Some(worker_threads_num) = env.config().actor_runtime_worker_threads_num {
1033 builder.worker_threads(worker_threads_num);
1034 }
1035 builder
1036 .thread_name("rw-streaming")
1037 .enable_all()
1038 .build()
1039 .unwrap()
1040 };
1041
1042 let actor_manager = Arc::new(StreamActorManager {
1043 env: env.clone(),
1044 streaming_metrics,
1045 watermark_epoch,
1046 await_tree_reg,
1047 runtime: runtime.into(),
1048 });
1049 let worker = LocalBarrierWorker::new(actor_manager, vec![], "uninitialized".into());
1050 tokio::spawn(worker.run(actor_op_rx))
1051 }
1052}
1053
/// Thin wrapper over an unbounded sender whose receiver is expected to outlive it.
///
/// `send_event` panics if the receiver has been dropped.
pub(super) struct EventSender<T>(pub(super) UnboundedSender<T>);
1055
1056impl<T> Clone for EventSender<T> {
1057 fn clone(&self) -> Self {
1058 Self(self.0.clone())
1059 }
1060}
1061
1062impl<T> EventSender<T> {
1063 pub(super) fn send_event(&self, event: T) {
1064 self.0.send(event).expect("should be able to send event")
1065 }
1066
1067 pub(super) async fn send_and_await<RSP>(
1068 &self,
1069 make_event: impl FnOnce(oneshot::Sender<RSP>) -> T,
1070 ) -> StreamResult<RSP> {
1071 let (tx, rx) = oneshot::channel();
1072 let event = make_event(tx);
1073 self.send_event(event);
1074 rx.await
1075 .map_err(|_| anyhow!("barrier manager maybe reset").into())
1076 }
1077}
1078
/// Request to attach a new output channel (a permit-based sender) to an actor.
///
/// NOTE(review): presumably `Local` targets a downstream actor on this worker and `Remote`
/// one reached via exchange (cf. `new_actor_remote_output_request` used for `TakeReceiver`)
/// — confirm against the consumers.
pub(crate) enum NewOutputRequest {
    Local(permit::Sender),
    Remote(permit::Sender),
}
1083
/// Test helpers that run a `LocalBarrierWorker` against an in-memory control stream.
#[cfg(test)]
pub(crate) mod barrier_test_utils {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use risingwave_pb::stream_service::streaming_control_stream_request::{
        InitRequest, PbDatabaseInitialPartialGraph, PbInitialPartialGraph,
    };
    use risingwave_pb::stream_service::{
        InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
        streaming_control_stream_request, streaming_control_stream_response,
    };
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use tokio::sync::oneshot;
    use tokio_stream::wrappers::UnboundedReceiverStream;
    use tonic::Status;

    use crate::executor::Barrier;
    use crate::task::barrier_worker::{ControlStreamHandle, EventSender, LocalActorOperation};
    use crate::task::{
        ActorId, LocalBarrierManager, NewOutputRequest, TEST_DATABASE_ID, TEST_PARTIAL_GRAPH_ID,
    };

    /// A running barrier worker wired to in-memory control-stream channels.
    pub(crate) struct LocalBarrierTestEnv {
        pub local_barrier_manager: LocalBarrierManager,
        pub(super) actor_op_tx: EventSender<LocalActorOperation>,
        pub request_tx: UnboundedSender<Result<StreamingControlStreamRequest, Status>>,
        pub response_rx: UnboundedReceiver<Result<StreamingControlStreamResponse, Status>>,
    }

    impl LocalBarrierTestEnv {
        /// Spawns a worker and attaches a control stream.
        ///
        /// The stream is initialized with one test database holding one test partial
        /// graph. This function asserts that the first response is `Init`, then fetches
        /// the test database's `LocalBarrierManager`.
        pub(crate) async fn for_test() -> Self {
            let actor_op_tx = LocalBarrierManager::spawn_for_test();

            let (request_tx, request_rx) = unbounded_channel();
            let (response_tx, mut response_rx) = unbounded_channel();

            let handle = ControlStreamHandle::new(
                response_tx,
                UnboundedReceiverStream::new(request_rx).boxed(),
            );
            let test_graph = PbInitialPartialGraph {
                partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                subscriptions: vec![],
            };
            let init_request = InitRequest {
                databases: vec![PbDatabaseInitialPartialGraph {
                    database_id: TEST_DATABASE_ID.database_id,
                    graphs: vec![test_graph],
                }],
                term_id: "for_test".into(),
            };
            actor_op_tx.send_event(LocalActorOperation::NewControlStream {
                handle,
                init_request,
            });

            let first_response = response_rx.recv().await.unwrap().unwrap();
            assert_matches!(
                first_response.response.unwrap(),
                streaming_control_stream_response::Response::Init(_)
            );

            let local_barrier_manager = actor_op_tx
                .send_and_await(LocalActorOperation::GetCurrentLocalBarrierManager)
                .await
                .unwrap();

            Self {
                local_barrier_manager,
                actor_op_tx,
                request_tx,
                response_rx,
            }
        }

        /// Pushes an `InjectBarrier` request for the test database/partial graph.
        ///
        /// The request asks to collect `barrier` from `actor_to_collect`.
        pub(crate) fn inject_barrier(
            &self,
            barrier: &Barrier,
            actor_to_collect: impl IntoIterator<Item = ActorId>,
        ) {
            let inject = InjectBarrierRequest {
                request_id: "".to_owned(),
                barrier: Some(barrier.to_protobuf()),
                database_id: TEST_DATABASE_ID.database_id,
                actor_ids_to_collect: actor_to_collect.into_iter().collect(),
                table_ids_to_sync: vec![],
                partial_graph_id: TEST_PARTIAL_GRAPH_ID.into(),
                actors_to_build: vec![],
                subscriptions_to_add: vec![],
                subscriptions_to_remove: vec![],
            };
            let request = StreamingControlStreamRequest {
                request: Some(streaming_control_stream_request::Request::InjectBarrier(
                    inject,
                )),
            };
            self.request_tx.send(Ok(request)).unwrap();
        }

        /// Waits until the worker has drained every event that is ready right now.
        pub(crate) async fn flush_all_events(&self) {
            let sender = &self.actor_op_tx;
            Self::flush_all_events_impl(sender).await
        }

        /// Sends a `Flush` operation through `actor_op_tx` and waits for its acknowledgement.
        pub(super) async fn flush_all_events_impl(actor_op_tx: &EventSender<LocalActorOperation>) {
            let (done_tx, done_rx) = oneshot::channel();
            actor_op_tx.send_event(LocalActorOperation::Flush(done_tx));
            done_rx.await.unwrap()
        }

        /// Takes the new-output requests buffered for `actor_id` in the test database.
        pub(crate) async fn take_pending_new_output_requests(
            &self,
            actor_id: ActorId,
        ) -> Vec<(ActorId, NewOutputRequest)> {
            let requests = self
                .actor_op_tx
                .send_and_await(|tx| LocalActorOperation::TakePendingNewOutputRequest(actor_id, tx))
                .await;
            requests.unwrap()
        }
    }
}