risingwave_stream/executor/over_window/
general.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet, btree_map};
16use std::marker::PhantomData;
17use std::ops::RangeInclusive;
18
19use delta_btree_map::Change;
20use itertools::Itertools;
21use risingwave_common::array::Op;
22use risingwave_common::array::stream_record::Record;
23use risingwave_common::row::RowExt;
24use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
25use risingwave_common::types::DefaultOrdered;
26use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
27use risingwave_common::util::sort_util::OrderType;
28use risingwave_expr::window_function::{
29    RangeFrameBounds, RowsFrameBounds, StateKey, WindowFuncCall,
30};
31
32use super::frame_finder::merge_rows_frames;
33use super::over_partition::{
34    CacheKey, OverPartition, PartitionCache, PartitionDelta, new_empty_partition_cache,
35    shrink_partition_cache,
36};
37use crate::cache::ManagedLruCache;
38use crate::common::metrics::MetricsInfo;
39use crate::consistency::consistency_panic;
40use crate::executor::monitor::OverWindowMetrics;
41use crate::executor::prelude::*;
42
/// [`OverWindowExecutor`] consumes a retractable input stream and produces window function outputs.
/// One [`OverWindowExecutor`] handles exactly one combination of partition key and order key.
///
/// - State table schema = output schema, state table pk = `partition key | order key | input pk`.
/// - Output schema = input schema + window function results.
pub struct OverWindowExecutor<S: StateStore> {
    /// Upstream executor; moved out and consumed in `executor_inner`.
    input: Executor,
    /// Everything else the executor needs, kept separate from `input` so that it can be
    /// borrowed mutably while the input stream is being consumed.
    inner: ExecutorInner<S>,
}
52
/// Configuration and state of an [`OverWindowExecutor`], excluding its input.
struct ExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Output schema (input schema + window function results).
    schema: Schema,
    /// Window function calls plus information derived from them.
    calls: Calls,
    /// Partition key column indices in the input row, with duplicates removed.
    deduped_part_key_indices: Vec<usize>,
    /// Order key column indices in the input row.
    order_key_indices: Vec<usize>,
    /// Data types of the order key columns, aligned with `order_key_indices`.
    order_key_data_types: Vec<DataType>,
    /// Order types of the order key columns, used for memcmp encoding of the order key.
    order_key_order_types: Vec<OrderType>,
    /// Input stream PK column indices.
    input_pk_indices: Vec<usize>,
    /// Projection from `StateKey` columns (`order key | input pk`) to the state table sub-PK,
    /// as computed by `RowConverter::calc_state_key_to_table_sub_pk_proj`.
    state_key_to_table_sub_pk_proj: Vec<usize>,

    state_table: StateTable<S>,
    /// Watermark sequence handed to the LRU cache of partitions.
    watermark_sequence: AtomicU64Ref,

    /// The maximum size of the chunk produced by executor at a time.
    chunk_size: usize,
    /// Effective range cache policy; forced to `Full` when any frame is unbounded.
    cache_policy: CachePolicy,
}
72
/// Mutable runtime state of the executor, kept across chunks and barriers.
struct ExecutionVars<S: StateStore> {
    /// partition key => partition range cache.
    cached_partitions: ManagedLruCache<OwnedRow, PartitionCache>,
    /// partition key => recently accessed range. Only maintained when the cache policy is not
    /// `Full`; consumed on each barrier to shrink the partition range caches.
    recently_accessed_ranges: BTreeMap<DefaultOrdered<OwnedRow>, RangeInclusive<StateKey>>,
    /// Partition cache lookup statistics, reported to metrics (and reset) on each barrier.
    stats: ExecutionStats,
    _phantom: PhantomData<S>,
}
81
/// Counters of partition cache accesses, accumulated between barriers.
#[derive(Default)]
struct ExecutionStats {
    /// Number of partitions that were not present in `cached_partitions` when looked up.
    cache_miss: u64,
    /// Number of partition lookups in `cached_partitions`.
    cache_lookup: u64,
}
87
88impl<S: StateStore> Execute for OverWindowExecutor<S> {
89    fn execute(self: Box<Self>) -> crate::executor::BoxedMessageStream {
90        self.executor_inner().boxed()
91    }
92}
93
94impl<S: StateStore> ExecutorInner<S> {
95    /// Get deduplicated partition key from a full row, which happened to be the prefix of table PK.
96    fn get_partition_key(&self, full_row: impl Row) -> OwnedRow {
97        full_row
98            .project(&self.deduped_part_key_indices)
99            .into_owned_row()
100    }
101
102    fn get_input_pk(&self, full_row: impl Row) -> OwnedRow {
103        full_row.project(&self.input_pk_indices).into_owned_row()
104    }
105
106    /// `full_row` can be an input row or state table row.
107    fn encode_order_key(&self, full_row: impl Row) -> StreamExecutorResult<MemcmpEncoded> {
108        Ok(memcmp_encoding::encode_row(
109            full_row.project(&self.order_key_indices),
110            &self.order_key_order_types,
111        )?)
112    }
113
114    fn row_to_cache_key(&self, full_row: impl Row + Copy) -> StreamExecutorResult<CacheKey> {
115        Ok(CacheKey::Normal(StateKey {
116            order_key: self.encode_order_key(full_row)?,
117            pk: self.get_input_pk(full_row).into(),
118        }))
119    }
120}
121
/// Arguments for constructing an [`OverWindowExecutor`] via `OverWindowExecutor::new`.
pub struct OverWindowExecutorArgs<S: StateStore> {
    pub actor_ctx: ActorContextRef,

    pub input: Executor,

    /// Output schema (input schema + window function results).
    pub schema: Schema,
    /// Window function calls to compute.
    pub calls: Vec<WindowFuncCall>,
    /// Partition key column indices in the input; duplicates are removed by `new`.
    pub partition_key_indices: Vec<usize>,
    /// Order key column indices in the input.
    pub order_key_indices: Vec<usize>,
    /// Order types of the order key columns, aligned with `order_key_indices`.
    pub order_key_order_types: Vec<OrderType>,

    pub state_table: StateTable<S>,
    /// Watermark sequence handed to the partition LRU cache.
    pub watermark_epoch: AtomicU64Ref,
    /// NOTE(review): not read by `OverWindowExecutor::new`; metrics are taken from
    /// `actor_ctx.streaming_metrics` instead.
    pub metrics: Arc<StreamingMetrics>,

    /// The maximum size of an output chunk.
    pub chunk_size: usize,
    /// Requested cache policy; overridden to `Full` when any frame is unbounded.
    pub cache_policy: CachePolicy,
}
140
/// Information about the window function calls.
/// Contains the original calls plus other information derived from them, precomputed to avoid
/// repeated calculation.
pub(super) struct Calls {
    calls: Vec<WindowFuncCall>,

    /// The `ROWS` frame that is the union of all `ROWS` frames.
    pub(super) super_rows_frame_bounds: RowsFrameBounds,
    /// All `RANGE` frames.
    pub(super) range_frames: Vec<RangeFrameBounds>,
    /// Whether any call has a frame whose start is unbounded.
    pub(super) start_is_unbounded: bool,
    /// Whether any call has a frame whose end is unbounded.
    pub(super) end_is_unbounded: bool,
    /// Indices of all arguments of all calls, deduplicated in `Calls::new`. Used to detect
    /// updates that leave every argument unchanged.
    pub(super) all_arg_indices: Vec<usize>,

    // TODO(rc): The following flags are used to optimize for `row_number`, `rank` and `dense_rank`.
    // We should try our best to remove these flags while maintaining the performance in the future.
    /// Whether every call is a numbering function.
    pub(super) numbering_only: bool,
    /// Whether any call is a rank function.
    pub(super) has_rank: bool,
}
161
162impl Calls {
163    fn new(calls: Vec<WindowFuncCall>) -> Self {
164        let rows_frames = calls
165            .iter()
166            .filter_map(|call| call.frame.bounds.as_rows())
167            .collect::<Vec<_>>();
168        let super_rows_frame_bounds = merge_rows_frames(&rows_frames);
169        let range_frames = calls
170            .iter()
171            .filter_map(|call| call.frame.bounds.as_range())
172            .cloned()
173            .collect::<Vec<_>>();
174
175        let start_is_unbounded = calls
176            .iter()
177            .any(|call| call.frame.bounds.start_is_unbounded());
178        let end_is_unbounded = calls
179            .iter()
180            .any(|call| call.frame.bounds.end_is_unbounded());
181
182        let all_arg_indices = calls
183            .iter()
184            .flat_map(|call| call.args.val_indices().iter().copied())
185            .dedup()
186            .collect();
187
188        let numbering_only = calls.iter().all(|call| call.kind.is_numbering());
189        let has_rank = calls.iter().any(|call| call.kind.is_rank());
190
191        Self {
192            calls,
193            super_rows_frame_bounds,
194            range_frames,
195            start_is_unbounded,
196            end_is_unbounded,
197            all_arg_indices,
198            numbering_only,
199            has_rank,
200        }
201    }
202
203    pub(super) fn iter(&self) -> impl ExactSizeIterator<Item = &WindowFuncCall> {
204        self.calls.iter()
205    }
206
207    pub(super) fn len(&self) -> usize {
208        self.calls.len()
209    }
210}
211
212impl<S: StateStore> OverWindowExecutor<S> {
213    pub fn new(args: OverWindowExecutorArgs<S>) -> Self {
214        let calls = Calls::new(args.calls);
215
216        let input_info = args.input.info().clone();
217        let input_schema = &input_info.schema;
218
219        let has_unbounded_frame = calls.start_is_unbounded || calls.end_is_unbounded;
220        let cache_policy = if has_unbounded_frame {
221            // For unbounded frames, we finally need all entries of the partition in the cache,
222            // so for simplicity we just use full cache policy for these cases.
223            CachePolicy::Full
224        } else {
225            args.cache_policy
226        };
227
228        let order_key_data_types = args
229            .order_key_indices
230            .iter()
231            .map(|i| input_schema[*i].data_type())
232            .collect();
233
234        let state_key_to_table_sub_pk_proj = RowConverter::calc_state_key_to_table_sub_pk_proj(
235            &args.partition_key_indices,
236            &args.order_key_indices,
237            &input_info.pk_indices,
238        );
239
240        let deduped_part_key_indices = {
241            let mut dedup = HashSet::new();
242            args.partition_key_indices
243                .iter()
244                .filter(|i| dedup.insert(**i))
245                .copied()
246                .collect()
247        };
248
249        Self {
250            input: args.input,
251            inner: ExecutorInner {
252                actor_ctx: args.actor_ctx,
253                schema: args.schema,
254                calls,
255                deduped_part_key_indices,
256                order_key_indices: args.order_key_indices,
257                order_key_data_types,
258                order_key_order_types: args.order_key_order_types,
259                input_pk_indices: input_info.pk_indices,
260                state_key_to_table_sub_pk_proj,
261                state_table: args.state_table,
262                watermark_sequence: args.watermark_epoch,
263                chunk_size: args.chunk_size,
264                cache_policy,
265            },
266        }
267    }
268
269    /// Merge changes by input pk in the given chunk, return a change iterator which guarantees that
270    /// each pk only appears once. This method also validates the consistency of the input
271    /// chunk.
272    ///
273    /// TODO(rc): We may want to optimize this by handling changes on the same pk during generating
274    /// partition [`Change`]s.
275    fn merge_changes_in_chunk<'a>(
276        this: &'_ ExecutorInner<S>,
277        chunk: &'a StreamChunk,
278    ) -> impl Iterator<Item = Record<RowRef<'a>>> {
279        let mut changes_merged = BTreeMap::new();
280        for (op, row) in chunk.rows() {
281            let pk = DefaultOrdered(this.get_input_pk(row));
282            match op {
283                Op::Insert | Op::UpdateInsert => {
284                    if let Some(prev_change) = changes_merged.get_mut(&pk) {
285                        match prev_change {
286                            Record::Delete { old_row } => {
287                                *prev_change = Record::Update {
288                                    old_row: *old_row,
289                                    new_row: row,
290                                };
291                            }
292                            _ => {
293                                consistency_panic!(
294                                    ?pk,
295                                    "inconsistent changes in input chunk, double-inserting"
296                                );
297                                if let Record::Update { old_row, .. } = prev_change {
298                                    *prev_change = Record::Update {
299                                        old_row: *old_row,
300                                        new_row: row,
301                                    };
302                                } else {
303                                    *prev_change = Record::Insert { new_row: row };
304                                }
305                            }
306                        }
307                    } else {
308                        changes_merged.insert(pk, Record::Insert { new_row: row });
309                    }
310                }
311                Op::Delete | Op::UpdateDelete => {
312                    if let Some(prev_change) = changes_merged.get_mut(&pk) {
313                        match prev_change {
314                            Record::Insert { .. } => {
315                                changes_merged.remove(&pk);
316                            }
317                            Record::Update {
318                                old_row: real_old_row,
319                                ..
320                            } => {
321                                *prev_change = Record::Delete {
322                                    old_row: *real_old_row,
323                                };
324                            }
325                            _ => {
326                                consistency_panic!(
327                                    ?pk,
328                                    "inconsistent changes in input chunk, double-deleting"
329                                );
330                                *prev_change = Record::Delete { old_row: row };
331                            }
332                        }
333                    } else {
334                        changes_merged.insert(pk, Record::Delete { old_row: row });
335                    }
336                }
337            }
338        }
339        changes_merged.into_values()
340    }
341
342    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
343    async fn apply_chunk<'a>(
344        this: &'a mut ExecutorInner<S>,
345        vars: &'a mut ExecutionVars<S>,
346        chunk: StreamChunk,
347        metrics: &'a OverWindowMetrics,
348    ) {
349        // (deduped) partition key => (
350        //   significant changes happened in the partition,
351        //   no-effect changes happened in the partition,
352        // )
353        let mut deltas: BTreeMap<DefaultOrdered<OwnedRow>, (PartitionDelta, PartitionDelta)> =
354            BTreeMap::new();
355        // input pk of update records of which the order key is changed.
356        let mut key_change_updated_pks = HashSet::new();
357
358        // Collect changes for each partition.
359        for record in Self::merge_changes_in_chunk(this, &chunk) {
360            match record {
361                Record::Insert { new_row } => {
362                    let part_key = this.get_partition_key(new_row).into();
363                    let (delta, _) = deltas.entry(part_key).or_default();
364                    delta.insert(
365                        this.row_to_cache_key(new_row)?,
366                        Change::Insert(new_row.into_owned_row()),
367                    );
368                }
369                Record::Delete { old_row } => {
370                    let part_key = this.get_partition_key(old_row).into();
371                    let (delta, _) = deltas.entry(part_key).or_default();
372                    delta.insert(this.row_to_cache_key(old_row)?, Change::Delete);
373                }
374                Record::Update { old_row, new_row } => {
375                    let old_part_key = this.get_partition_key(old_row).into();
376                    let new_part_key = this.get_partition_key(new_row).into();
377                    let old_state_key = this.row_to_cache_key(old_row)?;
378                    let new_state_key = this.row_to_cache_key(new_row)?;
379                    if old_part_key == new_part_key && old_state_key == new_state_key {
380                        // not a key-change update
381                        let (delta, no_effect_delta) = deltas.entry(old_part_key).or_default();
382                        if old_row.project(&this.calls.all_arg_indices)
383                            == new_row.project(&this.calls.all_arg_indices)
384                        {
385                            // partition key, order key and arguments are all the same
386                            no_effect_delta
387                                .insert(old_state_key, Change::Insert(new_row.into_owned_row()));
388                        } else {
389                            delta.insert(old_state_key, Change::Insert(new_row.into_owned_row()));
390                        }
391                    } else if old_part_key == new_part_key {
392                        // order-change update, split into delete + insert, will be merged after
393                        // building changes
394                        key_change_updated_pks.insert(this.get_input_pk(old_row));
395                        let (delta, _) = deltas.entry(old_part_key).or_default();
396                        delta.insert(old_state_key, Change::Delete);
397                        delta.insert(new_state_key, Change::Insert(new_row.into_owned_row()));
398                    } else {
399                        // partition-change update, split into delete + insert
400                        // NOTE(rc): Since we append partition key to logical pk, we can't merge the
401                        // delete + insert back to update later.
402                        // TODO: IMO this behavior is problematic. Deep discussion is needed.
403                        let (old_part_delta, _) = deltas.entry(old_part_key).or_default();
404                        old_part_delta.insert(old_state_key, Change::Delete);
405                        let (new_part_delta, _) = deltas.entry(new_part_key).or_default();
406                        new_part_delta
407                            .insert(new_state_key, Change::Insert(new_row.into_owned_row()));
408                    }
409                }
410            }
411        }
412
413        // `input pk` => `Record`
414        let mut key_change_update_buffer: BTreeMap<DefaultOrdered<OwnedRow>, Record<OwnedRow>> =
415            BTreeMap::new();
416        let mut chunk_builder = StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());
417
418        // Build final changes partition by partition.
419        for (part_key, (delta, no_effect_delta)) in deltas {
420            vars.stats.cache_lookup += 1;
421            if !vars.cached_partitions.contains(&part_key.0) {
422                vars.stats.cache_miss += 1;
423                vars.cached_partitions
424                    .put(part_key.0.clone(), new_empty_partition_cache());
425            }
426            let mut cache = vars.cached_partitions.get_mut(&part_key).unwrap();
427
428            // First, handle `Update`s that don't affect window function outputs.
429            // Be careful that changes in `delta` may (though we believe unlikely) affect the
430            // window function outputs of rows in `no_effect_delta`, so before handling `delta`
431            // we need to write all changes to state table, range cache and chunk builder.
432            for (key, change) in no_effect_delta {
433                let new_row = change.into_insert().unwrap(); // new row of an `Update`
434
435                let (old_row, from_cache) = if let Some(old_row) = cache.inner().get(&key).cloned()
436                {
437                    // Got old row from range cache.
438                    (old_row, true)
439                } else {
440                    // Retrieve old row from state table.
441                    let table_pk = (&new_row).project(this.state_table.pk_indices());
442                    // The accesses to the state table is ordered by table PK, so ideally we
443                    // can leverage the block cache under the hood.
444                    if let Some(old_row) = this.state_table.get_row(table_pk).await? {
445                        (old_row, false)
446                    } else {
447                        consistency_panic!(?part_key, ?key, ?new_row, "updating non-existing row");
448                        continue;
449                    }
450                };
451
452                // concatenate old outputs
453                let input_len = new_row.len();
454                let new_row = OwnedRow::new(
455                    new_row
456                        .into_iter()
457                        .chain(old_row.as_inner().iter().skip(input_len).cloned()) // chain old outputs
458                        .collect(),
459                );
460
461                // apply & emit the change
462                let record = Record::Update {
463                    old_row: &old_row,
464                    new_row: &new_row,
465                };
466                if let Some(chunk) = chunk_builder.append_record(record.as_ref()) {
467                    yield chunk;
468                }
469                this.state_table.write_record(record);
470                if from_cache {
471                    cache.insert(key, new_row);
472                }
473            }
474
475            let mut partition = OverPartition::new(
476                &part_key,
477                &mut cache,
478                this.cache_policy,
479                &this.calls,
480                RowConverter {
481                    state_key_to_table_sub_pk_proj: &this.state_key_to_table_sub_pk_proj,
482                    order_key_indices: &this.order_key_indices,
483                    order_key_data_types: &this.order_key_data_types,
484                    order_key_order_types: &this.order_key_order_types,
485                    input_pk_indices: &this.input_pk_indices,
486                },
487            );
488
489            if delta.is_empty() {
490                continue;
491            }
492
493            // Build changes for current partition.
494            let (part_changes, accessed_range) =
495                partition.build_changes(&this.state_table, delta).await?;
496
497            for (key, record) in part_changes {
498                // Build chunk and yield if needed.
499                if !key_change_updated_pks.contains(&key.pk) {
500                    if let Some(chunk) = chunk_builder.append_record(record.as_ref()) {
501                        yield chunk;
502                    }
503                } else {
504                    // For key-change updates, we should wait for both `Delete` and `Insert` changes
505                    // and merge them together.
506                    let pk = key.pk.clone();
507                    let record = record.clone();
508                    if let Some(existed) = key_change_update_buffer.remove(&key.pk) {
509                        match (existed, record) {
510                            (Record::Insert { new_row }, Record::Delete { old_row })
511                            | (Record::Delete { old_row }, Record::Insert { new_row }) => {
512                                // merge `Delete` and `Insert` into `Update`
513                                if let Some(chunk) =
514                                    chunk_builder.append_record(Record::Update { old_row, new_row })
515                                {
516                                    yield chunk;
517                                }
518                            }
519                            (existed, record) => {
520                                // when stream is inconsistent, there may be an `Update` of which the old pk does not actually exist
521                                consistency_panic!(
522                                    ?existed,
523                                    ?record,
524                                    "other cases should not exist",
525                                );
526
527                                key_change_update_buffer.insert(pk, record);
528                                if let Some(chunk) = chunk_builder.append_record(existed) {
529                                    yield chunk;
530                                }
531                            }
532                        }
533                    } else {
534                        key_change_update_buffer.insert(pk, record);
535                    }
536                }
537
538                // Apply the change record.
539                partition.write_record(&mut this.state_table, key, record);
540            }
541
542            if !key_change_update_buffer.is_empty() {
543                consistency_panic!(
544                    ?key_change_update_buffer,
545                    "key-change update buffer should be empty after processing"
546                );
547                // if in non-strict mode, we can reach here, but we don't know the `StateKey`,
548                // so just ignore the buffer.
549            }
550
551            let cache_len = partition.cache_real_len();
552            let stats = partition.summarize();
553            metrics
554                .over_window_range_cache_entry_count
555                .set(cache_len as i64);
556            metrics
557                .over_window_range_cache_lookup_count
558                .inc_by(stats.lookup_count);
559            metrics
560                .over_window_range_cache_left_miss_count
561                .inc_by(stats.left_miss_count);
562            metrics
563                .over_window_range_cache_right_miss_count
564                .inc_by(stats.right_miss_count);
565            metrics
566                .over_window_accessed_entry_count
567                .inc_by(stats.accessed_entry_count);
568            metrics
569                .over_window_compute_count
570                .inc_by(stats.compute_count);
571            metrics
572                .over_window_same_output_count
573                .inc_by(stats.same_output_count);
574
575            // Update recently accessed range for later shrinking cache.
576            if !this.cache_policy.is_full()
577                && let Some(accessed_range) = accessed_range
578            {
579                match vars.recently_accessed_ranges.entry(part_key) {
580                    btree_map::Entry::Vacant(vacant) => {
581                        vacant.insert(accessed_range);
582                    }
583                    btree_map::Entry::Occupied(mut occupied) => {
584                        let recently_accessed_range = occupied.get_mut();
585                        let min_start = accessed_range
586                            .start()
587                            .min(recently_accessed_range.start())
588                            .clone();
589                        let max_end = accessed_range
590                            .end()
591                            .max(recently_accessed_range.end())
592                            .clone();
593                        *recently_accessed_range = min_start..=max_end;
594                    }
595                }
596            }
597        }
598
599        // Yield remaining changes to downstream.
600        if let Some(chunk) = chunk_builder.take() {
601            yield chunk;
602        }
603    }
604
605    #[try_stream(ok = Message, error = StreamExecutorError)]
606    async fn executor_inner(self) {
607        let OverWindowExecutor {
608            input,
609            inner: mut this,
610        } = self;
611
612        let metrics_info = MetricsInfo::new(
613            this.actor_ctx.streaming_metrics.clone(),
614            this.state_table.table_id(),
615            this.actor_ctx.id,
616            "OverWindow",
617        );
618
619        let metrics = metrics_info.metrics.new_over_window_metrics(
620            this.state_table.table_id(),
621            this.actor_ctx.id,
622            this.actor_ctx.fragment_id,
623        );
624
625        let mut vars = ExecutionVars {
626            cached_partitions: ManagedLruCache::unbounded(
627                this.watermark_sequence.clone(),
628                metrics_info,
629            ),
630            recently_accessed_ranges: Default::default(),
631            stats: Default::default(),
632            _phantom: PhantomData::<S>,
633        };
634
635        let mut input = input.execute();
636        let barrier = expect_first_barrier(&mut input).await?;
637        let first_epoch = barrier.epoch;
638        yield Message::Barrier(barrier);
639        this.state_table.init_epoch(first_epoch).await?;
640
641        #[for_await]
642        for msg in input {
643            let msg = msg?;
644            match msg {
645                Message::Watermark(_) => {
646                    // TODO(rc): ignore watermark for now, we need to think about watermark for
647                    // window functions like `lead` carefully.
648                    continue;
649                }
650                Message::Chunk(chunk) => {
651                    #[for_await]
652                    for chunk in Self::apply_chunk(&mut this, &mut vars, chunk, &metrics) {
653                        yield Message::Chunk(chunk?);
654                    }
655                    this.state_table.try_flush().await?;
656                }
657                Message::Barrier(barrier) => {
658                    let post_commit = this.state_table.commit(barrier.epoch).await?;
659
660                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
661                    yield Message::Barrier(barrier);
662
663                    vars.cached_partitions.evict();
664
665                    metrics
666                        .over_window_cached_entry_count
667                        .set(vars.cached_partitions.len() as _);
668                    metrics
669                        .over_window_cache_lookup_count
670                        .inc_by(std::mem::take(&mut vars.stats.cache_lookup));
671                    metrics
672                        .over_window_cache_miss_count
673                        .inc_by(std::mem::take(&mut vars.stats.cache_miss));
674
675                    if let Some((_, cache_may_stale)) =
676                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
677                    {
678                        if cache_may_stale {
679                            vars.cached_partitions.clear();
680                            vars.recently_accessed_ranges.clear();
681                        }
682                    }
683
684                    if !this.cache_policy.is_full() {
685                        for (part_key, recently_accessed_range) in
686                            std::mem::take(&mut vars.recently_accessed_ranges)
687                        {
688                            if let Some(mut range_cache) =
689                                vars.cached_partitions.get_mut(&part_key.0)
690                            {
691                                shrink_partition_cache(
692                                    &part_key.0,
693                                    &mut range_cache,
694                                    this.cache_policy,
695                                    recently_accessed_range,
696                                );
697                            }
698                        }
699                    }
700                }
701            }
702        }
703    }
704}
705
/// A converter that helps convert [`StateKey`] to state table sub-PK and convert executor
/// input/output rows to [`StateKey`].
///
/// ## Notes
///
/// - [`StateKey`]: Over window range cache key type, containing order key and input pk.
/// - State table sub-PK: State table PK = PK prefix (partition key) + sub-PK (order key + input pk).
/// - Input/output row: Input schema is the prefix of output schema.
///
/// The content of [`StateKey`] is very similar to the state table sub-PK, with only one
/// difference: the state table PK and sub-PK contain no duplicated columns, while in
/// [`StateKey`], `order_key` and (input) `pk` may contain duplicated columns.
#[derive(Debug, Clone, Copy)]
pub(super) struct RowConverter<'a> {
    /// Positions in `order key | input pk` (as laid out in [`StateKey`]) that form the state
    /// table sub-PK.
    state_key_to_table_sub_pk_proj: &'a [usize],
    /// Order key column indices in the input/output row.
    order_key_indices: &'a [usize],
    /// Data types of the order key columns, needed to decode a memcmp-encoded order key.
    order_key_data_types: &'a [DataType],
    /// Order types used to memcmp-encode and decode the order key.
    order_key_order_types: &'a [OrderType],
    /// Input PK column indices in the input/output row.
    input_pk_indices: &'a [usize],
}
726
727impl<'a> RowConverter<'a> {
728    /// Calculate the indices needed for projection from [`StateKey`] to state table sub-PK (used to do
729    /// prefixed table scanning). Ideally this function should be called only once by each executor instance.
730    /// The projection indices vec is the *selected column indices* in [`StateKey`].`order_key.chain(input_pk)`.
731    pub(super) fn calc_state_key_to_table_sub_pk_proj(
732        partition_key_indices: &[usize],
733        order_key_indices: &[usize],
734        input_pk_indices: &'a [usize],
735    ) -> Vec<usize> {
736        // This process is corresponding to `StreamOverWindow::infer_state_table`.
737        let mut projection = Vec::with_capacity(order_key_indices.len() + input_pk_indices.len());
738        let mut col_dedup: HashSet<usize> = partition_key_indices.iter().copied().collect();
739        for (proj_idx, key_idx) in order_key_indices
740            .iter()
741            .chain(input_pk_indices.iter())
742            .enumerate()
743        {
744            if col_dedup.insert(*key_idx) {
745                projection.push(proj_idx);
746            }
747        }
748        projection.shrink_to_fit();
749        projection
750    }
751
752    /// Convert [`StateKey`] to sub-PK (table PK without partition key) as [`OwnedRow`].
753    pub(super) fn state_key_to_table_sub_pk(
754        &self,
755        key: &StateKey,
756    ) -> StreamExecutorResult<OwnedRow> {
757        Ok(memcmp_encoding::decode_row(
758            &key.order_key,
759            self.order_key_data_types,
760            self.order_key_order_types,
761        )?
762        .chain(key.pk.as_inner())
763        .project(self.state_key_to_table_sub_pk_proj)
764        .into_owned_row())
765    }
766
767    /// Convert full input/output row to [`StateKey`].
768    pub(super) fn row_to_state_key(
769        &self,
770        full_row: impl Row + Copy,
771    ) -> StreamExecutorResult<StateKey> {
772        Ok(StateKey {
773            order_key: memcmp_encoding::encode_row(
774                full_row.project(self.order_key_indices),
775                self.order_key_order_types,
776            )?,
777            pk: full_row
778                .project(self.input_pk_indices)
779                .into_owned_row()
780                .into(),
781        })
782    }
783}