risingwave_stream/executor/vector/
index_lookup_join.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use futures::TryStreamExt;
16use risingwave_common::array::{Op, StreamChunk};
17use risingwave_common::bail;
18use risingwave_hummock_sdk::HummockReadEpoch;
19use risingwave_storage::StateStore;
20use risingwave_storage::table::batch_table::VectorIndexReader;
21
22use crate::executor::prelude::try_stream;
23use crate::executor::{
24    BoxedMessageStream, Execute, Executor, Message, StreamExecutorError, expect_first_barrier,
25};
26
27pub struct VectorIndexLookupJoinExecutor<S: StateStore> {
28    input: Executor,
29
30    reader: VectorIndexReader<S>,
31    vector_column_idx: usize,
32}
33
34impl<S: StateStore> Execute for VectorIndexLookupJoinExecutor<S> {
35    fn execute(self: Box<Self>) -> BoxedMessageStream {
36        Box::pin(self.execute_inner())
37    }
38}
39
40impl<S: StateStore> VectorIndexLookupJoinExecutor<S> {
41    pub fn new(input: Executor, reader: VectorIndexReader<S>, vector_column_idx: usize) -> Self {
42        Self {
43            input,
44            reader,
45            vector_column_idx,
46        }
47    }
48
49    #[try_stream(ok = Message, error = StreamExecutorError)]
50    pub async fn execute_inner(self) {
51        let Self {
52            input,
53            reader,
54            vector_column_idx,
55        } = self;
56
57        let mut input = input.execute();
58
59        let barrier = expect_first_barrier(&mut input).await?;
60        let first_epoch = barrier.epoch;
61        yield Message::Barrier(barrier);
62
63        let mut read_snapshot = reader
64            .new_snapshot(HummockReadEpoch::Committed(first_epoch.prev))
65            .await?;
66
67        while let Some(msg) = input.try_next().await? {
68            match msg {
69                Message::Barrier(barrier) => {
70                    let is_checkpoint = barrier.is_checkpoint();
71                    let prev_epoch = barrier.epoch.prev;
72                    yield Message::Barrier(barrier);
73                    if is_checkpoint {
74                        read_snapshot = reader
75                            .new_snapshot(HummockReadEpoch::Committed(prev_epoch))
76                            .await?;
77                    }
78                }
79                Message::Chunk(chunk) => {
80                    let (chunk, ops) = chunk.into_parts();
81                    if ops.iter().any(|op| *op != Op::Insert) {
82                        bail!("streaming vector index lookup join only support append-only input");
83                    }
84                    let chunk = read_snapshot
85                        .query_expand_chunk(chunk, vector_column_idx)
86                        .await?;
87                    yield Message::Chunk(StreamChunk::from_parts(ops, chunk));
88                }
89                Message::Watermark(watermark) => {
90                    yield Message::Watermark(watermark);
91                }
92            }
93        }
94    }
95}