use std::collections::{BTreeMap, HashSet, btree_map};
use std::marker::PhantomData;
use std::ops::RangeInclusive;

use delta_btree_map::Change;
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::row::RowExt;
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
use risingwave_common::types::DefaultOrdered;
use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::window_function::{
    RangeFrameBounds, RowsFrameBounds, StateKey, WindowFuncCall,
};

use super::frame_finder::merge_rows_frames;
use super::over_partition::{OverPartition, PartitionDelta};
use super::range_cache::{CacheKey, PartitionCache};
use crate::cache::ManagedLruCache;
use crate::common::change_buffer::ChangeBuffer;
use crate::common::metrics::MetricsInfo;
use crate::consistency::consistency_panic;
use crate::executor::monitor::OverWindowMetrics;
use crate::executor::prelude::*;

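/// `OverWindowExecutor` evaluates window function (`OVER` clause) calls on a changing input
/// stream. Changes are first merged per input stream key, then grouped into per-partition
/// deltas and applied against the per-partition range cache and `state_table`. Output rows are
/// the input columns followed by one column per window function call.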
pub struct OverWindowExecutor<S: StateStore> {
    input: Executor,
    inner: ExecutorInner<S>,
}

struct ExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,

    schema: Schema,
    calls: Calls,
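    /// Partition key indices with duplicates removed, used to project the partition key from
    /// input rows.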
    deduped_part_key_indices: Vec<usize>,
    order_key_indices: Vec<usize>,
    order_key_data_types: Vec<DataType>,
    order_key_order_types: Vec<OrderType>,
    input_stream_key: Vec<usize>,
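    /// Projection from the state key columns (order key columns followed by input stream key
    /// columns) to the state table "sub-pk", i.e. the table pk without the partition key.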
    state_key_to_table_sub_pk_proj: Vec<usize>,

    state_table: StateTable<S>,
    watermark_sequence: AtomicU64Ref,

    chunk_size: usize,
    cache_policy: CachePolicy,
}

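/// In-memory execution state of the executor; not persisted anywhere.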
struct ExecutionVars<S: StateStore> {
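    /// LRU cache of per-partition range caches, keyed by partition key; evicted on barriers.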
    cached_partitions: ManagedLruCache<OwnedRow, PartitionCache>,
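    /// State-key ranges accessed since the last barrier, used to shrink the partition caches
    /// when the cache policy is not `Full`.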
    recently_accessed_ranges: BTreeMap<DefaultOrdered<OwnedRow>, RangeInclusive<StateKey>>,
    stats: ExecutionStats,
    _phantom: PhantomData<S>,
}

#[derive(Default)]
struct ExecutionStats {
    cache_miss: u64,
    cache_lookup: u64,
}

impl<S: StateStore> Execute for OverWindowExecutor<S> {
    fn execute(self: Box<Self>) -> crate::executor::BoxedMessageStream {
        self.executor_inner().boxed()
    }
}

impl<S: StateStore> ExecutorInner<S> {
    fn get_partition_key(&self, full_row: impl Row) -> OwnedRow {
        full_row
            .project(&self.deduped_part_key_indices)
            .into_owned_row()
    }

    fn get_input_pk(&self, full_row: impl Row) -> OwnedRow {
        full_row.project(&self.input_stream_key).into_owned_row()
    }

    fn encode_order_key(&self, full_row: impl Row) -> StreamExecutorResult<MemcmpEncoded> {
        Ok(memcmp_encoding::encode_row(
            full_row.project(&self.order_key_indices),
            &self.order_key_order_types,
        )?)
    }

    fn row_to_cache_key(&self, full_row: impl Row + Copy) -> StreamExecutorResult<CacheKey> {
        Ok(CacheKey::Normal(StateKey {
            order_key: self.encode_order_key(full_row)?,
            pk: self.get_input_pk(full_row).into(),
        }))
    }
}

pub struct OverWindowExecutorArgs<S: StateStore> {
    pub actor_ctx: ActorContextRef,

    pub input: Executor,

    pub schema: Schema,
    pub calls: Vec<WindowFuncCall>,
    pub partition_key_indices: Vec<usize>,
    pub order_key_indices: Vec<usize>,
    pub order_key_order_types: Vec<OrderType>,

    pub state_table: StateTable<S>,
    pub watermark_epoch: AtomicU64Ref,
    pub metrics: Arc<StreamingMetrics>,

    pub chunk_size: usize,
    pub cache_policy: CachePolicy,
}

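/// The window function calls of an `OverWindowExecutor`, together with properties derived from
/// them that are needed during execution.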
pub(super) struct Calls {
    calls: Vec<WindowFuncCall>,

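    /// A `ROWS` frame that covers all `ROWS` frames of the calls.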
    pub(super) super_rows_frame_bounds: RowsFrameBounds,
    pub(super) range_frames: Vec<RangeFrameBounds>,
    pub(super) start_is_unbounded: bool,
    pub(super) end_is_unbounded: bool,
    pub(super) all_arg_indices: Vec<usize>,

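    /// Whether all calls are numbering functions, and whether any call is a rank function.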
    pub(super) numbering_only: bool,
    pub(super) has_rank: bool,
}

impl Calls {
    fn new(calls: Vec<WindowFuncCall>) -> Self {
        let rows_frames = calls
            .iter()
            .filter_map(|call| call.frame.bounds.as_rows())
            .collect::<Vec<_>>();
        let super_rows_frame_bounds = merge_rows_frames(&rows_frames);
        let range_frames = calls
            .iter()
            .filter_map(|call| call.frame.bounds.as_range())
            .cloned()
            .collect::<Vec<_>>();

        let start_is_unbounded = calls
            .iter()
            .any(|call| call.frame.bounds.start_is_unbounded());
        let end_is_unbounded = calls
            .iter()
            .any(|call| call.frame.bounds.end_is_unbounded());

        let all_arg_indices = calls
            .iter()
            .flat_map(|call| call.args.val_indices().iter().copied())
            .dedup()
            .collect();

        let numbering_only = calls.iter().all(|call| call.kind.is_numbering());
        let has_rank = calls.iter().any(|call| call.kind.is_rank());

        Self {
            calls,
            super_rows_frame_bounds,
            range_frames,
            start_is_unbounded,
            end_is_unbounded,
            all_arg_indices,
            numbering_only,
            has_rank,
        }
    }

    pub(super) fn iter(&self) -> impl ExactSizeIterator<Item = &WindowFuncCall> {
        self.calls.iter()
    }

    pub(super) fn len(&self) -> usize {
        self.calls.len()
    }
}

impl<S: StateStore> OverWindowExecutor<S> {
    pub fn new(args: OverWindowExecutorArgs<S>) -> Self {
        let calls = Calls::new(args.calls);

        let input_info = args.input.info().clone();
        let input_schema = &input_info.schema;

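        // An unbounded frame requires the whole partition to be computed anyway, so partial
        // caching would not help; force the `Full` cache policy in that case.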
        let has_unbounded_frame = calls.start_is_unbounded || calls.end_is_unbounded;
        let cache_policy = if has_unbounded_frame {
            CachePolicy::Full
        } else {
            args.cache_policy
        };

        let order_key_data_types = args
            .order_key_indices
            .iter()
            .map(|i| input_schema[*i].data_type())
            .collect();

        let state_key_to_table_sub_pk_proj = RowConverter::calc_state_key_to_table_sub_pk_proj(
            &args.partition_key_indices,
            &args.order_key_indices,
            &input_info.stream_key,
        );

        let deduped_part_key_indices = {
            let mut dedup = HashSet::new();
            args.partition_key_indices
                .iter()
                .filter(|i| dedup.insert(**i))
                .copied()
                .collect()
        };

        Self {
            input: args.input,
            inner: ExecutorInner {
                actor_ctx: args.actor_ctx,
                schema: args.schema,
                calls,
                deduped_part_key_indices,
                order_key_indices: args.order_key_indices,
                order_key_data_types,
                order_key_order_types: args.order_key_order_types,
                input_stream_key: input_info.stream_key,
                state_key_to_table_sub_pk_proj,
                state_table: args.state_table,
                watermark_sequence: args.watermark_epoch,
                chunk_size: args.chunk_size,
                cache_policy,
            },
        }
    }

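    /// Merge the changes in the chunk by input stream key via a `ChangeBuffer`, so that each
    /// key contributes at most one effective record (e.g. a `Delete` followed by an `Insert`
    /// of the same key is compacted into an `Update`).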
    fn merge_changes_in_chunk<'a>(
        this: &'_ ExecutorInner<S>,
        chunk: &'a StreamChunk,
    ) -> impl Iterator<Item = Record<RowRef<'a>>> {
        let mut cb = ChangeBuffer::with_capacity(chunk.cardinality());
        for record in chunk.records() {
            cb.apply_record(record, |row| this.get_input_pk(row));
        }
        cb.into_records()
    }

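    /// Process one input chunk: merge its changes, split them into per-partition deltas, apply
    /// the deltas to the partition caches and the state table, and yield output chunks.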
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn apply_chunk<'a>(
        this: &'a mut ExecutorInner<S>,
        vars: &'a mut ExecutionVars<S>,
        chunk: StreamChunk,
        metrics: &'a OverWindowMetrics,
    ) {
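        // Per-partition deltas derived from the chunk. The first delta of each pair may affect
        // window function outputs; the second ("no-effect") delta only rewrites columns that no
        // window function reads.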
        let mut deltas: BTreeMap<DefaultOrdered<OwnedRow>, (PartitionDelta, PartitionDelta)> =
            BTreeMap::new();
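        // Stream keys updated with a new state key (within the same partition) in this chunk;
        // their Delete/Insert outputs must later be recombined into `Update` records.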
        let mut key_change_updated_pks = HashSet::new();

        for record in Self::merge_changes_in_chunk(this, &chunk) {
            match record {
                Record::Insert { new_row } => {
                    let part_key = this.get_partition_key(new_row).into();
                    let (delta, _) = deltas.entry(part_key).or_default();
                    delta.insert(
                        this.row_to_cache_key(new_row)?,
                        Change::Insert(new_row.into_owned_row()),
                    );
                }
                Record::Delete { old_row } => {
                    let part_key = this.get_partition_key(old_row).into();
                    let (delta, _) = deltas.entry(part_key).or_default();
                    delta.insert(this.row_to_cache_key(old_row)?, Change::Delete);
                }
                Record::Update { old_row, new_row } => {
                    let old_part_key = this.get_partition_key(old_row).into();
                    let new_part_key = this.get_partition_key(new_row).into();
                    let old_state_key = this.row_to_cache_key(old_row)?;
                    let new_state_key = this.row_to_cache_key(new_row)?;
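                    // Three cases: state key unchanged, key change within the same partition,
                    // and key change across partitions.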
                    if old_part_key == new_part_key && old_state_key == new_state_key {
                        let (delta, no_effect_delta) = deltas.entry(old_part_key).or_default();
                        if old_row.project(&this.calls.all_arg_indices)
                            == new_row.project(&this.calls.all_arg_indices)
                        {
                            no_effect_delta
                                .insert(old_state_key, Change::Insert(new_row.into_owned_row()));
                        } else {
                            delta.insert(old_state_key, Change::Insert(new_row.into_owned_row()));
                        }
                    } else if old_part_key == new_part_key {
                        key_change_updated_pks.insert(this.get_input_pk(old_row));
                        let (delta, _) = deltas.entry(old_part_key).or_default();
                        delta.insert(old_state_key, Change::Delete);
                        delta.insert(new_state_key, Change::Insert(new_row.into_owned_row()));
                    } else {
                        let (old_part_delta, _) = deltas.entry(old_part_key).or_default();
                        old_part_delta.insert(old_state_key, Change::Delete);
                        let (new_part_delta, _) = deltas.entry(new_part_key).or_default();
                        new_part_delta
                            .insert(new_state_key, Change::Insert(new_row.into_owned_row()));
                    }
                }
            }
        }

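        // Buffer for pairing up the `Delete` and `Insert` halves of key-change updates so that
        // they are emitted downstream as single `Update` records.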
        let mut key_change_update_buffer: BTreeMap<DefaultOrdered<OwnedRow>, Record<OwnedRow>> =
            BTreeMap::new();
        let mut chunk_builder = StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());

        for (part_key, (delta, no_effect_delta)) in deltas {
            vars.stats.cache_lookup += 1;
            if !vars.cached_partitions.contains(&part_key.0) {
                vars.stats.cache_miss += 1;
                vars.cached_partitions
                    .put(part_key.0.clone(), PartitionCache::new());
            }
            let mut cache = vars.cached_partitions.get_mut(&part_key).unwrap();

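            // Apply the no-effect changes first: they only rewrite input columns that no window
            // function reads, so the previously computed outputs can be reused as-is.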
            for (key, change) in no_effect_delta {
                let new_row = change.into_insert().unwrap();
                let (old_row, from_cache) = if let Some(old_row) = cache.inner().get(&key).cloned()
                {
                    (old_row, true)
                } else {
                    let table_pk = (&new_row).project(this.state_table.pk_indices());
                    if let Some(old_row) = this.state_table.get_row(table_pk).await? {
                        (old_row, false)
                    } else {
                        consistency_panic!(?part_key, ?key, ?new_row, "updating non-existing row");
                        continue;
                    }
                };

                let input_len = new_row.len();
                let new_row = OwnedRow::new(
                    new_row
                        .into_iter()
                        .chain(old_row.as_inner().iter().skip(input_len).cloned())
                        .collect(),
                );

                let record = Record::Update {
                    old_row: &old_row,
                    new_row: &new_row,
                };
                if let Some(chunk) = chunk_builder.append_record(record.as_ref()) {
                    yield chunk;
                }
                this.state_table.write_record(record);
                if from_cache {
                    cache.insert(key, new_row);
                }
            }

            let mut partition = OverPartition::new(
                &part_key,
                &mut cache,
                this.cache_policy,
                &this.calls,
                RowConverter {
                    state_key_to_table_sub_pk_proj: &this.state_key_to_table_sub_pk_proj,
                    order_key_indices: &this.order_key_indices,
                    order_key_data_types: &this.order_key_data_types,
                    order_key_order_types: &this.order_key_order_types,
                    input_stream_key_indices: &this.input_stream_key,
                },
            );

            if delta.is_empty() {
                continue;
            }

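            // Compute output changes for this partition from its delta, tracking which range of
            // state keys had to be accessed.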
            let (part_changes, accessed_range) =
                partition.build_changes(&this.state_table, delta).await?;

            for (key, record) in part_changes {
                if !key_change_updated_pks.contains(&key.pk) {
                    if let Some(chunk) = chunk_builder.append_record(record.as_ref()) {
                        yield chunk;
                    }
                } else {
                    let pk = key.pk.clone();
                    let record = record.clone();
                    if let Some(existed) = key_change_update_buffer.remove(&key.pk) {
                        match (existed, record) {
                            (Record::Insert { new_row }, Record::Delete { old_row })
                            | (Record::Delete { old_row }, Record::Insert { new_row }) => {
                                if let Some(chunk) =
                                    chunk_builder.append_record(Record::Update { old_row, new_row })
                                {
                                    yield chunk;
                                }
                            }
                            (existed, record) => {
                                consistency_panic!(
                                    ?existed,
                                    ?record,
                                    "other cases should not exist",
                                );

                                key_change_update_buffer.insert(pk, record);
                                if let Some(chunk) = chunk_builder.append_record(existed) {
                                    yield chunk;
                                }
                            }
                        }
                    } else {
                        key_change_update_buffer.insert(pk, record);
                    }
                }

                partition.write_record(&mut this.state_table, key, record);
            }

            if !key_change_update_buffer.is_empty() {
                consistency_panic!(
                    ?key_change_update_buffer,
                    "key-change update buffer should be empty after processing"
                );
            }

            let cache_len = partition.cache_real_len();
            let stats = partition.summarize();
            metrics
                .over_window_range_cache_entry_count
                .set(cache_len as i64);
            metrics
                .over_window_range_cache_lookup_count
                .inc_by(stats.lookup_count);
            metrics
                .over_window_range_cache_left_miss_count
                .inc_by(stats.left_miss_count);
            metrics
                .over_window_range_cache_right_miss_count
                .inc_by(stats.right_miss_count);
            metrics
                .over_window_accessed_entry_count
                .inc_by(stats.accessed_entry_count);
            metrics
                .over_window_compute_count
                .inc_by(stats.compute_count);
            metrics
                .over_window_same_output_count
                .inc_by(stats.same_output_count);

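            // Remember the accessed range so the partition cache can be shrunk to it at the
            // next barrier (only relevant when the cache policy is not `Full`).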
            if !this.cache_policy.is_full()
                && let Some(accessed_range) = accessed_range
            {
                match vars.recently_accessed_ranges.entry(part_key) {
                    btree_map::Entry::Vacant(vacant) => {
                        vacant.insert(accessed_range);
                    }
                    btree_map::Entry::Occupied(mut occupied) => {
                        let recently_accessed_range = occupied.get_mut();
                        let min_start = accessed_range
                            .start()
                            .min(recently_accessed_range.start())
                            .clone();
                        let max_end = accessed_range
                            .end()
                            .max(recently_accessed_range.end())
                            .clone();
                        *recently_accessed_range = min_start..=max_end;
                    }
                }
            }
        }

        if let Some(chunk) = chunk_builder.take() {
            yield chunk;
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn executor_inner(self) {
        let OverWindowExecutor {
            input,
            inner: mut this,
        } = self;

        let metrics_info = MetricsInfo::new(
            this.actor_ctx.streaming_metrics.clone(),
            this.state_table.table_id(),
            this.actor_ctx.id,
            "OverWindow",
        );

        let metrics = metrics_info.metrics.new_over_window_metrics(
            this.state_table.table_id(),
            this.actor_ctx.id,
            this.actor_ctx.fragment_id,
        );

        let mut vars = ExecutionVars {
            cached_partitions: ManagedLruCache::unbounded(
                this.watermark_sequence.clone(),
                metrics_info,
            ),
            recently_accessed_ranges: Default::default(),
            stats: Default::default(),
            _phantom: PhantomData::<S>,
        };

        let mut input = input.execute();
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        this.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            match msg {
                Message::Watermark(_) => {
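                    // Watermarks are not utilized by this executor for now; drop them.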
                    continue;
                }
                Message::Chunk(chunk) => {
                    #[for_await]
                    for chunk in Self::apply_chunk(&mut this, &mut vars, chunk, &metrics) {
                        yield Message::Chunk(chunk?);
                    }
                    this.state_table.try_flush().await?;
                }
                Message::Barrier(barrier) => {
                    let post_commit = this.state_table.commit(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
                    yield Message::Barrier(barrier);

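                    // Evict stale cache entries by watermark and report cache statistics once
                    // per barrier.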
                    vars.cached_partitions.evict();

                    metrics
                        .over_window_cached_entry_count
                        .set(vars.cached_partitions.len() as _);
                    metrics
                        .over_window_cache_lookup_count
                        .inc_by(std::mem::take(&mut vars.stats.cache_lookup));
                    metrics
                        .over_window_cache_miss_count
                        .inc_by(std::mem::take(&mut vars.stats.cache_miss));

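                    // If the vnode bitmap update may have made cached entries stale, drop all
                    // cached partitions and the recorded access ranges.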
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                        && cache_may_stale
                    {
                        vars.cached_partitions.clear();
                        vars.recently_accessed_ranges.clear();
                    }

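                    // Shrink each partition's range cache towards the range accessed since the
                    // last barrier, as allowed by the cache policy.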
                    if !this.cache_policy.is_full() {
                        for (part_key, recently_accessed_range) in
                            std::mem::take(&mut vars.recently_accessed_ranges)
                        {
                            if let Some(mut range_cache) =
                                vars.cached_partitions.get_mut(&part_key.0)
                            {
                                range_cache.shrink(
                                    &part_key.0,
                                    this.cache_policy,
                                    recently_accessed_range,
                                );
                            }
                        }
                    }
                }
            }
        }
    }
}

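/// A lightweight bundle of projections and order-key metadata used to convert between full
/// input rows, memcmp-encoded state keys, and state table (sub-)pk rows.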
#[derive(Debug, Clone, Copy)]
pub(super) struct RowConverter<'a> {
    state_key_to_table_sub_pk_proj: &'a [usize],
    order_key_indices: &'a [usize],
    order_key_data_types: &'a [DataType],
    order_key_order_types: &'a [OrderType],
    input_stream_key_indices: &'a [usize],
}

impl<'a> RowConverter<'a> {
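    /// Compute the projection from the concatenated `(order key, input stream key)` columns to
    /// the state table sub-pk (the table pk without the partition key), skipping columns that
    /// are already covered by the partition key or duplicated earlier in the concatenation.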
    pub(super) fn calc_state_key_to_table_sub_pk_proj(
        partition_key_indices: &[usize],
        order_key_indices: &[usize],
        input_stream_key_indices: &'a [usize],
    ) -> Vec<usize> {
        let mut projection =
            Vec::with_capacity(order_key_indices.len() + input_stream_key_indices.len());
        let mut col_dedup: HashSet<usize> = partition_key_indices.iter().copied().collect();
        for (proj_idx, key_idx) in order_key_indices
            .iter()
            .chain(input_stream_key_indices.iter())
            .enumerate()
        {
            if col_dedup.insert(*key_idx) {
                projection.push(proj_idx);
            }
        }
        projection.shrink_to_fit();
        projection
    }

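    /// Convert a `StateKey` to the sub-pk of the state table by decoding the memcmp-encoded
    /// order key and chaining the input pk behind it, then applying the sub-pk projection.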
    pub(super) fn state_key_to_table_sub_pk(
        &self,
        key: &StateKey,
    ) -> StreamExecutorResult<OwnedRow> {
        Ok(memcmp_encoding::decode_row(
            &key.order_key,
            self.order_key_data_types,
            self.order_key_order_types,
        )?
        .chain(key.pk.as_inner())
        .project(self.state_key_to_table_sub_pk_proj)
        .into_owned_row())
    }

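    /// Build a `StateKey` from a full input row by memcmp-encoding its order key columns and
    /// projecting its input stream key columns.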
    pub(super) fn row_to_state_key(
        &self,
        full_row: impl Row + Copy,
    ) -> StreamExecutorResult<StateKey> {
        Ok(StateKey {
            order_key: memcmp_encoding::encode_row(
                full_row.project(self.order_key_indices),
                self.order_key_order_types,
            )?,
            pk: full_row
                .project(self.input_stream_key_indices)
                .into_owned_row()
                .into(),
        })
    }
}