risingwave_pb/
stream_service.rs

1// This file is @generated by prost-build.
2#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct InjectBarrierRequest {
5    #[prost(string, tag = "1")]
6    pub request_id: ::prost::alloc::string::String,
7    #[prost(message, optional, tag = "2")]
8    pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
9    #[prost(uint32, repeated, tag = "4", wrapper = "crate::id::ActorId")]
10    pub actor_ids_to_collect: ::prost::alloc::vec::Vec<crate::id::ActorId>,
11    #[prost(uint32, repeated, tag = "5", wrapper = "crate::id::TableId")]
12    pub table_ids_to_sync: ::prost::alloc::vec::Vec<crate::id::TableId>,
13    #[prost(uint64, tag = "6", wrapper = "crate::id::PartialGraphId")]
14    pub partial_graph_id: crate::id::PartialGraphId,
15    #[prost(message, repeated, tag = "9")]
16    pub actors_to_build: ::prost::alloc::vec::Vec<
17        inject_barrier_request::FragmentBuildActorInfo,
18    >,
19}
20/// Nested message and enum types in `InjectBarrierRequest`.
21pub mod inject_barrier_request {
22    #[derive(prost_helpers::AnyPB)]
23    #[derive(Clone, PartialEq, ::prost::Message)]
24    pub struct FragmentBuildActorInfo {
25        #[prost(uint32, tag = "1", wrapper = "crate::id::FragmentId")]
26        pub fragment_id: crate::id::FragmentId,
27        #[prost(message, optional, tag = "2")]
28        pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
29        #[prost(message, repeated, tag = "3")]
30        pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
31    }
32    #[derive(prost_helpers::AnyPB)]
33    #[derive(Clone, PartialEq, ::prost::Message)]
34    pub struct BuildActorInfo {
35        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
36        pub actor_id: crate::id::ActorId,
37        #[prost(map = "uint32, message", tag = "2", wrapper = "crate::id::FragmentId")]
38        pub fragment_upstreams: ::std::collections::HashMap<
39            crate::id::FragmentId,
40            build_actor_info::UpstreamActors,
41        >,
42        #[prost(message, repeated, tag = "3")]
43        pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
44        #[prost(message, optional, tag = "4")]
45        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
46        #[prost(string, tag = "5")]
47        pub mview_definition: ::prost::alloc::string::String,
48        #[prost(message, optional, tag = "6")]
49        pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
50        #[prost(string, tag = "9")]
51        pub config_override: ::prost::alloc::string::String,
52        #[prost(uint32, repeated, tag = "7", wrapper = "crate::id::SubscriberId")]
53        pub initial_subscriber_ids: ::prost::alloc::vec::Vec<crate::id::SubscriberId>,
54    }
55    /// Nested message and enum types in `BuildActorInfo`.
56    pub mod build_actor_info {
57        #[derive(prost_helpers::AnyPB)]
58        #[derive(Clone, PartialEq, ::prost::Message)]
59        pub struct UpstreamActors {
60            #[prost(message, repeated, tag = "1")]
61            pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
62        }
63    }
64}
65#[derive(prost_helpers::AnyPB)]
66#[derive(Clone, PartialEq, ::prost::Message)]
67pub struct BarrierCompleteResponse {
68    #[prost(string, tag = "1")]
69    pub request_id: ::prost::alloc::string::String,
70    #[prost(message, optional, tag = "2")]
71    pub status: ::core::option::Option<super::common::Status>,
72    #[prost(message, repeated, tag = "3")]
73    pub create_mview_progress: ::prost::alloc::vec::Vec<
74        barrier_complete_response::CreateMviewProgress,
75    >,
76    #[prost(message, repeated, tag = "4")]
77    pub synced_sstables: ::prost::alloc::vec::Vec<
78        barrier_complete_response::LocalSstableInfo,
79    >,
80    #[prost(uint32, tag = "5", wrapper = "crate::id::WorkerId")]
81    pub worker_id: crate::id::WorkerId,
82    #[prost(map = "uint32, message", tag = "6", wrapper = "crate::id::TableId")]
83    pub table_watermarks: ::std::collections::HashMap<
84        crate::id::TableId,
85        super::hummock::TableWatermarks,
86    >,
87    #[prost(message, repeated, tag = "7")]
88    pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
89    #[prost(uint64, tag = "8", wrapper = "crate::id::PartialGraphId")]
90    pub partial_graph_id: crate::id::PartialGraphId,
91    /// prev_epoch of barrier
92    #[prost(uint64, tag = "9")]
93    pub epoch: u64,
94    #[prost(message, repeated, tag = "11")]
95    pub load_finished_sources: ::prost::alloc::vec::Vec<
96        barrier_complete_response::LoadFinishedSource,
97    >,
98    #[prost(map = "uint32, message", tag = "12", wrapper = "crate::id::TableId")]
99    pub vector_index_adds: ::std::collections::HashMap<
100        crate::id::TableId,
101        super::hummock::vector_index_delta::VectorIndexAdds,
102    >,
103    #[prost(message, repeated, tag = "13")]
104    pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
105        barrier_complete_response::CdcTableBackfillProgress,
106    >,
107    /// Used for truncating tables in storage layer.
108    /// MaterializeExecutor reports the tables to truncate, and then
109    /// meta will apply truncate operation in next commit epoch.
110    #[prost(uint32, repeated, tag = "14", wrapper = "crate::id::TableId")]
111    pub truncate_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
112    /// Used for reporting materialized view refresh completion.
113    /// MaterializeExecutor reports when refresh has finished, and then
114    /// meta will update the table's refresh state to Finished.
115    #[prost(uint32, repeated, tag = "15", wrapper = "crate::id::TableId")]
116    pub refresh_finished_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
117    #[prost(message, repeated, tag = "16")]
118    pub list_finished_sources: ::prost::alloc::vec::Vec<
119        barrier_complete_response::ListFinishedSource,
120    >,
121    #[prost(message, repeated, tag = "17")]
122    pub cdc_source_offset_updated: ::prost::alloc::vec::Vec<
123        barrier_complete_response::CdcSourceOffsetUpdated,
124    >,
125}
126/// Nested message and enum types in `BarrierCompleteResponse`.
127pub mod barrier_complete_response {
128    #[derive(prost_helpers::AnyPB)]
129    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
130    pub struct CreateMviewProgress {
131        /// Note: ideally we should use `executor_id`, but `actor_id` is ok-ish.
132        /// See <<https://github.com/risingwavelabs/risingwave/issues/6236>.>
133        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
134        pub backfill_actor_id: crate::id::ActorId,
135        #[prost(bool, tag = "2")]
136        pub done: bool,
137        /// MV backfill snapshot read epoch (0 for Done / Source backfill)
138        #[prost(uint64, tag = "3")]
139        pub consumed_epoch: u64,
140        /// MV backfill snapshot read rows / Source backfilled rows
141        #[prost(uint64, tag = "4")]
142        pub consumed_rows: u64,
143        #[prost(uint64, tag = "5")]
144        pub pending_epoch_lag: u64,
145        /// Buffered rows that are yet to be consumed (used by locality backfill to report precise progress)
146        #[prost(uint64, tag = "6")]
147        pub buffered_rows: u64,
148        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
149        pub fragment_id: crate::id::FragmentId,
150    }
151    #[derive(prost_helpers::AnyPB)]
152    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
153    pub struct CdcTableBackfillProgress {
154        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
155        pub actor_id: crate::id::ActorId,
156        #[prost(uint64, tag = "2")]
157        pub epoch: u64,
158        #[prost(bool, tag = "3")]
159        pub done: bool,
160        #[prost(int64, tag = "4")]
161        pub split_id_start_inclusive: i64,
162        #[prost(int64, tag = "5")]
163        pub split_id_end_inclusive: i64,
164        #[prost(uint64, tag = "6")]
165        pub generation: u64,
166        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
167        pub fragment_id: crate::id::FragmentId,
168    }
169    #[derive(prost_helpers::AnyPB)]
170    #[derive(Clone, PartialEq, ::prost::Message)]
171    pub struct LocalSstableInfo {
172        #[prost(message, optional, tag = "2")]
173        pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
174        #[prost(map = "uint32, message", tag = "3", wrapper = "crate::id::TableId")]
175        pub table_stats_map: ::std::collections::HashMap<
176            crate::id::TableId,
177            super::super::hummock::TableStats,
178        >,
179        #[prost(uint64, tag = "4")]
180        pub created_at: u64,
181    }
182    /// Used for refreshable batch source.
183    /// SourceExecutor reports the source load is finished, and then
184    /// meta will issue a LoadFinish barrier to notify MaterializeExecutor to start diff calculation.
185    #[derive(prost_helpers::AnyPB)]
186    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
187    pub struct LoadFinishedSource {
188        /// The actor that reported the completion event.
189        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
190        pub reporter_actor_id: crate::id::ActorId,
191        /// The table ID for the refreshable batch source.
192        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
193        pub table_id: crate::id::TableId,
194        /// The source identifier associated with the finished load.
195        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
196        pub associated_source_id: crate::id::SourceId,
197    }
198    /// SourceExecutor reports the source list is finished, and then
199    /// meta will issue a ListFinish barrier to notify the source to start loading.
200    #[derive(prost_helpers::AnyPB)]
201    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
202    pub struct ListFinishedSource {
203        /// The actor that reported the completion event.
204        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
205        pub reporter_actor_id: crate::id::ActorId,
206        /// The table ID for the refreshable batch source.
207        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
208        pub table_id: crate::id::TableId,
209        /// The source identifier associated with the completed listing.
210        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
211        pub associated_source_id: crate::id::SourceId,
212    }
213    /// SourceExecutor reports that CDC source has updated offset at least once.
214    /// Meta will mark the CDC source as created only after receiving this notification.
215    #[derive(prost_helpers::AnyPB)]
216    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
217    pub struct CdcSourceOffsetUpdated {
218        /// The actor that reported the offset update.
219        #[prost(uint32, tag = "1")]
220        pub reporter_actor_id: u32,
221        /// The source identifier for the CDC source.
222        #[prost(uint32, tag = "2")]
223        pub source_id: u32,
224    }
225}
226#[derive(prost_helpers::AnyPB)]
227#[derive(Clone, PartialEq, ::prost::Message)]
228pub struct StreamingControlStreamRequest {
229    #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
230    pub request: ::core::option::Option<streaming_control_stream_request::Request>,
231}
232/// Nested message and enum types in `StreamingControlStreamRequest`.
233pub mod streaming_control_stream_request {
234    #[derive(prost_helpers::AnyPB)]
235    #[derive(Clone, PartialEq, ::prost::Message)]
236    pub struct InitRequest {
237        #[prost(string, tag = "1")]
238        pub term_id: ::prost::alloc::string::String,
239    }
240    #[derive(prost_helpers::AnyPB)]
241    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
242    pub struct CreatePartialGraphRequest {
243        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
244        pub partial_graph_id: crate::id::PartialGraphId,
245    }
246    #[derive(prost_helpers::AnyPB)]
247    #[derive(Clone, PartialEq, ::prost::Message)]
248    pub struct RemovePartialGraphRequest {
249        #[prost(uint64, repeated, tag = "1", wrapper = "crate::id::PartialGraphId")]
250        pub partial_graph_ids: ::prost::alloc::vec::Vec<crate::id::PartialGraphId>,
251    }
252    #[derive(prost_helpers::AnyPB)]
253    #[derive(Clone, PartialEq, ::prost::Message)]
254    pub struct ResetPartialGraphsRequest {
255        #[prost(uint64, repeated, tag = "1", wrapper = "crate::id::PartialGraphId")]
256        pub partial_graph_ids: ::prost::alloc::vec::Vec<crate::id::PartialGraphId>,
257    }
258    #[derive(prost_helpers::AnyPB)]
259    #[derive(Clone, PartialEq, ::prost::Oneof)]
260    pub enum Request {
261        #[prost(message, tag = "1")]
262        Init(InitRequest),
263        #[prost(message, tag = "2")]
264        InjectBarrier(super::InjectBarrierRequest),
265        #[prost(message, tag = "3")]
266        RemovePartialGraph(RemovePartialGraphRequest),
267        #[prost(message, tag = "4")]
268        CreatePartialGraph(CreatePartialGraphRequest),
269        #[prost(message, tag = "5")]
270        ResetPartialGraphs(ResetPartialGraphsRequest),
271    }
272}
273#[derive(prost_helpers::AnyPB)]
274#[derive(Clone, PartialEq, ::prost::Message)]
275pub struct ScoredError {
276    #[prost(string, tag = "1")]
277    pub err_msg: ::prost::alloc::string::String,
278    #[prost(int32, tag = "2")]
279    pub score: i32,
280}
281#[derive(prost_helpers::AnyPB)]
282#[derive(Clone, PartialEq, ::prost::Message)]
283pub struct StreamingControlStreamResponse {
284    #[prost(
285        oneof = "streaming_control_stream_response::Response",
286        tags = "1, 2, 3, 4, 5"
287    )]
288    pub response: ::core::option::Option<streaming_control_stream_response::Response>,
289}
290/// Nested message and enum types in `StreamingControlStreamResponse`.
291pub mod streaming_control_stream_response {
292    #[derive(prost_helpers::AnyPB)]
293    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
294    pub struct InitResponse {}
295    #[derive(prost_helpers::AnyPB)]
296    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
297    pub struct ShutdownResponse {}
298    #[derive(prost_helpers::AnyPB)]
299    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
300    pub struct ReportPartialGraphFailureResponse {
301        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
302        pub partial_graph_id: crate::id::PartialGraphId,
303    }
304    #[derive(prost_helpers::AnyPB)]
305    #[derive(Clone, PartialEq, ::prost::Message)]
306    pub struct ResetPartialGraphResponse {
307        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
308        pub partial_graph_id: crate::id::PartialGraphId,
309        #[prost(message, optional, tag = "2")]
310        pub root_err: ::core::option::Option<super::ScoredError>,
311    }
312    #[derive(prost_helpers::AnyPB)]
313    #[derive(Clone, PartialEq, ::prost::Oneof)]
314    pub enum Response {
315        #[prost(message, tag = "1")]
316        Init(InitResponse),
317        #[prost(message, tag = "2")]
318        CompleteBarrier(super::BarrierCompleteResponse),
319        #[prost(message, tag = "3")]
320        Shutdown(ShutdownResponse),
321        #[prost(message, tag = "4")]
322        ReportPartialGraphFailure(ReportPartialGraphFailureResponse),
323        #[prost(message, tag = "5")]
324        ResetPartialGraph(ResetPartialGraphResponse),
325    }
326}
327#[derive(prost_helpers::AnyPB)]
328#[derive(Clone, Copy, PartialEq, ::prost::Message)]
329pub struct GetMinUncommittedObjectIdRequest {}
330#[derive(prost_helpers::AnyPB)]
331#[derive(Clone, Copy, PartialEq, ::prost::Message)]
332pub struct GetMinUncommittedObjectIdResponse {
333    #[prost(uint64, tag = "1", wrapper = "crate::id::HummockRawObjectId")]
334    pub min_uncommitted_object_id: crate::id::HummockRawObjectId,
335}
336/// Generated client implementations.
337pub mod stream_service_client {
338    #![allow(
339        unused_variables,
340        dead_code,
341        missing_docs,
342        clippy::wildcard_imports,
343        clippy::let_unit_value,
344    )]
345    use tonic::codegen::*;
346    use tonic::codegen::http::Uri;
347    #[derive(Debug, Clone)]
348    pub struct StreamServiceClient<T> {
349        inner: tonic::client::Grpc<T>,
350    }
351    impl StreamServiceClient<tonic::transport::Channel> {
352        /// Attempt to create a new client by connecting to a given endpoint.
353        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
354        where
355            D: TryInto<tonic::transport::Endpoint>,
356            D::Error: Into<StdError>,
357        {
358            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
359            Ok(Self::new(conn))
360        }
361    }
362    impl<T> StreamServiceClient<T>
363    where
364        T: tonic::client::GrpcService<tonic::body::BoxBody>,
365        T::Error: Into<StdError>,
366        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
367        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
368    {
369        pub fn new(inner: T) -> Self {
370            let inner = tonic::client::Grpc::new(inner);
371            Self { inner }
372        }
373        pub fn with_origin(inner: T, origin: Uri) -> Self {
374            let inner = tonic::client::Grpc::with_origin(inner, origin);
375            Self { inner }
376        }
377        pub fn with_interceptor<F>(
378            inner: T,
379            interceptor: F,
380        ) -> StreamServiceClient<InterceptedService<T, F>>
381        where
382            F: tonic::service::Interceptor,
383            T::ResponseBody: Default,
384            T: tonic::codegen::Service<
385                http::Request<tonic::body::BoxBody>,
386                Response = http::Response<
387                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
388                >,
389            >,
390            <T as tonic::codegen::Service<
391                http::Request<tonic::body::BoxBody>,
392            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
393        {
394            StreamServiceClient::new(InterceptedService::new(inner, interceptor))
395        }
396        /// Compress requests with the given encoding.
397        ///
398        /// This requires the server to support it otherwise it might respond with an
399        /// error.
400        #[must_use]
401        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
402            self.inner = self.inner.send_compressed(encoding);
403            self
404        }
405        /// Enable decompressing responses.
406        #[must_use]
407        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
408            self.inner = self.inner.accept_compressed(encoding);
409            self
410        }
411        /// Limits the maximum size of a decoded message.
412        ///
413        /// Default: `4MB`
414        #[must_use]
415        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
416            self.inner = self.inner.max_decoding_message_size(limit);
417            self
418        }
419        /// Limits the maximum size of an encoded message.
420        ///
421        /// Default: `usize::MAX`
422        #[must_use]
423        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
424            self.inner = self.inner.max_encoding_message_size(limit);
425            self
426        }
427        pub async fn streaming_control_stream(
428            &mut self,
429            request: impl tonic::IntoStreamingRequest<
430                Message = super::StreamingControlStreamRequest,
431            >,
432        ) -> std::result::Result<
433            tonic::Response<
434                tonic::codec::Streaming<super::StreamingControlStreamResponse>,
435            >,
436            tonic::Status,
437        > {
438            self.inner
439                .ready()
440                .await
441                .map_err(|e| {
442                    tonic::Status::unknown(
443                        format!("Service was not ready: {}", e.into()),
444                    )
445                })?;
446            let codec = tonic::codec::ProstCodec::default();
447            let path = http::uri::PathAndQuery::from_static(
448                "/stream_service.StreamService/StreamingControlStream",
449            );
450            let mut req = request.into_streaming_request();
451            req.extensions_mut()
452                .insert(
453                    GrpcMethod::new(
454                        "stream_service.StreamService",
455                        "StreamingControlStream",
456                    ),
457                );
458            self.inner.streaming(req, path, codec).await
459        }
460        pub async fn get_min_uncommitted_object_id(
461            &mut self,
462            request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
463        ) -> std::result::Result<
464            tonic::Response<super::GetMinUncommittedObjectIdResponse>,
465            tonic::Status,
466        > {
467            self.inner
468                .ready()
469                .await
470                .map_err(|e| {
471                    tonic::Status::unknown(
472                        format!("Service was not ready: {}", e.into()),
473                    )
474                })?;
475            let codec = tonic::codec::ProstCodec::default();
476            let path = http::uri::PathAndQuery::from_static(
477                "/stream_service.StreamService/GetMinUncommittedObjectId",
478            );
479            let mut req = request.into_request();
480            req.extensions_mut()
481                .insert(
482                    GrpcMethod::new(
483                        "stream_service.StreamService",
484                        "GetMinUncommittedObjectId",
485                    ),
486                );
487            self.inner.unary(req, path, codec).await
488        }
489    }
490}
491/// Generated server implementations.
492pub mod stream_service_server {
493    #![allow(
494        unused_variables,
495        dead_code,
496        missing_docs,
497        clippy::wildcard_imports,
498        clippy::let_unit_value,
499    )]
500    use tonic::codegen::*;
501    /// Generated trait containing gRPC methods that should be implemented for use with StreamServiceServer.
502    #[async_trait]
503    pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
504        /// Server streaming response type for the StreamingControlStream method.
505        type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
506                Item = std::result::Result<
507                    super::StreamingControlStreamResponse,
508                    tonic::Status,
509                >,
510            >
511            + std::marker::Send
512            + 'static;
513        async fn streaming_control_stream(
514            &self,
515            request: tonic::Request<
516                tonic::Streaming<super::StreamingControlStreamRequest>,
517            >,
518        ) -> std::result::Result<
519            tonic::Response<Self::StreamingControlStreamStream>,
520            tonic::Status,
521        >;
522        async fn get_min_uncommitted_object_id(
523            &self,
524            request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
525        ) -> std::result::Result<
526            tonic::Response<super::GetMinUncommittedObjectIdResponse>,
527            tonic::Status,
528        >;
529    }
530    #[derive(Debug)]
531    pub struct StreamServiceServer<T> {
532        inner: Arc<T>,
533        accept_compression_encodings: EnabledCompressionEncodings,
534        send_compression_encodings: EnabledCompressionEncodings,
535        max_decoding_message_size: Option<usize>,
536        max_encoding_message_size: Option<usize>,
537    }
538    impl<T> StreamServiceServer<T> {
539        pub fn new(inner: T) -> Self {
540            Self::from_arc(Arc::new(inner))
541        }
542        pub fn from_arc(inner: Arc<T>) -> Self {
543            Self {
544                inner,
545                accept_compression_encodings: Default::default(),
546                send_compression_encodings: Default::default(),
547                max_decoding_message_size: None,
548                max_encoding_message_size: None,
549            }
550        }
551        pub fn with_interceptor<F>(
552            inner: T,
553            interceptor: F,
554        ) -> InterceptedService<Self, F>
555        where
556            F: tonic::service::Interceptor,
557        {
558            InterceptedService::new(Self::new(inner), interceptor)
559        }
560        /// Enable decompressing requests with the given encoding.
561        #[must_use]
562        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
563            self.accept_compression_encodings.enable(encoding);
564            self
565        }
566        /// Compress responses with the given encoding, if the client supports it.
567        #[must_use]
568        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
569            self.send_compression_encodings.enable(encoding);
570            self
571        }
572        /// Limits the maximum size of a decoded message.
573        ///
574        /// Default: `4MB`
575        #[must_use]
576        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
577            self.max_decoding_message_size = Some(limit);
578            self
579        }
580        /// Limits the maximum size of an encoded message.
581        ///
582        /// Default: `usize::MAX`
583        #[must_use]
584        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
585            self.max_encoding_message_size = Some(limit);
586            self
587        }
588    }
589    impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
590    where
591        T: StreamService,
592        B: Body + std::marker::Send + 'static,
593        B::Error: Into<StdError> + std::marker::Send + 'static,
594    {
595        type Response = http::Response<tonic::body::BoxBody>;
596        type Error = std::convert::Infallible;
597        type Future = BoxFuture<Self::Response, Self::Error>;
598        fn poll_ready(
599            &mut self,
600            _cx: &mut Context<'_>,
601        ) -> Poll<std::result::Result<(), Self::Error>> {
602            Poll::Ready(Ok(()))
603        }
604        fn call(&mut self, req: http::Request<B>) -> Self::Future {
605            match req.uri().path() {
606                "/stream_service.StreamService/StreamingControlStream" => {
607                    #[allow(non_camel_case_types)]
608                    struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
609                    impl<
610                        T: StreamService,
611                    > tonic::server::StreamingService<
612                        super::StreamingControlStreamRequest,
613                    > for StreamingControlStreamSvc<T> {
614                        type Response = super::StreamingControlStreamResponse;
615                        type ResponseStream = T::StreamingControlStreamStream;
616                        type Future = BoxFuture<
617                            tonic::Response<Self::ResponseStream>,
618                            tonic::Status,
619                        >;
620                        fn call(
621                            &mut self,
622                            request: tonic::Request<
623                                tonic::Streaming<super::StreamingControlStreamRequest>,
624                            >,
625                        ) -> Self::Future {
626                            let inner = Arc::clone(&self.0);
627                            let fut = async move {
628                                <T as StreamService>::streaming_control_stream(
629                                        &inner,
630                                        request,
631                                    )
632                                    .await
633                            };
634                            Box::pin(fut)
635                        }
636                    }
637                    let accept_compression_encodings = self.accept_compression_encodings;
638                    let send_compression_encodings = self.send_compression_encodings;
639                    let max_decoding_message_size = self.max_decoding_message_size;
640                    let max_encoding_message_size = self.max_encoding_message_size;
641                    let inner = self.inner.clone();
642                    let fut = async move {
643                        let method = StreamingControlStreamSvc(inner);
644                        let codec = tonic::codec::ProstCodec::default();
645                        let mut grpc = tonic::server::Grpc::new(codec)
646                            .apply_compression_config(
647                                accept_compression_encodings,
648                                send_compression_encodings,
649                            )
650                            .apply_max_message_size_config(
651                                max_decoding_message_size,
652                                max_encoding_message_size,
653                            );
654                        let res = grpc.streaming(method, req).await;
655                        Ok(res)
656                    };
657                    Box::pin(fut)
658                }
659                "/stream_service.StreamService/GetMinUncommittedObjectId" => {
660                    #[allow(non_camel_case_types)]
661                    struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
662                    impl<
663                        T: StreamService,
664                    > tonic::server::UnaryService<
665                        super::GetMinUncommittedObjectIdRequest,
666                    > for GetMinUncommittedObjectIdSvc<T> {
667                        type Response = super::GetMinUncommittedObjectIdResponse;
668                        type Future = BoxFuture<
669                            tonic::Response<Self::Response>,
670                            tonic::Status,
671                        >;
672                        fn call(
673                            &mut self,
674                            request: tonic::Request<
675                                super::GetMinUncommittedObjectIdRequest,
676                            >,
677                        ) -> Self::Future {
678                            let inner = Arc::clone(&self.0);
679                            let fut = async move {
680                                <T as StreamService>::get_min_uncommitted_object_id(
681                                        &inner,
682                                        request,
683                                    )
684                                    .await
685                            };
686                            Box::pin(fut)
687                        }
688                    }
689                    let accept_compression_encodings = self.accept_compression_encodings;
690                    let send_compression_encodings = self.send_compression_encodings;
691                    let max_decoding_message_size = self.max_decoding_message_size;
692                    let max_encoding_message_size = self.max_encoding_message_size;
693                    let inner = self.inner.clone();
694                    let fut = async move {
695                        let method = GetMinUncommittedObjectIdSvc(inner);
696                        let codec = tonic::codec::ProstCodec::default();
697                        let mut grpc = tonic::server::Grpc::new(codec)
698                            .apply_compression_config(
699                                accept_compression_encodings,
700                                send_compression_encodings,
701                            )
702                            .apply_max_message_size_config(
703                                max_decoding_message_size,
704                                max_encoding_message_size,
705                            );
706                        let res = grpc.unary(method, req).await;
707                        Ok(res)
708                    };
709                    Box::pin(fut)
710                }
711                _ => {
712                    Box::pin(async move {
713                        let mut response = http::Response::new(empty_body());
714                        let headers = response.headers_mut();
715                        headers
716                            .insert(
717                                tonic::Status::GRPC_STATUS,
718                                (tonic::Code::Unimplemented as i32).into(),
719                            );
720                        headers
721                            .insert(
722                                http::header::CONTENT_TYPE,
723                                tonic::metadata::GRPC_CONTENT_TYPE,
724                            );
725                        Ok(response)
726                    })
727                }
728            }
729        }
730    }
731    impl<T> Clone for StreamServiceServer<T> {
732        fn clone(&self) -> Self {
733            let inner = self.inner.clone();
734            Self {
735                inner,
736                accept_compression_encodings: self.accept_compression_encodings,
737                send_compression_encodings: self.send_compression_encodings,
738                max_decoding_message_size: self.max_decoding_message_size,
739                max_encoding_message_size: self.max_encoding_message_size,
740            }
741        }
742    }
743    /// Generated gRPC service name
744    pub const SERVICE_NAME: &str = "stream_service.StreamService";
745    impl<T> tonic::server::NamedService for StreamServiceServer<T> {
746        const NAME: &'static str = SERVICE_NAME;
747    }
748}