risingwave_stream/executor/
actor.rs1use std::collections::{HashMap, HashSet};
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use await_tree::InstrumentAwait;
21use chrono_tz::{Tz, UTC};
22use futures::FutureExt;
23use futures::future::join_all;
24use hytra::TrAdder;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::config::StreamingConfig;
27use risingwave_common::hash::VirtualNode;
28use risingwave_common::log::LogSuppressor;
29use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, IntGaugeExt};
30use risingwave_common::types::Timestamptz;
31use risingwave_common::util::epoch::EpochPair;
32use risingwave_expr::ExprError;
33use risingwave_expr::expr_context::{FRAGMENT_ID, VNODE_COUNT, expr_context_scope};
34use risingwave_pb::id::SubscriberId;
35use risingwave_pb::plan_common::ExprContext;
36use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
37use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
38use risingwave_rpc_client::MetaClient;
39use thiserror_ext::AsReport;
40use tokio_stream::StreamExt;
41use tracing::Instrument;
42
43use super::StreamConsumer;
44use super::monitor::StreamingMetrics;
45use super::subtask::SubtaskHandle;
46use crate::CONFIG;
47use crate::error::StreamResult;
48use crate::task::{ActorId, FragmentId, LocalBarrierManager, StreamEnvironment};
49
50pub struct ActorContext {
52 pub id: ActorId,
53 pub fragment_id: FragmentId,
54 pub vnode_count: usize,
55 pub mview_definition: String,
56
57 last_mem_val: Arc<AtomicUsize>,
59 cur_mem_val: Arc<AtomicUsize>,
60 total_mem_val: Arc<TrAdder<i64>>,
61
62 pub streaming_metrics: Arc<StreamingMetrics>,
63
64 pub initial_dispatch_num: usize,
66 pub initial_subscriber_ids: HashSet<SubscriberId>,
68 pub initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,
69
70 pub meta_client: Option<MetaClient>,
72
73 pub config: Arc<StreamingConfig>,
77
78 pub time_zone: Tz,
79
80 pub stream_env: StreamEnvironment,
81}
82
83pub type ActorContextRef = Arc<ActorContext>;
84
85impl ActorContext {
86 pub fn for_test(id: impl Into<ActorId>) -> ActorContextRef {
87 Self::for_test_with_config(id, StreamingConfig::default())
88 }
89
90 pub fn for_test_with_config(
91 id: impl Into<ActorId>,
92 config: StreamingConfig,
93 ) -> ActorContextRef {
94 Arc::new(Self {
95 id: id.into(),
96 fragment_id: 0.into(),
97 vnode_count: VirtualNode::COUNT_FOR_TEST,
98 mview_definition: "".to_owned(),
99 cur_mem_val: Arc::new(0.into()),
100 last_mem_val: Arc::new(0.into()),
101 total_mem_val: Arc::new(TrAdder::new()),
102 streaming_metrics: Arc::new(StreamingMetrics::unused()),
103 initial_dispatch_num: 1,
105 initial_subscriber_ids: Default::default(),
106 initial_upstream_actors: Default::default(),
107 meta_client: None,
108 config: Arc::new(config),
109 time_zone: UTC,
110 stream_env: StreamEnvironment::for_test(),
111 })
112 }
113
114 pub fn create(
115 stream_actor: &BuildActorInfo,
116 fragment_id: FragmentId,
117 total_mem_val: Arc<TrAdder<i64>>,
118 streaming_metrics: Arc<StreamingMetrics>,
119 meta_client: Option<MetaClient>,
120 config: Arc<StreamingConfig>,
121 stream_env: StreamEnvironment,
122 ) -> ActorContextRef {
123 let time_zone = &stream_actor.expr_context.as_ref().unwrap().time_zone;
124 let time_zone = match Timestamptz::lookup_time_zone(time_zone) {
125 Ok(time_zone) => time_zone,
126 Err(err) => {
127 tracing::warn!(
128 time_zone,
129 error = %err,
130 "failed to parse actor time zone, falling back to UTC"
131 );
132 UTC
133 }
134 };
135
136 Arc::new(Self {
137 id: stream_actor.actor_id,
138 fragment_id,
139 mview_definition: stream_actor.mview_definition.clone(),
140 vnode_count: (stream_actor.vnode_bitmap.as_ref())
141 .map_or(1, |b| Bitmap::from(b).len()),
144 cur_mem_val: Arc::new(0.into()),
145 last_mem_val: Arc::new(0.into()),
146 total_mem_val,
147 streaming_metrics,
148 initial_dispatch_num: stream_actor.dispatchers.len(),
149 initial_subscriber_ids: stream_actor
150 .initial_subscriber_ids
151 .iter()
152 .copied()
153 .collect(),
154 initial_upstream_actors: stream_actor.fragment_upstreams.clone(),
155 meta_client,
156 config,
157 time_zone,
158 stream_env,
159 })
160 }
161
162 pub fn on_compute_error(&self, err: ExprError, identity: &str) {
163 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
164 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
165 tracing::error!(target: "stream_expr_error", identity, error = %err.as_report(), suppressed_count, "failed to evaluate expression");
166 }
167
168 let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
169 GLOBAL_ERROR_METRICS.user_compute_error.report([
170 "ExprError".to_owned(),
171 executor_name.to_owned(),
172 self.fragment_id.to_string(),
173 ]);
174 }
175
176 pub fn store_mem_usage(&self, val: usize) {
177 let old_value = self.cur_mem_val.load(Ordering::Relaxed);
181 self.last_mem_val.store(old_value, Ordering::Relaxed);
182 let diff = val as i64 - old_value as i64;
183
184 self.total_mem_val.inc(diff);
185
186 self.cur_mem_val.store(val, Ordering::Relaxed);
187 }
188
189 pub fn mem_usage(&self) -> usize {
190 self.cur_mem_val.load(Ordering::Relaxed)
191 }
192}
193
194pub struct Actor<C> {
196 consumer: C,
198 subtasks: Vec<SubtaskHandle>,
200
201 pub actor_context: ActorContextRef,
202 expr_context: ExprContext,
203 barrier_manager: LocalBarrierManager,
204}
205
206impl<C> Actor<C>
207where
208 C: StreamConsumer,
209{
210 pub fn new(
211 consumer: C,
212 subtasks: Vec<SubtaskHandle>,
213 _metrics: Arc<StreamingMetrics>,
214 actor_context: ActorContextRef,
215 expr_context: ExprContext,
216 barrier_manager: LocalBarrierManager,
217 ) -> Self {
218 Self {
219 consumer,
220 subtasks,
221 actor_context,
222 expr_context,
223 barrier_manager,
224 }
225 }
226
227 #[inline(always)]
228 pub async fn run(mut self) -> StreamResult<()> {
229 let expr_context = self.expr_context.clone();
230 let fragment_id = self.actor_context.fragment_id;
231 let vnode_count = self.actor_context.vnode_count;
232 let config = self.actor_context.config.clone();
233
234 let run = async move {
235 tokio::join!(
236 join_all(std::mem::take(&mut self.subtasks)),
238 self.run_consumer(),
239 )
240 .1
241 }
242 .boxed();
243
244 let run = expr_context_scope(expr_context, run);
246 let run = FRAGMENT_ID::scope(fragment_id, run);
247 let run = VNODE_COUNT::scope(vnode_count, run);
248 let run = CONFIG.scope(config, run);
249
250 run.await
251 }
252
253 async fn run_consumer(self) -> StreamResult<()> {
254 fail::fail_point!("start_actors_err", |_| Err(anyhow::anyhow!(
255 "intentional start_actors_err"
256 )
257 .into()));
258
259 let id = self.actor_context.id;
260 let span_name = format!("Actor {id}");
261
262 let new_span = |epoch: Option<EpochPair>| {
263 tracing::info_span!(
264 parent: None,
265 "actor",
266 "otel.name" = span_name,
267 actor_id = %id,
268 prev_epoch = epoch.map(|e| e.prev),
269 curr_epoch = epoch.map(|e| e.curr),
270 )
271 };
272 let mut span = new_span(None);
273
274 let actor_count = self
275 .actor_context
276 .streaming_metrics
277 .actor_count
278 .with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
279 let _actor_count_guard = actor_count.inc_guard();
280
281 let current_epoch = self
282 .actor_context
283 .streaming_metrics
284 .actor_current_epoch
285 .with_guarded_label_values(&[
286 &self.actor_context.id.to_string(),
287 &self.actor_context.fragment_id.to_string(),
288 ]);
289
290 let mut last_epoch: Option<EpochPair> = None;
291 let mut stream = Box::pin(Box::new(self.consumer).execute());
292
293 let result = loop {
295 let barrier = match stream
296 .try_next()
297 .instrument(span.clone())
298 .instrument_await(
299 last_epoch.map_or(await_tree::span!("Epoch <initial>"), |e| {
300 await_tree::span!("Epoch {}", e.curr)
301 }),
302 )
303 .await
304 {
305 Ok(Some(barrier)) => barrier,
306 Ok(None) => break Err(anyhow!("actor exited unexpectedly").into()),
307 Err(err) => break Err(err),
308 };
309
310 fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
311 "intentional collect_actors_err"
312 )
313 .into()));
314
315 if barrier.is_stop(id) {
317 debug!(actor_id = %id, epoch = ?barrier.epoch, "stop at barrier");
318 break Ok(barrier);
319 }
320
321 current_epoch.set(barrier.epoch.curr as i64);
322
323 self.barrier_manager.collect(id, &barrier);
325
326 last_epoch = Some(barrier.epoch);
328 span = barrier.tracing_context().attach(new_span(last_epoch));
329 };
330
331 spawn_blocking_drop_stream(stream).await;
332
333 let result = result.map(|stop_barrier| {
334 self.barrier_manager.collect(id, &stop_barrier);
336 });
337
338 tracing::debug!(actor_id = %id, ok = result.is_ok(), "actor exit");
339 result
340 }
341}
342
343pub async fn spawn_blocking_drop_stream<T: Send + 'static>(stream: T) {
351 let _ = tokio::task::spawn_blocking(move || drop(stream))
352 .instrument_await("drop_stream")
353 .await;
354}