risingwave_stream/from_proto/
vector_index.rs1use risingwave_common::bail;
16use risingwave_common::types::DataType;
17use risingwave_pb::stream_plan::VectorIndexWriteNode;
18use risingwave_storage::StateStore;
19
20use crate::error::StreamResult;
21use crate::executor::{Executor, VectorIndexWriteExecutor};
22use crate::from_proto::ExecutorBuilder;
23use crate::task::ExecutorParams;
24
25pub struct VectorIndexWriteExecutorBuilder;
26
27impl ExecutorBuilder for VectorIndexWriteExecutorBuilder {
28 type Node = VectorIndexWriteNode;
29
30 async fn new_boxed_executor(
31 params: ExecutorParams,
32 node: &Self::Node,
33 store: impl StateStore,
34 ) -> StreamResult<Executor> {
35 let [input]: [_; 1] = params.input.try_into().unwrap();
36
37 let table = node.table.as_ref().unwrap();
38 assert_eq!(table.columns.len(), input.schema().len());
39 let index_col_type = DataType::from(
40 table.columns[0]
41 .column_desc
42 .as_ref()
43 .unwrap()
44 .column_type
45 .as_ref()
46 .unwrap(),
47 );
48 let DataType::Vector(dimension) = &index_col_type else {
49 bail!("expect vector column but got: {:?}", index_col_type)
50 };
51 let DataType::Vector(input_dimension) = &input.schema().fields[0].data_type else {
52 bail!(
53 "expect first input vector column but got: {:?}",
54 index_col_type
55 )
56 };
57 assert_eq!(dimension, input_dimension);
58
59 let executor = VectorIndexWriteExecutor::new(input, store, table.id.into()).await?;
60 Ok(Executor::new(params.info, Box::new(executor)))
61 }
62}