1use core::time::Duration;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Instant;
21
22use async_recursion::async_recursion;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::future::join_all;
25use futures::stream::BoxStream;
26use futures::{FutureExt, TryFutureExt};
27use itertools::Itertools;
28use risingwave_common::bitmap::Bitmap;
29use risingwave_common::catalog::{ColumnId, DatabaseId, Field, Schema, TableId};
30use risingwave_common::config::MetricLevel;
31use risingwave_common::must_match;
32use risingwave_common::operator::{unique_executor_id, unique_operator_id};
33use risingwave_pb::plan_common::StorageTableDesc;
34use risingwave_pb::stream_plan;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_plan::{StreamNode, StreamScanNode, StreamScanType};
37use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
38use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
39use risingwave_pb::stream_service::{
40 StreamingControlStreamRequest, StreamingControlStreamResponse,
41};
42use risingwave_storage::monitor::HummockTraceFutureExt;
43use risingwave_storage::table::batch_table::BatchTable;
44use risingwave_storage::{StateStore, dispatch_state_store};
45use thiserror_ext::AsReport;
46use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
47use tokio::task::JoinHandle;
48use tonic::Status;
49
50use crate::common::table::state_table::StateTable;
51use crate::error::StreamResult;
52use crate::executor::exchange::permit::Receiver;
53use crate::executor::monitor::StreamingMetrics;
54use crate::executor::subtask::SubtaskHandle;
55use crate::executor::{
56 Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
57 MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
58};
59use crate::from_proto::{MergeExecutorBuilder, create_executor};
60use crate::task::barrier_manager::{
61 ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker,
62};
63use crate::task::{
64 ActorId, FragmentId, LocalBarrierManager, NewOutputRequest, StreamActorManager,
65 StreamEnvironment, UpDownActorIds,
66};
67
/// Address used by exchange channels between local actors in tests.
#[cfg(test)]
pub static LOCAL_TEST_ADDR: std::sync::LazyLock<risingwave_common::util::addr::HostAddr> =
    std::sync::LazyLock::new(|| "127.0.0.1:2333".parse().unwrap());

/// Join handle of a spawned actor task.
pub type ActorHandle = JoinHandle<()>;

/// Shared atomic epoch counter (e.g. used as the watermark epoch below).
pub type AtomicU64Ref = Arc<AtomicU64>;
75
/// Key types used to register spans in the `await-tree` registry.
pub mod await_tree_key {
    /// Key for the root span of an actor.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Actor(pub crate::task::ActorId);

    /// Key for a span that is awaiting the barrier with the given `prev_epoch`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct BarrierAwait {
        pub prev_epoch: u64,
    }
}
87
/// Cloneable handle to the stream manager of this compute node. All
/// operations are forwarded as [`LocalActorOperation`] events to the
/// background [`LocalBarrierWorker`] spawned in [`LocalStreamManager::new`].
#[derive(Clone)]
pub struct LocalStreamManager {
    // `await-tree` registry, present only when await-tree is enabled in config.
    await_tree_reg: Option<await_tree::Registry>,

    pub env: StreamEnvironment,

    // Channel to the `LocalBarrierWorker` that services all actor operations.
    actor_op_tx: EventSender<LocalActorOperation>,
}
97
/// Error reporter handed to executors for non-fatal expression-evaluation
/// errors; errors are routed to the owning actor's context (see the
/// `EvalErrorReport` impl below).
#[derive(Clone)]
pub struct ActorEvalErrorReport {
    pub actor_context: ActorContextRef,
    // Identity string of the reporting executor, used to attribute the error.
    pub identity: Arc<str>,
}
106
impl risingwave_expr::expr::EvalErrorReport for ActorEvalErrorReport {
    /// Forward an expression-evaluation error to the actor context, tagged
    /// with this executor's identity for diagnostics.
    fn report(&self, err: risingwave_expr::ExprError) {
        self.actor_context.on_compute_error(err, &self.identity);
    }
}
112
/// Parameters used when constructing a single executor from its plan node
/// (see `create_executor` and `StreamActorManager::create_nodes_inner`).
pub struct ExecutorParams {
    /// The streaming environment of this compute node.
    pub env: StreamEnvironment,

    /// Static information of the executor: schema, stream key, identity, id.
    pub info: ExecutorInfo,

    /// Executor id, derived from the actor id and the node's operator id
    /// via `unique_executor_id`.
    pub executor_id: u64,

    /// Operator id, derived from the fragment id and the node's operator id
    /// via `unique_operator_id`.
    pub operator_id: u64,

    /// Human-readable identity of the plan node (`node.get_identity()`),
    /// for debugging only.
    pub op_info: String,

    /// Already-built input executors of this executor.
    pub input: Vec<Executor>,

    /// Id of the fragment this executor belongs to.
    pub fragment_id: FragmentId,

    /// Metrics shared by all executors.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Context of the actor that owns this executor.
    pub actor_context: ActorContextRef,

    /// Vnode bitmap of the owning actor, if the fragment is distributed.
    pub vnode_bitmap: Option<Bitmap>,

    /// Reporter for non-fatal expression-evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// Shared epoch counter (see `AtomicU64Ref`); presumably drives state
    /// cache eviction by watermark — confirm with consumers.
    pub watermark_epoch: AtomicU64Ref,

    /// Barrier manager local to this worker.
    pub local_barrier_manager: LocalBarrierManager,
}
152
153impl Debug for ExecutorParams {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 f.debug_struct("ExecutorParams")
156 .field("info", &self.info)
157 .field("executor_id", &self.executor_id)
158 .field("operator_id", &self.operator_id)
159 .field("op_info", &self.op_info)
160 .field("input", &self.input.len())
161 .field("actor_id", &self.actor_context.id)
162 .finish_non_exhaustive()
163 }
164}
165
166impl LocalStreamManager {
167 pub fn new(
168 env: StreamEnvironment,
169 streaming_metrics: Arc<StreamingMetrics>,
170 await_tree_config: Option<await_tree::Config>,
171 watermark_epoch: AtomicU64Ref,
172 ) -> Self {
173 if !env.config().unsafe_enable_strict_consistency {
174 risingwave_storage::hummock::utils::disable_sanity_check();
177 }
178
179 let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new);
180
181 let (actor_op_tx, actor_op_rx) = unbounded_channel();
182
183 let _join_handle = LocalBarrierWorker::spawn(
184 env.clone(),
185 streaming_metrics,
186 await_tree_reg.clone(),
187 watermark_epoch,
188 actor_op_rx,
189 );
190 Self {
191 await_tree_reg,
192 env,
193 actor_op_tx: EventSender(actor_op_tx),
194 }
195 }
196
197 pub fn await_tree_reg(&self) -> Option<&await_tree::Registry> {
199 self.await_tree_reg.as_ref()
200 }
201
202 pub fn handle_new_control_stream(
205 &self,
206 sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
207 request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
208 init_request: InitRequest,
209 ) {
210 self.actor_op_tx
211 .send_event(LocalActorOperation::NewControlStream {
212 handle: ControlStreamHandle::new(sender, request_stream),
213 init_request,
214 })
215 }
216
217 pub async fn take_receiver(
218 &self,
219 database_id: DatabaseId,
220 term_id: String,
221 ids: UpDownActorIds,
222 ) -> StreamResult<Receiver> {
223 self.actor_op_tx
224 .send_and_await(|result_sender| LocalActorOperation::TakeReceiver {
225 database_id,
226 term_id,
227 ids,
228 result_sender,
229 })
230 .await?
231 }
232
233 pub async fn inspect_barrier_state(&self) -> StreamResult<String> {
234 info!("start inspecting barrier state");
235 let start = Instant::now();
236 self.actor_op_tx
237 .send_and_await(|result_sender| LocalActorOperation::InspectState { result_sender })
238 .inspect(|result| {
239 info!(
240 ok = result.is_ok(),
241 time = ?start.elapsed(),
242 "finish inspecting barrier state"
243 );
244 })
245 .await
246 }
247
248 pub async fn shutdown(&self) -> StreamResult<()> {
249 self.actor_op_tx
250 .send_and_await(|result_sender| LocalActorOperation::Shutdown { result_sender })
251 .await
252 }
253}
254
impl LocalBarrierWorker {
    /// Reset this worker to a clean state for a new control stream / term.
    ///
    /// The ordering here is deliberate: first abort all database-level work,
    /// then clear per-process resources (await-tree registry, hummock shared
    /// buffer, DML manager), then rebuild `self` from `init_request`, and
    /// finally invalidate pooled RPC clients that may reference the old term.
    pub(super) async fn reset(&mut self, init_request: InitRequest) {
        // Abort all databases concurrently and wait for all of them to stop.
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        // Drop stale await-tree entries registered by the aborted actors.
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            // Drop unflushed shared-buffer data so uncommitted state from the
            // previous term cannot leak into the new one.
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        *self = Self::new(
            self.actor_manager.clone(),
            init_request.databases,
            init_request.term_id,
        );
        // Pooled connections may point at workers from the previous term.
        self.actor_manager.env.client_pool().invalidate_all();
    }
}
284
impl StreamActorManager {
    /// Derive the executor id from the actor id and the plan node's
    /// operator id via `unique_executor_id`.
    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
        unique_executor_id(actor_context.id, node.operator_id)
    }

    /// Assemble the static [`ExecutorInfo`] (schema, stream key indices,
    /// identity string) for a plan node.
    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
        let schema: Schema = node.fields.iter().map(Field::from).collect();

        let pk_indices = node
            .get_stream_key()
            .iter()
            .map(|idx| *idx as usize)
            .collect::<Vec<_>>();

        // Identity is "<node body variant> <executor id in hex>".
        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
        ExecutorInfo {
            schema,
            pk_indices,
            identity,
            id: executor_id,
        }
    }

    /// Build the upstream merge input for a snapshot-backfill executor.
    ///
    /// `upstream_node` must be a `Merge` node — anything else indicates a
    /// planner bug and panics via `must_match!`.
    async fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        local_barrier_manager: &LocalBarrierManager,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await
    }

    /// Build a complete snapshot-backfill executor from a `StreamScan` node,
    /// wiring its merge input, upstream batch table, and progress/barrier
    /// channels. Wraps the result in a `TroublemakerExecutor` when
    /// consistency-insane mode is on.
    #[expect(clippy::too_many_arguments)]
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        env: StreamEnvironment,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // A snapshot-backfill scan node has exactly two inputs; only the
        // first (the upstream merge) is used here.
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = env.config().developer.chunk_size;
        let upstream = self
            .create_snapshot_backfill_input(
                upstream_node,
                actor_context,
                local_barrier_manager,
                chunk_size,
            )
            .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        // Register this actor's backfill progress with the barrier manager
        // so barrier collection can track materialized-view creation.
        let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        // Batch table over the upstream storage table, restricted to the
        // upstream column ids and this actor's vnodes.
        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table =
            StateTable::from_table_catalog(state_table, state_store.clone(), vnodes).await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        if crate::consistency::insane() {
            // Insane mode: inject a troublemaker wrapper that deliberately
            // perturbs the stream for consistency testing.
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }

    /// Recursively build the executor tree rooted at `node`, depth-first
    /// over `node.input`. Each executor is wrapped in a `WrapperExecutor`
    /// for instrumentation/consistency checks.
    #[expect(clippy::too_many_arguments)]
    #[async_recursion]
    async fn create_nodes_inner(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<Executor> {
        // Snapshot-backfill stream scans take a dedicated construction path
        // and short-circuit the generic `create_executor` flow below.
        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
        {
            return dispatch_state_store!(env.state_store(), store, {
                self.create_snapshot_backfill_node(
                    node,
                    stream_scan,
                    actor_context,
                    vnode_bitmap,
                    env,
                    local_barrier_manager,
                    store,
                )
                .await
            });
        }

        // Whether this node body is considered a stateful operator.
        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
            matches!(
                stream_node.get_node_body().unwrap(),
                NodeBody::HashAgg(_)
                    | NodeBody::HashJoin(_)
                    | NodeBody::DeltaIndexJoin(_)
                    | NodeBody::Lookup(_)
                    | NodeBody::StreamScan(_)
                    | NodeBody::StreamCdcScan(_)
                    | NodeBody::DynamicFilter(_)
                    | NodeBody::GroupTopN(_)
                    | NodeBody::Now(_)
            )
        }
        let is_stateful = is_stateful_executor(node);

        // Build all input executors first (depth-first recursion).
        let mut input = Vec::with_capacity(node.input.iter().len());
        for input_stream_node in &node.input {
            input.push(
                self.create_nodes_inner(
                    fragment_id,
                    input_stream_node,
                    env.clone(),
                    store.clone(),
                    actor_context,
                    vnode_bitmap.clone(),
                    has_stateful || is_stateful,
                    subtasks,
                    local_barrier_manager,
                )
                .await?,
            );
        }

        let op_info = node.get_identity().clone();

        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
        };

        // Dispatch to the node-body-specific builder.
        let executor = create_executor(executor_params, node, store).await?;

        // Wrap for epoch checks / row-count and explain-analyze metrics.
        let wrapped = WrapperExecutor::new(
            executor,
            actor_context.clone(),
            env.config().developer.enable_executor_row_count,
            env.config().developer.enable_explain_analyze_stats,
        );
        let executor = (info, wrapped).into();

        // NOTE(review): both branches are identical — subtask spawning for
        // nested stateful executors appears deliberately disabled here, and
        // `let _ = subtasks;` only keeps the parameters in use. Removing the
        // branch would surface unused-parameter warnings; leave as is.
        let executor = if has_stateful && is_stateful {
            let _ = subtasks;
            executor
        } else {
            executor
        };

        Ok(executor)
    }

    /// Build the full executor tree for one actor, dispatching once on the
    /// concrete state-store type. Returns the root executor plus any
    /// subtask handles collected during construction.
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }

    /// Build the complete [`Actor`]: executor tree plus dispatch executor,
    /// ready to be spawned by `spawn_actor`.
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        {
            let actor_id = actor.actor_id;
            let streaming_config = self.env.config().clone();
            let actor_context = ActorContext::create(
                &actor,
                fragment_id,
                self.env.total_mem_usage(),
                self.streaming_metrics.clone(),
                related_subscriptions,
                self.env.meta_client().clone(),
                streaming_config,
            );
            let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
            // `expr_context` is required to be set in `BuildActorInfo`;
            // a missing value is a protocol bug, hence `unwrap`.
            let expr_context = actor.expr_context.clone().unwrap();

            let (executor, subtasks) = self
                .create_nodes(
                    fragment_id,
                    &node,
                    self.env.clone(),
                    &actor_context,
                    vnode_bitmap,
                    &local_barrier_manager,
                )
                .await?;

            let dispatcher = DispatchExecutor::new(
                executor,
                new_output_request_rx,
                actor.dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager.clone(),
                self.streaming_metrics.clone(),
            )
            .await?;
            let actor = Actor::new(
                dispatcher,
                subtasks,
                self.streaming_metrics.clone(),
                actor_context.clone(),
                expr_context,
                local_barrier_manager,
            );
            Ok(actor)
        }
    }
}
633
impl StreamActorManager {
    /// Spawn an actor onto the streaming runtime.
    ///
    /// Returns the actor's join handle plus, when per-actor tokio metrics
    /// are enabled, the handle of a companion task that periodically exports
    /// the actor's task metrics. Build failures and runtime errors are not
    /// returned here — they are reported to the barrier manager via
    /// `notify_failure`.
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        {
            let monitor = tokio_metrics::TaskMonitor::new();
            let stream_actor_ref = &actor;
            let actor_id = stream_actor_ref.actor_id;
            let handle = {
                let trace_span =
                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                let barrier_manager = local_barrier_manager.clone();
                // Build the actor, run it to completion, and route any error
                // to the barrier manager instead of propagating it.
                let actor = self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        related_subscriptions,
                        barrier_manager.clone(),
                        new_output_request_rx
                    ).boxed().and_then(|actor| actor.run()).map(move |result| {
                        if let Err(err) = result {
                            // The failure is reported to the meta service
                            // through the barrier manager; logging here is
                            // for local diagnostics only.
                            tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
                            barrier_manager.notify_failure(actor_id, err);
                        }
                    });
                // Wrap the future in layers: await-tree span (if enabled),
                // tokio task-metrics monitor, scoped config, hummock tracing.
                let traced = match &self.await_tree_reg {
                    Some(m) => m
                        .register(await_tree_key::Actor(actor_id), trace_span)
                        .instrument(actor)
                        .left_future(),
                    None => actor.right_future(),
                };
                let instrumented = monitor.instrument(traced);
                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
                let may_track_hummock = with_config.may_trace_hummock();

                self.runtime.spawn(may_track_hummock)
            };

            // Spawn a companion task exporting tokio task metrics once per
            // second, when the metric level or config enables it.
            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
                || self.env.config().developer.enable_actor_tokio_metrics
            {
                tracing::info!("Tokio metrics are enabled.");
                let streaming_metrics = self.streaming_metrics.clone();
                let actor_monitor_task = self.runtime.spawn(async move {
                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
                    loop {
                        let task_metrics = monitor.cumulative();
                        // NOTE(review): `actor_execution_time` mirrors
                        // `actor_poll_duration` (both from
                        // `total_poll_duration`) — confirm this is intended.
                        metrics
                            .actor_execution_time
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_duration
                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_cnt
                            .set(task_metrics.total_fast_poll_count as i64);
                        metrics
                            .actor_slow_poll_duration
                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                        metrics
                            .actor_slow_poll_cnt
                            .set(task_metrics.total_slow_poll_count as i64);
                        metrics
                            .actor_poll_duration
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_poll_cnt
                            .set(task_metrics.total_poll_count as i64);
                        metrics
                            .actor_idle_duration
                            .set(task_metrics.total_idle_duration.as_secs_f64());
                        metrics
                            .actor_idle_cnt
                            .set(task_metrics.total_idled_count as i64);
                        metrics
                            .actor_scheduled_duration
                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
                        metrics
                            .actor_scheduled_cnt
                            .set(task_metrics.total_scheduled_count as i64);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                });
                Some(actor_monitor_task)
            } else {
                None
            };
            (handle, monitor_handle)
        }
    }
}
738
#[cfg(test)]
pub mod test_utils {
    use risingwave_pb::common::{ActorInfo, HostAddress};

    use super::*;

    /// Build an `ActorInfo` whose host points at the shared local test
    /// address [`LOCAL_TEST_ADDR`].
    pub fn helper_make_local_actor(actor_id: u32) -> ActorInfo {
        let host = HostAddress {
            host: LOCAL_TEST_ADDR.host.clone(),
            port: LOCAL_TEST_ADDR.port as i32,
        };
        ActorInfo {
            actor_id,
            host: Some(host),
        }
    }
}