1use core::time::Duration;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Instant;
21
22use async_recursion::async_recursion;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::future::join_all;
25use futures::stream::BoxStream;
26use futures::{FutureExt, TryFutureExt};
27use itertools::Itertools;
28use risingwave_common::bitmap::Bitmap;
29use risingwave_common::catalog::{ColumnId, DatabaseId, Field, Schema, TableId};
30use risingwave_common::config::MetricLevel;
31use risingwave_common::must_match;
32use risingwave_common::operator::{unique_executor_id, unique_operator_id};
33use risingwave_pb::common::ActorInfo;
34use risingwave_pb::plan_common::StorageTableDesc;
35use risingwave_pb::stream_plan;
36use risingwave_pb::stream_plan::stream_node::NodeBody;
37use risingwave_pb::stream_plan::{StreamNode, StreamScanNode, StreamScanType};
38use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
39use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
40use risingwave_pb::stream_service::{
41 StreamingControlStreamRequest, StreamingControlStreamResponse,
42};
43use risingwave_storage::monitor::HummockTraceFutureExt;
44use risingwave_storage::table::batch_table::BatchTable;
45use risingwave_storage::{StateStore, dispatch_state_store};
46use thiserror_ext::AsReport;
47use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
48use tokio::task::JoinHandle;
49use tonic::Status;
50
51use crate::common::table::state_table::StateTable;
52use crate::error::StreamResult;
53use crate::executor::exchange::permit::Receiver;
54use crate::executor::monitor::StreamingMetrics;
55use crate::executor::subtask::SubtaskHandle;
56use crate::executor::{
57 Actor, ActorContext, ActorContextRef, DispatchExecutor, DispatcherImpl, Execute, Executor,
58 ExecutorInfo, MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor,
59 WrapperExecutor,
60};
61use crate::from_proto::{MergeExecutorBuilder, create_executor};
62use crate::task::barrier_manager::{
63 ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker,
64};
65use crate::task::{
66 ActorId, FragmentId, LocalBarrierManager, SharedContext, StreamActorManager, StreamEnvironment,
67 UpDownActorIds,
68};
69
/// Fixed local host address ("127.0.0.1:2333") used when building actor infos in tests.
#[cfg(test)]
pub static LOCAL_TEST_ADDR: std::sync::LazyLock<risingwave_common::util::addr::HostAddr> =
    std::sync::LazyLock::new(|| "127.0.0.1:2333".parse().unwrap());
73
/// Join handle of a spawned actor task.
pub type ActorHandle = JoinHandle<()>;
75
/// Shared atomic `u64`, used here for the `watermark_epoch` passed down to executors.
pub type AtomicU64Ref = Arc<AtomicU64>;
77
/// Type-safe keys used to register spans in the await-tree registry.
pub mod await_tree_key {
    /// Key for an actor's root span, identified by its actor id
    /// (see `spawn_actor`, which registers under this key).
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Actor(pub crate::task::ActorId);

    /// Key for a span that awaits a barrier, identified by the previous epoch.
    /// NOTE(review): the registration site is outside this file — confirm usage there.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct BarrierAwait {
        pub prev_epoch: u64,
    }
}
89
/// Handle to the streaming runtime of this node. Cloneable; all clones share
/// the same event channel to the background [`LocalBarrierWorker`] spawned in
/// [`LocalStreamManager::new`].
#[derive(Clone)]
pub struct LocalStreamManager {
    /// Await-tree registry for instrumenting actor tasks; `None` when
    /// await-tree instrumentation is disabled.
    await_tree_reg: Option<await_tree::Registry>,

    /// The shared stream environment (config, state store, DML manager, ...).
    pub env: StreamEnvironment,

    /// Sends [`LocalActorOperation`] events to the background barrier worker.
    actor_op_tx: EventSender<LocalActorOperation>,
}
99
/// Error reporter installed into expression evaluation: forwards evaluation
/// errors to the owning actor's context, tagged with the executor's identity.
#[derive(Clone)]
pub struct ActorEvalErrorReport {
    /// Context of the actor the erroring executor belongs to.
    pub actor_context: ActorContextRef,
    /// Identity string of the executor (see `get_executor_info`).
    pub identity: Arc<str>,
}
108
109impl risingwave_expr::expr::EvalErrorReport for ActorEvalErrorReport {
110 fn report(&self, err: risingwave_expr::ExprError) {
111 self.actor_context.on_compute_error(err, &self.identity);
112 }
113}
114
/// Parameters handed to the executor builders (see `create_executor`) when
/// constructing a single executor from its plan node.
pub struct ExecutorParams {
    /// The streaming environment (config, state store, ...).
    pub env: StreamEnvironment,

    /// Basic executor info: schema, stream key, identity and id.
    pub info: ExecutorInfo,

    /// Executor id, derived from the actor id and the plan node's operator id
    /// via `unique_executor_id`.
    pub executor_id: u64,

    /// Operator id, derived from the fragment id and the plan node's operator
    /// id via `unique_operator_id`.
    pub operator_id: u64,

    /// Identity string of the plan node (`node.get_identity()`), for
    /// debugging/logging purposes.
    pub op_info: String,

    /// Already-built upstream (input) executors of this executor.
    pub input: Vec<Executor>,

    /// Id of the fragment this executor belongs to.
    pub fragment_id: FragmentId,

    /// Streaming metrics shared across executors.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Context of the actor this executor runs in.
    pub actor_context: ActorContextRef,

    /// Vnode bitmap of the owning actor, if it is vnode-partitioned.
    pub vnode_bitmap: Option<Bitmap>,

    /// Reporter used by expression evaluation to surface errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// Shared epoch counter passed down from the manager's `watermark_epoch`.
    /// NOTE(review): consumers are outside this file — presumably drives state
    /// cache eviction; confirm at the use sites.
    pub watermark_epoch: AtomicU64Ref,

    /// Shared context, used e.g. when building merge inputs and dispatchers.
    pub shared_context: Arc<SharedContext>,

    /// Handle for interacting with the local barrier worker (barrier
    /// subscription, progress reporting, ...).
    pub local_barrier_manager: LocalBarrierManager,
}
156
157impl Debug for ExecutorParams {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 f.debug_struct("ExecutorParams")
160 .field("info", &self.info)
161 .field("executor_id", &self.executor_id)
162 .field("operator_id", &self.operator_id)
163 .field("op_info", &self.op_info)
164 .field("input", &self.input.len())
165 .field("actor_id", &self.actor_context.id)
166 .finish_non_exhaustive()
167 }
168}
169
170impl LocalStreamManager {
171 pub fn new(
172 env: StreamEnvironment,
173 streaming_metrics: Arc<StreamingMetrics>,
174 await_tree_config: Option<await_tree::Config>,
175 watermark_epoch: AtomicU64Ref,
176 ) -> Self {
177 if !env.config().unsafe_enable_strict_consistency {
178 risingwave_storage::hummock::utils::disable_sanity_check();
181 }
182
183 let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new);
184
185 let (actor_op_tx, actor_op_rx) = unbounded_channel();
186
187 let _join_handle = LocalBarrierWorker::spawn(
188 env.clone(),
189 streaming_metrics,
190 await_tree_reg.clone(),
191 watermark_epoch,
192 actor_op_rx,
193 );
194 Self {
195 await_tree_reg,
196 env,
197 actor_op_tx: EventSender(actor_op_tx),
198 }
199 }
200
201 pub fn await_tree_reg(&self) -> Option<&await_tree::Registry> {
203 self.await_tree_reg.as_ref()
204 }
205
206 pub fn handle_new_control_stream(
209 &self,
210 sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
211 request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
212 init_request: InitRequest,
213 ) {
214 self.actor_op_tx
215 .send_event(LocalActorOperation::NewControlStream {
216 handle: ControlStreamHandle::new(sender, request_stream),
217 init_request,
218 })
219 }
220
221 pub async fn take_receiver(
222 &self,
223 database_id: DatabaseId,
224 term_id: String,
225 ids: UpDownActorIds,
226 ) -> StreamResult<Receiver> {
227 self.actor_op_tx
228 .send_and_await(|result_sender| LocalActorOperation::TakeReceiver {
229 database_id,
230 term_id,
231 ids,
232 result_sender,
233 })
234 .await?
235 }
236
237 pub async fn inspect_barrier_state(&self) -> StreamResult<String> {
238 info!("start inspecting barrier state");
239 let start = Instant::now();
240 self.actor_op_tx
241 .send_and_await(|result_sender| LocalActorOperation::InspectState { result_sender })
242 .inspect(|result| {
243 info!(
244 ok = result.is_ok(),
245 time = ?start.elapsed(),
246 "finish inspecting barrier state"
247 );
248 })
249 .await
250 }
251
252 pub async fn shutdown(&self) -> StreamResult<()> {
253 self.actor_op_tx
254 .send_and_await(|result_sender| LocalActorOperation::Shutdown { result_sender })
255 .await
256 }
257}
258
impl LocalBarrierWorker {
    /// Reset this worker to a fresh state built from `init_request`.
    ///
    /// Ordering matters: abort all running database states first, then drop
    /// stale instrumentation and uncommitted storage data, and finally rebuild
    /// the worker in place.
    pub(super) async fn reset(&mut self, init_request: InitRequest) {
        // Abort all per-database states concurrently and wait for completion.
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        // Drop all await-tree spans registered by previous actors.
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        // For hummock state stores, discard uncommitted data in the shared buffer.
        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        // Rebuild the worker in place from the databases and term carried by
        // the init request; the actor manager is carried over.
        *self = Self::new(
            self.actor_manager.clone(),
            init_request.databases,
            init_request.term_id,
        );
    }
}
287
impl StreamActorManager {
    /// Create the [`DispatchExecutor`] that sits at the output of an actor and
    /// routes its output to downstream actors according to `dispatchers`.
    fn create_dispatcher(
        &self,
        env: StreamEnvironment,
        input: Executor,
        dispatchers: &[stream_plan::Dispatcher],
        actor_id: ActorId,
        fragment_id: FragmentId,
        shared_context: &Arc<SharedContext>,
    ) -> StreamResult<DispatchExecutor> {
        // Build one dispatcher impl per dispatcher spec; fail on the first error.
        let dispatcher_impls = dispatchers
            .iter()
            .map(|dispatcher| DispatcherImpl::new(shared_context, actor_id, dispatcher))
            .try_collect()?;

        Ok(DispatchExecutor::new(
            input,
            dispatcher_impls,
            actor_id,
            fragment_id,
            shared_context.clone(),
            self.streaming_metrics.clone(),
            env.config().developer.chunk_size,
        ))
    }

    /// Derive the unique executor id from the actor id and the plan node's
    /// operator id.
    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
        unique_executor_id(actor_context.id, node.operator_id)
    }

    /// Build the [`ExecutorInfo`] (schema, stream key, identity) for a plan node.
    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
        let schema: Schema = node.fields.iter().map(Field::from).collect();

        let pk_indices = node
            .get_stream_key()
            .iter()
            .map(|idx| *idx as usize)
            .collect::<Vec<_>>();

        // Identity combines the node body variant name with the executor id in hex.
        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
        ExecutorInfo {
            schema,
            pk_indices,
            identity,
            id: executor_id,
        }
    }

    /// Build the merge input that feeds a snapshot-backfill executor from its
    /// upstream `Merge` plan node.
    fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        shared_context: &Arc<SharedContext>,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        // The upstream node must be a `Merge`; anything else is a planner bug.
        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            shared_context.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
    }

    /// Build the executor for a `StreamScan` node of type `SnapshotBackfill`,
    /// wiring the upstream merge input, the batch table to scan, the backfill
    /// state table and the barrier subscription together.
    #[expect(clippy::too_many_arguments)]
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        shared_context: &Arc<SharedContext>,
        env: StreamEnvironment,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // A snapshot-backfill stream-scan node has exactly two inputs; only the
        // first (the upstream) is used here.
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = env.config().developer.chunk_size;
        let upstream = self.create_snapshot_backfill_input(
            upstream_node,
            actor_context,
            shared_context,
            chunk_size,
        )?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        // Register this actor's materialized-view creation progress reporter.
        let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        // Batch table for reading the upstream snapshot.
        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table =
            StateTable::from_table_catalog(state_table, state_store.clone(), vnodes).await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        // In "insane" consistency-testing mode, wrap the executor in a
        // troublemaker that injects inconsistencies, with a marked identity.
        if crate::consistency::insane() {
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }

    /// Recursively build the executor tree for `node`, bottom-up: inputs are
    /// created first, then the executor itself, which is finally wrapped in a
    /// [`WrapperExecutor`] for instrumentation.
    #[allow(clippy::too_many_arguments)]
    #[async_recursion]
    async fn create_nodes_inner(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        shared_context: &Arc<SharedContext>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<Executor> {
        // Snapshot-backfill stream scans take a dedicated construction path and
        // do not recurse into the generic builder below.
        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
        {
            return dispatch_state_store!(env.state_store(), store, {
                self.create_snapshot_backfill_node(
                    node,
                    stream_scan,
                    actor_context,
                    vnode_bitmap,
                    shared_context,
                    env,
                    local_barrier_manager,
                    store,
                )
                .await
            });
        }

        // Whether this node body corresponds to a stateful executor.
        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
            matches!(
                stream_node.get_node_body().unwrap(),
                NodeBody::HashAgg(_)
                    | NodeBody::HashJoin(_)
                    | NodeBody::DeltaIndexJoin(_)
                    | NodeBody::Lookup(_)
                    | NodeBody::StreamScan(_)
                    | NodeBody::StreamCdcScan(_)
                    | NodeBody::DynamicFilter(_)
                    | NodeBody::GroupTopN(_)
                    | NodeBody::Now(_)
            )
        }
        let is_stateful = is_stateful_executor(node);

        // Build all input executors first (depth-first).
        let mut input = Vec::with_capacity(node.input.iter().len());
        for input_stream_node in &node.input {
            input.push(
                self.create_nodes_inner(
                    fragment_id,
                    input_stream_node,
                    env.clone(),
                    store.clone(),
                    actor_context,
                    vnode_bitmap.clone(),
                    has_stateful || is_stateful,
                    subtasks,
                    shared_context,
                    local_barrier_manager,
                )
                .await?,
            );
        }

        let op_info = node.get_identity().clone();

        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            shared_context: shared_context.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
        };

        // Dispatch on the node body to build the concrete executor.
        let executor = create_executor(executor_params, node, store).await?;

        // Wrap for instrumentation (row counts / explain-analyze stats, per config).
        let wrapped = WrapperExecutor::new(
            executor,
            actor_context.clone(),
            env.config().developer.enable_executor_row_count,
            env.config().developer.enable_explain_analyze_stats,
        );
        let executor = (info, wrapped).into();

        // NOTE(review): both branches are identical; `let _ = subtasks;` only
        // suppresses the unused-variable warning. This looks like a placeholder
        // where stateful executors were (or will be) wrapped into subtasks —
        // confirm intent before simplifying.
        let executor = if has_stateful && is_stateful {
            let _ = subtasks;
            executor
        } else {
            executor
        };

        Ok(executor)
    }

    /// Create the executor tree for an actor by dispatching on the concrete
    /// state-store type and delegating to [`Self::create_nodes_inner`].
    /// Returns the root executor along with any subtask handles collected.
    #[expect(clippy::too_many_arguments)]
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        shared_context: &Arc<SharedContext>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                shared_context,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }

    /// Build a complete [`Actor`]: its context, executor tree and output
    /// dispatcher, wired to the barrier manager and shared context.
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        shared_context: Arc<SharedContext>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        {
            let actor_id = actor.actor_id;
            let streaming_config = self.env.config().clone();
            let actor_context = ActorContext::create(
                &actor,
                fragment_id,
                self.env.total_mem_usage(),
                self.streaming_metrics.clone(),
                related_subscriptions,
                self.env.meta_client().clone(),
                streaming_config,
            );
            let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
            // The expression context must be present in a well-formed build request.
            let expr_context = actor.expr_context.clone().unwrap();

            let (executor, subtasks) = self
                .create_nodes(
                    fragment_id,
                    &node,
                    self.env.clone(),
                    &actor_context,
                    vnode_bitmap,
                    &shared_context,
                    &local_barrier_manager,
                )
                .await?;

            let dispatcher = self.create_dispatcher(
                self.env.clone(),
                executor,
                &actor.dispatchers,
                actor_id,
                fragment_id,
                &shared_context,
            )?;
            let actor = Actor::new(
                dispatcher,
                subtasks,
                self.streaming_metrics.clone(),
                actor_context.clone(),
                expr_context,
                local_barrier_manager,
            );
            Ok(actor)
        }
    }
}
666
impl StreamActorManager {
    /// Spawn an actor onto the streaming runtime.
    ///
    /// Returns the join handle of the actor task and, when tokio task metrics
    /// are enabled, the handle of a companion task that reports the actor's
    /// poll statistics once per second.
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        current_shared_context: Arc<SharedContext>,
        local_barrier_manager: LocalBarrierManager,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        {
            let monitor = tokio_metrics::TaskMonitor::new();
            let stream_actor_ref = &actor;
            let actor_id = stream_actor_ref.actor_id;
            let handle = {
                let trace_span =
                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                let barrier_manager = local_barrier_manager.clone();
                // Build the actor, run it to completion, and on failure log the
                // error and notify the barrier manager so recovery can kick in.
                let actor = self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        current_shared_context,
                        related_subscriptions,
                        barrier_manager.clone()
                    ).boxed().and_then(|actor| actor.run()).map(move |result| {
                        if let Err(err) = result {
                            tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
                            barrier_manager.notify_failure(actor_id, err);
                        }
                    });
                // Attach an await-tree span when the registry is enabled;
                // left/right_future unifies the two branch types.
                let traced = match &self.await_tree_reg {
                    Some(m) => m
                        .register(await_tree_key::Actor(actor_id), trace_span)
                        .instrument(actor)
                        .left_future(),
                    None => actor.right_future(),
                };
                // Collect tokio task metrics for this actor's task.
                let instrumented = monitor.instrument(traced);
                // Expose the streaming config through the task-local `CONFIG`.
                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
                // Optionally enable hummock tracing around the whole task.
                let may_track_hummock = with_config.may_trace_hummock();

                self.runtime.spawn(may_track_hummock)
            };

            // Spawn the metrics-reporting companion only when the metric level
            // is at least `Debug` or actor tokio metrics are explicitly enabled.
            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
                || self.env.config().developer.enable_actor_tokio_metrics
            {
                tracing::info!("Tokio metrics are enabled.");
                let streaming_metrics = self.streaming_metrics.clone();
                let actor_monitor_task = self.runtime.spawn(async move {
                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
                    // Publish cumulative task metrics once per second, forever
                    // (the task is dropped with the runtime / its handle).
                    loop {
                        let task_metrics = monitor.cumulative();
                        // NOTE(review): `actor_execution_time` and
                        // `actor_poll_duration` are both fed from
                        // `total_poll_duration` — confirm this is intentional.
                        metrics
                            .actor_execution_time
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_duration
                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_cnt
                            .set(task_metrics.total_fast_poll_count as i64);
                        metrics
                            .actor_slow_poll_duration
                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                        metrics
                            .actor_slow_poll_cnt
                            .set(task_metrics.total_slow_poll_count as i64);
                        metrics
                            .actor_poll_duration
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_poll_cnt
                            .set(task_metrics.total_poll_count as i64);
                        metrics
                            .actor_idle_duration
                            .set(task_metrics.total_idle_duration.as_secs_f64());
                        metrics
                            .actor_idle_cnt
                            .set(task_metrics.total_idled_count as i64);
                        metrics
                            .actor_scheduled_duration
                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
                        metrics
                            .actor_scheduled_cnt
                            .set(task_metrics.total_scheduled_count as i64);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                });
                Some(actor_monitor_task)
            } else {
                None
            };
            (handle, monitor_handle)
        }
    }
}
771
772impl LocalBarrierWorker {
773 pub fn update_actor_info(
776 &mut self,
777 database_id: DatabaseId,
778 new_actor_infos: impl Iterator<Item = ActorInfo>,
779 ) {
780 Self::get_or_insert_database_shared_context(
781 &mut self.state.current_shared_context,
782 database_id,
783 &self.actor_manager,
784 &self.term_id,
785 )
786 .add_actors(new_actor_infos);
787 }
788}
789
#[cfg(test)]
pub mod test_utils {
    use risingwave_pb::common::HostAddress;

    use super::*;

    /// Build an [`ActorInfo`] located at the fixed local test address.
    pub fn helper_make_local_actor(actor_id: u32) -> ActorInfo {
        let host = HostAddress {
            host: LOCAL_TEST_ADDR.host.clone(),
            port: LOCAL_TEST_ADDR.port as i32,
        };
        ActorInfo {
            actor_id,
            host: Some(host),
        }
    }
}