risingwave_stream/executor/over_window/
general.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet, btree_map};
16use std::marker::PhantomData;
17use std::ops::RangeInclusive;
18
19use delta_btree_map::Change;
20use itertools::Itertools;
21use risingwave_common::array::stream_record::Record;
22use risingwave_common::row::RowExt;
23use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
24use risingwave_common::types::DefaultOrdered;
25use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
26use risingwave_common::util::sort_util::OrderType;
27use risingwave_expr::window_function::{
28    RangeFrameBounds, RowsFrameBounds, StateKey, WindowFuncCall,
29};
30
31use super::frame_finder::merge_rows_frames;
32use super::over_partition::{OverPartition, PartitionDelta};
33use super::range_cache::{CacheKey, PartitionCache};
34use crate::cache::ManagedLruCache;
35use crate::common::change_buffer::ChangeBuffer;
36use crate::common::metrics::MetricsInfo;
37use crate::consistency::consistency_panic;
38use crate::executor::monitor::OverWindowMetrics;
39use crate::executor::prelude::*;
40
/// [`OverWindowExecutor`] consumes retractable input stream and produces window function outputs.
/// One [`OverWindowExecutor`] can handle one combination of partition key and order key.
///
/// - State table schema = output schema, state table pk = `partition key | order key | input pk`.
/// - Output schema = input schema + window function results.
pub struct OverWindowExecutor<S: StateStore> {
    /// Upstream executor whose message stream is consumed by the main loop.
    input: Executor,
    /// Everything else the executor needs: configuration, derived key layouts and the state table.
    inner: ExecutorInner<S>,
}
50
/// Configuration and persistent state of an [`OverWindowExecutor`], i.e. everything except the
/// input executor.
struct ExecutorInner<S: StateStore> {
    /// Actor context; supplies the actor id, fragment id and streaming metrics.
    actor_ctx: ActorContextRef,

    /// Schema whose data types are used to build the output chunks.
    schema: Schema,
    /// The window function calls together with information derived from them.
    calls: Calls,
    /// Partition key column indices with repeated columns removed (first occurrence kept).
    deduped_part_key_indices: Vec<usize>,
    /// Column indices of the order key.
    order_key_indices: Vec<usize>,
    /// Data types of the order key columns, looked up in the input schema.
    order_key_data_types: Vec<DataType>,
    /// Order types of the order key columns, used when memcmp-encoding/decoding the order key.
    order_key_order_types: Vec<OrderType>,
    /// Stream key (pk) column indices of the input executor.
    input_stream_key: Vec<usize>,
    /// Projection from the columns of a [`StateKey`] (`order key | input pk`) to the state table
    /// sub-PK, computed by [`RowConverter::calc_state_key_to_table_sub_pk_proj`].
    state_key_to_table_sub_pk_proj: Vec<usize>,

    /// State table that output rows are written to; committed on every barrier.
    state_table: StateTable<S>,
    /// Sequence handed to the partition LRU cache when it is created.
    watermark_sequence: AtomicU64Ref,

    /// The maximum size of the chunk produced by executor at a time.
    chunk_size: usize,
    /// Effective range cache policy: forced to `Full` when any call has an unbounded frame,
    /// otherwise the policy given in the executor arguments.
    cache_policy: CachePolicy,
}
70
/// Mutable in-memory state created when the executor's main loop starts.
struct ExecutionVars<S: StateStore> {
    /// partition key => partition range cache.
    cached_partitions: ManagedLruCache<OwnedRow, PartitionCache>,
    /// partition key => recently accessed range.
    ///
    /// Filled while applying chunks (only when the cache policy is not `Full`) and drained on
    /// every barrier to shrink the corresponding range caches.
    recently_accessed_ranges: BTreeMap<DefaultOrdered<OwnedRow>, RangeInclusive<StateKey>>,
    /// Partition cache counters accumulated since the last barrier.
    stats: ExecutionStats,
    /// Marker that ties the otherwise unused type parameter `S` to this struct.
    _phantom: PhantomData<S>,
}
79
/// Partition cache counters. They are reported to metrics and reset to zero on every barrier.
#[derive(Default)]
struct ExecutionStats {
    /// Number of lookups for which the partition was not yet in the partition cache.
    cache_miss: u64,
    /// Number of partition cache lookups (one per partition touched by a chunk).
    cache_lookup: u64,
}
85
86impl<S: StateStore> Execute for OverWindowExecutor<S> {
87    fn execute(self: Box<Self>) -> crate::executor::BoxedMessageStream {
88        self.executor_inner().boxed()
89    }
90}
91
92impl<S: StateStore> ExecutorInner<S> {
93    /// Get deduplicated partition key from a full row, which happened to be the prefix of table PK.
94    fn get_partition_key(&self, full_row: impl Row) -> OwnedRow {
95        full_row
96            .project(&self.deduped_part_key_indices)
97            .into_owned_row()
98    }
99
100    fn get_input_pk(&self, full_row: impl Row) -> OwnedRow {
101        full_row.project(&self.input_stream_key).into_owned_row()
102    }
103
104    /// `full_row` can be an input row or state table row.
105    fn encode_order_key(&self, full_row: impl Row) -> StreamExecutorResult<MemcmpEncoded> {
106        Ok(memcmp_encoding::encode_row(
107            full_row.project(&self.order_key_indices),
108            &self.order_key_order_types,
109        )?)
110    }
111
112    fn row_to_cache_key(&self, full_row: impl Row + Copy) -> StreamExecutorResult<CacheKey> {
113        Ok(CacheKey::Normal(StateKey {
114            order_key: self.encode_order_key(full_row)?,
115            pk: self.get_input_pk(full_row).into(),
116        }))
117    }
118}
119
/// Arguments for [`OverWindowExecutor::new`].
pub struct OverWindowExecutorArgs<S: StateStore> {
    /// Actor context of the actor this executor belongs to.
    pub actor_ctx: ActorContextRef,

    /// Upstream executor.
    pub input: Executor,

    /// Schema used to build output chunks.
    pub schema: Schema,
    /// Window function calls to evaluate.
    pub calls: Vec<WindowFuncCall>,
    /// Partition key column indices; repeated columns are deduplicated by the constructor.
    pub partition_key_indices: Vec<usize>,
    /// Order key column indices.
    pub order_key_indices: Vec<usize>,
    /// Order types of the order key columns.
    pub order_key_order_types: Vec<OrderType>,

    /// State table of the executor.
    pub state_table: StateTable<S>,
    /// Sequence passed on to the partition LRU cache.
    pub watermark_epoch: AtomicU64Ref,
    // NOTE(review): `metrics` is not read by `OverWindowExecutor::new` in this file; the executor
    // obtains its metrics through `actor_ctx` instead.
    pub metrics: Arc<StreamingMetrics>,

    /// The maximum size of the chunk produced by the executor at a time.
    pub chunk_size: usize,
    /// Requested range cache policy; overridden with `Full` when any call has an unbounded frame.
    pub cache_policy: CachePolicy,
}
138
/// Information about the window function calls.
/// Contains the original calls and other information that can be derived from the calls, computed
/// once up front to avoid repeated calculation.
pub(super) struct Calls {
    /// The original window function calls.
    calls: Vec<WindowFuncCall>,

    /// The `ROWS` frame that is the union of all `ROWS` frames.
    pub(super) super_rows_frame_bounds: RowsFrameBounds,
    /// All `RANGE` frames.
    pub(super) range_frames: Vec<RangeFrameBounds>,
    /// Whether the frame start of any call is unbounded.
    pub(super) start_is_unbounded: bool,
    /// Whether the frame end of any call is unbounded.
    pub(super) end_is_unbounded: bool,
    /// Deduplicated indices of all arguments of all calls.
    pub(super) all_arg_indices: Vec<usize>,

    // TODO(rc): The following flags are used to optimize for `row_number`, `rank` and `dense_rank`.
    // We should try our best to remove these flags while maintaining the performance in the future.
    /// Whether every call is a numbering function (according to `kind.is_numbering()`).
    pub(super) numbering_only: bool,
    /// Whether any call is a rank function (according to `kind.is_rank()`).
    pub(super) has_rank: bool,
}
159
160impl Calls {
161    fn new(calls: Vec<WindowFuncCall>) -> Self {
162        let rows_frames = calls
163            .iter()
164            .filter_map(|call| call.frame.bounds.as_rows())
165            .collect::<Vec<_>>();
166        let super_rows_frame_bounds = merge_rows_frames(&rows_frames);
167        let range_frames = calls
168            .iter()
169            .filter_map(|call| call.frame.bounds.as_range())
170            .cloned()
171            .collect::<Vec<_>>();
172
173        let start_is_unbounded = calls
174            .iter()
175            .any(|call| call.frame.bounds.start_is_unbounded());
176        let end_is_unbounded = calls
177            .iter()
178            .any(|call| call.frame.bounds.end_is_unbounded());
179
180        let all_arg_indices = calls
181            .iter()
182            .flat_map(|call| call.args.val_indices().iter().copied())
183            .dedup()
184            .collect();
185
186        let numbering_only = calls.iter().all(|call| call.kind.is_numbering());
187        let has_rank = calls.iter().any(|call| call.kind.is_rank());
188
189        Self {
190            calls,
191            super_rows_frame_bounds,
192            range_frames,
193            start_is_unbounded,
194            end_is_unbounded,
195            all_arg_indices,
196            numbering_only,
197            has_rank,
198        }
199    }
200
201    pub(super) fn iter(&self) -> impl ExactSizeIterator<Item = &WindowFuncCall> {
202        self.calls.iter()
203    }
204
205    pub(super) fn len(&self) -> usize {
206        self.calls.len()
207    }
208}
209
210impl<S: StateStore> OverWindowExecutor<S> {
211    pub fn new(args: OverWindowExecutorArgs<S>) -> Self {
212        let calls = Calls::new(args.calls);
213
214        let input_info = args.input.info().clone();
215        let input_schema = &input_info.schema;
216
217        let has_unbounded_frame = calls.start_is_unbounded || calls.end_is_unbounded;
218        let cache_policy = if has_unbounded_frame {
219            // For unbounded frames, we finally need all entries of the partition in the cache,
220            // so for simplicity we just use full cache policy for these cases.
221            CachePolicy::Full
222        } else {
223            args.cache_policy
224        };
225
226        let order_key_data_types = args
227            .order_key_indices
228            .iter()
229            .map(|i| input_schema[*i].data_type())
230            .collect();
231
232        let state_key_to_table_sub_pk_proj = RowConverter::calc_state_key_to_table_sub_pk_proj(
233            &args.partition_key_indices,
234            &args.order_key_indices,
235            &input_info.stream_key,
236        );
237
238        let deduped_part_key_indices = {
239            let mut dedup = HashSet::new();
240            args.partition_key_indices
241                .iter()
242                .filter(|i| dedup.insert(**i))
243                .copied()
244                .collect()
245        };
246
247        Self {
248            input: args.input,
249            inner: ExecutorInner {
250                actor_ctx: args.actor_ctx,
251                schema: args.schema,
252                calls,
253                deduped_part_key_indices,
254                order_key_indices: args.order_key_indices,
255                order_key_data_types,
256                order_key_order_types: args.order_key_order_types,
257                input_stream_key: input_info.stream_key,
258                state_key_to_table_sub_pk_proj,
259                state_table: args.state_table,
260                watermark_sequence: args.watermark_epoch,
261                chunk_size: args.chunk_size,
262                cache_policy,
263            },
264        }
265    }
266
267    /// Merge changes by input pk in the given chunk, return a change iterator which guarantees that
268    /// each pk only appears once. This method also validates the consistency of the input
269    /// chunk.
270    ///
271    /// TODO(rc): We may want to optimize this by handling changes on the same pk during generating
272    /// partition [`Change`]s.
273    fn merge_changes_in_chunk<'a>(
274        this: &'_ ExecutorInner<S>,
275        chunk: &'a StreamChunk,
276    ) -> impl Iterator<Item = Record<RowRef<'a>>> {
277        let mut cb = ChangeBuffer::with_capacity(chunk.cardinality());
278        for record in chunk.records() {
279            cb.apply_record(record, |row| this.get_input_pk(row));
280        }
281        cb.into_records()
282    }
283
    /// Apply one input chunk and yield the resulting output chunk(s).
    ///
    /// The work is done in two phases:
    ///
    /// 1. Group the pk-merged input records into per-partition deltas. Updates that keep the
    ///    partition key, the order key and all window function arguments unchanged are collected
    ///    in a separate "no-effect" delta.
    /// 2. For each partition, in partition key order: first pass the no-effect updates through,
    ///    reusing their old window function outputs; then let [`OverPartition`] build the changes
    ///    caused by the remaining delta, emit them and write them to the state table.
    ///
    /// Order-key-change updates are split into `Delete` + `Insert` in phase 1 and merged back into
    /// a single `Update` when they are emitted in phase 2.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn apply_chunk<'a>(
        this: &'a mut ExecutorInner<S>,
        vars: &'a mut ExecutionVars<S>,
        chunk: StreamChunk,
        metrics: &'a OverWindowMetrics,
    ) {
        // (deduped) partition key => (
        //   significant changes happened in the partition,
        //   no-effect changes happened in the partition,
        // )
        let mut deltas: BTreeMap<DefaultOrdered<OwnedRow>, (PartitionDelta, PartitionDelta)> =
            BTreeMap::new();
        // input pk of update records of which the order key is changed.
        let mut key_change_updated_pks = HashSet::new();

        // Collect changes for each partition.
        for record in Self::merge_changes_in_chunk(this, &chunk) {
            match record {
                Record::Insert { new_row } => {
                    let part_key = this.get_partition_key(new_row).into();
                    let (delta, _) = deltas.entry(part_key).or_default();
                    delta.insert(
                        this.row_to_cache_key(new_row)?,
                        Change::Insert(new_row.into_owned_row()),
                    );
                }
                Record::Delete { old_row } => {
                    let part_key = this.get_partition_key(old_row).into();
                    let (delta, _) = deltas.entry(part_key).or_default();
                    delta.insert(this.row_to_cache_key(old_row)?, Change::Delete);
                }
                Record::Update { old_row, new_row } => {
                    let old_part_key = this.get_partition_key(old_row).into();
                    let new_part_key = this.get_partition_key(new_row).into();
                    let old_state_key = this.row_to_cache_key(old_row)?;
                    let new_state_key = this.row_to_cache_key(new_row)?;
                    if old_part_key == new_part_key && old_state_key == new_state_key {
                        // not a key-change update
                        let (delta, no_effect_delta) = deltas.entry(old_part_key).or_default();
                        if old_row.project(&this.calls.all_arg_indices)
                            == new_row.project(&this.calls.all_arg_indices)
                        {
                            // partition key, order key and arguments are all the same
                            no_effect_delta
                                .insert(old_state_key, Change::Insert(new_row.into_owned_row()));
                        } else {
                            delta.insert(old_state_key, Change::Insert(new_row.into_owned_row()));
                        }
                    } else if old_part_key == new_part_key {
                        // order-change update, split into delete + insert, will be merged after
                        // building changes
                        key_change_updated_pks.insert(this.get_input_pk(old_row));
                        let (delta, _) = deltas.entry(old_part_key).or_default();
                        delta.insert(old_state_key, Change::Delete);
                        delta.insert(new_state_key, Change::Insert(new_row.into_owned_row()));
                    } else {
                        // partition-change update, split into delete + insert
                        // NOTE(rc): Since we append partition key to logical pk, we can't merge the
                        // delete + insert back to update later.
                        // TODO: IMO this behavior is problematic. Deep discussion is needed.
                        let (old_part_delta, _) = deltas.entry(old_part_key).or_default();
                        old_part_delta.insert(old_state_key, Change::Delete);
                        let (new_part_delta, _) = deltas.entry(new_part_key).or_default();
                        new_part_delta
                            .insert(new_state_key, Change::Insert(new_row.into_owned_row()));
                    }
                }
            }
        }

        // `input pk` => `Record`
        // Holds the first half (`Delete` or `Insert`) of a key-change update until the other half
        // shows up, so that both can be emitted as one `Update`.
        let mut key_change_update_buffer: BTreeMap<DefaultOrdered<OwnedRow>, Record<OwnedRow>> =
            BTreeMap::new();
        let mut chunk_builder = StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());

        // Build final changes partition by partition.
        for (part_key, (delta, no_effect_delta)) in deltas {
            // Make sure a range cache exists for this partition (creating an empty one on a miss)
            // and count the lookup/miss for the metrics reported on barrier.
            vars.stats.cache_lookup += 1;
            if !vars.cached_partitions.contains(&part_key.0) {
                vars.stats.cache_miss += 1;
                vars.cached_partitions
                    .put(part_key.0.clone(), PartitionCache::new());
            }
            let mut cache = vars.cached_partitions.get_mut(&part_key).unwrap();

            // First, handle `Update`s that don't affect window function outputs.
            // Be careful that changes in `delta` may (though we believe unlikely) affect the
            // window function outputs of rows in `no_effect_delta`, so before handling `delta`
            // we need to write all changes to state table, range cache and chunk builder.
            for (key, change) in no_effect_delta {
                let new_row = change.into_insert().unwrap(); // new row of an `Update`

                let (old_row, from_cache) = if let Some(old_row) = cache.inner().get(&key).cloned()
                {
                    // Got old row from range cache.
                    (old_row, true)
                } else {
                    // Retrieve old row from state table.
                    let table_pk = (&new_row).project(this.state_table.pk_indices());
                    // The accesses to the state table are ordered by table PK, so ideally we
                    // can leverage the block cache under the hood.
                    if let Some(old_row) = this.state_table.get_row(table_pk).await? {
                        (old_row, false)
                    } else {
                        consistency_panic!(?part_key, ?key, ?new_row, "updating non-existing row");
                        continue;
                    }
                };

                // concatenate old outputs
                let input_len = new_row.len();
                let new_row = OwnedRow::new(
                    new_row
                        .into_iter()
                        .chain(old_row.as_inner().iter().skip(input_len).cloned()) // chain old outputs
                        .collect(),
                );

                // apply & emit the change
                let record = Record::Update {
                    old_row: &old_row,
                    new_row: &new_row,
                };
                if let Some(chunk) = chunk_builder.append_record(record.as_ref()) {
                    yield chunk;
                }
                this.state_table.write_record(record);
                // Only refresh the range cache if the old row was found there; a row that had to
                // be fetched from the state table is not added to the cache here.
                if from_cache {
                    cache.insert(key, new_row);
                }
            }

            let mut partition = OverPartition::new(
                &part_key,
                &mut cache,
                this.cache_policy,
                &this.calls,
                RowConverter {
                    state_key_to_table_sub_pk_proj: &this.state_key_to_table_sub_pk_proj,
                    order_key_indices: &this.order_key_indices,
                    order_key_data_types: &this.order_key_data_types,
                    order_key_order_types: &this.order_key_order_types,
                    input_stream_key_indices: &this.input_stream_key,
                },
            );

            // Only no-effect updates happened in this partition, nothing left to compute.
            if delta.is_empty() {
                continue;
            }

            // Build changes for current partition.
            let (part_changes, accessed_range) =
                partition.build_changes(&this.state_table, delta).await?;

            for (key, record) in part_changes {
                // Build chunk and yield if needed.
                if !key_change_updated_pks.contains(&key.pk) {
                    if let Some(chunk) = chunk_builder.append_record(record.as_ref()) {
                        yield chunk;
                    }
                } else {
                    // For key-change updates, we should wait for both `Delete` and `Insert` changes
                    // and merge them together.
                    let pk = key.pk.clone();
                    let record = record.clone();
                    if let Some(existed) = key_change_update_buffer.remove(&key.pk) {
                        match (existed, record) {
                            (Record::Insert { new_row }, Record::Delete { old_row })
                            | (Record::Delete { old_row }, Record::Insert { new_row }) => {
                                // merge `Delete` and `Insert` into `Update`
                                if let Some(chunk) =
                                    chunk_builder.append_record(Record::Update { old_row, new_row })
                                {
                                    yield chunk;
                                }
                            }
                            (existed, record) => {
                                // when stream is inconsistent, there may be an `Update` of which the old pk does not actually exist
                                consistency_panic!(
                                    ?existed,
                                    ?record,
                                    "other cases should not exist",
                                );

                                // Emit the previously buffered record as is and buffer the new one.
                                key_change_update_buffer.insert(pk, record);
                                if let Some(chunk) = chunk_builder.append_record(existed) {
                                    yield chunk;
                                }
                            }
                        }
                    } else {
                        key_change_update_buffer.insert(pk, record);
                    }
                }

                // Apply the change record.
                partition.write_record(&mut this.state_table, key, record);
            }

            if !key_change_update_buffer.is_empty() {
                consistency_panic!(
                    ?key_change_update_buffer,
                    "key-change update buffer should be empty after processing"
                );
                // if in non-strict mode, we can reach here, but we don't know the `StateKey`,
                // so just ignore the buffer.
            }

            // Report the range cache statistics of this partition.
            let cache_len = partition.cache_real_len();
            let stats = partition.summarize();
            metrics
                .over_window_range_cache_entry_count
                .set(cache_len as i64);
            metrics
                .over_window_range_cache_lookup_count
                .inc_by(stats.lookup_count);
            metrics
                .over_window_range_cache_left_miss_count
                .inc_by(stats.left_miss_count);
            metrics
                .over_window_range_cache_right_miss_count
                .inc_by(stats.right_miss_count);
            metrics
                .over_window_accessed_entry_count
                .inc_by(stats.accessed_entry_count);
            metrics
                .over_window_compute_count
                .inc_by(stats.compute_count);
            metrics
                .over_window_same_output_count
                .inc_by(stats.same_output_count);

            // Update recently accessed range for later shrinking cache.
            if !this.cache_policy.is_full()
                && let Some(accessed_range) = accessed_range
            {
                match vars.recently_accessed_ranges.entry(part_key) {
                    btree_map::Entry::Vacant(vacant) => {
                        vacant.insert(accessed_range);
                    }
                    btree_map::Entry::Occupied(mut occupied) => {
                        // Widen the recorded range so that it covers both the old and the new one.
                        let recently_accessed_range = occupied.get_mut();
                        let min_start = accessed_range
                            .start()
                            .min(recently_accessed_range.start())
                            .clone();
                        let max_end = accessed_range
                            .end()
                            .max(recently_accessed_range.end())
                            .clone();
                        *recently_accessed_range = min_start..=max_end;
                    }
                }
            }
        }

        // Yield remaining changes to downstream.
        if let Some(chunk) = chunk_builder.take() {
            yield chunk;
        }
    }
546
    /// Main loop of the executor.
    ///
    /// Sets up metrics and the partition cache, forwards the first barrier and initializes the
    /// state table with its epoch, then handles each input message:
    ///
    /// - `Watermark`: ignored for now.
    /// - `Chunk`: applied via `apply_chunk`; the produced chunks are yielded downstream.
    /// - `Barrier`: the state table is committed, the barrier is forwarded, and the caches are
    ///   evicted, possibly cleared, and shrunk.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn executor_inner(self) {
        let OverWindowExecutor {
            input,
            inner: mut this,
        } = self;

        let metrics_info = MetricsInfo::new(
            this.actor_ctx.streaming_metrics.clone(),
            this.state_table.table_id(),
            this.actor_ctx.id,
            "OverWindow",
        );

        let metrics = metrics_info.metrics.new_over_window_metrics(
            this.state_table.table_id(),
            this.actor_ctx.id,
            this.actor_ctx.fragment_id,
        );

        let mut vars = ExecutionVars {
            cached_partitions: ManagedLruCache::unbounded(
                this.watermark_sequence.clone(),
                metrics_info,
            ),
            recently_accessed_ranges: Default::default(),
            stats: Default::default(),
            _phantom: PhantomData::<S>,
        };

        // The first message is expected to be a barrier. It is forwarded downstream before the
        // state table is initialized with its epoch.
        let mut input = input.execute();
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        this.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            match msg {
                Message::Watermark(_) => {
                    // TODO(rc): ignore watermark for now, we need to think about watermark for
                    // window functions like `lead` carefully.
                    continue;
                }
                Message::Chunk(chunk) => {
                    #[for_await]
                    for chunk in Self::apply_chunk(&mut this, &mut vars, chunk, &metrics) {
                        yield Message::Chunk(chunk?);
                    }
                    this.state_table.try_flush().await?;
                }
                Message::Barrier(barrier) => {
                    // Commit the state table before forwarding the barrier; the post-commit step
                    // is completed only after the barrier has been yielded.
                    let post_commit = this.state_table.commit(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
                    yield Message::Barrier(barrier);

                    vars.cached_partitions.evict();

                    // Report partition cache metrics and reset the counters for the next epoch.
                    metrics
                        .over_window_cached_entry_count
                        .set(vars.cached_partitions.len() as _);
                    metrics
                        .over_window_cache_lookup_count
                        .inc_by(std::mem::take(&mut vars.stats.cache_lookup));
                    metrics
                        .over_window_cache_miss_count
                        .inc_by(std::mem::take(&mut vars.stats.cache_miss));

                    // If the post-commit step reports that cached data may be stale, drop all
                    // cached partitions together with their recorded accessed ranges.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                        && cache_may_stale
                    {
                        vars.cached_partitions.clear();
                        vars.recently_accessed_ranges.clear();
                    }

                    // Shrink the range cache of every partition accessed since the last barrier,
                    // based on the range that was accessed. The recorded ranges are consumed here.
                    if !this.cache_policy.is_full() {
                        for (part_key, recently_accessed_range) in
                            std::mem::take(&mut vars.recently_accessed_ranges)
                        {
                            if let Some(mut range_cache) =
                                vars.cached_partitions.get_mut(&part_key.0)
                            {
                                range_cache.shrink(
                                    &part_key.0,
                                    this.cache_policy,
                                    recently_accessed_range,
                                );
                            }
                        }
                    }
                }
            }
        }
    }
644}
645
/// A converter that helps convert [`StateKey`] to state table sub-PK and convert executor input/output
/// row to [`StateKey`].
///
/// ## Notes
///
/// - [`StateKey`]: Over window range cache key type, containing order key and input pk.
/// - State table sub-PK: State table PK = PK prefix (partition key) + sub-PK (order key + input pk).
/// - Input/output row: Input schema is the prefix of output schema.
///
/// You can see that the content of [`StateKey`] is very similar to state table sub-PK. There's only
/// one difference: the state table PK and sub-PK don't have duplicated columns, while in [`StateKey`],
/// `order_key` and (input)`pk` may contain duplicated columns.
#[derive(Debug, Clone, Copy)]
pub(super) struct RowConverter<'a> {
    /// Positions in `order key | input pk` that make up the state table sub-PK, as computed by
    /// [`RowConverter::calc_state_key_to_table_sub_pk_proj`].
    state_key_to_table_sub_pk_proj: &'a [usize],
    /// Column indices of the order key in a full input/output row.
    order_key_indices: &'a [usize],
    /// Data types of the order key columns, needed to decode the memcmp-encoded order key.
    order_key_data_types: &'a [DataType],
    /// Order types of the order key columns, used for both encoding and decoding the order key.
    order_key_order_types: &'a [OrderType],
    /// Column indices of the input pk in a full input/output row.
    input_stream_key_indices: &'a [usize],
}
666
667impl<'a> RowConverter<'a> {
668    /// Calculate the indices needed for projection from [`StateKey`] to state table sub-PK (used to do
669    /// prefixed table scanning). Ideally this function should be called only once by each executor instance.
670    /// The projection indices vec is the *selected column indices* in [`StateKey`].`order_key.chain(input_pk)`.
671    pub(super) fn calc_state_key_to_table_sub_pk_proj(
672        partition_key_indices: &[usize],
673        order_key_indices: &[usize],
674        input_stream_key_indices: &'a [usize],
675    ) -> Vec<usize> {
676        // This process is corresponding to `StreamOverWindow::infer_state_table`.
677        let mut projection =
678            Vec::with_capacity(order_key_indices.len() + input_stream_key_indices.len());
679        let mut col_dedup: HashSet<usize> = partition_key_indices.iter().copied().collect();
680        for (proj_idx, key_idx) in order_key_indices
681            .iter()
682            .chain(input_stream_key_indices.iter())
683            .enumerate()
684        {
685            if col_dedup.insert(*key_idx) {
686                projection.push(proj_idx);
687            }
688        }
689        projection.shrink_to_fit();
690        projection
691    }
692
693    /// Convert [`StateKey`] to sub-PK (table PK without partition key) as [`OwnedRow`].
694    pub(super) fn state_key_to_table_sub_pk(
695        &self,
696        key: &StateKey,
697    ) -> StreamExecutorResult<OwnedRow> {
698        Ok(memcmp_encoding::decode_row(
699            &key.order_key,
700            self.order_key_data_types,
701            self.order_key_order_types,
702        )?
703        .chain(key.pk.as_inner())
704        .project(self.state_key_to_table_sub_pk_proj)
705        .into_owned_row())
706    }
707
708    /// Convert full input/output row to [`StateKey`].
709    pub(super) fn row_to_state_key(
710        &self,
711        full_row: impl Row + Copy,
712    ) -> StreamExecutorResult<StateKey> {
713        Ok(StateKey {
714            order_key: memcmp_encoding::encode_row(
715                full_row.project(self.order_key_indices),
716                self.order_key_order_types,
717            )?,
718            pk: full_row
719                .project(self.input_stream_key_indices)
720                .into_owned_row()
721                .into(),
722        })
723    }
724}