risingwave_stream/executor/over_window/
eowc.rs1use std::marker::PhantomData;
16use std::ops::Bound;
17
18use itertools::Itertools;
19use risingwave_common::array::stream_record::Record;
20use risingwave_common::array::{ArrayRef, Op};
21use risingwave_common::row::RowExt;
22use risingwave_common::types::{ToDatumRef, ToOwnedDatum};
23use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
24use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
25use risingwave_common::util::sort_util::OrderType;
26use risingwave_common::{must_match, row};
27use risingwave_common_estimate_size::EstimateSize;
28use risingwave_common_estimate_size::collections::EstimatedVecDeque;
29use risingwave_expr::window_function::{
30 StateEvictHint, StateKey, WindowFuncCall, WindowStates, create_window_state,
31};
32use risingwave_storage::store::PrefetchOptions;
33
34use crate::cache::ManagedLruCache;
35use crate::common::metrics::MetricsInfo;
36use crate::executor::prelude::*;
37
/// In-memory state of one partition, i.e. of all rows sharing the same partition key.
struct Partition {
    /// One window-function state per entry of `ExecutorInner::calls`, in the same order
    /// (states are created by mapping `create_window_state` over the calls and are always
    /// zipped back with them).
    states: WindowStates,
    /// Rows already appended to `states` whose window outputs have not been emitted yet,
    /// oldest first. The front row is popped every time the states slide.
    curr_row_buffer: EstimatedVecDeque<OwnedRow>,
}
42
43impl EstimateSize for Partition {
44 fn estimated_heap_size(&self) -> usize {
45 let mut total_size = self.curr_row_buffer.estimated_heap_size();
46 for state in self.states.iter() {
47 total_size += state.estimated_heap_size();
48 }
49 total_size
50 }
51}
52
53type PartitionCache = ManagedLruCache<MemcmpEncoded, Partition>; pub struct EowcOverWindowExecutor<S: StateStore> {
89 input: Executor,
90 inner: ExecutorInner<S>,
91}
92
93struct ExecutorInner<S: StateStore> {
94 actor_ctx: ActorContextRef,
95
96 schema: Schema,
97 calls: Vec<WindowFuncCall>,
98 input_pk_indices: Vec<usize>,
99 partition_key_indices: Vec<usize>,
100 order_key_index: usize, state_table: StateTable<S>,
102 state_table_schema_len: usize,
103 watermark_sequence: AtomicU64Ref,
104}
105
/// Mutable in-memory state created when the executor starts running.
struct ExecutionVars<S: StateStore> {
    /// Cache of partitions loaded from (or built alongside) the state table.
    partitions: PartitionCache,
    /// Ties the struct to the state-store type `S` without storing a value of it.
    _phantom: PhantomData<S>,
}
110
111impl<S: StateStore> Execute for EowcOverWindowExecutor<S> {
112 fn execute(self: Box<Self>) -> BoxedMessageStream {
113 self.executor_inner().boxed()
114 }
115}
116
/// Construction arguments for [`EowcOverWindowExecutor`].
pub struct EowcOverWindowExecutorArgs<S: StateStore> {
    pub actor_ctx: ActorContextRef,

    /// Upstream executor. Only `Insert` records are accepted from it (anything else panics
    /// while applying a chunk).
    pub input: Executor,

    /// Output schema: input columns followed by the window-function output columns.
    pub schema: Schema,
    /// Window function calls to evaluate for every partition.
    pub calls: Vec<WindowFuncCall>,
    /// Input column indices forming the partition key.
    pub partition_key_indices: Vec<usize>,
    /// Input column index of the single order-by column; its values must be non-NULL.
    pub order_key_index: usize,
    /// State table persisting input rows that may still be needed by some window.
    pub state_table: StateTable<S>,
    /// Stored as the executor's `watermark_sequence` and handed to the partition cache.
    pub watermark_epoch: AtomicU64Ref,
}
129
130impl<S: StateStore> EowcOverWindowExecutor<S> {
131 pub fn new(args: EowcOverWindowExecutorArgs<S>) -> Self {
132 let input_info = args.input.info().clone();
133
134 Self {
135 input: args.input,
136 inner: ExecutorInner {
137 actor_ctx: args.actor_ctx,
138 schema: args.schema,
139 calls: args.calls,
140 input_pk_indices: input_info.pk_indices,
141 partition_key_indices: args.partition_key_indices,
142 order_key_index: args.order_key_index,
143 state_table: args.state_table,
144 state_table_schema_len: input_info.schema.len(),
145 watermark_sequence: args.watermark_epoch,
146 },
147 }
148 }
149
150 async fn ensure_key_in_cache(
151 this: &ExecutorInner<S>,
152 cache: &mut PartitionCache,
153 partition_key: impl Row,
154 encoded_partition_key: &MemcmpEncoded,
155 ) -> StreamExecutorResult<()> {
156 if cache.contains(encoded_partition_key) {
157 return Ok(());
158 }
159
160 let mut partition = Partition {
161 states: WindowStates::new(this.calls.iter().map(create_window_state).try_collect()?),
162 curr_row_buffer: Default::default(),
163 };
164
165 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
166 let table_iter = this
168 .state_table
169 .iter_with_prefix(partition_key, sub_range, PrefetchOptions::default())
170 .await?;
171
172 #[for_await]
173 for keyed_row in table_iter {
174 let row = keyed_row?.into_owned_row();
175 let order_key_enc = memcmp_encoding::encode_row(
176 row::once(Some(
177 row.datum_at(this.order_key_index)
178 .expect("order key column must be non-NULL")
179 .into_scalar_impl(),
180 )),
181 &[OrderType::ascending()],
182 )?;
183 let pk = (&row).project(&this.input_pk_indices).into_owned_row();
184 let key = StateKey {
185 order_key: order_key_enc,
186 pk: pk.into(),
187 };
188 for (call, state) in this.calls.iter().zip_eq_fast(partition.states.iter_mut()) {
189 state.append(
190 key.clone(),
191 (&row)
192 .project(call.args.val_indices())
193 .into_owned_row()
194 .as_inner()
195 .into(),
196 );
197 }
198 partition.curr_row_buffer.push_back(row);
199 }
200
201 assert!(partition.states.are_aligned());
203
204 while partition.states.are_ready() {
206 partition.states.just_slide()?;
207 partition.curr_row_buffer.pop_front();
208 }
209
210 cache.put(encoded_partition_key.clone(), partition);
211 Ok(())
212 }
213
214 async fn apply_chunk(
215 this: &mut ExecutorInner<S>,
216 vars: &mut ExecutionVars<S>,
217 chunk: StreamChunk,
218 ) -> StreamExecutorResult<Option<StreamChunk>> {
219 let mut builders = this.schema.create_array_builders(chunk.capacity()); for record in chunk.records() {
223 let input_row = must_match!(record, Record::Insert { new_row } => new_row);
224
225 let partition_key = input_row
226 .project(&this.partition_key_indices)
227 .into_owned_row();
228 let encoded_partition_key = memcmp_encoding::encode_row(
229 &partition_key,
230 &vec![OrderType::ascending(); this.partition_key_indices.len()],
231 )?;
232
233 Self::ensure_key_in_cache(
235 this,
236 &mut vars.partitions,
237 &partition_key,
238 &encoded_partition_key,
239 )
240 .await?;
241 let partition: &mut Partition =
242 &mut vars.partitions.get_mut(&encoded_partition_key).unwrap();
243
244 this.state_table.insert(input_row);
246
247 let order_key_enc = memcmp_encoding::encode_row(
249 row::once(Some(
250 input_row
251 .datum_at(this.order_key_index)
252 .expect("order key column must be non-NULL")
253 .into_scalar_impl(),
254 )),
255 &[OrderType::ascending()],
256 )?;
257 let pk = input_row.project(&this.input_pk_indices).into_owned_row();
258 let key = StateKey {
259 order_key: order_key_enc,
260 pk: pk.into(),
261 };
262 for (call, state) in this.calls.iter().zip_eq_fast(partition.states.iter_mut()) {
263 state.append(
264 key.clone(),
265 input_row
266 .project(call.args.val_indices())
267 .into_owned_row()
268 .as_inner()
269 .into(),
270 );
271 }
272 partition
273 .curr_row_buffer
274 .push_back(input_row.into_owned_row());
275
276 while partition.states.are_ready() {
277 let (ret_values, evict_hint) = partition.states.slide()?;
281 let curr_row = partition
282 .curr_row_buffer
283 .pop_front()
284 .expect("ready window must have corresponding current row");
285
286 for (builder, datum) in builders.iter_mut().zip_eq_debug(
288 curr_row
289 .iter()
290 .chain(ret_values.iter().map(|v| v.to_datum_ref())),
291 ) {
292 builder.append(datum);
293 }
294
295 if let StateEvictHint::CanEvict(keys_to_evict) = evict_hint {
297 for key in keys_to_evict {
298 let order_key = memcmp_encoding::decode_row(
299 &key.order_key,
300 &[this.schema[this.order_key_index].data_type()],
301 &[OrderType::ascending()],
302 )?;
303 let state_row_pk = (&partition_key).chain(order_key).chain(key.pk);
304 let state_row = {
305 let mut state_row = vec![None; this.state_table_schema_len];
307 for (i_in_pk, &i) in this.state_table.pk_indices().iter().enumerate() {
308 state_row[i] = state_row_pk.datum_at(i_in_pk).to_owned_datum();
309 }
310 OwnedRow::new(state_row)
311 };
312 this.state_table.delete(state_row);
315 }
316 }
317 }
318 }
319
320 let columns: Vec<ArrayRef> = builders.into_iter().map(|b| b.finish().into()).collect();
321 let chunk_size = columns[0].len();
322 Ok(if chunk_size > 0 {
323 Some(StreamChunk::new(vec![Op::Insert; chunk_size], columns))
324 } else {
325 None
326 })
327 }
328
329 #[try_stream(ok = Message, error = StreamExecutorError)]
330 async fn executor_inner(self) {
331 let EowcOverWindowExecutor {
332 input,
333 inner: mut this,
334 } = self;
335
336 let metrics_info = MetricsInfo::new(
337 this.actor_ctx.streaming_metrics.clone(),
338 this.state_table.table_id(),
339 this.actor_ctx.id,
340 "EowcOverWindow",
341 );
342
343 let mut vars = ExecutionVars {
344 partitions: ManagedLruCache::unbounded(this.watermark_sequence.clone(), metrics_info),
345 _phantom: PhantomData::<S>,
346 };
347
348 let mut input = input.execute();
349 let barrier = expect_first_barrier(&mut input).await?;
350 let first_epoch = barrier.epoch;
351 yield Message::Barrier(barrier);
352 this.state_table.init_epoch(first_epoch).await?;
353
354 #[for_await]
355 for msg in input {
356 let msg = msg?;
357 match msg {
358 Message::Watermark(_) => {
359 continue;
360 }
361 Message::Chunk(chunk) => {
362 let output_chunk = Self::apply_chunk(&mut this, &mut vars, chunk).await?;
363 if let Some(chunk) = output_chunk {
364 yield Message::Chunk(chunk);
365 }
366 this.state_table.try_flush().await?;
367 }
368 Message::Barrier(barrier) => {
369 let post_commit = this.state_table.commit(barrier.epoch).await?;
370 vars.partitions.evict();
371
372 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
373 yield Message::Barrier(barrier);
374
375 if let Some((_, cache_may_stale)) =
376 post_commit.post_yield_barrier(update_vnode_bitmap).await?
377 {
378 if cache_may_stale {
379 vars.partitions.clear();
380 }
381 }
382 }
383 }
384 }
385 }
386}