hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{Ordering, Retries, Stream};
11#[cfg(stageleft_runtime)]
12use crate::location::dynamic::DynLocation;
13use crate::location::{Cluster, MemberId, Process};
14use crate::networking::{NetworkFor, TCP};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19 #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
20 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
21 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
22 ///
23 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
24 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
25 /// API allows precise targeting of specific cluster members rather than broadcasting to
26 /// all members.
27 ///
28 /// # Example
29 /// ```rust
30 /// # #[cfg(feature = "deploy")] {
31 /// # use hydro_lang::prelude::*;
32 /// # use futures::StreamExt;
33 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
34 /// let p1 = flow.process::<()>();
35 /// let workers: Cluster<()> = flow.cluster::<()>();
36 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
37 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
38 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
39 /// .into_keyed()
40 /// .demux_bincode(&workers);
41 /// # on_worker.send_bincode(&p2).entries()
42 /// // if there are 4 members in the cluster, each receives one element
43 /// // - MemberId::<()>(0): [0]
44 /// // - MemberId::<()>(1): [1]
45 /// // - MemberId::<()>(2): [2]
46 /// // - MemberId::<()>(3): [3]
47 /// # }, |mut stream| async move {
48 /// # let mut results = Vec::new();
49 /// # for w in 0..4 {
50 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
51 /// # }
52 /// # results.sort();
53 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
54 /// # }));
55 /// # }
56 /// ```
57 pub fn demux_bincode(
58 self,
59 other: &Cluster<'a, L2>,
60 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
61 where
62 T: Serialize + DeserializeOwned,
63 {
64 self.demux(other, TCP.bincode())
65 }
66
67 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
68 /// identifying the recipient for each group and using the configuration in `via` to set up the
69 /// message transport.
70 ///
71 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
72 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
73 /// API allows precise targeting of specific cluster members rather than broadcasting to
74 /// all members.
75 ///
76 /// # Example
77 /// ```rust
78 /// # #[cfg(feature = "deploy")] {
79 /// # use hydro_lang::prelude::*;
80 /// # use futures::StreamExt;
81 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
82 /// let p1 = flow.process::<()>();
83 /// let workers: Cluster<()> = flow.cluster::<()>();
84 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
85 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
86 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
87 /// .into_keyed()
88 /// .demux(&workers, TCP.bincode());
89 /// # on_worker.send(&p2, TCP.bincode()).entries()
90 /// // if there are 4 members in the cluster, each receives one element
91 /// // - MemberId::<()>(0): [0]
92 /// // - MemberId::<()>(1): [1]
93 /// // - MemberId::<()>(2): [2]
94 /// // - MemberId::<()>(3): [3]
95 /// # }, |mut stream| async move {
96 /// # let mut results = Vec::new();
97 /// # for w in 0..4 {
98 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
99 /// # }
100 /// # results.sort();
101 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
102 /// # }));
103 /// # }
104 /// ```
105 pub fn demux<N: NetworkFor<T>>(
106 self,
107 to: &Cluster<'a, L2>,
108 via: N,
109 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
110 where
111 T: Serialize + DeserializeOwned,
112 {
113 let _ = via;
114 let serialize_pipeline = Some(N::serialize_thunk(true));
115
116 let deserialize_pipeline = Some(N::deserialize_thunk(None));
117
118 Stream::new(
119 to.clone(),
120 HydroNode::Network {
121 serialize_fn: serialize_pipeline.map(|e| e.into()),
122 instantiate_fn: DebugInstantiate::Building,
123 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
124 input: Box::new(self.ir_node.into_inner()),
125 metadata: to.new_node_metadata(
126 Stream::<T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
127 ),
128 },
129 )
130 }
131}
132
133impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
134 KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
135{
136 #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
137 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
138 /// compound key where the first element is the recipient's [`MemberId`] and the second element
139 /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
140 /// messages.
141 ///
142 /// # Example
143 /// ```rust
144 /// # #[cfg(feature = "deploy")] {
145 /// # use hydro_lang::prelude::*;
146 /// # use futures::StreamExt;
147 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
148 /// let p1 = flow.process::<()>();
149 /// let workers: Cluster<()> = flow.cluster::<()>();
150 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
151 /// .source_iter(q!(vec![0, 1, 2, 3]))
152 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
153 /// .into_keyed();
154 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
155 /// # on_worker.entries().send_bincode(&p2).entries()
156 /// // if there are 4 members in the cluster, each receives one element
157 /// // - MemberId::<()>(0): { 0: [123] }
158 /// // - MemberId::<()>(1): { 1: [124] }
159 /// // - ...
160 /// # }, |mut stream| async move {
161 /// # let mut results = Vec::new();
162 /// # for w in 0..4 {
163 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
164 /// # }
165 /// # results.sort();
166 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
167 /// # }));
168 /// # }
169 /// ```
170 pub fn demux_bincode(
171 self,
172 other: &Cluster<'a, L2>,
173 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
174 where
175 K: Serialize + DeserializeOwned,
176 T: Serialize + DeserializeOwned,
177 {
178 self.demux(other, TCP.bincode())
179 }
180
181 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
182 /// compound key where the first element is the recipient's [`MemberId`] and the second element
183 /// is a key that will be sent along with the value, using the configuration in `via` to set up
184 /// the message transport.
185 ///
186 /// # Example
187 /// ```rust
188 /// # #[cfg(feature = "deploy")] {
189 /// # use hydro_lang::prelude::*;
190 /// # use futures::StreamExt;
191 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
192 /// let p1 = flow.process::<()>();
193 /// let workers: Cluster<()> = flow.cluster::<()>();
194 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
195 /// .source_iter(q!(vec![0, 1, 2, 3]))
196 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
197 /// .into_keyed();
198 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.bincode());
199 /// # on_worker.entries().send(&p2, TCP.bincode()).entries()
200 /// // if there are 4 members in the cluster, each receives one element
201 /// // - MemberId::<()>(0): { 0: [123] }
202 /// // - MemberId::<()>(1): { 1: [124] }
203 /// // - ...
204 /// # }, |mut stream| async move {
205 /// # let mut results = Vec::new();
206 /// # for w in 0..4 {
207 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
208 /// # }
209 /// # results.sort();
210 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
211 /// # }));
212 /// # }
213 /// ```
214 pub fn demux<N: NetworkFor<(K, T)>>(
215 self,
216 to: &Cluster<'a, L2>,
217 via: N,
218 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
219 where
220 K: Serialize + DeserializeOwned,
221 T: Serialize + DeserializeOwned,
222 {
223 let _ = via;
224 let serialize_pipeline = Some(N::serialize_thunk(true));
225
226 let deserialize_pipeline = Some(N::deserialize_thunk(None));
227
228 KeyedStream::new(
229 to.clone(),
230 HydroNode::Network {
231 serialize_fn: serialize_pipeline.map(|e| e.into()),
232 instantiate_fn: DebugInstantiate::Building,
233 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
234 input: Box::new(
235 self.entries()
236 .map(q!(|((id, k), v)| (id, (k, v))))
237 .ir_node
238 .into_inner(),
239 ),
240 metadata: to.new_node_metadata(
241 KeyedStream::<K, T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
242 ),
243 },
244 )
245 }
246}
247
248impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
249 KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
250{
251 #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
252 /// Sends each group of this stream at each source member to a specific member of a destination
253 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
254 /// [`bincode`] to serialize/deserialize messages.
255 ///
256 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
257 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
258 /// API allows precise targeting of specific cluster members rather than broadcasting to all
259 /// members.
260 ///
261 /// Each cluster member sends its local stream elements, and they are collected at each
262 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
263 ///
264 /// # Example
265 /// ```rust
266 /// # #[cfg(feature = "deploy")] {
267 /// # use hydro_lang::prelude::*;
268 /// # use futures::StreamExt;
269 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
270 /// # type Source = ();
271 /// # type Destination = ();
272 /// let source: Cluster<Source> = flow.cluster::<Source>();
273 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
274 /// .source_iter(q!(vec![0, 1, 2, 3]))
275 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
276 /// .into_keyed();
277 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
278 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
279 /// # all_received.entries().send_bincode(&p2).entries()
280 /// # }, |mut stream| async move {
281 /// // if there are 4 members in the destination cluster, each receives one message from each source member
282 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
283 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
284 /// // - ...
285 /// # let mut results = Vec::new();
286 /// # for w in 0..16 {
287 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
288 /// # }
289 /// # results.sort();
290 /// # assert_eq!(results, vec![
291 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
292 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
293 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
294 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
295 /// # ]);
296 /// # }));
297 /// # }
298 /// ```
299 pub fn demux_bincode(
300 self,
301 other: &Cluster<'a, L2>,
302 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
303 where
304 T: Serialize + DeserializeOwned,
305 {
306 self.demux(other, TCP.bincode())
307 }
308
309 /// Sends each group of this stream at each source member to a specific member of a destination
310 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
311 /// configuration in `via` to set up the message transport.
312 ///
313 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
314 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
315 /// API allows precise targeting of specific cluster members rather than broadcasting to all
316 /// members.
317 ///
318 /// Each cluster member sends its local stream elements, and they are collected at each
319 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
320 ///
321 /// # Example
322 /// ```rust
323 /// # #[cfg(feature = "deploy")] {
324 /// # use hydro_lang::prelude::*;
325 /// # use futures::StreamExt;
326 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
327 /// # type Source = ();
328 /// # type Destination = ();
329 /// let source: Cluster<Source> = flow.cluster::<Source>();
330 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
331 /// .source_iter(q!(vec![0, 1, 2, 3]))
332 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
333 /// .into_keyed();
334 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
335 /// let all_received = to_send.demux(&destination, TCP.bincode()); // KeyedStream<MemberId<Source>, i32, ...>
336 /// # all_received.entries().send(&p2, TCP.bincode()).entries()
337 /// # }, |mut stream| async move {
338 /// // if there are 4 members in the destination cluster, each receives one message from each source member
339 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
340 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
341 /// // - ...
342 /// # let mut results = Vec::new();
343 /// # for w in 0..16 {
344 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
345 /// # }
346 /// # results.sort();
347 /// # assert_eq!(results, vec![
348 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
349 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
350 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
351 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
352 /// # ]);
353 /// # }));
354 /// # }
355 /// ```
356 pub fn demux<N: NetworkFor<T>>(
357 self,
358 to: &Cluster<'a, L2>,
359 via: N,
360 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
361 where
362 T: Serialize + DeserializeOwned,
363 {
364 let _ = via;
365 let serialize_pipeline = Some(N::serialize_thunk(true));
366
367 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
368
369 let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
370 to.clone(),
371 HydroNode::Network {
372 serialize_fn: serialize_pipeline.map(|e| e.into()),
373 instantiate_fn: DebugInstantiate::Building,
374 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
375 input: Box::new(self.ir_node.into_inner()),
376 metadata: to.new_node_metadata(Stream::<
377 (MemberId<L>, T),
378 Cluster<'a, L2>,
379 Unbounded,
380 O,
381 R,
382 >::collection_kind()),
383 },
384 );
385
386 raw_stream.into_keyed()
387 }
388}
389
390impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
391 KeyedStream<K, V, Cluster<'a, L>, B, O, R>
392{
393 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
394 #[deprecated = "use KeyedStream::send(..., TCP.bincode()) instead"]
395 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
396 /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
397 /// has a compound key where the first element is the sender's [`MemberId`] and the second
398 /// element is the original key.
399 ///
400 /// # Example
401 /// ```rust
402 /// # #[cfg(feature = "deploy")] {
403 /// # use hydro_lang::prelude::*;
404 /// # use futures::StreamExt;
405 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
406 /// # type Source = ();
407 /// # type Destination = ();
408 /// let source: Cluster<Source> = flow.cluster::<Source>();
409 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
410 /// .source_iter(q!(vec![0, 1, 2, 3]))
411 /// .map(q!(|x| (x, x + 123)))
412 /// .into_keyed();
413 /// let destination_process = flow.process::<Destination>();
414 /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
415 /// # all_received.entries().send_bincode(&p2)
416 /// # }, |mut stream| async move {
417 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
418 /// // {
419 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
420 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
421 /// // ...
422 /// // }
423 /// # let mut results = Vec::new();
424 /// # for w in 0..16 {
425 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
426 /// # }
427 /// # results.sort();
428 /// # assert_eq!(results, vec![
429 /// # "((MemberId::<()>(0), 0), 123)",
430 /// # "((MemberId::<()>(0), 1), 124)",
431 /// # "((MemberId::<()>(0), 2), 125)",
432 /// # "((MemberId::<()>(0), 3), 126)",
433 /// # "((MemberId::<()>(1), 0), 123)",
434 /// # "((MemberId::<()>(1), 1), 124)",
435 /// # "((MemberId::<()>(1), 2), 125)",
436 /// # "((MemberId::<()>(1), 3), 126)",
437 /// # "((MemberId::<()>(2), 0), 123)",
438 /// # "((MemberId::<()>(2), 1), 124)",
439 /// # "((MemberId::<()>(2), 2), 125)",
440 /// # "((MemberId::<()>(2), 3), 126)",
441 /// # "((MemberId::<()>(3), 0), 123)",
442 /// # "((MemberId::<()>(3), 1), 124)",
443 /// # "((MemberId::<()>(3), 2), 125)",
444 /// # "((MemberId::<()>(3), 3), 126)",
445 /// # ]);
446 /// # }));
447 /// # }
448 /// ```
449 pub fn send_bincode<L2>(
450 self,
451 other: &Process<'a, L2>,
452 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
453 where
454 K: Serialize + DeserializeOwned,
455 V: Serialize + DeserializeOwned,
456 {
457 self.send(other, TCP.bincode())
458 }
459
460 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
461 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
462 /// network, using the configuration in `via` to set up the message transport. The resulting
463 /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
464 /// the second element is the original key.
465 ///
466 /// # Example
467 /// ```rust
468 /// # #[cfg(feature = "deploy")] {
469 /// # use hydro_lang::prelude::*;
470 /// # use futures::StreamExt;
471 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
472 /// # type Source = ();
473 /// # type Destination = ();
474 /// let source: Cluster<Source> = flow.cluster::<Source>();
475 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
476 /// .source_iter(q!(vec![0, 1, 2, 3]))
477 /// .map(q!(|x| (x, x + 123)))
478 /// .into_keyed();
479 /// let destination_process = flow.process::<Destination>();
480 /// let all_received = to_send.send(&destination_process, TCP.bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
481 /// # all_received.entries().send(&p2, TCP.bincode())
482 /// # }, |mut stream| async move {
483 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
484 /// // {
485 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
486 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
487 /// // ...
488 /// // }
489 /// # let mut results = Vec::new();
490 /// # for w in 0..16 {
491 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
492 /// # }
493 /// # results.sort();
494 /// # assert_eq!(results, vec![
495 /// # "((MemberId::<()>(0), 0), 123)",
496 /// # "((MemberId::<()>(0), 1), 124)",
497 /// # "((MemberId::<()>(0), 2), 125)",
498 /// # "((MemberId::<()>(0), 3), 126)",
499 /// # "((MemberId::<()>(1), 0), 123)",
500 /// # "((MemberId::<()>(1), 1), 124)",
501 /// # "((MemberId::<()>(1), 2), 125)",
502 /// # "((MemberId::<()>(1), 3), 126)",
503 /// # "((MemberId::<()>(2), 0), 123)",
504 /// # "((MemberId::<()>(2), 1), 124)",
505 /// # "((MemberId::<()>(2), 2), 125)",
506 /// # "((MemberId::<()>(2), 3), 126)",
507 /// # "((MemberId::<()>(3), 0), 123)",
508 /// # "((MemberId::<()>(3), 1), 124)",
509 /// # "((MemberId::<()>(3), 2), 125)",
510 /// # "((MemberId::<()>(3), 3), 126)",
511 /// # ]);
512 /// # }));
513 /// # }
514 /// ```
515 pub fn send<L2, N: NetworkFor<(K, V)>>(
516 self,
517 to: &Process<'a, L2>,
518 via: N,
519 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
520 where
521 K: Serialize + DeserializeOwned,
522 V: Serialize + DeserializeOwned,
523 {
524 let _ = via;
525 let serialize_pipeline = Some(N::serialize_thunk(false));
526
527 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
528
529 let raw_stream: Stream<(MemberId<L>, (K, V)), Process<'a, L2>, Unbounded, O, R> =
530 Stream::new(
531 to.clone(),
532 HydroNode::Network {
533 serialize_fn: serialize_pipeline.map(|e| e.into()),
534 instantiate_fn: DebugInstantiate::Building,
535 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
536 input: Box::new(self.ir_node.into_inner()),
537 metadata: to.new_node_metadata(Stream::<
538 (MemberId<L>, (K, V)),
539 Cluster<'a, L2>,
540 Unbounded,
541 O,
542 R,
543 >::collection_kind()),
544 },
545 );
546
547 raw_stream
548 .map(q!(|(sender, (k, v))| ((sender, k), v)))
549 .into_keyed()
550 }
551}