Struct renoir::WindowedStream

pub struct WindowedStream<Op, O: Data, WinDescr>
where Op: Operator, Op::Out: KeyedItem, WinDescr: WindowDescription<<Op::Out as KeyedItem>::Value>,
{ /* private fields */ }

A WindowedStream is a data stream partitioned by Key, where the elements of each partition are divided into groups called windows. Each element can be assigned to one or more windows.

Windows are handled independently for each partition of the stream. Each partition may be processed in parallel.

The windowing logic is implemented through 3 traits:

  • A WindowDescription contains the parameters and logic that characterize the windowing strategy. When given a WindowAccumulator, it instantiates a WindowManager.
  • A WindowManager is the struct responsible for creating the windows and forwarding each input element to the correct window, which should pass it to its WindowAccumulator.
  • A WindowAccumulator contains the logic that should be applied to the elements of each window.
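
The division of responsibilities between the three traits can be sketched with a std-only model. The trait and type names below (Accumulator, Manager, Description, Sum, Tumbling, TumblingManager, run_sum_example) are hypothetical simplifications introduced for illustration; they are not renoir's actual trait signatures.

```rust
/// Logic applied to the elements of one window (hypothetical, simplified).
trait Accumulator: Clone {
    type In;
    type Out;
    fn process(&mut self, el: Self::In);
    fn output(self) -> Self::Out;
}

/// Creates windows and forwards elements to their accumulators (hypothetical).
trait Manager {
    type In;
    type Out;
    /// Feeds one element and returns the outputs of the windows it closed.
    fn process(&mut self, el: Self::In) -> Vec<Self::Out>;
}

/// Windowing parameters; builds a manager from an accumulator (hypothetical).
trait Description<A: Accumulator> {
    type Mgr: Manager<In = A::In, Out = A::Out>;
    fn build(&self, accumulator: A) -> Self::Mgr;
}

/// Accumulator that sums the elements of a window.
#[derive(Clone)]
struct Sum(i64);

impl Accumulator for Sum {
    type In = i64;
    type Out = i64;
    fn process(&mut self, el: i64) {
        self.0 += el;
    }
    fn output(self) -> i64 {
        self.0
    }
}

/// Description of a count-based tumbling window of `size` elements.
struct Tumbling {
    size: usize,
}

/// Manager holding at most one open window and its element count.
struct TumblingManager<A: Accumulator> {
    size: usize,
    init: A,
    open: Option<(A, usize)>,
}

impl<A: Accumulator> Manager for TumblingManager<A> {
    type In = A::In;
    type Out = A::Out;

    fn process(&mut self, el: A::In) -> Vec<A::Out> {
        // Reuse the open window, or start a new one from a clone of the initial accumulator.
        let (mut acc, mut count) = match self.open.take() {
            Some(window) => window,
            None => (self.init.clone(), 0),
        };
        acc.process(el);
        count += 1;
        if count == self.size {
            // The window is full: close it and emit its output.
            vec![acc.output()]
        } else {
            self.open = Some((acc, count));
            Vec::new()
        }
    }
}

impl<A: Accumulator> Description<A> for Tumbling {
    type Mgr = TumblingManager<A>;
    fn build(&self, accumulator: A) -> Self::Mgr {
        TumblingManager { size: self.size, init: accumulator, open: None }
    }
}

/// Sums the elements 1..=5 in tumbling windows of two elements.
fn run_sum_example() -> Vec<i64> {
    let mut manager = Tumbling { size: 2 }.build(Sum(0));
    let mut out = Vec::new();
    for el in [1, 2, 3, 4, 5] {
        out.extend(manager.process(el));
    }
    out
}
```

In this model the description only carries parameters, the manager decides which window an element belongs to and when a window closes, and the accumulator never needs to know about windows at all. The last element (5) stays in an open window and produces no output in this sketch.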

A set of window descriptions is provided, each with its respective manager.

Implementations

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where WindowDescr: WindowDescription<Out>, OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: DataKey, Out: Data,

pub fn fold<NewOut: Data, F>( self, init: NewOut, fold: F ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>>
where F: FnMut(&mut NewOut, Out) + Clone + Send + 'static,

Folds the elements of each window into an accumulator value

fold() takes two arguments: the initial value of the accumulator and a closure used to accumulate the elements of each window.

The closure is called once for each element of each window with two arguments: a mutable reference to the accumulator and the element of the window. The closure should modify the accumulator, without returning anything.

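Example
// `env` is assumed to be a stream environment created earlier (setup not shown).
let s = env.stream_iter(0..5);
let res = s
    .group_by(|&n| n % 2)              // key 0 receives 0, 2, 4; key 1 receives 1, 3
    .window(CountWindow::tumbling(2))  // count-based tumbling windows of 2 elements per key
    .fold(1, |acc, n| *acc *= n)       // multiply the elements of each window, starting from 1
    .collect_vec();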

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0 * 2), (1, 1 * 3)]);
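
To make the result of the example easier to follow, here is a std-only model of a keyed fold over count-based tumbling windows. The function windowed_fold is hypothetical and is not part of renoir; in particular, dropping windows that never fill is an assumption chosen to match the assertion in the example above.

```rust
use std::collections::BTreeMap;

/// Folds `(key, item)` pairs into per-key tumbling windows of `size` items.
/// Emits `(key, accumulator)` each time a window fills up.
/// Assumption: windows that are still incomplete at the end are dropped.
fn windowed_fold<K, T, Acc, I, F>(items: I, size: usize, init: Acc, mut fold: F) -> Vec<(K, Acc)>
where
    K: Ord + Clone,
    Acc: Clone,
    I: IntoIterator<Item = (K, T)>,
    F: FnMut(&mut Acc, T),
{
    // For each key: the accumulator of the open window and its element count.
    let mut open: BTreeMap<K, (Acc, usize)> = BTreeMap::new();
    let mut out = Vec::new();

    for (key, item) in items {
        let entry = open
            .entry(key.clone())
            .or_insert_with(|| (init.clone(), 0));
        fold(&mut entry.0, item);
        entry.1 += 1;

        if entry.1 == size {
            // The window is full: close it and emit its accumulator.
            let (acc, _) = open.remove(&key).expect("window was just updated");
            out.push((key, acc));
        }
    }
    out
}
```

Calling windowed_fold((0..5i32).map(|n| (n % 2, n)), 2, 1i32, |acc, n| *acc *= n) processes key 0 as the window [0, 2] and key 1 as the window [1, 3]; the element 4 starts a new window for key 0 that never fills.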

pub fn fold_first<F>( self, fold: F ) -> KeyedStream<impl Operator<Out = (Key, Out)>>
where F: FnMut(&mut Out, Out) + Clone + Send + 'static,

Folds the elements of each window into an accumulator value, starting with the first value

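fold_first() takes a single argument: a closure used to accumulate the elements of each window. The first element of each window is used as the initial value of the accumulator.

The closure is called for each subsequent element of the window with two arguments: a mutable reference to the accumulator and the element. The closure should modify the accumulator, without returning anything.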
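
Going by the signature and the summary line, the first element of a window seeds the accumulator and the closure is applied to the rest. A minimal std-only sketch of that behaviour for a single window follows; fold_first_window is a hypothetical helper, not a renoir function.

```rust
/// Folds one window, using its first element as the initial accumulator.
/// Returns `None` for an empty window, since there is no first element.
fn fold_first_window<T, I, F>(window: I, mut fold: F) -> Option<T>
where
    I: IntoIterator<Item = T>,
    F: FnMut(&mut T, T),
{
    let mut iter = window.into_iter();
    // The first element becomes the accumulator.
    let mut acc = iter.next()?;
    // Every remaining element is folded into it.
    for el in iter {
        fold(&mut acc, el);
    }
    Some(acc)
}
```

Because the accumulator starts as an element of the window, its type is the same as the element type, which is why fold_first() has no NewOut parameter.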

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where WindowDescr: WindowDescription<Out>, OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: DataKey, Out: Data + Ord,

pub fn map<NewOut: Data, F: Fn(Vec<Out>) -> NewOut + Send + Clone + 'static>( self, f: F ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>>

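Applies a function to the elements of each window. The closure receives the elements of a window collected in a Vec<Out> and returns the output value for that window.

Prefer other aggregators (such as fold) where possible, since they do not need to store all the elements of a window.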
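
As an illustration of when collecting the whole window is hard to avoid, a median needs every element at once, so it fits the Fn(Vec<Out>) -> NewOut shape that map() expects. The function median_of_window below is a hypothetical example, not part of renoir.

```rust
/// Returns the upper median of a window, or `None` if the window is empty.
/// The whole window must be available, so this cannot be written as an
/// incremental fold with a constant-size accumulator.
fn median_of_window(mut window: Vec<i32>) -> Option<i32> {
    window.sort_unstable();
    window.get(window.len() / 2).copied()
}
```

A sum or a count, by contrast, only needs a single running value per window, which is why the incremental aggregators are the better choice for them.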

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where WindowDescr: WindowDescription<Out>, OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: DataKey, Out: Data,

pub fn count(self) -> KeyedStream<impl Operator<Out = (Key, usize)>>

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where WindowDescr: WindowDescription<Out>, OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: DataKey, Out: Data + Ord,

pub fn max(self) -> KeyedStream<impl Operator<Out = (Key, Out)>>

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where WindowDescr: WindowDescription<Out>, OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: DataKey, Out: Data,

pub fn max_by_key<K: Ord, F: Fn(&Out) -> K + Clone + Send + 'static>( self, get_key: F ) -> KeyedStream<impl Operator<Out = (Key, Out)>>

pub fn max_by<F: Fn(&Out, &Out) -> Ordering + Clone + Send + 'static>( self, compare: F ) -> KeyedStream<impl Operator<Out = (Key, Out)>>

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where WindowDescr: WindowDescription<Out>, OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: DataKey, Out: Data + Ord,

pub fn min(self) -> KeyedStream<impl Operator<Out = (Key, Out)>>

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where WindowDescr: WindowDescription<Out>, OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: DataKey, Out: Data,

pub fn min_by_key<K: Ord, F: Fn(&Out) -> K + Clone + Send + 'static>( self, get_key: F ) -> KeyedStream<impl Operator<Out = (Key, Out)>>

pub fn min_by<F: Fn(&Out, &Out) -> Ordering + Clone + Send + 'static>( self, compare: F ) -> KeyedStream<impl Operator<Out = (Key, Out)>>

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where WindowDescr: WindowDescription<Out>, OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: DataKey, Out: Data,

pub fn first(self) -> KeyedStream<impl Operator<Out = (Key, Out)>>

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where WindowDescr: WindowDescription<Out>, OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: DataKey, Out: Data,

pub fn last(self) -> KeyedStream<impl Operator<Out = (Key, Out)>>

impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where WindowDescr: WindowDescription<Out>, OperatorChain: Operator<Out = (Key, Out)> + 'static, Key: DataKey, Out: Data,

pub fn sum<NewOut: Data + Default + AddAssign<Out>>( self ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>>
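
The bounds on NewOut (Default + AddAssign<Out>) suggest that the sum starts from NewOut::default() and adds each element with +=. The sketch below models that for a single window under this assumption; sum_window and Total are hypothetical names, and renoir's additional Data bound is omitted.

```rust
use std::ops::AddAssign;

/// Sums one window, starting from `S::default()` and adding each element with `+=`.
fn sum_window<T, S, I>(window: I) -> S
where
    S: Default + AddAssign<T>,
    I: IntoIterator<Item = T>,
{
    let mut total = S::default();
    for el in window {
        total += el;
    }
    total
}

/// A wider accumulator type: sums `i32` elements into an `i64`.
#[derive(Debug, Default, PartialEq)]
struct Total(i64);

impl AddAssign<i32> for Total {
    fn add_assign(&mut self, rhs: i32) {
        self.0 += i64::from(rhs);
    }
}
```

Since the output type appears only in the return position, it usually has to be named explicitly, for example with a type annotation such as let total: Total = sum_window(vec![1i32, 2, 3]).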

Auto Trait Implementations

impl<Op, O, WinDescr> !RefUnwindSafe for WindowedStream<Op, O, WinDescr>

impl<Op, O, WinDescr> Send for WindowedStream<Op, O, WinDescr>
where WinDescr: Send,

impl<Op, O, WinDescr> Sync for WindowedStream<Op, O, WinDescr>
where O: Sync, Op: Sync, WinDescr: Sync,

impl<Op, O, WinDescr> Unpin for WindowedStream<Op, O, WinDescr>
where O: Unpin, Op: Unpin, WinDescr: Unpin,

impl<Op, O, WinDescr> !UnwindSafe for WindowedStream<Op, O, WinDescr>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.