risingwave_stream/executor/lookup/
impl_.rs1use itertools::Itertools;
16use risingwave_common::catalog::ColumnDesc;
17use risingwave_common::row::RowExt;
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_common::util::iter_util::ZipEqDebug;
20use risingwave_common::util::sort_util::ColumnOrder;
21use risingwave_common_estimate_size::collections::EstimatedVec;
22use risingwave_hummock_sdk::HummockReadEpoch;
23use risingwave_storage::store::PrefetchOptions;
24use risingwave_storage::table::TableIter;
25use risingwave_storage::table::batch_table::BatchTable;
26
27use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch};
28use crate::cache::cache_may_stale;
29use crate::common::metrics::MetricsInfo;
30use crate::executor::join::builder::JoinStreamChunkBuilder;
31use crate::executor::lookup::LookupExecutor;
32use crate::executor::lookup::cache::LookupCache;
33use crate::executor::lookup::sides::{ArrangeJoinSide, ArrangeMessage, StreamJoinSide};
34use crate::executor::monitor::LookupExecutorMetrics;
35use crate::executor::prelude::*;
36
37pub struct LookupExecutorParams<S: StateStore> {
39 pub ctx: ActorContextRef,
40 pub info: ExecutorInfo,
41
42 pub arrangement: Executor,
45
46 pub stream: Executor,
49
50 pub arrangement_col_descs: Vec<ColumnDesc>,
57
58 pub arrangement_order_rules: Vec<ColumnOrder>,
70
71 pub column_mapping: Vec<usize>,
75
76 pub use_current_epoch: bool,
85
86 pub stream_join_key_indices: Vec<usize>,
88
89 pub arrange_join_key_indices: Vec<usize>,
91
92 pub batch_table: BatchTable<S>,
93
94 pub watermark_epoch: AtomicU64Ref,
95
96 pub chunk_size: usize,
97}
98
99impl<S: StateStore> LookupExecutor<S> {
100 pub fn new(params: LookupExecutorParams<S>) -> Self {
101 let LookupExecutorParams {
102 ctx,
103 info,
104 arrangement,
105 stream,
106 arrangement_col_descs,
107 arrangement_order_rules,
108 use_current_epoch,
109 stream_join_key_indices,
110 arrange_join_key_indices,
111 column_mapping,
112 batch_table: storage_table,
113 watermark_epoch,
114 chunk_size,
115 } = params;
116
117 let output_column_length = stream.schema().len() + arrangement.schema().len();
118
119 let schema_fields = stream
123 .schema()
124 .fields
125 .iter()
126 .chain(arrangement.schema().fields.iter())
127 .cloned()
128 .collect_vec();
129
130 assert_eq!(schema_fields.len(), output_column_length);
131
132 let schema = Schema::new(schema_fields);
133
134 let chunk_data_types = schema.data_types();
135 let arrangement_data_types = arrangement.schema().data_types();
136 let stream_data_types = stream.schema().data_types();
137
138 let arrangement_pk_indices = arrangement.pk_indices().to_vec();
139 let stream_pk_indices = stream.pk_indices().to_vec();
140
141 {
143 let mut arrange_join_key_indices = arrange_join_key_indices.clone();
144 arrange_join_key_indices.sort_unstable();
145 let mut arrangement_order_types_indices = arrangement_order_rules
146 .iter()
147 .map(|x| x.column_index)
148 .collect_vec();
149 arrangement_order_types_indices.sort_unstable();
150 assert_eq!(
151 arrange_join_key_indices,
152 &arrangement_order_types_indices[0..arrange_join_key_indices.len()],
153 "invalid join key: arrange_join_key_indices = {:?}, order_rules: {:?}",
154 arrange_join_key_indices,
155 arrangement_order_rules
156 );
157 }
158
159 assert_eq!(
161 stream_join_key_indices.len(),
162 arrange_join_key_indices.len()
163 );
164
165 let key_indices_mapping = arrangement_order_rules
167 .iter()
168 .map(|x| x.column_index) .filter_map(|x| arrange_join_key_indices.iter().position(|y| *y == x)) .map(|x| stream_join_key_indices[x]) .collect_vec();
172
173 assert_eq!(
175 info.schema
176 .fields
177 .iter()
178 .map(|x| x.data_type())
179 .collect_vec(),
180 column_mapping
181 .iter()
182 .map(|x| schema.fields[*x].data_type())
183 .collect_vec(),
184 "mismatched output schema"
185 );
186
187 let metrics_info = MetricsInfo::new(
188 ctx.streaming_metrics.clone(),
189 storage_table.table_id().table_id(),
190 ctx.id,
191 "Lookup",
192 );
193
194 Self {
195 ctx,
196 chunk_data_types,
197 last_barrier: None,
198 stream_executor: Some(stream),
199 arrangement_executor: Some(arrangement),
200 stream: StreamJoinSide {
201 key_indices: stream_join_key_indices,
202 pk_indices: stream_pk_indices,
203 col_types: stream_data_types,
204 },
205 arrangement: ArrangeJoinSide {
206 pk_indices: arrangement_pk_indices,
207 col_types: arrangement_data_types,
208 col_descs: arrangement_col_descs,
209 order_rules: arrangement_order_rules,
210 key_indices: arrange_join_key_indices,
211 use_current_epoch,
212 batch_table: storage_table,
213 },
214 column_mapping,
215 key_indices_mapping,
216 lookup_cache: LookupCache::new(watermark_epoch, metrics_info),
217 chunk_size,
218 }
219 }
220
221 #[try_stream(ok = Message, error = StreamExecutorError)]
227 pub async fn execute_inner(mut self: Box<Self>) {
228 let input = if self.arrangement.use_current_epoch {
229 stream_lookup_arrange_this_epoch(
230 self.stream_executor.take().unwrap(),
231 self.arrangement_executor.take().unwrap(),
232 )
233 .boxed()
234 } else {
235 stream_lookup_arrange_prev_epoch(
236 self.stream_executor.take().unwrap(),
237 self.arrangement_executor.take().unwrap(),
238 )
239 .boxed()
240 };
241
242 let metrics = self.ctx.streaming_metrics.new_lookup_executor_metrics(
243 self.arrangement.batch_table.table_id(),
244 self.ctx.id,
245 self.ctx.fragment_id,
246 );
247
248 let (stream_to_output, arrange_to_output) = JoinStreamChunkBuilder::get_i2o_mapping(
249 &self.column_mapping,
250 self.stream.col_types.len(),
251 self.arrangement.col_types.len(),
252 );
253
254 let reorder_chunk_data_types = self
255 .column_mapping
256 .iter()
257 .map(|x| self.chunk_data_types[*x].clone())
258 .collect_vec();
259
260 #[for_await]
261 for msg in input {
262 let msg = msg?;
263 self.lookup_cache.evict();
264 match msg {
265 ArrangeMessage::Barrier(barrier) => {
266 self.process_barrier(&barrier);
267
268 if self.arrangement.use_current_epoch {
269 yield Message::Barrier(barrier);
272 }
273 }
274 ArrangeMessage::ArrangeReady(arrangement_chunks, barrier) => {
275 for chunk in arrangement_chunks {
280 self.lookup_cache
281 .apply_batch(chunk, &self.arrangement.key_indices)
282 }
283
284 if !self.arrangement.use_current_epoch {
285 yield Message::Barrier(barrier);
288 }
289 }
290 ArrangeMessage::Stream(chunk) => {
291 let chunk = chunk.compact();
292 let (chunk, ops) = chunk.into_parts();
293
294 let mut builder = JoinStreamChunkBuilder::new(
295 self.chunk_size,
296 reorder_chunk_data_types.clone(),
297 stream_to_output.clone(),
298 arrange_to_output.clone(),
299 );
300
301 for (op, row) in ops.iter().zip_eq_debug(chunk.rows()) {
302 for matched_row in self
303 .lookup_one_row(
304 &row,
305 self.last_barrier.as_ref().unwrap().epoch,
306 &metrics,
307 )
308 .await?
309 {
310 tracing::debug!(target: "events::stream::lookup::put", "{:?} {:?}", row, matched_row);
311
312 if let Some(chunk) = builder.append_row(*op, row, &matched_row) {
313 yield Message::Chunk(chunk);
314 }
315 }
316 }
318
319 if let Some(chunk) = builder.take() {
320 yield Message::Chunk(chunk);
321 }
322 }
323 }
324 }
325 }
326
327 fn process_barrier(&mut self, barrier: &Barrier) {
329 if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
330 let previous_vnode_bitmap = self
331 .arrangement
332 .batch_table
333 .update_vnode_bitmap(vnode_bitmap.clone());
334
335 if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
337 self.lookup_cache.clear();
338 }
339 }
340
341 self.last_barrier = Some(barrier.clone());
343 }
344
345 async fn lookup_one_row(
347 &mut self,
348 stream_row: &RowRef<'_>,
349 epoch_pair: EpochPair,
350 metrics: &LookupExecutorMetrics,
351 ) -> StreamExecutorResult<Vec<OwnedRow>> {
352 let lookup_row = stream_row
355 .project(&self.key_indices_mapping)
356 .into_owned_row();
357
358 metrics.lookup_total_query_cache_count.inc();
359 if let Some(result) = self.lookup_cache.lookup(&lookup_row) {
360 return Ok(result.iter().cloned().collect_vec());
361 }
362
363 metrics.lookup_cache_miss_count.inc();
365
366 tracing::debug!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row);
367
368 let mut all_rows = EstimatedVec::new();
369 {
371 let all_data_iter = match self.arrangement.use_current_epoch {
372 true => {
373 self.arrangement
374 .batch_table
375 .batch_iter_with_pk_bounds(
376 HummockReadEpoch::NoWait(epoch_pair.curr),
377 &lookup_row,
378 ..,
379 false,
380 PrefetchOptions::default(),
381 )
382 .await?
383 }
384 false => {
385 self.arrangement
386 .batch_table
387 .batch_iter_with_pk_bounds(
388 HummockReadEpoch::NoWait(epoch_pair.prev),
389 &lookup_row,
390 ..,
391 false,
392 PrefetchOptions::default(),
393 )
394 .await?
395 }
396 };
397
398 pin_mut!(all_data_iter);
399 while let Some(row) = all_data_iter.next_row().await? {
400 all_rows.push(row);
402 }
403 }
404
405 tracing::debug!(target: "events::stream::lookup::result", "{:?} => {:?}", lookup_row, all_rows.inner());
406
407 self.lookup_cache.batch_update(lookup_row, all_rows.clone());
408
409 metrics
410 .lookup_cached_entry_count
411 .set(self.lookup_cache.len() as i64);
412
413 Ok(all_rows.into_inner())
414 }
415}