risingwave_stream/executor/dedup/
append_only_dedup.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::stream;
16use itertools::Itertools;
17use risingwave_common::array::Op;
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::row::RowExt;
20
21use super::cache::DedupCache;
22use crate::common::metrics::MetricsInfo;
23use crate::executor::prelude::*;
24
25/// [`AppendOnlyDedupExecutor`] drops any message that has duplicate pk columns with previous
26/// messages. It only accepts append-only input, and its output will be append-only as well.
27pub struct AppendOnlyDedupExecutor<S: StateStore> {
28    ctx: ActorContextRef,
29
30    input: Option<Executor>,
31    dedup_cols: Vec<usize>,
32    state_table: StateTable<S>,
33    cache: DedupCache<OwnedRow>,
34}
35
36impl<S: StateStore> AppendOnlyDedupExecutor<S> {
37    pub fn new(
38        ctx: ActorContextRef,
39        input: Executor,
40        dedup_cols: Vec<usize>,
41        state_table: StateTable<S>,
42        watermark_epoch: AtomicU64Ref,
43        metrics: Arc<StreamingMetrics>,
44    ) -> Self {
45        let metrics_info =
46            MetricsInfo::new(metrics, state_table.table_id(), ctx.id, "AppendOnly Dedup");
47        Self {
48            ctx,
49            input: Some(input),
50            dedup_cols,
51            state_table,
52            cache: DedupCache::new(watermark_epoch, metrics_info),
53        }
54    }
55
56    #[try_stream(ok = Message, error = StreamExecutorError)]
57    async fn executor_inner(mut self) {
58        let mut input = self.input.take().unwrap().execute();
59
60        // Consume the first barrier message and initialize state table.
61        let barrier = expect_first_barrier(&mut input).await?;
62        let first_epoch = barrier.epoch;
63        // The first barrier message should be propagated.
64        yield Message::Barrier(barrier);
65        self.state_table.init_epoch(first_epoch).await?;
66
67        #[for_await]
68        for msg in input {
69            self.cache.evict();
70
71            match msg? {
72                Message::Chunk(chunk) => {
73                    // Append-only dedup executor only receives INSERT messages.
74                    debug_assert!(chunk.ops().iter().all(|&op| op == Op::Insert));
75
76                    // Extract pk for all rows (regardless of visibility) in the chunk.
77                    let keys = chunk
78                        .data_chunk()
79                        .rows_with_holes()
80                        .map(|row_ref| {
81                            row_ref.map(|row| row.project(&self.dedup_cols).to_owned_row())
82                        })
83                        .collect_vec();
84
85                    // Ensure that if a key for a visible row exists before, then it is in the
86                    // cache, by querying the storage.
87                    self.populate_cache(keys.iter().flatten()).await?;
88
89                    // Now check for duplication and insert new keys into the cache.
90                    let mut vis_builder = BitmapBuilder::with_capacity(chunk.capacity());
91                    for key in keys {
92                        match key {
93                            Some(key) => {
94                                if self.cache.dedup_insert(key) {
95                                    // The key doesn't exist before. The row should be visible.
96                                    vis_builder.append(true);
97                                } else {
98                                    // The key exists before. The row shouldn't be visible.
99                                    vis_builder.append(false);
100                                }
101                            }
102                            None => {
103                                // The row is originally invisible.
104                                vis_builder.append(false);
105                            }
106                        }
107                    }
108
109                    let vis = vis_builder.finish();
110                    if vis.count_ones() > 0 {
111                        // Construct the new chunk and write the data to state table.
112                        let (ops, columns, _) = chunk.into_inner();
113                        let chunk = StreamChunk::with_visibility(ops, columns, vis);
114                        self.state_table.write_chunk(chunk.clone());
115
116                        yield Message::Chunk(chunk);
117                    }
118                    self.state_table.try_flush().await?;
119                }
120
121                Message::Barrier(barrier) => {
122                    let post_commit = self.state_table.commit(barrier.epoch).await?;
123
124                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
125                    yield Message::Barrier(barrier);
126
127                    if let Some((_, cache_may_stale)) =
128                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
129                    {
130                        if cache_may_stale {
131                            self.cache.clear();
132                        }
133                    }
134                }
135
136                Message::Watermark(watermark) => {
137                    yield Message::Watermark(watermark);
138                }
139            }
140        }
141    }
142
143    /// Populate the cache with keys that exist in storage before.
144    pub async fn populate_cache<'a>(
145        &mut self,
146        keys: impl Iterator<Item = &'a OwnedRow>,
147    ) -> StreamExecutorResult<()> {
148        let mut read_from_storage = false;
149        let mut futures = vec![];
150        for key in keys {
151            if self.cache.contains(key) {
152                continue;
153            }
154            read_from_storage = true;
155
156            let table = &self.state_table;
157            futures.push(async move { (key, table.get_encoded_row(key).await) });
158        }
159
160        if read_from_storage {
161            let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
162            while let Some(result) = buffered.next().await {
163                let (key, value) = result;
164                if value?.is_some() {
165                    // Only insert into the cache when we have this key in storage.
166                    self.cache.insert(key.to_owned());
167                }
168            }
169        }
170
171        Ok(())
172    }
173}
174
175impl<S: StateStore> Execute for AppendOnlyDedupExecutor<S> {
176    fn execute(self: Box<Self>) -> BoxedMessageStream {
177        self.executor_inner().boxed()
178    }
179}
180
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::MockSource;

    /// Rows whose dedup key was already seen (earlier in the same chunk or in a previous epoch)
    /// must come out invisible, and rows that were invisible on input must stay invisible.
    #[tokio::test]
    async fn test_dedup_executor() {
        // Column 0 is both the dedup key and the state table's primary key.
        let dedup_cols = vec![0];
        let pk = dedup_cols.clone();

        let table_catalog = gen_pbtable(
            TableId::new(1),
            vec![
                ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
                ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
            ],
            vec![OrderType::ascending()],
            pk.clone(),
            0,
        );
        let state_table =
            StateTable::from_table_catalog(&table_catalog, MemoryStateStore::new(), None).await;

        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ]);
        let (mut tx, mock_source) = MockSource::channel();
        let upstream = mock_source.into_executor(schema, pk);
        let mut dedup = AppendOnlyDedupExecutor::new(
            ActorContext::for_test(123),
            upstream,
            dedup_cols,
            state_table,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
        )
        .boxed()
        .execute();

        // Epoch 1: the initial barrier passes straight through.
        tx.push_barrier(test_epoch(1), false);
        dedup.next().await.unwrap().unwrap();

        // Key 1 repeats inside the chunk, so its second row is hidden; the row for key 2 was
        // already invisible and stays so.
        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2 D
            + 1 7",
        ));
        let first_output = dedup.next().await.unwrap().unwrap();
        assert_eq!(
            first_output.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2 D
                + 1 7 D",
            )
        );

        // Epoch 2: key 1 was persisted before, keys 3 and 2 have never been emitted.
        tx.push_barrier(test_epoch(2), false);
        dedup.next().await.unwrap().unwrap();

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 3 9
            + 2 5
            + 1 20",
        ));
        let second_output = dedup.next().await.unwrap().unwrap();
        assert_eq!(
            second_output.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I
                + 3 9
                + 2 5
                + 1 20 D",
            )
        );
    }
}