risingwave_stream/executor/vector_index/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use futures::TryStreamExt;
17use itertools::Itertools;
18use risingwave_common::array::Op;
19use risingwave_common::catalog::TableId;
20use risingwave_common::row::{Row, RowExt};
21use risingwave_common::util::value_encoding::{BasicSerializer, ValueRowSerializer};
22use risingwave_storage::StateStore;
23use risingwave_storage::store::{
24    InitOptions, NewVectorWriterOptions, SealCurrentEpochOptions, StateStoreWriteEpochControl,
25    StateStoreWriteVector,
26};
27
28use crate::executor::prelude::try_stream;
29use crate::executor::{
30    BoxedMessageStream, Execute, Executor, Message, StreamExecutorError, StreamExecutorResult,
31    expect_first_barrier,
32};
33
34pub struct VectorIndexWriteExecutor<S: StateStore> {
35    input: Executor,
36    vector_writer: S::VectorWriter,
37    serializer: BasicSerializer,
38}
39
40impl<S: StateStore> Execute for VectorIndexWriteExecutor<S> {
41    fn execute(self: Box<Self>) -> BoxedMessageStream {
42        Box::pin(self.execute_inner())
43    }
44}
45
46impl<S: StateStore> VectorIndexWriteExecutor<S> {
47    pub async fn new(input: Executor, store: S, table_id: TableId) -> StreamExecutorResult<Self> {
48        let vector_writer = store
49            .new_vector_writer(NewVectorWriterOptions { table_id })
50            .await;
51        Ok(Self {
52            input,
53            vector_writer,
54            serializer: BasicSerializer,
55        })
56    }
57
58    #[try_stream(ok = Message, error = StreamExecutorError)]
59    pub async fn execute_inner(mut self) {
60        let info_column_indices = (1..self.input.schema().len()).collect_vec();
61        let mut input = self.input.execute();
62
63        let barrier = expect_first_barrier(&mut input).await?;
64        let first_epoch = barrier.epoch;
65        yield Message::Barrier(barrier);
66        self.vector_writer
67            .init(InitOptions { epoch: first_epoch })
68            .await?;
69
70        while let Some(msg) = input.try_next().await? {
71            match msg {
72                Message::Barrier(barrier) => {
73                    self.vector_writer.flush().await?;
74                    self.vector_writer.seal_current_epoch(
75                        barrier.epoch.curr,
76                        SealCurrentEpochOptions {
77                            table_watermarks: None,
78                            switch_op_consistency_level: None,
79                        },
80                    );
81                    yield Message::Barrier(barrier);
82                }
83                Message::Chunk(chunk) => {
84                    for (op, row) in chunk.rows() {
85                        if op != Op::Insert {
86                            return Err(anyhow!(
87                                "should be append-only for vector index writer but receive op {:?}",
88                                op
89                            )
90                            .into());
91                        }
92                        let vector_datum = row.datum_at(0);
93                        let Some(vector_datum) = vector_datum else {
94                            warn!(
95                                ?row,
96                                "vector index writer received a row with null vector datum, skipping"
97                            );
98                            continue;
99                        };
100                        let vector = vector_datum.into_vector();
101                        let info = self
102                            .serializer
103                            .serialize(row.project(&info_column_indices))
104                            .into();
105                        self.vector_writer.insert(vector, info)?;
106                    }
107                    self.vector_writer.try_flush().await?;
108                    yield Message::Chunk(chunk);
109                }
110                Message::Watermark(watermark) => {
111                    yield Message::Watermark(watermark);
112                }
113            }
114        }
115    }
116}