1#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct TableSchema {
5 #[prost(message, repeated, tag = "1")]
6 pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
7 #[prost(uint32, repeated, tag = "2")]
8 pub pk_indices: ::prost::alloc::vec::Vec<u32>,
9}
10#[derive(prost_helpers::AnyPB)]
11#[derive(Clone, PartialEq, ::prost::Message)]
12pub struct ValidationError {
13 #[prost(string, tag = "1")]
14 pub error_message: ::prost::alloc::string::String,
15}
16#[derive(prost_helpers::AnyPB)]
17#[derive(Clone, PartialEq, ::prost::Message)]
18pub struct SinkParam {
19 #[prost(uint32, tag = "1")]
20 pub sink_id: u32,
21 #[prost(btree_map = "string, string", tag = "2")]
22 pub properties: ::prost::alloc::collections::BTreeMap<
23 ::prost::alloc::string::String,
24 ::prost::alloc::string::String,
25 >,
26 #[prost(message, optional, tag = "3")]
27 pub table_schema: ::core::option::Option<TableSchema>,
28 #[prost(enumeration = "super::catalog::SinkType", tag = "4")]
30 pub sink_type: i32,
31 #[prost(string, tag = "5")]
32 pub db_name: ::prost::alloc::string::String,
33 #[prost(string, tag = "6")]
34 pub sink_from_name: ::prost::alloc::string::String,
35 #[prost(message, optional, tag = "7")]
36 pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
37 #[prost(string, tag = "8")]
38 pub sink_name: ::prost::alloc::string::String,
39}
40#[derive(prost_helpers::AnyPB)]
41#[derive(Clone, PartialEq, ::prost::Message)]
42pub struct SinkWriterStreamRequest {
43 #[prost(oneof = "sink_writer_stream_request::Request", tags = "1, 3, 4")]
44 pub request: ::core::option::Option<sink_writer_stream_request::Request>,
45}
46pub mod sink_writer_stream_request {
48 #[derive(prost_helpers::AnyPB)]
49 #[derive(Clone, PartialEq, ::prost::Message)]
50 pub struct StartSink {
51 #[prost(message, optional, tag = "1")]
52 pub sink_param: ::core::option::Option<super::SinkParam>,
53 #[prost(message, optional, tag = "3")]
54 pub payload_schema: ::core::option::Option<super::TableSchema>,
55 }
56 #[derive(prost_helpers::AnyPB)]
57 #[derive(Clone, PartialEq, ::prost::Message)]
58 pub struct WriteBatch {
59 #[prost(uint64, tag = "3")]
60 pub batch_id: u64,
61 #[prost(uint64, tag = "4")]
62 pub epoch: u64,
63 #[prost(oneof = "write_batch::Payload", tags = "2, 5")]
64 pub payload: ::core::option::Option<write_batch::Payload>,
65 }
66 pub mod write_batch {
68 #[derive(prost_helpers::AnyPB)]
69 #[derive(Clone, PartialEq, ::prost::Message)]
70 pub struct StreamChunkPayload {
71 #[prost(bytes = "vec", tag = "1")]
72 pub binary_data: ::prost::alloc::vec::Vec<u8>,
73 }
74 #[derive(prost_helpers::AnyPB)]
75 #[derive(Clone, PartialEq, ::prost::Oneof)]
76 pub enum Payload {
77 #[prost(message, tag = "2")]
78 StreamChunkPayload(StreamChunkPayload),
79 #[prost(int64, tag = "5")]
83 StreamChunkRefPointer(i64),
84 }
85 }
86 #[derive(prost_helpers::AnyPB)]
87 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
88 pub struct Barrier {
89 #[prost(uint64, tag = "1")]
90 pub epoch: u64,
91 #[prost(bool, tag = "2")]
92 pub is_checkpoint: bool,
93 }
94 #[derive(prost_helpers::AnyPB)]
95 #[derive(Clone, PartialEq, ::prost::Oneof)]
96 pub enum Request {
97 #[prost(message, tag = "1")]
98 Start(StartSink),
99 #[prost(message, tag = "3")]
100 WriteBatch(WriteBatch),
101 #[prost(message, tag = "4")]
102 Barrier(Barrier),
103 }
104}
105#[derive(prost_helpers::AnyPB)]
106#[derive(Clone, PartialEq, ::prost::Message)]
107pub struct SinkWriterStreamResponse {
108 #[prost(oneof = "sink_writer_stream_response::Response", tags = "1, 2, 3")]
109 pub response: ::core::option::Option<sink_writer_stream_response::Response>,
110}
111pub mod sink_writer_stream_response {
113 #[derive(prost_helpers::AnyPB)]
114 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
115 pub struct StartResponse {}
116 #[derive(prost_helpers::AnyPB)]
117 #[derive(Clone, PartialEq, ::prost::Message)]
118 pub struct CommitResponse {
119 #[prost(uint64, tag = "1")]
120 pub epoch: u64,
121 #[prost(message, optional, tag = "2")]
122 pub metadata: ::core::option::Option<super::SinkMetadata>,
123 }
124 #[derive(prost_helpers::AnyPB)]
125 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
126 pub struct BatchWrittenResponse {
127 #[prost(uint64, tag = "1")]
128 pub epoch: u64,
129 #[prost(uint64, tag = "2")]
130 pub batch_id: u64,
131 }
132 #[derive(prost_helpers::AnyPB)]
133 #[derive(Clone, PartialEq, ::prost::Oneof)]
134 pub enum Response {
135 #[prost(message, tag = "1")]
136 Start(StartResponse),
137 #[prost(message, tag = "2")]
138 Commit(CommitResponse),
139 #[prost(message, tag = "3")]
140 Batch(BatchWrittenResponse),
141 }
142}
143#[derive(prost_helpers::AnyPB)]
144#[derive(Clone, PartialEq, ::prost::Message)]
145pub struct ValidateSinkRequest {
146 #[prost(message, optional, tag = "1")]
147 pub sink_param: ::core::option::Option<SinkParam>,
148}
149#[derive(prost_helpers::AnyPB)]
150#[derive(Clone, PartialEq, ::prost::Message)]
151pub struct ValidateSinkResponse {
152 #[prost(message, optional, tag = "1")]
154 pub error: ::core::option::Option<ValidationError>,
155}
156#[derive(prost_helpers::AnyPB)]
157#[derive(Clone, PartialEq, ::prost::Message)]
158pub struct SinkMetadata {
159 #[prost(oneof = "sink_metadata::Metadata", tags = "1")]
160 pub metadata: ::core::option::Option<sink_metadata::Metadata>,
161}
162pub mod sink_metadata {
164 #[derive(prost_helpers::AnyPB)]
165 #[derive(Clone, PartialEq, ::prost::Message)]
166 pub struct SerializedMetadata {
167 #[prost(bytes = "vec", tag = "1")]
168 pub metadata: ::prost::alloc::vec::Vec<u8>,
169 }
170 #[derive(prost_helpers::AnyPB)]
171 #[derive(Clone, PartialEq, ::prost::Oneof)]
172 pub enum Metadata {
173 #[prost(message, tag = "1")]
174 Serialized(SerializedMetadata),
175 }
176}
177#[derive(prost_helpers::AnyPB)]
178#[derive(Clone, PartialEq, ::prost::Message)]
179pub struct SinkCoordinatorStreamRequest {
180 #[prost(oneof = "sink_coordinator_stream_request::Request", tags = "1, 2")]
181 pub request: ::core::option::Option<sink_coordinator_stream_request::Request>,
182}
183pub mod sink_coordinator_stream_request {
185 #[derive(prost_helpers::AnyPB)]
186 #[derive(Clone, PartialEq, ::prost::Message)]
187 pub struct StartCoordinator {
188 #[prost(message, optional, tag = "1")]
189 pub param: ::core::option::Option<super::SinkParam>,
190 }
191 #[derive(prost_helpers::AnyPB)]
192 #[derive(Clone, PartialEq, ::prost::Message)]
193 pub struct CommitMetadata {
194 #[prost(uint64, tag = "1")]
195 pub epoch: u64,
196 #[prost(message, repeated, tag = "2")]
197 pub metadata: ::prost::alloc::vec::Vec<super::SinkMetadata>,
198 }
199 #[derive(prost_helpers::AnyPB)]
200 #[derive(Clone, PartialEq, ::prost::Oneof)]
201 pub enum Request {
202 #[prost(message, tag = "1")]
203 Start(StartCoordinator),
204 #[prost(message, tag = "2")]
205 Commit(CommitMetadata),
206 }
207}
208#[derive(prost_helpers::AnyPB)]
209#[derive(Clone, Copy, PartialEq, ::prost::Message)]
210pub struct SinkCoordinatorStreamResponse {
211 #[prost(oneof = "sink_coordinator_stream_response::Response", tags = "1, 2")]
212 pub response: ::core::option::Option<sink_coordinator_stream_response::Response>,
213}
214pub mod sink_coordinator_stream_response {
216 #[derive(prost_helpers::AnyPB)]
217 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
218 pub struct StartResponse {}
219 #[derive(prost_helpers::AnyPB)]
220 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
221 pub struct CommitResponse {
222 #[prost(uint64, tag = "1")]
223 pub epoch: u64,
224 }
225 #[derive(prost_helpers::AnyPB)]
226 #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
227 pub enum Response {
228 #[prost(message, tag = "1")]
229 Start(StartResponse),
230 #[prost(message, tag = "2")]
231 Commit(CommitResponse),
232 }
233}
234#[derive(prost_helpers::AnyPB)]
235#[derive(Clone, PartialEq, ::prost::Message)]
236pub struct CdcMessage {
237 #[prost(string, tag = "1")]
239 pub payload: ::prost::alloc::string::String,
240 #[prost(string, tag = "2")]
241 pub partition: ::prost::alloc::string::String,
242 #[prost(string, tag = "3")]
243 pub offset: ::prost::alloc::string::String,
244 #[prost(string, tag = "4")]
245 pub full_table_name: ::prost::alloc::string::String,
246 #[prost(int64, tag = "5")]
247 pub source_ts_ms: i64,
248 #[prost(enumeration = "cdc_message::CdcMessageType", tag = "6")]
249 pub msg_type: i32,
250 #[prost(string, tag = "7")]
252 pub key: ::prost::alloc::string::String,
253}
254pub mod cdc_message {
256 #[derive(prost_helpers::AnyPB)]
257 #[derive(
258 Clone,
259 Copy,
260 Debug,
261 PartialEq,
262 Eq,
263 Hash,
264 PartialOrd,
265 Ord,
266 ::prost::Enumeration
267 )]
268 #[repr(i32)]
269 pub enum CdcMessageType {
270 Unspecified = 0,
271 Heartbeat = 1,
272 Data = 2,
273 TransactionMeta = 3,
274 SchemaChange = 4,
275 }
276 impl CdcMessageType {
277 pub fn as_str_name(&self) -> &'static str {
282 match self {
283 Self::Unspecified => "UNSPECIFIED",
284 Self::Heartbeat => "HEARTBEAT",
285 Self::Data => "DATA",
286 Self::TransactionMeta => "TRANSACTION_META",
287 Self::SchemaChange => "SCHEMA_CHANGE",
288 }
289 }
290 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
292 match value {
293 "UNSPECIFIED" => Some(Self::Unspecified),
294 "HEARTBEAT" => Some(Self::Heartbeat),
295 "DATA" => Some(Self::Data),
296 "TRANSACTION_META" => Some(Self::TransactionMeta),
297 "SCHEMA_CHANGE" => Some(Self::SchemaChange),
298 _ => None,
299 }
300 }
301 }
302}
303#[derive(prost_helpers::AnyPB)]
304#[derive(Clone, PartialEq, ::prost::Message)]
305pub struct GetEventStreamRequest {
306 #[prost(uint64, tag = "1")]
307 pub source_id: u64,
308 #[prost(enumeration = "SourceType", tag = "2")]
309 pub source_type: i32,
310 #[prost(string, tag = "3")]
311 pub start_offset: ::prost::alloc::string::String,
312 #[prost(btree_map = "string, string", tag = "4")]
313 pub properties: ::prost::alloc::collections::BTreeMap<
314 ::prost::alloc::string::String,
315 ::prost::alloc::string::String,
316 >,
317 #[prost(bool, tag = "5")]
318 pub snapshot_done: bool,
319 #[prost(bool, tag = "6")]
320 pub is_source_job: bool,
321}
322#[derive(prost_helpers::AnyPB)]
323#[derive(Clone, PartialEq, ::prost::Message)]
324pub struct GetEventStreamResponse {
325 #[prost(uint64, tag = "1")]
326 pub source_id: u64,
327 #[prost(message, repeated, tag = "2")]
328 pub events: ::prost::alloc::vec::Vec<CdcMessage>,
329 #[prost(message, optional, tag = "3")]
330 pub control: ::core::option::Option<get_event_stream_response::ControlInfo>,
331}
332pub mod get_event_stream_response {
334 #[derive(prost_helpers::AnyPB)]
335 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
336 pub struct ControlInfo {
337 #[prost(bool, tag = "1")]
338 pub handshake_ok: bool,
339 }
340}
341#[derive(prost_helpers::AnyPB)]
342#[derive(Clone, PartialEq, ::prost::Message)]
343pub struct ValidateSourceRequest {
344 #[prost(uint64, tag = "1")]
345 pub source_id: u64,
346 #[prost(enumeration = "SourceType", tag = "2")]
347 pub source_type: i32,
348 #[prost(btree_map = "string, string", tag = "3")]
349 pub properties: ::prost::alloc::collections::BTreeMap<
350 ::prost::alloc::string::String,
351 ::prost::alloc::string::String,
352 >,
353 #[prost(message, optional, tag = "4")]
354 pub table_schema: ::core::option::Option<TableSchema>,
355 #[prost(bool, tag = "5")]
356 pub is_source_job: bool,
357 #[prost(bool, tag = "6")]
358 pub is_backfill_table: bool,
359}
360#[derive(prost_helpers::AnyPB)]
361#[derive(Clone, PartialEq, ::prost::Message)]
362pub struct ValidateSourceResponse {
363 #[prost(message, optional, tag = "1")]
365 pub error: ::core::option::Option<ValidationError>,
366}
367#[derive(prost_helpers::AnyPB)]
368#[derive(Clone, PartialEq, ::prost::Message)]
369pub struct CoordinateRequest {
370 #[prost(oneof = "coordinate_request::Msg", tags = "1, 2, 3, 4, 5")]
371 pub msg: ::core::option::Option<coordinate_request::Msg>,
372}
373pub mod coordinate_request {
375 #[derive(prost_helpers::AnyPB)]
378 #[derive(Clone, PartialEq, ::prost::Message)]
379 pub struct StartCoordinationRequest {
380 #[prost(message, optional, tag = "1")]
381 pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
382 #[prost(message, optional, tag = "2")]
383 pub param: ::core::option::Option<super::SinkParam>,
384 }
385 #[derive(prost_helpers::AnyPB)]
386 #[derive(Clone, PartialEq, ::prost::Message)]
387 pub struct CommitRequest {
388 #[prost(uint64, tag = "1")]
389 pub epoch: u64,
390 #[prost(message, optional, tag = "2")]
391 pub metadata: ::core::option::Option<super::SinkMetadata>,
392 }
393 #[derive(prost_helpers::AnyPB)]
394 #[derive(Clone, PartialEq, ::prost::Message)]
395 pub struct UpdateVnodeBitmapRequest {
396 #[prost(message, optional, tag = "1")]
397 pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
398 }
399 #[derive(prost_helpers::AnyPB)]
400 #[derive(Clone, PartialEq, ::prost::Oneof)]
401 pub enum Msg {
402 #[prost(message, tag = "1")]
403 StartRequest(StartCoordinationRequest),
404 #[prost(message, tag = "2")]
405 CommitRequest(CommitRequest),
406 #[prost(message, tag = "3")]
407 UpdateVnodeRequest(UpdateVnodeBitmapRequest),
408 #[prost(bool, tag = "4")]
409 Stop(bool),
410 #[prost(uint64, tag = "5")]
411 AlignInitialEpochRequest(u64),
412 }
413}
414#[derive(prost_helpers::AnyPB)]
415#[derive(Clone, Copy, PartialEq, ::prost::Message)]
416pub struct CoordinateResponse {
417 #[prost(oneof = "coordinate_response::Msg", tags = "1, 2, 3, 4")]
418 pub msg: ::core::option::Option<coordinate_response::Msg>,
419}
420pub mod coordinate_response {
422 #[derive(prost_helpers::AnyPB)]
423 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
424 pub struct StartCoordinationResponse {
425 #[prost(uint64, optional, tag = "1")]
426 pub log_store_rewind_start_epoch: ::core::option::Option<u64>,
427 }
428 #[derive(prost_helpers::AnyPB)]
429 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
430 pub struct CommitResponse {
431 #[prost(uint64, tag = "1")]
432 pub epoch: u64,
433 }
434 #[derive(prost_helpers::AnyPB)]
435 #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
436 pub enum Msg {
437 #[prost(message, tag = "1")]
438 StartResponse(StartCoordinationResponse),
439 #[prost(message, tag = "2")]
440 CommitResponse(CommitResponse),
441 #[prost(bool, tag = "3")]
442 Stopped(bool),
443 #[prost(uint64, tag = "4")]
444 AlignInitialEpochResponse(u64),
445 }
446}
447#[derive(prost_helpers::AnyPB)]
448#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
449#[repr(i32)]
450pub enum SourceType {
451 Unspecified = 0,
452 Mysql = 1,
453 Postgres = 2,
454 Citus = 3,
455 Mongodb = 4,
456 SqlServer = 5,
457}
458impl SourceType {
459 pub fn as_str_name(&self) -> &'static str {
464 match self {
465 Self::Unspecified => "UNSPECIFIED",
466 Self::Mysql => "MYSQL",
467 Self::Postgres => "POSTGRES",
468 Self::Citus => "CITUS",
469 Self::Mongodb => "MONGODB",
470 Self::SqlServer => "SQL_SERVER",
471 }
472 }
473 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
475 match value {
476 "UNSPECIFIED" => Some(Self::Unspecified),
477 "MYSQL" => Some(Self::Mysql),
478 "POSTGRES" => Some(Self::Postgres),
479 "CITUS" => Some(Self::Citus),
480 "MONGODB" => Some(Self::Mongodb),
481 "SQL_SERVER" => Some(Self::SqlServer),
482 _ => None,
483 }
484 }
485}
486pub mod connector_service_client {
488 #![allow(
489 unused_variables,
490 dead_code,
491 missing_docs,
492 clippy::wildcard_imports,
493 clippy::let_unit_value,
494 )]
495 use tonic::codegen::*;
496 use tonic::codegen::http::Uri;
497 #[derive(Debug, Clone)]
498 pub struct ConnectorServiceClient<T> {
499 inner: tonic::client::Grpc<T>,
500 }
501 impl ConnectorServiceClient<tonic::transport::Channel> {
502 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
504 where
505 D: TryInto<tonic::transport::Endpoint>,
506 D::Error: Into<StdError>,
507 {
508 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
509 Ok(Self::new(conn))
510 }
511 }
512 impl<T> ConnectorServiceClient<T>
513 where
514 T: tonic::client::GrpcService<tonic::body::BoxBody>,
515 T::Error: Into<StdError>,
516 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
517 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
518 {
519 pub fn new(inner: T) -> Self {
520 let inner = tonic::client::Grpc::new(inner);
521 Self { inner }
522 }
523 pub fn with_origin(inner: T, origin: Uri) -> Self {
524 let inner = tonic::client::Grpc::with_origin(inner, origin);
525 Self { inner }
526 }
527 pub fn with_interceptor<F>(
528 inner: T,
529 interceptor: F,
530 ) -> ConnectorServiceClient<InterceptedService<T, F>>
531 where
532 F: tonic::service::Interceptor,
533 T::ResponseBody: Default,
534 T: tonic::codegen::Service<
535 http::Request<tonic::body::BoxBody>,
536 Response = http::Response<
537 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
538 >,
539 >,
540 <T as tonic::codegen::Service<
541 http::Request<tonic::body::BoxBody>,
542 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
543 {
544 ConnectorServiceClient::new(InterceptedService::new(inner, interceptor))
545 }
546 #[must_use]
551 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
552 self.inner = self.inner.send_compressed(encoding);
553 self
554 }
555 #[must_use]
557 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
558 self.inner = self.inner.accept_compressed(encoding);
559 self
560 }
561 #[must_use]
565 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
566 self.inner = self.inner.max_decoding_message_size(limit);
567 self
568 }
569 #[must_use]
573 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
574 self.inner = self.inner.max_encoding_message_size(limit);
575 self
576 }
577 pub async fn sink_writer_stream(
578 &mut self,
579 request: impl tonic::IntoStreamingRequest<
580 Message = super::SinkWriterStreamRequest,
581 >,
582 ) -> std::result::Result<
583 tonic::Response<tonic::codec::Streaming<super::SinkWriterStreamResponse>>,
584 tonic::Status,
585 > {
586 self.inner
587 .ready()
588 .await
589 .map_err(|e| {
590 tonic::Status::unknown(
591 format!("Service was not ready: {}", e.into()),
592 )
593 })?;
594 let codec = tonic::codec::ProstCodec::default();
595 let path = http::uri::PathAndQuery::from_static(
596 "/connector_service.ConnectorService/SinkWriterStream",
597 );
598 let mut req = request.into_streaming_request();
599 req.extensions_mut()
600 .insert(
601 GrpcMethod::new(
602 "connector_service.ConnectorService",
603 "SinkWriterStream",
604 ),
605 );
606 self.inner.streaming(req, path, codec).await
607 }
608 pub async fn sink_coordinator_stream(
609 &mut self,
610 request: impl tonic::IntoStreamingRequest<
611 Message = super::SinkCoordinatorStreamRequest,
612 >,
613 ) -> std::result::Result<
614 tonic::Response<
615 tonic::codec::Streaming<super::SinkCoordinatorStreamResponse>,
616 >,
617 tonic::Status,
618 > {
619 self.inner
620 .ready()
621 .await
622 .map_err(|e| {
623 tonic::Status::unknown(
624 format!("Service was not ready: {}", e.into()),
625 )
626 })?;
627 let codec = tonic::codec::ProstCodec::default();
628 let path = http::uri::PathAndQuery::from_static(
629 "/connector_service.ConnectorService/SinkCoordinatorStream",
630 );
631 let mut req = request.into_streaming_request();
632 req.extensions_mut()
633 .insert(
634 GrpcMethod::new(
635 "connector_service.ConnectorService",
636 "SinkCoordinatorStream",
637 ),
638 );
639 self.inner.streaming(req, path, codec).await
640 }
641 pub async fn validate_sink(
642 &mut self,
643 request: impl tonic::IntoRequest<super::ValidateSinkRequest>,
644 ) -> std::result::Result<
645 tonic::Response<super::ValidateSinkResponse>,
646 tonic::Status,
647 > {
648 self.inner
649 .ready()
650 .await
651 .map_err(|e| {
652 tonic::Status::unknown(
653 format!("Service was not ready: {}", e.into()),
654 )
655 })?;
656 let codec = tonic::codec::ProstCodec::default();
657 let path = http::uri::PathAndQuery::from_static(
658 "/connector_service.ConnectorService/ValidateSink",
659 );
660 let mut req = request.into_request();
661 req.extensions_mut()
662 .insert(
663 GrpcMethod::new("connector_service.ConnectorService", "ValidateSink"),
664 );
665 self.inner.unary(req, path, codec).await
666 }
667 pub async fn get_event_stream(
668 &mut self,
669 request: impl tonic::IntoRequest<super::GetEventStreamRequest>,
670 ) -> std::result::Result<
671 tonic::Response<tonic::codec::Streaming<super::GetEventStreamResponse>>,
672 tonic::Status,
673 > {
674 self.inner
675 .ready()
676 .await
677 .map_err(|e| {
678 tonic::Status::unknown(
679 format!("Service was not ready: {}", e.into()),
680 )
681 })?;
682 let codec = tonic::codec::ProstCodec::default();
683 let path = http::uri::PathAndQuery::from_static(
684 "/connector_service.ConnectorService/GetEventStream",
685 );
686 let mut req = request.into_request();
687 req.extensions_mut()
688 .insert(
689 GrpcMethod::new(
690 "connector_service.ConnectorService",
691 "GetEventStream",
692 ),
693 );
694 self.inner.server_streaming(req, path, codec).await
695 }
696 pub async fn validate_source(
697 &mut self,
698 request: impl tonic::IntoRequest<super::ValidateSourceRequest>,
699 ) -> std::result::Result<
700 tonic::Response<super::ValidateSourceResponse>,
701 tonic::Status,
702 > {
703 self.inner
704 .ready()
705 .await
706 .map_err(|e| {
707 tonic::Status::unknown(
708 format!("Service was not ready: {}", e.into()),
709 )
710 })?;
711 let codec = tonic::codec::ProstCodec::default();
712 let path = http::uri::PathAndQuery::from_static(
713 "/connector_service.ConnectorService/ValidateSource",
714 );
715 let mut req = request.into_request();
716 req.extensions_mut()
717 .insert(
718 GrpcMethod::new(
719 "connector_service.ConnectorService",
720 "ValidateSource",
721 ),
722 );
723 self.inner.unary(req, path, codec).await
724 }
725 }
726}
727pub mod sink_coordination_service_client {
729 #![allow(
730 unused_variables,
731 dead_code,
732 missing_docs,
733 clippy::wildcard_imports,
734 clippy::let_unit_value,
735 )]
736 use tonic::codegen::*;
737 use tonic::codegen::http::Uri;
738 #[derive(Debug, Clone)]
739 pub struct SinkCoordinationServiceClient<T> {
740 inner: tonic::client::Grpc<T>,
741 }
742 impl SinkCoordinationServiceClient<tonic::transport::Channel> {
743 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
745 where
746 D: TryInto<tonic::transport::Endpoint>,
747 D::Error: Into<StdError>,
748 {
749 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
750 Ok(Self::new(conn))
751 }
752 }
753 impl<T> SinkCoordinationServiceClient<T>
754 where
755 T: tonic::client::GrpcService<tonic::body::BoxBody>,
756 T::Error: Into<StdError>,
757 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
758 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
759 {
760 pub fn new(inner: T) -> Self {
761 let inner = tonic::client::Grpc::new(inner);
762 Self { inner }
763 }
764 pub fn with_origin(inner: T, origin: Uri) -> Self {
765 let inner = tonic::client::Grpc::with_origin(inner, origin);
766 Self { inner }
767 }
768 pub fn with_interceptor<F>(
769 inner: T,
770 interceptor: F,
771 ) -> SinkCoordinationServiceClient<InterceptedService<T, F>>
772 where
773 F: tonic::service::Interceptor,
774 T::ResponseBody: Default,
775 T: tonic::codegen::Service<
776 http::Request<tonic::body::BoxBody>,
777 Response = http::Response<
778 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
779 >,
780 >,
781 <T as tonic::codegen::Service<
782 http::Request<tonic::body::BoxBody>,
783 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
784 {
785 SinkCoordinationServiceClient::new(
786 InterceptedService::new(inner, interceptor),
787 )
788 }
789 #[must_use]
794 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
795 self.inner = self.inner.send_compressed(encoding);
796 self
797 }
798 #[must_use]
800 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
801 self.inner = self.inner.accept_compressed(encoding);
802 self
803 }
804 #[must_use]
808 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
809 self.inner = self.inner.max_decoding_message_size(limit);
810 self
811 }
812 #[must_use]
816 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
817 self.inner = self.inner.max_encoding_message_size(limit);
818 self
819 }
820 pub async fn coordinate(
821 &mut self,
822 request: impl tonic::IntoStreamingRequest<Message = super::CoordinateRequest>,
823 ) -> std::result::Result<
824 tonic::Response<tonic::codec::Streaming<super::CoordinateResponse>>,
825 tonic::Status,
826 > {
827 self.inner
828 .ready()
829 .await
830 .map_err(|e| {
831 tonic::Status::unknown(
832 format!("Service was not ready: {}", e.into()),
833 )
834 })?;
835 let codec = tonic::codec::ProstCodec::default();
836 let path = http::uri::PathAndQuery::from_static(
837 "/connector_service.SinkCoordinationService/Coordinate",
838 );
839 let mut req = request.into_streaming_request();
840 req.extensions_mut()
841 .insert(
842 GrpcMethod::new(
843 "connector_service.SinkCoordinationService",
844 "Coordinate",
845 ),
846 );
847 self.inner.streaming(req, path, codec).await
848 }
849 }
850}
851pub mod connector_service_server {
853 #![allow(
854 unused_variables,
855 dead_code,
856 missing_docs,
857 clippy::wildcard_imports,
858 clippy::let_unit_value,
859 )]
860 use tonic::codegen::*;
861 #[async_trait]
863 pub trait ConnectorService: std::marker::Send + std::marker::Sync + 'static {
864 type SinkWriterStreamStream: tonic::codegen::tokio_stream::Stream<
866 Item = std::result::Result<
867 super::SinkWriterStreamResponse,
868 tonic::Status,
869 >,
870 >
871 + std::marker::Send
872 + 'static;
873 async fn sink_writer_stream(
874 &self,
875 request: tonic::Request<tonic::Streaming<super::SinkWriterStreamRequest>>,
876 ) -> std::result::Result<
877 tonic::Response<Self::SinkWriterStreamStream>,
878 tonic::Status,
879 >;
880 type SinkCoordinatorStreamStream: tonic::codegen::tokio_stream::Stream<
882 Item = std::result::Result<
883 super::SinkCoordinatorStreamResponse,
884 tonic::Status,
885 >,
886 >
887 + std::marker::Send
888 + 'static;
889 async fn sink_coordinator_stream(
890 &self,
891 request: tonic::Request<
892 tonic::Streaming<super::SinkCoordinatorStreamRequest>,
893 >,
894 ) -> std::result::Result<
895 tonic::Response<Self::SinkCoordinatorStreamStream>,
896 tonic::Status,
897 >;
898 async fn validate_sink(
899 &self,
900 request: tonic::Request<super::ValidateSinkRequest>,
901 ) -> std::result::Result<
902 tonic::Response<super::ValidateSinkResponse>,
903 tonic::Status,
904 >;
905 type GetEventStreamStream: tonic::codegen::tokio_stream::Stream<
907 Item = std::result::Result<super::GetEventStreamResponse, tonic::Status>,
908 >
909 + std::marker::Send
910 + 'static;
911 async fn get_event_stream(
912 &self,
913 request: tonic::Request<super::GetEventStreamRequest>,
914 ) -> std::result::Result<
915 tonic::Response<Self::GetEventStreamStream>,
916 tonic::Status,
917 >;
918 async fn validate_source(
919 &self,
920 request: tonic::Request<super::ValidateSourceRequest>,
921 ) -> std::result::Result<
922 tonic::Response<super::ValidateSourceResponse>,
923 tonic::Status,
924 >;
925 }
926 #[derive(Debug)]
927 pub struct ConnectorServiceServer<T> {
928 inner: Arc<T>,
929 accept_compression_encodings: EnabledCompressionEncodings,
930 send_compression_encodings: EnabledCompressionEncodings,
931 max_decoding_message_size: Option<usize>,
932 max_encoding_message_size: Option<usize>,
933 }
934 impl<T> ConnectorServiceServer<T> {
935 pub fn new(inner: T) -> Self {
936 Self::from_arc(Arc::new(inner))
937 }
938 pub fn from_arc(inner: Arc<T>) -> Self {
939 Self {
940 inner,
941 accept_compression_encodings: Default::default(),
942 send_compression_encodings: Default::default(),
943 max_decoding_message_size: None,
944 max_encoding_message_size: None,
945 }
946 }
947 pub fn with_interceptor<F>(
948 inner: T,
949 interceptor: F,
950 ) -> InterceptedService<Self, F>
951 where
952 F: tonic::service::Interceptor,
953 {
954 InterceptedService::new(Self::new(inner), interceptor)
955 }
956 #[must_use]
958 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
959 self.accept_compression_encodings.enable(encoding);
960 self
961 }
962 #[must_use]
964 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
965 self.send_compression_encodings.enable(encoding);
966 self
967 }
968 #[must_use]
972 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
973 self.max_decoding_message_size = Some(limit);
974 self
975 }
976 #[must_use]
980 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
981 self.max_encoding_message_size = Some(limit);
982 self
983 }
984 }
985 impl<T, B> tonic::codegen::Service<http::Request<B>> for ConnectorServiceServer<T>
986 where
987 T: ConnectorService,
988 B: Body + std::marker::Send + 'static,
989 B::Error: Into<StdError> + std::marker::Send + 'static,
990 {
991 type Response = http::Response<tonic::body::BoxBody>;
992 type Error = std::convert::Infallible;
993 type Future = BoxFuture<Self::Response, Self::Error>;
994 fn poll_ready(
995 &mut self,
996 _cx: &mut Context<'_>,
997 ) -> Poll<std::result::Result<(), Self::Error>> {
998 Poll::Ready(Ok(()))
999 }
1000 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1001 match req.uri().path() {
1002 "/connector_service.ConnectorService/SinkWriterStream" => {
1003 #[allow(non_camel_case_types)]
1004 struct SinkWriterStreamSvc<T: ConnectorService>(pub Arc<T>);
1005 impl<
1006 T: ConnectorService,
1007 > tonic::server::StreamingService<super::SinkWriterStreamRequest>
1008 for SinkWriterStreamSvc<T> {
1009 type Response = super::SinkWriterStreamResponse;
1010 type ResponseStream = T::SinkWriterStreamStream;
1011 type Future = BoxFuture<
1012 tonic::Response<Self::ResponseStream>,
1013 tonic::Status,
1014 >;
1015 fn call(
1016 &mut self,
1017 request: tonic::Request<
1018 tonic::Streaming<super::SinkWriterStreamRequest>,
1019 >,
1020 ) -> Self::Future {
1021 let inner = Arc::clone(&self.0);
1022 let fut = async move {
1023 <T as ConnectorService>::sink_writer_stream(&inner, request)
1024 .await
1025 };
1026 Box::pin(fut)
1027 }
1028 }
1029 let accept_compression_encodings = self.accept_compression_encodings;
1030 let send_compression_encodings = self.send_compression_encodings;
1031 let max_decoding_message_size = self.max_decoding_message_size;
1032 let max_encoding_message_size = self.max_encoding_message_size;
1033 let inner = self.inner.clone();
1034 let fut = async move {
1035 let method = SinkWriterStreamSvc(inner);
1036 let codec = tonic::codec::ProstCodec::default();
1037 let mut grpc = tonic::server::Grpc::new(codec)
1038 .apply_compression_config(
1039 accept_compression_encodings,
1040 send_compression_encodings,
1041 )
1042 .apply_max_message_size_config(
1043 max_decoding_message_size,
1044 max_encoding_message_size,
1045 );
1046 let res = grpc.streaming(method, req).await;
1047 Ok(res)
1048 };
1049 Box::pin(fut)
1050 }
1051 "/connector_service.ConnectorService/SinkCoordinatorStream" => {
1052 #[allow(non_camel_case_types)]
1053 struct SinkCoordinatorStreamSvc<T: ConnectorService>(pub Arc<T>);
1054 impl<
1055 T: ConnectorService,
1056 > tonic::server::StreamingService<
1057 super::SinkCoordinatorStreamRequest,
1058 > for SinkCoordinatorStreamSvc<T> {
1059 type Response = super::SinkCoordinatorStreamResponse;
1060 type ResponseStream = T::SinkCoordinatorStreamStream;
1061 type Future = BoxFuture<
1062 tonic::Response<Self::ResponseStream>,
1063 tonic::Status,
1064 >;
1065 fn call(
1066 &mut self,
1067 request: tonic::Request<
1068 tonic::Streaming<super::SinkCoordinatorStreamRequest>,
1069 >,
1070 ) -> Self::Future {
1071 let inner = Arc::clone(&self.0);
1072 let fut = async move {
1073 <T as ConnectorService>::sink_coordinator_stream(
1074 &inner,
1075 request,
1076 )
1077 .await
1078 };
1079 Box::pin(fut)
1080 }
1081 }
1082 let accept_compression_encodings = self.accept_compression_encodings;
1083 let send_compression_encodings = self.send_compression_encodings;
1084 let max_decoding_message_size = self.max_decoding_message_size;
1085 let max_encoding_message_size = self.max_encoding_message_size;
1086 let inner = self.inner.clone();
1087 let fut = async move {
1088 let method = SinkCoordinatorStreamSvc(inner);
1089 let codec = tonic::codec::ProstCodec::default();
1090 let mut grpc = tonic::server::Grpc::new(codec)
1091 .apply_compression_config(
1092 accept_compression_encodings,
1093 send_compression_encodings,
1094 )
1095 .apply_max_message_size_config(
1096 max_decoding_message_size,
1097 max_encoding_message_size,
1098 );
1099 let res = grpc.streaming(method, req).await;
1100 Ok(res)
1101 };
1102 Box::pin(fut)
1103 }
1104 "/connector_service.ConnectorService/ValidateSink" => {
1105 #[allow(non_camel_case_types)]
1106 struct ValidateSinkSvc<T: ConnectorService>(pub Arc<T>);
1107 impl<
1108 T: ConnectorService,
1109 > tonic::server::UnaryService<super::ValidateSinkRequest>
1110 for ValidateSinkSvc<T> {
1111 type Response = super::ValidateSinkResponse;
1112 type Future = BoxFuture<
1113 tonic::Response<Self::Response>,
1114 tonic::Status,
1115 >;
1116 fn call(
1117 &mut self,
1118 request: tonic::Request<super::ValidateSinkRequest>,
1119 ) -> Self::Future {
1120 let inner = Arc::clone(&self.0);
1121 let fut = async move {
1122 <T as ConnectorService>::validate_sink(&inner, request)
1123 .await
1124 };
1125 Box::pin(fut)
1126 }
1127 }
1128 let accept_compression_encodings = self.accept_compression_encodings;
1129 let send_compression_encodings = self.send_compression_encodings;
1130 let max_decoding_message_size = self.max_decoding_message_size;
1131 let max_encoding_message_size = self.max_encoding_message_size;
1132 let inner = self.inner.clone();
1133 let fut = async move {
1134 let method = ValidateSinkSvc(inner);
1135 let codec = tonic::codec::ProstCodec::default();
1136 let mut grpc = tonic::server::Grpc::new(codec)
1137 .apply_compression_config(
1138 accept_compression_encodings,
1139 send_compression_encodings,
1140 )
1141 .apply_max_message_size_config(
1142 max_decoding_message_size,
1143 max_encoding_message_size,
1144 );
1145 let res = grpc.unary(method, req).await;
1146 Ok(res)
1147 };
1148 Box::pin(fut)
1149 }
1150 "/connector_service.ConnectorService/GetEventStream" => {
1151 #[allow(non_camel_case_types)]
1152 struct GetEventStreamSvc<T: ConnectorService>(pub Arc<T>);
1153 impl<
1154 T: ConnectorService,
1155 > tonic::server::ServerStreamingService<super::GetEventStreamRequest>
1156 for GetEventStreamSvc<T> {
1157 type Response = super::GetEventStreamResponse;
1158 type ResponseStream = T::GetEventStreamStream;
1159 type Future = BoxFuture<
1160 tonic::Response<Self::ResponseStream>,
1161 tonic::Status,
1162 >;
1163 fn call(
1164 &mut self,
1165 request: tonic::Request<super::GetEventStreamRequest>,
1166 ) -> Self::Future {
1167 let inner = Arc::clone(&self.0);
1168 let fut = async move {
1169 <T as ConnectorService>::get_event_stream(&inner, request)
1170 .await
1171 };
1172 Box::pin(fut)
1173 }
1174 }
1175 let accept_compression_encodings = self.accept_compression_encodings;
1176 let send_compression_encodings = self.send_compression_encodings;
1177 let max_decoding_message_size = self.max_decoding_message_size;
1178 let max_encoding_message_size = self.max_encoding_message_size;
1179 let inner = self.inner.clone();
1180 let fut = async move {
1181 let method = GetEventStreamSvc(inner);
1182 let codec = tonic::codec::ProstCodec::default();
1183 let mut grpc = tonic::server::Grpc::new(codec)
1184 .apply_compression_config(
1185 accept_compression_encodings,
1186 send_compression_encodings,
1187 )
1188 .apply_max_message_size_config(
1189 max_decoding_message_size,
1190 max_encoding_message_size,
1191 );
1192 let res = grpc.server_streaming(method, req).await;
1193 Ok(res)
1194 };
1195 Box::pin(fut)
1196 }
1197 "/connector_service.ConnectorService/ValidateSource" => {
1198 #[allow(non_camel_case_types)]
1199 struct ValidateSourceSvc<T: ConnectorService>(pub Arc<T>);
1200 impl<
1201 T: ConnectorService,
1202 > tonic::server::UnaryService<super::ValidateSourceRequest>
1203 for ValidateSourceSvc<T> {
1204 type Response = super::ValidateSourceResponse;
1205 type Future = BoxFuture<
1206 tonic::Response<Self::Response>,
1207 tonic::Status,
1208 >;
1209 fn call(
1210 &mut self,
1211 request: tonic::Request<super::ValidateSourceRequest>,
1212 ) -> Self::Future {
1213 let inner = Arc::clone(&self.0);
1214 let fut = async move {
1215 <T as ConnectorService>::validate_source(&inner, request)
1216 .await
1217 };
1218 Box::pin(fut)
1219 }
1220 }
1221 let accept_compression_encodings = self.accept_compression_encodings;
1222 let send_compression_encodings = self.send_compression_encodings;
1223 let max_decoding_message_size = self.max_decoding_message_size;
1224 let max_encoding_message_size = self.max_encoding_message_size;
1225 let inner = self.inner.clone();
1226 let fut = async move {
1227 let method = ValidateSourceSvc(inner);
1228 let codec = tonic::codec::ProstCodec::default();
1229 let mut grpc = tonic::server::Grpc::new(codec)
1230 .apply_compression_config(
1231 accept_compression_encodings,
1232 send_compression_encodings,
1233 )
1234 .apply_max_message_size_config(
1235 max_decoding_message_size,
1236 max_encoding_message_size,
1237 );
1238 let res = grpc.unary(method, req).await;
1239 Ok(res)
1240 };
1241 Box::pin(fut)
1242 }
1243 _ => {
1244 Box::pin(async move {
1245 let mut response = http::Response::new(empty_body());
1246 let headers = response.headers_mut();
1247 headers
1248 .insert(
1249 tonic::Status::GRPC_STATUS,
1250 (tonic::Code::Unimplemented as i32).into(),
1251 );
1252 headers
1253 .insert(
1254 http::header::CONTENT_TYPE,
1255 tonic::metadata::GRPC_CONTENT_TYPE,
1256 );
1257 Ok(response)
1258 })
1259 }
1260 }
1261 }
1262 }
1263 impl<T> Clone for ConnectorServiceServer<T> {
1264 fn clone(&self) -> Self {
1265 let inner = self.inner.clone();
1266 Self {
1267 inner,
1268 accept_compression_encodings: self.accept_compression_encodings,
1269 send_compression_encodings: self.send_compression_encodings,
1270 max_decoding_message_size: self.max_decoding_message_size,
1271 max_encoding_message_size: self.max_encoding_message_size,
1272 }
1273 }
1274 }
1275 pub const SERVICE_NAME: &str = "connector_service.ConnectorService";
1277 impl<T> tonic::server::NamedService for ConnectorServiceServer<T> {
1278 const NAME: &'static str = SERVICE_NAME;
1279 }
1280}
1281pub mod sink_coordination_service_server {
1283 #![allow(
1284 unused_variables,
1285 dead_code,
1286 missing_docs,
1287 clippy::wildcard_imports,
1288 clippy::let_unit_value,
1289 )]
1290 use tonic::codegen::*;
1291 #[async_trait]
1293 pub trait SinkCoordinationService: std::marker::Send + std::marker::Sync + 'static {
1294 type CoordinateStream: tonic::codegen::tokio_stream::Stream<
1296 Item = std::result::Result<super::CoordinateResponse, tonic::Status>,
1297 >
1298 + std::marker::Send
1299 + 'static;
1300 async fn coordinate(
1301 &self,
1302 request: tonic::Request<tonic::Streaming<super::CoordinateRequest>>,
1303 ) -> std::result::Result<tonic::Response<Self::CoordinateStream>, tonic::Status>;
1304 }
1305 #[derive(Debug)]
1306 pub struct SinkCoordinationServiceServer<T> {
1307 inner: Arc<T>,
1308 accept_compression_encodings: EnabledCompressionEncodings,
1309 send_compression_encodings: EnabledCompressionEncodings,
1310 max_decoding_message_size: Option<usize>,
1311 max_encoding_message_size: Option<usize>,
1312 }
1313 impl<T> SinkCoordinationServiceServer<T> {
1314 pub fn new(inner: T) -> Self {
1315 Self::from_arc(Arc::new(inner))
1316 }
1317 pub fn from_arc(inner: Arc<T>) -> Self {
1318 Self {
1319 inner,
1320 accept_compression_encodings: Default::default(),
1321 send_compression_encodings: Default::default(),
1322 max_decoding_message_size: None,
1323 max_encoding_message_size: None,
1324 }
1325 }
1326 pub fn with_interceptor<F>(
1327 inner: T,
1328 interceptor: F,
1329 ) -> InterceptedService<Self, F>
1330 where
1331 F: tonic::service::Interceptor,
1332 {
1333 InterceptedService::new(Self::new(inner), interceptor)
1334 }
1335 #[must_use]
1337 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1338 self.accept_compression_encodings.enable(encoding);
1339 self
1340 }
1341 #[must_use]
1343 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1344 self.send_compression_encodings.enable(encoding);
1345 self
1346 }
1347 #[must_use]
1351 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1352 self.max_decoding_message_size = Some(limit);
1353 self
1354 }
1355 #[must_use]
1359 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1360 self.max_encoding_message_size = Some(limit);
1361 self
1362 }
1363 }
1364 impl<T, B> tonic::codegen::Service<http::Request<B>>
1365 for SinkCoordinationServiceServer<T>
1366 where
1367 T: SinkCoordinationService,
1368 B: Body + std::marker::Send + 'static,
1369 B::Error: Into<StdError> + std::marker::Send + 'static,
1370 {
1371 type Response = http::Response<tonic::body::BoxBody>;
1372 type Error = std::convert::Infallible;
1373 type Future = BoxFuture<Self::Response, Self::Error>;
1374 fn poll_ready(
1375 &mut self,
1376 _cx: &mut Context<'_>,
1377 ) -> Poll<std::result::Result<(), Self::Error>> {
1378 Poll::Ready(Ok(()))
1379 }
1380 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1381 match req.uri().path() {
1382 "/connector_service.SinkCoordinationService/Coordinate" => {
1383 #[allow(non_camel_case_types)]
1384 struct CoordinateSvc<T: SinkCoordinationService>(pub Arc<T>);
1385 impl<
1386 T: SinkCoordinationService,
1387 > tonic::server::StreamingService<super::CoordinateRequest>
1388 for CoordinateSvc<T> {
1389 type Response = super::CoordinateResponse;
1390 type ResponseStream = T::CoordinateStream;
1391 type Future = BoxFuture<
1392 tonic::Response<Self::ResponseStream>,
1393 tonic::Status,
1394 >;
1395 fn call(
1396 &mut self,
1397 request: tonic::Request<
1398 tonic::Streaming<super::CoordinateRequest>,
1399 >,
1400 ) -> Self::Future {
1401 let inner = Arc::clone(&self.0);
1402 let fut = async move {
1403 <T as SinkCoordinationService>::coordinate(&inner, request)
1404 .await
1405 };
1406 Box::pin(fut)
1407 }
1408 }
1409 let accept_compression_encodings = self.accept_compression_encodings;
1410 let send_compression_encodings = self.send_compression_encodings;
1411 let max_decoding_message_size = self.max_decoding_message_size;
1412 let max_encoding_message_size = self.max_encoding_message_size;
1413 let inner = self.inner.clone();
1414 let fut = async move {
1415 let method = CoordinateSvc(inner);
1416 let codec = tonic::codec::ProstCodec::default();
1417 let mut grpc = tonic::server::Grpc::new(codec)
1418 .apply_compression_config(
1419 accept_compression_encodings,
1420 send_compression_encodings,
1421 )
1422 .apply_max_message_size_config(
1423 max_decoding_message_size,
1424 max_encoding_message_size,
1425 );
1426 let res = grpc.streaming(method, req).await;
1427 Ok(res)
1428 };
1429 Box::pin(fut)
1430 }
1431 _ => {
1432 Box::pin(async move {
1433 let mut response = http::Response::new(empty_body());
1434 let headers = response.headers_mut();
1435 headers
1436 .insert(
1437 tonic::Status::GRPC_STATUS,
1438 (tonic::Code::Unimplemented as i32).into(),
1439 );
1440 headers
1441 .insert(
1442 http::header::CONTENT_TYPE,
1443 tonic::metadata::GRPC_CONTENT_TYPE,
1444 );
1445 Ok(response)
1446 })
1447 }
1448 }
1449 }
1450 }
1451 impl<T> Clone for SinkCoordinationServiceServer<T> {
1452 fn clone(&self) -> Self {
1453 let inner = self.inner.clone();
1454 Self {
1455 inner,
1456 accept_compression_encodings: self.accept_compression_encodings,
1457 send_compression_encodings: self.send_compression_encodings,
1458 max_decoding_message_size: self.max_decoding_message_size,
1459 max_encoding_message_size: self.max_encoding_message_size,
1460 }
1461 }
1462 }
1463 pub const SERVICE_NAME: &str = "connector_service.SinkCoordinationService";
1465 impl<T> tonic::server::NamedService for SinkCoordinationServiceServer<T> {
1466 const NAME: &'static str = SERVICE_NAME;
1467 }
1468}