risingwave_stream/executor/vector_index/
mod.rs1use anyhow::anyhow;
16use futures::TryStreamExt;
17use itertools::Itertools;
18use risingwave_common::array::Op;
19use risingwave_common::catalog::TableId;
20use risingwave_common::row::{Row, RowExt};
21use risingwave_common::util::value_encoding::{BasicSerializer, ValueRowSerializer};
22use risingwave_storage::StateStore;
23use risingwave_storage::store::{
24 InitOptions, NewVectorWriterOptions, SealCurrentEpochOptions, StateStoreWriteEpochControl,
25 StateStoreWriteVector,
26};
27
28use crate::executor::prelude::try_stream;
29use crate::executor::{
30 BoxedMessageStream, Execute, Executor, Message, StreamExecutorError, StreamExecutorResult,
31 expect_first_barrier,
32};
33
34pub struct VectorIndexWriteExecutor<S: StateStore> {
35 input: Executor,
36 vector_writer: S::VectorWriter,
37 serializer: BasicSerializer,
38}
39
40impl<S: StateStore> Execute for VectorIndexWriteExecutor<S> {
41 fn execute(self: Box<Self>) -> BoxedMessageStream {
42 Box::pin(self.execute_inner())
43 }
44}
45
46impl<S: StateStore> VectorIndexWriteExecutor<S> {
47 pub async fn new(input: Executor, store: S, table_id: TableId) -> StreamExecutorResult<Self> {
48 let vector_writer = store
49 .new_vector_writer(NewVectorWriterOptions { table_id })
50 .await;
51 Ok(Self {
52 input,
53 vector_writer,
54 serializer: BasicSerializer,
55 })
56 }
57
58 #[try_stream(ok = Message, error = StreamExecutorError)]
59 pub async fn execute_inner(mut self) {
60 let info_column_indices = (1..self.input.schema().len()).collect_vec();
61 let mut input = self.input.execute();
62
63 let barrier = expect_first_barrier(&mut input).await?;
64 let first_epoch = barrier.epoch;
65 yield Message::Barrier(barrier);
66 self.vector_writer
67 .init(InitOptions { epoch: first_epoch })
68 .await?;
69
70 while let Some(msg) = input.try_next().await? {
71 match msg {
72 Message::Barrier(barrier) => {
73 self.vector_writer.flush().await?;
74 self.vector_writer.seal_current_epoch(
75 barrier.epoch.curr,
76 SealCurrentEpochOptions {
77 table_watermarks: None,
78 switch_op_consistency_level: None,
79 },
80 );
81 yield Message::Barrier(barrier);
82 }
83 Message::Chunk(chunk) => {
84 for (op, row) in chunk.rows() {
85 if op != Op::Insert {
86 return Err(anyhow!(
87 "should be append-only for vector index writer but receive op {:?}",
88 op
89 )
90 .into());
91 }
92 let vector_datum = row.datum_at(0);
93 let Some(vector_datum) = vector_datum else {
94 warn!(
95 ?row,
96 "vector index writer received a row with null vector datum, skipping"
97 );
98 continue;
99 };
100 let vector = vector_datum.into_vector();
101 let info = self
102 .serializer
103 .serialize(row.project(&info_column_indices))
104 .into();
105 self.vector_writer.insert(vector, info)?;
106 }
107 self.vector_writer.try_flush().await?;
108 yield Message::Chunk(chunk);
109 }
110 Message::Watermark(watermark) => {
111 yield Message::Watermark(watermark);
112 }
113 }
114 }
115 }
116}