risingwave_stream/executor/dedup/
append_only_dedup.rs1use futures::stream;
16use itertools::Itertools;
17use risingwave_common::array::Op;
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::row::RowExt;
20
21use crate::cache::ManagedLruCache;
22use crate::common::metrics::MetricsInfo;
23use crate::executor::prelude::*;
24
25type Cache = ManagedLruCache<OwnedRow, ()>;
26
27pub struct AppendOnlyDedupExecutor<S: StateStore> {
30 ctx: ActorContextRef,
31
32 input: Option<Executor>,
33 dedup_cols: Vec<usize>,
34 state_table: StateTable<S>,
35 cache: Cache,
36}
37
38impl<S: StateStore> AppendOnlyDedupExecutor<S> {
39 pub fn new(
40 ctx: ActorContextRef,
41 input: Executor,
42 dedup_cols: Vec<usize>,
43 state_table: StateTable<S>,
44 watermark_epoch: AtomicU64Ref,
45 metrics: Arc<StreamingMetrics>,
46 ) -> Self {
47 let metrics_info =
48 MetricsInfo::new(metrics, state_table.table_id(), ctx.id, "AppendOnly Dedup");
49 Self {
50 ctx,
51 input: Some(input),
52 dedup_cols,
53 state_table,
54 cache: Cache::unbounded(watermark_epoch, metrics_info),
55 }
56 }
57
58 #[try_stream(ok = Message, error = StreamExecutorError)]
59 async fn executor_inner(mut self) {
60 let mut input = self.input.take().unwrap().execute();
61
62 let barrier = expect_first_barrier(&mut input).await?;
64 let first_epoch = barrier.epoch;
65 yield Message::Barrier(barrier);
67 self.state_table.init_epoch(first_epoch).await?;
68
69 #[for_await]
70 for msg in input {
71 self.cache.evict();
72
73 match msg? {
74 Message::Chunk(chunk) => {
75 debug_assert!(chunk.ops().iter().all(|&op| op == Op::Insert));
77
78 let dedup_keys = chunk
80 .data_chunk()
81 .rows_with_holes()
82 .map(|row_ref| {
83 row_ref.map(|row| row.project(&self.dedup_cols).to_owned_row())
84 })
85 .collect_vec();
86
87 self.populate_cache(dedup_keys.iter().flatten()).await?;
90
91 let mut vis_builder = BitmapBuilder::with_capacity(chunk.capacity());
93 for key in dedup_keys {
94 match key {
95 Some(key) => {
96 if self.cache.put(key, ()).is_none() {
97 vis_builder.append(true);
102 } else {
103 vis_builder.append(false);
105 }
106 }
107 None => {
108 vis_builder.append(false);
110 }
111 }
112 }
113
114 let vis = vis_builder.finish();
115 if vis.count_ones() > 0 {
116 let (ops, columns, _) = chunk.into_inner();
118 let chunk = StreamChunk::with_visibility(ops, columns, vis);
119 self.state_table.write_chunk(chunk.clone());
120 self.state_table.try_flush().await?;
121
122 yield Message::Chunk(chunk);
123 }
124 }
125
126 Message::Barrier(barrier) => {
127 let post_commit = self.state_table.commit(barrier.epoch).await?;
128
129 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
130 yield Message::Barrier(barrier);
131
132 if let Some((_, cache_may_stale)) =
133 post_commit.post_yield_barrier(update_vnode_bitmap).await?
134 && cache_may_stale
135 {
136 self.cache.clear();
137 }
138 }
139
140 Message::Watermark(watermark) => {
141 yield Message::Watermark(watermark);
142 }
143 }
144 }
145 }
146
147 pub async fn populate_cache<'a>(
149 &mut self,
150 keys: impl Iterator<Item = &'a OwnedRow>,
151 ) -> StreamExecutorResult<()> {
152 let mut futures = vec![];
153 for key in keys {
154 if self.cache.contains(key) {
155 continue;
156 }
157
158 let table = &self.state_table;
159 futures.push(async move { (key, table.get_encoded_row(key).await) });
160 }
161
162 if !futures.is_empty() {
163 let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
164 while let Some(result) = buffered.next().await {
165 let (key, value) = result;
166 if value?.is_some() {
167 self.cache.put(key.to_owned(), ());
169 }
170 }
171 }
172
173 Ok(())
174 }
175}
176
177impl<S: StateStore> Execute for AppendOnlyDedupExecutor<S> {
178 fn execute(self: Box<Self>) -> BoxedMessageStream {
179 self.executor_inner().boxed()
180 }
181}
182
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::MockSource;

    /// Rows whose dedup key was already emitted (earlier in the same chunk or in a
    /// previous epoch) must be hidden; invisible input rows stay invisible.
    #[tokio::test]
    async fn test_dedup_executor() {
        // Two BIGINT columns, deduplicated (and keyed in the state table) on column 0.
        let dedup_cols = vec![0];
        let pk = dedup_cols.clone();
        let columns = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
        ];
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ]);

        let table_catalog = gen_pbtable(
            TableId::new(1),
            columns,
            vec![OrderType::ascending()],
            pk.clone(),
            0,
        );
        let state_table =
            StateTable::from_table_catalog(&table_catalog, MemoryStateStore::new(), None).await;

        let (mut source_tx, source) = MockSource::channel();
        let mut executor = AppendOnlyDedupExecutor::new(
            ActorContext::for_test(123),
            source.into_executor(schema, pk),
            dedup_cols,
            state_table,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
        )
        .boxed()
        .execute();

        // Epoch 1: key 1 appears twice, only its first occurrence survives.
        source_tx.push_barrier(test_epoch(1), false);
        executor.next().await.unwrap().unwrap();

        source_tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2 D
            + 1 7",
        ));
        let output = executor.next().await.unwrap().unwrap().into_chunk().unwrap();
        assert_eq!(
            output,
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2 D
                + 1 7 D",
            )
        );

        // Epoch 2: keys 3 and 2 are new (2 was invisible before), key 1 is a duplicate.
        source_tx.push_barrier(test_epoch(2), false);
        executor.next().await.unwrap().unwrap();

        source_tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 3 9
            + 2 5
            + 1 20",
        ));
        let output = executor.next().await.unwrap().unwrap().into_chunk().unwrap();
        assert_eq!(
            output,
            StreamChunk::from_pretty(
                " I I
                + 3 9
                + 2 5
                + 1 20 D",
            )
        );
    }
}