risingwave_stream::executor::lookup::sides

Function stream_lookup_arrange_prev_epoch

pub fn stream_lookup_arrange_prev_epoch(
    stream: Executor,
    arrangement: Executor,
) -> impl Stream<Item = Result<ArrangeMessage, StreamExecutorError>>

Join the stream with the previous stable snapshot of the arrangement.

For example, the executor will receive the following message sequence from stream_lookup_arrange_prev_epoch:

  • [Msg] Barrier (prev = [1], current = [2])
  • [Msg] Stream (key = a)
  • [Do] lookup a in arrangement of epoch [1] (prev epoch)
  • [Msg] Arrangement (batch)
  • [Msg] Stream (key = b)
  • [Do] lookup b in arrangement of epoch [1] (prev epoch)
  • [Do] update cache with epoch [2]
  • [Msg] Barrier (prev = [2], current = [3])
  • [Msg] Arrangement (batch)
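
The sequence above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the RisingWave implementation: `Msg`, `Arrangement`, `run`, and `documented_sequence` are hypothetical stand-ins invented for this example, keys and values are plain strings and integers, and the arrangement keeps one full snapshot per epoch. The sketch also writes an arrangement batch into the current-epoch snapshot as soon as it arrives, whereas the sequence lists the cache update after the lookups. The lookups behave the same either way, because they only read the previous epoch.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for the messages in the sequence above.
#[derive(Debug, Clone)]
enum Msg {
    Barrier { prev: u64, curr: u64 },
    Stream { key: &'static str },
    Arrangement { batch: Vec<(&'static str, i32)> },
}

/// Toy arrangement (an assumption of this sketch): one full snapshot per epoch.
#[derive(Default)]
struct Arrangement {
    snapshots: HashMap<u64, HashMap<&'static str, i32>>,
}

impl Arrangement {
    /// Read `key` from the snapshot of the given epoch.
    fn lookup(&self, epoch: u64, key: &str) -> Option<i32> {
        self.snapshots.get(&epoch).and_then(|s| s.get(key).copied())
    }

    /// Fold a batch into the snapshot of `curr`, starting from the existing
    /// `curr` snapshot if there is one, otherwise from a copy of `prev`.
    fn apply(&mut self, prev: u64, curr: u64, batch: &[(&'static str, i32)]) {
        let mut next = self
            .snapshots
            .get(&curr)
            .or_else(|| self.snapshots.get(&prev))
            .cloned()
            .unwrap_or_default();
        for (k, v) in batch {
            next.insert(*k, *v);
        }
        self.snapshots.insert(curr, next);
    }
}

/// Process messages in order; every stream key is looked up in the snapshot
/// of the *previous* epoch. Returns (key, epoch used for lookup, result).
fn run(arr: &mut Arrangement, msgs: &[Msg]) -> Vec<(&'static str, u64, Option<i32>)> {
    let mut epochs = (0, 0); // (prev, curr), set by the latest barrier
    let mut out = Vec::new();
    for m in msgs {
        match m {
            Msg::Barrier { prev, curr } => epochs = (*prev, *curr),
            Msg::Stream { key } => out.push((*key, epochs.0, arr.lookup(epochs.0, key))),
            Msg::Arrangement { batch } => arr.apply(epochs.0, epochs.1, batch),
        }
    }
    out
}

/// The message sequence from the list above, with made-up batch contents.
fn documented_sequence() -> Vec<Msg> {
    vec![
        Msg::Barrier { prev: 1, curr: 2 },
        Msg::Stream { key: "a" },
        Msg::Arrangement { batch: vec![("a", 11), ("b", 20)] },
        Msg::Stream { key: "b" },
        Msg::Barrier { prev: 2, curr: 3 },
        Msg::Arrangement { batch: vec![("c", 30)] },
    ]
}
```

If the epoch `[1]` snapshot is seeded with only `a`, running `documented_sequence()` through `run` finds `a` but not `b`: the batch that introduces `b` arrives before the stream message for `b`, yet it belongs to epoch `[2]` and is therefore invisible to lookups against epoch `[1]`.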