risingwave_stream/executor/dedup/
append_only_dedup.rs1use futures::stream;
16use itertools::Itertools;
17use risingwave_common::array::Op;
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::row::RowExt;
20
21use super::cache::DedupCache;
22use crate::common::metrics::MetricsInfo;
23use crate::executor::prelude::*;
24
25pub struct AppendOnlyDedupExecutor<S: StateStore> {
28 ctx: ActorContextRef,
29
30 input: Option<Executor>,
31 dedup_cols: Vec<usize>,
32 state_table: StateTable<S>,
33 cache: DedupCache<OwnedRow>,
34}
35
36impl<S: StateStore> AppendOnlyDedupExecutor<S> {
37 pub fn new(
38 ctx: ActorContextRef,
39 input: Executor,
40 dedup_cols: Vec<usize>,
41 state_table: StateTable<S>,
42 watermark_epoch: AtomicU64Ref,
43 metrics: Arc<StreamingMetrics>,
44 ) -> Self {
45 let metrics_info =
46 MetricsInfo::new(metrics, state_table.table_id(), ctx.id, "AppendOnly Dedup");
47 Self {
48 ctx,
49 input: Some(input),
50 dedup_cols,
51 state_table,
52 cache: DedupCache::new(watermark_epoch, metrics_info),
53 }
54 }
55
56 #[try_stream(ok = Message, error = StreamExecutorError)]
57 async fn executor_inner(mut self) {
58 let mut input = self.input.take().unwrap().execute();
59
60 let barrier = expect_first_barrier(&mut input).await?;
62 let first_epoch = barrier.epoch;
63 yield Message::Barrier(barrier);
65 self.state_table.init_epoch(first_epoch).await?;
66
67 #[for_await]
68 for msg in input {
69 self.cache.evict();
70
71 match msg? {
72 Message::Chunk(chunk) => {
73 debug_assert!(chunk.ops().iter().all(|&op| op == Op::Insert));
75
76 let keys = chunk
78 .data_chunk()
79 .rows_with_holes()
80 .map(|row_ref| {
81 row_ref.map(|row| row.project(&self.dedup_cols).to_owned_row())
82 })
83 .collect_vec();
84
85 self.populate_cache(keys.iter().flatten()).await?;
88
89 let mut vis_builder = BitmapBuilder::with_capacity(chunk.capacity());
91 for key in keys {
92 match key {
93 Some(key) => {
94 if self.cache.dedup_insert(key) {
95 vis_builder.append(true);
97 } else {
98 vis_builder.append(false);
100 }
101 }
102 None => {
103 vis_builder.append(false);
105 }
106 }
107 }
108
109 let vis = vis_builder.finish();
110 if vis.count_ones() > 0 {
111 let (ops, columns, _) = chunk.into_inner();
113 let chunk = StreamChunk::with_visibility(ops, columns, vis);
114 self.state_table.write_chunk(chunk.clone());
115
116 yield Message::Chunk(chunk);
117 }
118 self.state_table.try_flush().await?;
119 }
120
121 Message::Barrier(barrier) => {
122 let post_commit = self.state_table.commit(barrier.epoch).await?;
123
124 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
125 yield Message::Barrier(barrier);
126
127 if let Some((_, cache_may_stale)) =
128 post_commit.post_yield_barrier(update_vnode_bitmap).await?
129 {
130 if cache_may_stale {
131 self.cache.clear();
132 }
133 }
134 }
135
136 Message::Watermark(watermark) => {
137 yield Message::Watermark(watermark);
138 }
139 }
140 }
141 }
142
143 pub async fn populate_cache<'a>(
145 &mut self,
146 keys: impl Iterator<Item = &'a OwnedRow>,
147 ) -> StreamExecutorResult<()> {
148 let mut read_from_storage = false;
149 let mut futures = vec![];
150 for key in keys {
151 if self.cache.contains(key) {
152 continue;
153 }
154 read_from_storage = true;
155
156 let table = &self.state_table;
157 futures.push(async move { (key, table.get_encoded_row(key).await) });
158 }
159
160 if read_from_storage {
161 let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
162 while let Some(result) = buffered.next().await {
163 let (key, value) = result;
164 if value?.is_some() {
165 self.cache.insert(key.to_owned());
167 }
168 }
169 }
170
171 Ok(())
172 }
173}
174
175impl<S: StateStore> Execute for AppendOnlyDedupExecutor<S> {
176 fn execute(self: Box<Self>) -> BoxedMessageStream {
177 self.executor_inner().boxed()
178 }
179}
180
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::MockSource;

    /// Feeds two chunks (separated by a barrier) through the executor and checks that rows
    /// whose first column repeats an earlier visible key come out invisible.
    #[tokio::test]
    async fn test_dedup_executor() {
        // Two Int64 columns; the first one is both the dedup key and the table pk.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ]);
        let column_descs = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
        ];
        let dedup_col_indices = vec![0];
        let pk_indices = dedup_col_indices.clone();
        let order_types = vec![OrderType::ascending()];
        let table_id = TableId::new(1);

        let (mut tx, source) = MockSource::channel();

        let table = StateTable::from_table_catalog(
            &gen_pbtable(table_id, column_descs, order_types, pk_indices.clone(), 0),
            MemoryStateStore::new(),
            None,
        )
        .await;

        let upstream = source.into_executor(schema, pk_indices);
        let mut dedup = AppendOnlyDedupExecutor::new(
            ActorContext::for_test(123),
            upstream,
            dedup_col_indices,
            table,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
        )
        .boxed()
        .execute();

        // Initial barrier.
        tx.push_barrier(test_epoch(1), false);
        dedup.next().await.unwrap().unwrap();

        // Key 1 appears twice among the visible rows: the second occurrence must be hidden.
        // The row with key 2 is already invisible on input and stays that way.
        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2 D
            + 1 7",
        ));
        let first_output = dedup.next().await.unwrap().unwrap();
        assert_eq!(
            first_output.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2 D
                + 1 7 D",
            )
        );

        tx.push_barrier(test_epoch(2), false);
        dedup.next().await.unwrap().unwrap();

        // Key 2 was never visible before, so it passes; key 1 was, so it is hidden.
        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 3 9
            + 2 5
            + 1 20",
        ));
        let second_output = dedup.next().await.unwrap().unwrap();
        assert_eq!(
            second_output.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I
                + 3 9
                + 2 5
                + 1 20 D",
            )
        );
    }
}