1#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct InjectBarrierRequest {
5 #[prost(string, tag = "1")]
6 pub request_id: ::prost::alloc::string::String,
7 #[prost(message, optional, tag = "2")]
8 pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
9 #[prost(uint32, tag = "3")]
10 pub database_id: u32,
11 #[prost(uint32, repeated, tag = "4")]
12 pub actor_ids_to_collect: ::prost::alloc::vec::Vec<u32>,
13 #[prost(uint32, repeated, tag = "5")]
14 pub table_ids_to_sync: ::prost::alloc::vec::Vec<u32>,
15 #[prost(uint32, tag = "6")]
16 pub partial_graph_id: u32,
17 #[prost(message, repeated, tag = "9")]
18 pub actors_to_build: ::prost::alloc::vec::Vec<
19 inject_barrier_request::FragmentBuildActorInfo,
20 >,
21 #[prost(message, repeated, tag = "10")]
22 pub subscriptions_to_add: ::prost::alloc::vec::Vec<
23 super::stream_plan::SubscriptionUpstreamInfo,
24 >,
25 #[prost(message, repeated, tag = "11")]
26 pub subscriptions_to_remove: ::prost::alloc::vec::Vec<
27 super::stream_plan::SubscriptionUpstreamInfo,
28 >,
29}
30pub mod inject_barrier_request {
32 #[derive(prost_helpers::AnyPB)]
33 #[derive(Clone, PartialEq, ::prost::Message)]
34 pub struct FragmentBuildActorInfo {
35 #[prost(uint32, tag = "1")]
36 pub fragment_id: u32,
37 #[prost(message, optional, tag = "2")]
38 pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
39 #[prost(message, repeated, tag = "3")]
40 pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
41 }
42 #[derive(prost_helpers::AnyPB)]
43 #[derive(Clone, PartialEq, ::prost::Message)]
44 pub struct BuildActorInfo {
45 #[prost(uint32, tag = "1")]
46 pub actor_id: u32,
47 #[prost(map = "uint32, message", tag = "2")]
48 pub fragment_upstreams: ::std::collections::HashMap<
49 u32,
50 build_actor_info::UpstreamActors,
51 >,
52 #[prost(message, repeated, tag = "3")]
53 pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
54 #[prost(message, optional, tag = "4")]
55 pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
56 #[prost(string, tag = "5")]
57 pub mview_definition: ::prost::alloc::string::String,
58 #[prost(message, optional, tag = "6")]
59 pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
60 }
61 pub mod build_actor_info {
63 #[derive(prost_helpers::AnyPB)]
64 #[derive(Clone, PartialEq, ::prost::Message)]
65 pub struct UpstreamActors {
66 #[prost(message, repeated, tag = "1")]
67 pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
68 }
69 }
70}
71#[derive(prost_helpers::AnyPB)]
72#[derive(Clone, PartialEq, ::prost::Message)]
73pub struct BarrierCompleteResponse {
74 #[prost(string, tag = "1")]
75 pub request_id: ::prost::alloc::string::String,
76 #[prost(message, optional, tag = "2")]
77 pub status: ::core::option::Option<super::common::Status>,
78 #[prost(message, repeated, tag = "3")]
79 pub create_mview_progress: ::prost::alloc::vec::Vec<
80 barrier_complete_response::CreateMviewProgress,
81 >,
82 #[prost(message, repeated, tag = "4")]
83 pub synced_sstables: ::prost::alloc::vec::Vec<
84 barrier_complete_response::LocalSstableInfo,
85 >,
86 #[prost(uint32, tag = "5")]
87 pub worker_id: u32,
88 #[prost(map = "uint32, message", tag = "6")]
89 pub table_watermarks: ::std::collections::HashMap<
90 u32,
91 super::hummock::TableWatermarks,
92 >,
93 #[prost(message, repeated, tag = "7")]
94 pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
95 #[prost(uint32, tag = "8")]
96 pub partial_graph_id: u32,
97 #[prost(uint64, tag = "9")]
99 pub epoch: u64,
100 #[prost(uint32, tag = "10")]
101 pub database_id: u32,
102 #[prost(uint32, repeated, tag = "11")]
106 pub load_finished_source_ids: ::prost::alloc::vec::Vec<u32>,
107 #[prost(map = "uint32, message", tag = "12")]
108 pub vector_index_adds: ::std::collections::HashMap<
109 u32,
110 super::hummock::vector_index_delta::VectorIndexAdds,
111 >,
112 #[prost(message, repeated, tag = "13")]
113 pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
114 barrier_complete_response::CdcTableBackfillProgress,
115 >,
116 #[prost(uint32, repeated, tag = "14")]
120 pub truncate_tables: ::prost::alloc::vec::Vec<u32>,
121 #[prost(uint32, repeated, tag = "15")]
125 pub refresh_finished_tables: ::prost::alloc::vec::Vec<u32>,
126 #[prost(uint32, repeated, tag = "16")]
129 pub list_finished_source_ids: ::prost::alloc::vec::Vec<u32>,
130}
131pub mod barrier_complete_response {
133 #[derive(prost_helpers::AnyPB)]
134 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
135 pub struct CreateMviewProgress {
136 #[prost(uint32, tag = "1")]
139 pub backfill_actor_id: u32,
140 #[prost(bool, tag = "2")]
141 pub done: bool,
142 #[prost(uint64, tag = "3")]
144 pub consumed_epoch: u64,
145 #[prost(uint64, tag = "4")]
147 pub consumed_rows: u64,
148 #[prost(uint64, tag = "5")]
149 pub pending_epoch_lag: u64,
150 #[prost(uint64, tag = "6")]
152 pub buffered_rows: u64,
153 }
154 #[derive(prost_helpers::AnyPB)]
155 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
156 pub struct CdcTableBackfillProgress {
157 #[prost(uint32, tag = "1")]
158 pub actor_id: u32,
159 #[prost(uint64, tag = "2")]
160 pub epoch: u64,
161 #[prost(bool, tag = "3")]
162 pub done: bool,
163 #[prost(int64, tag = "4")]
164 pub split_id_start_inclusive: i64,
165 #[prost(int64, tag = "5")]
166 pub split_id_end_inclusive: i64,
167 #[prost(uint64, tag = "6")]
168 pub generation: u64,
169 #[prost(uint32, tag = "7")]
170 pub fragment_id: u32,
171 }
172 #[derive(prost_helpers::AnyPB)]
173 #[derive(Clone, PartialEq, ::prost::Message)]
174 pub struct LocalSstableInfo {
175 #[prost(message, optional, tag = "2")]
176 pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
177 #[prost(map = "uint32, message", tag = "3")]
178 pub table_stats_map: ::std::collections::HashMap<
179 u32,
180 super::super::hummock::TableStats,
181 >,
182 #[prost(uint64, tag = "4")]
183 pub created_at: u64,
184 }
185}
186#[derive(prost_helpers::AnyPB)]
187#[derive(Clone, PartialEq, ::prost::Message)]
188pub struct StreamingControlStreamRequest {
189 #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
190 pub request: ::core::option::Option<streaming_control_stream_request::Request>,
191}
192pub mod streaming_control_stream_request {
194 #[derive(prost_helpers::AnyPB)]
195 #[derive(Clone, PartialEq, ::prost::Message)]
196 pub struct InitialPartialGraph {
197 #[prost(uint32, tag = "1")]
198 pub partial_graph_id: u32,
199 #[prost(message, repeated, tag = "2")]
200 pub subscriptions: ::prost::alloc::vec::Vec<
201 super::super::stream_plan::SubscriptionUpstreamInfo,
202 >,
203 }
204 #[derive(prost_helpers::AnyPB)]
205 #[derive(Clone, PartialEq, ::prost::Message)]
206 pub struct DatabaseInitialPartialGraph {
207 #[prost(uint32, tag = "1")]
208 pub database_id: u32,
209 #[prost(message, repeated, tag = "2")]
210 pub graphs: ::prost::alloc::vec::Vec<InitialPartialGraph>,
211 }
212 #[derive(prost_helpers::AnyPB)]
213 #[derive(Clone, PartialEq, ::prost::Message)]
214 pub struct InitRequest {
215 #[prost(message, repeated, tag = "1")]
216 pub databases: ::prost::alloc::vec::Vec<DatabaseInitialPartialGraph>,
217 #[prost(string, tag = "2")]
218 pub term_id: ::prost::alloc::string::String,
219 }
220 #[derive(prost_helpers::AnyPB)]
221 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
222 pub struct CreatePartialGraphRequest {
223 #[prost(uint32, tag = "1")]
224 pub partial_graph_id: u32,
225 #[prost(uint32, tag = "2")]
226 pub database_id: u32,
227 }
228 #[derive(prost_helpers::AnyPB)]
229 #[derive(Clone, PartialEq, ::prost::Message)]
230 pub struct RemovePartialGraphRequest {
231 #[prost(uint32, repeated, tag = "1")]
232 pub partial_graph_ids: ::prost::alloc::vec::Vec<u32>,
233 #[prost(uint32, tag = "2")]
234 pub database_id: u32,
235 }
236 #[derive(prost_helpers::AnyPB)]
237 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
238 pub struct ResetDatabaseRequest {
239 #[prost(uint32, tag = "1")]
240 pub database_id: u32,
241 #[prost(uint32, tag = "2")]
242 pub reset_request_id: u32,
243 }
244 #[derive(prost_helpers::AnyPB)]
245 #[derive(Clone, PartialEq, ::prost::Oneof)]
246 pub enum Request {
247 #[prost(message, tag = "1")]
248 Init(InitRequest),
249 #[prost(message, tag = "2")]
250 InjectBarrier(super::InjectBarrierRequest),
251 #[prost(message, tag = "3")]
252 RemovePartialGraph(RemovePartialGraphRequest),
253 #[prost(message, tag = "4")]
254 CreatePartialGraph(CreatePartialGraphRequest),
255 #[prost(message, tag = "5")]
256 ResetDatabase(ResetDatabaseRequest),
257 }
258}
259#[derive(prost_helpers::AnyPB)]
260#[derive(Clone, PartialEq, ::prost::Message)]
261pub struct ScoredError {
262 #[prost(string, tag = "1")]
263 pub err_msg: ::prost::alloc::string::String,
264 #[prost(int32, tag = "2")]
265 pub score: i32,
266}
267#[derive(prost_helpers::AnyPB)]
268#[derive(Clone, PartialEq, ::prost::Message)]
269pub struct StreamingControlStreamResponse {
270 #[prost(
271 oneof = "streaming_control_stream_response::Response",
272 tags = "1, 2, 3, 4, 5"
273 )]
274 pub response: ::core::option::Option<streaming_control_stream_response::Response>,
275}
276pub mod streaming_control_stream_response {
278 #[derive(prost_helpers::AnyPB)]
279 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
280 pub struct InitResponse {}
281 #[derive(prost_helpers::AnyPB)]
282 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
283 pub struct ShutdownResponse {}
284 #[derive(prost_helpers::AnyPB)]
285 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
286 pub struct ReportDatabaseFailureResponse {
287 #[prost(uint32, tag = "1")]
288 pub database_id: u32,
289 }
290 #[derive(prost_helpers::AnyPB)]
291 #[derive(Clone, PartialEq, ::prost::Message)]
292 pub struct ResetDatabaseResponse {
293 #[prost(uint32, tag = "1")]
294 pub database_id: u32,
295 #[prost(message, optional, tag = "2")]
296 pub root_err: ::core::option::Option<super::ScoredError>,
297 #[prost(uint32, tag = "3")]
298 pub reset_request_id: u32,
299 }
300 #[derive(prost_helpers::AnyPB)]
301 #[derive(Clone, PartialEq, ::prost::Oneof)]
302 pub enum Response {
303 #[prost(message, tag = "1")]
304 Init(InitResponse),
305 #[prost(message, tag = "2")]
306 CompleteBarrier(super::BarrierCompleteResponse),
307 #[prost(message, tag = "3")]
308 Shutdown(ShutdownResponse),
309 #[prost(message, tag = "4")]
310 ReportDatabaseFailure(ReportDatabaseFailureResponse),
311 #[prost(message, tag = "5")]
312 ResetDatabase(ResetDatabaseResponse),
313 }
314}
315#[derive(prost_helpers::AnyPB)]
316#[derive(Clone, Copy, PartialEq, ::prost::Message)]
317pub struct GetMinUncommittedObjectIdRequest {}
318#[derive(prost_helpers::AnyPB)]
319#[derive(Clone, Copy, PartialEq, ::prost::Message)]
320pub struct GetMinUncommittedObjectIdResponse {
321 #[prost(uint64, tag = "1")]
322 pub min_uncommitted_object_id: u64,
323}
324pub mod stream_service_client {
326 #![allow(
327 unused_variables,
328 dead_code,
329 missing_docs,
330 clippy::wildcard_imports,
331 clippy::let_unit_value,
332 )]
333 use tonic::codegen::*;
334 use tonic::codegen::http::Uri;
335 #[derive(Debug, Clone)]
336 pub struct StreamServiceClient<T> {
337 inner: tonic::client::Grpc<T>,
338 }
339 impl StreamServiceClient<tonic::transport::Channel> {
340 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
342 where
343 D: TryInto<tonic::transport::Endpoint>,
344 D::Error: Into<StdError>,
345 {
346 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
347 Ok(Self::new(conn))
348 }
349 }
350 impl<T> StreamServiceClient<T>
351 where
352 T: tonic::client::GrpcService<tonic::body::BoxBody>,
353 T::Error: Into<StdError>,
354 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
355 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
356 {
357 pub fn new(inner: T) -> Self {
358 let inner = tonic::client::Grpc::new(inner);
359 Self { inner }
360 }
361 pub fn with_origin(inner: T, origin: Uri) -> Self {
362 let inner = tonic::client::Grpc::with_origin(inner, origin);
363 Self { inner }
364 }
365 pub fn with_interceptor<F>(
366 inner: T,
367 interceptor: F,
368 ) -> StreamServiceClient<InterceptedService<T, F>>
369 where
370 F: tonic::service::Interceptor,
371 T::ResponseBody: Default,
372 T: tonic::codegen::Service<
373 http::Request<tonic::body::BoxBody>,
374 Response = http::Response<
375 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
376 >,
377 >,
378 <T as tonic::codegen::Service<
379 http::Request<tonic::body::BoxBody>,
380 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
381 {
382 StreamServiceClient::new(InterceptedService::new(inner, interceptor))
383 }
384 #[must_use]
389 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
390 self.inner = self.inner.send_compressed(encoding);
391 self
392 }
393 #[must_use]
395 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
396 self.inner = self.inner.accept_compressed(encoding);
397 self
398 }
399 #[must_use]
403 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
404 self.inner = self.inner.max_decoding_message_size(limit);
405 self
406 }
407 #[must_use]
411 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
412 self.inner = self.inner.max_encoding_message_size(limit);
413 self
414 }
415 pub async fn streaming_control_stream(
416 &mut self,
417 request: impl tonic::IntoStreamingRequest<
418 Message = super::StreamingControlStreamRequest,
419 >,
420 ) -> std::result::Result<
421 tonic::Response<
422 tonic::codec::Streaming<super::StreamingControlStreamResponse>,
423 >,
424 tonic::Status,
425 > {
426 self.inner
427 .ready()
428 .await
429 .map_err(|e| {
430 tonic::Status::unknown(
431 format!("Service was not ready: {}", e.into()),
432 )
433 })?;
434 let codec = tonic::codec::ProstCodec::default();
435 let path = http::uri::PathAndQuery::from_static(
436 "/stream_service.StreamService/StreamingControlStream",
437 );
438 let mut req = request.into_streaming_request();
439 req.extensions_mut()
440 .insert(
441 GrpcMethod::new(
442 "stream_service.StreamService",
443 "StreamingControlStream",
444 ),
445 );
446 self.inner.streaming(req, path, codec).await
447 }
448 pub async fn get_min_uncommitted_object_id(
449 &mut self,
450 request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
451 ) -> std::result::Result<
452 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
453 tonic::Status,
454 > {
455 self.inner
456 .ready()
457 .await
458 .map_err(|e| {
459 tonic::Status::unknown(
460 format!("Service was not ready: {}", e.into()),
461 )
462 })?;
463 let codec = tonic::codec::ProstCodec::default();
464 let path = http::uri::PathAndQuery::from_static(
465 "/stream_service.StreamService/GetMinUncommittedObjectId",
466 );
467 let mut req = request.into_request();
468 req.extensions_mut()
469 .insert(
470 GrpcMethod::new(
471 "stream_service.StreamService",
472 "GetMinUncommittedObjectId",
473 ),
474 );
475 self.inner.unary(req, path, codec).await
476 }
477 }
478}
479pub mod stream_service_server {
481 #![allow(
482 unused_variables,
483 dead_code,
484 missing_docs,
485 clippy::wildcard_imports,
486 clippy::let_unit_value,
487 )]
488 use tonic::codegen::*;
489 #[async_trait]
491 pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
492 type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
494 Item = std::result::Result<
495 super::StreamingControlStreamResponse,
496 tonic::Status,
497 >,
498 >
499 + std::marker::Send
500 + 'static;
501 async fn streaming_control_stream(
502 &self,
503 request: tonic::Request<
504 tonic::Streaming<super::StreamingControlStreamRequest>,
505 >,
506 ) -> std::result::Result<
507 tonic::Response<Self::StreamingControlStreamStream>,
508 tonic::Status,
509 >;
510 async fn get_min_uncommitted_object_id(
511 &self,
512 request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
513 ) -> std::result::Result<
514 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
515 tonic::Status,
516 >;
517 }
518 #[derive(Debug)]
519 pub struct StreamServiceServer<T> {
520 inner: Arc<T>,
521 accept_compression_encodings: EnabledCompressionEncodings,
522 send_compression_encodings: EnabledCompressionEncodings,
523 max_decoding_message_size: Option<usize>,
524 max_encoding_message_size: Option<usize>,
525 }
526 impl<T> StreamServiceServer<T> {
527 pub fn new(inner: T) -> Self {
528 Self::from_arc(Arc::new(inner))
529 }
530 pub fn from_arc(inner: Arc<T>) -> Self {
531 Self {
532 inner,
533 accept_compression_encodings: Default::default(),
534 send_compression_encodings: Default::default(),
535 max_decoding_message_size: None,
536 max_encoding_message_size: None,
537 }
538 }
539 pub fn with_interceptor<F>(
540 inner: T,
541 interceptor: F,
542 ) -> InterceptedService<Self, F>
543 where
544 F: tonic::service::Interceptor,
545 {
546 InterceptedService::new(Self::new(inner), interceptor)
547 }
548 #[must_use]
550 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
551 self.accept_compression_encodings.enable(encoding);
552 self
553 }
554 #[must_use]
556 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
557 self.send_compression_encodings.enable(encoding);
558 self
559 }
560 #[must_use]
564 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
565 self.max_decoding_message_size = Some(limit);
566 self
567 }
568 #[must_use]
572 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
573 self.max_encoding_message_size = Some(limit);
574 self
575 }
576 }
577 impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
578 where
579 T: StreamService,
580 B: Body + std::marker::Send + 'static,
581 B::Error: Into<StdError> + std::marker::Send + 'static,
582 {
583 type Response = http::Response<tonic::body::BoxBody>;
584 type Error = std::convert::Infallible;
585 type Future = BoxFuture<Self::Response, Self::Error>;
586 fn poll_ready(
587 &mut self,
588 _cx: &mut Context<'_>,
589 ) -> Poll<std::result::Result<(), Self::Error>> {
590 Poll::Ready(Ok(()))
591 }
592 fn call(&mut self, req: http::Request<B>) -> Self::Future {
593 match req.uri().path() {
594 "/stream_service.StreamService/StreamingControlStream" => {
595 #[allow(non_camel_case_types)]
596 struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
597 impl<
598 T: StreamService,
599 > tonic::server::StreamingService<
600 super::StreamingControlStreamRequest,
601 > for StreamingControlStreamSvc<T> {
602 type Response = super::StreamingControlStreamResponse;
603 type ResponseStream = T::StreamingControlStreamStream;
604 type Future = BoxFuture<
605 tonic::Response<Self::ResponseStream>,
606 tonic::Status,
607 >;
608 fn call(
609 &mut self,
610 request: tonic::Request<
611 tonic::Streaming<super::StreamingControlStreamRequest>,
612 >,
613 ) -> Self::Future {
614 let inner = Arc::clone(&self.0);
615 let fut = async move {
616 <T as StreamService>::streaming_control_stream(
617 &inner,
618 request,
619 )
620 .await
621 };
622 Box::pin(fut)
623 }
624 }
625 let accept_compression_encodings = self.accept_compression_encodings;
626 let send_compression_encodings = self.send_compression_encodings;
627 let max_decoding_message_size = self.max_decoding_message_size;
628 let max_encoding_message_size = self.max_encoding_message_size;
629 let inner = self.inner.clone();
630 let fut = async move {
631 let method = StreamingControlStreamSvc(inner);
632 let codec = tonic::codec::ProstCodec::default();
633 let mut grpc = tonic::server::Grpc::new(codec)
634 .apply_compression_config(
635 accept_compression_encodings,
636 send_compression_encodings,
637 )
638 .apply_max_message_size_config(
639 max_decoding_message_size,
640 max_encoding_message_size,
641 );
642 let res = grpc.streaming(method, req).await;
643 Ok(res)
644 };
645 Box::pin(fut)
646 }
647 "/stream_service.StreamService/GetMinUncommittedObjectId" => {
648 #[allow(non_camel_case_types)]
649 struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
650 impl<
651 T: StreamService,
652 > tonic::server::UnaryService<
653 super::GetMinUncommittedObjectIdRequest,
654 > for GetMinUncommittedObjectIdSvc<T> {
655 type Response = super::GetMinUncommittedObjectIdResponse;
656 type Future = BoxFuture<
657 tonic::Response<Self::Response>,
658 tonic::Status,
659 >;
660 fn call(
661 &mut self,
662 request: tonic::Request<
663 super::GetMinUncommittedObjectIdRequest,
664 >,
665 ) -> Self::Future {
666 let inner = Arc::clone(&self.0);
667 let fut = async move {
668 <T as StreamService>::get_min_uncommitted_object_id(
669 &inner,
670 request,
671 )
672 .await
673 };
674 Box::pin(fut)
675 }
676 }
677 let accept_compression_encodings = self.accept_compression_encodings;
678 let send_compression_encodings = self.send_compression_encodings;
679 let max_decoding_message_size = self.max_decoding_message_size;
680 let max_encoding_message_size = self.max_encoding_message_size;
681 let inner = self.inner.clone();
682 let fut = async move {
683 let method = GetMinUncommittedObjectIdSvc(inner);
684 let codec = tonic::codec::ProstCodec::default();
685 let mut grpc = tonic::server::Grpc::new(codec)
686 .apply_compression_config(
687 accept_compression_encodings,
688 send_compression_encodings,
689 )
690 .apply_max_message_size_config(
691 max_decoding_message_size,
692 max_encoding_message_size,
693 );
694 let res = grpc.unary(method, req).await;
695 Ok(res)
696 };
697 Box::pin(fut)
698 }
699 _ => {
700 Box::pin(async move {
701 let mut response = http::Response::new(empty_body());
702 let headers = response.headers_mut();
703 headers
704 .insert(
705 tonic::Status::GRPC_STATUS,
706 (tonic::Code::Unimplemented as i32).into(),
707 );
708 headers
709 .insert(
710 http::header::CONTENT_TYPE,
711 tonic::metadata::GRPC_CONTENT_TYPE,
712 );
713 Ok(response)
714 })
715 }
716 }
717 }
718 }
719 impl<T> Clone for StreamServiceServer<T> {
720 fn clone(&self) -> Self {
721 let inner = self.inner.clone();
722 Self {
723 inner,
724 accept_compression_encodings: self.accept_compression_encodings,
725 send_compression_encodings: self.send_compression_encodings,
726 max_decoding_message_size: self.max_decoding_message_size,
727 max_encoding_message_size: self.max_encoding_message_size,
728 }
729 }
730 }
731 pub const SERVICE_NAME: &str = "stream_service.StreamService";
733 impl<T> tonic::server::NamedService for StreamServiceServer<T> {
734 const NAME: &'static str = SERVICE_NAME;
735 }
736}