risingwave_stream/from_proto/
vector_index_write.rs1use risingwave_common::bail;
16use risingwave_common::types::DataType;
17use risingwave_pb::stream_plan::VectorIndexWriteNode;
18use risingwave_storage::StateStore;
19
20use crate::error::StreamResult;
21use crate::executor::{Executor, VectorIndexWriteExecutor};
22use crate::from_proto::ExecutorBuilder;
23use crate::task::ExecutorParams;
24
25pub struct VectorIndexWriteExecutorBuilder;
26
27impl_stream_node_body!(VectorIndexWrite(VectorIndexWriteNode) => VectorIndexWriteExecutorBuilder);
28
29impl ExecutorBuilder for VectorIndexWriteExecutorBuilder {
30 type Node = VectorIndexWriteNode;
31
32 async fn new_boxed_executor(
33 params: ExecutorParams,
34 node: &Self::Node,
35 store: impl StateStore,
36 ) -> StreamResult<Executor> {
37 let [input]: [_; 1] = params.input.try_into().unwrap();
38
39 let table = node.table.as_ref().unwrap();
40 assert_eq!(table.columns.len(), input.schema().len());
41 let index_col_type = DataType::from(
42 table.columns[0]
43 .column_desc
44 .as_ref()
45 .unwrap()
46 .column_type
47 .as_ref()
48 .unwrap(),
49 );
50 let DataType::Vector(dimension) = &index_col_type else {
51 bail!("expect vector column but got: {:?}", index_col_type)
52 };
53 let DataType::Vector(input_dimension) = &input.schema().fields[0].data_type else {
54 bail!(
55 "expect first input vector column but got: {:?}",
56 index_col_type
57 )
58 };
59 assert_eq!(dimension, input_dimension);
60
61 let executor = VectorIndexWriteExecutor::new(input, store, table.id).await?;
62 Ok(Executor::new(params.info, Box::new(executor)))
63 }
64}