1use std::collections::{BTreeMap, HashSet, btree_map};
16use std::marker::PhantomData;
17use std::ops::RangeInclusive;
18
19use delta_btree_map::Change;
20use itertools::Itertools;
21use risingwave_common::array::Op;
22use risingwave_common::array::stream_record::Record;
23use risingwave_common::row::RowExt;
24use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
25use risingwave_common::types::DefaultOrdered;
26use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
27use risingwave_common::util::sort_util::OrderType;
28use risingwave_expr::window_function::{
29 RangeFrameBounds, RowsFrameBounds, StateKey, WindowFuncCall,
30};
31
32use super::frame_finder::merge_rows_frames;
33use super::over_partition::{
34 CacheKey, OverPartition, PartitionCache, PartitionDelta, new_empty_partition_cache,
35 shrink_partition_cache,
36};
37use crate::cache::ManagedLruCache;
38use crate::common::metrics::MetricsInfo;
39use crate::consistency::consistency_panic;
40use crate::executor::monitor::OverWindowMetrics;
41use crate::executor::prelude::*;
42
/// Streaming executor evaluating general `OVER` window function calls.
///
/// Input changes are grouped by partition key and applied to per-partition
/// caches backed by a state table; the resulting changes to the stored rows are
/// emitted downstream.
pub struct OverWindowExecutor<S: StateStore> {
    /// Upstream executor.
    input: Executor,
    /// Everything else (configuration and state), split from `input` so that
    /// `executor_inner` can consume the input stream while borrowing this mutably.
    inner: ExecutorInner<S>,
}
52
/// Configuration and state of an [`OverWindowExecutor`] excluding its input.
///
/// `executor_inner` destructures the executor into `input` and this struct, so
/// this part can be passed mutably to `apply_chunk` while the input is consumed.
struct ExecutorInner<S: StateStore> {
    /// Context of the owning actor; supplies the metrics registry and the
    /// actor/fragment ids.
    actor_ctx: ActorContextRef,

    /// Output schema; its data types are used to build output chunks.
    schema: Schema,
    /// Window function calls and properties derived from them.
    calls: Calls,
    /// Partition key column indices with duplicates removed (first occurrence kept).
    deduped_part_key_indices: Vec<usize>,
    /// Order key column indices in the input row.
    order_key_indices: Vec<usize>,
    /// Data types of the order key columns, taken from the input schema.
    order_key_data_types: Vec<DataType>,
    /// Sort order of each order key column, used for memcmp encoding.
    order_key_order_types: Vec<OrderType>,
    /// Stream key (pk) column indices of the input.
    input_pk_indices: Vec<usize>,
    /// Projection from `decoded order key ++ input pk` to the state table's
    /// sub-pk; computed by `RowConverter::calc_state_key_to_table_sub_pk_proj`.
    state_key_to_table_sub_pk_proj: Vec<usize>,

    /// Persistent state of all partitions; also read on cache misses.
    state_table: StateTable<S>,
    /// Shared sequence handed to the partition LRU cache
    /// (`ManagedLruCache::unbounded`); presumably drives its eviction — confirm.
    watermark_sequence: AtomicU64Ref,

    /// Chunk size passed to the output `StreamChunkBuilder`.
    chunk_size: usize,
    /// Partition cache policy; forced to `CachePolicy::Full` when any call has
    /// an unbounded frame (see `OverWindowExecutor::new`).
    cache_policy: CachePolicy,
}
72
/// In-memory working state of the executor (caches and counters).
struct ExecutionVars<S: StateStore> {
    /// LRU cache of partition caches, keyed by partition key.
    cached_partitions: ManagedLruCache<OwnedRow, PartitionCache>,
    /// Per partition key, the union of state-key ranges accessed since the last
    /// barrier. Only maintained for non-full cache policies; consumed on barrier
    /// to shrink the corresponding partition caches.
    recently_accessed_ranges: BTreeMap<DefaultOrdered<OwnedRow>, RangeInclusive<StateKey>>,
    /// Partition cache lookup/miss counters, reported and reset on every barrier.
    stats: ExecutionStats,
    /// Binds the struct to `S` without storing a value of that type.
    _phantom: PhantomData<S>,
}
81
/// Counters of partition-cache accesses accumulated between barriers.
#[derive(Default)]
struct ExecutionStats {
    /// Number of lookups whose partition was not in `cached_partitions`.
    cache_miss: u64,
    /// Number of partition lookups in `cached_partitions`.
    cache_lookup: u64,
}
87
88impl<S: StateStore> Execute for OverWindowExecutor<S> {
89 fn execute(self: Box<Self>) -> crate::executor::BoxedMessageStream {
90 self.executor_inner().boxed()
91 }
92}
93
94impl<S: StateStore> ExecutorInner<S> {
95 fn get_partition_key(&self, full_row: impl Row) -> OwnedRow {
97 full_row
98 .project(&self.deduped_part_key_indices)
99 .into_owned_row()
100 }
101
102 fn get_input_pk(&self, full_row: impl Row) -> OwnedRow {
103 full_row.project(&self.input_pk_indices).into_owned_row()
104 }
105
106 fn encode_order_key(&self, full_row: impl Row) -> StreamExecutorResult<MemcmpEncoded> {
108 Ok(memcmp_encoding::encode_row(
109 full_row.project(&self.order_key_indices),
110 &self.order_key_order_types,
111 )?)
112 }
113
114 fn row_to_cache_key(&self, full_row: impl Row + Copy) -> StreamExecutorResult<CacheKey> {
115 Ok(CacheKey::Normal(StateKey {
116 order_key: self.encode_order_key(full_row)?,
117 pk: self.get_input_pk(full_row).into(),
118 }))
119 }
120}
121
/// Arguments for constructing an [`OverWindowExecutor`] via [`OverWindowExecutor::new`].
pub struct OverWindowExecutorArgs<S: StateStore> {
    /// Context of the owning actor.
    pub actor_ctx: ActorContextRef,

    /// Upstream executor.
    pub input: Executor,

    /// Output schema of the executor.
    pub schema: Schema,
    /// Window function calls to evaluate.
    pub calls: Vec<WindowFuncCall>,
    /// Partition key column indices in the input row; may contain duplicates,
    /// which `OverWindowExecutor::new` removes.
    pub partition_key_indices: Vec<usize>,
    /// Order key column indices in the input row.
    pub order_key_indices: Vec<usize>,
    /// Sort order of each order key column, parallel to `order_key_indices`.
    pub order_key_order_types: Vec<OrderType>,

    /// State table storing the rows of all partitions.
    pub state_table: StateTable<S>,
    /// Shared sequence handed to the partition LRU cache (`ManagedLruCache::unbounded`).
    pub watermark_epoch: AtomicU64Ref,
    /// NOTE(review): not read by `OverWindowExecutor::new`; the executor takes
    /// its metrics from `actor_ctx.streaming_metrics` instead.
    pub metrics: Arc<StreamingMetrics>,

    /// Chunk size passed to the output chunk builder.
    pub chunk_size: usize,
    /// Requested partition cache policy; overridden to `CachePolicy::Full` when
    /// any call has an unbounded frame.
    pub cache_policy: CachePolicy,
}
140
/// Window function calls of the executor, together with properties derived
/// from them once in `Calls::new`.
pub(super) struct Calls {
    /// The calls, in their original order.
    calls: Vec<WindowFuncCall>,

    /// Merge (via `merge_rows_frames`) of the bounds of all calls with a `ROWS` frame.
    pub(super) super_rows_frame_bounds: RowsFrameBounds,
    /// Bounds of all calls with a `RANGE` frame.
    pub(super) range_frames: Vec<RangeFrameBounds>,
    /// Whether any call's frame has an unbounded start.
    pub(super) start_is_unbounded: bool,
    /// Whether any call's frame has an unbounded end.
    pub(super) end_is_unbounded: bool,
    /// Indices of the argument columns referenced by the calls; used to detect
    /// updates that leave every argument unchanged.
    pub(super) all_arg_indices: Vec<usize>,

    /// Whether every call is a numbering function (`kind.is_numbering()`).
    pub(super) numbering_only: bool,
    /// Whether any call is a rank function (`kind.is_rank()`).
    pub(super) has_rank: bool,
}
161
162impl Calls {
163 fn new(calls: Vec<WindowFuncCall>) -> Self {
164 let rows_frames = calls
165 .iter()
166 .filter_map(|call| call.frame.bounds.as_rows())
167 .collect::<Vec<_>>();
168 let super_rows_frame_bounds = merge_rows_frames(&rows_frames);
169 let range_frames = calls
170 .iter()
171 .filter_map(|call| call.frame.bounds.as_range())
172 .cloned()
173 .collect::<Vec<_>>();
174
175 let start_is_unbounded = calls
176 .iter()
177 .any(|call| call.frame.bounds.start_is_unbounded());
178 let end_is_unbounded = calls
179 .iter()
180 .any(|call| call.frame.bounds.end_is_unbounded());
181
182 let all_arg_indices = calls
183 .iter()
184 .flat_map(|call| call.args.val_indices().iter().copied())
185 .dedup()
186 .collect();
187
188 let numbering_only = calls.iter().all(|call| call.kind.is_numbering());
189 let has_rank = calls.iter().any(|call| call.kind.is_rank());
190
191 Self {
192 calls,
193 super_rows_frame_bounds,
194 range_frames,
195 start_is_unbounded,
196 end_is_unbounded,
197 all_arg_indices,
198 numbering_only,
199 has_rank,
200 }
201 }
202
203 pub(super) fn iter(&self) -> impl ExactSizeIterator<Item = &WindowFuncCall> {
204 self.calls.iter()
205 }
206
207 pub(super) fn len(&self) -> usize {
208 self.calls.len()
209 }
210}
211
212impl<S: StateStore> OverWindowExecutor<S> {
213 pub fn new(args: OverWindowExecutorArgs<S>) -> Self {
214 let calls = Calls::new(args.calls);
215
216 let input_info = args.input.info().clone();
217 let input_schema = &input_info.schema;
218
219 let has_unbounded_frame = calls.start_is_unbounded || calls.end_is_unbounded;
220 let cache_policy = if has_unbounded_frame {
221 CachePolicy::Full
224 } else {
225 args.cache_policy
226 };
227
228 let order_key_data_types = args
229 .order_key_indices
230 .iter()
231 .map(|i| input_schema[*i].data_type())
232 .collect();
233
234 let state_key_to_table_sub_pk_proj = RowConverter::calc_state_key_to_table_sub_pk_proj(
235 &args.partition_key_indices,
236 &args.order_key_indices,
237 &input_info.pk_indices,
238 );
239
240 let deduped_part_key_indices = {
241 let mut dedup = HashSet::new();
242 args.partition_key_indices
243 .iter()
244 .filter(|i| dedup.insert(**i))
245 .copied()
246 .collect()
247 };
248
249 Self {
250 input: args.input,
251 inner: ExecutorInner {
252 actor_ctx: args.actor_ctx,
253 schema: args.schema,
254 calls,
255 deduped_part_key_indices,
256 order_key_indices: args.order_key_indices,
257 order_key_data_types,
258 order_key_order_types: args.order_key_order_types,
259 input_pk_indices: input_info.pk_indices,
260 state_key_to_table_sub_pk_proj,
261 state_table: args.state_table,
262 watermark_sequence: args.watermark_epoch,
263 chunk_size: args.chunk_size,
264 cache_policy,
265 },
266 }
267 }
268
269 fn merge_changes_in_chunk<'a>(
276 this: &'_ ExecutorInner<S>,
277 chunk: &'a StreamChunk,
278 ) -> impl Iterator<Item = Record<RowRef<'a>>> {
279 let mut changes_merged = BTreeMap::new();
280 for (op, row) in chunk.rows() {
281 let pk = DefaultOrdered(this.get_input_pk(row));
282 match op {
283 Op::Insert | Op::UpdateInsert => {
284 if let Some(prev_change) = changes_merged.get_mut(&pk) {
285 match prev_change {
286 Record::Delete { old_row } => {
287 *prev_change = Record::Update {
288 old_row: *old_row,
289 new_row: row,
290 };
291 }
292 _ => {
293 consistency_panic!(
294 ?pk,
295 "inconsistent changes in input chunk, double-inserting"
296 );
297 if let Record::Update { old_row, .. } = prev_change {
298 *prev_change = Record::Update {
299 old_row: *old_row,
300 new_row: row,
301 };
302 } else {
303 *prev_change = Record::Insert { new_row: row };
304 }
305 }
306 }
307 } else {
308 changes_merged.insert(pk, Record::Insert { new_row: row });
309 }
310 }
311 Op::Delete | Op::UpdateDelete => {
312 if let Some(prev_change) = changes_merged.get_mut(&pk) {
313 match prev_change {
314 Record::Insert { .. } => {
315 changes_merged.remove(&pk);
316 }
317 Record::Update {
318 old_row: real_old_row,
319 ..
320 } => {
321 *prev_change = Record::Delete {
322 old_row: *real_old_row,
323 };
324 }
325 _ => {
326 consistency_panic!(
327 ?pk,
328 "inconsistent changes in input chunk, double-deleting"
329 );
330 *prev_change = Record::Delete { old_row: row };
331 }
332 }
333 } else {
334 changes_merged.insert(pk, Record::Delete { old_row: row });
335 }
336 }
337 }
338 }
339 changes_merged.into_values()
340 }
341
    /// Apply one input chunk to the window state and yield the resulting output chunks.
    ///
    /// The chunk is first merged by input pk (see `merge_changes_in_chunk`) and
    /// split into per-partition deltas. Then, for each affected partition in
    /// partition key order:
    /// 1. "No-effect" updates are applied directly. These keep the same partition
    ///    key, the same state key and the same values in every argument column.
    ///    The stored row is looked up (cache first, then state table), and its
    ///    columns beyond the input columns are reused.
    /// 2. The remaining delta is passed to `OverPartition::build_changes`. The
    ///    resulting changes are emitted and written to the state table.
    /// 3. Range cache metrics are reported. For non-full cache policies, the
    ///    accessed state-key range is also merged into `recently_accessed_ranges`.
    ///
    /// Some updates move a row to a different state key within the same
    /// partition. Their output delete and insert are buffered and re-paired into
    /// a single `Update` record.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn apply_chunk<'a>(
        this: &'a mut ExecutorInner<S>,
        vars: &'a mut ExecutionVars<S>,
        chunk: StreamChunk,
        metrics: &'a OverWindowMetrics,
    ) {
        // Per partition key: (changes for `build_changes`, no-effect updates whose
        // stored rows only need their input columns refreshed).
        let mut deltas: BTreeMap<DefaultOrdered<OwnedRow>, (PartitionDelta, PartitionDelta)> =
            BTreeMap::new();
        // Input pks of updates whose state key changed while the partition key did
        // not; their output delete/insert pairs are re-paired into updates below.
        let mut key_change_updated_pks = HashSet::new();

        for record in Self::merge_changes_in_chunk(this, &chunk) {
            match record {
                Record::Insert { new_row } => {
                    let part_key = this.get_partition_key(new_row).into();
                    let (delta, _) = deltas.entry(part_key).or_default();
                    delta.insert(
                        this.row_to_cache_key(new_row)?,
                        Change::Insert(new_row.into_owned_row()),
                    );
                }
                Record::Delete { old_row } => {
                    let part_key = this.get_partition_key(old_row).into();
                    let (delta, _) = deltas.entry(part_key).or_default();
                    delta.insert(this.row_to_cache_key(old_row)?, Change::Delete);
                }
                Record::Update { old_row, new_row } => {
                    let old_part_key = this.get_partition_key(old_row).into();
                    let new_part_key = this.get_partition_key(new_row).into();
                    let old_state_key = this.row_to_cache_key(old_row)?;
                    let new_state_key = this.row_to_cache_key(new_row)?;
                    if old_part_key == new_part_key && old_state_key == new_state_key {
                        // Neither key changed. If no argument column changed either,
                        // treat it as a no-effect update; otherwise overwrite the
                        // row in place through the regular delta.
                        let (delta, no_effect_delta) = deltas.entry(old_part_key).or_default();
                        if old_row.project(&this.calls.all_arg_indices)
                            == new_row.project(&this.calls.all_arg_indices)
                        {
                            no_effect_delta
                                .insert(old_state_key, Change::Insert(new_row.into_owned_row()));
                        } else {
                            delta.insert(old_state_key, Change::Insert(new_row.into_owned_row()));
                        }
                    } else if old_part_key == new_part_key {
                        // State key changed within the same partition: delete + insert,
                        // remembered so the outputs can be re-paired into an update.
                        key_change_updated_pks.insert(this.get_input_pk(old_row));
                        let (delta, _) = deltas.entry(old_part_key).or_default();
                        delta.insert(old_state_key, Change::Delete);
                        delta.insert(new_state_key, Change::Insert(new_row.into_owned_row()));
                    } else {
                        // Partition key changed: a delete in the old partition and an
                        // insert in the new one; these are not re-paired.
                        let (old_part_delta, _) = deltas.entry(old_part_key).or_default();
                        old_part_delta.insert(old_state_key, Change::Delete);
                        let (new_part_delta, _) = deltas.entry(new_part_key).or_default();
                        new_part_delta
                            .insert(new_state_key, Change::Insert(new_row.into_owned_row()));
                    }
                }
            }
        }

        // Input pk -> first half (delete or insert) of a key-changed update's output.
        let mut key_change_update_buffer: BTreeMap<DefaultOrdered<OwnedRow>, Record<OwnedRow>> =
            BTreeMap::new();
        let mut chunk_builder = StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());

        for (part_key, (delta, no_effect_delta)) in deltas {
            // Make sure the partition has a (possibly empty) cache entry, counting misses.
            vars.stats.cache_lookup += 1;
            if !vars.cached_partitions.contains(&part_key.0) {
                vars.stats.cache_miss += 1;
                vars.cached_partitions
                    .put(part_key.0.clone(), new_empty_partition_cache());
            }
            let mut cache = vars.cached_partitions.get_mut(&part_key).unwrap();

            for (key, change) in no_effect_delta {
                // Only `Change::Insert` is ever put into `no_effect_delta` above.
                let new_row = change.into_insert().unwrap();
                let (old_row, from_cache) = if let Some(old_row) = cache.inner().get(&key).cloned()
                {
                    (old_row, true)
                } else {
                    let table_pk = (&new_row).project(this.state_table.pk_indices());
                    if let Some(old_row) = this.state_table.get_row(table_pk).await? {
                        (old_row, false)
                    } else {
                        consistency_panic!(?part_key, ?key, ?new_row, "updating non-existing row");
                        continue;
                    }
                };

                // Take the input columns from the new row and keep the stored
                // row's remaining columns unchanged.
                let input_len = new_row.len();
                let new_row = OwnedRow::new(
                    new_row
                        .into_iter()
                        .chain(old_row.as_inner().iter().skip(input_len).cloned())
                        .collect(),
                );

                let record = Record::Update {
                    old_row: &old_row,
                    new_row: &new_row,
                };
                if let Some(chunk) = chunk_builder.append_record(record.as_ref()) {
                    yield chunk;
                }
                this.state_table.write_record(record);
                // NOTE(review): the cache is refreshed only when the row came from
                // it — presumably to avoid inserting keys outside the cached range;
                // confirm.
                if from_cache {
                    cache.insert(key, new_row);
                }
            }

            let mut partition = OverPartition::new(
                &part_key,
                &mut cache,
                this.cache_policy,
                &this.calls,
                RowConverter {
                    state_key_to_table_sub_pk_proj: &this.state_key_to_table_sub_pk_proj,
                    order_key_indices: &this.order_key_indices,
                    order_key_data_types: &this.order_key_data_types,
                    order_key_order_types: &this.order_key_order_types,
                    input_pk_indices: &this.input_pk_indices,
                },
            );

            // Only no-effect updates for this partition: skip building changes,
            // range cache metrics and accessed-range tracking.
            if delta.is_empty() {
                continue;
            }

            let (part_changes, accessed_range) =
                partition.build_changes(&this.state_table, delta).await?;

            for (key, record) in part_changes {
                if !key_change_updated_pks.contains(&key.pk) {
                    if let Some(chunk) = chunk_builder.append_record(record.as_ref()) {
                        yield chunk;
                    }
                } else {
                    // Output of a key-changed update: buffer the first half and emit
                    // a single `Update` once the matching other half arrives.
                    let pk = key.pk.clone();
                    // Cloned because `record` is still needed by `write_record` below.
                    let record = record.clone();
                    if let Some(existed) = key_change_update_buffer.remove(&key.pk) {
                        match (existed, record) {
                            (Record::Insert { new_row }, Record::Delete { old_row })
                            | (Record::Delete { old_row }, Record::Insert { new_row }) => {
                                if let Some(chunk) =
                                    chunk_builder.append_record(Record::Update { old_row, new_row })
                                {
                                    yield chunk;
                                }
                            }
                            (existed, record) => {
                                consistency_panic!(
                                    ?existed,
                                    ?record,
                                    "other cases should not exist",
                                );

                                key_change_update_buffer.insert(pk, record);
                                if let Some(chunk) = chunk_builder.append_record(existed) {
                                    yield chunk;
                                }
                            }
                        }
                    } else {
                        key_change_update_buffer.insert(pk, record);
                    }
                }

                // Every change is persisted, regardless of how it was emitted.
                partition.write_record(&mut this.state_table, key, record);
            }

            // Both halves of a key-changed update live in the same partition, so
            // the buffer is expected to be drained by now.
            if !key_change_update_buffer.is_empty() {
                consistency_panic!(
                    ?key_change_update_buffer,
                    "key-change update buffer should be empty after processing"
                );
            }

            let cache_len = partition.cache_real_len();
            let stats = partition.summarize();
            metrics
                .over_window_range_cache_entry_count
                .set(cache_len as i64);
            metrics
                .over_window_range_cache_lookup_count
                .inc_by(stats.lookup_count);
            metrics
                .over_window_range_cache_left_miss_count
                .inc_by(stats.left_miss_count);
            metrics
                .over_window_range_cache_right_miss_count
                .inc_by(stats.right_miss_count);
            metrics
                .over_window_accessed_entry_count
                .inc_by(stats.accessed_entry_count);
            metrics
                .over_window_compute_count
                .inc_by(stats.compute_count);
            metrics
                .over_window_same_output_count
                .inc_by(stats.same_output_count);

            // Widen this partition's recently accessed range to cover the newly
            // accessed one; used to shrink the partition cache on barrier.
            if !this.cache_policy.is_full()
                && let Some(accessed_range) = accessed_range
            {
                match vars.recently_accessed_ranges.entry(part_key) {
                    btree_map::Entry::Vacant(vacant) => {
                        vacant.insert(accessed_range);
                    }
                    btree_map::Entry::Occupied(mut occupied) => {
                        let recently_accessed_range = occupied.get_mut();
                        let min_start = accessed_range
                            .start()
                            .min(recently_accessed_range.start())
                            .clone();
                        let max_end = accessed_range
                            .end()
                            .max(recently_accessed_range.end())
                            .clone();
                        *recently_accessed_range = min_start..=max_end;
                    }
                }
            }
        }

        // Emit whatever is still buffered in the builder.
        if let Some(chunk) = chunk_builder.take() {
            yield chunk;
        }
    }
604
605 #[try_stream(ok = Message, error = StreamExecutorError)]
606 async fn executor_inner(self) {
607 let OverWindowExecutor {
608 input,
609 inner: mut this,
610 } = self;
611
612 let metrics_info = MetricsInfo::new(
613 this.actor_ctx.streaming_metrics.clone(),
614 this.state_table.table_id(),
615 this.actor_ctx.id,
616 "OverWindow",
617 );
618
619 let metrics = metrics_info.metrics.new_over_window_metrics(
620 this.state_table.table_id(),
621 this.actor_ctx.id,
622 this.actor_ctx.fragment_id,
623 );
624
625 let mut vars = ExecutionVars {
626 cached_partitions: ManagedLruCache::unbounded(
627 this.watermark_sequence.clone(),
628 metrics_info,
629 ),
630 recently_accessed_ranges: Default::default(),
631 stats: Default::default(),
632 _phantom: PhantomData::<S>,
633 };
634
635 let mut input = input.execute();
636 let barrier = expect_first_barrier(&mut input).await?;
637 let first_epoch = barrier.epoch;
638 yield Message::Barrier(barrier);
639 this.state_table.init_epoch(first_epoch).await?;
640
641 #[for_await]
642 for msg in input {
643 let msg = msg?;
644 match msg {
645 Message::Watermark(_) => {
646 continue;
649 }
650 Message::Chunk(chunk) => {
651 #[for_await]
652 for chunk in Self::apply_chunk(&mut this, &mut vars, chunk, &metrics) {
653 yield Message::Chunk(chunk?);
654 }
655 this.state_table.try_flush().await?;
656 }
657 Message::Barrier(barrier) => {
658 let post_commit = this.state_table.commit(barrier.epoch).await?;
659
660 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
661 yield Message::Barrier(barrier);
662
663 vars.cached_partitions.evict();
664
665 metrics
666 .over_window_cached_entry_count
667 .set(vars.cached_partitions.len() as _);
668 metrics
669 .over_window_cache_lookup_count
670 .inc_by(std::mem::take(&mut vars.stats.cache_lookup));
671 metrics
672 .over_window_cache_miss_count
673 .inc_by(std::mem::take(&mut vars.stats.cache_miss));
674
675 if let Some((_, cache_may_stale)) =
676 post_commit.post_yield_barrier(update_vnode_bitmap).await?
677 {
678 if cache_may_stale {
679 vars.cached_partitions.clear();
680 vars.recently_accessed_ranges.clear();
681 }
682 }
683
684 if !this.cache_policy.is_full() {
685 for (part_key, recently_accessed_range) in
686 std::mem::take(&mut vars.recently_accessed_ranges)
687 {
688 if let Some(mut range_cache) =
689 vars.cached_partitions.get_mut(&part_key.0)
690 {
691 shrink_partition_cache(
692 &part_key.0,
693 &mut range_cache,
694 this.cache_policy,
695 recently_accessed_range,
696 );
697 }
698 }
699 }
700 }
701 }
702 }
703 }
704}
705
/// Borrowed key-layout information used to convert between full input rows,
/// [`StateKey`]s and the state table's sub-pk rows.
///
/// It only holds slices, so it is `Copy`. `apply_chunk` builds it from the
/// fields of `ExecutorInner` when creating an `OverPartition`.
#[derive(Debug, Clone, Copy)]
pub(super) struct RowConverter<'a> {
    /// Projection from `decoded order key ++ input pk` to the table sub-pk.
    state_key_to_table_sub_pk_proj: &'a [usize],
    /// Order key column indices in the full row.
    order_key_indices: &'a [usize],
    /// Data types of the order key columns, needed to decode the memcmp-encoded order key.
    order_key_data_types: &'a [DataType],
    /// Sort order of each order key column, used for memcmp encoding and decoding.
    order_key_order_types: &'a [OrderType],
    /// Input pk column indices in the full row.
    input_pk_indices: &'a [usize],
}
726
727impl<'a> RowConverter<'a> {
728 pub(super) fn calc_state_key_to_table_sub_pk_proj(
732 partition_key_indices: &[usize],
733 order_key_indices: &[usize],
734 input_pk_indices: &'a [usize],
735 ) -> Vec<usize> {
736 let mut projection = Vec::with_capacity(order_key_indices.len() + input_pk_indices.len());
738 let mut col_dedup: HashSet<usize> = partition_key_indices.iter().copied().collect();
739 for (proj_idx, key_idx) in order_key_indices
740 .iter()
741 .chain(input_pk_indices.iter())
742 .enumerate()
743 {
744 if col_dedup.insert(*key_idx) {
745 projection.push(proj_idx);
746 }
747 }
748 projection.shrink_to_fit();
749 projection
750 }
751
752 pub(super) fn state_key_to_table_sub_pk(
754 &self,
755 key: &StateKey,
756 ) -> StreamExecutorResult<OwnedRow> {
757 Ok(memcmp_encoding::decode_row(
758 &key.order_key,
759 self.order_key_data_types,
760 self.order_key_order_types,
761 )?
762 .chain(key.pk.as_inner())
763 .project(self.state_key_to_table_sub_pk_proj)
764 .into_owned_row())
765 }
766
767 pub(super) fn row_to_state_key(
769 &self,
770 full_row: impl Row + Copy,
771 ) -> StreamExecutorResult<StateKey> {
772 Ok(StateKey {
773 order_key: memcmp_encoding::encode_row(
774 full_row.project(self.order_key_indices),
775 self.order_key_order_types,
776 )?,
777 pk: full_row
778 .project(self.input_pk_indices)
779 .into_owned_row()
780 .into(),
781 })
782 }
783}