risingwave_stream/executor/
actor.rs1use std::collections::{HashMap, HashSet};
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use await_tree::InstrumentAwait;
21use futures::FutureExt;
22use futures::future::join_all;
23use hytra::TrAdder;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::config::StreamingConfig;
26use risingwave_common::hash::VirtualNode;
27use risingwave_common::log::LogSuppressor;
28use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, IntGaugeExt};
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_expr::ExprError;
31use risingwave_expr::expr_context::{FRAGMENT_ID, VNODE_COUNT, expr_context_scope};
32use risingwave_pb::plan_common::ExprContext;
33use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
34use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
35use risingwave_rpc_client::MetaClient;
36use thiserror_ext::AsReport;
37use tokio_stream::StreamExt;
38use tracing::Instrument;
39
40use super::StreamConsumer;
41use super::monitor::StreamingMetrics;
42use super::subtask::SubtaskHandle;
43use crate::error::StreamResult;
44use crate::task::{ActorId, FragmentId, LocalBarrierManager, StreamEnvironment};
45
46pub struct ActorContext {
48 pub id: ActorId,
49 pub fragment_id: u32,
50 pub vnode_count: usize,
51 pub mview_definition: String,
52
53 last_mem_val: Arc<AtomicUsize>,
55 cur_mem_val: Arc<AtomicUsize>,
56 total_mem_val: Arc<TrAdder<i64>>,
57
58 pub streaming_metrics: Arc<StreamingMetrics>,
59
60 pub initial_dispatch_num: usize,
62 pub initial_subscriber_ids: HashSet<u32>,
64 pub initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,
65
66 pub meta_client: Option<MetaClient>,
68
69 pub streaming_config: Arc<StreamingConfig>,
70
71 pub stream_env: StreamEnvironment,
72}
73
74pub type ActorContextRef = Arc<ActorContext>;
75
76impl ActorContext {
77 pub fn for_test(id: ActorId) -> ActorContextRef {
78 Arc::new(Self {
79 id,
80 fragment_id: 0,
81 vnode_count: VirtualNode::COUNT_FOR_TEST,
82 mview_definition: "".to_owned(),
83 cur_mem_val: Arc::new(0.into()),
84 last_mem_val: Arc::new(0.into()),
85 total_mem_val: Arc::new(TrAdder::new()),
86 streaming_metrics: Arc::new(StreamingMetrics::unused()),
87 initial_dispatch_num: 1,
89 initial_subscriber_ids: Default::default(),
90 initial_upstream_actors: Default::default(),
91 meta_client: None,
92 streaming_config: Arc::new(StreamingConfig::default()),
93 stream_env: StreamEnvironment::for_test(),
94 })
95 }
96
97 #[allow(clippy::too_many_arguments)]
98 pub fn create(
99 stream_actor: &BuildActorInfo,
100 fragment_id: FragmentId,
101 total_mem_val: Arc<TrAdder<i64>>,
102 streaming_metrics: Arc<StreamingMetrics>,
103 meta_client: Option<MetaClient>,
104 streaming_config: Arc<StreamingConfig>,
105 stream_env: StreamEnvironment,
106 ) -> ActorContextRef {
107 Arc::new(Self {
108 id: stream_actor.actor_id,
109 fragment_id,
110 mview_definition: stream_actor.mview_definition.clone(),
111 vnode_count: (stream_actor.vnode_bitmap.as_ref())
112 .map_or(1, |b| Bitmap::from(b).len()),
115 cur_mem_val: Arc::new(0.into()),
116 last_mem_val: Arc::new(0.into()),
117 total_mem_val,
118 streaming_metrics,
119 initial_dispatch_num: stream_actor.dispatchers.len(),
120 initial_subscriber_ids: stream_actor
121 .initial_subscriber_ids
122 .iter()
123 .copied()
124 .collect(),
125 initial_upstream_actors: stream_actor.fragment_upstreams.clone(),
126 meta_client,
127 streaming_config,
128 stream_env,
129 })
130 }
131
132 pub fn on_compute_error(&self, err: ExprError, identity: &str) {
133 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
134 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
135 tracing::error!(identity, error = %err.as_report(), suppressed_count, "failed to evaluate expression");
136 }
137
138 let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
139 GLOBAL_ERROR_METRICS.user_compute_error.report([
140 "ExprError".to_owned(),
141 executor_name.to_owned(),
142 self.fragment_id.to_string(),
143 ]);
144 }
145
146 pub fn store_mem_usage(&self, val: usize) {
147 let old_value = self.cur_mem_val.load(Ordering::Relaxed);
151 self.last_mem_val.store(old_value, Ordering::Relaxed);
152 let diff = val as i64 - old_value as i64;
153
154 self.total_mem_val.inc(diff);
155
156 self.cur_mem_val.store(val, Ordering::Relaxed);
157 }
158
159 pub fn mem_usage(&self) -> usize {
160 self.cur_mem_val.load(Ordering::Relaxed)
161 }
162}
163
164pub struct Actor<C> {
166 consumer: C,
168 subtasks: Vec<SubtaskHandle>,
170
171 pub actor_context: ActorContextRef,
172 expr_context: ExprContext,
173 barrier_manager: LocalBarrierManager,
174}
175
176impl<C> Actor<C>
177where
178 C: StreamConsumer,
179{
180 pub fn new(
181 consumer: C,
182 subtasks: Vec<SubtaskHandle>,
183 _metrics: Arc<StreamingMetrics>,
184 actor_context: ActorContextRef,
185 expr_context: ExprContext,
186 barrier_manager: LocalBarrierManager,
187 ) -> Self {
188 Self {
189 consumer,
190 subtasks,
191 actor_context,
192 expr_context,
193 barrier_manager,
194 }
195 }
196
197 #[inline(always)]
198 pub async fn run(mut self) -> StreamResult<()> {
199 let expr_context = self.expr_context.clone();
200 let fragment_id = self.actor_context.fragment_id;
201 let vnode_count = self.actor_context.vnode_count;
202
203 let run = async move {
204 tokio::join!(
205 join_all(std::mem::take(&mut self.subtasks)),
207 self.run_consumer(),
208 )
209 .1
210 }
211 .boxed();
212
213 let run = expr_context_scope(expr_context, run);
215 let run = FRAGMENT_ID::scope(fragment_id, run);
216 let run = VNODE_COUNT::scope(vnode_count, run);
217
218 run.await
219 }
220
221 async fn run_consumer(self) -> StreamResult<()> {
222 fail::fail_point!("start_actors_err", |_| Err(anyhow::anyhow!(
223 "intentional start_actors_err"
224 )
225 .into()));
226
227 let id = self.actor_context.id;
228 let span_name = format!("Actor {id}");
229
230 let new_span = |epoch: Option<EpochPair>| {
231 tracing::info_span!(
232 parent: None,
233 "actor",
234 "otel.name" = span_name,
235 actor_id = id,
236 prev_epoch = epoch.map(|e| e.prev),
237 curr_epoch = epoch.map(|e| e.curr),
238 )
239 };
240 let mut span = new_span(None);
241
242 let actor_count = self
243 .actor_context
244 .streaming_metrics
245 .actor_count
246 .with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
247 let _actor_count_guard = actor_count.inc_guard();
248
249 let current_epoch = self
250 .actor_context
251 .streaming_metrics
252 .actor_current_epoch
253 .with_guarded_label_values(&[
254 &self.actor_context.id.to_string(),
255 &self.actor_context.fragment_id.to_string(),
256 ]);
257
258 let mut last_epoch: Option<EpochPair> = None;
259 let mut stream = Box::pin(Box::new(self.consumer).execute());
260
261 let result = loop {
263 let barrier = match stream
264 .try_next()
265 .instrument(span.clone())
266 .instrument_await(
267 last_epoch.map_or(await_tree::span!("Epoch <initial>"), |e| {
268 await_tree::span!("Epoch {}", e.curr)
269 }),
270 )
271 .await
272 {
273 Ok(Some(barrier)) => barrier,
274 Ok(None) => break Err(anyhow!("actor exited unexpectedly").into()),
275 Err(err) => break Err(err),
276 };
277
278 fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
279 "intentional collect_actors_err"
280 )
281 .into()));
282
283 if barrier.is_stop(id) {
285 debug!(actor_id = id, epoch = ?barrier.epoch, "stop at barrier");
286 break Ok(barrier);
287 }
288
289 current_epoch.set(barrier.epoch.curr as i64);
290
291 self.barrier_manager.collect(id, &barrier);
293
294 last_epoch = Some(barrier.epoch);
296 span = barrier.tracing_context().attach(new_span(last_epoch));
297 };
298
299 spawn_blocking_drop_stream(stream).await;
300
301 let result = result.map(|stop_barrier| {
302 self.barrier_manager.collect(id, &stop_barrier);
304 });
305
306 tracing::debug!(actor_id = id, ok = result.is_ok(), "actor exit");
307 result
308 }
309}
310
311pub async fn spawn_blocking_drop_stream<T: Send + 'static>(stream: T) {
319 let _ = tokio::task::spawn_blocking(move || drop(stream))
320 .instrument_await("drop_stream")
321 .await;
322}