hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{Ordering, Retries, Stream};
11#[cfg(stageleft_runtime)]
12use crate::location::dynamic::DynLocation;
13use crate::location::{Cluster, MemberId, Process};
14use crate::networking::{NetworkFor, TCP};
15
16impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
17    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
18{
19    #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
20    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
21    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
22    ///
23    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
24    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
25    /// API allows precise targeting of specific cluster members rather than broadcasting to
26    /// all members.
27    ///
28    /// # Example
29    /// ```rust
30    /// # #[cfg(feature = "deploy")] {
31    /// # use hydro_lang::prelude::*;
32    /// # use futures::StreamExt;
33    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
34    /// let p1 = flow.process::<()>();
35    /// let workers: Cluster<()> = flow.cluster::<()>();
36    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
37    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
38    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
39    ///     .into_keyed()
40    ///     .demux_bincode(&workers);
41    /// # on_worker.send_bincode(&p2).entries()
42    /// // if there are 4 members in the cluster, each receives one element
43    /// // - MemberId::<()>(0): [0]
44    /// // - MemberId::<()>(1): [1]
45    /// // - MemberId::<()>(2): [2]
46    /// // - MemberId::<()>(3): [3]
47    /// # }, |mut stream| async move {
48    /// # let mut results = Vec::new();
49    /// # for w in 0..4 {
50    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
51    /// # }
52    /// # results.sort();
53    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
54    /// # }));
55    /// # }
56    /// ```
57    pub fn demux_bincode(
58        self,
59        other: &Cluster<'a, L2>,
60    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
61    where
62        T: Serialize + DeserializeOwned,
63    {
64        self.demux(other, TCP.bincode())
65    }
66
67    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
68    /// identifying the recipient for each group and using the configuration in `via` to set up the
69    /// message transport.
70    ///
71    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
72    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
73    /// API allows precise targeting of specific cluster members rather than broadcasting to
74    /// all members.
75    ///
76    /// # Example
77    /// ```rust
78    /// # #[cfg(feature = "deploy")] {
79    /// # use hydro_lang::prelude::*;
80    /// # use futures::StreamExt;
81    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
82    /// let p1 = flow.process::<()>();
83    /// let workers: Cluster<()> = flow.cluster::<()>();
84    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
85    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
86    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
87    ///     .into_keyed()
88    ///     .demux(&workers, TCP.bincode());
89    /// # on_worker.send(&p2, TCP.bincode()).entries()
90    /// // if there are 4 members in the cluster, each receives one element
91    /// // - MemberId::<()>(0): [0]
92    /// // - MemberId::<()>(1): [1]
93    /// // - MemberId::<()>(2): [2]
94    /// // - MemberId::<()>(3): [3]
95    /// # }, |mut stream| async move {
96    /// # let mut results = Vec::new();
97    /// # for w in 0..4 {
98    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
99    /// # }
100    /// # results.sort();
101    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
102    /// # }));
103    /// # }
104    /// ```
105    pub fn demux<N: NetworkFor<T>>(
106        self,
107        to: &Cluster<'a, L2>,
108        via: N,
109    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
110    where
111        T: Serialize + DeserializeOwned,
112    {
113        let _ = via;
114        let serialize_pipeline = Some(N::serialize_thunk(true));
115
116        let deserialize_pipeline = Some(N::deserialize_thunk(None));
117
118        Stream::new(
119            to.clone(),
120            HydroNode::Network {
121                serialize_fn: serialize_pipeline.map(|e| e.into()),
122                instantiate_fn: DebugInstantiate::Building,
123                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
124                input: Box::new(self.ir_node.into_inner()),
125                metadata: to.new_node_metadata(
126                    Stream::<T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
127                ),
128            },
129        )
130    }
131}
132
133impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
134    KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
135{
136    #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
137    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
138    /// compound key where the first element is the recipient's [`MemberId`] and the second element
139    /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
140    /// messages.
141    ///
142    /// # Example
143    /// ```rust
144    /// # #[cfg(feature = "deploy")] {
145    /// # use hydro_lang::prelude::*;
146    /// # use futures::StreamExt;
147    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
148    /// let p1 = flow.process::<()>();
149    /// let workers: Cluster<()> = flow.cluster::<()>();
150    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
151    ///     .source_iter(q!(vec![0, 1, 2, 3]))
152    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
153    ///     .into_keyed();
154    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
155    /// # on_worker.entries().send_bincode(&p2).entries()
156    /// // if there are 4 members in the cluster, each receives one element
157    /// // - MemberId::<()>(0): { 0: [123] }
158    /// // - MemberId::<()>(1): { 1: [124] }
159    /// // - ...
160    /// # }, |mut stream| async move {
161    /// # let mut results = Vec::new();
162    /// # for w in 0..4 {
163    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
164    /// # }
165    /// # results.sort();
166    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
167    /// # }));
168    /// # }
169    /// ```
170    pub fn demux_bincode(
171        self,
172        other: &Cluster<'a, L2>,
173    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
174    where
175        K: Serialize + DeserializeOwned,
176        T: Serialize + DeserializeOwned,
177    {
178        self.demux(other, TCP.bincode())
179    }
180
181    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
182    /// compound key where the first element is the recipient's [`MemberId`] and the second element
183    /// is a key that will be sent along with the value, using the configuration in `via` to set up
184    /// the message transport.
185    ///
186    /// # Example
187    /// ```rust
188    /// # #[cfg(feature = "deploy")] {
189    /// # use hydro_lang::prelude::*;
190    /// # use futures::StreamExt;
191    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
192    /// let p1 = flow.process::<()>();
193    /// let workers: Cluster<()> = flow.cluster::<()>();
194    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
195    ///     .source_iter(q!(vec![0, 1, 2, 3]))
196    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
197    ///     .into_keyed();
198    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.bincode());
199    /// # on_worker.entries().send(&p2, TCP.bincode()).entries()
200    /// // if there are 4 members in the cluster, each receives one element
201    /// // - MemberId::<()>(0): { 0: [123] }
202    /// // - MemberId::<()>(1): { 1: [124] }
203    /// // - ...
204    /// # }, |mut stream| async move {
205    /// # let mut results = Vec::new();
206    /// # for w in 0..4 {
207    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
208    /// # }
209    /// # results.sort();
210    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
211    /// # }));
212    /// # }
213    /// ```
214    pub fn demux<N: NetworkFor<(K, T)>>(
215        self,
216        to: &Cluster<'a, L2>,
217        via: N,
218    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
219    where
220        K: Serialize + DeserializeOwned,
221        T: Serialize + DeserializeOwned,
222    {
223        let _ = via;
224        let serialize_pipeline = Some(N::serialize_thunk(true));
225
226        let deserialize_pipeline = Some(N::deserialize_thunk(None));
227
228        KeyedStream::new(
229            to.clone(),
230            HydroNode::Network {
231                serialize_fn: serialize_pipeline.map(|e| e.into()),
232                instantiate_fn: DebugInstantiate::Building,
233                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
234                input: Box::new(
235                    self.entries()
236                        .map(q!(|((id, k), v)| (id, (k, v))))
237                        .ir_node
238                        .into_inner(),
239                ),
240                metadata: to.new_node_metadata(
241                    KeyedStream::<K, T, Cluster<'a, L2>, Unbounded, O, R>::collection_kind(),
242                ),
243            },
244        )
245    }
246}
247
248impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
249    KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
250{
251    #[deprecated = "use KeyedStream::demux(..., TCP.bincode()) instead"]
252    /// Sends each group of this stream at each source member to a specific member of a destination
253    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
254    /// [`bincode`] to serialize/deserialize messages.
255    ///
256    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
257    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
258    /// API allows precise targeting of specific cluster members rather than broadcasting to all
259    /// members.
260    ///
261    /// Each cluster member sends its local stream elements, and they are collected at each
262    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
263    ///
264    /// # Example
265    /// ```rust
266    /// # #[cfg(feature = "deploy")] {
267    /// # use hydro_lang::prelude::*;
268    /// # use futures::StreamExt;
269    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
270    /// # type Source = ();
271    /// # type Destination = ();
272    /// let source: Cluster<Source> = flow.cluster::<Source>();
273    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
274    ///     .source_iter(q!(vec![0, 1, 2, 3]))
275    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
276    ///     .into_keyed();
277    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
278    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
279    /// # all_received.entries().send_bincode(&p2).entries()
280    /// # }, |mut stream| async move {
281    /// // if there are 4 members in the destination cluster, each receives one message from each source member
282    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
283    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
284    /// // - ...
285    /// # let mut results = Vec::new();
286    /// # for w in 0..16 {
287    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
288    /// # }
289    /// # results.sort();
290    /// # assert_eq!(results, vec![
291    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
292    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
293    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
294    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
295    /// # ]);
296    /// # }));
297    /// # }
298    /// ```
299    pub fn demux_bincode(
300        self,
301        other: &Cluster<'a, L2>,
302    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
303    where
304        T: Serialize + DeserializeOwned,
305    {
306        self.demux(other, TCP.bincode())
307    }
308
309    /// Sends each group of this stream at each source member to a specific member of a destination
310    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
311    /// configuration in `via` to set up the message transport.
312    ///
313    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
314    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
315    /// API allows precise targeting of specific cluster members rather than broadcasting to all
316    /// members.
317    ///
318    /// Each cluster member sends its local stream elements, and they are collected at each
319    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
320    ///
321    /// # Example
322    /// ```rust
323    /// # #[cfg(feature = "deploy")] {
324    /// # use hydro_lang::prelude::*;
325    /// # use futures::StreamExt;
326    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
327    /// # type Source = ();
328    /// # type Destination = ();
329    /// let source: Cluster<Source> = flow.cluster::<Source>();
330    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
331    ///     .source_iter(q!(vec![0, 1, 2, 3]))
332    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
333    ///     .into_keyed();
334    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
335    /// let all_received = to_send.demux(&destination, TCP.bincode()); // KeyedStream<MemberId<Source>, i32, ...>
336    /// # all_received.entries().send(&p2, TCP.bincode()).entries()
337    /// # }, |mut stream| async move {
338    /// // if there are 4 members in the destination cluster, each receives one message from each source member
339    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
340    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
341    /// // - ...
342    /// # let mut results = Vec::new();
343    /// # for w in 0..16 {
344    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
345    /// # }
346    /// # results.sort();
347    /// # assert_eq!(results, vec![
348    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
349    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
350    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
351    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
352    /// # ]);
353    /// # }));
354    /// # }
355    /// ```
356    pub fn demux<N: NetworkFor<T>>(
357        self,
358        to: &Cluster<'a, L2>,
359        via: N,
360    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
361    where
362        T: Serialize + DeserializeOwned,
363    {
364        let _ = via;
365        let serialize_pipeline = Some(N::serialize_thunk(true));
366
367        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
368
369        let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
370            to.clone(),
371            HydroNode::Network {
372                serialize_fn: serialize_pipeline.map(|e| e.into()),
373                instantiate_fn: DebugInstantiate::Building,
374                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
375                input: Box::new(self.ir_node.into_inner()),
376                metadata: to.new_node_metadata(Stream::<
377                    (MemberId<L>, T),
378                    Cluster<'a, L2>,
379                    Unbounded,
380                    O,
381                    R,
382                >::collection_kind()),
383            },
384        );
385
386        raw_stream.into_keyed()
387    }
388}
389
390impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
391    KeyedStream<K, V, Cluster<'a, L>, B, O, R>
392{
393    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
394    #[deprecated = "use KeyedStream::send(..., TCP.bincode()) instead"]
395    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
396    /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
397    /// has a compound key where the first element is the sender's [`MemberId`] and the second
398    /// element is the original key.
399    ///
400    /// # Example
401    /// ```rust
402    /// # #[cfg(feature = "deploy")] {
403    /// # use hydro_lang::prelude::*;
404    /// # use futures::StreamExt;
405    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
406    /// # type Source = ();
407    /// # type Destination = ();
408    /// let source: Cluster<Source> = flow.cluster::<Source>();
409    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
410    ///     .source_iter(q!(vec![0, 1, 2, 3]))
411    ///     .map(q!(|x| (x, x + 123)))
412    ///     .into_keyed();
413    /// let destination_process = flow.process::<Destination>();
414    /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
415    /// # all_received.entries().send_bincode(&p2)
416    /// # }, |mut stream| async move {
417    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
418    /// // {
419    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
420    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
421    /// //     ...
422    /// // }
423    /// # let mut results = Vec::new();
424    /// # for w in 0..16 {
425    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
426    /// # }
427    /// # results.sort();
428    /// # assert_eq!(results, vec![
429    /// #   "((MemberId::<()>(0), 0), 123)",
430    /// #   "((MemberId::<()>(0), 1), 124)",
431    /// #   "((MemberId::<()>(0), 2), 125)",
432    /// #   "((MemberId::<()>(0), 3), 126)",
433    /// #   "((MemberId::<()>(1), 0), 123)",
434    /// #   "((MemberId::<()>(1), 1), 124)",
435    /// #   "((MemberId::<()>(1), 2), 125)",
436    /// #   "((MemberId::<()>(1), 3), 126)",
437    /// #   "((MemberId::<()>(2), 0), 123)",
438    /// #   "((MemberId::<()>(2), 1), 124)",
439    /// #   "((MemberId::<()>(2), 2), 125)",
440    /// #   "((MemberId::<()>(2), 3), 126)",
441    /// #   "((MemberId::<()>(3), 0), 123)",
442    /// #   "((MemberId::<()>(3), 1), 124)",
443    /// #   "((MemberId::<()>(3), 2), 125)",
444    /// #   "((MemberId::<()>(3), 3), 126)",
445    /// # ]);
446    /// # }));
447    /// # }
448    /// ```
449    pub fn send_bincode<L2>(
450        self,
451        other: &Process<'a, L2>,
452    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
453    where
454        K: Serialize + DeserializeOwned,
455        V: Serialize + DeserializeOwned,
456    {
457        self.send(other, TCP.bincode())
458    }
459
460    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
461    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
462    /// network, using the configuration in `via` to set up the message transport. The resulting
463    /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
464    /// the second element is the original key.
465    ///
466    /// # Example
467    /// ```rust
468    /// # #[cfg(feature = "deploy")] {
469    /// # use hydro_lang::prelude::*;
470    /// # use futures::StreamExt;
471    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
472    /// # type Source = ();
473    /// # type Destination = ();
474    /// let source: Cluster<Source> = flow.cluster::<Source>();
475    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
476    ///     .source_iter(q!(vec![0, 1, 2, 3]))
477    ///     .map(q!(|x| (x, x + 123)))
478    ///     .into_keyed();
479    /// let destination_process = flow.process::<Destination>();
480    /// let all_received = to_send.send(&destination_process, TCP.bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
481    /// # all_received.entries().send(&p2, TCP.bincode())
482    /// # }, |mut stream| async move {
483    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
484    /// // {
485    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
486    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
487    /// //     ...
488    /// // }
489    /// # let mut results = Vec::new();
490    /// # for w in 0..16 {
491    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
492    /// # }
493    /// # results.sort();
494    /// # assert_eq!(results, vec![
495    /// #   "((MemberId::<()>(0), 0), 123)",
496    /// #   "((MemberId::<()>(0), 1), 124)",
497    /// #   "((MemberId::<()>(0), 2), 125)",
498    /// #   "((MemberId::<()>(0), 3), 126)",
499    /// #   "((MemberId::<()>(1), 0), 123)",
500    /// #   "((MemberId::<()>(1), 1), 124)",
501    /// #   "((MemberId::<()>(1), 2), 125)",
502    /// #   "((MemberId::<()>(1), 3), 126)",
503    /// #   "((MemberId::<()>(2), 0), 123)",
504    /// #   "((MemberId::<()>(2), 1), 124)",
505    /// #   "((MemberId::<()>(2), 2), 125)",
506    /// #   "((MemberId::<()>(2), 3), 126)",
507    /// #   "((MemberId::<()>(3), 0), 123)",
508    /// #   "((MemberId::<()>(3), 1), 124)",
509    /// #   "((MemberId::<()>(3), 2), 125)",
510    /// #   "((MemberId::<()>(3), 3), 126)",
511    /// # ]);
512    /// # }));
513    /// # }
514    /// ```
515    pub fn send<L2, N: NetworkFor<(K, V)>>(
516        self,
517        to: &Process<'a, L2>,
518        via: N,
519    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
520    where
521        K: Serialize + DeserializeOwned,
522        V: Serialize + DeserializeOwned,
523    {
524        let _ = via;
525        let serialize_pipeline = Some(N::serialize_thunk(false));
526
527        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
528
529        let raw_stream: Stream<(MemberId<L>, (K, V)), Process<'a, L2>, Unbounded, O, R> =
530            Stream::new(
531                to.clone(),
532                HydroNode::Network {
533                    serialize_fn: serialize_pipeline.map(|e| e.into()),
534                    instantiate_fn: DebugInstantiate::Building,
535                    deserialize_fn: deserialize_pipeline.map(|e| e.into()),
536                    input: Box::new(self.ir_node.into_inner()),
537                    metadata: to.new_node_metadata(Stream::<
538                        (MemberId<L>, (K, V)),
539                        Cluster<'a, L2>,
540                        Unbounded,
541                        O,
542                        R,
543                    >::collection_kind()),
544                },
545            );
546
547        raw_stream
548            .map(q!(|(sender, (k, v))| ((sender, k), v)))
549            .into_keyed()
550    }
551}