risingwave_stream/executor/vector/
index_lookup_join.rs1use futures::TryStreamExt;
16use risingwave_common::array::{Op, StreamChunk};
17use risingwave_common::bail;
18use risingwave_hummock_sdk::HummockReadEpoch;
19use risingwave_storage::StateStore;
20use risingwave_storage::table::batch_table::VectorIndexReader;
21
22use crate::executor::prelude::try_stream;
23use crate::executor::{
24 BoxedMessageStream, Execute, Executor, Message, StreamExecutorError, expect_first_barrier,
25};
26
27pub struct VectorIndexLookupJoinExecutor<S: StateStore> {
28 input: Executor,
29
30 reader: VectorIndexReader<S>,
31 vector_column_idx: usize,
32}
33
34impl<S: StateStore> Execute for VectorIndexLookupJoinExecutor<S> {
35 fn execute(self: Box<Self>) -> BoxedMessageStream {
36 Box::pin(self.execute_inner())
37 }
38}
39
40impl<S: StateStore> VectorIndexLookupJoinExecutor<S> {
41 pub fn new(input: Executor, reader: VectorIndexReader<S>, vector_column_idx: usize) -> Self {
42 Self {
43 input,
44 reader,
45 vector_column_idx,
46 }
47 }
48
49 #[try_stream(ok = Message, error = StreamExecutorError)]
50 pub async fn execute_inner(self) {
51 let Self {
52 input,
53 reader,
54 vector_column_idx,
55 } = self;
56
57 let mut input = input.execute();
58
59 let barrier = expect_first_barrier(&mut input).await?;
60 let first_epoch = barrier.epoch;
61 yield Message::Barrier(barrier);
62
63 let mut read_snapshot = reader
64 .new_snapshot(HummockReadEpoch::Committed(first_epoch.prev))
65 .await?;
66
67 while let Some(msg) = input.try_next().await? {
68 match msg {
69 Message::Barrier(barrier) => {
70 let is_checkpoint = barrier.is_checkpoint();
71 let prev_epoch = barrier.epoch.prev;
72 yield Message::Barrier(barrier);
73 if is_checkpoint {
74 read_snapshot = reader
75 .new_snapshot(HummockReadEpoch::Committed(prev_epoch))
76 .await?;
77 }
78 }
79 Message::Chunk(chunk) => {
80 let (chunk, ops) = chunk.into_parts();
81 if ops.iter().any(|op| *op != Op::Insert) {
82 bail!("streaming vector index lookup join only support append-only input");
83 }
84 let chunk = read_snapshot
85 .query_expand_chunk(chunk, vector_column_idx)
86 .await?;
87 yield Message::Chunk(StreamChunk::from_parts(ops, chunk));
88 }
89 Message::Watermark(watermark) => {
90 yield Message::Watermark(watermark);
91 }
92 }
93 }
94 }
95}