/// Protobuf message `TableSchema`: a list of column descriptors plus a list
/// of `u32` indices.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TableSchema {
    /// Repeated `ColumnDesc` messages (field tag 1).
    #[prost(message, repeated, tag = "1")]
    pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
    /// Repeated `uint32` values (field tag 2).
    /// NOTE(review): presumably positions into `columns` forming the primary
    /// key — confirm against the .proto definition.
    #[prost(uint32, repeated, tag = "2")]
    pub pk_indices: ::prost::alloc::vec::Vec<u32>,
}
/// Protobuf message `ValidationError`: wraps a single error-message string.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ValidationError {
    /// Error text (string, field tag 1).
    #[prost(string, tag = "1")]
    pub error_message: ::prost::alloc::string::String,
}
/// Protobuf message `SinkParam`: the parameter set describing one sink (id,
/// string properties, optional schema, sink type, names and format
/// description).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkParam {
    /// Sink identifier; encoded as `uint32` (tag 1) and exposed through the
    /// `crate::id::SinkId` wrapper type.
    #[prost(uint32, tag = "1", wrapper = "crate::id::SinkId")]
    pub sink_id: crate::id::SinkId,
    /// String-to-string map (tag 2), stored in a `BTreeMap`.
    #[prost(btree_map = "string, string", tag = "2")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Optional `TableSchema` message (tag 3).
    #[prost(message, optional, tag = "3")]
    pub table_schema: ::core::option::Option<TableSchema>,
    /// Raw `i32` value of the `super::catalog::SinkType` enumeration (tag 4).
    #[prost(enumeration = "super::catalog::SinkType", tag = "4")]
    pub sink_type: i32,
    /// Database name (string, tag 5).
    #[prost(string, tag = "5")]
    pub db_name: ::prost::alloc::string::String,
    /// String field (tag 6).
    /// NOTE(review): presumably the name of the upstream object the sink
    /// reads from — confirm against the .proto definition.
    #[prost(string, tag = "6")]
    pub sink_from_name: ::prost::alloc::string::String,
    /// Optional `super::catalog::SinkFormatDesc` message (tag 7).
    #[prost(message, optional, tag = "7")]
    pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
    /// Sink name (string, tag 8).
    #[prost(string, tag = "8")]
    pub sink_name: ::prost::alloc::string::String,
    /// Boolean flag (tag 9).
    /// NOTE(review): exact semantics are not visible in this file — confirm
    /// against the .proto definition.
    #[prost(bool, tag = "9")]
    pub raw_ignore_delete: bool,
}
/// Protobuf message `SinkWriterStreamRequest`: an envelope holding exactly
/// one of the `sink_writer_stream_request::Request` variants (or none).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkWriterStreamRequest {
    /// The oneof payload; its variants use field tags 1, 3 and 4.
    #[prost(oneof = "sink_writer_stream_request::Request", tags = "1, 3, 4")]
    pub request: ::core::option::Option<sink_writer_stream_request::Request>,
}
/// Nested message and oneof types of `SinkWriterStreamRequest`.
pub mod sink_writer_stream_request {
    /// `StartSink` message: carries the sink parameters and a payload schema.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartSink {
        /// Optional `SinkParam` message (tag 1).
        #[prost(message, optional, tag = "1")]
        pub sink_param: ::core::option::Option<super::SinkParam>,
        /// Optional `TableSchema` message (tag 3; tag 2 is not used here).
        #[prost(message, optional, tag = "3")]
        pub payload_schema: ::core::option::Option<super::TableSchema>,
    }
    /// `WriteBatch` message: a batch id, an epoch and one payload variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct WriteBatch {
        /// Batch identifier (`uint64`, tag 3).
        #[prost(uint64, tag = "3")]
        pub batch_id: u64,
        /// Epoch value (`uint64`, tag 4).
        #[prost(uint64, tag = "4")]
        pub epoch: u64,
        /// The oneof payload; its variants use field tags 2 and 5.
        #[prost(oneof = "write_batch::Payload", tags = "2, 5")]
        pub payload: ::core::option::Option<write_batch::Payload>,
    }
    /// Nested message and oneof types of `WriteBatch`.
    pub mod write_batch {
        /// `StreamChunkPayload` message: wraps a raw byte buffer.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
        pub struct StreamChunkPayload {
            /// Raw bytes (tag 1).
            /// NOTE(review): presumably a serialized stream chunk — the
            /// encoding is not visible in this file.
            #[prost(bytes = "vec", tag = "1")]
            pub binary_data: ::prost::alloc::vec::Vec<u8>,
        }
        /// Oneof for `WriteBatch.payload`.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
        pub enum Payload {
            /// Inline payload message (tag 2).
            #[prost(message, tag = "2")]
            StreamChunkPayload(StreamChunkPayload),
            /// An `int64` value (tag 5).
            /// NOTE(review): the name suggests a pointer/handle to a chunk
            /// held elsewhere — confirm with the producer/consumer code.
            #[prost(int64, tag = "5")]
            StreamChunkRefPointer(i64),
        }
    }
    /// `Barrier` message: an epoch plus a checkpoint flag.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct Barrier {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Checkpoint flag (`bool`, tag 2).
        #[prost(bool, tag = "2")]
        pub is_checkpoint: bool,
    }
    /// Oneof for `SinkWriterStreamRequest.request`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// `StartSink` message (tag 1).
        #[prost(message, tag = "1")]
        Start(StartSink),
        /// `WriteBatch` message (tag 3).
        #[prost(message, tag = "3")]
        WriteBatch(WriteBatch),
        /// `Barrier` message (tag 4).
        #[prost(message, tag = "4")]
        Barrier(Barrier),
    }
}
/// Protobuf message `SinkWriterStreamResponse`: an envelope holding one of
/// the `sink_writer_stream_response::Response` variants (or none).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SinkWriterStreamResponse {
    /// The oneof payload; its variants use field tags 1, 2 and 3.
    #[prost(oneof = "sink_writer_stream_response::Response", tags = "1, 2, 3")]
    pub response: ::core::option::Option<sink_writer_stream_response::Response>,
}
/// Nested message and oneof types of `SinkWriterStreamResponse`.
pub mod sink_writer_stream_response {
    /// `StartResponse` message: has no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct StartResponse {}
    /// `CommitResponse` message: an epoch plus optional sink metadata.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct CommitResponse {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Optional `SinkMetadata` message (tag 2).
        #[prost(message, optional, tag = "2")]
        pub metadata: ::core::option::Option<super::SinkMetadata>,
    }
    /// `BatchWrittenResponse` message: an epoch and a batch id.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct BatchWrittenResponse {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Batch identifier (`uint64`, tag 2).
        #[prost(uint64, tag = "2")]
        pub batch_id: u64,
    }
    /// Oneof for `SinkWriterStreamResponse.response`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
    pub enum Response {
        /// `StartResponse` message (tag 1).
        #[prost(message, tag = "1")]
        Start(StartResponse),
        /// `CommitResponse` message (tag 2).
        #[prost(message, tag = "2")]
        Commit(CommitResponse),
        /// `BatchWrittenResponse` message (tag 3).
        #[prost(message, tag = "3")]
        Batch(BatchWrittenResponse),
    }
}
/// Protobuf message `ValidateSinkRequest`: request type of the `ValidateSink`
/// RPC, carrying the sink parameters.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSinkRequest {
    /// Optional `SinkParam` message (tag 1).
    #[prost(message, optional, tag = "1")]
    pub sink_param: ::core::option::Option<SinkParam>,
}
/// Protobuf message `ValidateSinkResponse`: response type of the
/// `ValidateSink` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ValidateSinkResponse {
    /// Optional `ValidationError` message (tag 1).
    /// NOTE(review): presumably `None` means validation succeeded — confirm
    /// with the service implementation.
    #[prost(message, optional, tag = "1")]
    pub error: ::core::option::Option<ValidationError>,
}
/// Protobuf message `SinkMetadata`: an envelope holding the single-variant
/// `sink_metadata::Metadata` oneof (or none).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SinkMetadata {
    /// The oneof payload; its only variant uses field tag 1.
    #[prost(oneof = "sink_metadata::Metadata", tags = "1")]
    pub metadata: ::core::option::Option<sink_metadata::Metadata>,
}
/// Nested message and oneof types of `SinkMetadata`.
pub mod sink_metadata {
    /// `SerializedMetadata` message: wraps a raw byte buffer.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct SerializedMetadata {
        /// Raw bytes (tag 1); the encoding is not visible in this file.
        #[prost(bytes = "vec", tag = "1")]
        pub metadata: ::prost::alloc::vec::Vec<u8>,
    }
    /// Oneof for `SinkMetadata.metadata`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
    pub enum Metadata {
        /// `SerializedMetadata` message (tag 1).
        #[prost(message, tag = "1")]
        Serialized(SerializedMetadata),
    }
}
/// Protobuf message `SinkCoordinatorStreamRequest`: an envelope holding one
/// of the `sink_coordinator_stream_request::Request` variants (or none).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkCoordinatorStreamRequest {
    /// The oneof payload; its variants use field tags 1 and 2.
    #[prost(oneof = "sink_coordinator_stream_request::Request", tags = "1, 2")]
    pub request: ::core::option::Option<sink_coordinator_stream_request::Request>,
}
/// Nested message and oneof types of `SinkCoordinatorStreamRequest`.
pub mod sink_coordinator_stream_request {
    /// `StartCoordinator` message: carries the sink parameters.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartCoordinator {
        /// Optional `SinkParam` message (tag 1).
        #[prost(message, optional, tag = "1")]
        pub param: ::core::option::Option<super::SinkParam>,
    }
    /// `CommitMetadata` message: an epoch plus a list of sink metadata.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct CommitMetadata {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Repeated `SinkMetadata` messages (tag 2).
        #[prost(message, repeated, tag = "2")]
        pub metadata: ::prost::alloc::vec::Vec<super::SinkMetadata>,
    }
    /// Oneof for `SinkCoordinatorStreamRequest.request`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// `StartCoordinator` message (tag 1).
        #[prost(message, tag = "1")]
        Start(StartCoordinator),
        /// `CommitMetadata` message (tag 2).
        #[prost(message, tag = "2")]
        Commit(CommitMetadata),
    }
}
/// Protobuf message `SinkCoordinatorStreamResponse`: an envelope holding one
/// of the `sink_coordinator_stream_response::Response` variants (or none).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SinkCoordinatorStreamResponse {
    /// The oneof payload; its variants use field tags 1 and 2.
    #[prost(oneof = "sink_coordinator_stream_response::Response", tags = "1, 2")]
    pub response: ::core::option::Option<sink_coordinator_stream_response::Response>,
}
/// Nested message and oneof types of `SinkCoordinatorStreamResponse`.
pub mod sink_coordinator_stream_response {
    /// `StartResponse` message: has no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct StartResponse {}
    /// `CommitResponse` message: carries an epoch.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct CommitResponse {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
    }
    /// Oneof for `SinkCoordinatorStreamResponse.response`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)]
    pub enum Response {
        /// `StartResponse` message (tag 1).
        #[prost(message, tag = "1")]
        Start(StartResponse),
        /// `CommitResponse` message (tag 2).
        #[prost(message, tag = "2")]
        Commit(CommitResponse),
    }
}
/// Protobuf message `CdcMessage`: one change-data-capture event, mostly
/// string fields plus a timestamp and two enumeration values.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct CdcMessage {
    /// Event payload (string, tag 1); its format is not visible in this file.
    #[prost(string, tag = "1")]
    pub payload: ::prost::alloc::string::String,
    /// Partition identifier (string, tag 2).
    #[prost(string, tag = "2")]
    pub partition: ::prost::alloc::string::String,
    /// Offset (string, tag 3).
    #[prost(string, tag = "3")]
    pub offset: ::prost::alloc::string::String,
    /// Full table name (string, tag 4).
    #[prost(string, tag = "4")]
    pub full_table_name: ::prost::alloc::string::String,
    /// `int64` value (tag 5).
    /// NOTE(review): the name suggests a source-side timestamp in
    /// milliseconds — confirm against the .proto definition.
    #[prost(int64, tag = "5")]
    pub source_ts_ms: i64,
    /// Raw `i32` value of the `cdc_message::CdcMessageType` enumeration (tag 6).
    #[prost(enumeration = "cdc_message::CdcMessageType", tag = "6")]
    pub msg_type: i32,
    /// Key (string, tag 7).
    #[prost(string, tag = "7")]
    pub key: ::prost::alloc::string::String,
    /// Raw `i32` value of the `SourceType` enumeration (tag 8).
    #[prost(enumeration = "SourceType", tag = "8")]
    pub source_type: i32,
}
263pub mod cdc_message {
    /// Protobuf enumeration `CdcMessageType`, represented as `i32`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(
        Clone,
        Copy,
        Debug,
        PartialEq,
        Eq,
        Hash,
        PartialOrd,
        Ord,
        ::prost::Enumeration
    )]
    #[repr(i32)]
    pub enum CdcMessageType {
        /// Numeric value 0.
        Unspecified = 0,
        /// Numeric value 1.
        Heartbeat = 1,
        /// Numeric value 2.
        Data = 2,
        /// Numeric value 3.
        TransactionMeta = 3,
        /// Numeric value 4.
        SchemaChange = 4,
    }
285 impl CdcMessageType {
286 pub fn as_str_name(&self) -> &'static str {
291 match self {
292 Self::Unspecified => "UNSPECIFIED",
293 Self::Heartbeat => "HEARTBEAT",
294 Self::Data => "DATA",
295 Self::TransactionMeta => "TRANSACTION_META",
296 Self::SchemaChange => "SCHEMA_CHANGE",
297 }
298 }
299 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
301 match value {
302 "UNSPECIFIED" => Some(Self::Unspecified),
303 "HEARTBEAT" => Some(Self::Heartbeat),
304 "DATA" => Some(Self::Data),
305 "TRANSACTION_META" => Some(Self::TransactionMeta),
306 "SCHEMA_CHANGE" => Some(Self::SchemaChange),
307 _ => None,
308 }
309 }
310 }
311}
/// Protobuf message `GetEventStreamRequest`: request type of the
/// `GetEventStream` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetEventStreamRequest {
    /// Source identifier (`uint64`, tag 1).
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Raw `i32` value of the `SourceType` enumeration (tag 2).
    #[prost(enumeration = "SourceType", tag = "2")]
    pub source_type: i32,
    /// Start offset (string, tag 3); its format is not visible in this file.
    #[prost(string, tag = "3")]
    pub start_offset: ::prost::alloc::string::String,
    /// String-to-string map (tag 4), stored in a `BTreeMap`.
    #[prost(btree_map = "string, string", tag = "4")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Boolean flag (tag 5).
    /// NOTE(review): presumably signals that the snapshot phase already
    /// finished — confirm with the caller.
    #[prost(bool, tag = "5")]
    pub snapshot_done: bool,
    /// Boolean flag (tag 6).
    /// NOTE(review): semantics are not visible in this file.
    #[prost(bool, tag = "6")]
    pub is_source_job: bool,
}
/// Protobuf message `GetEventStreamResponse`: one item of the
/// `GetEventStream` response stream.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetEventStreamResponse {
    /// Source identifier (`uint64`, tag 1).
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Repeated `CdcMessage` messages (tag 2).
    #[prost(message, repeated, tag = "2")]
    pub events: ::prost::alloc::vec::Vec<CdcMessage>,
    /// Optional `ControlInfo` message (tag 3).
    #[prost(message, optional, tag = "3")]
    pub control: ::core::option::Option<get_event_stream_response::ControlInfo>,
}
/// Nested message types of `GetEventStreamResponse`.
pub mod get_event_stream_response {
    /// `ControlInfo` message: carries a single handshake flag.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct ControlInfo {
        /// Boolean flag (tag 1).
        /// NOTE(review): presumably reports whether the initial handshake
        /// succeeded — confirm with the service implementation.
        #[prost(bool, tag = "1")]
        pub handshake_ok: bool,
    }
}
/// Protobuf message `ValidateSourceRequest`: request type of the
/// `ValidateSource` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSourceRequest {
    /// Source identifier (`uint64`, tag 1).
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Raw `i32` value of the `SourceType` enumeration (tag 2).
    #[prost(enumeration = "SourceType", tag = "2")]
    pub source_type: i32,
    /// String-to-string map (tag 3), stored in a `BTreeMap`.
    #[prost(btree_map = "string, string", tag = "3")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Optional `TableSchema` message (tag 4).
    #[prost(message, optional, tag = "4")]
    pub table_schema: ::core::option::Option<TableSchema>,
    /// Boolean flag (tag 5).
    /// NOTE(review): semantics are not visible in this file.
    #[prost(bool, tag = "5")]
    pub is_source_job: bool,
    /// Boolean flag (tag 6).
    /// NOTE(review): semantics are not visible in this file.
    #[prost(bool, tag = "6")]
    pub is_backfill_table: bool,
}
/// Protobuf message `ValidateSourceResponse`: response type of the
/// `ValidateSource` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ValidateSourceResponse {
    /// Optional `ValidationError` message (tag 1).
    /// NOTE(review): presumably `None` means validation succeeded — confirm
    /// with the service implementation.
    #[prost(message, optional, tag = "1")]
    pub error: ::core::option::Option<ValidationError>,
}
/// Protobuf message `CoordinateRequest`: an envelope holding one of the
/// `coordinate_request::Msg` variants (or none).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CoordinateRequest {
    /// The oneof payload; its variants use field tags 1 through 5.
    #[prost(oneof = "coordinate_request::Msg", tags = "1, 2, 3, 4, 5")]
    pub msg: ::core::option::Option<coordinate_request::Msg>,
}
/// Nested message and oneof types of `CoordinateRequest`.
pub mod coordinate_request {
    /// `StartCoordinationRequest` message: a vnode bitmap buffer plus the
    /// sink parameters.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartCoordinationRequest {
        /// Optional `common::Buffer` message (tag 1).
        #[prost(message, optional, tag = "1")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
        /// Optional `SinkParam` message (tag 2).
        #[prost(message, optional, tag = "2")]
        pub param: ::core::option::Option<super::SinkParam>,
    }
    /// `CommitRequest` message: an epoch, optional sink metadata and an
    /// optional schema change.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct CommitRequest {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Optional `SinkMetadata` message (tag 2).
        #[prost(message, optional, tag = "2")]
        pub metadata: ::core::option::Option<super::SinkMetadata>,
        /// Optional `stream_plan::SinkSchemaChange` message (tag 3).
        #[prost(message, optional, tag = "3")]
        pub schema_change: ::core::option::Option<
            super::super::stream_plan::SinkSchemaChange,
        >,
    }
    /// `UpdateVnodeBitmapRequest` message: carries a vnode bitmap buffer.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct UpdateVnodeBitmapRequest {
        /// Optional `common::Buffer` message (tag 1).
        #[prost(message, optional, tag = "1")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
    }
    /// Oneof for `CoordinateRequest.msg`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Msg {
        /// `StartCoordinationRequest` message (tag 1).
        #[prost(message, tag = "1")]
        StartRequest(StartCoordinationRequest),
        /// `CommitRequest` message (tag 2).
        #[prost(message, tag = "2")]
        CommitRequest(CommitRequest),
        /// `UpdateVnodeBitmapRequest` message (tag 3).
        #[prost(message, tag = "3")]
        UpdateVnodeRequest(UpdateVnodeBitmapRequest),
        /// A `bool` value (tag 4).
        /// NOTE(review): the meaning of the carried boolean is not visible
        /// in this file.
        #[prost(bool, tag = "4")]
        Stop(bool),
        /// A `uint64` value (tag 5).
        /// NOTE(review): presumably an epoch — confirm against the .proto.
        #[prost(uint64, tag = "5")]
        AlignInitialEpochRequest(u64),
    }
}
/// Protobuf message `CoordinateResponse`: an envelope holding one of the
/// `coordinate_response::Msg` variants (or none).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct CoordinateResponse {
    /// The oneof payload; its variants use field tags 1 through 4.
    #[prost(oneof = "coordinate_response::Msg", tags = "1, 2, 3, 4")]
    pub msg: ::core::option::Option<coordinate_response::Msg>,
}
/// Nested message and oneof types of `CoordinateResponse`.
pub mod coordinate_response {
    /// `StartCoordinationResponse` message: carries one optional epoch value.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct StartCoordinationResponse {
        /// Optional `uint64` (tag 1); `None` when the field is absent.
        #[prost(uint64, optional, tag = "1")]
        pub log_store_rewind_start_epoch: ::core::option::Option<u64>,
    }
    /// `CommitResponse` message: carries an epoch.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct CommitResponse {
        /// Epoch value (`uint64`, tag 1).
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
    }
    /// Oneof for `CoordinateResponse.msg`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)]
    pub enum Msg {
        /// `StartCoordinationResponse` message (tag 1).
        #[prost(message, tag = "1")]
        StartResponse(StartCoordinationResponse),
        /// `CommitResponse` message (tag 2).
        #[prost(message, tag = "2")]
        CommitResponse(CommitResponse),
        /// A `bool` value (tag 3).
        /// NOTE(review): the meaning of the carried boolean is not visible
        /// in this file.
        #[prost(bool, tag = "3")]
        Stopped(bool),
        /// A `uint64` value (tag 4).
        /// NOTE(review): presumably an epoch — confirm against the .proto.
        #[prost(uint64, tag = "4")]
        AlignInitialEpochResponse(u64),
    }
}
/// Protobuf enumeration `SourceType`, represented as `i32`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum SourceType {
    /// Numeric value 0.
    Unspecified = 0,
    /// Numeric value 1.
    Mysql = 1,
    /// Numeric value 2.
    Postgres = 2,
    /// Numeric value 3.
    Citus = 3,
    /// Numeric value 4.
    Mongodb = 4,
    /// Numeric value 5.
    SqlServer = 5,
}
471impl SourceType {
472 pub fn as_str_name(&self) -> &'static str {
477 match self {
478 Self::Unspecified => "UNSPECIFIED",
479 Self::Mysql => "MYSQL",
480 Self::Postgres => "POSTGRES",
481 Self::Citus => "CITUS",
482 Self::Mongodb => "MONGODB",
483 Self::SqlServer => "SQL_SERVER",
484 }
485 }
486 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
488 match value {
489 "UNSPECIFIED" => Some(Self::Unspecified),
490 "MYSQL" => Some(Self::Mysql),
491 "POSTGRES" => Some(Self::Postgres),
492 "CITUS" => Some(Self::Citus),
493 "MONGODB" => Some(Self::Mongodb),
494 "SQL_SERVER" => Some(Self::SqlServer),
495 _ => None,
496 }
497 }
498}
499pub mod connector_service_client {
501 #![allow(
502 unused_variables,
503 dead_code,
504 missing_docs,
505 clippy::wildcard_imports,
506 clippy::let_unit_value,
507 )]
508 use tonic::codegen::*;
509 use tonic::codegen::http::Uri;
    /// gRPC client for the `connector_service.ConnectorService` service; a
    /// thin wrapper around `tonic::client::Grpc<T>`.
    #[derive(Debug, Clone)]
    pub struct ConnectorServiceClient<T> {
        // The generic tonic gRPC client every RPC method delegates to.
        inner: tonic::client::Grpc<T>,
    }
514 impl ConnectorServiceClient<tonic::transport::Channel> {
515 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
517 where
518 D: TryInto<tonic::transport::Endpoint>,
519 D::Error: Into<StdError>,
520 {
521 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
522 Ok(Self::new(conn))
523 }
524 }
525 impl<T> ConnectorServiceClient<T>
526 where
527 T: tonic::client::GrpcService<tonic::body::Body>,
528 T::Error: Into<StdError>,
529 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
530 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
531 {
532 pub fn new(inner: T) -> Self {
533 let inner = tonic::client::Grpc::new(inner);
534 Self { inner }
535 }
536 pub fn with_origin(inner: T, origin: Uri) -> Self {
537 let inner = tonic::client::Grpc::with_origin(inner, origin);
538 Self { inner }
539 }
540 pub fn with_interceptor<F>(
541 inner: T,
542 interceptor: F,
543 ) -> ConnectorServiceClient<InterceptedService<T, F>>
544 where
545 F: tonic::service::Interceptor,
546 T::ResponseBody: Default,
547 T: tonic::codegen::Service<
548 http::Request<tonic::body::Body>,
549 Response = http::Response<
550 <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
551 >,
552 >,
553 <T as tonic::codegen::Service<
554 http::Request<tonic::body::Body>,
555 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
556 {
557 ConnectorServiceClient::new(InterceptedService::new(inner, interceptor))
558 }
559 #[must_use]
564 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
565 self.inner = self.inner.send_compressed(encoding);
566 self
567 }
568 #[must_use]
570 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
571 self.inner = self.inner.accept_compressed(encoding);
572 self
573 }
574 #[must_use]
578 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
579 self.inner = self.inner.max_decoding_message_size(limit);
580 self
581 }
582 #[must_use]
586 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
587 self.inner = self.inner.max_encoding_message_size(limit);
588 self
589 }
590 pub async fn sink_writer_stream(
591 &mut self,
592 request: impl tonic::IntoStreamingRequest<
593 Message = super::SinkWriterStreamRequest,
594 >,
595 ) -> std::result::Result<
596 tonic::Response<tonic::codec::Streaming<super::SinkWriterStreamResponse>>,
597 tonic::Status,
598 > {
599 self.inner
600 .ready()
601 .await
602 .map_err(|e| {
603 tonic::Status::unknown(
604 format!("Service was not ready: {}", e.into()),
605 )
606 })?;
607 let codec = tonic_prost::ProstCodec::default();
608 let path = http::uri::PathAndQuery::from_static(
609 "/connector_service.ConnectorService/SinkWriterStream",
610 );
611 let mut req = request.into_streaming_request();
612 req.extensions_mut()
613 .insert(
614 GrpcMethod::new(
615 "connector_service.ConnectorService",
616 "SinkWriterStream",
617 ),
618 );
619 self.inner.streaming(req, path, codec).await
620 }
621 pub async fn sink_coordinator_stream(
622 &mut self,
623 request: impl tonic::IntoStreamingRequest<
624 Message = super::SinkCoordinatorStreamRequest,
625 >,
626 ) -> std::result::Result<
627 tonic::Response<
628 tonic::codec::Streaming<super::SinkCoordinatorStreamResponse>,
629 >,
630 tonic::Status,
631 > {
632 self.inner
633 .ready()
634 .await
635 .map_err(|e| {
636 tonic::Status::unknown(
637 format!("Service was not ready: {}", e.into()),
638 )
639 })?;
640 let codec = tonic_prost::ProstCodec::default();
641 let path = http::uri::PathAndQuery::from_static(
642 "/connector_service.ConnectorService/SinkCoordinatorStream",
643 );
644 let mut req = request.into_streaming_request();
645 req.extensions_mut()
646 .insert(
647 GrpcMethod::new(
648 "connector_service.ConnectorService",
649 "SinkCoordinatorStream",
650 ),
651 );
652 self.inner.streaming(req, path, codec).await
653 }
654 pub async fn validate_sink(
655 &mut self,
656 request: impl tonic::IntoRequest<super::ValidateSinkRequest>,
657 ) -> std::result::Result<
658 tonic::Response<super::ValidateSinkResponse>,
659 tonic::Status,
660 > {
661 self.inner
662 .ready()
663 .await
664 .map_err(|e| {
665 tonic::Status::unknown(
666 format!("Service was not ready: {}", e.into()),
667 )
668 })?;
669 let codec = tonic_prost::ProstCodec::default();
670 let path = http::uri::PathAndQuery::from_static(
671 "/connector_service.ConnectorService/ValidateSink",
672 );
673 let mut req = request.into_request();
674 req.extensions_mut()
675 .insert(
676 GrpcMethod::new("connector_service.ConnectorService", "ValidateSink"),
677 );
678 self.inner.unary(req, path, codec).await
679 }
680 pub async fn get_event_stream(
681 &mut self,
682 request: impl tonic::IntoRequest<super::GetEventStreamRequest>,
683 ) -> std::result::Result<
684 tonic::Response<tonic::codec::Streaming<super::GetEventStreamResponse>>,
685 tonic::Status,
686 > {
687 self.inner
688 .ready()
689 .await
690 .map_err(|e| {
691 tonic::Status::unknown(
692 format!("Service was not ready: {}", e.into()),
693 )
694 })?;
695 let codec = tonic_prost::ProstCodec::default();
696 let path = http::uri::PathAndQuery::from_static(
697 "/connector_service.ConnectorService/GetEventStream",
698 );
699 let mut req = request.into_request();
700 req.extensions_mut()
701 .insert(
702 GrpcMethod::new(
703 "connector_service.ConnectorService",
704 "GetEventStream",
705 ),
706 );
707 self.inner.server_streaming(req, path, codec).await
708 }
709 pub async fn validate_source(
710 &mut self,
711 request: impl tonic::IntoRequest<super::ValidateSourceRequest>,
712 ) -> std::result::Result<
713 tonic::Response<super::ValidateSourceResponse>,
714 tonic::Status,
715 > {
716 self.inner
717 .ready()
718 .await
719 .map_err(|e| {
720 tonic::Status::unknown(
721 format!("Service was not ready: {}", e.into()),
722 )
723 })?;
724 let codec = tonic_prost::ProstCodec::default();
725 let path = http::uri::PathAndQuery::from_static(
726 "/connector_service.ConnectorService/ValidateSource",
727 );
728 let mut req = request.into_request();
729 req.extensions_mut()
730 .insert(
731 GrpcMethod::new(
732 "connector_service.ConnectorService",
733 "ValidateSource",
734 ),
735 );
736 self.inner.unary(req, path, codec).await
737 }
738 }
739}
740pub mod connector_service_server {
742 #![allow(
743 unused_variables,
744 dead_code,
745 missing_docs,
746 clippy::wildcard_imports,
747 clippy::let_unit_value,
748 )]
749 use tonic::codegen::*;
    /// Server-side trait listing the five RPC handlers a
    /// `ConnectorServiceServer` dispatches to. Implementors must be
    /// `Send + Sync + 'static`.
    #[async_trait]
    pub trait ConnectorService: std::marker::Send + std::marker::Sync + 'static {
        /// Response stream type returned by `sink_writer_stream`.
        type SinkWriterStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::SinkWriterStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handler taking a stream of `SinkWriterStreamRequest` and returning
        /// a stream of `SinkWriterStreamResponse`.
        async fn sink_writer_stream(
            &self,
            request: tonic::Request<tonic::Streaming<super::SinkWriterStreamRequest>>,
        ) -> std::result::Result<
            tonic::Response<Self::SinkWriterStreamStream>,
            tonic::Status,
        >;
        /// Response stream type returned by `sink_coordinator_stream`.
        type SinkCoordinatorStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::SinkCoordinatorStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handler taking a stream of `SinkCoordinatorStreamRequest` and
        /// returning a stream of `SinkCoordinatorStreamResponse`.
        async fn sink_coordinator_stream(
            &self,
            request: tonic::Request<
                tonic::Streaming<super::SinkCoordinatorStreamRequest>,
            >,
        ) -> std::result::Result<
            tonic::Response<Self::SinkCoordinatorStreamStream>,
            tonic::Status,
        >;
        /// Unary handler: one `ValidateSinkRequest` in, one
        /// `ValidateSinkResponse` out.
        async fn validate_sink(
            &self,
            request: tonic::Request<super::ValidateSinkRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSinkResponse>,
            tonic::Status,
        >;
        /// Response stream type returned by `get_event_stream`.
        type GetEventStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<super::GetEventStreamResponse, tonic::Status>,
            >
            + std::marker::Send
            + 'static;
        /// Handler taking one `GetEventStreamRequest` and returning a stream
        /// of `GetEventStreamResponse`.
        async fn get_event_stream(
            &self,
            request: tonic::Request<super::GetEventStreamRequest>,
        ) -> std::result::Result<
            tonic::Response<Self::GetEventStreamStream>,
            tonic::Status,
        >;
        /// Unary handler: one `ValidateSourceRequest` in, one
        /// `ValidateSourceResponse` out.
        async fn validate_source(
            &self,
            request: tonic::Request<super::ValidateSourceRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSourceResponse>,
            tonic::Status,
        >;
    }
    /// Server wrapper around a `ConnectorService` implementation, holding the
    /// shared handler plus per-server compression and message-size settings.
    #[derive(Debug)]
    pub struct ConnectorServiceServer<T> {
        // Shared handler implementation.
        inner: Arc<T>,
        // Encodings enabled through `accept_compressed`.
        accept_compression_encodings: EnabledCompressionEncodings,
        // Encodings enabled through `send_compressed`.
        send_compression_encodings: EnabledCompressionEncodings,
        // Limit set through `max_decoding_message_size`; `None` until set.
        max_decoding_message_size: Option<usize>,
        // Limit set through `max_encoding_message_size`; `None` until set.
        max_encoding_message_size: Option<usize>,
    }
823 impl<T> ConnectorServiceServer<T> {
824 pub fn new(inner: T) -> Self {
825 Self::from_arc(Arc::new(inner))
826 }
827 pub fn from_arc(inner: Arc<T>) -> Self {
828 Self {
829 inner,
830 accept_compression_encodings: Default::default(),
831 send_compression_encodings: Default::default(),
832 max_decoding_message_size: None,
833 max_encoding_message_size: None,
834 }
835 }
836 pub fn with_interceptor<F>(
837 inner: T,
838 interceptor: F,
839 ) -> InterceptedService<Self, F>
840 where
841 F: tonic::service::Interceptor,
842 {
843 InterceptedService::new(Self::new(inner), interceptor)
844 }
845 #[must_use]
847 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
848 self.accept_compression_encodings.enable(encoding);
849 self
850 }
851 #[must_use]
853 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
854 self.send_compression_encodings.enable(encoding);
855 self
856 }
857 #[must_use]
861 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
862 self.max_decoding_message_size = Some(limit);
863 self
864 }
865 #[must_use]
869 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
870 self.max_encoding_message_size = Some(limit);
871 self
872 }
873 }
874 impl<T, B> tonic::codegen::Service<http::Request<B>> for ConnectorServiceServer<T>
875 where
876 T: ConnectorService,
877 B: Body + std::marker::Send + 'static,
878 B::Error: Into<StdError> + std::marker::Send + 'static,
879 {
880 type Response = http::Response<tonic::body::Body>;
881 type Error = std::convert::Infallible;
882 type Future = BoxFuture<Self::Response, Self::Error>;
883 fn poll_ready(
884 &mut self,
885 _cx: &mut Context<'_>,
886 ) -> Poll<std::result::Result<(), Self::Error>> {
887 Poll::Ready(Ok(()))
888 }
889 fn call(&mut self, req: http::Request<B>) -> Self::Future {
890 match req.uri().path() {
891 "/connector_service.ConnectorService/SinkWriterStream" => {
892 #[allow(non_camel_case_types)]
893 struct SinkWriterStreamSvc<T: ConnectorService>(pub Arc<T>);
894 impl<
895 T: ConnectorService,
896 > tonic::server::StreamingService<super::SinkWriterStreamRequest>
897 for SinkWriterStreamSvc<T> {
898 type Response = super::SinkWriterStreamResponse;
899 type ResponseStream = T::SinkWriterStreamStream;
900 type Future = BoxFuture<
901 tonic::Response<Self::ResponseStream>,
902 tonic::Status,
903 >;
904 fn call(
905 &mut self,
906 request: tonic::Request<
907 tonic::Streaming<super::SinkWriterStreamRequest>,
908 >,
909 ) -> Self::Future {
910 let inner = Arc::clone(&self.0);
911 let fut = async move {
912 <T as ConnectorService>::sink_writer_stream(&inner, request)
913 .await
914 };
915 Box::pin(fut)
916 }
917 }
918 let accept_compression_encodings = self.accept_compression_encodings;
919 let send_compression_encodings = self.send_compression_encodings;
920 let max_decoding_message_size = self.max_decoding_message_size;
921 let max_encoding_message_size = self.max_encoding_message_size;
922 let inner = self.inner.clone();
923 let fut = async move {
924 let method = SinkWriterStreamSvc(inner);
925 let codec = tonic_prost::ProstCodec::default();
926 let mut grpc = tonic::server::Grpc::new(codec)
927 .apply_compression_config(
928 accept_compression_encodings,
929 send_compression_encodings,
930 )
931 .apply_max_message_size_config(
932 max_decoding_message_size,
933 max_encoding_message_size,
934 );
935 let res = grpc.streaming(method, req).await;
936 Ok(res)
937 };
938 Box::pin(fut)
939 }
940 "/connector_service.ConnectorService/SinkCoordinatorStream" => {
941 #[allow(non_camel_case_types)]
942 struct SinkCoordinatorStreamSvc<T: ConnectorService>(pub Arc<T>);
943 impl<
944 T: ConnectorService,
945 > tonic::server::StreamingService<
946 super::SinkCoordinatorStreamRequest,
947 > for SinkCoordinatorStreamSvc<T> {
948 type Response = super::SinkCoordinatorStreamResponse;
949 type ResponseStream = T::SinkCoordinatorStreamStream;
950 type Future = BoxFuture<
951 tonic::Response<Self::ResponseStream>,
952 tonic::Status,
953 >;
954 fn call(
955 &mut self,
956 request: tonic::Request<
957 tonic::Streaming<super::SinkCoordinatorStreamRequest>,
958 >,
959 ) -> Self::Future {
960 let inner = Arc::clone(&self.0);
961 let fut = async move {
962 <T as ConnectorService>::sink_coordinator_stream(
963 &inner,
964 request,
965 )
966 .await
967 };
968 Box::pin(fut)
969 }
970 }
971 let accept_compression_encodings = self.accept_compression_encodings;
972 let send_compression_encodings = self.send_compression_encodings;
973 let max_decoding_message_size = self.max_decoding_message_size;
974 let max_encoding_message_size = self.max_encoding_message_size;
975 let inner = self.inner.clone();
976 let fut = async move {
977 let method = SinkCoordinatorStreamSvc(inner);
978 let codec = tonic_prost::ProstCodec::default();
979 let mut grpc = tonic::server::Grpc::new(codec)
980 .apply_compression_config(
981 accept_compression_encodings,
982 send_compression_encodings,
983 )
984 .apply_max_message_size_config(
985 max_decoding_message_size,
986 max_encoding_message_size,
987 );
988 let res = grpc.streaming(method, req).await;
989 Ok(res)
990 };
991 Box::pin(fut)
992 }
993 "/connector_service.ConnectorService/ValidateSink" => {
994 #[allow(non_camel_case_types)]
995 struct ValidateSinkSvc<T: ConnectorService>(pub Arc<T>);
996 impl<
997 T: ConnectorService,
998 > tonic::server::UnaryService<super::ValidateSinkRequest>
999 for ValidateSinkSvc<T> {
1000 type Response = super::ValidateSinkResponse;
1001 type Future = BoxFuture<
1002 tonic::Response<Self::Response>,
1003 tonic::Status,
1004 >;
1005 fn call(
1006 &mut self,
1007 request: tonic::Request<super::ValidateSinkRequest>,
1008 ) -> Self::Future {
1009 let inner = Arc::clone(&self.0);
1010 let fut = async move {
1011 <T as ConnectorService>::validate_sink(&inner, request)
1012 .await
1013 };
1014 Box::pin(fut)
1015 }
1016 }
1017 let accept_compression_encodings = self.accept_compression_encodings;
1018 let send_compression_encodings = self.send_compression_encodings;
1019 let max_decoding_message_size = self.max_decoding_message_size;
1020 let max_encoding_message_size = self.max_encoding_message_size;
1021 let inner = self.inner.clone();
1022 let fut = async move {
1023 let method = ValidateSinkSvc(inner);
1024 let codec = tonic_prost::ProstCodec::default();
1025 let mut grpc = tonic::server::Grpc::new(codec)
1026 .apply_compression_config(
1027 accept_compression_encodings,
1028 send_compression_encodings,
1029 )
1030 .apply_max_message_size_config(
1031 max_decoding_message_size,
1032 max_encoding_message_size,
1033 );
1034 let res = grpc.unary(method, req).await;
1035 Ok(res)
1036 };
1037 Box::pin(fut)
1038 }
1039 "/connector_service.ConnectorService/GetEventStream" => {
1040 #[allow(non_camel_case_types)]
1041 struct GetEventStreamSvc<T: ConnectorService>(pub Arc<T>);
1042 impl<
1043 T: ConnectorService,
1044 > tonic::server::ServerStreamingService<super::GetEventStreamRequest>
1045 for GetEventStreamSvc<T> {
1046 type Response = super::GetEventStreamResponse;
1047 type ResponseStream = T::GetEventStreamStream;
1048 type Future = BoxFuture<
1049 tonic::Response<Self::ResponseStream>,
1050 tonic::Status,
1051 >;
1052 fn call(
1053 &mut self,
1054 request: tonic::Request<super::GetEventStreamRequest>,
1055 ) -> Self::Future {
1056 let inner = Arc::clone(&self.0);
1057 let fut = async move {
1058 <T as ConnectorService>::get_event_stream(&inner, request)
1059 .await
1060 };
1061 Box::pin(fut)
1062 }
1063 }
1064 let accept_compression_encodings = self.accept_compression_encodings;
1065 let send_compression_encodings = self.send_compression_encodings;
1066 let max_decoding_message_size = self.max_decoding_message_size;
1067 let max_encoding_message_size = self.max_encoding_message_size;
1068 let inner = self.inner.clone();
1069 let fut = async move {
1070 let method = GetEventStreamSvc(inner);
1071 let codec = tonic_prost::ProstCodec::default();
1072 let mut grpc = tonic::server::Grpc::new(codec)
1073 .apply_compression_config(
1074 accept_compression_encodings,
1075 send_compression_encodings,
1076 )
1077 .apply_max_message_size_config(
1078 max_decoding_message_size,
1079 max_encoding_message_size,
1080 );
1081 let res = grpc.server_streaming(method, req).await;
1082 Ok(res)
1083 };
1084 Box::pin(fut)
1085 }
1086 "/connector_service.ConnectorService/ValidateSource" => {
1087 #[allow(non_camel_case_types)]
1088 struct ValidateSourceSvc<T: ConnectorService>(pub Arc<T>);
1089 impl<
1090 T: ConnectorService,
1091 > tonic::server::UnaryService<super::ValidateSourceRequest>
1092 for ValidateSourceSvc<T> {
1093 type Response = super::ValidateSourceResponse;
1094 type Future = BoxFuture<
1095 tonic::Response<Self::Response>,
1096 tonic::Status,
1097 >;
1098 fn call(
1099 &mut self,
1100 request: tonic::Request<super::ValidateSourceRequest>,
1101 ) -> Self::Future {
1102 let inner = Arc::clone(&self.0);
1103 let fut = async move {
1104 <T as ConnectorService>::validate_source(&inner, request)
1105 .await
1106 };
1107 Box::pin(fut)
1108 }
1109 }
1110 let accept_compression_encodings = self.accept_compression_encodings;
1111 let send_compression_encodings = self.send_compression_encodings;
1112 let max_decoding_message_size = self.max_decoding_message_size;
1113 let max_encoding_message_size = self.max_encoding_message_size;
1114 let inner = self.inner.clone();
1115 let fut = async move {
1116 let method = ValidateSourceSvc(inner);
1117 let codec = tonic_prost::ProstCodec::default();
1118 let mut grpc = tonic::server::Grpc::new(codec)
1119 .apply_compression_config(
1120 accept_compression_encodings,
1121 send_compression_encodings,
1122 )
1123 .apply_max_message_size_config(
1124 max_decoding_message_size,
1125 max_encoding_message_size,
1126 );
1127 let res = grpc.unary(method, req).await;
1128 Ok(res)
1129 };
1130 Box::pin(fut)
1131 }
1132 _ => {
1133 Box::pin(async move {
1134 let mut response = http::Response::new(
1135 tonic::body::Body::default(),
1136 );
1137 let headers = response.headers_mut();
1138 headers
1139 .insert(
1140 tonic::Status::GRPC_STATUS,
1141 (tonic::Code::Unimplemented as i32).into(),
1142 );
1143 headers
1144 .insert(
1145 http::header::CONTENT_TYPE,
1146 tonic::metadata::GRPC_CONTENT_TYPE,
1147 );
1148 Ok(response)
1149 })
1150 }
1151 }
1152 }
1153 }
1154 impl<T> Clone for ConnectorServiceServer<T> {
1155 fn clone(&self) -> Self {
1156 let inner = self.inner.clone();
1157 Self {
1158 inner,
1159 accept_compression_encodings: self.accept_compression_encodings,
1160 send_compression_encodings: self.send_compression_encodings,
1161 max_decoding_message_size: self.max_decoding_message_size,
1162 max_encoding_message_size: self.max_encoding_message_size,
1163 }
1164 }
1165 }
/// Fully qualified gRPC service name (`<package>.<Service>`), as it appears in
/// request paths such as `/connector_service.ConnectorService/ValidateSink`.
pub const SERVICE_NAME: &str = "connector_service.ConnectorService";
/// Exposes the service name through tonic's `NamedService` trait.
impl<T> tonic::server::NamedService for ConnectorServiceServer<T> {
    // Reuses the module-level constant so the name is spelled in one place only.
    const NAME: &'static str = SERVICE_NAME;
}
1171}
1172pub mod sink_coordination_service_client {
1174 #![allow(
1175 unused_variables,
1176 dead_code,
1177 missing_docs,
1178 clippy::wildcard_imports,
1179 clippy::let_unit_value,
1180 )]
1181 use tonic::codegen::*;
1182 use tonic::codegen::http::Uri;
/// gRPC client for `connector_service.SinkCoordinationService`.
///
/// A thin wrapper around a `tonic::client::Grpc` built on the transport `T`.
#[derive(Debug, Clone)]
pub struct SinkCoordinationServiceClient<T> {
    // Underlying tonic client; the RPC and builder methods delegate to it.
    inner: tonic::client::Grpc<T>,
}
1187 impl SinkCoordinationServiceClient<tonic::transport::Channel> {
1188 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
1190 where
1191 D: TryInto<tonic::transport::Endpoint>,
1192 D::Error: Into<StdError>,
1193 {
1194 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
1195 Ok(Self::new(conn))
1196 }
1197 }
1198 impl<T> SinkCoordinationServiceClient<T>
1199 where
1200 T: tonic::client::GrpcService<tonic::body::Body>,
1201 T::Error: Into<StdError>,
1202 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
1203 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
1204 {
1205 pub fn new(inner: T) -> Self {
1206 let inner = tonic::client::Grpc::new(inner);
1207 Self { inner }
1208 }
1209 pub fn with_origin(inner: T, origin: Uri) -> Self {
1210 let inner = tonic::client::Grpc::with_origin(inner, origin);
1211 Self { inner }
1212 }
1213 pub fn with_interceptor<F>(
1214 inner: T,
1215 interceptor: F,
1216 ) -> SinkCoordinationServiceClient<InterceptedService<T, F>>
1217 where
1218 F: tonic::service::Interceptor,
1219 T::ResponseBody: Default,
1220 T: tonic::codegen::Service<
1221 http::Request<tonic::body::Body>,
1222 Response = http::Response<
1223 <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
1224 >,
1225 >,
1226 <T as tonic::codegen::Service<
1227 http::Request<tonic::body::Body>,
1228 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
1229 {
1230 SinkCoordinationServiceClient::new(
1231 InterceptedService::new(inner, interceptor),
1232 )
1233 }
1234 #[must_use]
1239 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1240 self.inner = self.inner.send_compressed(encoding);
1241 self
1242 }
1243 #[must_use]
1245 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1246 self.inner = self.inner.accept_compressed(encoding);
1247 self
1248 }
1249 #[must_use]
1253 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1254 self.inner = self.inner.max_decoding_message_size(limit);
1255 self
1256 }
1257 #[must_use]
1261 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1262 self.inner = self.inner.max_encoding_message_size(limit);
1263 self
1264 }
1265 pub async fn coordinate(
1266 &mut self,
1267 request: impl tonic::IntoStreamingRequest<Message = super::CoordinateRequest>,
1268 ) -> std::result::Result<
1269 tonic::Response<tonic::codec::Streaming<super::CoordinateResponse>>,
1270 tonic::Status,
1271 > {
1272 self.inner
1273 .ready()
1274 .await
1275 .map_err(|e| {
1276 tonic::Status::unknown(
1277 format!("Service was not ready: {}", e.into()),
1278 )
1279 })?;
1280 let codec = tonic_prost::ProstCodec::default();
1281 let path = http::uri::PathAndQuery::from_static(
1282 "/connector_service.SinkCoordinationService/Coordinate",
1283 );
1284 let mut req = request.into_streaming_request();
1285 req.extensions_mut()
1286 .insert(
1287 GrpcMethod::new(
1288 "connector_service.SinkCoordinationService",
1289 "Coordinate",
1290 ),
1291 );
1292 self.inner.streaming(req, path, codec).await
1293 }
1294 }
1295}
1296pub mod sink_coordination_service_server {
1298 #![allow(
1299 unused_variables,
1300 dead_code,
1301 missing_docs,
1302 clippy::wildcard_imports,
1303 clippy::let_unit_value,
1304 )]
1305 use tonic::codegen::*;
/// Server-side contract for `connector_service.SinkCoordinationService`.
///
/// Implement this trait and wrap the implementation in
/// `SinkCoordinationServiceServer` to serve the `Coordinate` RPC.
#[async_trait]
pub trait SinkCoordinationService: std::marker::Send + std::marker::Sync + 'static {
    /// Stream returned by [`Self::coordinate`]; every item is either a
    /// `CoordinateResponse` or a `tonic::Status` error.
    type CoordinateStream: tonic::codegen::tokio_stream::Stream<
            Item = std::result::Result<super::CoordinateResponse, tonic::Status>,
        >
        + std::marker::Send
        + 'static;
    /// Handles the `Coordinate` RPC: receives a stream of `CoordinateRequest`
    /// messages and answers with a [`Self::CoordinateStream`], or with a
    /// `tonic::Status` if the call is rejected.
    async fn coordinate(
        &self,
        request: tonic::Request<tonic::Streaming<super::CoordinateRequest>>,
    ) -> std::result::Result<tonic::Response<Self::CoordinateStream>, tonic::Status>;
}
/// tonic server wrapper that routes incoming HTTP requests to a
/// `SinkCoordinationService` implementation `T`.
#[derive(Debug)]
pub struct SinkCoordinationServiceServer<T> {
    // Shared handle to the user-provided service implementation.
    inner: Arc<T>,
    // Compression encodings passed to `apply_compression_config` for requests.
    accept_compression_encodings: EnabledCompressionEncodings,
    // Compression encodings passed to `apply_compression_config` for responses.
    send_compression_encodings: EnabledCompressionEncodings,
    // Optional decode size limit; `None` presumably keeps tonic's default (confirm in tonic docs).
    max_decoding_message_size: Option<usize>,
    // Optional encode size limit; `None` presumably keeps tonic's default (confirm in tonic docs).
    max_encoding_message_size: Option<usize>,
}
1328 impl<T> SinkCoordinationServiceServer<T> {
1329 pub fn new(inner: T) -> Self {
1330 Self::from_arc(Arc::new(inner))
1331 }
1332 pub fn from_arc(inner: Arc<T>) -> Self {
1333 Self {
1334 inner,
1335 accept_compression_encodings: Default::default(),
1336 send_compression_encodings: Default::default(),
1337 max_decoding_message_size: None,
1338 max_encoding_message_size: None,
1339 }
1340 }
1341 pub fn with_interceptor<F>(
1342 inner: T,
1343 interceptor: F,
1344 ) -> InterceptedService<Self, F>
1345 where
1346 F: tonic::service::Interceptor,
1347 {
1348 InterceptedService::new(Self::new(inner), interceptor)
1349 }
1350 #[must_use]
1352 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1353 self.accept_compression_encodings.enable(encoding);
1354 self
1355 }
1356 #[must_use]
1358 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1359 self.send_compression_encodings.enable(encoding);
1360 self
1361 }
1362 #[must_use]
1366 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1367 self.max_decoding_message_size = Some(limit);
1368 self
1369 }
1370 #[must_use]
1374 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1375 self.max_encoding_message_size = Some(limit);
1376 self
1377 }
1378 }
1379 impl<T, B> tonic::codegen::Service<http::Request<B>>
1380 for SinkCoordinationServiceServer<T>
1381 where
1382 T: SinkCoordinationService,
1383 B: Body + std::marker::Send + 'static,
1384 B::Error: Into<StdError> + std::marker::Send + 'static,
1385 {
1386 type Response = http::Response<tonic::body::Body>;
1387 type Error = std::convert::Infallible;
1388 type Future = BoxFuture<Self::Response, Self::Error>;
1389 fn poll_ready(
1390 &mut self,
1391 _cx: &mut Context<'_>,
1392 ) -> Poll<std::result::Result<(), Self::Error>> {
1393 Poll::Ready(Ok(()))
1394 }
1395 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1396 match req.uri().path() {
1397 "/connector_service.SinkCoordinationService/Coordinate" => {
1398 #[allow(non_camel_case_types)]
1399 struct CoordinateSvc<T: SinkCoordinationService>(pub Arc<T>);
1400 impl<
1401 T: SinkCoordinationService,
1402 > tonic::server::StreamingService<super::CoordinateRequest>
1403 for CoordinateSvc<T> {
1404 type Response = super::CoordinateResponse;
1405 type ResponseStream = T::CoordinateStream;
1406 type Future = BoxFuture<
1407 tonic::Response<Self::ResponseStream>,
1408 tonic::Status,
1409 >;
1410 fn call(
1411 &mut self,
1412 request: tonic::Request<
1413 tonic::Streaming<super::CoordinateRequest>,
1414 >,
1415 ) -> Self::Future {
1416 let inner = Arc::clone(&self.0);
1417 let fut = async move {
1418 <T as SinkCoordinationService>::coordinate(&inner, request)
1419 .await
1420 };
1421 Box::pin(fut)
1422 }
1423 }
1424 let accept_compression_encodings = self.accept_compression_encodings;
1425 let send_compression_encodings = self.send_compression_encodings;
1426 let max_decoding_message_size = self.max_decoding_message_size;
1427 let max_encoding_message_size = self.max_encoding_message_size;
1428 let inner = self.inner.clone();
1429 let fut = async move {
1430 let method = CoordinateSvc(inner);
1431 let codec = tonic_prost::ProstCodec::default();
1432 let mut grpc = tonic::server::Grpc::new(codec)
1433 .apply_compression_config(
1434 accept_compression_encodings,
1435 send_compression_encodings,
1436 )
1437 .apply_max_message_size_config(
1438 max_decoding_message_size,
1439 max_encoding_message_size,
1440 );
1441 let res = grpc.streaming(method, req).await;
1442 Ok(res)
1443 };
1444 Box::pin(fut)
1445 }
1446 _ => {
1447 Box::pin(async move {
1448 let mut response = http::Response::new(
1449 tonic::body::Body::default(),
1450 );
1451 let headers = response.headers_mut();
1452 headers
1453 .insert(
1454 tonic::Status::GRPC_STATUS,
1455 (tonic::Code::Unimplemented as i32).into(),
1456 );
1457 headers
1458 .insert(
1459 http::header::CONTENT_TYPE,
1460 tonic::metadata::GRPC_CONTENT_TYPE,
1461 );
1462 Ok(response)
1463 })
1464 }
1465 }
1466 }
1467 }
1468 impl<T> Clone for SinkCoordinationServiceServer<T> {
1469 fn clone(&self) -> Self {
1470 let inner = self.inner.clone();
1471 Self {
1472 inner,
1473 accept_compression_encodings: self.accept_compression_encodings,
1474 send_compression_encodings: self.send_compression_encodings,
1475 max_decoding_message_size: self.max_decoding_message_size,
1476 max_encoding_message_size: self.max_encoding_message_size,
1477 }
1478 }
1479 }
/// Fully qualified gRPC service name (`<package>.<Service>`), as it appears in
/// the request path `/connector_service.SinkCoordinationService/Coordinate`.
pub const SERVICE_NAME: &str = "connector_service.SinkCoordinationService";
/// Exposes the service name through tonic's `NamedService` trait.
impl<T> tonic::server::NamedService for SinkCoordinationServiceServer<T> {
    // Reuses the module-level constant so the name is spelled in one place only.
    const NAME: &'static str = SERVICE_NAME;
}
1485}