risingwave_stream/executor/eowc/sort_buffer.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;
use std::marker::PhantomData;
use std::ops::Bound;

use anyhow::Context;
use bytes::Bytes;
use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::stream_record::Record;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::{
    DefaultOrd, DefaultOrdered, ScalarImpl, ScalarRefImpl, ToOwnedDatum,
};
use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
use risingwave_storage::StateStore;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::KeyedRow;
use risingwave_storage::table::merge_sort::merge_sort;

use crate::common::state_cache::{StateCache, StateCacheFiller, TopNStateCache};
use crate::common::table::state_table::StateTable;
use crate::executor::{StreamExecutorError, StreamExecutorResult};

type CacheKey = (
    DefaultOrdered<ScalarImpl>, // sort (watermark) column value
    MemcmpEncoded,              // memcmp-encoded pk
);
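// Tuple comparison orders `CacheKey` first by the sort (watermark) column value and
// then by the memcmp-encoded pk. Since the sort column is required to be the first
// pk column (asserted in `SortBuffer::new` below), this matches the buffer table's
// pk order, which is what `TopNStateCache` relies on.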

fn row_to_cache_key<S: StateStore>(
    sort_column_index: usize,
    row: impl Row,
    buffer_table: &StateTable<S>,
) -> CacheKey {
    let timestamp_val = row
        .datum_at(sort_column_index)
        .to_owned_datum()
        .expect("watermark column is expected to be non-null");
    let mut pk = vec![];
    buffer_table
        .pk_serde()
        .serialize((&row).project(buffer_table.pk_indices()), &mut pk);
    (timestamp_val.into(), pk.into())
}

// TODO(rc): need to make this configurable?
const CACHE_CAPACITY: usize = 2048;

/// [`SortBuffer`] is a common component that consumes an unordered stream and produces a stream
/// ordered by watermark. It maintains the buffer table passed in, whose schema is the same as
/// [`SortBuffer`]'s input and output. Essentially, the component acts as a buffer that emits the
/// data it receives with a delay, and is commonly used to implement the emit-on-window-close
/// policy.
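///
/// A minimal usage sketch (hypothetical executor context; the chunk and watermark values
/// here are assumptions for illustration, not part of this module):
///
/// ```ignore
/// let mut buffer = SortBuffer::new(0, &buffer_table);
///
/// // Buffer incoming changes; nothing is emitted yet.
/// buffer.apply_chunk(chunk, &mut buffer_table);
///
/// // On watermark, emit all buffered rows whose sort column value is below it, in order.
/// #[for_await]
/// for row in buffer.consume(watermark_val, &mut buffer_table) {
///     let row: OwnedRow = row?;
///     // ... build output chunks from the ordered rows ...
/// }
/// ```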
pub struct SortBuffer<S: StateStore> {
    /// The timestamp column to sort on.
    sort_column_index: usize,

    /// Cache of buffer table.
    cache: TopNStateCache<CacheKey, OwnedRow>,

    _phantom: PhantomData<S>,
}

impl<S: StateStore> SortBuffer<S> {
    /// Create a new [`SortBuffer`].
    pub fn new(sort_column_index: usize, buffer_table: &StateTable<S>) -> Self {
        assert_eq!(
            sort_column_index,
            buffer_table.pk_indices()[0],
            "the column to sort on must be the first pk column of the buffer table"
        );

        Self {
            sort_column_index,
            cache: TopNStateCache::new(CACHE_CAPACITY),
            _phantom: PhantomData,
        }
    }

    /// Insert a new row into the buffer.
    pub fn insert(&mut self, new_row: impl Row, buffer_table: &mut StateTable<S>) {
        buffer_table.insert(&new_row);
        let key = row_to_cache_key(self.sort_column_index, &new_row, buffer_table);
        self.cache.insert(key, new_row.into_owned_row());
    }

    /// Delete a row from the buffer.
    pub fn delete(&mut self, old_row: impl Row, buffer_table: &mut StateTable<S>) {
        buffer_table.delete(&old_row);
        let key = row_to_cache_key(self.sort_column_index, &old_row, buffer_table);
        self.cache.delete(&key);
    }

    /// Update a row in the buffer.
    pub fn update(
        &mut self,
        old_row: impl Row,
        new_row: impl Row,
        buffer_table: &mut StateTable<S>,
    ) {
        buffer_table.update(&old_row, &new_row);
        let key = row_to_cache_key(self.sort_column_index, &old_row, buffer_table);
        self.cache.delete(&key);
        self.cache.insert(key, new_row.into_owned_row());
    }

    /// Apply a change to the buffer, insert/delete/update.
    pub fn apply_change(&mut self, change: Record<impl Row>, buffer_table: &mut StateTable<S>) {
        match change {
            Record::Insert { new_row } => self.insert(new_row, buffer_table),
            Record::Delete { old_row } => self.delete(old_row, buffer_table),
            Record::Update { old_row, new_row } => self.update(old_row, new_row, buffer_table),
        }
    }

    /// Apply a stream chunk to the buffer.
    pub fn apply_chunk(&mut self, chunk: StreamChunk, buffer_table: &mut StateTable<S>) {
        for record in chunk.records() {
            self.apply_change(record, buffer_table);
        }
    }

    /// Consume rows under `watermark` from the buffer.
    #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
    pub async fn consume<'a>(
        &'a mut self,
        watermark: ScalarImpl,
        buffer_table: &'a mut StateTable<S>,
    ) {
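        // Track the pk of the last yielded row, so that a cache refill can resume the
        // table scan right after it instead of re-reading rows already emitted.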
        let mut last_table_pk = None;
        loop {
            if !self.cache.is_synced() {
                // Refill the cache, then consume from the cache, to ensure strong row ordering
                // and prefetch for the next watermark.
                self.refill_cache(last_table_pk.take(), buffer_table)
                    .await?;
            }

            #[for_await]
            for res in self.consume_from_cache(watermark.as_scalar_ref_impl()) {
                let row = res?;
                last_table_pk = Some((&row).project(buffer_table.pk_indices()).into_owned_row());
                yield row;
            }

            if self.cache.is_synced() {
                // The cache is still synced after consuming, meaning that there are no more
                // rows under the watermark to yield.
                break;
            }
        }

        // TODO(rc): Need something like `table.range_delete()`. Here we call
        // `update_watermark(watermark)` as an alternative to `range_delete((..watermark))`.
        buffer_table.update_watermark(watermark);
    }

    /// Pop rows under `watermark` from the cache in ascending order. Stops as soon as the
    /// cache becomes unsynced or the next row is not under the watermark.
    #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
    async fn consume_from_cache<'a>(&'a mut self, watermark: ScalarRefImpl<'a>) {
        while self.cache.is_synced() {
            let Some(key) = self.cache.first_key_value().map(|(k, _)| k.clone()) else {
                break;
            };
            if key.0.as_scalar_ref_impl().default_cmp(&watermark).is_lt() {
                let row = self.cache.delete(&key).unwrap();
                yield row;
            } else {
                break;
            }
        }
    }

    /// Clear the cache and refill it with the current content of the buffer table.
    pub async fn refill_cache(
        &mut self,
        last_table_pk: Option<OwnedRow>,
        buffer_table: &StateTable<S>,
    ) -> StreamExecutorResult<()> {
        let mut filler = self.cache.begin_syncing();

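        // Scan from just past the last yielded row (if any) to the end of the table, so
        // that refilling resumes where consumption stopped.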
        let pk_range = (
            last_table_pk
                .map(Bound::Excluded)
                .unwrap_or(Bound::Unbounded),
            Bound::<row::Empty>::Unbounded,
        );

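        // Open one ordered iterator per owned vnode; `merge_sort` below merges them back
        // into a single stream in global pk order. Prefetching is (presumably) only
        // worthwhile when the cache is unbounded, since a bounded cache reads at most
        // `capacity` rows (see the `take` below).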
        let streams: Vec<_> =
            futures::future::try_join_all(buffer_table.vnodes().iter_vnodes().map(|vnode| {
                buffer_table.iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::new(filler.capacity().is_none(), false),
                )
            }))
            .await?
            .into_iter()
            .map(Box::pin)
            .collect();

        #[for_await]
        for kv in merge_sort(streams).take(filler.capacity().unwrap_or(usize::MAX)) {
            let row = key_value_to_full_row(kv?, buffer_table)?;
            let key = row_to_cache_key(self.sort_column_index, &row, buffer_table);
            filler.insert_unchecked(key, row);
        }

        filler.finish();
        Ok(())
    }
}

/// Merge the key part and value part of a row into a full row. This is needed for state tables
/// with non-`None` value indices, where the stored value covers only a subset of the columns.
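///
/// An illustrative example (hypothetical layout): with `pk_indices = [2]` and
/// `value_indices = Some([0, 1])`, the index union is `{0, 1, 2}`, so the full row has
/// length 3; column 2 is filled from the deserialized key, columns 0 and 1 from the value.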
fn key_value_to_full_row<S: StateStore>(
    keyed_row: KeyedRow<Bytes>,
    table: &StateTable<S>,
) -> StreamExecutorResult<OwnedRow> {
    let Some(val_indices) = table.value_indices() else {
        return Ok(keyed_row.into_owned_row());
    };
    let pk_indices = table.pk_indices();
    let indices: BTreeSet<_> = val_indices
        .iter()
        .chain(pk_indices.iter())
        .copied()
        .collect();
    let len = indices.iter().max().unwrap() + 1;
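    // The pk and value columns together must cover the contiguous range `0..len`,
    // otherwise the merged row would contain unfilled holes.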
    assert!(indices.iter().copied().eq(0..len));

    let mut row = vec![None; len];
    let key = table
        .pk_serde()
        .deserialize(keyed_row.key())
        .context("failed to deserialize pk")?;
    for (i, v) in key.into_iter().enumerate() {
        row[pk_indices[i]] = v;
    }
    for (i, v) in keyed_row.into_owned_row().into_iter().enumerate() {
        row[val_indices[i]] = v;
    }
    Ok(OwnedRow::new(row))
}