// risingwave_frontend/optimizer/plan_node/stream_vector_index_write.rs

use std::num::NonZeroU32;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_common::catalog::{
20 ColumnCatalog, ConflictBehavior, CreateType, Engine, StreamJobStatus, TableId,
21};
22use risingwave_common::hash::VnodeCount;
23use risingwave_common::id::FragmentId;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_pb::catalog::PbVectorIndexInfo;
26use risingwave_pb::stream_plan::stream_node::PbNodeBody;
27
28use crate::TableCatalog;
29use crate::catalog::table_catalog::TableType;
30use crate::catalog::{DatabaseId, SchemaId};
31use crate::error::ErrorCode;
32use crate::optimizer::StreamOptimizedLogicalPlanRoot;
33use crate::optimizer::plan_node::derive::{derive_columns, derive_pk};
34use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
35use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
36use crate::optimizer::plan_node::stream::StreamPlanNodeMetadata;
37use crate::optimizer::plan_node::utils::{Distill, childless_record, plan_can_use_background_ddl};
38use crate::optimizer::plan_node::{
39 ExprRewritable, PlanBase, PlanNodeMeta, PlanTreeNodeUnary, Stream, StreamNode,
40 StreamPlanRef as PlanRef, reorganize_elements_id,
41};
42use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist, StreamKind};
43use crate::stream_fragmenter::BuildFragmentGraphState;
44
/// Stream plan node that writes an append-only stream into a vector index
/// table (see `TableType::VectorIndex`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamVectorIndexWrite {
    pub base: PlanBase<Stream>,
    /// Upstream plan producing the rows to be written into the index.
    input: PlanRef,
    /// Catalog of the vector index table this node writes to.
    table: TableCatalog,
}
52
impl StreamVectorIndexWrite {
    /// Wraps `input` as the write side of the vector index `table`.
    ///
    /// # Errors
    ///
    /// Returns a `NotSupported` error unless the input stream is append-only:
    /// this node only handles append-only workflows.
    fn new(input: PlanRef, table: TableCatalog) -> crate::error::Result<Self> {
        if input.stream_kind() != StreamKind::AppendOnly {
            return Err(ErrorCode::NotSupported(
                "cannot create vector index on non-append-only workflow".to_owned(),
                "try create vector index on append only workflow".to_owned(),
            )
            .into());
        }
        // Pass-through node: keep the input's schema, distribution, watermark
        // columns and monotonicity, but take the stream key from the index
        // table's catalog.
        let base = PlanBase::new_stream(
            input.ctx(),
            input.schema().clone(),
            Some(table.stream_key()),
            input.functional_dependency().clone(),
            input.distribution().clone(),
            StreamKind::AppendOnly,
            input.emit_on_window_close(),
            input.watermark_columns().clone(),
            input.columns_monotonicity().clone(),
        );
        Ok(Self { base, input, table })
    }

    /// Builds a `StreamVectorIndexWrite` node from an optimized stream plan
    /// root plus the user-supplied index metadata.
    ///
    /// The input is first enforced to `Distribution::Single`, then the output
    /// columns and the backing table catalog are derived before constructing
    /// the node via [`Self::new`].
    pub fn create(
        StreamOptimizedLogicalPlanRoot {
            plan: input,
            required_dist: user_distributed_by,
            required_order: user_order_by,
            out_fields: user_cols,
            out_names,
            ..
        }: StreamOptimizedLogicalPlanRoot,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        vector_index_info: PbVectorIndexInfo,
    ) -> crate::error::Result<Self> {
        let input = RequiredDist::PhysicalDist(Distribution::Single)
            .streaming_enforce_if_not_satisfies(input)?;
        let input = reorganize_elements_id(input);
        let columns = derive_columns(input.schema(), out_names, &user_cols)?;

        // Use background DDL only when the session enables it AND the plan
        // shape supports it.
        let create_type = if input.ctx().session_ctx().config().background_ddl()
            && plan_can_use_background_ddl(&input)
        {
            CreateType::Background
        } else {
            CreateType::Foreground
        };

        let table = Self::derive_table_catalog(
            input.clone(),
            name,
            database_id,
            schema_id,
            user_distributed_by,
            user_order_by,
            columns,
            definition,
            cardinality,
            retention_seconds,
            create_type,
            vector_index_info,
        )?;

        Self::new(input, table)
    }

    /// Derives the catalog of the table backing this vector index.
    ///
    /// The input must already be singleton-distributed (asserted below), so
    /// the table has an empty distribution key and a singleton vnode count.
    #[expect(clippy::too_many_arguments)]
    fn derive_table_catalog(
        rewritten_input: PlanRef,
        name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        user_distributed_by: RequiredDist,
        user_order_by: Order,
        columns: Vec<ColumnCatalog>,
        definition: String,
        cardinality: Cardinality,
        retention_seconds: Option<NonZeroU32>,
        create_type: CreateType,
        vector_index_info: PbVectorIndexInfo,
    ) -> crate::error::Result<TableCatalog> {
        let input = rewritten_input;

        // Every column is stored in the value part of the table.
        let value_indices = (0..columns.len()).collect_vec();
        assert_eq!(input.distribution(), &Distribution::Single);
        let distribution_key = vec![];
        let append_only = input.append_only();
        let watermark_columns = input.watermark_columns().indices().collect();

        let (table_pk, stream_key) = derive_pk(input, user_distributed_by, user_order_by, &columns);
        let read_prefix_len_hint = table_pk.len();
        Ok(TableCatalog {
            // Table/fragment ids are placeholders at planning time.
            id: TableId::placeholder(),
            schema_id,
            database_id,
            associated_source_id: None,
            name,
            columns,
            pk: table_pk,
            stream_key,
            distribution_key,
            table_type: TableType::VectorIndex,
            append_only,
            owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
            fragment_id: FragmentId::placeholder(),
            dml_fragment_id: None,
            vnode_col_index: None,
            row_id_index: None,
            value_indices,
            definition,
            conflict_behavior: ConflictBehavior::NoCheck,
            version_column_indices: vec![],
            read_prefix_len_hint,
            version: None,
            watermark_columns,
            dist_key_in_pk: vec![],
            cardinality,
            created_at_epoch: None,
            initialized_at_epoch: None,
            cleaned_by_watermark: false,
            create_type,
            stream_job_status: StreamJobStatus::Creating,
            description: None,
            initialized_at_cluster_version: None,
            created_at_cluster_version: None,
            retention_seconds: retention_seconds.map(|i| i.into()),
            cdc_table_id: None,
            vnode_count: VnodeCount::Singleton,
            webhook_info: None,
            job_id: None,
            engine: Engine::Hummock,
            clean_watermark_index_in_pk: None,
            refreshable: false,
            vector_index_info: Some(vector_index_info),
            cdc_table_type: None,
        })
    }

    /// The catalog of the vector index table this node writes to.
    pub(crate) fn table(&self) -> &TableCatalog {
        &self.table
    }
}
207
208impl Distill for StreamVectorIndexWrite {
209 fn distill<'a>(&self) -> XmlNode<'a> {
210 let table = self.table();
211
212 let vector_column_name = Pretty::from(table.columns[0].name().to_owned());
213
214 let column_names = (table.columns.iter())
215 .map(|col| col.name_with_hidden().to_string())
216 .map(Pretty::from)
217 .collect();
218
219 let stream_key = (table.stream_key().iter())
220 .map(|&k| table.columns[k].name().to_owned())
221 .map(Pretty::from)
222 .collect();
223
224 let vec = vec![
225 ("vector_column", vector_column_name),
226 ("columns", Pretty::Array(column_names)),
227 ("stream_key", Pretty::Array(stream_key)),
228 ];
229
230 childless_record("StreamVectorIndexWrite", vec)
231 }
232}
233
234impl PlanTreeNodeUnary<Stream> for StreamVectorIndexWrite {
235 fn input(&self) -> PlanRef {
236 self.input.clone()
237 }
238
239 fn clone_with_input(&self, input: PlanRef) -> Self {
240 let new = Self::new(input, self.table().clone()).unwrap();
241 new.base
242 .schema()
243 .fields
244 .iter()
245 .zip_eq_fast(self.base.schema().fields.iter())
246 .for_each(|(a, b)| {
247 assert_eq!(a.data_type, b.data_type);
248 });
249 assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
250 new
251 }
252}
253
// Derives the generic `PlanTreeNode` plumbing from the unary impl above.
impl_plan_tree_node_for_unary! { Stream, StreamVectorIndexWrite }
255
256impl StreamNode for StreamVectorIndexWrite {
257 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
258 use risingwave_pb::stream_plan::*;
259
260 let table = self.table();
261 PbNodeBody::VectorIndexWrite(Box::new(VectorIndexWriteNode {
262 table: Some(table.to_prost()),
263 }))
264 }
265}
266
// This node embeds no expressions, so expression rewriting is a no-op.
impl ExprRewritable<Stream> for StreamVectorIndexWrite {}

// Likewise, there are no expressions to visit.
impl ExprVisitable for StreamVectorIndexWrite {}