risingwave_stream/executor/vector_index/
mod.rs1use anyhow::anyhow;
16use futures::TryStreamExt;
17use itertools::Itertools;
18use risingwave_common::array::Op;
19use risingwave_common::catalog::TableId;
20use risingwave_common::row::{Row, RowExt};
21use risingwave_common::util::value_encoding::{BasicSerializer, ValueRowSerializer};
22use risingwave_storage::StateStore;
23use risingwave_storage::store::{
24 InitOptions, NewVectorWriterOptions, SealCurrentEpochOptions, StateStoreWriteEpochControl,
25 StateStoreWriteVector,
26};
27use risingwave_storage::vector::Vector;
28
29use crate::executor::prelude::try_stream;
30use crate::executor::{
31 BoxedMessageStream, Execute, Executor, Message, StreamExecutorError, StreamExecutorResult,
32 expect_first_barrier,
33};
34
35pub struct VectorIndexWriteExecutor<S: StateStore> {
36 input: Executor,
37 vector_writer: S::VectorWriter,
38 serializer: BasicSerializer,
39}
40
41impl<S: StateStore> Execute for VectorIndexWriteExecutor<S> {
42 fn execute(self: Box<Self>) -> BoxedMessageStream {
43 Box::pin(self.execute_inner())
44 }
45}
46
47impl<S: StateStore> VectorIndexWriteExecutor<S> {
48 pub async fn new(input: Executor, store: S, table_id: TableId) -> StreamExecutorResult<Self> {
49 let vector_writer = store
50 .new_vector_writer(NewVectorWriterOptions { table_id })
51 .await;
52 Ok(Self {
53 input,
54 vector_writer,
55 serializer: BasicSerializer,
56 })
57 }
58
59 #[try_stream(ok = Message, error = StreamExecutorError)]
60 pub async fn execute_inner(mut self) {
61 let info_column_indices = (1..self.input.schema().len()).collect_vec();
62 let mut input = self.input.execute();
63
64 let barrier = expect_first_barrier(&mut input).await?;
65 let first_epoch = barrier.epoch;
66 yield Message::Barrier(barrier);
67 self.vector_writer
68 .init(InitOptions { epoch: first_epoch })
69 .await?;
70
71 while let Some(msg) = input.try_next().await? {
72 match msg {
73 Message::Barrier(barrier) => {
74 self.vector_writer.flush().await?;
75 self.vector_writer.seal_current_epoch(
76 barrier.epoch.curr,
77 SealCurrentEpochOptions {
78 table_watermarks: None,
79 switch_op_consistency_level: None,
80 },
81 );
82 yield Message::Barrier(barrier);
83 }
84 Message::Chunk(chunk) => {
85 for (op, row) in chunk.rows() {
86 if op != Op::Insert {
87 return Err(anyhow!(
88 "should be append-only for vector index writer but receive op {:?}",
89 op
90 )
91 .into());
92 }
93 let vector_datum = row.datum_at(0);
94 let Some(vector_datum) = vector_datum else {
95 warn!(
96 ?row,
97 "vector index writer received a row with null vector datum, skipping"
98 );
99 continue;
100 };
101 let vector = vector_datum.into_vector();
102 let vector = Vector::new(vector.into_slice());
103 let info = self
104 .serializer
105 .serialize(row.project(&info_column_indices))
106 .into();
107 self.vector_writer.insert(vector, info)?;
108 }
109 self.vector_writer.try_flush().await?;
110 yield Message::Chunk(chunk);
111 }
112 Message::Watermark(watermark) => {
113 yield Message::Watermark(watermark);
114 }
115 }
116 }
117 }
118}