risingwave_stream/executor/
actor.rs1use std::collections::{HashMap, HashSet};
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use await_tree::InstrumentAwait;
21use futures::FutureExt;
22use futures::future::join_all;
23use hytra::TrAdder;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::config::StreamingConfig;
26use risingwave_common::hash::VirtualNode;
27use risingwave_common::log::LogSuppressor;
28use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, IntGaugeExt};
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_expr::ExprError;
31use risingwave_expr::expr_context::{FRAGMENT_ID, VNODE_COUNT, expr_context_scope};
32use risingwave_pb::id::SubscriberId;
33use risingwave_pb::plan_common::ExprContext;
34use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
35use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
36use risingwave_rpc_client::MetaClient;
37use thiserror_ext::AsReport;
38use tokio_stream::StreamExt;
39use tracing::Instrument;
40
41use super::StreamConsumer;
42use super::monitor::StreamingMetrics;
43use super::subtask::SubtaskHandle;
44use crate::CONFIG;
45use crate::error::StreamResult;
46use crate::task::{ActorId, FragmentId, LocalBarrierManager, StreamEnvironment};
47
/// Context describing a single actor. It is handed around as the reference-counted
/// [`ActorContextRef`].
pub struct ActorContext {
    /// The actor's id.
    pub id: ActorId,
    /// The fragment this actor was created for; also used as a metric label.
    pub fragment_id: FragmentId,
    /// Number of virtual nodes: the length of the actor's vnode bitmap, or `1` when the actor
    /// has no bitmap (see `create`).
    pub vnode_count: usize,
    /// Copied from `BuildActorInfo::mview_definition`; empty in the test constructors.
    /// NOTE(review): presumably the SQL definition of the materialized view — confirm.
    pub mview_definition: String,

    /// Memory usage reported by the previous `store_mem_usage` call.
    last_mem_val: Arc<AtomicUsize>,
    /// Most recently reported memory usage of this actor.
    cur_mem_val: Arc<AtomicUsize>,
    /// Counter shared with the creator; `store_mem_usage` adds each change in this actor's
    /// usage to it.
    total_mem_val: Arc<TrAdder<i64>>,

    pub streaming_metrics: Arc<StreamingMetrics>,

    /// Number of dispatchers listed in the `BuildActorInfo` at creation time. Nothing in this
    /// file updates it afterwards.
    pub initial_dispatch_num: usize,
    /// Subscriber ids listed in the `BuildActorInfo` at creation time.
    pub initial_subscriber_ids: HashSet<SubscriberId>,
    /// Upstream actors keyed by upstream fragment, copied from
    /// `BuildActorInfo::fragment_upstreams` at creation time.
    pub initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,

    /// Client to the meta service; `None` in the test constructors.
    pub meta_client: Option<MetaClient>,

    /// Streaming configuration for this actor. `Actor::run` installs it as the `CONFIG` scope
    /// around the actor's execution.
    pub config: Arc<StreamingConfig>,

    pub stream_env: StreamEnvironment,
}

/// Shared handle to an [`ActorContext`].
pub type ActorContextRef = Arc<ActorContext>;
80
81impl ActorContext {
82 pub fn for_test(id: impl Into<ActorId>) -> ActorContextRef {
83 Self::for_test_with_config(id, StreamingConfig::default())
84 }
85
86 pub fn for_test_with_config(
87 id: impl Into<ActorId>,
88 config: StreamingConfig,
89 ) -> ActorContextRef {
90 Arc::new(Self {
91 id: id.into(),
92 fragment_id: 0.into(),
93 vnode_count: VirtualNode::COUNT_FOR_TEST,
94 mview_definition: "".to_owned(),
95 cur_mem_val: Arc::new(0.into()),
96 last_mem_val: Arc::new(0.into()),
97 total_mem_val: Arc::new(TrAdder::new()),
98 streaming_metrics: Arc::new(StreamingMetrics::unused()),
99 initial_dispatch_num: 1,
101 initial_subscriber_ids: Default::default(),
102 initial_upstream_actors: Default::default(),
103 meta_client: None,
104 config: Arc::new(config),
105 stream_env: StreamEnvironment::for_test(),
106 })
107 }
108
109 #[allow(clippy::too_many_arguments)]
110 pub fn create(
111 stream_actor: &BuildActorInfo,
112 fragment_id: FragmentId,
113 total_mem_val: Arc<TrAdder<i64>>,
114 streaming_metrics: Arc<StreamingMetrics>,
115 meta_client: Option<MetaClient>,
116 config: Arc<StreamingConfig>,
117 stream_env: StreamEnvironment,
118 ) -> ActorContextRef {
119 Arc::new(Self {
120 id: stream_actor.actor_id,
121 fragment_id,
122 mview_definition: stream_actor.mview_definition.clone(),
123 vnode_count: (stream_actor.vnode_bitmap.as_ref())
124 .map_or(1, |b| Bitmap::from(b).len()),
127 cur_mem_val: Arc::new(0.into()),
128 last_mem_val: Arc::new(0.into()),
129 total_mem_val,
130 streaming_metrics,
131 initial_dispatch_num: stream_actor.dispatchers.len(),
132 initial_subscriber_ids: stream_actor
133 .initial_subscriber_ids
134 .iter()
135 .copied()
136 .collect(),
137 initial_upstream_actors: stream_actor.fragment_upstreams.clone(),
138 meta_client,
139 config,
140 stream_env,
141 })
142 }
143
144 pub fn on_compute_error(&self, err: ExprError, identity: &str) {
145 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
146 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
147 tracing::error!(target: "stream_expr_error", identity, error = %err.as_report(), suppressed_count, "failed to evaluate expression");
148 }
149
150 let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
151 GLOBAL_ERROR_METRICS.user_compute_error.report([
152 "ExprError".to_owned(),
153 executor_name.to_owned(),
154 self.fragment_id.to_string(),
155 ]);
156 }
157
158 pub fn store_mem_usage(&self, val: usize) {
159 let old_value = self.cur_mem_val.load(Ordering::Relaxed);
163 self.last_mem_val.store(old_value, Ordering::Relaxed);
164 let diff = val as i64 - old_value as i64;
165
166 self.total_mem_val.inc(diff);
167
168 self.cur_mem_val.store(val, Ordering::Relaxed);
169 }
170
171 pub fn mem_usage(&self) -> usize {
172 self.cur_mem_val.load(Ordering::Relaxed)
173 }
174}
175
/// An actor: a [`StreamConsumer`] driven together with its subtasks by [`Actor::run`].
pub struct Actor<C> {
    /// The consumer whose barrier stream is driven by `run_consumer`.
    consumer: C,
    /// Subtasks that `run` joins concurrently with the consumer.
    subtasks: Vec<SubtaskHandle>,

    pub actor_context: ActorContextRef,
    /// Installed via `expr_context_scope` around the actor's run.
    expr_context: ExprContext,
    /// Barriers yielded by the consumer are reported here via `collect`.
    barrier_manager: LocalBarrierManager,
}
187
188impl<C> Actor<C>
189where
190 C: StreamConsumer,
191{
192 pub fn new(
193 consumer: C,
194 subtasks: Vec<SubtaskHandle>,
195 _metrics: Arc<StreamingMetrics>,
196 actor_context: ActorContextRef,
197 expr_context: ExprContext,
198 barrier_manager: LocalBarrierManager,
199 ) -> Self {
200 Self {
201 consumer,
202 subtasks,
203 actor_context,
204 expr_context,
205 barrier_manager,
206 }
207 }
208
209 #[inline(always)]
210 pub async fn run(mut self) -> StreamResult<()> {
211 let expr_context = self.expr_context.clone();
212 let fragment_id = self.actor_context.fragment_id;
213 let vnode_count = self.actor_context.vnode_count;
214 let config = self.actor_context.config.clone();
215
216 let run = async move {
217 tokio::join!(
218 join_all(std::mem::take(&mut self.subtasks)),
220 self.run_consumer(),
221 )
222 .1
223 }
224 .boxed();
225
226 let run = expr_context_scope(expr_context, run);
228 let run = FRAGMENT_ID::scope(fragment_id, run);
229 let run = VNODE_COUNT::scope(vnode_count, run);
230 let run = CONFIG.scope(config, run);
231
232 run.await
233 }
234
235 async fn run_consumer(self) -> StreamResult<()> {
236 fail::fail_point!("start_actors_err", |_| Err(anyhow::anyhow!(
237 "intentional start_actors_err"
238 )
239 .into()));
240
241 let id = self.actor_context.id;
242 let span_name = format!("Actor {id}");
243
244 let new_span = |epoch: Option<EpochPair>| {
245 tracing::info_span!(
246 parent: None,
247 "actor",
248 "otel.name" = span_name,
249 actor_id = %id,
250 prev_epoch = epoch.map(|e| e.prev),
251 curr_epoch = epoch.map(|e| e.curr),
252 )
253 };
254 let mut span = new_span(None);
255
256 let actor_count = self
257 .actor_context
258 .streaming_metrics
259 .actor_count
260 .with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
261 let _actor_count_guard = actor_count.inc_guard();
262
263 let current_epoch = self
264 .actor_context
265 .streaming_metrics
266 .actor_current_epoch
267 .with_guarded_label_values(&[
268 &self.actor_context.id.to_string(),
269 &self.actor_context.fragment_id.to_string(),
270 ]);
271
272 let mut last_epoch: Option<EpochPair> = None;
273 let mut stream = Box::pin(Box::new(self.consumer).execute());
274
275 let result = loop {
277 let barrier = match stream
278 .try_next()
279 .instrument(span.clone())
280 .instrument_await(
281 last_epoch.map_or(await_tree::span!("Epoch <initial>"), |e| {
282 await_tree::span!("Epoch {}", e.curr)
283 }),
284 )
285 .await
286 {
287 Ok(Some(barrier)) => barrier,
288 Ok(None) => break Err(anyhow!("actor exited unexpectedly").into()),
289 Err(err) => break Err(err),
290 };
291
292 fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
293 "intentional collect_actors_err"
294 )
295 .into()));
296
297 if barrier.is_stop(id) {
299 debug!(actor_id = %id, epoch = ?barrier.epoch, "stop at barrier");
300 break Ok(barrier);
301 }
302
303 current_epoch.set(barrier.epoch.curr as i64);
304
305 self.barrier_manager.collect(id, &barrier);
307
308 last_epoch = Some(barrier.epoch);
310 span = barrier.tracing_context().attach(new_span(last_epoch));
311 };
312
313 spawn_blocking_drop_stream(stream).await;
314
315 let result = result.map(|stop_barrier| {
316 self.barrier_manager.collect(id, &stop_barrier);
318 });
319
320 tracing::debug!(actor_id = %id, ok = result.is_ok(), "actor exit");
321 result
322 }
323}
324
325pub async fn spawn_blocking_drop_stream<T: Send + 'static>(stream: T) {
333 let _ = tokio::task::spawn_blocking(move || drop(stream))
334 .instrument_await("drop_stream")
335 .await;
336}