/// Protobuf message `InjectBarrierRequest`; encoding and decoding come from
/// the `::prost::Message` derive.
///
/// It is carried as the `InjectBarrier` variant (tag 2) of
/// `streaming_control_stream_request::Request`. Tags 3, 7 and 8 are not
/// assigned to any field of this struct.
///
/// NOTE(review): the purpose of each field is only suggested by its name —
/// confirm against the `.proto` source before relying on it.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InjectBarrierRequest {
    /// Wire: string, tag 1.
    /// Presumably matched with `BarrierCompleteResponse::request_id` — TODO confirm.
    #[prost(string, tag = "1")]
    pub request_id: ::prost::alloc::string::String,
    /// Wire: optional message, tag 2 (`stream_plan::Barrier`).
    #[prost(message, optional, tag = "2")]
    pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
    /// Wire: repeated uint32, tag 4; surfaced as `ActorId` via the `wrapper` attribute.
    #[prost(uint32, repeated, tag = "4", wrapper = "crate::id::ActorId")]
    pub actor_ids_to_collect: ::prost::alloc::vec::Vec<crate::id::ActorId>,
    /// Wire: repeated uint32, tag 5; surfaced as `TableId` via the `wrapper` attribute.
    #[prost(uint32, repeated, tag = "5", wrapper = "crate::id::TableId")]
    pub table_ids_to_sync: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Wire: uint64, tag 6; surfaced as `PartialGraphId` via the `wrapper` attribute.
    #[prost(uint64, tag = "6", wrapper = "crate::id::PartialGraphId")]
    pub partial_graph_id: crate::id::PartialGraphId,
    /// Wire: repeated message, tag 9; one `FragmentBuildActorInfo` per entry.
    #[prost(message, repeated, tag = "9")]
    pub actors_to_build: ::prost::alloc::vec::Vec<
        inject_barrier_request::FragmentBuildActorInfo,
    >,
}
/// Message types referenced from `InjectBarrierRequest` (directly or through
/// `FragmentBuildActorInfo`).
///
/// NOTE(review): field purposes are only suggested by their names — confirm
/// against the `.proto` source.
pub mod inject_barrier_request {
    /// Protobuf message `FragmentBuildActorInfo`: a fragment id, an optional
    /// `StreamNode`, and a list of `BuildActorInfo` entries.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct FragmentBuildActorInfo {
        /// Wire: uint32, tag 1; surfaced as `FragmentId` via the `wrapper` attribute.
        #[prost(uint32, tag = "1", wrapper = "crate::id::FragmentId")]
        pub fragment_id: crate::id::FragmentId,
        /// Wire: optional message, tag 2 (`stream_plan::StreamNode`).
        #[prost(message, optional, tag = "2")]
        pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
        /// Wire: repeated message, tag 3.
        #[prost(message, repeated, tag = "3")]
        pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
    }
    /// Protobuf message `BuildActorInfo`.
    ///
    /// Tag 8 is not assigned here, and the last two fields are declared out of
    /// tag order (tag 9 before tag 7).
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct BuildActorInfo {
        /// Wire: uint32, tag 1; surfaced as `ActorId` via the `wrapper` attribute.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub actor_id: crate::id::ActorId,
        /// Wire: map<uint32, message>, tag 2; keys are surfaced as `FragmentId`.
        #[prost(map = "uint32, message", tag = "2", wrapper = "crate::id::FragmentId")]
        pub fragment_upstreams: ::std::collections::HashMap<
            crate::id::FragmentId,
            build_actor_info::UpstreamActors,
        >,
        /// Wire: repeated message, tag 3 (`stream_plan::Dispatcher`).
        #[prost(message, repeated, tag = "3")]
        pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
        /// Wire: optional message, tag 4 (`common::Buffer`).
        #[prost(message, optional, tag = "4")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
        /// Wire: string, tag 5.
        #[prost(string, tag = "5")]
        pub mview_definition: ::prost::alloc::string::String,
        /// Wire: optional message, tag 6 (`plan_common::ExprContext`).
        #[prost(message, optional, tag = "6")]
        pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
        /// Wire: string, tag 9.
        /// NOTE(review): the string format is not visible here — check the consumer.
        #[prost(string, tag = "9")]
        pub config_override: ::prost::alloc::string::String,
        /// Wire: repeated uint32, tag 7; surfaced as `SubscriberId` via the `wrapper` attribute.
        #[prost(uint32, repeated, tag = "7", wrapper = "crate::id::SubscriberId")]
        pub initial_subscriber_ids: ::prost::alloc::vec::Vec<crate::id::SubscriberId>,
    }
    /// Message types referenced from `BuildActorInfo`.
    pub mod build_actor_info {
        /// Protobuf message `UpstreamActors`: the map value type of
        /// `BuildActorInfo::fragment_upstreams`, holding a list of `common::ActorInfo`.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct UpstreamActors {
            /// Wire: repeated message, tag 1.
            #[prost(message, repeated, tag = "1")]
            pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
        }
    }
}
/// Protobuf message `BarrierCompleteResponse`; encoding and decoding come from
/// the `::prost::Message` derive.
///
/// It is carried as the `CompleteBarrier` variant (tag 2) of
/// `streaming_control_stream_response::Response`. Tag 10 is not assigned to
/// any field of this struct.
///
/// NOTE(review): the purpose of each field is only suggested by its name —
/// confirm against the `.proto` source before relying on it.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BarrierCompleteResponse {
    /// Wire: string, tag 1.
    /// Presumably mirrors `InjectBarrierRequest::request_id` — TODO confirm.
    #[prost(string, tag = "1")]
    pub request_id: ::prost::alloc::string::String,
    /// Wire: optional message, tag 2 (`common::Status`).
    #[prost(message, optional, tag = "2")]
    pub status: ::core::option::Option<super::common::Status>,
    /// Wire: repeated message, tag 3.
    #[prost(message, repeated, tag = "3")]
    pub create_mview_progress: ::prost::alloc::vec::Vec<
        barrier_complete_response::CreateMviewProgress,
    >,
    /// Wire: repeated message, tag 4.
    #[prost(message, repeated, tag = "4")]
    pub synced_sstables: ::prost::alloc::vec::Vec<
        barrier_complete_response::LocalSstableInfo,
    >,
    /// Wire: uint32, tag 5; surfaced as `WorkerId` via the `wrapper` attribute.
    #[prost(uint32, tag = "5", wrapper = "crate::id::WorkerId")]
    pub worker_id: crate::id::WorkerId,
    /// Wire: map<uint32, message>, tag 6; keys are surfaced as `TableId`.
    #[prost(map = "uint32, message", tag = "6", wrapper = "crate::id::TableId")]
    pub table_watermarks: ::std::collections::HashMap<
        crate::id::TableId,
        super::hummock::TableWatermarks,
    >,
    /// Wire: repeated message, tag 7 (`hummock::SstableInfo`).
    #[prost(message, repeated, tag = "7")]
    pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
    /// Wire: uint64, tag 8; surfaced as `PartialGraphId` via the `wrapper` attribute.
    #[prost(uint64, tag = "8", wrapper = "crate::id::PartialGraphId")]
    pub partial_graph_id: crate::id::PartialGraphId,
    /// Wire: uint64, tag 9.
    #[prost(uint64, tag = "9")]
    pub epoch: u64,
    /// Wire: repeated message, tag 11.
    #[prost(message, repeated, tag = "11")]
    pub load_finished_sources: ::prost::alloc::vec::Vec<
        barrier_complete_response::LoadFinishedSource,
    >,
    /// Wire: map<uint32, message>, tag 12; keys are surfaced as `TableId`.
    #[prost(map = "uint32, message", tag = "12", wrapper = "crate::id::TableId")]
    pub vector_index_adds: ::std::collections::HashMap<
        crate::id::TableId,
        super::hummock::vector_index_delta::VectorIndexAdds,
    >,
    /// Wire: repeated message, tag 13.
    #[prost(message, repeated, tag = "13")]
    pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
        barrier_complete_response::CdcTableBackfillProgress,
    >,
    /// Wire: repeated uint32, tag 14; surfaced as `TableId` via the `wrapper` attribute.
    #[prost(uint32, repeated, tag = "14", wrapper = "crate::id::TableId")]
    pub truncate_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Wire: repeated uint32, tag 15; surfaced as `TableId` via the `wrapper` attribute.
    #[prost(uint32, repeated, tag = "15", wrapper = "crate::id::TableId")]
    pub refresh_finished_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Wire: repeated message, tag 16.
    #[prost(message, repeated, tag = "16")]
    pub list_finished_sources: ::prost::alloc::vec::Vec<
        barrier_complete_response::ListFinishedSource,
    >,
}
/// Message types referenced from `BarrierCompleteResponse`.
///
/// NOTE(review): field purposes and units are only suggested by their names —
/// confirm against the `.proto` source.
pub mod barrier_complete_response {
    /// Protobuf message `CreateMviewProgress`. Every field is a plain scalar or
    /// id wrapper, and the struct derives `Copy`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CreateMviewProgress {
        /// Wire: uint32, tag 1; surfaced as `ActorId` via the `wrapper` attribute.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub backfill_actor_id: crate::id::ActorId,
        /// Wire: bool, tag 2.
        #[prost(bool, tag = "2")]
        pub done: bool,
        /// Wire: uint64, tag 3.
        #[prost(uint64, tag = "3")]
        pub consumed_epoch: u64,
        /// Wire: uint64, tag 4.
        #[prost(uint64, tag = "4")]
        pub consumed_rows: u64,
        /// Wire: uint64, tag 5.
        #[prost(uint64, tag = "5")]
        pub pending_epoch_lag: u64,
        /// Wire: uint64, tag 6.
        #[prost(uint64, tag = "6")]
        pub buffered_rows: u64,
        /// Wire: uint32, tag 7; surfaced as `FragmentId` via the `wrapper` attribute.
        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
        pub fragment_id: crate::id::FragmentId,
    }
    /// Protobuf message `CdcTableBackfillProgress`. Every field is a plain
    /// scalar or id wrapper, and the struct derives `Copy`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CdcTableBackfillProgress {
        /// Wire: uint32, tag 1; surfaced as `ActorId` via the `wrapper` attribute.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub actor_id: crate::id::ActorId,
        /// Wire: uint64, tag 2.
        #[prost(uint64, tag = "2")]
        pub epoch: u64,
        /// Wire: bool, tag 3.
        #[prost(bool, tag = "3")]
        pub done: bool,
        /// Wire: int64 (signed), tag 4.
        #[prost(int64, tag = "4")]
        pub split_id_start_inclusive: i64,
        /// Wire: int64 (signed), tag 5.
        #[prost(int64, tag = "5")]
        pub split_id_end_inclusive: i64,
        /// Wire: uint64, tag 6.
        #[prost(uint64, tag = "6")]
        pub generation: u64,
        /// Wire: uint32, tag 7; surfaced as `FragmentId` via the `wrapper` attribute.
        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
        pub fragment_id: crate::id::FragmentId,
    }
    /// Protobuf message `LocalSstableInfo`. Tag 1 is not assigned to any field
    /// of this struct.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct LocalSstableInfo {
        /// Wire: optional message, tag 2 (`hummock::SstableInfo`).
        #[prost(message, optional, tag = "2")]
        pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
        /// Wire: map<uint32, message>, tag 3; keys are surfaced as `TableId`.
        #[prost(map = "uint32, message", tag = "3", wrapper = "crate::id::TableId")]
        pub table_stats_map: ::std::collections::HashMap<
            crate::id::TableId,
            super::super::hummock::TableStats,
        >,
        /// Wire: uint64, tag 4.
        /// NOTE(review): time unit/epoch is not visible here — confirm with the producer.
        #[prost(uint64, tag = "4")]
        pub created_at: u64,
    }
    /// Protobuf message `LoadFinishedSource`. It has the same three fields as
    /// `ListFinishedSource` and derives `Copy`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct LoadFinishedSource {
        /// Wire: uint32, tag 1; surfaced as `ActorId` via the `wrapper` attribute.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub reporter_actor_id: crate::id::ActorId,
        /// Wire: uint32, tag 2; surfaced as `TableId` via the `wrapper` attribute.
        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
        pub table_id: crate::id::TableId,
        /// Wire: uint32, tag 3; surfaced as `SourceId` via the `wrapper` attribute.
        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
        pub associated_source_id: crate::id::SourceId,
    }
    /// Protobuf message `ListFinishedSource`. It has the same three fields as
    /// `LoadFinishedSource` and derives `Copy`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ListFinishedSource {
        /// Wire: uint32, tag 1; surfaced as `ActorId` via the `wrapper` attribute.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub reporter_actor_id: crate::id::ActorId,
        /// Wire: uint32, tag 2; surfaced as `TableId` via the `wrapper` attribute.
        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
        pub table_id: crate::id::TableId,
        /// Wire: uint32, tag 3; surfaced as `SourceId` via the `wrapper` attribute.
        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
        pub associated_source_id: crate::id::SourceId,
    }
}
/// Protobuf message `StreamingControlStreamRequest`: an envelope whose only
/// field is the `request` oneof. It is the request item type of the
/// `StreamingControlStream` RPC in the client and server modules of this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamingControlStreamRequest {
    /// Oneof over tags 1-5; see `streaming_control_stream_request::Request`
    /// for the variants. `None` when no variant is set.
    #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
    pub request: ::core::option::Option<streaming_control_stream_request::Request>,
}
/// Payload messages and the oneof enum for `StreamingControlStreamRequest`.
///
/// NOTE(review): the purpose of each request is only suggested by its name —
/// confirm against the `.proto` source.
pub mod streaming_control_stream_request {
    /// Protobuf message `InitRequest`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct InitRequest {
        /// Wire: string, tag 1.
        #[prost(string, tag = "1")]
        pub term_id: ::prost::alloc::string::String,
    }
    /// Protobuf message `CreatePartialGraphRequest`; a single id, so the struct
    /// derives `Copy`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CreatePartialGraphRequest {
        /// Wire: uint64, tag 1; surfaced as `PartialGraphId` via the `wrapper` attribute.
        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
        pub partial_graph_id: crate::id::PartialGraphId,
    }
    /// Protobuf message `RemovePartialGraphRequest`; carries a list of ids.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct RemovePartialGraphRequest {
        /// Wire: repeated uint64, tag 1; surfaced as `PartialGraphId` via the `wrapper` attribute.
        #[prost(uint64, repeated, tag = "1", wrapper = "crate::id::PartialGraphId")]
        pub partial_graph_ids: ::prost::alloc::vec::Vec<crate::id::PartialGraphId>,
    }
    /// Protobuf message `ResetPartialGraphsRequest`; carries a list of ids.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct ResetPartialGraphsRequest {
        /// Wire: repeated uint64, tag 1; surfaced as `PartialGraphId` via the `wrapper` attribute.
        #[prost(uint64, repeated, tag = "1", wrapper = "crate::id::PartialGraphId")]
        pub partial_graph_ids: ::prost::alloc::vec::Vec<crate::id::PartialGraphId>,
    }
    /// Oneof payload of `StreamingControlStreamRequest::request`.
    ///
    /// Variant tags must stay in sync with the `tags = "1, 2, 3, 4, 5"` list on
    /// that field. Note that variant order follows tag order, which differs
    /// from the struct declaration order above (`RemovePartialGraph` is tag 3,
    /// `CreatePartialGraph` is tag 4).
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// Tag 1.
        #[prost(message, tag = "1")]
        Init(InitRequest),
        /// Tag 2; payload is the top-level `InjectBarrierRequest`.
        #[prost(message, tag = "2")]
        InjectBarrier(super::InjectBarrierRequest),
        /// Tag 3.
        #[prost(message, tag = "3")]
        RemovePartialGraph(RemovePartialGraphRequest),
        /// Tag 4.
        #[prost(message, tag = "4")]
        CreatePartialGraph(CreatePartialGraphRequest),
        /// Tag 5.
        #[prost(message, tag = "5")]
        ResetPartialGraphs(ResetPartialGraphsRequest),
    }
}
/// Protobuf message `ScoredError`: an error message string paired with a
/// signed integer score. Used as the optional `root_err` of
/// `streaming_control_stream_response::ResetPartialGraphResponse`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ScoredError {
    /// Wire: string, tag 1.
    #[prost(string, tag = "1")]
    pub err_msg: ::prost::alloc::string::String,
    /// Wire: int32, tag 2.
    /// NOTE(review): the scale and ordering of the score are not visible here —
    /// confirm with the code that produces and ranks these errors.
    #[prost(int32, tag = "2")]
    pub score: i32,
}
/// Protobuf message `StreamingControlStreamResponse`: an envelope whose only
/// field is the `response` oneof. It is the response item type of the
/// `StreamingControlStream` RPC in the client and server modules of this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamingControlStreamResponse {
    /// Oneof over tags 1-5; see `streaming_control_stream_response::Response`
    /// for the variants. `None` when no variant is set.
    #[prost(
        oneof = "streaming_control_stream_response::Response",
        tags = "1, 2, 3, 4, 5"
    )]
    pub response: ::core::option::Option<streaming_control_stream_response::Response>,
}
/// Payload messages and the oneof enum for `StreamingControlStreamResponse`.
///
/// NOTE(review): the purpose of each response is only suggested by its name —
/// confirm against the `.proto` source.
pub mod streaming_control_stream_response {
    /// Protobuf message `InitResponse`; it has no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct InitResponse {}
    /// Protobuf message `ShutdownResponse`; it has no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ShutdownResponse {}
    /// Protobuf message `ReportPartialGraphFailureResponse`; a single id, so
    /// the struct derives `Copy`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ReportPartialGraphFailureResponse {
        /// Wire: uint64, tag 1; surfaced as `PartialGraphId` via the `wrapper` attribute.
        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
        pub partial_graph_id: crate::id::PartialGraphId,
    }
    /// Protobuf message `ResetPartialGraphResponse`: a graph id plus an
    /// optional `ScoredError`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct ResetPartialGraphResponse {
        /// Wire: uint64, tag 1; surfaced as `PartialGraphId` via the `wrapper` attribute.
        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
        pub partial_graph_id: crate::id::PartialGraphId,
        /// Wire: optional message, tag 2.
        #[prost(message, optional, tag = "2")]
        pub root_err: ::core::option::Option<super::ScoredError>,
    }
    /// Oneof payload of `StreamingControlStreamResponse::response`.
    ///
    /// Variant tags must stay in sync with the `tags = "1, 2, 3, 4, 5"` list on
    /// that field.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Response {
        /// Tag 1.
        #[prost(message, tag = "1")]
        Init(InitResponse),
        /// Tag 2; payload is the top-level `BarrierCompleteResponse`.
        #[prost(message, tag = "2")]
        CompleteBarrier(super::BarrierCompleteResponse),
        /// Tag 3.
        #[prost(message, tag = "3")]
        Shutdown(ShutdownResponse),
        /// Tag 4.
        #[prost(message, tag = "4")]
        ReportPartialGraphFailure(ReportPartialGraphFailureResponse),
        /// Tag 5.
        #[prost(message, tag = "5")]
        ResetPartialGraph(ResetPartialGraphResponse),
    }
}
/// Protobuf message `GetMinUncommittedObjectIdRequest`: the request type of
/// the unary `GetMinUncommittedObjectId` RPC. It has no fields.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetMinUncommittedObjectIdRequest {}
/// Protobuf message `GetMinUncommittedObjectIdResponse`: the response type of
/// the unary `GetMinUncommittedObjectId` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetMinUncommittedObjectIdResponse {
    /// Wire: uint64, tag 1.
    /// NOTE(review): how "no uncommitted object" is represented is not visible
    /// here — confirm with the server implementation.
    #[prost(uint64, tag = "1")]
    pub min_uncommitted_object_id: u64,
}
320pub mod stream_service_client {
322 #![allow(
323 unused_variables,
324 dead_code,
325 missing_docs,
326 clippy::wildcard_imports,
327 clippy::let_unit_value,
328 )]
329 use tonic::codegen::*;
330 use tonic::codegen::http::Uri;
331 #[derive(Debug, Clone)]
332 pub struct StreamServiceClient<T> {
333 inner: tonic::client::Grpc<T>,
334 }
335 impl StreamServiceClient<tonic::transport::Channel> {
336 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
338 where
339 D: TryInto<tonic::transport::Endpoint>,
340 D::Error: Into<StdError>,
341 {
342 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
343 Ok(Self::new(conn))
344 }
345 }
346 impl<T> StreamServiceClient<T>
347 where
348 T: tonic::client::GrpcService<tonic::body::BoxBody>,
349 T::Error: Into<StdError>,
350 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
351 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
352 {
353 pub fn new(inner: T) -> Self {
354 let inner = tonic::client::Grpc::new(inner);
355 Self { inner }
356 }
357 pub fn with_origin(inner: T, origin: Uri) -> Self {
358 let inner = tonic::client::Grpc::with_origin(inner, origin);
359 Self { inner }
360 }
361 pub fn with_interceptor<F>(
362 inner: T,
363 interceptor: F,
364 ) -> StreamServiceClient<InterceptedService<T, F>>
365 where
366 F: tonic::service::Interceptor,
367 T::ResponseBody: Default,
368 T: tonic::codegen::Service<
369 http::Request<tonic::body::BoxBody>,
370 Response = http::Response<
371 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
372 >,
373 >,
374 <T as tonic::codegen::Service<
375 http::Request<tonic::body::BoxBody>,
376 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
377 {
378 StreamServiceClient::new(InterceptedService::new(inner, interceptor))
379 }
380 #[must_use]
385 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
386 self.inner = self.inner.send_compressed(encoding);
387 self
388 }
389 #[must_use]
391 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
392 self.inner = self.inner.accept_compressed(encoding);
393 self
394 }
395 #[must_use]
399 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
400 self.inner = self.inner.max_decoding_message_size(limit);
401 self
402 }
403 #[must_use]
407 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
408 self.inner = self.inner.max_encoding_message_size(limit);
409 self
410 }
411 pub async fn streaming_control_stream(
412 &mut self,
413 request: impl tonic::IntoStreamingRequest<
414 Message = super::StreamingControlStreamRequest,
415 >,
416 ) -> std::result::Result<
417 tonic::Response<
418 tonic::codec::Streaming<super::StreamingControlStreamResponse>,
419 >,
420 tonic::Status,
421 > {
422 self.inner
423 .ready()
424 .await
425 .map_err(|e| {
426 tonic::Status::unknown(
427 format!("Service was not ready: {}", e.into()),
428 )
429 })?;
430 let codec = tonic::codec::ProstCodec::default();
431 let path = http::uri::PathAndQuery::from_static(
432 "/stream_service.StreamService/StreamingControlStream",
433 );
434 let mut req = request.into_streaming_request();
435 req.extensions_mut()
436 .insert(
437 GrpcMethod::new(
438 "stream_service.StreamService",
439 "StreamingControlStream",
440 ),
441 );
442 self.inner.streaming(req, path, codec).await
443 }
444 pub async fn get_min_uncommitted_object_id(
445 &mut self,
446 request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
447 ) -> std::result::Result<
448 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
449 tonic::Status,
450 > {
451 self.inner
452 .ready()
453 .await
454 .map_err(|e| {
455 tonic::Status::unknown(
456 format!("Service was not ready: {}", e.into()),
457 )
458 })?;
459 let codec = tonic::codec::ProstCodec::default();
460 let path = http::uri::PathAndQuery::from_static(
461 "/stream_service.StreamService/GetMinUncommittedObjectId",
462 );
463 let mut req = request.into_request();
464 req.extensions_mut()
465 .insert(
466 GrpcMethod::new(
467 "stream_service.StreamService",
468 "GetMinUncommittedObjectId",
469 ),
470 );
471 self.inner.unary(req, path, codec).await
472 }
473 }
474}
/// Server-side plumbing for the `stream_service.StreamService` gRPC service:
/// the `StreamService` trait to implement, and `StreamServiceServer`, which
/// routes HTTP requests to that implementation.
///
/// NOTE(review): this looks like tonic-build output — if so, hand edits will be
/// lost on regeneration.
pub mod stream_service_server {
    #![allow(
        unused_variables,
        dead_code,
        missing_docs,
        clippy::wildcard_imports,
        clippy::let_unit_value,
    )]
    use tonic::codegen::*;
    /// Handler trait for the service: one streaming-in/streaming-out RPC and
    /// one unary RPC. Implementors must be `Send + Sync + 'static`.
    #[async_trait]
    pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
        /// Stream type returned by `streaming_control_stream`; yields
        /// `Result<StreamingControlStreamResponse, Status>` items.
        type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::StreamingControlStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handles `StreamingControlStream`: receives a stream of requests and
        /// returns a stream of responses (or a `Status` on failure).
        async fn streaming_control_stream(
            &self,
            request: tonic::Request<
                tonic::Streaming<super::StreamingControlStreamRequest>,
            >,
        ) -> std::result::Result<
            tonic::Response<Self::StreamingControlStreamStream>,
            tonic::Status,
        >;
        /// Handles the unary `GetMinUncommittedObjectId` RPC.
        async fn get_min_uncommitted_object_id(
            &self,
            request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
        ) -> std::result::Result<
            tonic::Response<super::GetMinUncommittedObjectIdResponse>,
            tonic::Status,
        >;
    }
    /// Adapter exposing a `StreamService` implementation as a
    /// `tonic::codegen::Service` over HTTP requests.
    ///
    /// Holds the implementation behind an `Arc` together with per-server
    /// compression and message-size settings.
    #[derive(Debug)]
    pub struct StreamServiceServer<T> {
        inner: Arc<T>,
        accept_compression_encodings: EnabledCompressionEncodings,
        send_compression_encodings: EnabledCompressionEncodings,
        max_decoding_message_size: Option<usize>,
        max_encoding_message_size: Option<usize>,
    }
    impl<T> StreamServiceServer<T> {
        /// Wraps `inner` in a fresh `Arc` and delegates to [`Self::from_arc`].
        pub fn new(inner: T) -> Self {
            Self::from_arc(Arc::new(inner))
        }
        /// Builds a server around an existing `Arc<T>` with default
        /// compression settings and no explicit message-size limits (`None`).
        pub fn from_arc(inner: Arc<T>) -> Self {
            Self {
                inner,
                accept_compression_encodings: Default::default(),
                send_compression_encodings: Default::default(),
                max_decoding_message_size: None,
                max_encoding_message_size: None,
            }
        }
        /// Builds a server with [`Self::new`] and wraps it in an
        /// `InterceptedService` using `interceptor`.
        pub fn with_interceptor<F>(
            inner: T,
            interceptor: F,
        ) -> InterceptedService<Self, F>
        where
            F: tonic::service::Interceptor,
        {
            InterceptedService::new(Self::new(inner), interceptor)
        }
        /// Builder-style: enables `encoding` in the accepted-compression set.
        #[must_use]
        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
            self.accept_compression_encodings.enable(encoding);
            self
        }
        /// Builder-style: enables `encoding` in the send-compression set.
        #[must_use]
        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
            self.send_compression_encodings.enable(encoding);
            self
        }
        /// Builder-style: stores `limit` as the max decoding message size.
        #[must_use]
        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
            self.max_decoding_message_size = Some(limit);
            self
        }
        /// Builder-style: stores `limit` as the max encoding message size.
        #[must_use]
        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
            self.max_encoding_message_size = Some(limit);
            self
        }
    }
    impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
    where
        T: StreamService,
        B: Body + std::marker::Send + 'static,
        B::Error: Into<StdError> + std::marker::Send + 'static,
    {
        type Response = http::Response<tonic::body::BoxBody>;
        type Error = std::convert::Infallible;
        type Future = BoxFuture<Self::Response, Self::Error>;
        /// Always reports ready; no backpressure is applied at this layer.
        fn poll_ready(
            &mut self,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }
        /// Routes on the request URI path to the matching RPC handler.
        ///
        /// Unknown paths do not produce an `Err`; they get a response whose
        /// `grpc-status` header is `Unimplemented`.
        fn call(&mut self, req: http::Request<B>) -> Self::Future {
            match req.uri().path() {
                "/stream_service.StreamService/StreamingControlStream" => {
                    // Local adapter that turns the trait method into a
                    // `tonic::server::StreamingService`.
                    #[allow(non_camel_case_types)]
                    struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
                    impl<
                        T: StreamService,
                    > tonic::server::StreamingService<
                        super::StreamingControlStreamRequest,
                    > for StreamingControlStreamSvc<T> {
                        type Response = super::StreamingControlStreamResponse;
                        type ResponseStream = T::StreamingControlStreamStream;
                        type Future = BoxFuture<
                            tonic::Response<Self::ResponseStream>,
                            tonic::Status,
                        >;
                        fn call(
                            &mut self,
                            request: tonic::Request<
                                tonic::Streaming<super::StreamingControlStreamRequest>,
                            >,
                        ) -> Self::Future {
                            // Clone the Arc so the boxed future owns its handle.
                            let inner = Arc::clone(&self.0);
                            let fut = async move {
                                <T as StreamService>::streaming_control_stream(
                                        &inner,
                                        request,
                                    )
                                    .await
                            };
                            Box::pin(fut)
                        }
                    }
                    // Copy settings out of `self` before the `async move`
                    // block, which cannot borrow `self`.
                    let accept_compression_encodings = self.accept_compression_encodings;
                    let send_compression_encodings = self.send_compression_encodings;
                    let max_decoding_message_size = self.max_decoding_message_size;
                    let max_encoding_message_size = self.max_encoding_message_size;
                    let inner = self.inner.clone();
                    let fut = async move {
                        let method = StreamingControlStreamSvc(inner);
                        let codec = tonic::codec::ProstCodec::default();
                        let mut grpc = tonic::server::Grpc::new(codec)
                            .apply_compression_config(
                                accept_compression_encodings,
                                send_compression_encodings,
                            )
                            .apply_max_message_size_config(
                                max_decoding_message_size,
                                max_encoding_message_size,
                            );
                        let res = grpc.streaming(method, req).await;
                        Ok(res)
                    };
                    Box::pin(fut)
                }
                "/stream_service.StreamService/GetMinUncommittedObjectId" => {
                    // Local adapter that turns the trait method into a
                    // `tonic::server::UnaryService`.
                    #[allow(non_camel_case_types)]
                    struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
                    impl<
                        T: StreamService,
                    > tonic::server::UnaryService<
                        super::GetMinUncommittedObjectIdRequest,
                    > for GetMinUncommittedObjectIdSvc<T> {
                        type Response = super::GetMinUncommittedObjectIdResponse;
                        type Future = BoxFuture<
                            tonic::Response<Self::Response>,
                            tonic::Status,
                        >;
                        fn call(
                            &mut self,
                            request: tonic::Request<
                                super::GetMinUncommittedObjectIdRequest,
                            >,
                        ) -> Self::Future {
                            // Clone the Arc so the boxed future owns its handle.
                            let inner = Arc::clone(&self.0);
                            let fut = async move {
                                <T as StreamService>::get_min_uncommitted_object_id(
                                        &inner,
                                        request,
                                    )
                                    .await
                            };
                            Box::pin(fut)
                        }
                    }
                    // Copy settings out of `self` before the `async move`
                    // block, which cannot borrow `self`.
                    let accept_compression_encodings = self.accept_compression_encodings;
                    let send_compression_encodings = self.send_compression_encodings;
                    let max_decoding_message_size = self.max_decoding_message_size;
                    let max_encoding_message_size = self.max_encoding_message_size;
                    let inner = self.inner.clone();
                    let fut = async move {
                        let method = GetMinUncommittedObjectIdSvc(inner);
                        let codec = tonic::codec::ProstCodec::default();
                        let mut grpc = tonic::server::Grpc::new(codec)
                            .apply_compression_config(
                                accept_compression_encodings,
                                send_compression_encodings,
                            )
                            .apply_max_message_size_config(
                                max_decoding_message_size,
                                max_encoding_message_size,
                            );
                        let res = grpc.unary(method, req).await;
                        Ok(res)
                    };
                    Box::pin(fut)
                }
                _ => {
                    // Unknown path: empty body, `grpc-status` set to
                    // Unimplemented, gRPC content type.
                    Box::pin(async move {
                        let mut response = http::Response::new(empty_body());
                        let headers = response.headers_mut();
                        headers
                            .insert(
                                tonic::Status::GRPC_STATUS,
                                (tonic::Code::Unimplemented as i32).into(),
                            );
                        headers
                            .insert(
                                http::header::CONTENT_TYPE,
                                tonic::metadata::GRPC_CONTENT_TYPE,
                            );
                        Ok(response)
                    })
                }
            }
        }
    }
    // Implemented by hand rather than derived so that cloning does not require
    // `T: Clone`; only the `Arc` handle and the plain settings are copied.
    impl<T> Clone for StreamServiceServer<T> {
        fn clone(&self) -> Self {
            let inner = self.inner.clone();
            Self {
                inner,
                accept_compression_encodings: self.accept_compression_encodings,
                send_compression_encodings: self.send_compression_encodings,
                max_decoding_message_size: self.max_decoding_message_size,
                max_encoding_message_size: self.max_encoding_message_size,
            }
        }
    }
    /// Fully-qualified gRPC service name; it is the prefix of both route paths
    /// matched in `call`.
    pub const SERVICE_NAME: &str = "stream_service.StreamService";
    impl<T> tonic::server::NamedService for StreamServiceServer<T> {
        const NAME: &'static str = SERVICE_NAME;
    }
}