Struct renoir::KeyedStream

pub struct KeyedStream<OperatorChain>(pub Stream<OperatorChain>)
where
    OperatorChain: Operator,
    OperatorChain::Out: KeyedItem;

A KeyedStream is like a set of Streams, each of which is partitioned by some Key. Internally it’s just a stream whose elements are (K, V) pairs, and the operators behave according to the KeyedStream semantics.

The type of the Key must be usable as a key in a hash map.
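
As a rough mental model only (this is not how Renoir implements it), a keyed stream can be pictured as (K, V) pairs routed so that equal keys always end up in the same place. The sketch below uses only the standard library; `replica_for` and `partition_by_key` are hypothetical names, and routing by hash modulo the number of replicas is an assumption made for illustration. It also shows why the key type has to be hashable and comparable.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick a partition for a key by hashing it (illustrative routing rule, an assumption).
fn replica_for<K: Hash>(key: &K, replicas: usize) -> usize {
    assert!(replicas > 0, "at least one replica is required");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % replicas as u64) as usize
}

/// Turn plain items into (key, value) pairs and route each pair by its key.
fn partition_by_key<K, V>(
    items: Vec<V>,
    keyer: impl Fn(&V) -> K,
    replicas: usize,
) -> Vec<Vec<(K, V)>>
where
    K: Hash + Eq,
{
    let mut partitions: Vec<Vec<(K, V)>> = (0..replicas).map(|_| Vec::new()).collect();
    for value in items {
        let key = keyer(&value);
        let target = replica_for(&key, replicas);
        // Equal keys hash to the same value, so they share a partition.
        partitions[target].push((key, value));
    }
    partitions
}
```

For instance, `partition_by_key((0..10u32).collect(), |n| n % 2, 4)` keeps all ten items, and each of the two keys appears in exactly one of the four partitions.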

Tuple Fields§

§0: Stream<OperatorChain>

Implementations§

source§

impl<Key: ExchangeDataKey, In: ExchangeData + Default, OperatorChain> KeyedStream<OperatorChain>
where OperatorChain: Operator<Out = (Key, In)> + 'static,

source

pub fn delta_iterate<U: ExchangeData, D: ExchangeData, O: ExchangeData, Body, BodyOperator>( self, num_iterations: usize, process_delta: impl Fn(&Key, &mut In, D) + Clone + Send + 'static, make_update: impl Fn(&Key, &mut In) -> U + Clone + Send + 'static, make_output: impl Fn(&Key, In) -> O + Clone + Send + 'static, condition: impl Fn(&D) -> bool + Clone + Send + 'static, body: Body ) -> KeyedStream<impl Operator<Out = (Key, O)>>
where Body: FnOnce(KeyedStream<DeltaIterate<Key, In, U, D, O>>) -> KeyedStream<BodyOperator> + 'static, BodyOperator: Operator<Out = (Key, D)> + 'static,

TODO DOCS

source§

impl<K: DataKey + ExchangeData + Debug, V1: Data + ExchangeData + Debug, O1> KeyedStream<O1>
where O1: Operator<Out = (K, V1)> + 'static,

source

pub fn join_outer<V2: Data + ExchangeData + Debug, O2>( self, rhs: KeyedStream<O2> ) -> KeyedStream<impl Operator<Out = (K, (Option<V1>, Option<V2>))>>
where O2: Operator<Out = (K, V2)> + 'static,

source

pub fn join<V2: Data + ExchangeData + Debug, O2>( self, rhs: KeyedStream<O2> ) -> KeyedStream<impl Operator<Out = (K, (V1, V2))>>
where O2: Operator<Out = (K, V2)> + 'static,

source§

impl<Key, Out, OperatorChain> KeyedStream<OperatorChain>
where OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: ExchangeData + DataKey, Out: ExchangeData,

source

pub fn window_join<Out2, OperatorChain2, WindowDescr>( self, descr: WindowDescr, right: KeyedStream<OperatorChain2> ) -> KeyedStream<impl Operator<Out = (Key, (Out, Out2))>>
where OperatorChain2: Operator<Out = (Key, Out2)> + 'static, Out2: ExchangeData, WindowDescr: WindowDescription<MergeElement<Out, Out2>> + 'static,

source§

impl<Key: DataKey, Out: Data, OperatorChain> KeyedStream<OperatorChain>
where OperatorChain: Operator<Out = (Key, Out)> + 'static,

source

pub fn window<WinOut: Data, WinDescr: WindowDescription<Out>>( self, descr: WinDescr ) -> WindowedStream<impl Operator<Out = (Key, Out)>, WinOut, WinDescr>

Apply a window to the stream.

Returns a WindowedStream, with windows created following the behavior specified by the passed WindowDescription.

§Example
let s = env.stream_iter(0..9);
let res = s
    .group_by(|&n| n % 2)
    .window(CountWindow::sliding(3, 2))
    .sum()
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0 + 2 + 4), (0, 4 + 6 + 8), (1, 1 + 3 + 5)]);
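
To make the windowing in the example concrete, here is a standard-library sketch (not Renoir's implementation) of a per-key sliding count window followed by a sum. The helper name `keyed_sliding_sums` is hypothetical, keys are hard-coded to parity as in the example, and dropping incomplete trailing windows is an assumption chosen so that the result agrees with the assertion above.

```rust
use std::collections::BTreeMap;

/// Group integers by parity, then sum every full window of `size` items,
/// advancing by `step` items each time. Incomplete windows are dropped (assumption).
fn keyed_sliding_sums(
    items: impl IntoIterator<Item = i64>,
    size: usize,
    step: usize,
) -> Vec<(i64, i64)> {
    assert!(size > 0 && step > 0, "size and step must be positive");

    // One ordered buffer per key; BTreeMap keeps the output deterministic.
    let mut groups: BTreeMap<i64, Vec<i64>> = BTreeMap::new();
    for n in items {
        groups.entry(n % 2).or_default().push(n);
    }

    let mut out: Vec<(i64, i64)> = Vec::new();
    for (key, values) in &groups {
        let mut start = 0;
        while start + size <= values.len() {
            let sum: i64 = values[start..start + size].iter().sum();
            out.push((*key, sum));
            start += step;
        }
    }
    out
}
```

Calling `keyed_sliding_sums(0..9, 3, 2)` yields `[(0, 6), (0, 18), (1, 9)]`: the even values 0, 2, 4, 6, 8 produce two full windows, the odd values 1, 3, 5, 7 produce one.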
source§

impl<Op, K, I> KeyedStream<Op>
where K: DataKey, I: Send + 'static, Op: Operator<Out = (K, I)> + 'static,

source

pub fn add_timestamps<F, G>( self, timestamp_gen: F, watermark_gen: G ) -> KeyedStream<impl Operator<Out = Op::Out>>
where F: FnMut(&Op::Out) -> Timestamp + Clone + Send + 'static, G: FnMut(&Op::Out, &Timestamp) -> Option<Timestamp> + Clone + Send + 'static,

Given a keyed stream without timestamps or watermarks, tag each item with a timestamp and insert watermarks.

The two functions given to this operator are the following:

  • timestamp_gen returns the timestamp assigned to the provided element of the stream
  • watermark_gen returns an optional watermark to add after the provided element

Note that the two functions must follow the watermark semantics. TODO: link to watermark semantics

§Example

In this example the stream contains the integers from 0 to 9, grouped by parity. Each item is tagged with a timestamp equal to its value in milliseconds, and a watermark is inserted after each even number.

use renoir::operator::Timestamp;

let s = env.stream_iter(0..10);
s
    .group_by(|i| i % 2)
    .add_timestamps(
        |&(_k, n)| n,
        |&(_k, n), &ts| if n % 2 == 0 { Some(ts) } else { None }
    );
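
The interplay of the two closures can be sketched without Renoir. The model below is a simplification: `Event` and `tag_with_timestamps` are hypothetical names, timestamps are assumed to be `i64`, and the whole keyed stream is treated as one ordered sequence, which a distributed run would not guarantee.

```rust
/// Simplified stand-in for what flows downstream after tagging (hypothetical type).
#[derive(Debug, PartialEq)]
enum Event {
    Item { key: i64, value: i64, ts: i64 },
    Watermark(i64),
}

/// Tag every (key, value) item with a timestamp and optionally emit a watermark after it.
fn tag_with_timestamps(
    items: &[(i64, i64)],
    mut timestamp_gen: impl FnMut(&(i64, i64)) -> i64,
    mut watermark_gen: impl FnMut(&(i64, i64), &i64) -> Option<i64>,
) -> Vec<Event> {
    let mut out = Vec::new();
    for item in items {
        // First assign the timestamp to the item itself.
        let ts = timestamp_gen(item);
        out.push(Event::Item { key: item.0, value: item.1, ts });
        // Then ask whether a watermark should follow this item.
        if let Some(watermark) = watermark_gen(item, &ts) {
            out.push(Event::Watermark(watermark));
        }
    }
    out
}
```

With the same two closures as in the example and the items `(0, 0), (1, 1), (0, 2)`, the sketch emits a watermark after the items with values 0 and 2 and none after the item with value 1.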
source

pub fn drop_timestamps(self) -> KeyedStream<impl Operator<Out = Op::Out>>

source

pub fn batch_mode(self, batch_mode: BatchMode) -> Self

Change the batch mode for this stream.

This change will be propagated to all the following operators, including those in subsequent blocks, until it’s changed again.

§Example
use renoir::BatchMode;

let s = env.stream_iter(0..10).group_by(|&n| n % 2);
s.batch_mode(BatchMode::fixed(1024));
source

pub fn filter_map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
where F: Fn((&K, I)) -> Option<O> + Send + Clone + 'static, O: Send + 'static,

Remove from the stream all the elements for which the provided function returns None and keep the elements that returned Some(_).

Note: this is very similar to Iterator::filter_map.

§Example
let s = env.stream_iter(0..10).group_by(|&n| n % 2);
let res = s.filter_map(|(_key, n)| if n % 3 == 0 { Some(n * 4) } else { None }).collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 24), (1, 12), (1, 36)]);
source

pub fn filter<F>(self, predicate: F) -> KeyedStream<impl Operator<Out = (K, I)>>
where F: Fn(&(K, I)) -> bool + Clone + Send + 'static,

Remove from the stream all the elements for which the provided predicate returns false.

Note: this is very similar to Iterator::filter.

§Example
let s = env.stream_iter(0..10).group_by(|&n| n % 2);
let res = s.filter(|&(_key, n)| n % 3 == 0).collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 6), (1, 3), (1, 9)]);
source

pub fn flat_map<O, It, F>( self, f: F ) -> KeyedStream<impl Operator<Out = (K, O)>>
where It: IntoIterator<Item = O> + 'static, <It as IntoIterator>::IntoIter: Send + 'static, F: Fn(Op::Out) -> It + Send + Clone + 'static, O: Data,

Apply a mapping operation to each element of the stream; the resulting stream contains the flattened values of the results of the mapping.

Note: this is very similar to Iterator::flat_map.

§Example
let s = env.stream_iter(0..3).group_by(|&n| n % 2);
let res = s.flat_map(|(_key, n)| vec![n, n]).collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 0), (0, 2), (0, 2), (1, 1), (1, 1)]);
source

pub fn inspect<F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, I)>>
where F: FnMut(&(K, I)) + Send + Clone + 'static,

Apply the given function to each element of the stream, passing the elements through unchanged.

§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
s.inspect(|(key, n)| println!("Item: {} has key {}", n, key)).for_each(std::mem::drop);

env.execute_blocking();
source

pub fn fold<O, F>( self, init: O, f: F ) -> KeyedStream<impl Operator<Out = (K, O)>>
where F: Fn(&mut O, <Op::Out as KeyedItem>::Value) + Send + Clone + 'static, O: Send + Clone,

Perform the folding operation separately for each key.

Note that there is a difference between stream.group_by(keyer).fold(...) and stream.group_by_fold(keyer, ...). The first shuffles every item in the stream over the network and only then performs the folding (i.e. nearly all the elements will be sent over the network). The latter avoids sending the items by first performing a local fold on each host and then sending only the locally folded results (i.e. one message per replica, per key); a global step then aggregates the partial results.

The resulting stream will still be keyed and will contain only a single message per key (the final result).

Note that the output type may be different from the input type. Consider using KeyedStream::reduce if the output type is the same as the input type.

Note: this operator will retain all the messages of the stream and emit the values only when the stream ends. Therefore this is not properly streaming.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
let res = s
    .fold(0, |acc, value| *acc += value)
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
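
The difference between the two strategies described above can be illustrated by counting messages in a toy model. This is a standard-library sketch, not Renoir code: each inner `Vec` stands for the items held by one replica, keys are the parity of the value, and the function names and the message counting are assumptions made for illustration (the model counts every item as sent in the first strategy).

```rust
use std::collections::BTreeMap;

/// Strategy 1: every item crosses the network, then it is folded per key.
fn shuffle_then_fold(replicas: &[Vec<i64>]) -> (BTreeMap<i64, i64>, usize) {
    let mut messages = 0;
    let mut result: BTreeMap<i64, i64> = BTreeMap::new();
    for local in replicas {
        for &n in local {
            messages += 1; // one message per item
            *result.entry(n % 2).or_insert(0) += n;
        }
    }
    (result, messages)
}

/// Strategy 2: fold locally first, then send one partial result per replica and key.
fn local_fold_then_merge(replicas: &[Vec<i64>]) -> (BTreeMap<i64, i64>, usize) {
    let mut messages = 0;
    let mut result: BTreeMap<i64, i64> = BTreeMap::new();
    for local in replicas {
        let mut partial: BTreeMap<i64, i64> = BTreeMap::new();
        for &n in local {
            *partial.entry(n % 2).or_insert(0) += n;
        }
        for (key, acc) in partial {
            messages += 1; // one message per (replica, key)
            *result.entry(key).or_insert(0) += acc;
        }
    }
    (result, messages)
}
```

For two replicas holding `[0, 2, 4, 6]` and `[1, 3, 5, 7]`, both functions return the sums 12 for key 0 and 16 for key 1, but the first counts 8 messages and the second only 2.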
source

pub fn reduce<F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, I)>>
where I: Clone + 'static, F: Fn(&mut I, I) + Send + Clone + 'static,

Perform the reduction operation separately for each key.

Note that there is a difference between stream.group_by(keyer).reduce(...) and stream.group_by_reduce(keyer, ...). The first shuffles every item in the stream over the network and only then performs the reduction (i.e. nearly all the elements will be sent over the network). The latter avoids sending the items by first performing a local reduction on each host and then sending only the locally reduced results (i.e. one message per replica, per key); a global step then aggregates the partial results.

The resulting stream will still be keyed and will contain only a single message per key (the final result).

Note that the output type must be the same as the input type; if you need a different type, consider using KeyedStream::fold.

Note: this operator will retain all the messages of the stream and emit the values only when the stream ends. Therefore this is not properly streaming.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
let res = s
    .reduce(|acc, value| *acc += value)
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
source

pub fn map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
where F: Fn((&K, I)) -> O + Send + Clone + 'static, O: Send,

Map the elements of the stream into new elements.

Note: this is very similar to Iterator::map.

§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
let res = s.map(|(_key, n)| 10 * n).collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 20), (0, 40), (1, 10), (1, 30)]);
source

pub fn reorder(self) -> KeyedStream<impl Operator<Out = (K, I)>>

§TODO

Reorder timestamped items

source

pub fn rich_map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
where F: FnMut((&K, I)) -> O + Clone + Send + 'static, O: Data,

Map the elements of the stream into new elements. The mapping function can be stateful.

This is exactly like Stream::rich_map, but the function is cloned for each key. This means that each key will have a unique mapping function (and therefore a unique state).
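
The "cloned for each key" behavior can be pictured with a small standard-library sketch. `keyed_rich_map` is a hypothetical helper, not part of Renoir, and storing one closure clone per key in a `HashMap` is an assumption about how such behavior could be modeled, not a description of the actual operator.

```rust
use std::collections::HashMap;

/// Apply a stateful function to (key, value) pairs, giving every key its own clone
/// of the function and therefore its own private state.
fn keyed_rich_map<F>(items: &[(u32, u32)], f: F) -> Vec<(u32, u32)>
where
    F: FnMut((&u32, u32)) -> u32 + Clone,
{
    let mut per_key: HashMap<u32, F> = HashMap::new();
    let mut out = Vec::new();
    for &(key, value) in items {
        // The first time a key is seen, clone the pristine function for it.
        let func = per_key.entry(key).or_insert_with(|| f.clone());
        out.push((key, func((&key, value))));
    }
    out
}
```

Passing a running-sum closure such as `move |(_key, v)| { sum += v; sum }` (with `sum` starting at 0) over the items `(0, 0), (1, 1), (0, 2), (1, 3), (0, 4)` gives `[(0, 0), (1, 1), (0, 2), (1, 4), (0, 6)]`: each key accumulates only its own values.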

source

pub fn rich_flat_map<O, It, F>( self, f: F ) -> KeyedStream<impl Operator<Out = (K, O)>>
where It: IntoIterator<Item = O> + Data, <It as IntoIterator>::IntoIter: Clone + Send + 'static, F: FnMut((&K, I)) -> It + Clone + Send + 'static, O: Data,

Apply a mapping operation to each element of the stream; the resulting stream contains the flattened values of the results of the mapping. The mapping function can be stateful.

This is exactly like Stream::rich_flat_map, but the function is cloned for each key. This means that each key will have a unique mapping function (and therefore a unique state).

source

pub fn rich_filter_map<O, F>( self, f: F ) -> KeyedStream<impl Operator<Out = (K, O)>>
where F: FnMut((&K, I)) -> Option<O> + Send + Clone + 'static, O: Data,

Remove from the stream all the elements for which the provided function returns None and keep the elements that returned Some(_). The mapping function can be stateful.

This is exactly like Stream::rich_filter_map, but the function is cloned for each key. This means that each key will have a unique mapping function (and therefore a unique state).

source

pub fn rich_map_custom<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
where F: FnMut(ElementGenerator<'_, Op>) -> StreamElement<O> + Clone + Send + 'static, O: Data,

Map the elements of the stream into new elements. The mapping function can be stateful.

This is exactly like Stream::rich_map, but the function is cloned for each key. This means that each key will have a unique mapping function (and therefore a unique state).

source

pub fn unkey(self) -> Stream<impl Operator<Out = (K, I)>>

Make this KeyedStream a normal Stream of key-value pairs.

§Example
let stream = env.stream_iter(0..4).group_by(|&n| n % 2);
let res = stream.unkey().collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (1, 1), (1, 3)]);
source

pub fn drop_key(self) -> Stream<impl Operator<Out = I>>

Forget about the key of this KeyedStream and return a Stream containing just the values.

§Example
let stream = env.stream_iter(0..4).group_by(|&n| n % 2);
let res = stream.drop_key().collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, (0..4).collect::<Vec<_>>());
source

pub fn for_each<F>(self, f: F)
where F: FnMut((K, I)) + Send + Clone + 'static,

Apply the given function to all the elements of the stream, consuming the stream.

§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
s.for_each(|(key, n)| println!("Item: {} has key {}", n, key));

env.execute_blocking();
source§

impl<K, I, Op> KeyedStream<Op>
where Op: Operator<Out = (K, I)> + 'static, K: ExchangeDataKey, I: ExchangeData,

source

pub fn interval_join<I2, Op2>( self, right: KeyedStream<Op2>, lower_bound: Timestamp, upper_bound: Timestamp ) -> KeyedStream<impl Operator<Out = (K, (I, I2))>>
where I2: ExchangeData, Op2: Operator<Out = (K, I2)> + 'static,

Given two streams with timestamps, join them according to an interval centered around the timestamp of the left side.

This means that an element on the left side with timestamp T will be joined to all the elements on the right with timestamp Q such that T - lower_bound <= Q <= T + upper_bound. Only items with the same key can be joined together.

Note: this operator will split the current block.

§Example

TODO: example
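
The join condition described above can be sketched in plain Rust, independently of Renoir. `interval_join_sketch` is a hypothetical helper that works on slices of (key, value, timestamp) triples, `Timestamp` is assumed to be `i64`, and the quadratic nested loop is only meant to spell out the condition, not to suggest how the operator is implemented.

```rust
type Timestamp = i64; // assumption for this sketch

/// Pair every left item with the right items that share its key and whose
/// timestamp Q satisfies T - lower_bound <= Q <= T + upper_bound.
fn interval_join_sketch<K, L, R>(
    left: &[(K, L, Timestamp)],
    right: &[(K, R, Timestamp)],
    lower_bound: Timestamp,
    upper_bound: Timestamp,
) -> Vec<(K, (L, R))>
where
    K: PartialEq + Clone,
    L: Clone,
    R: Clone,
{
    let mut out = Vec::new();
    for (lk, lv, t) in left {
        for (rk, rv, q) in right {
            let in_interval = t - lower_bound <= *q && *q <= t + upper_bound;
            if lk == rk && in_interval {
                out.push((lk.clone(), (lv.clone(), rv.clone())));
            }
        }
    }
    out
}
```

With a left item `("a", 'x')` at timestamp 10 and both bounds set to 2, right items with key "a" at timestamps 8 and 12 are joined, while those at 7 and 13, and any item with key "b", are not.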

source

pub fn merge<Op2>( self, oth: KeyedStream<Op2> ) -> KeyedStream<impl Operator<Out = (K, I)>>
where Op2: Operator<Out = (K, I)> + 'static,

Merge the items of this stream with the items of another stream with the same type.

Note: the order of the resulting items is not specified.

Note: this operator will split the current block.

§Example
let s1 = env.stream_iter(0..3).group_by(|&n| n % 2);
let s2 = env.stream_iter(3..5).group_by(|&n| n % 2);
let res = s1.merge(s2).collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (0, 4), (1, 1), (1, 3)]);
source

pub fn shuffle(self) -> Stream<impl Operator<Out = Op::Out>>

Perform a network shuffle sending the messages to a random replica.

This operator returns a Stream instead of a KeyedStream because the keyed semantics are lost once the messages are shuffled between replicas.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5).group_by(|&n| n % 2);
let res = s.shuffle();
source

pub fn collect_channel(self) -> Receiver<(K, I)>

Close the stream and send resulting items to a channel on a single host.

If the stream is distributed among multiple replicas, parallelism will be set to 1 to gather all results.

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..4u32).group_by(|&n| n % 2);
let rx = s.collect_channel();

env.execute_blocking();
let mut v = Vec::new();
while let Ok(x) = rx.recv() {
    v.push(x)
}
v.sort_unstable(); // the output order is nondeterministic
assert_eq!(v, vec![(0, 0), (0, 2), (1, 1), (1, 3)]);
source

pub fn collect_channel_parallel(self) -> Receiver<(K, I)>

Close the stream and send the resulting items to a channel on each host.

Each host sends its outputs to the channel without repartitioning. Elements will be sent to the channel on the same host that produced the output.

Note: the order of items and keys is unspecified.

§Example
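let s = env.stream_iter(0..4u32).group_by(|&n| n % 2);
let rx = s.collect_channel_parallel();

env.execute_blocking();
let mut v = Vec::new();
while let Ok(x) = rx.recv() {
    v.push(x)
}
v.sort_unstable(); // the output order is nondeterministic
// with a single host, the local channel receives every item
assert_eq!(v, vec![(0, 0), (0, 2), (1, 1), (1, 3)]);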
source

pub fn collect_vec(self) -> StreamOutput<Vec<(K, I)>>

Close the stream and store all the resulting items into a Vec on a single host.

If the stream is distributed among multiple replicas, a bottleneck is placed where all the replicas send their items.

Note: the collected items are the pairs (key, value).

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..3).group_by(|&n| n % 2);
let res = s.collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
source

pub fn collect_vec_all(self) -> StreamOutput<Vec<(K, I)>>

Close the stream and store all the resulting items into a replicated Vec on all hosts.

If the stream is distributed among multiple replicas, a bottleneck is placed where all the replicas send their items.

Note: the collected items are the pairs (key, value).

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..3).group_by(|&n| n % 2);
let res = s.collect_vec_all();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
source

pub fn collect<C: FromIterator<(K, I)> + Send + 'static>( self ) -> StreamOutput<C>

Close the stream and store all the resulting items into a collection on a single host.

If the stream is distributed among multiple replicas, parallelism will be set to 1 to gather all results.

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..3).group_by(|&n| n % 2);
let res = s.collect::<Vec<_>>();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
source

pub fn collect_all<C: FromIterator<(K, I)> + Send + 'static>( self ) -> StreamOutput<C>

Close the stream and store all the resulting items into a collection on each host.

Partitioning will be set to Host and results will be replicated.

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..3).group_by(|&n| n % 2);
let res = s.collect_all::<Vec<_>>();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable(); // the output order is nondeterministic
assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
source§

impl<K, I, O, It, Op> KeyedStream<Op>
where K: DataKey, Op: Operator<Out = (K, I)> + 'static, It: Iterator<Item = O> + Clone + Send + 'static, I: Data + IntoIterator<IntoIter = It, Item = It::Item>, O: Data + Clone,

source

pub fn flatten(self) -> KeyedStream<impl Operator<Out = (K, O)>>

Transform this stream of containers into a stream of all the contained values.

Note: this is very similar to Iterator::flatten.

§Example
let s = env
    .stream_iter((vec![
        vec![0, 1, 2],
        vec![3, 4, 5],
        vec![6, 7]
    ].into_iter()))
    .group_by(|v| v[0] % 2);
let res = s.flatten().collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 1), (0, 2), (0, 6), (0, 7), (1, 3), (1, 4), (1, 5)]);

Auto Trait Implementations§

§

impl<OperatorChain> !RefUnwindSafe for KeyedStream<OperatorChain>

§

impl<OperatorChain> Send for KeyedStream<OperatorChain>

§

impl<OperatorChain> Sync for KeyedStream<OperatorChain>
where OperatorChain: Sync,

§

impl<OperatorChain> Unpin for KeyedStream<OperatorChain>
where OperatorChain: Unpin,

§

impl<OperatorChain> !UnwindSafe for KeyedStream<OperatorChain>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.