risingwave_stream/executor/lookup/
impl_.rs1use std::ops::Bound;
16
17use futures::TryStreamExt;
18use itertools::Itertools;
19use risingwave_common::catalog::ColumnDesc;
20use risingwave_common::row::RowExt;
21use risingwave_common::util::iter_util::ZipEqDebug;
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_common_estimate_size::collections::EstimatedVec;
24use risingwave_storage::row_serde::value_serde::ValueRowSerde;
25use risingwave_storage::store::PrefetchOptions;
26
27use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch};
28use crate::common::metrics::MetricsInfo;
29use crate::common::table::state_table::ReplicatedStateTable;
30use crate::executor::join::builder::JoinStreamChunkBuilder;
31use crate::executor::lookup::LookupExecutor;
32use crate::executor::lookup::cache::LookupCache;
33use crate::executor::lookup::sides::{ArrangeJoinSide, ArrangeMessage, StreamJoinSide};
34use crate::executor::monitor::LookupExecutorMetrics;
35use crate::executor::prelude::*;
36
37pub struct LookupExecutorParams<S: StateStore, SD: ValueRowSerde> {
39 pub ctx: ActorContextRef,
40 pub info: ExecutorInfo,
41
42 pub arrangement: Executor,
45
46 pub stream: Executor,
49
50 pub arrangement_col_descs: Vec<ColumnDesc>,
57
58 pub arrangement_order_rules: Vec<ColumnOrder>,
70
71 pub column_mapping: Vec<usize>,
75
76 pub use_current_epoch: bool,
85
86 pub stream_join_key_indices: Vec<usize>,
88
89 pub arrange_join_key_indices: Vec<usize>,
91
92 pub state_table: ReplicatedStateTable<S, SD>,
93
94 pub watermark_epoch: AtomicU64Ref,
95
96 pub chunk_size: usize,
97}
98
99impl<S: StateStore, SD: ValueRowSerde> LookupExecutor<S, SD> {
100 pub fn new(params: LookupExecutorParams<S, SD>) -> Self {
101 let LookupExecutorParams {
102 ctx,
103 info,
104 arrangement,
105 stream,
106 arrangement_col_descs,
107 arrangement_order_rules,
108 use_current_epoch,
109 stream_join_key_indices,
110 arrange_join_key_indices,
111 column_mapping,
112 state_table: storage_table,
113 watermark_epoch,
114 chunk_size,
115 } = params;
116
117 let output_column_length = stream.schema().len() + arrangement.schema().len();
118
119 let schema_fields = stream
123 .schema()
124 .fields
125 .iter()
126 .chain(arrangement.schema().fields.iter())
127 .cloned()
128 .collect_vec();
129
130 assert_eq!(schema_fields.len(), output_column_length);
131
132 let schema = Schema::new(schema_fields);
133
134 let chunk_data_types = schema.data_types();
135 let arrangement_data_types = arrangement.schema().data_types();
136 let stream_data_types = stream.schema().data_types();
137
138 let arrangement_pk_indices = arrangement.stream_key().to_vec();
139 let stream_pk_indices = stream.stream_key().to_vec();
140
141 {
143 let mut arrange_join_key_indices = arrange_join_key_indices.clone();
144 arrange_join_key_indices.sort_unstable();
145 let mut arrangement_order_types_indices = arrangement_order_rules
146 .iter()
147 .map(|x| x.column_index)
148 .collect_vec();
149 arrangement_order_types_indices.sort_unstable();
150 assert_eq!(
151 arrange_join_key_indices,
152 &arrangement_order_types_indices[0..arrange_join_key_indices.len()],
153 "invalid join key: arrange_join_key_indices = {:?}, order_rules: {:?}",
154 arrange_join_key_indices,
155 arrangement_order_rules
156 );
157 }
158
159 assert_eq!(
161 stream_join_key_indices.len(),
162 arrange_join_key_indices.len()
163 );
164
165 let key_indices_mapping = arrangement_order_rules
167 .iter()
168 .map(|x| x.column_index) .filter_map(|x| arrange_join_key_indices.iter().position(|y| *y == x)) .map(|x| stream_join_key_indices[x]) .collect_vec();
172
173 assert_eq!(
175 info.schema
176 .fields
177 .iter()
178 .map(|x| x.data_type())
179 .collect_vec(),
180 column_mapping
181 .iter()
182 .map(|x| schema.fields[*x].data_type())
183 .collect_vec(),
184 "mismatched output schema"
185 );
186
187 let metrics_info = MetricsInfo::new(
188 ctx.streaming_metrics.clone(),
189 storage_table.table_id(),
190 ctx.id,
191 "Lookup",
192 );
193
194 Self {
195 ctx,
196 chunk_data_types,
197 stream_executor: Some(stream),
198 arrangement_executor: Some(arrangement),
199 stream: StreamJoinSide {
200 key_indices: stream_join_key_indices,
201 pk_indices: stream_pk_indices,
202 col_types: stream_data_types,
203 },
204 arrangement: ArrangeJoinSide {
205 pk_indices: arrangement_pk_indices,
206 col_types: arrangement_data_types,
207 col_descs: arrangement_col_descs,
208 order_rules: arrangement_order_rules,
209 key_indices: arrange_join_key_indices,
210 use_current_epoch,
211 state_table: storage_table,
212 },
213 column_mapping,
214 key_indices_mapping,
215 lookup_cache: LookupCache::new(watermark_epoch, metrics_info),
216 chunk_size,
217 }
218 }
219
220 #[try_stream(ok = Message, error = StreamExecutorError)]
226 pub async fn execute_inner(mut self: Box<Self>) {
227 let mut stream_input = self.stream_executor.take().unwrap().execute();
228 let mut arrangement_input = self.arrangement_executor.take().unwrap().execute();
229
230 let first_barrier = expect_first_barrier(&mut stream_input).await?;
231 let arrangement_first_barrier = expect_first_barrier(&mut arrangement_input).await?;
232 if first_barrier.epoch != arrangement_first_barrier.epoch {
233 return Err(StreamExecutorError::align_barrier(
234 first_barrier.clone(),
235 arrangement_first_barrier,
236 ));
237 }
238
239 let input = if self.arrangement.use_current_epoch {
240 stream_lookup_arrange_this_epoch(stream_input, arrangement_input).boxed()
241 } else {
242 stream_lookup_arrange_prev_epoch(stream_input, arrangement_input).boxed()
243 };
244
245 let metrics = self.ctx.streaming_metrics.new_lookup_executor_metrics(
246 self.arrangement.state_table.table_id(),
247 self.ctx.id,
248 self.ctx.fragment_id,
249 );
250
251 let (stream_to_output, arrange_to_output) = JoinStreamChunkBuilder::get_i2o_mapping(
252 &self.column_mapping,
253 self.stream.col_types.len(),
254 self.arrangement.col_types.len(),
255 );
256
257 let reorder_chunk_data_types = self
258 .column_mapping
259 .iter()
260 .map(|x| self.chunk_data_types[*x].clone())
261 .collect_vec();
262
263 yield Message::Barrier(first_barrier.clone());
267 self.arrangement
268 .state_table
269 .init_epoch(first_barrier.epoch)
270 .await?;
271
272 #[for_await]
273 for msg in input {
274 let msg = msg?;
275 self.lookup_cache.evict();
276 match msg {
277 ArrangeMessage::Barrier(barrier) => {
278 barrier.assume_no_update_vnode_bitmap(self.ctx.id)?;
279 self.arrangement
280 .state_table
281 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
282 .await?;
283
284 if self.arrangement.use_current_epoch {
285 yield Message::Barrier(barrier.clone());
288 }
289 }
290 ArrangeMessage::ArrangeReady(arrangement_chunks, barrier) => {
291 for chunk in &arrangement_chunks {
295 self.arrangement.state_table.write_chunk(chunk.clone());
296 }
297
298 for chunk in arrangement_chunks {
299 self.lookup_cache
300 .apply_batch(chunk, &self.arrangement.key_indices)
301 }
302
303 if !self.arrangement.use_current_epoch {
304 yield Message::Barrier(barrier);
307 }
308 }
309 ArrangeMessage::Stream(chunk) => {
310 let chunk = chunk.compact_vis();
311 let (chunk, ops) = chunk.into_parts();
312
313 let mut builder = JoinStreamChunkBuilder::new(
314 self.chunk_size,
315 reorder_chunk_data_types.clone(),
316 stream_to_output.clone(),
317 arrange_to_output.clone(),
318 );
319
320 for (op, row) in ops.iter().zip_eq_debug(chunk.rows()) {
321 for matched_row in self.lookup_one_row(&row, &metrics).await? {
322 tracing::debug!(target: "events::stream::lookup::put", "{:?} {:?}", row, matched_row);
323
324 if let Some(chunk) = builder.append_row(*op, row, &matched_row) {
325 yield Message::Chunk(chunk);
326 }
327 }
328 }
330
331 if let Some(chunk) = builder.take() {
332 yield Message::Chunk(chunk);
333 }
334 }
335 }
336 }
337 }
338
339 async fn lookup_one_row(
341 &mut self,
342 stream_row: &RowRef<'_>,
343 metrics: &LookupExecutorMetrics,
344 ) -> StreamExecutorResult<Vec<OwnedRow>> {
345 let lookup_row = stream_row
348 .project(&self.key_indices_mapping)
349 .into_owned_row();
350
351 metrics.lookup_total_query_cache_count.inc();
352 if let Some(result) = self.lookup_cache.lookup(&lookup_row) {
353 return Ok(result.iter().cloned().collect_vec());
354 }
355
356 metrics.lookup_cache_miss_count.inc();
358
359 tracing::debug!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row);
360
361 let mut all_rows = EstimatedVec::new();
362 {
363 let all_data_iter = self
364 .arrangement
365 .state_table
366 .iter_with_prefix(
367 &lookup_row,
368 &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
369 PrefetchOptions::default(),
370 )
371 .await?;
372
373 pin_mut!(all_data_iter);
374 while let Some(row) = all_data_iter.try_next().await? {
375 all_rows.push(row);
376 }
377 }
378
379 tracing::debug!(target: "events::stream::lookup::result", "{:?} => {:?}", lookup_row, all_rows.inner());
380
381 self.lookup_cache.batch_update(lookup_row, all_rows.clone());
382
383 metrics
384 .lookup_cached_entry_count
385 .set(self.lookup_cache.len() as i64);
386
387 Ok(all_rows.into_inner())
388 }
389}