1#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct TableSchema {
5 #[prost(message, repeated, tag = "1")]
6 pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
7 #[prost(uint32, repeated, tag = "2")]
8 pub pk_indices: ::prost::alloc::vec::Vec<u32>,
9}
10#[derive(prost_helpers::AnyPB)]
11#[derive(Clone, PartialEq, ::prost::Message)]
12pub struct ValidationError {
13 #[prost(string, tag = "1")]
14 pub error_message: ::prost::alloc::string::String,
15}
16#[derive(prost_helpers::AnyPB)]
17#[derive(Clone, PartialEq, ::prost::Message)]
18pub struct SinkParam {
19 #[prost(uint32, tag = "1")]
20 pub sink_id: u32,
21 #[prost(btree_map = "string, string", tag = "2")]
22 pub properties: ::prost::alloc::collections::BTreeMap<
23 ::prost::alloc::string::String,
24 ::prost::alloc::string::String,
25 >,
26 #[prost(message, optional, tag = "3")]
27 pub table_schema: ::core::option::Option<TableSchema>,
28 #[prost(enumeration = "super::catalog::SinkType", tag = "4")]
30 pub sink_type: i32,
31 #[prost(string, tag = "5")]
32 pub db_name: ::prost::alloc::string::String,
33 #[prost(string, tag = "6")]
34 pub sink_from_name: ::prost::alloc::string::String,
35 #[prost(message, optional, tag = "7")]
36 pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
37 #[prost(string, tag = "8")]
38 pub sink_name: ::prost::alloc::string::String,
39}
40#[derive(prost_helpers::AnyPB)]
41#[derive(Clone, PartialEq, ::prost::Message)]
42pub struct SinkWriterStreamRequest {
43 #[prost(oneof = "sink_writer_stream_request::Request", tags = "1, 3, 4")]
44 pub request: ::core::option::Option<sink_writer_stream_request::Request>,
45}
46pub mod sink_writer_stream_request {
48 #[derive(prost_helpers::AnyPB)]
49 #[derive(Clone, PartialEq, ::prost::Message)]
50 pub struct StartSink {
51 #[prost(message, optional, tag = "1")]
52 pub sink_param: ::core::option::Option<super::SinkParam>,
53 #[prost(message, optional, tag = "3")]
54 pub payload_schema: ::core::option::Option<super::TableSchema>,
55 }
56 #[derive(prost_helpers::AnyPB)]
57 #[derive(Clone, PartialEq, ::prost::Message)]
58 pub struct WriteBatch {
59 #[prost(uint64, tag = "3")]
60 pub batch_id: u64,
61 #[prost(uint64, tag = "4")]
62 pub epoch: u64,
63 #[prost(oneof = "write_batch::Payload", tags = "2, 5")]
64 pub payload: ::core::option::Option<write_batch::Payload>,
65 }
66 pub mod write_batch {
68 #[derive(prost_helpers::AnyPB)]
69 #[derive(Clone, PartialEq, ::prost::Message)]
70 pub struct StreamChunkPayload {
71 #[prost(bytes = "vec", tag = "1")]
72 pub binary_data: ::prost::alloc::vec::Vec<u8>,
73 }
74 #[derive(prost_helpers::AnyPB)]
75 #[derive(Clone, PartialEq, ::prost::Oneof)]
76 pub enum Payload {
77 #[prost(message, tag = "2")]
78 StreamChunkPayload(StreamChunkPayload),
79 #[prost(int64, tag = "5")]
83 StreamChunkRefPointer(i64),
84 }
85 }
86 #[derive(prost_helpers::AnyPB)]
87 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
88 pub struct Barrier {
89 #[prost(uint64, tag = "1")]
90 pub epoch: u64,
91 #[prost(bool, tag = "2")]
92 pub is_checkpoint: bool,
93 }
94 #[derive(prost_helpers::AnyPB)]
95 #[derive(Clone, PartialEq, ::prost::Oneof)]
96 pub enum Request {
97 #[prost(message, tag = "1")]
98 Start(StartSink),
99 #[prost(message, tag = "3")]
100 WriteBatch(WriteBatch),
101 #[prost(message, tag = "4")]
102 Barrier(Barrier),
103 }
104}
105#[derive(prost_helpers::AnyPB)]
106#[derive(Clone, PartialEq, ::prost::Message)]
107pub struct SinkWriterStreamResponse {
108 #[prost(oneof = "sink_writer_stream_response::Response", tags = "1, 2, 3")]
109 pub response: ::core::option::Option<sink_writer_stream_response::Response>,
110}
111pub mod sink_writer_stream_response {
113 #[derive(prost_helpers::AnyPB)]
114 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
115 pub struct StartResponse {}
116 #[derive(prost_helpers::AnyPB)]
117 #[derive(Clone, PartialEq, ::prost::Message)]
118 pub struct CommitResponse {
119 #[prost(uint64, tag = "1")]
120 pub epoch: u64,
121 #[prost(message, optional, tag = "2")]
122 pub metadata: ::core::option::Option<super::SinkMetadata>,
123 }
124 #[derive(prost_helpers::AnyPB)]
125 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
126 pub struct BatchWrittenResponse {
127 #[prost(uint64, tag = "1")]
128 pub epoch: u64,
129 #[prost(uint64, tag = "2")]
130 pub batch_id: u64,
131 }
132 #[derive(prost_helpers::AnyPB)]
133 #[derive(Clone, PartialEq, ::prost::Oneof)]
134 pub enum Response {
135 #[prost(message, tag = "1")]
136 Start(StartResponse),
137 #[prost(message, tag = "2")]
138 Commit(CommitResponse),
139 #[prost(message, tag = "3")]
140 Batch(BatchWrittenResponse),
141 }
142}
143#[derive(prost_helpers::AnyPB)]
144#[derive(Clone, PartialEq, ::prost::Message)]
145pub struct ValidateSinkRequest {
146 #[prost(message, optional, tag = "1")]
147 pub sink_param: ::core::option::Option<SinkParam>,
148}
149#[derive(prost_helpers::AnyPB)]
150#[derive(Clone, PartialEq, ::prost::Message)]
151pub struct ValidateSinkResponse {
152 #[prost(message, optional, tag = "1")]
154 pub error: ::core::option::Option<ValidationError>,
155}
156#[derive(prost_helpers::AnyPB)]
157#[derive(Clone, PartialEq, ::prost::Message)]
158pub struct SinkMetadata {
159 #[prost(oneof = "sink_metadata::Metadata", tags = "1")]
160 pub metadata: ::core::option::Option<sink_metadata::Metadata>,
161}
162pub mod sink_metadata {
164 #[derive(prost_helpers::AnyPB)]
165 #[derive(Clone, PartialEq, ::prost::Message)]
166 pub struct SerializedMetadata {
167 #[prost(bytes = "vec", tag = "1")]
168 pub metadata: ::prost::alloc::vec::Vec<u8>,
169 }
170 #[derive(prost_helpers::AnyPB)]
171 #[derive(Clone, PartialEq, ::prost::Oneof)]
172 pub enum Metadata {
173 #[prost(message, tag = "1")]
174 Serialized(SerializedMetadata),
175 }
176}
177#[derive(prost_helpers::AnyPB)]
178#[derive(Clone, PartialEq, ::prost::Message)]
179pub struct SinkCoordinatorStreamRequest {
180 #[prost(oneof = "sink_coordinator_stream_request::Request", tags = "1, 2")]
181 pub request: ::core::option::Option<sink_coordinator_stream_request::Request>,
182}
183pub mod sink_coordinator_stream_request {
185 #[derive(prost_helpers::AnyPB)]
186 #[derive(Clone, PartialEq, ::prost::Message)]
187 pub struct StartCoordinator {
188 #[prost(message, optional, tag = "1")]
189 pub param: ::core::option::Option<super::SinkParam>,
190 }
191 #[derive(prost_helpers::AnyPB)]
192 #[derive(Clone, PartialEq, ::prost::Message)]
193 pub struct CommitMetadata {
194 #[prost(uint64, tag = "1")]
195 pub epoch: u64,
196 #[prost(message, repeated, tag = "2")]
197 pub metadata: ::prost::alloc::vec::Vec<super::SinkMetadata>,
198 }
199 #[derive(prost_helpers::AnyPB)]
200 #[derive(Clone, PartialEq, ::prost::Oneof)]
201 pub enum Request {
202 #[prost(message, tag = "1")]
203 Start(StartCoordinator),
204 #[prost(message, tag = "2")]
205 Commit(CommitMetadata),
206 }
207}
208#[derive(prost_helpers::AnyPB)]
209#[derive(Clone, Copy, PartialEq, ::prost::Message)]
210pub struct SinkCoordinatorStreamResponse {
211 #[prost(oneof = "sink_coordinator_stream_response::Response", tags = "1, 2")]
212 pub response: ::core::option::Option<sink_coordinator_stream_response::Response>,
213}
214pub mod sink_coordinator_stream_response {
216 #[derive(prost_helpers::AnyPB)]
217 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
218 pub struct StartResponse {}
219 #[derive(prost_helpers::AnyPB)]
220 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
221 pub struct CommitResponse {
222 #[prost(uint64, tag = "1")]
223 pub epoch: u64,
224 }
225 #[derive(prost_helpers::AnyPB)]
226 #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
227 pub enum Response {
228 #[prost(message, tag = "1")]
229 Start(StartResponse),
230 #[prost(message, tag = "2")]
231 Commit(CommitResponse),
232 }
233}
234#[derive(prost_helpers::AnyPB)]
235#[derive(Clone, PartialEq, ::prost::Message)]
236pub struct CdcMessage {
237 #[prost(string, tag = "1")]
239 pub payload: ::prost::alloc::string::String,
240 #[prost(string, tag = "2")]
241 pub partition: ::prost::alloc::string::String,
242 #[prost(string, tag = "3")]
243 pub offset: ::prost::alloc::string::String,
244 #[prost(string, tag = "4")]
245 pub full_table_name: ::prost::alloc::string::String,
246 #[prost(int64, tag = "5")]
247 pub source_ts_ms: i64,
248 #[prost(enumeration = "cdc_message::CdcMessageType", tag = "6")]
249 pub msg_type: i32,
250 #[prost(string, tag = "7")]
252 pub key: ::prost::alloc::string::String,
253}
254pub mod cdc_message {
256 #[derive(prost_helpers::AnyPB)]
257 #[derive(
258 Clone,
259 Copy,
260 Debug,
261 PartialEq,
262 Eq,
263 Hash,
264 PartialOrd,
265 Ord,
266 ::prost::Enumeration
267 )]
268 #[repr(i32)]
269 pub enum CdcMessageType {
270 Unspecified = 0,
271 Heartbeat = 1,
272 Data = 2,
273 TransactionMeta = 3,
274 SchemaChange = 4,
275 }
276 impl CdcMessageType {
277 pub fn as_str_name(&self) -> &'static str {
282 match self {
283 Self::Unspecified => "UNSPECIFIED",
284 Self::Heartbeat => "HEARTBEAT",
285 Self::Data => "DATA",
286 Self::TransactionMeta => "TRANSACTION_META",
287 Self::SchemaChange => "SCHEMA_CHANGE",
288 }
289 }
290 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
292 match value {
293 "UNSPECIFIED" => Some(Self::Unspecified),
294 "HEARTBEAT" => Some(Self::Heartbeat),
295 "DATA" => Some(Self::Data),
296 "TRANSACTION_META" => Some(Self::TransactionMeta),
297 "SCHEMA_CHANGE" => Some(Self::SchemaChange),
298 _ => None,
299 }
300 }
301 }
302}
303#[derive(prost_helpers::AnyPB)]
304#[derive(Clone, PartialEq, ::prost::Message)]
305pub struct GetEventStreamRequest {
306 #[prost(uint64, tag = "1")]
307 pub source_id: u64,
308 #[prost(enumeration = "SourceType", tag = "2")]
309 pub source_type: i32,
310 #[prost(string, tag = "3")]
311 pub start_offset: ::prost::alloc::string::String,
312 #[prost(btree_map = "string, string", tag = "4")]
313 pub properties: ::prost::alloc::collections::BTreeMap<
314 ::prost::alloc::string::String,
315 ::prost::alloc::string::String,
316 >,
317 #[prost(bool, tag = "5")]
318 pub snapshot_done: bool,
319 #[prost(bool, tag = "6")]
320 pub is_source_job: bool,
321}
322#[derive(prost_helpers::AnyPB)]
323#[derive(Clone, PartialEq, ::prost::Message)]
324pub struct GetEventStreamResponse {
325 #[prost(uint64, tag = "1")]
326 pub source_id: u64,
327 #[prost(message, repeated, tag = "2")]
328 pub events: ::prost::alloc::vec::Vec<CdcMessage>,
329 #[prost(message, optional, tag = "3")]
330 pub control: ::core::option::Option<get_event_stream_response::ControlInfo>,
331}
332pub mod get_event_stream_response {
334 #[derive(prost_helpers::AnyPB)]
335 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
336 pub struct ControlInfo {
337 #[prost(bool, tag = "1")]
338 pub handshake_ok: bool,
339 }
340}
341#[derive(prost_helpers::AnyPB)]
342#[derive(Clone, PartialEq, ::prost::Message)]
343pub struct ValidateSourceRequest {
344 #[prost(uint64, tag = "1")]
345 pub source_id: u64,
346 #[prost(enumeration = "SourceType", tag = "2")]
347 pub source_type: i32,
348 #[prost(btree_map = "string, string", tag = "3")]
349 pub properties: ::prost::alloc::collections::BTreeMap<
350 ::prost::alloc::string::String,
351 ::prost::alloc::string::String,
352 >,
353 #[prost(message, optional, tag = "4")]
354 pub table_schema: ::core::option::Option<TableSchema>,
355 #[prost(bool, tag = "5")]
356 pub is_source_job: bool,
357 #[prost(bool, tag = "6")]
358 pub is_backfill_table: bool,
359}
360#[derive(prost_helpers::AnyPB)]
361#[derive(Clone, PartialEq, ::prost::Message)]
362pub struct ValidateSourceResponse {
363 #[prost(message, optional, tag = "1")]
365 pub error: ::core::option::Option<ValidationError>,
366}
367#[derive(prost_helpers::AnyPB)]
368#[derive(Clone, PartialEq, ::prost::Message)]
369pub struct CoordinateRequest {
370 #[prost(oneof = "coordinate_request::Msg", tags = "1, 2, 3, 4")]
371 pub msg: ::core::option::Option<coordinate_request::Msg>,
372}
373pub mod coordinate_request {
375 #[derive(prost_helpers::AnyPB)]
378 #[derive(Clone, PartialEq, ::prost::Message)]
379 pub struct StartCoordinationRequest {
380 #[prost(message, optional, tag = "1")]
381 pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
382 #[prost(message, optional, tag = "2")]
383 pub param: ::core::option::Option<super::SinkParam>,
384 }
385 #[derive(prost_helpers::AnyPB)]
386 #[derive(Clone, PartialEq, ::prost::Message)]
387 pub struct CommitRequest {
388 #[prost(uint64, tag = "1")]
389 pub epoch: u64,
390 #[prost(message, optional, tag = "2")]
391 pub metadata: ::core::option::Option<super::SinkMetadata>,
392 }
393 #[derive(prost_helpers::AnyPB)]
394 #[derive(Clone, PartialEq, ::prost::Message)]
395 pub struct UpdateVnodeBitmapRequest {
396 #[prost(message, optional, tag = "1")]
397 pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
398 }
399 #[derive(prost_helpers::AnyPB)]
400 #[derive(Clone, PartialEq, ::prost::Oneof)]
401 pub enum Msg {
402 #[prost(message, tag = "1")]
403 StartRequest(StartCoordinationRequest),
404 #[prost(message, tag = "2")]
405 CommitRequest(CommitRequest),
406 #[prost(message, tag = "3")]
407 UpdateVnodeRequest(UpdateVnodeBitmapRequest),
408 #[prost(bool, tag = "4")]
409 Stop(bool),
410 }
411}
412#[derive(prost_helpers::AnyPB)]
413#[derive(Clone, Copy, PartialEq, ::prost::Message)]
414pub struct CoordinateResponse {
415 #[prost(oneof = "coordinate_response::Msg", tags = "1, 2")]
416 pub msg: ::core::option::Option<coordinate_response::Msg>,
417}
418pub mod coordinate_response {
420 #[derive(prost_helpers::AnyPB)]
421 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
422 pub struct StartCoordinationResponse {
423 #[prost(uint64, optional, tag = "1")]
424 pub log_store_rewind_start_epoch: ::core::option::Option<u64>,
425 }
426 #[derive(prost_helpers::AnyPB)]
427 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
428 pub struct CommitResponse {
429 #[prost(uint64, tag = "1")]
430 pub epoch: u64,
431 }
432 #[derive(prost_helpers::AnyPB)]
433 #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
434 pub enum Msg {
435 #[prost(message, tag = "1")]
436 StartResponse(StartCoordinationResponse),
437 #[prost(message, tag = "2")]
438 CommitResponse(CommitResponse),
439 }
440}
441#[derive(prost_helpers::AnyPB)]
442#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
443#[repr(i32)]
444pub enum SourceType {
445 Unspecified = 0,
446 Mysql = 1,
447 Postgres = 2,
448 Citus = 3,
449 Mongodb = 4,
450 SqlServer = 5,
451}
452impl SourceType {
453 pub fn as_str_name(&self) -> &'static str {
458 match self {
459 Self::Unspecified => "UNSPECIFIED",
460 Self::Mysql => "MYSQL",
461 Self::Postgres => "POSTGRES",
462 Self::Citus => "CITUS",
463 Self::Mongodb => "MONGODB",
464 Self::SqlServer => "SQL_SERVER",
465 }
466 }
467 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
469 match value {
470 "UNSPECIFIED" => Some(Self::Unspecified),
471 "MYSQL" => Some(Self::Mysql),
472 "POSTGRES" => Some(Self::Postgres),
473 "CITUS" => Some(Self::Citus),
474 "MONGODB" => Some(Self::Mongodb),
475 "SQL_SERVER" => Some(Self::SqlServer),
476 _ => None,
477 }
478 }
479}
480pub mod connector_service_client {
482 #![allow(
483 unused_variables,
484 dead_code,
485 missing_docs,
486 clippy::wildcard_imports,
487 clippy::let_unit_value,
488 )]
489 use tonic::codegen::*;
490 use tonic::codegen::http::Uri;
491 #[derive(Debug, Clone)]
492 pub struct ConnectorServiceClient<T> {
493 inner: tonic::client::Grpc<T>,
494 }
495 impl ConnectorServiceClient<tonic::transport::Channel> {
496 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
498 where
499 D: TryInto<tonic::transport::Endpoint>,
500 D::Error: Into<StdError>,
501 {
502 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
503 Ok(Self::new(conn))
504 }
505 }
506 impl<T> ConnectorServiceClient<T>
507 where
508 T: tonic::client::GrpcService<tonic::body::BoxBody>,
509 T::Error: Into<StdError>,
510 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
511 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
512 {
513 pub fn new(inner: T) -> Self {
514 let inner = tonic::client::Grpc::new(inner);
515 Self { inner }
516 }
517 pub fn with_origin(inner: T, origin: Uri) -> Self {
518 let inner = tonic::client::Grpc::with_origin(inner, origin);
519 Self { inner }
520 }
521 pub fn with_interceptor<F>(
522 inner: T,
523 interceptor: F,
524 ) -> ConnectorServiceClient<InterceptedService<T, F>>
525 where
526 F: tonic::service::Interceptor,
527 T::ResponseBody: Default,
528 T: tonic::codegen::Service<
529 http::Request<tonic::body::BoxBody>,
530 Response = http::Response<
531 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
532 >,
533 >,
534 <T as tonic::codegen::Service<
535 http::Request<tonic::body::BoxBody>,
536 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
537 {
538 ConnectorServiceClient::new(InterceptedService::new(inner, interceptor))
539 }
540 #[must_use]
545 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
546 self.inner = self.inner.send_compressed(encoding);
547 self
548 }
549 #[must_use]
551 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
552 self.inner = self.inner.accept_compressed(encoding);
553 self
554 }
555 #[must_use]
559 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
560 self.inner = self.inner.max_decoding_message_size(limit);
561 self
562 }
563 #[must_use]
567 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
568 self.inner = self.inner.max_encoding_message_size(limit);
569 self
570 }
571 pub async fn sink_writer_stream(
572 &mut self,
573 request: impl tonic::IntoStreamingRequest<
574 Message = super::SinkWriterStreamRequest,
575 >,
576 ) -> std::result::Result<
577 tonic::Response<tonic::codec::Streaming<super::SinkWriterStreamResponse>>,
578 tonic::Status,
579 > {
580 self.inner
581 .ready()
582 .await
583 .map_err(|e| {
584 tonic::Status::unknown(
585 format!("Service was not ready: {}", e.into()),
586 )
587 })?;
588 let codec = tonic::codec::ProstCodec::default();
589 let path = http::uri::PathAndQuery::from_static(
590 "/connector_service.ConnectorService/SinkWriterStream",
591 );
592 let mut req = request.into_streaming_request();
593 req.extensions_mut()
594 .insert(
595 GrpcMethod::new(
596 "connector_service.ConnectorService",
597 "SinkWriterStream",
598 ),
599 );
600 self.inner.streaming(req, path, codec).await
601 }
602 pub async fn sink_coordinator_stream(
603 &mut self,
604 request: impl tonic::IntoStreamingRequest<
605 Message = super::SinkCoordinatorStreamRequest,
606 >,
607 ) -> std::result::Result<
608 tonic::Response<
609 tonic::codec::Streaming<super::SinkCoordinatorStreamResponse>,
610 >,
611 tonic::Status,
612 > {
613 self.inner
614 .ready()
615 .await
616 .map_err(|e| {
617 tonic::Status::unknown(
618 format!("Service was not ready: {}", e.into()),
619 )
620 })?;
621 let codec = tonic::codec::ProstCodec::default();
622 let path = http::uri::PathAndQuery::from_static(
623 "/connector_service.ConnectorService/SinkCoordinatorStream",
624 );
625 let mut req = request.into_streaming_request();
626 req.extensions_mut()
627 .insert(
628 GrpcMethod::new(
629 "connector_service.ConnectorService",
630 "SinkCoordinatorStream",
631 ),
632 );
633 self.inner.streaming(req, path, codec).await
634 }
635 pub async fn validate_sink(
636 &mut self,
637 request: impl tonic::IntoRequest<super::ValidateSinkRequest>,
638 ) -> std::result::Result<
639 tonic::Response<super::ValidateSinkResponse>,
640 tonic::Status,
641 > {
642 self.inner
643 .ready()
644 .await
645 .map_err(|e| {
646 tonic::Status::unknown(
647 format!("Service was not ready: {}", e.into()),
648 )
649 })?;
650 let codec = tonic::codec::ProstCodec::default();
651 let path = http::uri::PathAndQuery::from_static(
652 "/connector_service.ConnectorService/ValidateSink",
653 );
654 let mut req = request.into_request();
655 req.extensions_mut()
656 .insert(
657 GrpcMethod::new("connector_service.ConnectorService", "ValidateSink"),
658 );
659 self.inner.unary(req, path, codec).await
660 }
661 pub async fn get_event_stream(
662 &mut self,
663 request: impl tonic::IntoRequest<super::GetEventStreamRequest>,
664 ) -> std::result::Result<
665 tonic::Response<tonic::codec::Streaming<super::GetEventStreamResponse>>,
666 tonic::Status,
667 > {
668 self.inner
669 .ready()
670 .await
671 .map_err(|e| {
672 tonic::Status::unknown(
673 format!("Service was not ready: {}", e.into()),
674 )
675 })?;
676 let codec = tonic::codec::ProstCodec::default();
677 let path = http::uri::PathAndQuery::from_static(
678 "/connector_service.ConnectorService/GetEventStream",
679 );
680 let mut req = request.into_request();
681 req.extensions_mut()
682 .insert(
683 GrpcMethod::new(
684 "connector_service.ConnectorService",
685 "GetEventStream",
686 ),
687 );
688 self.inner.server_streaming(req, path, codec).await
689 }
690 pub async fn validate_source(
691 &mut self,
692 request: impl tonic::IntoRequest<super::ValidateSourceRequest>,
693 ) -> std::result::Result<
694 tonic::Response<super::ValidateSourceResponse>,
695 tonic::Status,
696 > {
697 self.inner
698 .ready()
699 .await
700 .map_err(|e| {
701 tonic::Status::unknown(
702 format!("Service was not ready: {}", e.into()),
703 )
704 })?;
705 let codec = tonic::codec::ProstCodec::default();
706 let path = http::uri::PathAndQuery::from_static(
707 "/connector_service.ConnectorService/ValidateSource",
708 );
709 let mut req = request.into_request();
710 req.extensions_mut()
711 .insert(
712 GrpcMethod::new(
713 "connector_service.ConnectorService",
714 "ValidateSource",
715 ),
716 );
717 self.inner.unary(req, path, codec).await
718 }
719 }
720}
721pub mod sink_coordination_service_client {
723 #![allow(
724 unused_variables,
725 dead_code,
726 missing_docs,
727 clippy::wildcard_imports,
728 clippy::let_unit_value,
729 )]
730 use tonic::codegen::*;
731 use tonic::codegen::http::Uri;
732 #[derive(Debug, Clone)]
733 pub struct SinkCoordinationServiceClient<T> {
734 inner: tonic::client::Grpc<T>,
735 }
736 impl SinkCoordinationServiceClient<tonic::transport::Channel> {
737 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
739 where
740 D: TryInto<tonic::transport::Endpoint>,
741 D::Error: Into<StdError>,
742 {
743 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
744 Ok(Self::new(conn))
745 }
746 }
747 impl<T> SinkCoordinationServiceClient<T>
748 where
749 T: tonic::client::GrpcService<tonic::body::BoxBody>,
750 T::Error: Into<StdError>,
751 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
752 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
753 {
754 pub fn new(inner: T) -> Self {
755 let inner = tonic::client::Grpc::new(inner);
756 Self { inner }
757 }
758 pub fn with_origin(inner: T, origin: Uri) -> Self {
759 let inner = tonic::client::Grpc::with_origin(inner, origin);
760 Self { inner }
761 }
762 pub fn with_interceptor<F>(
763 inner: T,
764 interceptor: F,
765 ) -> SinkCoordinationServiceClient<InterceptedService<T, F>>
766 where
767 F: tonic::service::Interceptor,
768 T::ResponseBody: Default,
769 T: tonic::codegen::Service<
770 http::Request<tonic::body::BoxBody>,
771 Response = http::Response<
772 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
773 >,
774 >,
775 <T as tonic::codegen::Service<
776 http::Request<tonic::body::BoxBody>,
777 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
778 {
779 SinkCoordinationServiceClient::new(
780 InterceptedService::new(inner, interceptor),
781 )
782 }
783 #[must_use]
788 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
789 self.inner = self.inner.send_compressed(encoding);
790 self
791 }
792 #[must_use]
794 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
795 self.inner = self.inner.accept_compressed(encoding);
796 self
797 }
798 #[must_use]
802 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
803 self.inner = self.inner.max_decoding_message_size(limit);
804 self
805 }
806 #[must_use]
810 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
811 self.inner = self.inner.max_encoding_message_size(limit);
812 self
813 }
814 pub async fn coordinate(
815 &mut self,
816 request: impl tonic::IntoStreamingRequest<Message = super::CoordinateRequest>,
817 ) -> std::result::Result<
818 tonic::Response<tonic::codec::Streaming<super::CoordinateResponse>>,
819 tonic::Status,
820 > {
821 self.inner
822 .ready()
823 .await
824 .map_err(|e| {
825 tonic::Status::unknown(
826 format!("Service was not ready: {}", e.into()),
827 )
828 })?;
829 let codec = tonic::codec::ProstCodec::default();
830 let path = http::uri::PathAndQuery::from_static(
831 "/connector_service.SinkCoordinationService/Coordinate",
832 );
833 let mut req = request.into_streaming_request();
834 req.extensions_mut()
835 .insert(
836 GrpcMethod::new(
837 "connector_service.SinkCoordinationService",
838 "Coordinate",
839 ),
840 );
841 self.inner.streaming(req, path, codec).await
842 }
843 }
844}
845pub mod connector_service_server {
847 #![allow(
848 unused_variables,
849 dead_code,
850 missing_docs,
851 clippy::wildcard_imports,
852 clippy::let_unit_value,
853 )]
854 use tonic::codegen::*;
855 #[async_trait]
857 pub trait ConnectorService: std::marker::Send + std::marker::Sync + 'static {
858 type SinkWriterStreamStream: tonic::codegen::tokio_stream::Stream<
860 Item = std::result::Result<
861 super::SinkWriterStreamResponse,
862 tonic::Status,
863 >,
864 >
865 + std::marker::Send
866 + 'static;
867 async fn sink_writer_stream(
868 &self,
869 request: tonic::Request<tonic::Streaming<super::SinkWriterStreamRequest>>,
870 ) -> std::result::Result<
871 tonic::Response<Self::SinkWriterStreamStream>,
872 tonic::Status,
873 >;
874 type SinkCoordinatorStreamStream: tonic::codegen::tokio_stream::Stream<
876 Item = std::result::Result<
877 super::SinkCoordinatorStreamResponse,
878 tonic::Status,
879 >,
880 >
881 + std::marker::Send
882 + 'static;
883 async fn sink_coordinator_stream(
884 &self,
885 request: tonic::Request<
886 tonic::Streaming<super::SinkCoordinatorStreamRequest>,
887 >,
888 ) -> std::result::Result<
889 tonic::Response<Self::SinkCoordinatorStreamStream>,
890 tonic::Status,
891 >;
892 async fn validate_sink(
893 &self,
894 request: tonic::Request<super::ValidateSinkRequest>,
895 ) -> std::result::Result<
896 tonic::Response<super::ValidateSinkResponse>,
897 tonic::Status,
898 >;
899 type GetEventStreamStream: tonic::codegen::tokio_stream::Stream<
901 Item = std::result::Result<super::GetEventStreamResponse, tonic::Status>,
902 >
903 + std::marker::Send
904 + 'static;
905 async fn get_event_stream(
906 &self,
907 request: tonic::Request<super::GetEventStreamRequest>,
908 ) -> std::result::Result<
909 tonic::Response<Self::GetEventStreamStream>,
910 tonic::Status,
911 >;
912 async fn validate_source(
913 &self,
914 request: tonic::Request<super::ValidateSourceRequest>,
915 ) -> std::result::Result<
916 tonic::Response<super::ValidateSourceResponse>,
917 tonic::Status,
918 >;
919 }
920 #[derive(Debug)]
921 pub struct ConnectorServiceServer<T> {
922 inner: Arc<T>,
923 accept_compression_encodings: EnabledCompressionEncodings,
924 send_compression_encodings: EnabledCompressionEncodings,
925 max_decoding_message_size: Option<usize>,
926 max_encoding_message_size: Option<usize>,
927 }
928 impl<T> ConnectorServiceServer<T> {
929 pub fn new(inner: T) -> Self {
930 Self::from_arc(Arc::new(inner))
931 }
932 pub fn from_arc(inner: Arc<T>) -> Self {
933 Self {
934 inner,
935 accept_compression_encodings: Default::default(),
936 send_compression_encodings: Default::default(),
937 max_decoding_message_size: None,
938 max_encoding_message_size: None,
939 }
940 }
941 pub fn with_interceptor<F>(
942 inner: T,
943 interceptor: F,
944 ) -> InterceptedService<Self, F>
945 where
946 F: tonic::service::Interceptor,
947 {
948 InterceptedService::new(Self::new(inner), interceptor)
949 }
950 #[must_use]
952 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
953 self.accept_compression_encodings.enable(encoding);
954 self
955 }
956 #[must_use]
958 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
959 self.send_compression_encodings.enable(encoding);
960 self
961 }
962 #[must_use]
966 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
967 self.max_decoding_message_size = Some(limit);
968 self
969 }
970 #[must_use]
974 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
975 self.max_encoding_message_size = Some(limit);
976 self
977 }
978 }
979 impl<T, B> tonic::codegen::Service<http::Request<B>> for ConnectorServiceServer<T>
980 where
981 T: ConnectorService,
982 B: Body + std::marker::Send + 'static,
983 B::Error: Into<StdError> + std::marker::Send + 'static,
984 {
985 type Response = http::Response<tonic::body::BoxBody>;
986 type Error = std::convert::Infallible;
987 type Future = BoxFuture<Self::Response, Self::Error>;
988 fn poll_ready(
989 &mut self,
990 _cx: &mut Context<'_>,
991 ) -> Poll<std::result::Result<(), Self::Error>> {
992 Poll::Ready(Ok(()))
993 }
994 fn call(&mut self, req: http::Request<B>) -> Self::Future {
995 match req.uri().path() {
996 "/connector_service.ConnectorService/SinkWriterStream" => {
997 #[allow(non_camel_case_types)]
998 struct SinkWriterStreamSvc<T: ConnectorService>(pub Arc<T>);
999 impl<
1000 T: ConnectorService,
1001 > tonic::server::StreamingService<super::SinkWriterStreamRequest>
1002 for SinkWriterStreamSvc<T> {
1003 type Response = super::SinkWriterStreamResponse;
1004 type ResponseStream = T::SinkWriterStreamStream;
1005 type Future = BoxFuture<
1006 tonic::Response<Self::ResponseStream>,
1007 tonic::Status,
1008 >;
1009 fn call(
1010 &mut self,
1011 request: tonic::Request<
1012 tonic::Streaming<super::SinkWriterStreamRequest>,
1013 >,
1014 ) -> Self::Future {
1015 let inner = Arc::clone(&self.0);
1016 let fut = async move {
1017 <T as ConnectorService>::sink_writer_stream(&inner, request)
1018 .await
1019 };
1020 Box::pin(fut)
1021 }
1022 }
1023 let accept_compression_encodings = self.accept_compression_encodings;
1024 let send_compression_encodings = self.send_compression_encodings;
1025 let max_decoding_message_size = self.max_decoding_message_size;
1026 let max_encoding_message_size = self.max_encoding_message_size;
1027 let inner = self.inner.clone();
1028 let fut = async move {
1029 let method = SinkWriterStreamSvc(inner);
1030 let codec = tonic::codec::ProstCodec::default();
1031 let mut grpc = tonic::server::Grpc::new(codec)
1032 .apply_compression_config(
1033 accept_compression_encodings,
1034 send_compression_encodings,
1035 )
1036 .apply_max_message_size_config(
1037 max_decoding_message_size,
1038 max_encoding_message_size,
1039 );
1040 let res = grpc.streaming(method, req).await;
1041 Ok(res)
1042 };
1043 Box::pin(fut)
1044 }
1045 "/connector_service.ConnectorService/SinkCoordinatorStream" => {
1046 #[allow(non_camel_case_types)]
1047 struct SinkCoordinatorStreamSvc<T: ConnectorService>(pub Arc<T>);
1048 impl<
1049 T: ConnectorService,
1050 > tonic::server::StreamingService<
1051 super::SinkCoordinatorStreamRequest,
1052 > for SinkCoordinatorStreamSvc<T> {
1053 type Response = super::SinkCoordinatorStreamResponse;
1054 type ResponseStream = T::SinkCoordinatorStreamStream;
1055 type Future = BoxFuture<
1056 tonic::Response<Self::ResponseStream>,
1057 tonic::Status,
1058 >;
1059 fn call(
1060 &mut self,
1061 request: tonic::Request<
1062 tonic::Streaming<super::SinkCoordinatorStreamRequest>,
1063 >,
1064 ) -> Self::Future {
1065 let inner = Arc::clone(&self.0);
1066 let fut = async move {
1067 <T as ConnectorService>::sink_coordinator_stream(
1068 &inner,
1069 request,
1070 )
1071 .await
1072 };
1073 Box::pin(fut)
1074 }
1075 }
1076 let accept_compression_encodings = self.accept_compression_encodings;
1077 let send_compression_encodings = self.send_compression_encodings;
1078 let max_decoding_message_size = self.max_decoding_message_size;
1079 let max_encoding_message_size = self.max_encoding_message_size;
1080 let inner = self.inner.clone();
1081 let fut = async move {
1082 let method = SinkCoordinatorStreamSvc(inner);
1083 let codec = tonic::codec::ProstCodec::default();
1084 let mut grpc = tonic::server::Grpc::new(codec)
1085 .apply_compression_config(
1086 accept_compression_encodings,
1087 send_compression_encodings,
1088 )
1089 .apply_max_message_size_config(
1090 max_decoding_message_size,
1091 max_encoding_message_size,
1092 );
1093 let res = grpc.streaming(method, req).await;
1094 Ok(res)
1095 };
1096 Box::pin(fut)
1097 }
1098 "/connector_service.ConnectorService/ValidateSink" => {
1099 #[allow(non_camel_case_types)]
1100 struct ValidateSinkSvc<T: ConnectorService>(pub Arc<T>);
1101 impl<
1102 T: ConnectorService,
1103 > tonic::server::UnaryService<super::ValidateSinkRequest>
1104 for ValidateSinkSvc<T> {
1105 type Response = super::ValidateSinkResponse;
1106 type Future = BoxFuture<
1107 tonic::Response<Self::Response>,
1108 tonic::Status,
1109 >;
1110 fn call(
1111 &mut self,
1112 request: tonic::Request<super::ValidateSinkRequest>,
1113 ) -> Self::Future {
1114 let inner = Arc::clone(&self.0);
1115 let fut = async move {
1116 <T as ConnectorService>::validate_sink(&inner, request)
1117 .await
1118 };
1119 Box::pin(fut)
1120 }
1121 }
1122 let accept_compression_encodings = self.accept_compression_encodings;
1123 let send_compression_encodings = self.send_compression_encodings;
1124 let max_decoding_message_size = self.max_decoding_message_size;
1125 let max_encoding_message_size = self.max_encoding_message_size;
1126 let inner = self.inner.clone();
1127 let fut = async move {
1128 let method = ValidateSinkSvc(inner);
1129 let codec = tonic::codec::ProstCodec::default();
1130 let mut grpc = tonic::server::Grpc::new(codec)
1131 .apply_compression_config(
1132 accept_compression_encodings,
1133 send_compression_encodings,
1134 )
1135 .apply_max_message_size_config(
1136 max_decoding_message_size,
1137 max_encoding_message_size,
1138 );
1139 let res = grpc.unary(method, req).await;
1140 Ok(res)
1141 };
1142 Box::pin(fut)
1143 }
1144 "/connector_service.ConnectorService/GetEventStream" => {
1145 #[allow(non_camel_case_types)]
1146 struct GetEventStreamSvc<T: ConnectorService>(pub Arc<T>);
1147 impl<
1148 T: ConnectorService,
1149 > tonic::server::ServerStreamingService<super::GetEventStreamRequest>
1150 for GetEventStreamSvc<T> {
1151 type Response = super::GetEventStreamResponse;
1152 type ResponseStream = T::GetEventStreamStream;
1153 type Future = BoxFuture<
1154 tonic::Response<Self::ResponseStream>,
1155 tonic::Status,
1156 >;
1157 fn call(
1158 &mut self,
1159 request: tonic::Request<super::GetEventStreamRequest>,
1160 ) -> Self::Future {
1161 let inner = Arc::clone(&self.0);
1162 let fut = async move {
1163 <T as ConnectorService>::get_event_stream(&inner, request)
1164 .await
1165 };
1166 Box::pin(fut)
1167 }
1168 }
1169 let accept_compression_encodings = self.accept_compression_encodings;
1170 let send_compression_encodings = self.send_compression_encodings;
1171 let max_decoding_message_size = self.max_decoding_message_size;
1172 let max_encoding_message_size = self.max_encoding_message_size;
1173 let inner = self.inner.clone();
1174 let fut = async move {
1175 let method = GetEventStreamSvc(inner);
1176 let codec = tonic::codec::ProstCodec::default();
1177 let mut grpc = tonic::server::Grpc::new(codec)
1178 .apply_compression_config(
1179 accept_compression_encodings,
1180 send_compression_encodings,
1181 )
1182 .apply_max_message_size_config(
1183 max_decoding_message_size,
1184 max_encoding_message_size,
1185 );
1186 let res = grpc.server_streaming(method, req).await;
1187 Ok(res)
1188 };
1189 Box::pin(fut)
1190 }
1191 "/connector_service.ConnectorService/ValidateSource" => {
1192 #[allow(non_camel_case_types)]
1193 struct ValidateSourceSvc<T: ConnectorService>(pub Arc<T>);
1194 impl<
1195 T: ConnectorService,
1196 > tonic::server::UnaryService<super::ValidateSourceRequest>
1197 for ValidateSourceSvc<T> {
1198 type Response = super::ValidateSourceResponse;
1199 type Future = BoxFuture<
1200 tonic::Response<Self::Response>,
1201 tonic::Status,
1202 >;
1203 fn call(
1204 &mut self,
1205 request: tonic::Request<super::ValidateSourceRequest>,
1206 ) -> Self::Future {
1207 let inner = Arc::clone(&self.0);
1208 let fut = async move {
1209 <T as ConnectorService>::validate_source(&inner, request)
1210 .await
1211 };
1212 Box::pin(fut)
1213 }
1214 }
1215 let accept_compression_encodings = self.accept_compression_encodings;
1216 let send_compression_encodings = self.send_compression_encodings;
1217 let max_decoding_message_size = self.max_decoding_message_size;
1218 let max_encoding_message_size = self.max_encoding_message_size;
1219 let inner = self.inner.clone();
1220 let fut = async move {
1221 let method = ValidateSourceSvc(inner);
1222 let codec = tonic::codec::ProstCodec::default();
1223 let mut grpc = tonic::server::Grpc::new(codec)
1224 .apply_compression_config(
1225 accept_compression_encodings,
1226 send_compression_encodings,
1227 )
1228 .apply_max_message_size_config(
1229 max_decoding_message_size,
1230 max_encoding_message_size,
1231 );
1232 let res = grpc.unary(method, req).await;
1233 Ok(res)
1234 };
1235 Box::pin(fut)
1236 }
1237 _ => {
1238 Box::pin(async move {
1239 let mut response = http::Response::new(empty_body());
1240 let headers = response.headers_mut();
1241 headers
1242 .insert(
1243 tonic::Status::GRPC_STATUS,
1244 (tonic::Code::Unimplemented as i32).into(),
1245 );
1246 headers
1247 .insert(
1248 http::header::CONTENT_TYPE,
1249 tonic::metadata::GRPC_CONTENT_TYPE,
1250 );
1251 Ok(response)
1252 })
1253 }
1254 }
1255 }
1256 }
1257 impl<T> Clone for ConnectorServiceServer<T> {
1258 fn clone(&self) -> Self {
1259 let inner = self.inner.clone();
1260 Self {
1261 inner,
1262 accept_compression_encodings: self.accept_compression_encodings,
1263 send_compression_encodings: self.send_compression_encodings,
1264 max_decoding_message_size: self.max_decoding_message_size,
1265 max_encoding_message_size: self.max_encoding_message_size,
1266 }
1267 }
1268 }
1269 pub const SERVICE_NAME: &str = "connector_service.ConnectorService";
1271 impl<T> tonic::server::NamedService for ConnectorServiceServer<T> {
1272 const NAME: &'static str = SERVICE_NAME;
1273 }
1274}
1275pub mod sink_coordination_service_server {
1277 #![allow(
1278 unused_variables,
1279 dead_code,
1280 missing_docs,
1281 clippy::wildcard_imports,
1282 clippy::let_unit_value,
1283 )]
1284 use tonic::codegen::*;
1285 #[async_trait]
1287 pub trait SinkCoordinationService: std::marker::Send + std::marker::Sync + 'static {
1288 type CoordinateStream: tonic::codegen::tokio_stream::Stream<
1290 Item = std::result::Result<super::CoordinateResponse, tonic::Status>,
1291 >
1292 + std::marker::Send
1293 + 'static;
1294 async fn coordinate(
1295 &self,
1296 request: tonic::Request<tonic::Streaming<super::CoordinateRequest>>,
1297 ) -> std::result::Result<tonic::Response<Self::CoordinateStream>, tonic::Status>;
1298 }
1299 #[derive(Debug)]
1300 pub struct SinkCoordinationServiceServer<T> {
1301 inner: Arc<T>,
1302 accept_compression_encodings: EnabledCompressionEncodings,
1303 send_compression_encodings: EnabledCompressionEncodings,
1304 max_decoding_message_size: Option<usize>,
1305 max_encoding_message_size: Option<usize>,
1306 }
1307 impl<T> SinkCoordinationServiceServer<T> {
1308 pub fn new(inner: T) -> Self {
1309 Self::from_arc(Arc::new(inner))
1310 }
1311 pub fn from_arc(inner: Arc<T>) -> Self {
1312 Self {
1313 inner,
1314 accept_compression_encodings: Default::default(),
1315 send_compression_encodings: Default::default(),
1316 max_decoding_message_size: None,
1317 max_encoding_message_size: None,
1318 }
1319 }
1320 pub fn with_interceptor<F>(
1321 inner: T,
1322 interceptor: F,
1323 ) -> InterceptedService<Self, F>
1324 where
1325 F: tonic::service::Interceptor,
1326 {
1327 InterceptedService::new(Self::new(inner), interceptor)
1328 }
1329 #[must_use]
1331 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1332 self.accept_compression_encodings.enable(encoding);
1333 self
1334 }
1335 #[must_use]
1337 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1338 self.send_compression_encodings.enable(encoding);
1339 self
1340 }
1341 #[must_use]
1345 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1346 self.max_decoding_message_size = Some(limit);
1347 self
1348 }
1349 #[must_use]
1353 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1354 self.max_encoding_message_size = Some(limit);
1355 self
1356 }
1357 }
1358 impl<T, B> tonic::codegen::Service<http::Request<B>>
1359 for SinkCoordinationServiceServer<T>
1360 where
1361 T: SinkCoordinationService,
1362 B: Body + std::marker::Send + 'static,
1363 B::Error: Into<StdError> + std::marker::Send + 'static,
1364 {
1365 type Response = http::Response<tonic::body::BoxBody>;
1366 type Error = std::convert::Infallible;
1367 type Future = BoxFuture<Self::Response, Self::Error>;
1368 fn poll_ready(
1369 &mut self,
1370 _cx: &mut Context<'_>,
1371 ) -> Poll<std::result::Result<(), Self::Error>> {
1372 Poll::Ready(Ok(()))
1373 }
1374 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1375 match req.uri().path() {
1376 "/connector_service.SinkCoordinationService/Coordinate" => {
1377 #[allow(non_camel_case_types)]
1378 struct CoordinateSvc<T: SinkCoordinationService>(pub Arc<T>);
1379 impl<
1380 T: SinkCoordinationService,
1381 > tonic::server::StreamingService<super::CoordinateRequest>
1382 for CoordinateSvc<T> {
1383 type Response = super::CoordinateResponse;
1384 type ResponseStream = T::CoordinateStream;
1385 type Future = BoxFuture<
1386 tonic::Response<Self::ResponseStream>,
1387 tonic::Status,
1388 >;
1389 fn call(
1390 &mut self,
1391 request: tonic::Request<
1392 tonic::Streaming<super::CoordinateRequest>,
1393 >,
1394 ) -> Self::Future {
1395 let inner = Arc::clone(&self.0);
1396 let fut = async move {
1397 <T as SinkCoordinationService>::coordinate(&inner, request)
1398 .await
1399 };
1400 Box::pin(fut)
1401 }
1402 }
1403 let accept_compression_encodings = self.accept_compression_encodings;
1404 let send_compression_encodings = self.send_compression_encodings;
1405 let max_decoding_message_size = self.max_decoding_message_size;
1406 let max_encoding_message_size = self.max_encoding_message_size;
1407 let inner = self.inner.clone();
1408 let fut = async move {
1409 let method = CoordinateSvc(inner);
1410 let codec = tonic::codec::ProstCodec::default();
1411 let mut grpc = tonic::server::Grpc::new(codec)
1412 .apply_compression_config(
1413 accept_compression_encodings,
1414 send_compression_encodings,
1415 )
1416 .apply_max_message_size_config(
1417 max_decoding_message_size,
1418 max_encoding_message_size,
1419 );
1420 let res = grpc.streaming(method, req).await;
1421 Ok(res)
1422 };
1423 Box::pin(fut)
1424 }
1425 _ => {
1426 Box::pin(async move {
1427 let mut response = http::Response::new(empty_body());
1428 let headers = response.headers_mut();
1429 headers
1430 .insert(
1431 tonic::Status::GRPC_STATUS,
1432 (tonic::Code::Unimplemented as i32).into(),
1433 );
1434 headers
1435 .insert(
1436 http::header::CONTENT_TYPE,
1437 tonic::metadata::GRPC_CONTENT_TYPE,
1438 );
1439 Ok(response)
1440 })
1441 }
1442 }
1443 }
1444 }
1445 impl<T> Clone for SinkCoordinationServiceServer<T> {
1446 fn clone(&self) -> Self {
1447 let inner = self.inner.clone();
1448 Self {
1449 inner,
1450 accept_compression_encodings: self.accept_compression_encodings,
1451 send_compression_encodings: self.send_compression_encodings,
1452 max_decoding_message_size: self.max_decoding_message_size,
1453 max_encoding_message_size: self.max_encoding_message_size,
1454 }
1455 }
1456 }
1457 pub const SERVICE_NAME: &str = "connector_service.SinkCoordinationService";
1459 impl<T> tonic::server::NamedService for SinkCoordinationServiceServer<T> {
1460 const NAME: &'static str = SERVICE_NAME;
1461 }
1462}