renoir/operator/window/aggr/
sum.rs1use std::ops::AddAssign;
2
3use super::{super::*, Fold};
4use crate::operator::{Data, DataKey, Operator};
5use crate::stream::{KeyedStream, WindowedStream};
6
7impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
8where
9 WindowDescr: WindowDescription<Out>,
10 OperatorChain: Operator<Out = (Key, Out)> + 'static,
11 Key: DataKey,
12 Out: Data,
13{
14 pub fn sum<NewOut: Data + Default + AddAssign<Out>>(
15 self,
16 ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>> {
17 let acc = Fold::new(NewOut::default(), |sum, x: &Out| *sum += x.clone());
18 self.add_window_operator("WindowSum", acc)
19 }
20}