risingwave_stream/executor/
actor.rs1use std::collections::{HashMap, HashSet};
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::{Arc, LazyLock};
18
19use anyhow::anyhow;
20use await_tree::InstrumentAwait;
21use futures::FutureExt;
22use futures::future::join_all;
23use hytra::TrAdder;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::TableId;
26use risingwave_common::config::StreamingConfig;
27use risingwave_common::hash::VirtualNode;
28use risingwave_common::log::LogSuppresser;
29use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, IntGaugeExt};
30use risingwave_common::util::epoch::EpochPair;
31use risingwave_expr::ExprError;
32use risingwave_expr::expr_context::{FRAGMENT_ID, VNODE_COUNT, expr_context_scope};
33use risingwave_pb::plan_common::ExprContext;
34use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
35use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
36use risingwave_rpc_client::MetaClient;
37use thiserror_ext::AsReport;
38use tokio_stream::StreamExt;
39use tracing::Instrument;
40
41use super::StreamConsumer;
42use super::monitor::StreamingMetrics;
43use super::subtask::SubtaskHandle;
44use crate::error::StreamResult;
45use crate::task::{ActorId, FragmentId, LocalBarrierManager, StreamEnvironment};
46
47pub struct ActorContext {
49 pub id: ActorId,
50 pub fragment_id: u32,
51 pub vnode_count: usize,
52 pub mview_definition: String,
53
54 last_mem_val: Arc<AtomicUsize>,
56 cur_mem_val: Arc<AtomicUsize>,
57 total_mem_val: Arc<TrAdder<i64>>,
58
59 pub streaming_metrics: Arc<StreamingMetrics>,
60
61 pub initial_dispatch_num: usize,
63 pub related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
65 pub initial_upstream_actors: HashMap<FragmentId, UpstreamActors>,
66
67 pub meta_client: Option<MetaClient>,
69
70 pub streaming_config: Arc<StreamingConfig>,
71
72 pub stream_env: StreamEnvironment,
73}
74
75pub type ActorContextRef = Arc<ActorContext>;
76
77impl ActorContext {
78 pub fn for_test(id: ActorId) -> ActorContextRef {
79 Arc::new(Self {
80 id,
81 fragment_id: 0,
82 vnode_count: VirtualNode::COUNT_FOR_TEST,
83 mview_definition: "".to_owned(),
84 cur_mem_val: Arc::new(0.into()),
85 last_mem_val: Arc::new(0.into()),
86 total_mem_val: Arc::new(TrAdder::new()),
87 streaming_metrics: Arc::new(StreamingMetrics::unused()),
88 initial_dispatch_num: 1,
90 related_subscriptions: HashMap::new().into(),
91 initial_upstream_actors: Default::default(),
92 meta_client: None,
93 streaming_config: Arc::new(StreamingConfig::default()),
94 stream_env: StreamEnvironment::for_test(),
95 })
96 }
97
98 #[allow(clippy::too_many_arguments)]
99 pub fn create(
100 stream_actor: &BuildActorInfo,
101 fragment_id: FragmentId,
102 total_mem_val: Arc<TrAdder<i64>>,
103 streaming_metrics: Arc<StreamingMetrics>,
104 related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
105 meta_client: Option<MetaClient>,
106 streaming_config: Arc<StreamingConfig>,
107 stream_env: StreamEnvironment,
108 ) -> ActorContextRef {
109 Arc::new(Self {
110 id: stream_actor.actor_id,
111 fragment_id,
112 mview_definition: stream_actor.mview_definition.clone(),
113 vnode_count: (stream_actor.vnode_bitmap.as_ref())
114 .map_or(1, |b| Bitmap::from(b).len()),
117 cur_mem_val: Arc::new(0.into()),
118 last_mem_val: Arc::new(0.into()),
119 total_mem_val,
120 streaming_metrics,
121 initial_dispatch_num: stream_actor.dispatchers.len(),
122 related_subscriptions,
123 initial_upstream_actors: stream_actor.fragment_upstreams.clone(),
124 meta_client,
125 streaming_config,
126 stream_env,
127 })
128 }
129
130 pub fn on_compute_error(&self, err: ExprError, identity: &str) {
131 static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
132 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
133 tracing::error!(identity, error = %err.as_report(), suppressed_count, "failed to evaluate expression");
134 }
135
136 let executor_name = identity.split(' ').next().unwrap_or("name_not_found");
137 GLOBAL_ERROR_METRICS.user_compute_error.report([
138 "ExprError".to_owned(),
139 executor_name.to_owned(),
140 self.fragment_id.to_string(),
141 ]);
142 }
143
144 pub fn store_mem_usage(&self, val: usize) {
145 let old_value = self.cur_mem_val.load(Ordering::Relaxed);
149 self.last_mem_val.store(old_value, Ordering::Relaxed);
150 let diff = val as i64 - old_value as i64;
151
152 self.total_mem_val.inc(diff);
153
154 self.cur_mem_val.store(val, Ordering::Relaxed);
155 }
156
157 pub fn mem_usage(&self) -> usize {
158 self.cur_mem_val.load(Ordering::Relaxed)
159 }
160}
161
162pub struct Actor<C> {
164 consumer: C,
166 subtasks: Vec<SubtaskHandle>,
168
169 pub actor_context: ActorContextRef,
170 expr_context: ExprContext,
171 barrier_manager: LocalBarrierManager,
172}
173
174impl<C> Actor<C>
175where
176 C: StreamConsumer,
177{
178 pub fn new(
179 consumer: C,
180 subtasks: Vec<SubtaskHandle>,
181 _metrics: Arc<StreamingMetrics>,
182 actor_context: ActorContextRef,
183 expr_context: ExprContext,
184 barrier_manager: LocalBarrierManager,
185 ) -> Self {
186 Self {
187 consumer,
188 subtasks,
189 actor_context,
190 expr_context,
191 barrier_manager,
192 }
193 }
194
195 #[inline(always)]
196 pub async fn run(mut self) -> StreamResult<()> {
197 let expr_context = self.expr_context.clone();
198 let fragment_id = self.actor_context.fragment_id;
199 let vnode_count = self.actor_context.vnode_count;
200
201 let run = async move {
202 tokio::join!(
203 join_all(std::mem::take(&mut self.subtasks)),
205 self.run_consumer(),
206 )
207 .1
208 }
209 .boxed();
210
211 let run = expr_context_scope(expr_context, run);
213 let run = FRAGMENT_ID::scope(fragment_id, run);
214 let run = VNODE_COUNT::scope(vnode_count, run);
215
216 run.await
217 }
218
219 async fn run_consumer(self) -> StreamResult<()> {
220 fail::fail_point!("start_actors_err", |_| Err(anyhow::anyhow!(
221 "intentional start_actors_err"
222 )
223 .into()));
224
225 let id = self.actor_context.id;
226 let span_name = format!("Actor {id}");
227
228 let new_span = |epoch: Option<EpochPair>| {
229 tracing::info_span!(
230 parent: None,
231 "actor",
232 "otel.name" = span_name,
233 actor_id = id,
234 prev_epoch = epoch.map(|e| e.prev),
235 curr_epoch = epoch.map(|e| e.curr),
236 )
237 };
238 let mut span = new_span(None);
239
240 let actor_count = self
241 .actor_context
242 .streaming_metrics
243 .actor_count
244 .with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
245 let _actor_count_guard = actor_count.inc_guard();
246
247 let current_epoch = self
248 .actor_context
249 .streaming_metrics
250 .actor_current_epoch
251 .with_guarded_label_values(&[
252 &self.actor_context.id.to_string(),
253 &self.actor_context.fragment_id.to_string(),
254 ]);
255
256 let mut last_epoch: Option<EpochPair> = None;
257 let mut stream = Box::pin(Box::new(self.consumer).execute());
258
259 let result = loop {
261 let barrier = match stream
262 .try_next()
263 .instrument(span.clone())
264 .instrument_await(
265 last_epoch.map_or(await_tree::span!("Epoch <initial>"), |e| {
266 await_tree::span!("Epoch {}", e.curr)
267 }),
268 )
269 .await
270 {
271 Ok(Some(barrier)) => barrier,
272 Ok(None) => break Err(anyhow!("actor exited unexpectedly").into()),
273 Err(err) => break Err(err),
274 };
275
276 fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
277 "intentional collect_actors_err"
278 )
279 .into()));
280
281 if barrier.is_stop(id) {
283 debug!(actor_id = id, epoch = ?barrier.epoch, "stop at barrier");
284 break Ok(barrier);
285 }
286
287 current_epoch.set(barrier.epoch.curr as i64);
288
289 self.barrier_manager.collect(id, &barrier);
291
292 last_epoch = Some(barrier.epoch);
294 span = barrier.tracing_context().attach(new_span(last_epoch));
295 };
296
297 spawn_blocking_drop_stream(stream).await;
298
299 let result = result.map(|stop_barrier| {
300 self.barrier_manager.collect(id, &stop_barrier);
302 });
303
304 tracing::debug!(actor_id = id, ok = result.is_ok(), "actor exit");
305 result
306 }
307}
308
309pub async fn spawn_blocking_drop_stream<T: Send + 'static>(stream: T) {
317 let _ = tokio::task::spawn_blocking(move || drop(stream))
318 .instrument_await("drop_stream")
319 .await;
320}