risingwave_stream/executor/over_window/general.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashSet, btree_map};
use std::marker::PhantomData;
use std::ops::RangeInclusive;

use delta_btree_map::Change;
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::array::stream_record::Record;
use risingwave_common::row::RowExt;
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
use risingwave_common::types::DefaultOrdered;
use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::window_function::{
    RangeFrameBounds, RowsFrameBounds, StateKey, WindowFuncCall,
};

use super::frame_finder::merge_rows_frames;
use super::over_partition::{OverPartition, PartitionDelta};
use super::range_cache::{CacheKey, PartitionCache};
use crate::cache::ManagedLruCache;
use crate::common::metrics::MetricsInfo;
use crate::consistency::consistency_panic;
use crate::executor::monitor::OverWindowMetrics;
use crate::executor::prelude::*;

/// [`OverWindowExecutor`] consumes a retractable input stream and produces window function outputs.
/// One [`OverWindowExecutor`] handles one combination of partition key and order key.
///
/// - State table schema = output schema, state table pk = `partition key | order key | input pk`.
/// - Output schema = input schema + window function results.
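///
/// For example (hypothetical query): given input `t(id, grp, ts, v)` with pk `id`, the query
/// `SELECT *, sum(v) OVER (PARTITION BY grp ORDER BY ts) FROM t` yields output schema
/// `(id, grp, ts, v, sum)` and state table pk `grp | ts | id`.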
pub struct OverWindowExecutor<S: StateStore> {
    input: Executor,
    inner: ExecutorInner<S>,
}

struct ExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,

    schema: Schema,
    calls: Calls,
    deduped_part_key_indices: Vec<usize>,
    order_key_indices: Vec<usize>,
    order_key_data_types: Vec<DataType>,
    order_key_order_types: Vec<OrderType>,
    input_pk_indices: Vec<usize>,
    state_key_to_table_sub_pk_proj: Vec<usize>,

    state_table: StateTable<S>,
    watermark_sequence: AtomicU64Ref,

    /// The maximum size of a chunk produced by the executor at a time.
    chunk_size: usize,
    cache_policy: CachePolicy,
}

struct ExecutionVars<S: StateStore> {
    /// partition key => partition range cache.
    cached_partitions: ManagedLruCache<OwnedRow, PartitionCache>,
    /// partition key => recently accessed range.
    recently_accessed_ranges: BTreeMap<DefaultOrdered<OwnedRow>, RangeInclusive<StateKey>>,
    stats: ExecutionStats,
    _phantom: PhantomData<S>,
}

#[derive(Default)]
struct ExecutionStats {
    cache_miss: u64,
    cache_lookup: u64,
}

impl<S: StateStore> Execute for OverWindowExecutor<S> {
    fn execute(self: Box<Self>) -> crate::executor::BoxedMessageStream {
        self.executor_inner().boxed()
    }
}

impl<S: StateStore> ExecutorInner<S> {
    /// Get the deduplicated partition key from a full row, which happens to be the prefix of the table PK.
    fn get_partition_key(&self, full_row: impl Row) -> OwnedRow {
        full_row
            .project(&self.deduped_part_key_indices)
            .into_owned_row()
    }

    fn get_input_pk(&self, full_row: impl Row) -> OwnedRow {
        full_row.project(&self.input_pk_indices).into_owned_row()
    }

    /// `full_row` can be an input row or state table row.
    fn encode_order_key(&self, full_row: impl Row) -> StreamExecutorResult<MemcmpEncoded> {
        Ok(memcmp_encoding::encode_row(
            full_row.project(&self.order_key_indices),
            &self.order_key_order_types,
        )?)
    }

    fn row_to_cache_key(&self, full_row: impl Row + Copy) -> StreamExecutorResult<CacheKey> {
        Ok(CacheKey::Normal(StateKey {
            order_key: self.encode_order_key(full_row)?,
            pk: self.get_input_pk(full_row).into(),
        }))
    }
}

pub struct OverWindowExecutorArgs<S: StateStore> {
    pub actor_ctx: ActorContextRef,

    pub input: Executor,

    pub schema: Schema,
    pub calls: Vec<WindowFuncCall>,
    pub partition_key_indices: Vec<usize>,
    pub order_key_indices: Vec<usize>,
    pub order_key_order_types: Vec<OrderType>,

    pub state_table: StateTable<S>,
    pub watermark_epoch: AtomicU64Ref,
    pub metrics: Arc<StreamingMetrics>,

    pub chunk_size: usize,
    pub cache_policy: CachePolicy,
}

/// Information about the window function calls.
/// Contains the original calls and other information derived from them, cached here to avoid
/// repeated computation.
pub(super) struct Calls {
    calls: Vec<WindowFuncCall>,

    /// The `ROWS` frame that is the union of all `ROWS` frames.
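    /// For example (illustrative): merging `ROWS BETWEEN 1 PRECEDING AND CURRENT ROW` with
    /// `ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING` yields `ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING`.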
    pub(super) super_rows_frame_bounds: RowsFrameBounds,
    /// All `RANGE` frames.
    pub(super) range_frames: Vec<RangeFrameBounds>,
    pub(super) start_is_unbounded: bool,
    pub(super) end_is_unbounded: bool,
    /// Deduplicated indices of all arguments of all calls.
    pub(super) all_arg_indices: Vec<usize>,

    // TODO(rc): The following flags are used to optimize for `row_number`, `rank` and `dense_rank`.
    // In the future we should try our best to remove these flags while maintaining performance.
    pub(super) numbering_only: bool,
    pub(super) has_rank: bool,
}

impl Calls {
    fn new(calls: Vec<WindowFuncCall>) -> Self {
        let rows_frames = calls
            .iter()
            .filter_map(|call| call.frame.bounds.as_rows())
            .collect::<Vec<_>>();
        let super_rows_frame_bounds = merge_rows_frames(&rows_frames);
        let range_frames = calls
            .iter()
            .filter_map(|call| call.frame.bounds.as_range())
            .cloned()
            .collect::<Vec<_>>();

        let start_is_unbounded = calls
            .iter()
            .any(|call| call.frame.bounds.start_is_unbounded());
        let end_is_unbounded = calls
            .iter()
            .any(|call| call.frame.bounds.end_is_unbounded());

        let all_arg_indices = calls
            .iter()
            .flat_map(|call| call.args.val_indices().iter().copied())
            .dedup()
            .collect();

        let numbering_only = calls.iter().all(|call| call.kind.is_numbering());
        let has_rank = calls.iter().any(|call| call.kind.is_rank());

        Self {
            calls,
            super_rows_frame_bounds,
            range_frames,
            start_is_unbounded,
            end_is_unbounded,
            all_arg_indices,
            numbering_only,
            has_rank,
        }
    }

    pub(super) fn iter(&self) -> impl ExactSizeIterator<Item = &WindowFuncCall> {
        self.calls.iter()
    }

    pub(super) fn len(&self) -> usize {
        self.calls.len()
    }
}

impl<S: StateStore> OverWindowExecutor<S> {
    pub fn new(args: OverWindowExecutorArgs<S>) -> Self {
        let calls = Calls::new(args.calls);

        let input_info = args.input.info().clone();
        let input_schema = &input_info.schema;

        let has_unbounded_frame = calls.start_is_unbounded || calls.end_is_unbounded;
        let cache_policy = if has_unbounded_frame {
            // For unbounded frames, we eventually need all entries of the partition in the cache,
            // so for simplicity we just use the full cache policy in these cases.
            CachePolicy::Full
        } else {
            args.cache_policy
        };

        let order_key_data_types = args
            .order_key_indices
            .iter()
            .map(|i| input_schema[*i].data_type())
            .collect();

        let state_key_to_table_sub_pk_proj = RowConverter::calc_state_key_to_table_sub_pk_proj(
            &args.partition_key_indices,
            &args.order_key_indices,
            &input_info.pk_indices,
        );

        let deduped_part_key_indices = {
            let mut dedup = HashSet::new();
            args.partition_key_indices
                .iter()
                .filter(|i| dedup.insert(**i))
                .copied()
                .collect()
        };

        Self {
            input: args.input,
            inner: ExecutorInner {
                actor_ctx: args.actor_ctx,
                schema: args.schema,
                calls,
                deduped_part_key_indices,
                order_key_indices: args.order_key_indices,
                order_key_data_types,
                order_key_order_types: args.order_key_order_types,
                input_pk_indices: input_info.pk_indices,
                state_key_to_table_sub_pk_proj,
                state_table: args.state_table,
                watermark_sequence: args.watermark_epoch,
                chunk_size: args.chunk_size,
                cache_policy,
            },
        }
    }

    /// Merge changes in the given chunk by input pk, returning a change iterator that guarantees
    /// each pk appears only once. This method also validates the consistency of the input chunk.
    ///
    /// TODO(rc): We may want to optimize this by handling changes on the same pk while generating
    /// partition [`Change`]s.
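    ///
    /// Merging rules per pk (as implemented in the match arms below): a `Delete` followed by an
    /// `Insert` becomes an `Update`; an `Insert` followed by a `Delete` cancels out; an `Update`
    /// followed by a `Delete` collapses into a `Delete` of the original old row.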
    fn merge_changes_in_chunk<'a>(
        this: &'_ ExecutorInner<S>,
        chunk: &'a StreamChunk,
    ) -> impl Iterator<Item = Record<RowRef<'a>>> {
        let mut changes_merged = BTreeMap::new();
        for (op, row) in chunk.rows() {
            let pk = DefaultOrdered(this.get_input_pk(row));
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if let Some(prev_change) = changes_merged.get_mut(&pk) {
                        match prev_change {
                            Record::Delete { old_row } => {
                                *prev_change = Record::Update {
                                    old_row: *old_row,
                                    new_row: row,
                                };
                            }
                            _ => {
                                consistency_panic!(
                                    ?pk,
                                    ?row,
                                    ?prev_change,
                                    "inconsistent changes in input chunk, double-inserting"
                                );
                                if let Record::Update { old_row, .. } = prev_change {
                                    *prev_change = Record::Update {
                                        old_row: *old_row,
                                        new_row: row,
                                    };
                                } else {
                                    *prev_change = Record::Insert { new_row: row };
                                }
                            }
                        }
                    } else {
                        changes_merged.insert(pk, Record::Insert { new_row: row });
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    if let Some(prev_change) = changes_merged.get_mut(&pk) {
                        match prev_change {
                            Record::Insert { .. } => {
                                changes_merged.remove(&pk);
                            }
                            Record::Update {
                                old_row: real_old_row,
                                ..
                            } => {
                                *prev_change = Record::Delete {
                                    old_row: *real_old_row,
                                };
                            }
                            _ => {
                                consistency_panic!(
                                    ?pk,
                                    "inconsistent changes in input chunk, double-deleting"
                                );
                                *prev_change = Record::Delete { old_row: row };
                            }
                        }
                    } else {
                        changes_merged.insert(pk, Record::Delete { old_row: row });
                    }
                }
            }
        }
        changes_merged.into_values()
    }

    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn apply_chunk<'a>(
        this: &'a mut ExecutorInner<S>,
        vars: &'a mut ExecutionVars<S>,
        chunk: StreamChunk,
        metrics: &'a OverWindowMetrics,
    ) {
        // (deduped) partition key => (
        //   significant changes that happened in the partition,
        //   no-effect changes that happened in the partition,
        // )
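        // An `Update` that keeps the partition key, order key and all window function arguments
        // unchanged (e.g. only touching columns not referenced by any call) counts as a no-effect
        // change and is handled separately below, without recomputing window function outputs.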
        let mut deltas: BTreeMap<DefaultOrdered<OwnedRow>, (PartitionDelta, PartitionDelta)> =
            BTreeMap::new();
        // Input pks of update records whose order key is changed.
        let mut key_change_updated_pks = HashSet::new();

        // Collect changes for each partition.
        for record in Self::merge_changes_in_chunk(this, &chunk) {
            match record {
                Record::Insert { new_row } => {
                    let part_key = this.get_partition_key(new_row).into();
                    let (delta, _) = deltas.entry(part_key).or_default();
                    delta.insert(
                        this.row_to_cache_key(new_row)?,
                        Change::Insert(new_row.into_owned_row()),
                    );
                }
                Record::Delete { old_row } => {
                    let part_key = this.get_partition_key(old_row).into();
                    let (delta, _) = deltas.entry(part_key).or_default();
                    delta.insert(this.row_to_cache_key(old_row)?, Change::Delete);
                }
                Record::Update { old_row, new_row } => {
                    let old_part_key = this.get_partition_key(old_row).into();
                    let new_part_key = this.get_partition_key(new_row).into();
                    let old_state_key = this.row_to_cache_key(old_row)?;
                    let new_state_key = this.row_to_cache_key(new_row)?;
                    if old_part_key == new_part_key && old_state_key == new_state_key {
                        // not a key-change update
                        let (delta, no_effect_delta) = deltas.entry(old_part_key).or_default();
                        if old_row.project(&this.calls.all_arg_indices)
                            == new_row.project(&this.calls.all_arg_indices)
                        {
                            // partition key, order key and arguments are all the same
                            no_effect_delta
                                .insert(old_state_key, Change::Insert(new_row.into_owned_row()));
                        } else {
                            delta.insert(old_state_key, Change::Insert(new_row.into_owned_row()));
                        }
                    } else if old_part_key == new_part_key {
                        // order-change update, split into delete + insert, will be merged after
                        // building changes
                        key_change_updated_pks.insert(this.get_input_pk(old_row));
                        let (delta, _) = deltas.entry(old_part_key).or_default();
                        delta.insert(old_state_key, Change::Delete);
                        delta.insert(new_state_key, Change::Insert(new_row.into_owned_row()));
                    } else {
                        // partition-change update, split into delete + insert
                        // NOTE(rc): Since we append partition key to logical pk, we can't merge the
                        // delete + insert back to update later.
                        // TODO: IMO this behavior is problematic. Deep discussion is needed.
                        let (old_part_delta, _) = deltas.entry(old_part_key).or_default();
                        old_part_delta.insert(old_state_key, Change::Delete);
                        let (new_part_delta, _) = deltas.entry(new_part_key).or_default();
                        new_part_delta
                            .insert(new_state_key, Change::Insert(new_row.into_owned_row()));
                    }
                }
            }
        }

        // `input pk` => `Record`
        let mut key_change_update_buffer: BTreeMap<DefaultOrdered<OwnedRow>, Record<OwnedRow>> =
            BTreeMap::new();
        let mut chunk_builder = StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());

        // Build final changes partition by partition.
        for (part_key, (delta, no_effect_delta)) in deltas {
            vars.stats.cache_lookup += 1;
            if !vars.cached_partitions.contains(&part_key.0) {
                vars.stats.cache_miss += 1;
                vars.cached_partitions
                    .put(part_key.0.clone(), PartitionCache::new());
            }
            let mut cache = vars.cached_partitions.get_mut(&part_key).unwrap();

            // First, handle `Update`s that don't affect window function outputs.
            // Be careful that changes in `delta` may (though we believe it's unlikely) affect the
            // window function outputs of rows in `no_effect_delta`, so before handling `delta`
            // we need to write all these changes to the state table, range cache and chunk builder.
            for (key, change) in no_effect_delta {
                let new_row = change.into_insert().unwrap(); // new row of an `Update`

                let (old_row, from_cache) = if let Some(old_row) = cache.inner().get(&key).cloned()
                {
                    // Got old row from range cache.
                    (old_row, true)
                } else {
                    // Retrieve old row from state table.
                    let table_pk = (&new_row).project(this.state_table.pk_indices());
                    // The accesses to the state table are ordered by table PK, so ideally we
                    // can leverage the block cache under the hood.
                    if let Some(old_row) = this.state_table.get_row(table_pk).await? {
                        (old_row, false)
                    } else {
                        consistency_panic!(?part_key, ?key, ?new_row, "updating non-existing row");
                        continue;
                    }
                };

                // concatenate old outputs
                let input_len = new_row.len();
                let new_row = OwnedRow::new(
                    new_row
                        .into_iter()
                        .chain(old_row.as_inner().iter().skip(input_len).cloned()) // chain old outputs
                        .collect(),
                );

                // apply & emit the change
                let record = Record::Update {
                    old_row: &old_row,
                    new_row: &new_row,
                };
                if let Some(chunk) = chunk_builder.append_record(record.as_ref()) {
                    yield chunk;
                }
                this.state_table.write_record(record);
                if from_cache {
                    cache.insert(key, new_row);
                }
            }

            let mut partition = OverPartition::new(
                &part_key,
                &mut cache,
                this.cache_policy,
                &this.calls,
                RowConverter {
                    state_key_to_table_sub_pk_proj: &this.state_key_to_table_sub_pk_proj,
                    order_key_indices: &this.order_key_indices,
                    order_key_data_types: &this.order_key_data_types,
                    order_key_order_types: &this.order_key_order_types,
                    input_pk_indices: &this.input_pk_indices,
                },
            );

            if delta.is_empty() {
                continue;
            }

            // Build changes for current partition.
            let (part_changes, accessed_range) =
                partition.build_changes(&this.state_table, delta).await?;

            for (key, record) in part_changes {
                // Build chunk and yield if needed.
                if !key_change_updated_pks.contains(&key.pk) {
                    if let Some(chunk) = chunk_builder.append_record(record.as_ref()) {
                        yield chunk;
                    }
                } else {
                    // For key-change updates, we should wait for both `Delete` and `Insert` changes
                    // and merge them together.
                    let pk = key.pk.clone();
                    let record = record.clone();
                    if let Some(existed) = key_change_update_buffer.remove(&key.pk) {
                        match (existed, record) {
                            (Record::Insert { new_row }, Record::Delete { old_row })
                            | (Record::Delete { old_row }, Record::Insert { new_row }) => {
                                // merge `Delete` and `Insert` into `Update`
                                if let Some(chunk) =
                                    chunk_builder.append_record(Record::Update { old_row, new_row })
                                {
                                    yield chunk;
                                }
                            }
                            (existed, record) => {
                                // When the stream is inconsistent, there may be an `Update` whose old pk does not actually exist.
                                consistency_panic!(
                                    ?existed,
                                    ?record,
                                    "other cases should not exist",
                                );

                                key_change_update_buffer.insert(pk, record);
                                if let Some(chunk) = chunk_builder.append_record(existed) {
                                    yield chunk;
                                }
                            }
                        }
                    } else {
                        key_change_update_buffer.insert(pk, record);
                    }
                }

                // Apply the change record.
                partition.write_record(&mut this.state_table, key, record);
            }

            if !key_change_update_buffer.is_empty() {
                consistency_panic!(
                    ?key_change_update_buffer,
                    "key-change update buffer should be empty after processing"
                );
                // In non-strict mode we can reach here, but we don't know the `StateKey`s,
                // so just ignore the buffer.
            }

            let cache_len = partition.cache_real_len();
            let stats = partition.summarize();
            metrics
                .over_window_range_cache_entry_count
                .set(cache_len as i64);
            metrics
                .over_window_range_cache_lookup_count
                .inc_by(stats.lookup_count);
            metrics
                .over_window_range_cache_left_miss_count
                .inc_by(stats.left_miss_count);
            metrics
                .over_window_range_cache_right_miss_count
                .inc_by(stats.right_miss_count);
            metrics
                .over_window_accessed_entry_count
                .inc_by(stats.accessed_entry_count);
            metrics
                .over_window_compute_count
                .inc_by(stats.compute_count);
            metrics
                .over_window_same_output_count
                .inc_by(stats.same_output_count);

            // Update the recently accessed range, for shrinking the cache later.
            if !this.cache_policy.is_full()
                && let Some(accessed_range) = accessed_range
            {
                match vars.recently_accessed_ranges.entry(part_key) {
                    btree_map::Entry::Vacant(vacant) => {
                        vacant.insert(accessed_range);
                    }
                    btree_map::Entry::Occupied(mut occupied) => {
                        let recently_accessed_range = occupied.get_mut();
                        let min_start = accessed_range
                            .start()
                            .min(recently_accessed_range.start())
                            .clone();
                        let max_end = accessed_range
                            .end()
                            .max(recently_accessed_range.end())
                            .clone();
                        *recently_accessed_range = min_start..=max_end;
                    }
                }
            }
        }

        // Yield remaining changes to downstream.
        if let Some(chunk) = chunk_builder.take() {
            yield chunk;
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn executor_inner(self) {
        let OverWindowExecutor {
            input,
            inner: mut this,
        } = self;

        let metrics_info = MetricsInfo::new(
            this.actor_ctx.streaming_metrics.clone(),
            this.state_table.table_id(),
            this.actor_ctx.id,
            "OverWindow",
        );

        let metrics = metrics_info.metrics.new_over_window_metrics(
            this.state_table.table_id(),
            this.actor_ctx.id,
            this.actor_ctx.fragment_id,
        );

        let mut vars = ExecutionVars {
            cached_partitions: ManagedLruCache::unbounded(
                this.watermark_sequence.clone(),
                metrics_info,
            ),
            recently_accessed_ranges: Default::default(),
            stats: Default::default(),
            _phantom: PhantomData::<S>,
        };

        let mut input = input.execute();
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        this.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            match msg {
                Message::Watermark(_) => {
                    // TODO(rc): Ignore watermarks for now. We need to think carefully about
                    // watermarks for window functions like `lead`.
                    continue;
                }
                Message::Chunk(chunk) => {
                    #[for_await]
                    for chunk in Self::apply_chunk(&mut this, &mut vars, chunk, &metrics) {
                        yield Message::Chunk(chunk?);
                    }
                    this.state_table.try_flush().await?;
                }
                Message::Barrier(barrier) => {
                    let post_commit = this.state_table.commit(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
                    yield Message::Barrier(barrier);

                    vars.cached_partitions.evict();

                    metrics
                        .over_window_cached_entry_count
                        .set(vars.cached_partitions.len() as _);
                    metrics
                        .over_window_cache_lookup_count
                        .inc_by(std::mem::take(&mut vars.stats.cache_lookup));
                    metrics
                        .over_window_cache_miss_count
                        .inc_by(std::mem::take(&mut vars.stats.cache_miss));

                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        if cache_may_stale {
                            vars.cached_partitions.clear();
                            vars.recently_accessed_ranges.clear();
                        }
                    }

                    if !this.cache_policy.is_full() {
                        for (part_key, recently_accessed_range) in
                            std::mem::take(&mut vars.recently_accessed_ranges)
                        {
                            if let Some(mut range_cache) =
                                vars.cached_partitions.get_mut(&part_key.0)
                            {
                                range_cache.shrink(
                                    &part_key.0,
                                    this.cache_policy,
                                    recently_accessed_range,
                                );
                            }
                        }
                    }
                }
            }
        }
    }
}

/// A converter that helps convert [`StateKey`] to the state table sub-PK and convert executor
/// input/output rows to [`StateKey`].
///
/// ## Notes
///
/// - [`StateKey`]: Over window range cache key type, containing the order key and input pk.
/// - State table sub-PK: State table PK = PK prefix (partition key) + sub-PK (order key + input pk).
/// - Input/output row: The input schema is the prefix of the output schema.
///
/// The content of [`StateKey`] is very similar to the state table sub-PK, with only one difference:
/// the state table PK and sub-PK don't have duplicated columns, while in [`StateKey`], `order_key`
/// and the (input) `pk` may contain duplicated columns.
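///
/// For example (hypothetical schema): with partition key `[a]`, order key `[a, b]` and input pk
/// `[c]`, [`StateKey`] encodes columns `(a, b)` as the order key plus `(c)` as the pk, while the
/// state table sub-PK is just `(b, c)`, since `a` is already covered by the PK prefix.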
#[derive(Debug, Clone, Copy)]
pub(super) struct RowConverter<'a> {
    state_key_to_table_sub_pk_proj: &'a [usize],
    order_key_indices: &'a [usize],
    order_key_data_types: &'a [DataType],
    order_key_order_types: &'a [OrderType],
    input_pk_indices: &'a [usize],
}

impl<'a> RowConverter<'a> {
    /// Calculate the indices needed for projecting [`StateKey`] to the state table sub-PK (used for
    /// prefixed table scanning). Ideally this function should be called only once per executor instance.
    /// The projection vec contains the *selected column indices* in [`StateKey`]'s `order_key.chain(input_pk)`.
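    ///
    /// Worked example (hypothetical indices): with `partition_key_indices = [1]`,
    /// `order_key_indices = [1, 2]` and `input_pk_indices = [0]`, the chained key columns are
    /// `[1, 2, 0]`. Column `1` is already covered by the partition key (the PK prefix), so the
    /// resulting projection is `[1, 2]`, selecting input columns `2` and `0` for the sub-PK.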
    pub(super) fn calc_state_key_to_table_sub_pk_proj(
        partition_key_indices: &[usize],
        order_key_indices: &[usize],
        input_pk_indices: &'a [usize],
    ) -> Vec<usize> {
        // This process corresponds to `StreamOverWindow::infer_state_table`.
        let mut projection = Vec::with_capacity(order_key_indices.len() + input_pk_indices.len());
        let mut col_dedup: HashSet<usize> = partition_key_indices.iter().copied().collect();
        for (proj_idx, key_idx) in order_key_indices
            .iter()
            .chain(input_pk_indices.iter())
            .enumerate()
        {
            if col_dedup.insert(*key_idx) {
                projection.push(proj_idx);
            }
        }
        projection.shrink_to_fit();
        projection
    }

    /// Convert [`StateKey`] to sub-PK (table PK without partition key) as [`OwnedRow`].
    pub(super) fn state_key_to_table_sub_pk(
        &self,
        key: &StateKey,
    ) -> StreamExecutorResult<OwnedRow> {
        Ok(memcmp_encoding::decode_row(
            &key.order_key,
            self.order_key_data_types,
            self.order_key_order_types,
        )?
        .chain(key.pk.as_inner())
        .project(self.state_key_to_table_sub_pk_proj)
        .into_owned_row())
    }

    /// Convert full input/output row to [`StateKey`].
    pub(super) fn row_to_state_key(
        &self,
        full_row: impl Row + Copy,
    ) -> StreamExecutorResult<StateKey> {
        Ok(StateKey {
            order_key: memcmp_encoding::encode_row(
                full_row.project(self.order_key_indices),
                self.order_key_order_types,
            )?,
            pk: full_row
                .project(self.input_pk_indices)
                .into_owned_row()
                .into(),
        })
    }
}