risingwave_stream/executor/over_window/eowc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;
use std::ops::Bound;

use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayRef, Op};
use risingwave_common::row::RowExt;
use risingwave_common::types::{ToDatumRef, ToOwnedDatum};
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::{must_match, row};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_estimate_size::collections::EstimatedVecDeque;
use risingwave_expr::window_function::{
    StateEvictHint, StateKey, WindowFuncCall, WindowStates, create_window_state,
};
use risingwave_storage::store::PrefetchOptions;

use crate::cache::ManagedLruCache;
use crate::common::metrics::MetricsInfo;
use crate::executor::prelude::*;

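/// In-memory state of one window partition: one `WindowState` per window function call, plus a
/// buffer of input rows that have been fed to the states but whose outputs have not been emitted
/// yet.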
struct Partition {
    states: WindowStates,
    curr_row_buffer: EstimatedVecDeque<OwnedRow>,
}

impl EstimateSize for Partition {
    fn estimated_heap_size(&self) -> usize {
        let mut total_size = self.curr_row_buffer.estimated_heap_size();
        for state in self.states.iter() {
            total_size += state.estimated_heap_size();
        }
        total_size
    }
}

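/// Cache of per-partition in-memory states, keyed by the memcmp-encoded partition key.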
type PartitionCache = ManagedLruCache<MemcmpEncoded, Partition>; // TODO(rc): use `K: HashKey` as key like in hash agg?

/// [`EowcOverWindowExecutor`] consumes input that arrives in ascending order of a watermarked
/// order key column and outputs window function results. One [`EowcOverWindowExecutor`] handles
/// exactly one combination of partition key and order key.
///
/// The reason not to use [`SortBuffer`] is that the table schemas of [`EowcOverWindowExecutor`] and
/// [`SortBuffer`] are different, since we don't have something like a _grouped_ sort buffer.
///
/// [`SortBuffer`]: crate::executor::eowc::SortBuffer
///
/// Basic idea:
///
/// ```text
/// ──────────────┬────────────────────────────────────────────────────── curr evict row
///               │ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING
///        (1)    │ ─┬─
///               │  │RANGE BETWEEN '1hr' PRECEDING AND '1hr' FOLLOWING
///        ─┬─    │  │ ─┬─
///   LAG(1)│        │  │
/// ────────┴──┬─────┼──┼──────────────────────────────────────────────── curr output row
///     LEAD(1)│     │  │GROUPS 1 PRECEDING AND 1 FOLLOWING
///                  │
///                  │ (2)
/// ─────────────────┴─────────────────────────────────────────────────── curr input row
/// (1): additional buffered input (unneeded) for some window
/// (2): additional delay (already able to output) for some window
/// ```
///
/// - State table schema = input schema, state table pk = `partition key | order key | input pk`.
/// - Output schema = input schema + window function results.
/// - Rows in range (`curr evict row`, `curr input row`] are in state table.
/// - `curr evict row` <= min(last evict rows of all `WindowState`s).
/// - `WindowState` should output agg result for `curr output row`.
/// - Recover: iterate through state table, push rows to `WindowState`, ignore ready windows.
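///
/// For illustration only (the actual plan is decided by the optimizer), a query like
/// `SELECT *, lag(v) OVER (PARTITION BY p ORDER BY ts) FROM t EMIT ON WINDOW CLOSE`, where `ts`
/// is a watermarked column, would be handled by this executor with `p` as the partition key and
/// `ts` as the order key.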
pub struct EowcOverWindowExecutor<S: StateStore> {
    input: Executor,
    inner: ExecutorInner<S>,
}

struct ExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,

    schema: Schema,
    calls: Vec<WindowFuncCall>,
    input_pk_indices: Vec<usize>,
    partition_key_indices: Vec<usize>,
    order_key_index: usize, // no `OrderType` here, because the input is expected to be ascending
    state_table: StateTable<S>,
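    /// Number of columns in the state table, used to build rows with only the pk columns set when
    /// deleting evicted rows.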
    state_table_schema_len: usize,
    watermark_sequence: AtomicU64Ref,
}

struct ExecutionVars<S: StateStore> {
    partitions: PartitionCache,
    _phantom: PhantomData<S>,
}

impl<S: StateStore> Execute for EowcOverWindowExecutor<S> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.executor_inner().boxed()
    }
}

pub struct EowcOverWindowExecutorArgs<S: StateStore> {
    pub actor_ctx: ActorContextRef,

    pub input: Executor,

    pub schema: Schema,
    pub calls: Vec<WindowFuncCall>,
    pub partition_key_indices: Vec<usize>,
    pub order_key_index: usize,
    pub state_table: StateTable<S>,
    pub watermark_epoch: AtomicU64Ref,
}

impl<S: StateStore> EowcOverWindowExecutor<S> {
    pub fn new(args: EowcOverWindowExecutorArgs<S>) -> Self {
        let input_info = args.input.info().clone();

        Self {
            input: args.input,
            inner: ExecutorInner {
                actor_ctx: args.actor_ctx,
                schema: args.schema,
                calls: args.calls,
                input_pk_indices: input_info.pk_indices,
                partition_key_indices: args.partition_key_indices,
                order_key_index: args.order_key_index,
                state_table: args.state_table,
                state_table_schema_len: input_info.schema.len(),
                watermark_sequence: args.watermark_epoch,
            },
        }
    }

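    /// Ensure the partition identified by `encoded_partition_key` is present in the cache,
    /// recovering its window states and current-row buffer from the state table on a miss.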
    async fn ensure_key_in_cache(
        this: &ExecutorInner<S>,
        cache: &mut PartitionCache,
        partition_key: impl Row,
        encoded_partition_key: &MemcmpEncoded,
    ) -> StreamExecutorResult<()> {
        if cache.contains(encoded_partition_key) {
            return Ok(());
        }

        let mut partition = Partition {
            states: WindowStates::new(this.calls.iter().map(create_window_state).try_collect()?),
            curr_row_buffer: Default::default(),
        };

        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
        // Recover states from state table.
        let table_iter = this
            .state_table
            .iter_with_prefix(partition_key, sub_range, PrefetchOptions::default())
            .await?;

        #[for_await]
        for keyed_row in table_iter {
            let row = keyed_row?.into_owned_row();
            let order_key_enc = memcmp_encoding::encode_row(
                row::once(Some(
                    row.datum_at(this.order_key_index)
                        .expect("order key column must be non-NULL")
                        .into_scalar_impl(),
                )),
                &[OrderType::ascending()],
            )?;
            let pk = (&row).project(&this.input_pk_indices).into_owned_row();
            let key = StateKey {
                order_key: order_key_enc,
                pk: pk.into(),
            };
            for (call, state) in this.calls.iter().zip_eq_fast(partition.states.iter_mut()) {
                state.append(
                    key.clone(),
                    (&row)
                        .project(call.args.val_indices())
                        .into_owned_row()
                        .as_inner()
                        .into(),
                );
            }
            partition.curr_row_buffer.push_back(row);
        }

        // Ensure states correctness.
        assert!(partition.states.are_aligned());

        // Ignore ready windows (all ready windows were outputted before).
        while partition.states.are_ready() {
            partition.states.just_slide()?;
            partition.curr_row_buffer.pop_front();
        }

        cache.put(encoded_partition_key.clone(), partition);
        Ok(())
    }

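    /// Process one append-only input chunk: materialize each row into the state table, feed it to
    /// all window states of its partition, emit an output row for every window that becomes
    /// ready, and evict rows that are no longer needed from the state table.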
    async fn apply_chunk(
        this: &mut ExecutorInner<S>,
        vars: &mut ExecutionVars<S>,
        chunk: StreamChunk,
    ) -> StreamExecutorResult<Option<StreamChunk>> {
        let mut builders = this.schema.create_array_builders(chunk.capacity()); // just an estimate

        // We assume that the input is sorted by order key.
        for record in chunk.records() {
            let input_row = must_match!(record, Record::Insert { new_row } => new_row);

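            // Compute the partition key of this row; its memcmp-encoded form serves as the key of
            // the partition cache.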
            let partition_key = input_row
                .project(&this.partition_key_indices)
                .into_owned_row();
            let encoded_partition_key = memcmp_encoding::encode_row(
                &partition_key,
                &vec![OrderType::ascending(); this.partition_key_indices.len()],
            )?;

            // Get the partition.
            Self::ensure_key_in_cache(
                this,
                &mut vars.partitions,
                &partition_key,
                &encoded_partition_key,
            )
            .await?;
            let partition: &mut Partition =
                &mut vars.partitions.get_mut(&encoded_partition_key).unwrap();

            // Materialize input to state table.
            this.state_table.insert(input_row);

            // Feed the row to all window states.
            let order_key_enc = memcmp_encoding::encode_row(
                row::once(Some(
                    input_row
                        .datum_at(this.order_key_index)
                        .expect("order key column must be non-NULL")
                        .into_scalar_impl(),
                )),
                &[OrderType::ascending()],
            )?;
            let pk = input_row.project(&this.input_pk_indices).into_owned_row();
            let key = StateKey {
                order_key: order_key_enc,
                pk: pk.into(),
            };
            for (call, state) in this.calls.iter().zip_eq_fast(partition.states.iter_mut()) {
                state.append(
                    key.clone(),
                    input_row
                        .project(call.args.val_indices())
                        .into_owned_row()
                        .as_inner()
                        .into(),
                );
            }
            partition
                .curr_row_buffer
                .push_back(input_row.into_owned_row());

            while partition.states.are_ready() {
                // The partition is ready to output, so we can produce a row.

                // Get all outputs.
                let (ret_values, evict_hint) = partition.states.slide()?;
                let curr_row = partition
                    .curr_row_buffer
                    .pop_front()
                    .expect("ready window must have corresponding current row");

                // Append to output builders.
                for (builder, datum) in builders.iter_mut().zip_eq_debug(
                    curr_row
                        .iter()
                        .chain(ret_values.iter().map(|v| v.to_datum_ref())),
                ) {
                    builder.append(datum);
                }

                // Evict unneeded rows from state table.
                if let StateEvictHint::CanEvict(keys_to_evict) = evict_hint {
                    for key in keys_to_evict {
                        let order_key = memcmp_encoding::decode_row(
                            &key.order_key,
                            &[this.schema[this.order_key_index].data_type()],
                            &[OrderType::ascending()],
                        )?;
                        let state_row_pk = (&partition_key).chain(order_key).chain(key.pk);
                        let state_row = {
                            // FIXME(rc): quite hacky here, we may need `state_table.delete_by_pk`
                            let mut state_row = vec![None; this.state_table_schema_len];
                            for (i_in_pk, &i) in this.state_table.pk_indices().iter().enumerate() {
                                state_row[i] = state_row_pk.datum_at(i_in_pk).to_owned_datum();
                            }
                            OwnedRow::new(state_row)
                        };
                        // NOTE: We don't know the value of the row here, so the table must allow
                        // inconsistent ops.
                        this.state_table.delete(state_row);
                    }
                }
            }
        }

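        // Build the output chunk from the rows that became ready while processing this input
        // chunk; it may be empty, in which case no chunk is emitted.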
        let columns: Vec<ArrayRef> = builders.into_iter().map(|b| b.finish().into()).collect();
        let chunk_size = columns[0].len();
        Ok(if chunk_size > 0 {
            Some(StreamChunk::new(vec![Op::Insert; chunk_size], columns))
        } else {
            None
        })
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn executor_inner(self) {
        let EowcOverWindowExecutor {
            input,
            inner: mut this,
        } = self;

        let metrics_info = MetricsInfo::new(
            this.actor_ctx.streaming_metrics.clone(),
            this.state_table.table_id(),
            this.actor_ctx.id,
            "EowcOverWindow",
        );

        let mut vars = ExecutionVars {
            partitions: ManagedLruCache::unbounded(this.watermark_sequence.clone(), metrics_info),
            _phantom: PhantomData::<S>,
        };

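        // Wait for the first barrier and initialize the state table with its epoch before
        // consuming any data.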
        let mut input = input.execute();
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        this.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            match msg {
                Message::Watermark(_) => {
                    continue;
                }
                Message::Chunk(chunk) => {
                    let output_chunk = Self::apply_chunk(&mut this, &mut vars, chunk).await?;
                    if let Some(chunk) = output_chunk {
                        yield Message::Chunk(chunk);
                    }
                    this.state_table.try_flush().await?;
                }
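                // On barrier: commit the state table, let the LRU cache evict by watermark, and
                // clear the cache entirely if the vnode bitmap changed (cached partitions may no
                // longer be owned by this actor).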
                Message::Barrier(barrier) => {
                    let post_commit = this.state_table.commit(barrier.epoch).await?;
                    vars.partitions.evict();

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
                    yield Message::Barrier(barrier);

                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        if cache_may_stale {
                            vars.partitions.clear();
                        }
                    }
                }
            }
        }
    }
}