risingwave_stream/executor/eowc/
sort_buffer.rs1use std::collections::BTreeSet;
16use std::marker::PhantomData;
17use std::ops::Bound;
18
19use anyhow::Context;
20use bytes::Bytes;
21use futures::StreamExt;
22use futures_async_stream::{for_await, try_stream};
23use risingwave_common::array::StreamChunk;
24use risingwave_common::array::stream_record::Record;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_common::row::{self, OwnedRow, Row, RowExt};
27use risingwave_common::types::{
28 DefaultOrd, DefaultOrdered, ScalarImpl, ScalarRefImpl, ToOwnedDatum,
29};
30use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
31use risingwave_storage::StateStore;
32use risingwave_storage::store::PrefetchOptions;
33use risingwave_storage::table::KeyedRow;
34use risingwave_storage::table::merge_sort::merge_sort;
35
36use crate::common::state_cache::{StateCache, StateCacheFiller, TopNStateCache};
37use crate::common::table::state_table::StateTable;
38use crate::executor::{StreamExecutorError, StreamExecutorResult};
39
/// Key under which rows are stored in the in-memory cache of [`SortBuffer`].
///
/// Tuples compare lexicographically, so entries are ordered by the sort column value first and
/// by the serialized primary key second.
type CacheKey = (
    // Owned value of the sort column of the row.
    DefaultOrdered<ScalarImpl>,
    // Primary key of the row, serialized with the buffer table's pk serde.
    MemcmpEncoded,
);
44
45fn row_to_cache_key<S: StateStore>(
46 sort_column_index: usize,
47 row: impl Row,
48 buffer_table: &StateTable<S>,
49) -> CacheKey {
50 let timestamp_val = row
51 .datum_at(sort_column_index)
52 .to_owned_datum()
53 .expect("watermark column is expected to be non-null");
54 let mut pk = vec![];
55 buffer_table
56 .pk_serde()
57 .serialize((&row).project(buffer_table.pk_indices()), &mut pk);
58 (timestamp_val.into(), pk.into())
59}
60
/// Capacity handed to `TopNStateCache::new` when a [`SortBuffer`] creates its in-memory cache.
const CACHE_CAPACITY: usize = 2048;
63
/// Buffers rows in a caller-supplied [`StateTable`] and emits them ordered by [`CacheKey`] once
/// their sort column value falls below a watermark.
///
/// The buffer table is not owned by this struct; it is passed to every method that needs it.
/// An in-memory cache holds up to [`CACHE_CAPACITY`] entries and is refilled from the table when
/// it reports that it is no longer synced.
pub struct SortBuffer<S: StateStore> {
    /// Index of the column to sort on. `new` asserts that it is the first pk column of the
    /// buffer table.
    sort_column_index: usize,

    /// In-memory cache of buffered rows, keyed by sort column value and serialized pk.
    cache: TopNStateCache<CacheKey, OwnedRow>,

    /// Ties the struct to the state store type `S` without storing a value of it.
    _phantom: PhantomData<S>,
}
77
78impl<S: StateStore> SortBuffer<S> {
79 pub fn new(sort_column_index: usize, buffer_table: &StateTable<S>) -> Self {
81 assert_eq!(
82 sort_column_index,
83 buffer_table.pk_indices()[0],
84 "the column to sort on must be the first pk column of the buffer table"
85 );
86
87 Self {
88 sort_column_index,
89 cache: TopNStateCache::new(CACHE_CAPACITY),
90 _phantom: PhantomData,
91 }
92 }
93
94 pub fn insert(&mut self, new_row: impl Row, buffer_table: &mut StateTable<S>) {
96 buffer_table.insert(&new_row);
97 let key = row_to_cache_key(self.sort_column_index, &new_row, buffer_table);
98 self.cache.insert(key, new_row.into_owned_row());
99 }
100
101 pub fn delete(&mut self, old_row: impl Row, buffer_table: &mut StateTable<S>) {
103 buffer_table.delete(&old_row);
104 let key = row_to_cache_key(self.sort_column_index, &old_row, buffer_table);
105 self.cache.delete(&key);
106 }
107
108 pub fn update(
110 &mut self,
111 old_row: impl Row,
112 new_row: impl Row,
113 buffer_table: &mut StateTable<S>,
114 ) {
115 buffer_table.update(&old_row, &new_row);
116 let key = row_to_cache_key(self.sort_column_index, &old_row, buffer_table);
117 self.cache.delete(&key);
118 self.cache.insert(key, new_row.into_owned_row());
119 }
120
121 pub fn apply_change(&mut self, change: Record<impl Row>, buffer_table: &mut StateTable<S>) {
123 match change {
124 Record::Insert { new_row } => self.insert(new_row, buffer_table),
125 Record::Delete { old_row } => self.delete(old_row, buffer_table),
126 Record::Update { old_row, new_row } => self.update(old_row, new_row, buffer_table),
127 }
128 }
129
130 pub fn apply_chunk(&mut self, chunk: StreamChunk, buffer_table: &mut StateTable<S>) {
132 for record in chunk.records() {
133 self.apply_change(record, buffer_table);
134 }
135 }
136
137 #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
139 pub async fn consume<'a>(
140 &'a mut self,
141 watermark: ScalarImpl,
142 buffer_table: &'a mut StateTable<S>,
143 ) {
144 let mut last_table_pk = None;
145 loop {
146 if !self.cache.is_synced() {
147 self.refill_cache(last_table_pk.take(), buffer_table)
150 .await?;
151 }
152
153 #[for_await]
154 for res in self.consume_from_cache(watermark.as_scalar_ref_impl()) {
155 let row = res?;
156 last_table_pk = Some((&row).project(buffer_table.pk_indices()).into_owned_row());
157 yield row;
158 }
159
160 if self.cache.is_synced() {
161 break;
164 }
165 }
166
167 buffer_table.update_watermark(watermark);
170 }
171
172 #[try_stream(ok = OwnedRow, error = StreamExecutorError)]
173 async fn consume_from_cache<'a>(&'a mut self, watermark: ScalarRefImpl<'a>) {
174 while self.cache.is_synced() {
175 let Some(key) = self.cache.first_key_value().map(|(k, _)| k.clone()) else {
176 break;
177 };
178 if key.0.as_scalar_ref_impl().default_cmp(&watermark).is_lt() {
179 let row = self.cache.delete(&key).unwrap();
180 yield row;
181 } else {
182 break;
183 }
184 }
185 }
186
187 pub async fn refill_cache(
189 &mut self,
190 last_table_pk: Option<OwnedRow>,
191 buffer_table: &StateTable<S>,
192 ) -> StreamExecutorResult<()> {
193 let mut filler = self.cache.begin_syncing();
194
195 let pk_range = (
196 last_table_pk
197 .map(Bound::Excluded)
198 .unwrap_or(Bound::Unbounded),
199 Bound::<row::Empty>::Unbounded,
200 );
201
202 let streams: Vec<_> =
203 futures::future::try_join_all(buffer_table.vnodes().iter_vnodes().map(|vnode| {
204 buffer_table.iter_keyed_row_with_vnode(
205 vnode,
206 &pk_range,
207 PrefetchOptions::new(filler.capacity().is_none(), false),
208 )
209 }))
210 .await?
211 .into_iter()
212 .map(Box::pin)
213 .collect();
214
215 #[for_await]
216 for kv in merge_sort(streams).take(filler.capacity().unwrap_or(usize::MAX)) {
217 let row = key_value_to_full_row(kv?, buffer_table)?;
218 let key = row_to_cache_key(self.sort_column_index, &row, buffer_table);
219 filler.insert_unchecked(key, row);
220 }
221
222 filler.finish();
223 Ok(())
224 }
225}
226
227fn key_value_to_full_row<S: StateStore>(
230 keyed_row: KeyedRow<Bytes>,
231 table: &StateTable<S>,
232) -> StreamExecutorResult<OwnedRow> {
233 let Some(val_indices) = table.value_indices() else {
234 return Ok(keyed_row.into_owned_row());
235 };
236 let pk_indices = table.pk_indices();
237 let indices: BTreeSet<_> = val_indices
238 .iter()
239 .chain(pk_indices.iter())
240 .copied()
241 .collect();
242 let len = indices.iter().max().unwrap() + 1;
243 assert!(indices.iter().copied().eq(0..len));
244
245 let mut row = vec![None; len];
246 let key = table
247 .pk_serde()
248 .deserialize(keyed_row.key())
249 .context("failed to deserialize pk")?;
250 for (i, v) in key.into_iter().enumerate() {
251 row[pk_indices[i]] = v;
252 }
253 for (i, v) in keyed_row.into_owned_row().into_iter().enumerate() {
254 row[val_indices[i]] = v;
255 }
256 Ok(OwnedRow::new(row))
257}