risingwave_stream/executor/dedup/
append_only_dedup.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::stream;
16use itertools::Itertools;
17use risingwave_common::array::Op;
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::row::RowExt;
20
21use crate::cache::ManagedLruCache;
22use crate::common::metrics::MetricsInfo;
23use crate::executor::prelude::*;
24
/// Cache of dedup keys. The value type is `()` because only the presence of a key matters.
type Cache = ManagedLruCache<OwnedRow, ()>;
26
/// [`AppendOnlyDedupExecutor`] drops any message that has duplicate pk columns with previous
/// messages. It only accepts append-only input, and its output will be append-only as well.
pub struct AppendOnlyDedupExecutor<S: StateStore> {
    /// Actor context. Its `id` labels the cache metrics and is used to look up the vnode bitmap
    /// update carried by a barrier.
    ctx: ActorContextRef,

    /// Upstream executor. Kept in an `Option` so that it can be taken out when execution starts.
    input: Option<Executor>,
    /// Indices of the input columns that form the dedup key.
    dedup_cols: Vec<usize>,
    /// State table that the rows surviving deduplication are written to, and that is queried
    /// for keys missing from `cache`.
    state_table: StateTable<S>,
    /// Dedup keys already known to this executor: either found in `state_table` or emitted
    /// earlier.
    cache: Cache,
}
37
38impl<S: StateStore> AppendOnlyDedupExecutor<S> {
39    pub fn new(
40        ctx: ActorContextRef,
41        input: Executor,
42        dedup_cols: Vec<usize>,
43        state_table: StateTable<S>,
44        watermark_epoch: AtomicU64Ref,
45        metrics: Arc<StreamingMetrics>,
46    ) -> Self {
47        let metrics_info =
48            MetricsInfo::new(metrics, state_table.table_id(), ctx.id, "AppendOnly Dedup");
49        Self {
50            ctx,
51            input: Some(input),
52            dedup_cols,
53            state_table,
54            cache: Cache::unbounded(watermark_epoch, metrics_info),
55        }
56    }
57
58    #[try_stream(ok = Message, error = StreamExecutorError)]
59    async fn executor_inner(mut self) {
60        let mut input = self.input.take().unwrap().execute();
61
62        // Consume the first barrier message and initialize state table.
63        let barrier = expect_first_barrier(&mut input).await?;
64        let first_epoch = barrier.epoch;
65        // The first barrier message should be propagated.
66        yield Message::Barrier(barrier);
67        self.state_table.init_epoch(first_epoch).await?;
68
69        #[for_await]
70        for msg in input {
71            self.cache.evict();
72
73            match msg? {
74                Message::Chunk(chunk) => {
75                    // Append-only dedup executor only receives INSERT messages.
76                    debug_assert!(chunk.ops().iter().all(|&op| op == Op::Insert));
77
78                    // Extract dedup keys for all rows (regardless of visibility) in the chunk.
79                    let dedup_keys = chunk
80                        .data_chunk()
81                        .rows_with_holes()
82                        .map(|row_ref| {
83                            row_ref.map(|row| row.project(&self.dedup_cols).to_owned_row())
84                        })
85                        .collect_vec();
86
87                    // Ensure that if a key for a visible row exists before, then it is in the
88                    // cache, by querying the storage.
89                    self.populate_cache(dedup_keys.iter().flatten()).await?;
90
91                    // Now check for duplication and insert new keys into the cache.
92                    let mut vis_builder = BitmapBuilder::with_capacity(chunk.capacity());
93                    for key in dedup_keys {
94                        match key {
95                            Some(key) => {
96                                if self.cache.put(key, ()).is_none() {
97                                    // The key doesn't exist before. The row should be visible.
98                                    // Note that we can do deduplication in such a simple way because
99                                    // this executor only accepts append-only input. Otherwise, we can
100                                    // only do this if dedup columns contain all the input columns.
101                                    vis_builder.append(true);
102                                } else {
103                                    // The key exists before. The row shouldn't be visible.
104                                    vis_builder.append(false);
105                                }
106                            }
107                            None => {
108                                // The row is originally invisible.
109                                vis_builder.append(false);
110                            }
111                        }
112                    }
113
114                    let vis = vis_builder.finish();
115                    if vis.count_ones() > 0 {
116                        // Construct the new chunk and write the data to state table.
117                        let (ops, columns, _) = chunk.into_inner();
118                        let chunk = StreamChunk::with_visibility(ops, columns, vis);
119                        self.state_table.write_chunk(chunk.clone());
120                        self.state_table.try_flush().await?;
121
122                        yield Message::Chunk(chunk);
123                    }
124                }
125
126                Message::Barrier(barrier) => {
127                    let post_commit = self.state_table.commit(barrier.epoch).await?;
128
129                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
130                    yield Message::Barrier(barrier);
131
132                    if let Some((_, cache_may_stale)) =
133                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
134                        && cache_may_stale
135                    {
136                        self.cache.clear();
137                    }
138                }
139
140                Message::Watermark(watermark) => {
141                    yield Message::Watermark(watermark);
142                }
143            }
144        }
145    }
146
147    /// Populate the cache with keys that exist in storage before.
148    pub async fn populate_cache<'a>(
149        &mut self,
150        keys: impl Iterator<Item = &'a OwnedRow>,
151    ) -> StreamExecutorResult<()> {
152        let mut futures = vec![];
153        for key in keys {
154            if self.cache.contains(key) {
155                continue;
156            }
157
158            let table = &self.state_table;
159            futures.push(async move { (key, table.get_encoded_row(key).await) });
160        }
161
162        if !futures.is_empty() {
163            let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
164            while let Some(result) = buffered.next().await {
165                let (key, value) = result;
166                if value?.is_some() {
167                    // Only insert into the cache when we have this key in storage.
168                    self.cache.put(key.to_owned(), ());
169                }
170            }
171        }
172
173        Ok(())
174    }
175}
176
177impl<S: StateStore> Execute for AppendOnlyDedupExecutor<S> {
178    fn execute(self: Box<Self>) -> BoxedMessageStream {
179        self.executor_inner().boxed()
180    }
181}
182
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::MockSource;

    // Feeds two chunks separated by a barrier through the executor and checks that rows whose
    // first column repeats an earlier visible row come out invisible.
    #[tokio::test]
    async fn test_dedup_executor() {
        // Two Int64 columns; the first one is both the dedup key and the state table pk.
        let table_id = TableId::new(1);
        let column_descs = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
        ];
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ]);
        let dedup_col_indices = vec![0];
        let pk_indices = dedup_col_indices.clone();
        let order_types = vec![OrderType::ascending()];

        // State table backed by an in-memory state store.
        let state_store = MemoryStateStore::new();
        let state_table = StateTable::from_table_catalog(
            &gen_pbtable(table_id, column_descs, order_types, pk_indices.clone(), 0),
            state_store,
            None,
        )
        .await;

        // `tx` is used to push messages into the mock source that feeds the executor.
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, pk_indices);
        let mut dedup_executor = AppendOnlyDedupExecutor::new(
            ActorContext::for_test(123),
            source,
            dedup_col_indices,
            state_table,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
        )
        .boxed()
        .execute();

        // The executor expects a barrier as its first message and forwards it.
        tx.push_barrier(test_epoch(1), false);
        dedup_executor.next().await.unwrap().unwrap();

        // First chunk: the row with key 2 arrives already invisible (`D`), and the second row
        // with key 1 repeats a key from the same chunk.
        let chunk = StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2 D
            + 1 7",
        );
        tx.push_chunk(chunk);
        let msg = dedup_executor.next().await.unwrap().unwrap();
        // Only the first row with key 1 stays visible.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2 D
                + 1 7 D",
            )
        );

        tx.push_barrier(test_epoch(2), false);
        dedup_executor.next().await.unwrap().unwrap();

        // Second chunk, after the barrier: keys 3 and 2 have not been emitted before (the
        // earlier row with key 2 was invisible), while key 1 has.
        let chunk = StreamChunk::from_pretty(
            " I I
            + 3 9
            + 2 5
            + 1 20",
        );
        tx.push_chunk(chunk);
        let msg = dedup_executor.next().await.unwrap().unwrap();
        // The row with key 1 is hidden; the other two pass through.
        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I
                + 3 9
                + 2 5
                + 1 20 D",
            )
        );
    }
}