risingwave_stream/executor/
actor.rs1use std::collections::{HashMap, HashSet};
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use await_tree::InstrumentAwait;
21use futures::FutureExt;
22use futures::future::join_all;
23use hytra::TrAdder;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::config::StreamingConfig;
26use risingwave_common::hash::VirtualNode;
27use risingwave_common::log::LogSuppressor;
28use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, IntGaugeExt};
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_expr::ExprError;
31use risingwave_expr::expr_context::{FRAGMENT_ID, VNODE_COUNT, expr_context_scope};
32use risingwave_pb::id::SubscriberId;
33use risingwave_pb::plan_common::ExprContext;
34use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
35use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
36use risingwave_rpc_client::MetaClient;
37use thiserror_ext::AsReport;
38use tokio_stream::StreamExt;
39use tracing::Instrument;
40
41use super::StreamConsumer;
42use super::monitor::StreamingMetrics;
43use super::subtask::SubtaskHandle;
44use crate::CONFIG;
45use crate::error::StreamResult;
46use crate::task::{ActorId, FragmentId, LocalBarrierManager, StreamEnvironment};
47
/// Per-actor context: identity, memory-usage counters, metrics handle and configuration.
///
/// Instances are built by [`ActorContext::create`] or [`ActorContext::for_test`] and are
/// handed out behind an `Arc` (see [`ActorContextRef`]).
pub struct ActorContext {
    /// Identifier of the actor (`BuildActorInfo::actor_id` in `create`).
    pub id: ActorId,
    /// Identifier of the fragment, as passed to `create`. Used as a metrics label and as
    /// the `FRAGMENT_ID` task-local scope while the actor runs.
    pub fragment_id: FragmentId,
    /// Length of the actor's vnode bitmap, or `1` when the actor was built without one.
    pub vnode_count: usize,
    /// Copied from `BuildActorInfo::mview_definition` in `create`; empty in `for_test`.
    pub mview_definition: String,

    /// Value `cur_mem_val` held right before the latest `store_mem_usage` call.
    last_mem_val: Arc<AtomicUsize>,
    /// Latest value recorded through `store_mem_usage`; returned by `mem_usage`.
    cur_mem_val: Arc<AtomicUsize>,
    /// Adder supplied by the caller of `create`; `store_mem_usage` adds the change in this
    /// actor's usage to it.
    // NOTE(review): presumably shared across actors as a node-wide total — confirm.
    total_mem_val: Arc<TrAdder<i64>>,

    /// Handle to the streaming metrics registry used by this actor.
    pub streaming_metrics: Arc<StreamingMetrics>,

    /// Number of dispatchers listed in `BuildActorInfo` when the context was created.
    pub initial_dispatch_num: usize,
    /// Subscriber ids listed in `BuildActorInfo` when the context was created.
    pub initial_subscriber_ids: HashSet<SubscriberId>,
    /// Clone of `BuildActorInfo::fragment_upstreams` taken when the context was created.
    pub initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,

    /// Optional meta client; `for_test` leaves it as `None`.
    pub meta_client: Option<MetaClient>,

    /// Streaming configuration for this actor; installed as the `CONFIG` task-local scope
    /// while the actor runs.
    pub config: Arc<StreamingConfig>,

    /// Stream environment supplied at creation.
    pub stream_env: StreamEnvironment,
}
78
/// Shared, reference-counted handle to an [`ActorContext`].
pub type ActorContextRef = Arc<ActorContext>;
80
81impl ActorContext {
82 pub fn for_test(id: impl Into<ActorId>) -> ActorContextRef {
83 Arc::new(Self {
84 id: id.into(),
85 fragment_id: 0.into(),
86 vnode_count: VirtualNode::COUNT_FOR_TEST,
87 mview_definition: "".to_owned(),
88 cur_mem_val: Arc::new(0.into()),
89 last_mem_val: Arc::new(0.into()),
90 total_mem_val: Arc::new(TrAdder::new()),
91 streaming_metrics: Arc::new(StreamingMetrics::unused()),
92 initial_dispatch_num: 1,
94 initial_subscriber_ids: Default::default(),
95 initial_upstream_actors: Default::default(),
96 meta_client: None,
97 config: Arc::new(StreamingConfig::default()),
98 stream_env: StreamEnvironment::for_test(),
99 })
100 }
101
102 #[allow(clippy::too_many_arguments)]
103 pub fn create(
104 stream_actor: &BuildActorInfo,
105 fragment_id: FragmentId,
106 total_mem_val: Arc<TrAdder<i64>>,
107 streaming_metrics: Arc<StreamingMetrics>,
108 meta_client: Option<MetaClient>,
109 config: Arc<StreamingConfig>,
110 stream_env: StreamEnvironment,
111 ) -> ActorContextRef {
112 Arc::new(Self {
113 id: stream_actor.actor_id,
114 fragment_id,
115 mview_definition: stream_actor.mview_definition.clone(),
116 vnode_count: (stream_actor.vnode_bitmap.as_ref())
117 .map_or(1, |b| Bitmap::from(b).len()),
120 cur_mem_val: Arc::new(0.into()),
121 last_mem_val: Arc::new(0.into()),
122 total_mem_val,
123 streaming_metrics,
124 initial_dispatch_num: stream_actor.dispatchers.len(),
125 initial_subscriber_ids: stream_actor
126 .initial_subscriber_ids
127 .iter()
128 .copied()
129 .collect(),
130 initial_upstream_actors: stream_actor.fragment_upstreams.clone(),
131 meta_client,
132 config,
133 stream_env,
134 })
135 }
136
137 pub fn on_compute_error(&self, err: ExprError, identity: &str) {
138 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
139 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
140 tracing::error!(target: "stream_expr_error", identity, error = %err.as_report(), suppressed_count, "failed to evaluate expression");
141 }
142
143 let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
144 GLOBAL_ERROR_METRICS.user_compute_error.report([
145 "ExprError".to_owned(),
146 executor_name.to_owned(),
147 self.fragment_id.to_string(),
148 ]);
149 }
150
151 pub fn store_mem_usage(&self, val: usize) {
152 let old_value = self.cur_mem_val.load(Ordering::Relaxed);
156 self.last_mem_val.store(old_value, Ordering::Relaxed);
157 let diff = val as i64 - old_value as i64;
158
159 self.total_mem_val.inc(diff);
160
161 self.cur_mem_val.store(val, Ordering::Relaxed);
162 }
163
164 pub fn mem_usage(&self) -> usize {
165 self.cur_mem_val.load(Ordering::Relaxed)
166 }
167}
168
/// An actor: a stream consumer plus the subtasks that run alongside it.
pub struct Actor<C> {
    /// The consumer whose `execute()` stream is driven by `run_consumer`.
    consumer: C,
    /// Subtasks joined concurrently with the consumer in `run`.
    subtasks: Vec<SubtaskHandle>,

    /// Context of this actor (ids, metrics, config).
    pub actor_context: ActorContextRef,
    /// Expression context installed via `expr_context_scope` while the actor runs.
    expr_context: ExprContext,
    /// Barrier manager that every barrier yielded by the consumer is reported to.
    barrier_manager: LocalBarrierManager,
}
180
181impl<C> Actor<C>
182where
183 C: StreamConsumer,
184{
185 pub fn new(
186 consumer: C,
187 subtasks: Vec<SubtaskHandle>,
188 _metrics: Arc<StreamingMetrics>,
189 actor_context: ActorContextRef,
190 expr_context: ExprContext,
191 barrier_manager: LocalBarrierManager,
192 ) -> Self {
193 Self {
194 consumer,
195 subtasks,
196 actor_context,
197 expr_context,
198 barrier_manager,
199 }
200 }
201
202 #[inline(always)]
203 pub async fn run(mut self) -> StreamResult<()> {
204 let expr_context = self.expr_context.clone();
205 let fragment_id = self.actor_context.fragment_id;
206 let vnode_count = self.actor_context.vnode_count;
207 let config = self.actor_context.config.clone();
208
209 let run = async move {
210 tokio::join!(
211 join_all(std::mem::take(&mut self.subtasks)),
213 self.run_consumer(),
214 )
215 .1
216 }
217 .boxed();
218
219 let run = expr_context_scope(expr_context, run);
221 let run = FRAGMENT_ID::scope(fragment_id, run);
222 let run = VNODE_COUNT::scope(vnode_count, run);
223 let run = CONFIG.scope(config, run);
224
225 run.await
226 }
227
228 async fn run_consumer(self) -> StreamResult<()> {
229 fail::fail_point!("start_actors_err", |_| Err(anyhow::anyhow!(
230 "intentional start_actors_err"
231 )
232 .into()));
233
234 let id = self.actor_context.id;
235 let span_name = format!("Actor {id}");
236
237 let new_span = |epoch: Option<EpochPair>| {
238 tracing::info_span!(
239 parent: None,
240 "actor",
241 "otel.name" = span_name,
242 actor_id = %id,
243 prev_epoch = epoch.map(|e| e.prev),
244 curr_epoch = epoch.map(|e| e.curr),
245 )
246 };
247 let mut span = new_span(None);
248
249 let actor_count = self
250 .actor_context
251 .streaming_metrics
252 .actor_count
253 .with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
254 let _actor_count_guard = actor_count.inc_guard();
255
256 let current_epoch = self
257 .actor_context
258 .streaming_metrics
259 .actor_current_epoch
260 .with_guarded_label_values(&[
261 &self.actor_context.id.to_string(),
262 &self.actor_context.fragment_id.to_string(),
263 ]);
264
265 let mut last_epoch: Option<EpochPair> = None;
266 let mut stream = Box::pin(Box::new(self.consumer).execute());
267
268 let result = loop {
270 let barrier = match stream
271 .try_next()
272 .instrument(span.clone())
273 .instrument_await(
274 last_epoch.map_or(await_tree::span!("Epoch <initial>"), |e| {
275 await_tree::span!("Epoch {}", e.curr)
276 }),
277 )
278 .await
279 {
280 Ok(Some(barrier)) => barrier,
281 Ok(None) => break Err(anyhow!("actor exited unexpectedly").into()),
282 Err(err) => break Err(err),
283 };
284
285 fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
286 "intentional collect_actors_err"
287 )
288 .into()));
289
290 if barrier.is_stop(id) {
292 debug!(actor_id = %id, epoch = ?barrier.epoch, "stop at barrier");
293 break Ok(barrier);
294 }
295
296 current_epoch.set(barrier.epoch.curr as i64);
297
298 self.barrier_manager.collect(id, &barrier);
300
301 last_epoch = Some(barrier.epoch);
303 span = barrier.tracing_context().attach(new_span(last_epoch));
304 };
305
306 spawn_blocking_drop_stream(stream).await;
307
308 let result = result.map(|stop_barrier| {
309 self.barrier_manager.collect(id, &stop_barrier);
311 });
312
313 tracing::debug!(actor_id = %id, ok = result.is_ok(), "actor exit");
314 result
315 }
316}
317
318pub async fn spawn_blocking_drop_stream<T: Send + 'static>(stream: T) {
326 let _ = tokio::task::spawn_blocking(move || drop(stream))
327 .instrument_await("drop_stream")
328 .await;
329}