Struct renoir::KeyedStream
pub struct KeyedStream<OperatorChain>(pub Stream<OperatorChain>)
where
OperatorChain: Operator,
OperatorChain::Out: KeyedItem;
A KeyedStream is like a set of Streams, each of which is partitioned by some Key. Internally, it's just a stream whose elements are (K, V) pairs, and the operators behave following the KeyedStream semantics.
The type of the Key must be a valid key inside a hash map.
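To make the "set of Streams" picture concrete, here is a std-only sketch (no renoir types involved) that models a keyed stream as a vector of (K, V) pairs and then views it as one logical sub-stream per key. key_by and partitions are hypothetical helpers; the Hash + Eq bound is what "a valid key inside a hash map" means for std::collections::HashMap.

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Hypothetical helper: attach a key to every item, conceptually what
// `group_by(keyer)` produces (a stream of (K, V) pairs).
fn key_by<K, V, F>(items: impl IntoIterator<Item = V>, keyer: F) -> Vec<(K, V)>
where
    F: Fn(&V) -> K,
{
    items.into_iter().map(|v| (keyer(&v), v)).collect()
}

// Hypothetical helper: view the pairs as one logical sub-stream per key.
// `K: Hash + Eq` is exactly the requirement for a HashMap key.
fn partitions<K: Hash + Eq, V>(pairs: Vec<(K, V)>) -> HashMap<K, Vec<V>> {
    let mut parts: HashMap<K, Vec<V>> = HashMap::new();
    for (k, v) in pairs {
        // Arrival order is preserved inside each key's sub-stream.
        parts.entry(k).or_default().push(v);
    }
    parts
}
```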
Tuple Fields§
§0: Stream<OperatorChain>
Implementations§
impl<Key: ExchangeDataKey, In: ExchangeData + Default, OperatorChain> KeyedStream<OperatorChain>
pub fn delta_iterate<U: ExchangeData, D: ExchangeData, O: ExchangeData, Body, BodyOperator>(
self,
num_iterations: usize,
process_delta: impl Fn(&Key, &mut In, D) + Clone + Send + 'static,
make_update: impl Fn(&Key, &mut In) -> U + Clone + Send + 'static,
make_output: impl Fn(&Key, In) -> O + Clone + Send + 'static,
condition: impl Fn(&D) -> bool + Clone + Send + 'static,
body: Body
) -> KeyedStream<impl Operator<Out = (Key, O)>>where
Body: FnOnce(KeyedStream<DeltaIterate<Key, In, U, D, O>>) -> KeyedStream<BodyOperator> + 'static,
BodyOperator: Operator<Out = (Key, D)> + 'static,
TODO DOCS
impl<K: DataKey + ExchangeData + Debug, V1: Data + ExchangeData + Debug, O1> KeyedStream<O1>
pub fn join_outer<V2: Data + ExchangeData + Debug, O2>( self, rhs: KeyedStream<O2> ) -> KeyedStream<impl Operator<Out = (K, (Option<V1>, Option<V2>))>>
pub fn join<V2: Data + ExchangeData + Debug, O2>( self, rhs: KeyedStream<O2> ) -> KeyedStream<impl Operator<Out = (K, (V1, V2))>>
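join and join_outer carry no prose here. Judging only from their return types, join pairs left and right values that share a key, while join_outer also keeps values without a partner, wrapping the missing side in None. The std-only sketch below models those semantics over in-memory slices; inner_join and outer_join are hypothetical helpers, not renoir's implementation, and reading join_outer as a full outer join is an assumption based on the Option on both sides.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

// Index one side by key: key -> every value carrying that key.
fn index_by_key<K: Hash + Eq, V>(side: &[(K, V)]) -> HashMap<&K, Vec<&V>> {
    let mut index: HashMap<&K, Vec<&V>> = HashMap::new();
    for (k, v) in side {
        index.entry(k).or_default().push(v);
    }
    index
}

// Inner join: every left value paired with every right value of the same key.
fn inner_join<K, V1, V2>(left: &[(K, V1)], right: &[(K, V2)]) -> Vec<(K, (V1, V2))>
where
    K: Hash + Eq + Clone,
    V1: Clone,
    V2: Clone,
{
    let index = index_by_key(right);
    let mut out = Vec::new();
    for (k, v1) in left {
        if let Some(matches) = index.get(k) {
            for &v2 in matches {
                out.push((k.clone(), (v1.clone(), v2.clone())));
            }
        }
    }
    out
}

// Full outer join: unmatched values on either side are kept with None.
fn outer_join<K, V1, V2>(left: &[(K, V1)], right: &[(K, V2)]) -> Vec<(K, (Option<V1>, Option<V2>))>
where
    K: Hash + Eq + Clone,
    V1: Clone,
    V2: Clone,
{
    let index = index_by_key(right);
    let left_keys: HashSet<&K> = left.iter().map(|(k, _)| k).collect();
    let mut out = Vec::new();
    for (k, v1) in left {
        match index.get(k) {
            Some(matches) => {
                for &v2 in matches {
                    out.push((k.clone(), (Some(v1.clone()), Some(v2.clone()))));
                }
            }
            // Left value with no partner on the right.
            None => out.push((k.clone(), (Some(v1.clone()), None))),
        }
    }
    // Right values whose key never appears on the left.
    for (k, v2) in right {
        if !left_keys.contains(k) {
            out.push((k.clone(), (None, Some(v2.clone()))));
        }
    }
    out
}
```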
impl<Key, Out, OperatorChain> KeyedStream<OperatorChain>where
OperatorChain: Operator<Out = (Key, Out)> + 'static,
Key: ExchangeData + DataKey,
Out: ExchangeData,
pub fn window_join<Out2, OperatorChain2, WindowDescr>(
self,
descr: WindowDescr,
right: KeyedStream<OperatorChain2>
) -> KeyedStream<impl Operator<Out = (Key, (Out, Out2))>>where
OperatorChain2: Operator<Out = (Key, Out2)> + 'static,
Out2: ExchangeData,
WindowDescr: WindowDescription<MergeElement<Out, Out2>> + 'static,
impl<Key: DataKey, Out: Data, OperatorChain> KeyedStream<OperatorChain>where
OperatorChain: Operator<Out = (Key, Out)> + 'static,
pub fn window<WinOut: Data, WinDescr: WindowDescription<Out>>( self, descr: WinDescr ) -> WindowedStream<impl Operator<Out = (Key, Out)>, WinOut, WinDescr>
Apply a window to the stream.
Returns a WindowedStream, with windows created following the behavior specified by the passed WindowDescription.
§Example
let s = env.stream_iter(0..9);
let res = s
.group_by(|&n| n % 2)
.window(CountWindow::sliding(3, 2))
.sum()
.collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0 + 2 + 4), (0, 4 + 6 + 8), (1, 1 + 3 + 5)]);
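The expected output above follows from how a sliding count window walks over each key's items. Below is a std-only sketch of that arithmetic, assuming windows of `size` elements that start every `slide` elements and, consistent with the expected output, discarding the incomplete trailing window [5, 7] of key 1. sliding_count_sums is a hypothetical helper, not renoir's CountWindow.

```rust
use std::collections::BTreeMap;

// Sum every complete sliding count window, separately for each key.
fn sliding_count_sums(items: &[(i32, i32)], size: usize, slide: usize) -> Vec<(i32, i32)> {
    // Group values per key (BTreeMap gives a stable key order for output).
    let mut per_key: BTreeMap<i32, Vec<i32>> = BTreeMap::new();
    for &(k, v) in items {
        per_key.entry(k).or_default().push(v);
    }
    let mut out = Vec::new();
    for (k, values) in per_key {
        let mut start = 0;
        // Only complete windows are emitted; a partial tail is dropped.
        while start + size <= values.len() {
            let sum: i32 = values[start..start + size].iter().sum();
            out.push((k, sum));
            start += slide;
        }
    }
    out
}
```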
impl<Op, K, I> KeyedStream<Op>
pub fn add_timestamps<F, G>( self, timestamp_gen: F, watermark_gen: G ) -> KeyedStream<impl Operator<Out = Op::Out>>
Given a keyed stream without timestamps or watermarks, tag each item with a timestamp and insert watermarks.
The two functions given to this operator are the following:
- timestamp_gen returns the timestamp assigned to the provided element of the stream;
- watermark_gen returns an optional watermark to add after the provided element.
Note that the two functions must follow the watermark semantics. TODO: link to watermark semantics
§Example
In this example, the stream contains the integers from 0 to 9, grouped by parity. Each item is tagged with a timestamp equal to its value in milliseconds, and a watermark is inserted after each even number.
use renoir::operator::Timestamp;
let s = env.stream_iter(0..10);
s
.group_by(|i| i % 2)
.add_timestamps(
|&(_k, n)| n,
|&(_k, n), &ts| if n % 2 == 0 { Some(ts) } else { None }
);
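The interplay of the two functions can be pictured without renoir: the std-only sketch below tags each item with its timestamp and appends a watermark right after it whenever watermark_gen returns Some. Event and this add_timestamps function are hypothetical stand-ins, and timestamps are assumed to be plain i64 values.

```rust
// Hypothetical event type: a timestamped item or a watermark.
#[derive(Debug, PartialEq)]
enum Event<T> {
    Item(T, i64),
    Watermark(i64),
}

// Tag each item, then emit the optional watermark *after* that item.
fn add_timestamps<T, F, G>(items: Vec<T>, timestamp_gen: F, watermark_gen: G) -> Vec<Event<T>>
where
    F: Fn(&T) -> i64,
    G: Fn(&T, &i64) -> Option<i64>,
{
    let mut out = Vec::new();
    for item in items {
        let ts = timestamp_gen(&item);
        let wm = watermark_gen(&item, &ts);
        out.push(Event::Item(item, ts));
        if let Some(w) = wm {
            out.push(Event::Watermark(w));
        }
    }
    out
}
```

Called with the same closures as the example, (0, 0) is followed by Watermark(0), while odd items get no watermark.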
pub fn drop_timestamps(self) -> KeyedStream<impl Operator<Out = Op::Out>>
pub fn batch_mode(self, batch_mode: BatchMode) -> Self
Change the batch mode for this stream.
This change will be propagated to all the following operators, including those in subsequent blocks, until it is changed again.
§Example
use renoir::BatchMode;
let s = env.stream_iter(0..10).group_by(|&n| n % 2);
s.batch_mode(BatchMode::fixed(1024));
pub fn filter_map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
Remove from the stream all the elements for which the provided function returns None and keep the elements that returned Some(_).
Note: this is very similar to Iterator::filter_map.
§Example
let s = env.stream_iter(0..10).group_by(|&n| n % 2);
let res = s.filter_map(|(_key, n)| if n % 3 == 0 { Some(n * 4) } else { None }).collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 24), (1, 12), (1, 36)]);
pub fn filter<F>(self, predicate: F) -> KeyedStream<impl Operator<Out = (K, I)>>
Remove from the stream all the elements for which the provided predicate returns false.
Note: this is very similar to Iterator::filter.
§Example
let s = env.stream_iter(0..10).group_by(|&n| n % 2);
let res = s.filter(|&(_key, n)| n % 3 == 0).collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 6), (1, 3), (1, 9)]);
pub fn flat_map<O, It, F>(
self,
f: F
) -> KeyedStream<impl Operator<Out = (K, O)>>where
It: IntoIterator<Item = O> + 'static,
<It as IntoIterator>::IntoIter: Send + 'static,
F: Fn(Op::Out) -> It + Send + Clone + 'static,
O: Data,
Apply a mapping operation to each element of the stream; the resulting stream will contain the flattened values of the results of the mapping.
Note: this is very similar to Iterator::flat_map.
§Example
let s = env.stream_iter(0..3).group_by(|&n| n % 2);
let res = s.flat_map(|(_key, n)| vec![n, n]).collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 0), (0, 2), (0, 2), (1, 1), (1, 1)]);
pub fn inspect<F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, I)>>
Apply the given function to all the elements of the stream without consuming them: each element is passed through unchanged.
§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
s.inspect(|(key, n)| println!("Item: {} has key {}", n, key)).for_each(std::mem::drop);
env.execute_blocking();
pub fn fold<O, F>( self, init: O, f: F ) -> KeyedStream<impl Operator<Out = (K, O)>>
Perform the folding operation separately for each key.
Note that there is a difference between stream.group_by(keyer).fold(...) and stream.group_by_fold(keyer, ...). The first performs a network shuffle of every item in the stream and only then performs the folding (i.e. nearly all the elements are sent over the network). The latter avoids sending the items by first performing a local reduction on each host and then sending only the locally folded results (i.e. one message per replica, per key); the global step then aggregates those results.
The resulting stream will still be keyed and will contain only a single message per key (the final result).
Note that the output type may be different from the input type. Consider using KeyedStream::reduce if the output type is the same as the input type.
Note: this operator will retain all the messages of the stream and emit the values only when the stream ends. Therefore, it is not truly streaming.
Note: this operator will split the current block.
§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
let res = s
.fold(0, |acc, value| *acc += value)
.collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
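The two-phase strategy described for group_by_fold can be sketched with plain HashMaps: each replica folds its own items per key, and only those partial results are merged in a global step. local_fold and global_merge are hypothetical helpers, the split of items across two replicas is invented for illustration, and the sketch assumes the global step can be written as a second fold over the partial results.

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Phase 1 (per replica): fold local items per key, no network traffic.
fn local_fold<K, V, O, F>(items: Vec<(K, V)>, init: &O, f: &F) -> HashMap<K, O>
where
    K: Hash + Eq,
    O: Clone,
    F: Fn(&mut O, V),
{
    let mut acc: HashMap<K, O> = HashMap::new();
    for (k, v) in items {
        let slot = acc.entry(k).or_insert_with(|| init.clone());
        f(slot, v);
    }
    acc
}

// Phase 2 (global): merge one partial result per replica, per key.
fn global_merge<K, O, G>(partials: Vec<HashMap<K, O>>, init: &O, merge: &G) -> HashMap<K, O>
where
    K: Hash + Eq,
    O: Clone,
    G: Fn(&mut O, O),
{
    let mut total: HashMap<K, O> = HashMap::new();
    for partial in partials {
        for (k, o) in partial {
            let slot = total.entry(k).or_insert_with(|| init.clone());
            merge(slot, o);
        }
    }
    total
}
```

With items 0..5 keyed by parity and split across two replicas, the merged totals match the example: 0 + 2 + 4 for key 0 and 1 + 3 for key 1.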
pub fn reduce<F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, I)>>
Perform the reduction operation separately for each key.
Note that there is a difference between stream.group_by(keyer).reduce(...) and stream.group_by_reduce(keyer, ...). The first performs a network shuffle of every item in the stream and only then performs the reduction (i.e. nearly all the elements are sent over the network). The latter avoids sending the items by first performing a local reduction on each host and then sending only the locally reduced results (i.e. one message per replica, per key); the global step then aggregates those results.
The resulting stream will still be keyed and will contain only a single message per key (the final result).
Note that the output type must be the same as the input type; if you need a different type, consider using KeyedStream::fold.
Note: this operator will retain all the messages of the stream and emit the values only when the stream ends. Therefore, it is not truly streaming.
Note: this operator will split the current block.
§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
let res = s
.reduce(|acc, value| *acc += value)
.collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
pub fn map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
Map the elements of the stream into new elements.
Note: this is very similar to Iterator::map.
§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
let res = s.map(|(_key, n)| 10 * n).collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 20), (0, 40), (1, 10), (1, 30)]);
pub fn reorder(self) -> KeyedStream<impl Operator<Out = (K, I)>>
§TODO
Reorder timestamped items
pub fn rich_map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
Map the elements of the stream into new elements. The mapping function can be stateful.
This is exactly like Stream::rich_map, but the function is cloned for each key. This means that each key will have a unique mapping function (and therefore a unique state).
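A std-only sketch of the per-key cloning described above: the first time a key is seen, the stateful function is cloned for that key, so a running counter restarts at 1 for every key. rich_map_per_key is a hypothetical helper with a simplified FnMut(V) -> O signature, not renoir's rich_map.

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Keep one clone of the stateful function per key, so state never
// leaks from one key to another.
fn rich_map_per_key<K, V, O, F>(items: Vec<(K, V)>, f: F) -> Vec<(K, O)>
where
    K: Hash + Eq + Clone,
    F: FnMut(V) -> O + Clone,
{
    let mut per_key: HashMap<K, F> = HashMap::new();
    let mut out = Vec::new();
    for (k, v) in items {
        // Clone the pristine function the first time this key appears.
        let func = per_key.entry(k.clone()).or_insert_with(|| f.clone());
        let o = func(v);
        out.push((k, o));
    }
    out
}
```

For example, a closure that numbers the items it sees produces 1, 2, 3 for key "a" and, independently, 1, 2 for key "b".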
pub fn rich_flat_map<O, It, F>( self, f: F ) -> KeyedStream<impl Operator<Out = (K, O)>>
Apply a mapping operation to each element of the stream; the resulting stream will contain the flattened values of the results of the mapping. The mapping function can be stateful.
This is exactly like Stream::rich_flat_map, but the function is cloned for each key. This means that each key will have a unique mapping function (and therefore a unique state).
pub fn rich_filter_map<O, F>( self, f: F ) -> KeyedStream<impl Operator<Out = (K, O)>>
Remove from the stream all the elements for which the provided function returns None and keep the elements that returned Some(_). The mapping function can be stateful.
This is exactly like Stream::rich_filter_map, but the function is cloned for each key. This means that each key will have a unique mapping function (and therefore a unique state).
pub fn rich_map_custom<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
Map the elements of the stream into new elements. The mapping function can be stateful.
This is exactly like Stream::rich_map, but the function is cloned for each key. This means that each key will have a unique mapping function (and therefore a unique state).
pub fn unkey(self) -> Stream<impl Operator<Out = (K, I)>>
Make this KeyedStream a normal Stream of key-value pairs.
§Example
let stream = env.stream_iter(0..4).group_by(|&n| n % 2);
let res = stream.unkey().collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (1, 1), (1, 3)]);
pub fn drop_key(self) -> Stream<impl Operator<Out = I>>
Forget about the key of this KeyedStream and return a Stream containing just the values.
§Example
let stream = env.stream_iter(0..4).group_by(|&n| n % 2);
let res = stream.drop_key().collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, (0..4).collect::<Vec<_>>());
impl<K, I, Op> KeyedStream<Op>
pub fn interval_join<I2, Op2>( self, right: KeyedStream<Op2>, lower_bound: Timestamp, upper_bound: Timestamp ) -> KeyedStream<impl Operator<Out = (K, (I, I2))>>
Given two streams with timestamps, join them according to an interval around the timestamp of the left side.
This means that an element on the left side with timestamp T will be joined to all the elements on the right with timestamp Q such that T - lower_bound <= Q <= T + upper_bound.
Only items with the same key can be joined together.
Note: this operator will split the current block.
§Example
TODO: example
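A naive, std-only illustration of the join condition: every left item with timestamp T is paired with every right item of the same key whose timestamp Q satisfies T - lower_bound <= Q <= T + upper_bound. The nested-loop interval_join below is a hypothetical helper that ignores watermarks, buffering, and replicas, and it assumes i64 timestamps.

```rust
// Naive interval join over (key, timestamp, value) triples.
fn interval_join<K, A, B>(
    left: &[(K, i64, A)],
    right: &[(K, i64, B)],
    lower_bound: i64,
    upper_bound: i64,
) -> Vec<(K, (A, B))>
where
    K: PartialEq + Clone,
    A: Clone,
    B: Clone,
{
    let mut out = Vec::new();
    for (k1, t, a) in left {
        for (k2, q, b) in right {
            // Same key, and T - lower_bound <= Q <= T + upper_bound (inclusive).
            if k1 == k2 && t - lower_bound <= *q && *q <= t + upper_bound {
                out.push((k1.clone(), (a.clone(), b.clone())));
            }
        }
    }
    out
}
```

With bounds of 2 on each side, a left item at T = 10 matches right items at timestamps 8 through 12 of the same key only.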
pub fn merge<Op2>( self, oth: KeyedStream<Op2> ) -> KeyedStream<impl Operator<Out = (K, I)>>
Merge the items of this stream with the items of another stream of the same type.
Note: the order of the resulting items is not specified.
Note: this operator will split the current block.
§Example
let s1 = env.stream_iter(0..3).group_by(|&n| n % 2);
let s2 = env.stream_iter(3..5).group_by(|&n| n % 2);
let res = s1.merge(s2).collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (0, 4), (1, 1), (1, 3)]);
pub fn shuffle(self) -> Stream<impl Operator<Out = Op::Out>>
Perform a network shuffle sending the messages to a random replica.
This operator returns a Stream instead of a KeyedStream because, after shuffling the messages between replicas, the keyed semantics are lost.
Note: this operator will split the current block.
§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
let res = s.shuffle();
pub fn collect_channel(self) -> Receiver<(K, I)>
Close the stream and send the resulting items to a channel on a single host.
If the stream is distributed among multiple replicas, parallelism will be set to 1 to gather all results.
Note: the order of items and keys is unspecified.
Note: this operator will split the current block.
§Example
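let s = env.stream_iter(0..10u32).group_by(|&n| n % 2);
let rx = s.collect_channel();
env.execute_blocking();
let mut v = Vec::new();
while let Ok(x) = rx.recv() {
    v.push(x)
}
v.sort_unstable(); // the output order is nondeterministic
let mut expected: Vec<_> = (0..10u32).map(|n| (n % 2, n)).collect();
expected.sort_unstable();
assert_eq!(v, expected);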
pub fn collect_channel_parallel(self) -> Receiver<(K, I)>
Close the stream and send the resulting items to a channel on each host.
Each host sends its outputs to the channel without repartitioning. Elements will be sent to the channel on the same host that produced the output.
Note: the order of items and keys is unspecified.
§Example
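let s = env.stream_iter(0..10u32).group_by(|&n| n % 2);
let rx = s.collect_channel_parallel();
env.execute_blocking();
let mut v = Vec::new();
while let Ok(x) = rx.recv() {
    v.push(x)
}
v.sort_unstable(); // the output order is nondeterministic
let mut expected: Vec<_> = (0..10u32).map(|n| (n % 2, n)).collect();
expected.sort_unstable();
assert_eq!(v, expected);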
pub fn collect_vec(self) -> StreamOutput<Vec<(K, I)>>
Close the stream and store all the resulting items into a Vec on a single host.
If the stream is distributed among multiple replicas, a bottleneck is placed where all the replicas send their items.
Note: the collected items are the pairs (key, value).
Note: the order of items and keys is unspecified.
Note: this operator will split the current block.
§Example
let s = env.stream_iter(0..3).group_by(|&n| n % 2);
let res = s.collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
pub fn collect_vec_all(self) -> StreamOutput<Vec<(K, I)>>
Close the stream and store all the resulting items into a Vec replicated on every host.
If the stream is distributed among multiple replicas, a bottleneck is placed where all the replicas send their items.
Note: the collected items are the pairs (key, value).
Note: the order of items and keys is unspecified.
Note: this operator will split the current block.
§Example
let s = env.stream_iter(0..3).group_by(|&n| n % 2);
let res = s.collect_vec_all();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
pub fn collect<C: FromIterator<(K, I)> + Send + 'static>( self ) -> StreamOutput<C>
Close the stream and store all the resulting items into a collection on a single host.
If the stream is distributed among multiple replicas, parallelism will be set to 1 to gather all results.
Note: the order of items and keys is unspecified.
Note: this operator will split the current block.
§Example
let s = env.stream_iter(0..3).group_by(|&n| n % 2);
let res = s.collect::<Vec<_>>();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
pub fn collect_all<C: FromIterator<(K, I)> + Send + 'static>( self ) -> StreamOutput<C>
Close the stream and store all the resulting items into a collection on each host.
Partitioning will be set to Host and the results will be replicated.
Note: the order of items and keys is unspecified.
Note: this operator will split the current block.
§Example
let s = env.stream_iter(0..3).group_by(|&n| n % 2);
let res = s.collect_all::<Vec<_>>();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
impl<K, I, O, It, Op> KeyedStream<Op>
pub fn flatten(self) -> KeyedStream<impl Operator<Out = (K, O)>>
Transform this stream of containers into a stream of all the contained values.
Note: this is very similar to Iterator::flatten.
§Example
let s = env
.stream_iter((vec![
vec![0, 1, 2],
vec![3, 4, 5],
vec![6, 7]
].into_iter()))
.group_by(|v| v[0] % 2);
let res = s.flatten().collect_vec();
env.execute_blocking();
let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 1), (0, 2), (0, 6), (0, 7), (1, 3), (1, 4), (1, 5)]);
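In a keyed flatten, the key of each container is carried over to every value it contains. A std-only sketch of that behavior on the same input as the example above, using a hypothetical flatten_keyed helper over (K, C) pairs rather than renoir's operator:

```rust
// Flatten (key, container) pairs into (key, value) pairs, copying the key
// onto every contained value.
fn flatten_keyed<K: Clone, C: IntoIterator>(items: Vec<(K, C)>) -> Vec<(K, C::Item)> {
    items
        .into_iter()
        .flat_map(|(k, c)| c.into_iter().map(move |x| (k.clone(), x)))
        .collect()
}
```

Unlike the distributed operator, this sketch preserves input order, so no sorting is needed before comparing.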