risingwave_stream/executor/dedup/
append_only_dedup.rs1use futures::stream;
16use itertools::Itertools;
17use risingwave_common::array::Op;
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::row::RowExt;
20
21use crate::cache::ManagedLruCache;
22use crate::common::metrics::MetricsInfo;
23use crate::executor::prelude::*;
24
25type Cache = ManagedLruCache<OwnedRow, ()>;
26
27pub struct AppendOnlyDedupExecutor<S: StateStore> {
30 ctx: ActorContextRef,
31
32 input: Option<Executor>,
33 dedup_cols: Vec<usize>,
34 state_table: StateTable<S>,
35 cache: Cache,
36}
37
38impl<S: StateStore> AppendOnlyDedupExecutor<S> {
39 pub fn new(
40 ctx: ActorContextRef,
41 input: Executor,
42 dedup_cols: Vec<usize>,
43 state_table: StateTable<S>,
44 watermark_epoch: AtomicU64Ref,
45 metrics: Arc<StreamingMetrics>,
46 ) -> Self {
47 let metrics_info =
48 MetricsInfo::new(metrics, state_table.table_id(), ctx.id, "AppendOnly Dedup");
49 Self {
50 ctx,
51 input: Some(input),
52 dedup_cols,
53 state_table,
54 cache: Cache::unbounded(watermark_epoch, metrics_info),
55 }
56 }
57
58 #[try_stream(ok = Message, error = StreamExecutorError)]
59 async fn executor_inner(mut self) {
60 let mut input = self.input.take().unwrap().execute();
61
62 let barrier = expect_first_barrier(&mut input).await?;
64 let first_epoch = barrier.epoch;
65 yield Message::Barrier(barrier);
67 self.state_table.init_epoch(first_epoch).await?;
68
69 #[for_await]
70 for msg in input {
71 self.cache.evict();
72
73 match msg? {
74 Message::Chunk(chunk) => {
75 debug_assert!(chunk.ops().iter().all(|&op| op == Op::Insert));
77
78 let dedup_keys = chunk
80 .data_chunk()
81 .rows_with_holes()
82 .map(|row_ref| {
83 row_ref.map(|row| row.project(&self.dedup_cols).to_owned_row())
84 })
85 .collect_vec();
86
87 self.populate_cache(dedup_keys.iter().flatten()).await?;
90
91 let mut vis_builder = BitmapBuilder::with_capacity(chunk.capacity());
93 for key in dedup_keys {
94 match key {
95 Some(key) => {
96 if self.cache.put(key, ()).is_none() {
97 vis_builder.append(true);
102 } else {
103 vis_builder.append(false);
105 }
106 }
107 None => {
108 vis_builder.append(false);
110 }
111 }
112 }
113
114 let vis = vis_builder.finish();
115 if vis.count_ones() > 0 {
116 let (ops, columns, _) = chunk.into_inner();
118 let chunk = StreamChunk::with_visibility(ops, columns, vis);
119 self.state_table.write_chunk(chunk.clone());
120 self.state_table.try_flush().await?;
121
122 yield Message::Chunk(chunk);
123 }
124 }
125
126 Message::Barrier(barrier) => {
127 let post_commit = self.state_table.commit(barrier.epoch).await?;
128
129 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
130 yield Message::Barrier(barrier);
131
132 if let Some((_, cache_may_stale)) =
133 post_commit.post_yield_barrier(update_vnode_bitmap).await?
134 {
135 if cache_may_stale {
136 self.cache.clear();
137 }
138 }
139 }
140
141 Message::Watermark(watermark) => {
142 yield Message::Watermark(watermark);
143 }
144 }
145 }
146 }
147
148 pub async fn populate_cache<'a>(
150 &mut self,
151 keys: impl Iterator<Item = &'a OwnedRow>,
152 ) -> StreamExecutorResult<()> {
153 let mut futures = vec![];
154 for key in keys {
155 if self.cache.contains(key) {
156 continue;
157 }
158
159 let table = &self.state_table;
160 futures.push(async move { (key, table.get_encoded_row(key).await) });
161 }
162
163 if !futures.is_empty() {
164 let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
165 while let Some(result) = buffered.next().await {
166 let (key, value) = result;
167 if value?.is_some() {
168 self.cache.put(key.to_owned(), ());
170 }
171 }
172 }
173
174 Ok(())
175 }
176}
177
178impl<S: StateStore> Execute for AppendOnlyDedupExecutor<S> {
179 fn execute(self: Box<Self>) -> BoxedMessageStream {
180 self.executor_inner().boxed()
181 }
182}
183
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::MockSource;

    /// Feeds two chunks separated by a barrier and checks that a key is only
    /// let through the first time it is seen as a visible row.
    #[tokio::test]
    async fn test_dedup_executor() {
        // Two Int64 columns; column 0 is both the dedup column and the state table pk.
        let dedup_col_indices = vec![0];
        let pk_indices = dedup_col_indices.clone();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ]);

        let table_catalog = gen_pbtable(
            TableId::new(1),
            vec![
                ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
            ],
            vec![OrderType::ascending()],
            pk_indices.clone(),
            0,
        );
        let state_table =
            StateTable::from_table_catalog(&table_catalog, MemoryStateStore::new(), None).await;

        let (mut tx, source) = MockSource::channel();
        let upstream = source.into_executor(schema, pk_indices);
        let mut output = AppendOnlyDedupExecutor::new(
            ActorContext::for_test(123),
            upstream,
            dedup_col_indices,
            state_table,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
        )
        .boxed()
        .execute();

        // The initial barrier is forwarded as the first output message.
        tx.push_barrier(test_epoch(1), false);
        output.next().await.unwrap().unwrap();

        // Key 1 appears twice: only its first row is expected to stay visible.
        // The row for key 2 carries the `D` marker on input and keeps it on output
        // (NOTE(review): `D` presumably marks an invisible row; confirm against
        // `StreamChunkTestExt::from_pretty`).
        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2 D
            + 1 7",
        ));
        let first_output = output.next().await.unwrap().unwrap();
        assert_eq!(
            first_output.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2 D
                + 1 7 D",
            )
        );

        tx.push_barrier(test_epoch(2), false);
        output.next().await.unwrap().unwrap();

        // After the barrier, key 1 is still treated as a duplicate, while keys 3
        // and 2 are expected to pass (key 2 only carried the `D` marker before).
        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 3 9
            + 2 5
            + 1 20",
        ));
        let second_output = output.next().await.unwrap().unwrap();
        assert_eq!(
            second_output.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I
                + 3 9
                + 2 5
                + 1 20 D",
            )
        );
    }
}