risingwave_stream/executor/dedup/
append_only_dedup.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::stream;
16use itertools::Itertools;
17use risingwave_common::array::Op;
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::row::RowExt;
20
21use crate::cache::ManagedLruCache;
22use crate::common::metrics::MetricsInfo;
23use crate::executor::prelude::*;
24
25type Cache = ManagedLruCache<OwnedRow, ()>;
26
27/// [`AppendOnlyDedupExecutor`] drops any message that has duplicate pk columns with previous
28/// messages. It only accepts append-only input, and its output will be append-only as well.
29pub struct AppendOnlyDedupExecutor<S: StateStore> {
30    ctx: ActorContextRef,
31
32    input: Option<Executor>,
33    dedup_cols: Vec<usize>,
34    state_table: StateTable<S>,
35    cache: Cache,
36}
37
38impl<S: StateStore> AppendOnlyDedupExecutor<S> {
39    pub fn new(
40        ctx: ActorContextRef,
41        input: Executor,
42        dedup_cols: Vec<usize>,
43        state_table: StateTable<S>,
44        watermark_epoch: AtomicU64Ref,
45        metrics: Arc<StreamingMetrics>,
46    ) -> Self {
47        let metrics_info =
48            MetricsInfo::new(metrics, state_table.table_id(), ctx.id, "AppendOnly Dedup");
49        Self {
50            ctx,
51            input: Some(input),
52            dedup_cols,
53            state_table,
54            cache: Cache::unbounded(watermark_epoch, metrics_info),
55        }
56    }
57
58    #[try_stream(ok = Message, error = StreamExecutorError)]
59    async fn executor_inner(mut self) {
60        let mut input = self.input.take().unwrap().execute();
61
62        // Consume the first barrier message and initialize state table.
63        let barrier = expect_first_barrier(&mut input).await?;
64        let first_epoch = barrier.epoch;
65        // The first barrier message should be propagated.
66        yield Message::Barrier(barrier);
67        self.state_table.init_epoch(first_epoch).await?;
68
69        #[for_await]
70        for msg in input {
71            self.cache.evict();
72
73            match msg? {
74                Message::Chunk(chunk) => {
75                    // Append-only dedup executor only receives INSERT messages.
76                    debug_assert!(chunk.ops().iter().all(|&op| op == Op::Insert));
77
78                    // Extract dedup keys for all rows (regardless of visibility) in the chunk.
79                    let dedup_keys = chunk
80                        .data_chunk()
81                        .rows_with_holes()
82                        .map(|row_ref| {
83                            row_ref.map(|row| row.project(&self.dedup_cols).to_owned_row())
84                        })
85                        .collect_vec();
86
87                    // Ensure that if a key for a visible row exists before, then it is in the
88                    // cache, by querying the storage.
89                    self.populate_cache(dedup_keys.iter().flatten()).await?;
90
91                    // Now check for duplication and insert new keys into the cache.
92                    let mut vis_builder = BitmapBuilder::with_capacity(chunk.capacity());
93                    for key in dedup_keys {
94                        match key {
95                            Some(key) => {
96                                if self.cache.put(key, ()).is_none() {
97                                    // The key doesn't exist before. The row should be visible.
98                                    // Note that we can do deduplication in such a simple way because
99                                    // this executor only accepts append-only input. Otherwise, we can
100                                    // only do this if dedup columns contain all the input columns.
101                                    vis_builder.append(true);
102                                } else {
103                                    // The key exists before. The row shouldn't be visible.
104                                    vis_builder.append(false);
105                                }
106                            }
107                            None => {
108                                // The row is originally invisible.
109                                vis_builder.append(false);
110                            }
111                        }
112                    }
113
114                    let vis = vis_builder.finish();
115                    if vis.count_ones() > 0 {
116                        // Construct the new chunk and write the data to state table.
117                        let (ops, columns, _) = chunk.into_inner();
118                        let chunk = StreamChunk::with_visibility(ops, columns, vis);
119                        self.state_table.write_chunk(chunk.clone());
120                        self.state_table.try_flush().await?;
121
122                        yield Message::Chunk(chunk);
123                    }
124                }
125
126                Message::Barrier(barrier) => {
127                    let post_commit = self.state_table.commit(barrier.epoch).await?;
128
129                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
130                    yield Message::Barrier(barrier);
131
132                    if let Some((_, cache_may_stale)) =
133                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
134                    {
135                        if cache_may_stale {
136                            self.cache.clear();
137                        }
138                    }
139                }
140
141                Message::Watermark(watermark) => {
142                    yield Message::Watermark(watermark);
143                }
144            }
145        }
146    }
147
148    /// Populate the cache with keys that exist in storage before.
149    pub async fn populate_cache<'a>(
150        &mut self,
151        keys: impl Iterator<Item = &'a OwnedRow>,
152    ) -> StreamExecutorResult<()> {
153        let mut futures = vec![];
154        for key in keys {
155            if self.cache.contains(key) {
156                continue;
157            }
158
159            let table = &self.state_table;
160            futures.push(async move { (key, table.get_encoded_row(key).await) });
161        }
162
163        if !futures.is_empty() {
164            let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
165            while let Some(result) = buffered.next().await {
166                let (key, value) = result;
167                if value?.is_some() {
168                    // Only insert into the cache when we have this key in storage.
169                    self.cache.put(key.to_owned(), ());
170                }
171            }
172        }
173
174        Ok(())
175    }
176}
177
178impl<S: StateStore> Execute for AppendOnlyDedupExecutor<S> {
179    fn execute(self: Box<Self>) -> BoxedMessageStream {
180        self.executor_inner().boxed()
181    }
182}
183
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::MockSource;

    /// Feeds two chunks separated by a barrier through the executor and checks that only the
    /// first visible row per dedup key (column 0) remains visible in the output.
    #[tokio::test]
    async fn test_dedup_executor() {
        // Two Int64 columns; deduplicate on column 0, which is also the state table pk.
        let dedup_col_indices = vec![0];
        let pk_indices = dedup_col_indices.clone();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ]);
        let column_descs = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64),
        ];
        let order_types = vec![OrderType::ascending()];

        // State table backed by an in-memory store.
        let table_catalog = gen_pbtable(
            TableId::new(1),
            column_descs,
            order_types,
            pk_indices.clone(),
            0,
        );
        let state_table =
            StateTable::from_table_catalog(&table_catalog, MemoryStateStore::new(), None).await;

        let (mut tx, source) = MockSource::channel();
        let upstream = source.into_executor(schema, pk_indices);
        let mut executor = AppendOnlyDedupExecutor::new(
            ActorContext::for_test(123),
            upstream,
            dedup_col_indices,
            state_table,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
        )
        .boxed()
        .execute();

        // The first barrier is forwarded by the executor.
        tx.push_barrier(test_epoch(1), false);
        executor.next().await.unwrap().unwrap();

        // Key 1 appears twice in one chunk: only its first row stays visible. The row with
        // key 2 is already marked `D` on input and stays that way.
        let first_input = StreamChunk::from_pretty(
            " I I
            + 1 1
            + 2 2 D
            + 1 7",
        );
        tx.push_chunk(first_input);
        let first_output = executor.next().await.unwrap().unwrap();
        assert_eq!(
            first_output.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I
                + 1 1
                + 2 2 D
                + 1 7 D",
            )
        );

        tx.push_barrier(test_epoch(2), false);
        executor.next().await.unwrap().unwrap();

        // After the barrier: keys 3 and 2 have not been recorded yet and pass through, while
        // key 1 was recorded from the first chunk and is filtered out.
        let second_input = StreamChunk::from_pretty(
            " I I
            + 3 9
            + 2 5
            + 1 20",
        );
        tx.push_chunk(second_input);
        let second_output = executor.next().await.unwrap().unwrap();
        assert_eq!(
            second_output.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I
                + 3 9
                + 2 5
                + 1 20 D",
            )
        );
    }
}