hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::DynLocation;
19use crate::location::external_process::ExternalBincodeStream;
20use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21use crate::networking::{NetworkFor, TCP};
22use crate::nondet::NonDet;
23#[cfg(feature = "sim")]
24use crate::sim::SimReceiver;
25use crate::staging_util::get_this_crate;
26
27// same as the one in `hydro_std`, but internal use only
28fn track_membership<'a, C, L: Location<'a> + NoTick>(
29    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
30) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
31    membership.fold(
32        q!(|| false),
33        q!(|present, event| {
34            match event {
35                MembershipEvent::Joined => *present = true,
36                MembershipEvent::Left => *present = false,
37            }
38        }),
39    )
40}
41
42fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
43    let root = get_this_crate();
44
45    if is_demux {
46        parse_quote! {
47            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
48                |(id, data)| {
49                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
50                }
51            )
52        }
53    } else {
54        parse_quote! {
55            ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
56                |data| {
57                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
58                }
59            )
60        }
61    }
62}
63
64pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
65    serialize_bincode_with_type(is_demux, &quote_type::<T>())
66}
67
68fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
69    let root = get_this_crate();
70
71    if let Some(c_type) = tagged {
72        parse_quote! {
73            |res| {
74                let (id, b) = res.unwrap();
75                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
76            }
77        }
78    } else {
79        parse_quote! {
80            |res| {
81                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
82            }
83        }
84    }
85}
86
87pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
88    deserialize_bincode_with_type(tagged, &quote_type::<T>())
89}
90
91impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
92    #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
93    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
94    /// using [`bincode`] to serialize/deserialize messages.
95    ///
96    /// The returned stream captures the elements received at the destination, where values will
97    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
98    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
99    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
100    /// dropped no further messages will be sent.
101    ///
102    /// # Example
103    /// ```rust
104    /// # #[cfg(feature = "deploy")] {
105    /// # use hydro_lang::prelude::*;
106    /// # use futures::StreamExt;
107    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
108    /// let p1 = flow.process::<()>();
109    /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
110    /// let p2 = flow.process::<()>();
111    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
112    /// // 1, 2, 3
113    /// # on_p2.send_bincode(&p_out)
114    /// # }, |mut stream| async move {
115    /// # for w in 1..=3 {
116    /// #     assert_eq!(stream.next().await, Some(w));
117    /// # }
118    /// # }));
119    /// # }
120    /// ```
121    pub fn send_bincode<L2>(
122        self,
123        other: &Process<'a, L2>,
124    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
125    where
126        T: Serialize + DeserializeOwned,
127    {
128        self.send(other, TCP.bincode())
129    }
130
131    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
132    /// using the configuration in `via` to set up the message transport.
133    ///
134    /// The returned stream captures the elements received at the destination, where values will
135    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
136    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
137    /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
138    /// dropped no further messages will be sent.
139    ///
140    /// # Example
141    /// ```rust
142    /// # #[cfg(feature = "deploy")] {
143    /// # use hydro_lang::prelude::*;
144    /// # use futures::StreamExt;
145    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
146    /// let p1 = flow.process::<()>();
147    /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
148    /// let p2 = flow.process::<()>();
149    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.bincode());
150    /// // 1, 2, 3
151    /// # on_p2.send(&p_out, TCP.bincode())
152    /// # }, |mut stream| async move {
153    /// # for w in 1..=3 {
154    /// #     assert_eq!(stream.next().await, Some(w));
155    /// # }
156    /// # }));
157    /// # }
158    /// ```
159    pub fn send<L2, N: NetworkFor<T>>(
160        self,
161        to: &Process<'a, L2>,
162        via: N,
163    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
164    where
165        T: Serialize + DeserializeOwned,
166    {
167        let _ = via;
168        let serialize_pipeline = Some(N::serialize_thunk(false));
169        let deserialize_pipeline = Some(N::deserialize_thunk(None));
170
171        Stream::new(
172            to.clone(),
173            HydroNode::Network {
174                serialize_fn: serialize_pipeline.map(|e| e.into()),
175                instantiate_fn: DebugInstantiate::Building,
176                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
177                input: Box::new(self.ir_node.into_inner()),
178                metadata: to.new_node_metadata(
179                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
180                ),
181            },
182        )
183    }
184
185    #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
186    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
187    /// using [`bincode`] to serialize/deserialize messages.
188    ///
189    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
190    /// membership information. This is a common pattern in distributed systems for broadcasting data to
191    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
192    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
193    /// each element to all cluster members.
194    ///
195    /// # Non-Determinism
196    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
197    /// to the current cluster members _at that point in time_. Depending on when we are notified of
198    /// membership changes, we will broadcast each element to different members.
199    ///
200    /// # Example
201    /// ```rust
202    /// # #[cfg(feature = "deploy")] {
203    /// # use hydro_lang::prelude::*;
204    /// # use futures::StreamExt;
205    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
206    /// let p1 = flow.process::<()>();
207    /// let workers: Cluster<()> = flow.cluster::<()>();
208    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
209    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
210    /// # on_worker.send_bincode(&p2).entries()
211    /// // if there are 4 members in the cluster, each receives one element
212    /// // - MemberId::<()>(0): [123]
213    /// // - MemberId::<()>(1): [123]
214    /// // - MemberId::<()>(2): [123]
215    /// // - MemberId::<()>(3): [123]
216    /// # }, |mut stream| async move {
217    /// # let mut results = Vec::new();
218    /// # for w in 0..4 {
219    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
220    /// # }
221    /// # results.sort();
222    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
223    /// # }));
224    /// # }
225    /// ```
226    pub fn broadcast_bincode<L2: 'a>(
227        self,
228        other: &Cluster<'a, L2>,
229        nondet_membership: NonDet,
230    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
231    where
232        T: Clone + Serialize + DeserializeOwned,
233    {
234        self.broadcast(other, TCP.bincode(), nondet_membership)
235    }
236
237    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
238    /// using the configuration in `via` to set up the message transport.
239    ///
240    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
241    /// membership information. This is a common pattern in distributed systems for broadcasting data to
242    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
243    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
244    /// each element to all cluster members.
245    ///
246    /// # Non-Determinism
247    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
248    /// to the current cluster members _at that point in time_. Depending on when we are notified of
249    /// membership changes, we will broadcast each element to different members.
250    ///
251    /// # Example
252    /// ```rust
253    /// # #[cfg(feature = "deploy")] {
254    /// # use hydro_lang::prelude::*;
255    /// # use futures::StreamExt;
256    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
257    /// let p1 = flow.process::<()>();
258    /// let workers: Cluster<()> = flow.cluster::<()>();
259    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
260    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
261    /// # on_worker.send(&p2, TCP.bincode()).entries()
262    /// // if there are 4 members in the cluster, each receives one element
263    /// // - MemberId::<()>(0): [123]
264    /// // - MemberId::<()>(1): [123]
265    /// // - MemberId::<()>(2): [123]
266    /// // - MemberId::<()>(3): [123]
267    /// # }, |mut stream| async move {
268    /// # let mut results = Vec::new();
269    /// # for w in 0..4 {
270    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
271    /// # }
272    /// # results.sort();
273    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
274    /// # }));
275    /// # }
276    /// ```
277    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
278        self,
279        to: &Cluster<'a, L2>,
280        via: N,
281        nondet_membership: NonDet,
282    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
283    where
284        T: Clone + Serialize + DeserializeOwned,
285    {
286        let ids = track_membership(self.location.source_cluster_members(to));
287        sliced! {
288            let members_snapshot = use(ids, nondet_membership);
289            let elements = use(self, nondet_membership);
290
291            let current_members = members_snapshot.filter(q!(|b| *b));
292            elements.repeat_with_keys(current_members)
293        }
294        .demux(to, via)
295    }
296
297    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
298    /// serialization. The external process can receive these elements by establishing a TCP
299    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
300    ///
301    /// # Example
302    /// ```rust
303    /// # #[cfg(feature = "deploy")] {
304    /// # use hydro_lang::prelude::*;
305    /// # use futures::StreamExt;
306    /// # tokio_test::block_on(async move {
307    /// let flow = FlowBuilder::new();
308    /// let process = flow.process::<()>();
309    /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
310    /// let external = flow.external::<()>();
311    /// let external_handle = numbers.send_bincode_external(&external);
312    ///
313    /// let mut deployment = hydro_deploy::Deployment::new();
314    /// let nodes = flow
315    ///     .with_process(&process, deployment.Localhost())
316    ///     .with_external(&external, deployment.Localhost())
317    ///     .deploy(&mut deployment);
318    ///
319    /// deployment.deploy().await.unwrap();
320    /// // establish the TCP connection
321    /// let mut external_recv_stream = nodes.connect(external_handle).await;
322    /// deployment.start().await.unwrap();
323    ///
324    /// for w in 1..=3 {
325    ///     assert_eq!(external_recv_stream.next().await, Some(w));
326    /// }
327    /// # });
328    /// # }
329    /// ```
330    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
331    where
332        T: Serialize + DeserializeOwned,
333    {
334        let serialize_pipeline = Some(serialize_bincode::<T>(false));
335
336        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
337
338        let external_key = flow_state_borrow.next_external_out;
339        flow_state_borrow.next_external_out += 1;
340
341        flow_state_borrow.push_root(HydroRoot::SendExternal {
342            to_external_id: other.id,
343            to_key: external_key,
344            to_many: false,
345            unpaired: true,
346            serialize_fn: serialize_pipeline.map(|e| e.into()),
347            instantiate_fn: DebugInstantiate::Building,
348            input: Box::new(self.ir_node.into_inner()),
349            op_metadata: HydroIrOpMetadata::new(),
350        });
351
352        ExternalBincodeStream {
353            process_id: other.id,
354            port_id: external_key,
355            _phantom: PhantomData,
356        }
357    }
358
359    #[cfg(feature = "sim")]
360    /// Sets up a simulation output port for this stream, allowing test code to receive elements
361    /// sent to this stream during simulation.
362    pub fn sim_output(self) -> SimReceiver<T, O, R>
363    where
364        T: Serialize + DeserializeOwned,
365    {
366        let external_location: External<'a, ()> = External {
367            id: 0,
368            flow_state: self.location.flow_state().clone(),
369            _phantom: PhantomData,
370        };
371
372        let external = self.send_bincode_external(&external_location);
373
374        SimReceiver(external.port_id, PhantomData)
375    }
376}
377
378impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
379    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
380{
381    #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
382    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
383    /// using [`bincode`] to serialize/deserialize messages.
384    ///
385    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
386    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
387    /// this API allows precise targeting of specific cluster members rather than broadcasting to
388    /// all members.
389    ///
390    /// # Example
391    /// ```rust
392    /// # #[cfg(feature = "deploy")] {
393    /// # use hydro_lang::prelude::*;
394    /// # use futures::StreamExt;
395    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
396    /// let p1 = flow.process::<()>();
397    /// let workers: Cluster<()> = flow.cluster::<()>();
398    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
399    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
400    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
401    ///     .demux_bincode(&workers);
402    /// # on_worker.send_bincode(&p2).entries()
403    /// // if there are 4 members in the cluster, each receives one element
404    /// // - MemberId::<()>(0): [0]
405    /// // - MemberId::<()>(1): [1]
406    /// // - MemberId::<()>(2): [2]
407    /// // - MemberId::<()>(3): [3]
408    /// # }, |mut stream| async move {
409    /// # let mut results = Vec::new();
410    /// # for w in 0..4 {
411    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
412    /// # }
413    /// # results.sort();
414    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
415    /// # }));
416    /// # }
417    /// ```
418    pub fn demux_bincode(
419        self,
420        other: &Cluster<'a, L2>,
421    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
422    where
423        T: Serialize + DeserializeOwned,
424    {
425        self.demux(other, TCP.bincode())
426    }
427
428    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
429    /// using the configuration in `via` to set up the message transport.
430    ///
431    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
432    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
433    /// this API allows precise targeting of specific cluster members rather than broadcasting to
434    /// all members.
435    ///
436    /// # Example
437    /// ```rust
438    /// # #[cfg(feature = "deploy")] {
439    /// # use hydro_lang::prelude::*;
440    /// # use futures::StreamExt;
441    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
442    /// let p1 = flow.process::<()>();
443    /// let workers: Cluster<()> = flow.cluster::<()>();
444    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
445    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
446    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
447    ///     .demux(&workers, TCP.bincode());
448    /// # on_worker.send(&p2, TCP.bincode()).entries()
449    /// // if there are 4 members in the cluster, each receives one element
450    /// // - MemberId::<()>(0): [0]
451    /// // - MemberId::<()>(1): [1]
452    /// // - MemberId::<()>(2): [2]
453    /// // - MemberId::<()>(3): [3]
454    /// # }, |mut stream| async move {
455    /// # let mut results = Vec::new();
456    /// # for w in 0..4 {
457    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
458    /// # }
459    /// # results.sort();
460    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
461    /// # }));
462    /// # }
463    /// ```
464    pub fn demux<N: NetworkFor<T>>(
465        self,
466        to: &Cluster<'a, L2>,
467        via: N,
468    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
469    where
470        T: Serialize + DeserializeOwned,
471    {
472        self.into_keyed().demux(to, via)
473    }
474}
475
476impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
477    #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
478    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
479    /// [`bincode`] to serialize/deserialize messages.
480    ///
481    /// This provides load balancing by evenly distributing work across cluster members. The
482    /// distribution is deterministic based on element order - the first element goes to member 0,
483    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
484    ///
485    /// # Non-Determinism
486    /// The set of cluster members may asynchronously change over time. Each element is distributed
487    /// based on the current cluster membership _at that point in time_. Depending on when cluster
488    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
489    /// membership is stable, the order of members in the round-robin pattern may change across runs.
490    ///
491    /// # Ordering Requirements
492    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
493    /// order of messages and retries affects the round-robin pattern.
494    ///
495    /// # Example
496    /// ```rust
497    /// # #[cfg(feature = "deploy")] {
498    /// # use hydro_lang::prelude::*;
499    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
500    /// # use futures::StreamExt;
501    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
502    /// let p1 = flow.process::<()>();
503    /// let workers: Cluster<()> = flow.cluster::<()>();
504    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
505    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
506    /// on_worker.send_bincode(&p2)
507    /// # .first().values() // we use first to assert that each member gets one element
508    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
509    /// // - MemberId::<()>(?): [1]
510    /// // - MemberId::<()>(?): [2]
511    /// // - MemberId::<()>(?): [3]
512    /// // - MemberId::<()>(?): [4]
513    /// # }, |mut stream| async move {
514    /// # let mut results = Vec::new();
515    /// # for w in 0..4 {
516    /// #     results.push(stream.next().await.unwrap());
517    /// # }
518    /// # results.sort();
519    /// # assert_eq!(results, vec![1, 2, 3, 4]);
520    /// # }));
521    /// # }
522    /// ```
523    pub fn round_robin_bincode<L2: 'a>(
524        self,
525        other: &Cluster<'a, L2>,
526        nondet_membership: NonDet,
527    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
528    where
529        T: Serialize + DeserializeOwned,
530    {
531        self.round_robin(other, TCP.bincode(), nondet_membership)
532    }
533
534    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
535    /// the configuration in `via` to set up the message transport.
536    ///
537    /// This provides load balancing by evenly distributing work across cluster members. The
538    /// distribution is deterministic based on element order - the first element goes to member 0,
539    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
540    ///
541    /// # Non-Determinism
542    /// The set of cluster members may asynchronously change over time. Each element is distributed
543    /// based on the current cluster membership _at that point in time_. Depending on when cluster
544    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
545    /// membership is stable, the order of members in the round-robin pattern may change across runs.
546    ///
547    /// # Ordering Requirements
548    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
549    /// order of messages and retries affects the round-robin pattern.
550    ///
551    /// # Example
552    /// ```rust
553    /// # #[cfg(feature = "deploy")] {
554    /// # use hydro_lang::prelude::*;
555    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
556    /// # use futures::StreamExt;
557    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
558    /// let p1 = flow.process::<()>();
559    /// let workers: Cluster<()> = flow.cluster::<()>();
560    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
561    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
562    /// on_worker.send(&p2, TCP.bincode())
563    /// # .first().values() // we use first to assert that each member gets one element
564    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
565    /// // - MemberId::<()>(?): [1]
566    /// // - MemberId::<()>(?): [2]
567    /// // - MemberId::<()>(?): [3]
568    /// // - MemberId::<()>(?): [4]
569    /// # }, |mut stream| async move {
570    /// # let mut results = Vec::new();
571    /// # for w in 0..4 {
572    /// #     results.push(stream.next().await.unwrap());
573    /// # }
574    /// # results.sort();
575    /// # assert_eq!(results, vec![1, 2, 3, 4]);
576    /// # }));
577    /// # }
578    /// ```
579    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
580        self,
581        to: &Cluster<'a, L2>,
582        via: N,
583        nondet_membership: NonDet,
584    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
585    where
586        T: Serialize + DeserializeOwned,
587    {
588        let ids = track_membership(self.location.source_cluster_members(to));
589        sliced! {
590            let members_snapshot = use(ids, nondet_membership);
591            let elements = use(self.enumerate(), nondet_membership);
592
593            let current_members = members_snapshot
594                .filter(q!(|b| *b))
595                .keys()
596                .assume_ordering(nondet_membership)
597                .collect_vec();
598
599            elements
600                .cross_singleton(current_members)
601                .map(q!(|(data, members)| (
602                    members[data.0 % members.len()].clone(),
603                    data.1
604                )))
605        }
606        .demux(to, via)
607    }
608}
609
610impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
611    #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
612    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
613    /// [`bincode`] to serialize/deserialize messages.
614    ///
615    /// This provides load balancing by evenly distributing work across cluster members. The
616    /// distribution is deterministic based on element order - the first element goes to member 0,
617    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
618    ///
619    /// # Non-Determinism
620    /// The set of cluster members may asynchronously change over time. Each element is distributed
621    /// based on the current cluster membership _at that point in time_. Depending on when cluster
622    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
623    /// membership is stable, the order of members in the round-robin pattern may change across runs.
624    ///
625    /// # Ordering Requirements
626    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
627    /// order of messages and retries affects the round-robin pattern.
628    ///
629    /// # Example
630    /// ```rust
631    /// # #[cfg(feature = "deploy")] {
632    /// # use hydro_lang::prelude::*;
633    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
634    /// # use hydro_lang::location::MemberId;
635    /// # use futures::StreamExt;
636    /// # std::thread::spawn(|| {
637    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
638    /// let p1 = flow.process::<()>();
639    /// let workers1: Cluster<()> = flow.cluster::<()>();
640    /// let workers2: Cluster<()> = flow.cluster::<()>();
641    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
642    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
643    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
644    /// on_worker2.send_bincode(&p2)
645    /// # .entries()
646    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
647    /// # }, |mut stream| async move {
648    /// # let mut results = Vec::new();
649    /// # let mut locations = std::collections::HashSet::new();
650    /// # for w in 0..=16 {
651    /// #     let (location, v) = stream.next().await.unwrap();
652    /// #     locations.insert(location);
653    /// #     results.push(v);
654    /// # }
655    /// # results.sort();
656    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
657    /// # assert_eq!(locations.len(), 16);
658    /// # }));
659    /// # }).join().unwrap();
660    /// # }
661    /// ```
662    pub fn round_robin_bincode<L2: 'a>(
663        self,
664        other: &Cluster<'a, L2>,
665        nondet_membership: NonDet,
666    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
667    where
668        T: Serialize + DeserializeOwned,
669    {
670        self.round_robin(other, TCP.bincode(), nondet_membership)
671    }
672
673    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
674    /// the configuration in `via` to set up the message transport.
675    ///
676    /// This provides load balancing by evenly distributing work across cluster members. The
677    /// distribution is deterministic based on element order - the first element goes to member 0,
678    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
679    ///
680    /// # Non-Determinism
681    /// The set of cluster members may asynchronously change over time. Each element is distributed
682    /// based on the current cluster membership _at that point in time_. Depending on when cluster
683    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
684    /// membership is stable, the order of members in the round-robin pattern may change across runs.
685    ///
686    /// # Ordering Requirements
687    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
688    /// order of messages and retries affects the round-robin pattern.
689    ///
690    /// # Example
691    /// ```rust
692    /// # #[cfg(feature = "deploy")] {
693    /// # use hydro_lang::prelude::*;
694    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
695    /// # use hydro_lang::location::MemberId;
696    /// # use futures::StreamExt;
697    /// # std::thread::spawn(|| {
698    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
699    /// let p1 = flow.process::<()>();
700    /// let workers1: Cluster<()> = flow.cluster::<()>();
701    /// let workers2: Cluster<()> = flow.cluster::<()>();
702    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
703    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.bincode(), nondet!(/** assuming stable membership */));
704    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
705    /// on_worker2.send(&p2, TCP.bincode())
706    /// # .entries()
707    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
708    /// # }, |mut stream| async move {
709    /// # let mut results = Vec::new();
710    /// # let mut locations = std::collections::HashSet::new();
711    /// # for w in 0..=16 {
712    /// #     let (location, v) = stream.next().await.unwrap();
713    /// #     locations.insert(location);
714    /// #     results.push(v);
715    /// # }
716    /// # results.sort();
717    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
718    /// # assert_eq!(locations.len(), 16);
719    /// # }));
720    /// # }).join().unwrap();
721    /// # }
722    /// ```
723    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
724        self,
725        to: &Cluster<'a, L2>,
726        via: N,
727        nondet_membership: NonDet,
728    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
729    where
730        T: Serialize + DeserializeOwned,
731    {
732        let ids = track_membership(self.location.source_cluster_members(to));
733        sliced! {
734            let members_snapshot = use(ids, nondet_membership);
735            let elements = use(self.enumerate(), nondet_membership);
736
737            let current_members = members_snapshot
738                .filter(q!(|b| *b))
739                .keys()
740                .assume_ordering(nondet_membership)
741                .collect_vec();
742
743            elements
744                .cross_singleton(current_members)
745                .map(q!(|(data, members)| (
746                    members[data.0 % members.len()].clone(),
747                    data.1
748                )))
749        }
750        .demux(to, via)
751    }
752}
753
754impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
755    #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
756    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
757    /// using [`bincode`] to serialize/deserialize messages.
758    ///
759    /// Each cluster member sends its local stream elements, and they are collected at the destination
760    /// as a [`KeyedStream`] where keys identify the source cluster member.
761    ///
762    /// # Example
763    /// ```rust
764    /// # #[cfg(feature = "deploy")] {
765    /// # use hydro_lang::prelude::*;
766    /// # use futures::StreamExt;
767    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
768    /// let workers: Cluster<()> = flow.cluster::<()>();
769    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
770    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
771    /// # all_received.entries()
772    /// # }, |mut stream| async move {
773    /// // if there are 4 members in the cluster, we should receive 4 elements
774    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
775    /// # let mut results = Vec::new();
776    /// # for w in 0..4 {
777    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
778    /// # }
779    /// # results.sort();
780    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
781    /// # }));
782    /// # }
783    /// ```
784    ///
785    /// If you don't need to know the source for each element, you can use `.values()`
786    /// to get just the data:
787    /// ```rust
788    /// # #[cfg(feature = "deploy")] {
789    /// # use hydro_lang::prelude::*;
790    /// # use hydro_lang::live_collections::stream::NoOrder;
791    /// # use futures::StreamExt;
792    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
793    /// # let workers: Cluster<()> = flow.cluster::<()>();
794    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
795    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
796    /// # values
797    /// # }, |mut stream| async move {
798    /// # let mut results = Vec::new();
799    /// # for w in 0..4 {
800    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
801    /// # }
802    /// # results.sort();
803    /// // if there are 4 members in the cluster, we should receive 4 elements
804    /// // 1, 1, 1, 1
805    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
806    /// # }));
807    /// # }
808    /// ```
809    pub fn send_bincode<L2>(
810        self,
811        other: &Process<'a, L2>,
812    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
813    where
814        T: Serialize + DeserializeOwned,
815    {
816        self.send(other, TCP.bincode())
817    }
818
819    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
820    /// using the configuration in `via` to set up the message transport.
821    ///
822    /// Each cluster member sends its local stream elements, and they are collected at the destination
823    /// as a [`KeyedStream`] where keys identify the source cluster member.
824    ///
825    /// # Example
826    /// ```rust
827    /// # #[cfg(feature = "deploy")] {
828    /// # use hydro_lang::prelude::*;
829    /// # use futures::StreamExt;
830    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
831    /// let workers: Cluster<()> = flow.cluster::<()>();
832    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
833    /// let all_received = numbers.send(&process, TCP.bincode()); // KeyedStream<MemberId<()>, i32, ...>
834    /// # all_received.entries()
835    /// # }, |mut stream| async move {
836    /// // if there are 4 members in the cluster, we should receive 4 elements
837    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
838    /// # let mut results = Vec::new();
839    /// # for w in 0..4 {
840    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
841    /// # }
842    /// # results.sort();
843    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
844    /// # }));
845    /// # }
846    /// ```
847    ///
848    /// If you don't need to know the source for each element, you can use `.values()`
849    /// to get just the data:
850    /// ```rust
851    /// # #[cfg(feature = "deploy")] {
852    /// # use hydro_lang::prelude::*;
853    /// # use hydro_lang::live_collections::stream::NoOrder;
854    /// # use futures::StreamExt;
855    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
856    /// # let workers: Cluster<()> = flow.cluster::<()>();
857    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
858    /// let values: Stream<i32, _, _, NoOrder> = numbers.send(&process, TCP.bincode()).values();
859    /// # values
860    /// # }, |mut stream| async move {
861    /// # let mut results = Vec::new();
862    /// # for w in 0..4 {
863    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
864    /// # }
865    /// # results.sort();
866    /// // if there are 4 members in the cluster, we should receive 4 elements
867    /// // 1, 1, 1, 1
868    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
869    /// # }));
870    /// # }
871    /// ```
872    pub fn send<L2, N: NetworkFor<T>>(
873        self,
874        to: &Process<'a, L2>,
875        via: N,
876    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
877    where
878        T: Serialize + DeserializeOwned,
879    {
880        let _ = via;
881        let serialize_pipeline = Some(N::serialize_thunk(false));
882
883        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
884
885        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
886            to.clone(),
887            HydroNode::Network {
888                serialize_fn: serialize_pipeline.map(|e| e.into()),
889                instantiate_fn: DebugInstantiate::Building,
890                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
891                input: Box::new(self.ir_node.into_inner()),
892                metadata: to.new_node_metadata(Stream::<
893                    (MemberId<L>, T),
894                    Process<'a, L2>,
895                    Unbounded,
896                    O,
897                    R,
898                >::collection_kind()),
899            },
900        );
901
902        raw_stream.into_keyed()
903    }
904
905    #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
906    /// Broadcasts elements of this stream at each source member to all members of a destination
907    /// cluster, using [`bincode`] to serialize/deserialize messages.
908    ///
909    /// Each source member sends each of its stream elements to **every** member of the cluster
910    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
911    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
912    /// **only data elements** and sends each element to all cluster members.
913    ///
914    /// # Non-Determinism
915    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
916    /// to the current cluster members known _at that point in time_ at the source member. Depending
917    /// on when each source member is notified of membership changes, it will broadcast each element
918    /// to different members.
919    ///
920    /// # Example
921    /// ```rust
922    /// # #[cfg(feature = "deploy")] {
923    /// # use hydro_lang::prelude::*;
924    /// # use hydro_lang::location::MemberId;
925    /// # use futures::StreamExt;
926    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
927    /// # type Source = ();
928    /// # type Destination = ();
929    /// let source: Cluster<Source> = flow.cluster::<Source>();
930    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
931    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
932    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
933    /// # on_destination.entries().send_bincode(&p2).entries()
934    /// // if there are 4 members in the desination, each receives one element from each source member
935    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
936    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
937    /// // - ...
938    /// # }, |mut stream| async move {
939    /// # let mut results = Vec::new();
940    /// # for w in 0..16 {
941    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
942    /// # }
943    /// # results.sort();
944    /// # assert_eq!(results, vec![
945    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
946    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
947    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
948    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
949    /// # ]);
950    /// # }));
951    /// # }
952    /// ```
953    pub fn broadcast_bincode<L2: 'a>(
954        self,
955        other: &Cluster<'a, L2>,
956        nondet_membership: NonDet,
957    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
958    where
959        T: Clone + Serialize + DeserializeOwned,
960    {
961        self.broadcast(other, TCP.bincode(), nondet_membership)
962    }
963
964    /// Broadcasts elements of this stream at each source member to all members of a destination
965    /// cluster, using the configuration in `via` to set up the message transport.
966    ///
967    /// Each source member sends each of its stream elements to **every** member of the cluster
968    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
969    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
970    /// **only data elements** and sends each element to all cluster members.
971    ///
972    /// # Non-Determinism
973    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
974    /// to the current cluster members known _at that point in time_ at the source member. Depending
975    /// on when each source member is notified of membership changes, it will broadcast each element
976    /// to different members.
977    ///
978    /// # Example
979    /// ```rust
980    /// # #[cfg(feature = "deploy")] {
981    /// # use hydro_lang::prelude::*;
982    /// # use hydro_lang::location::MemberId;
983    /// # use futures::StreamExt;
984    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
985    /// # type Source = ();
986    /// # type Destination = ();
987    /// let source: Cluster<Source> = flow.cluster::<Source>();
988    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
989    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
990    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.bincode(), nondet!(/** assuming stable membership */));
991    /// # on_destination.entries().send(&p2, TCP.bincode()).entries()
992    /// // if there are 4 members in the desination, each receives one element from each source member
993    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
994    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
995    /// // - ...
996    /// # }, |mut stream| async move {
997    /// # let mut results = Vec::new();
998    /// # for w in 0..16 {
999    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1000    /// # }
1001    /// # results.sort();
1002    /// # assert_eq!(results, vec![
1003    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1004    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1005    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1006    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1007    /// # ]);
1008    /// # }));
1009    /// # }
1010    /// ```
1011    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1012        self,
1013        to: &Cluster<'a, L2>,
1014        via: N,
1015        nondet_membership: NonDet,
1016    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1017    where
1018        T: Clone + Serialize + DeserializeOwned,
1019    {
1020        let ids = track_membership(self.location.source_cluster_members(to));
1021        sliced! {
1022            let members_snapshot = use(ids, nondet_membership);
1023            let elements = use(self, nondet_membership);
1024
1025            let current_members = members_snapshot.filter(q!(|b| *b));
1026            elements.repeat_with_keys(current_members)
1027        }
1028        .demux(to, via)
1029    }
1030}
1031
1032impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1033    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1034{
1035    #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
1036    /// Sends elements of this stream at each source member to specific members of a destination
1037    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1038    ///
1039    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1040    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1041    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1042    /// all members.
1043    ///
1044    /// Each cluster member sends its local stream elements, and they are collected at each
1045    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1046    ///
1047    /// # Example
1048    /// ```rust
1049    /// # #[cfg(feature = "deploy")] {
1050    /// # use hydro_lang::prelude::*;
1051    /// # use futures::StreamExt;
1052    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1053    /// # type Source = ();
1054    /// # type Destination = ();
1055    /// let source: Cluster<Source> = flow.cluster::<Source>();
1056    /// let to_send: Stream<_, Cluster<_>, _> = source
1057    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1058    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1059    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1060    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1061    /// # all_received.entries().send_bincode(&p2).entries()
1062    /// # }, |mut stream| async move {
1063    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1064    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1065    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1066    /// // - ...
1067    /// # let mut results = Vec::new();
1068    /// # for w in 0..16 {
1069    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1070    /// # }
1071    /// # results.sort();
1072    /// # assert_eq!(results, vec![
1073    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1074    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1075    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1076    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1077    /// # ]);
1078    /// # }));
1079    /// # }
1080    /// ```
1081    pub fn demux_bincode(
1082        self,
1083        other: &Cluster<'a, L2>,
1084    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1085    where
1086        T: Serialize + DeserializeOwned,
1087    {
1088        self.demux(other, TCP.bincode())
1089    }
1090
1091    /// Sends elements of this stream at each source member to specific members of a destination
1092    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1093    /// message transport.
1094    ///
1095    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1096    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1097    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1098    /// all members.
1099    ///
1100    /// Each cluster member sends its local stream elements, and they are collected at each
1101    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1102    ///
1103    /// # Example
1104    /// ```rust
1105    /// # #[cfg(feature = "deploy")] {
1106    /// # use hydro_lang::prelude::*;
1107    /// # use futures::StreamExt;
1108    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1109    /// # type Source = ();
1110    /// # type Destination = ();
1111    /// let source: Cluster<Source> = flow.cluster::<Source>();
1112    /// let to_send: Stream<_, Cluster<_>, _> = source
1113    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1114    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1115    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1116    /// let all_received = to_send.demux(&destination, TCP.bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1117    /// # all_received.entries().send(&p2, TCP.bincode()).entries()
1118    /// # }, |mut stream| async move {
1119    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1120    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1121    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1122    /// // - ...
1123    /// # let mut results = Vec::new();
1124    /// # for w in 0..16 {
1125    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1126    /// # }
1127    /// # results.sort();
1128    /// # assert_eq!(results, vec![
1129    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1130    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1131    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1132    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1133    /// # ]);
1134    /// # }));
1135    /// # }
1136    /// ```
1137    pub fn demux<N: NetworkFor<T>>(
1138        self,
1139        to: &Cluster<'a, L2>,
1140        via: N,
1141    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1142    where
1143        T: Serialize + DeserializeOwned,
1144    {
1145        self.into_keyed().demux(to, via)
1146    }
1147}
1148
1149#[cfg(test)]
1150mod tests {
1151    #[cfg(feature = "sim")]
1152    use stageleft::q;
1153
1154    #[cfg(feature = "sim")]
1155    use crate::location::{Location, MemberId};
1156    #[cfg(feature = "sim")]
1157    use crate::networking::TCP;
1158    #[cfg(feature = "sim")]
1159    use crate::nondet::nondet;
1160    #[cfg(feature = "sim")]
1161    use crate::prelude::FlowBuilder;
1162
1163    #[cfg(feature = "sim")]
1164    #[test]
1165    fn sim_send_bincode_o2o() {
1166        use crate::networking::TCP;
1167
1168        let flow = FlowBuilder::new();
1169        let node = flow.process::<()>();
1170        let node2 = flow.process::<()>();
1171
1172        let (in_send, input) = node.sim_input();
1173
1174        let out_recv = input
1175            .send(&node2, TCP.bincode())
1176            .batch(&node2.tick(), nondet!(/** test */))
1177            .count()
1178            .all_ticks()
1179            .sim_output();
1180
1181        let instances = flow.sim().exhaustive(async || {
1182            in_send.send(());
1183            in_send.send(());
1184            in_send.send(());
1185
1186            let received = out_recv.collect::<Vec<_>>().await;
1187            assert!(received.into_iter().sum::<usize>() == 3);
1188        });
1189
1190        assert_eq!(instances, 4); // 2^{3 - 1}
1191    }
1192
1193    #[cfg(feature = "sim")]
1194    #[test]
1195    fn sim_send_bincode_m2o() {
1196        let flow = FlowBuilder::new();
1197        let cluster = flow.cluster::<()>();
1198        let node = flow.process::<()>();
1199
1200        let input = cluster.source_iter(q!(vec![1]));
1201
1202        let out_recv = input
1203            .send(&node, TCP.bincode())
1204            .entries()
1205            .batch(&node.tick(), nondet!(/** test */))
1206            .all_ticks()
1207            .sim_output();
1208
1209        let instances = flow
1210            .sim()
1211            .with_cluster_size(&cluster, 4)
1212            .exhaustive(async || {
1213                out_recv
1214                    .assert_yields_only_unordered(vec![
1215                        (MemberId::from_raw_id(0), 1),
1216                        (MemberId::from_raw_id(1), 1),
1217                        (MemberId::from_raw_id(2), 1),
1218                        (MemberId::from_raw_id(3), 1),
1219                    ])
1220                    .await
1221            });
1222
1223        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1224    }
1225
1226    #[cfg(feature = "sim")]
1227    #[test]
1228    fn sim_send_bincode_multiple_m2o() {
1229        let flow = FlowBuilder::new();
1230        let cluster1 = flow.cluster::<()>();
1231        let cluster2 = flow.cluster::<()>();
1232        let node = flow.process::<()>();
1233
1234        let out_recv_1 = cluster1
1235            .source_iter(q!(vec![1]))
1236            .send(&node, TCP.bincode())
1237            .entries()
1238            .sim_output();
1239
1240        let out_recv_2 = cluster2
1241            .source_iter(q!(vec![2]))
1242            .send(&node, TCP.bincode())
1243            .entries()
1244            .sim_output();
1245
1246        let instances = flow
1247            .sim()
1248            .with_cluster_size(&cluster1, 3)
1249            .with_cluster_size(&cluster2, 4)
1250            .exhaustive(async || {
1251                out_recv_1
1252                    .assert_yields_only_unordered(vec![
1253                        (MemberId::from_raw_id(0), 1),
1254                        (MemberId::from_raw_id(1), 1),
1255                        (MemberId::from_raw_id(2), 1),
1256                    ])
1257                    .await;
1258
1259                out_recv_2
1260                    .assert_yields_only_unordered(vec![
1261                        (MemberId::from_raw_id(0), 2),
1262                        (MemberId::from_raw_id(1), 2),
1263                        (MemberId::from_raw_id(2), 2),
1264                        (MemberId::from_raw_id(3), 2),
1265                    ])
1266                    .await;
1267            });
1268
1269        assert_eq!(instances, 1);
1270    }
1271
1272    #[cfg(feature = "sim")]
1273    #[test]
1274    fn sim_send_bincode_o2m() {
1275        let flow = FlowBuilder::new();
1276        let cluster = flow.cluster::<()>();
1277        let node = flow.process::<()>();
1278
1279        let input = node.source_iter(q!(vec![
1280            (MemberId::from_raw_id(0), 123),
1281            (MemberId::from_raw_id(1), 456),
1282        ]));
1283
1284        let out_recv = input
1285            .demux(&cluster, TCP.bincode())
1286            .map(q!(|x| x + 1))
1287            .send(&node, TCP.bincode())
1288            .entries()
1289            .sim_output();
1290
1291        flow.sim()
1292            .with_cluster_size(&cluster, 4)
1293            .exhaustive(async || {
1294                out_recv
1295                    .assert_yields_only_unordered(vec![
1296                        (MemberId::from_raw_id(0), 124),
1297                        (MemberId::from_raw_id(1), 457),
1298                    ])
1299                    .await
1300            });
1301    }
1302
1303    #[cfg(feature = "sim")]
1304    #[test]
1305    fn sim_broadcast_bincode_o2m() {
1306        let flow = FlowBuilder::new();
1307        let cluster = flow.cluster::<()>();
1308        let node = flow.process::<()>();
1309
1310        let input = node.source_iter(q!(vec![123, 456]));
1311
1312        let out_recv = input
1313            .broadcast(&cluster, TCP.bincode(), nondet!(/** test */))
1314            .map(q!(|x| x + 1))
1315            .send(&node, TCP.bincode())
1316            .entries()
1317            .sim_output();
1318
1319        let mut c_1_produced = false;
1320        let mut c_2_produced = false;
1321
1322        flow.sim()
1323            .with_cluster_size(&cluster, 2)
1324            .exhaustive(async || {
1325                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1326
1327                // check that order is preserved
1328                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1329                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1330                    c_1_produced = true;
1331                }
1332
1333                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1334                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1335                    c_2_produced = true;
1336                }
1337            });
1338
1339        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1340    }
1341
1342    #[cfg(feature = "sim")]
1343    #[test]
1344    fn sim_send_bincode_m2m() {
1345        let flow = FlowBuilder::new();
1346        let cluster = flow.cluster::<()>();
1347        let node = flow.process::<()>();
1348
1349        let input = node.source_iter(q!(vec![
1350            (MemberId::from_raw_id(0), 123),
1351            (MemberId::from_raw_id(1), 456),
1352        ]));
1353
1354        let out_recv = input
1355            .demux(&cluster, TCP.bincode())
1356            .map(q!(|x| x + 1))
1357            .flat_map_ordered(q!(|x| vec![
1358                (MemberId::from_raw_id(0), x),
1359                (MemberId::from_raw_id(1), x),
1360            ]))
1361            .demux(&cluster, TCP.bincode())
1362            .entries()
1363            .send(&node, TCP.bincode())
1364            .entries()
1365            .sim_output();
1366
1367        flow.sim()
1368            .with_cluster_size(&cluster, 4)
1369            .exhaustive(async || {
1370                out_recv
1371                    .assert_yields_only_unordered(vec![
1372                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1373                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1374                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1375                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1376                    ])
1377                    .await
1378            });
1379    }
1380}