renoir/operator/window/aggr/
sum.rs

1use std::ops::AddAssign;
2
3use super::{super::*, Fold};
4use crate::operator::{Data, DataKey, Operator};
5use crate::stream::{KeyedStream, WindowedStream};
6
7impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
8where
9    WindowDescr: WindowDescription<Out>,
10    OperatorChain: Operator<Out = (Key, Out)> + 'static,
11    Key: DataKey,
12    Out: Data,
13{
14    pub fn sum<NewOut: Data + Default + AddAssign<Out>>(
15        self,
16    ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>> {
17        let acc = Fold::new(NewOut::default(), |sum, x: &Out| *sum += x.clone());
18        self.add_window_operator("WindowSum", acc)
19    }
20}