fn new_stream_simple_agg( core: Agg<StreamPlanRef>, must_output_per_barrier: bool, ) -> Result<StreamSimpleAgg>