risingwave_stream/executor/vector_index/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use futures::TryStreamExt;
17use itertools::Itertools;
18use risingwave_common::array::Op;
19use risingwave_common::catalog::TableId;
20use risingwave_common::row::{Row, RowExt};
21use risingwave_common::util::value_encoding::{BasicSerializer, ValueRowSerializer};
22use risingwave_storage::StateStore;
23use risingwave_storage::store::{
24    InitOptions, NewVectorWriterOptions, SealCurrentEpochOptions, StateStoreWriteEpochControl,
25    StateStoreWriteVector,
26};
27use risingwave_storage::vector::Vector;
28
29use crate::executor::prelude::try_stream;
30use crate::executor::{
31    BoxedMessageStream, Execute, Executor, Message, StreamExecutorError, StreamExecutorResult,
32    expect_first_barrier,
33};
34
35pub struct VectorIndexWriteExecutor<S: StateStore> {
36    input: Executor,
37    vector_writer: S::VectorWriter,
38    serializer: BasicSerializer,
39}
40
41impl<S: StateStore> Execute for VectorIndexWriteExecutor<S> {
42    fn execute(self: Box<Self>) -> BoxedMessageStream {
43        Box::pin(self.execute_inner())
44    }
45}
46
47impl<S: StateStore> VectorIndexWriteExecutor<S> {
48    pub async fn new(input: Executor, store: S, table_id: TableId) -> StreamExecutorResult<Self> {
49        let vector_writer = store
50            .new_vector_writer(NewVectorWriterOptions { table_id })
51            .await;
52        Ok(Self {
53            input,
54            vector_writer,
55            serializer: BasicSerializer,
56        })
57    }
58
59    #[try_stream(ok = Message, error = StreamExecutorError)]
60    pub async fn execute_inner(mut self) {
61        let info_column_indices = (1..self.input.schema().len()).collect_vec();
62        let mut input = self.input.execute();
63
64        let barrier = expect_first_barrier(&mut input).await?;
65        let first_epoch = barrier.epoch;
66        yield Message::Barrier(barrier);
67        self.vector_writer
68            .init(InitOptions { epoch: first_epoch })
69            .await?;
70
71        while let Some(msg) = input.try_next().await? {
72            match msg {
73                Message::Barrier(barrier) => {
74                    self.vector_writer.flush().await?;
75                    self.vector_writer.seal_current_epoch(
76                        barrier.epoch.curr,
77                        SealCurrentEpochOptions {
78                            table_watermarks: None,
79                            switch_op_consistency_level: None,
80                        },
81                    );
82                    yield Message::Barrier(barrier);
83                }
84                Message::Chunk(chunk) => {
85                    for (op, row) in chunk.rows() {
86                        if op != Op::Insert {
87                            return Err(anyhow!(
88                                "should be append-only for vector index writer but receive op {:?}",
89                                op
90                            )
91                            .into());
92                        }
93                        let vector_datum = row.datum_at(0);
94                        let Some(vector_datum) = vector_datum else {
95                            warn!(
96                                ?row,
97                                "vector index writer received a row with null vector datum, skipping"
98                            );
99                            continue;
100                        };
101                        let vector = vector_datum.into_vector();
102                        let vector = Vector::new(vector.into_slice());
103                        let info = self
104                            .serializer
105                            .serialize(row.project(&info_column_indices))
106                            .into();
107                        self.vector_writer.insert(vector, info)?;
108                    }
109                    self.vector_writer.try_flush().await?;
110                    yield Message::Chunk(chunk);
111                }
112                Message::Watermark(watermark) => {
113                    yield Message::Watermark(watermark);
114                }
115            }
116        }
117    }
118}