renoir/operator/window/aggr/
min.rs

1use std::cmp::Ordering;
2
3use super::{super::*, FoldFirst};
4use crate::operator::{Data, DataKey, Operator};
5use crate::stream::{KeyedStream, WindowedStream};
6
7impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
8where
9    WindowDescr: WindowDescription<Out>,
10    OperatorChain: Operator<Out = (Key, Out)> + 'static,
11    Key: DataKey,
12    Out: Data + Ord,
13{
14    pub fn min(self) -> KeyedStream<impl Operator<Out = (Key, Out)>> {
15        let acc = FoldFirst::<Out, _>::new(|min, x| {
16            if *x < *min {
17                *min = x.clone()
18            }
19        });
20        self.add_window_operator("WindowMin", acc)
21    }
22}
23
24impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
25where
26    WindowDescr: WindowDescription<Out>,
27    OperatorChain: Operator<Out = (Key, Out)> + 'static,
28    Key: DataKey,
29    Out: Data,
30{
31    pub fn min_by_key<K: Ord, F: Fn(&Out) -> K + Clone + Send + 'static>(
32        self,
33        get_key: F,
34    ) -> KeyedStream<impl Operator<Out = (Key, Out)>> {
35        let acc = FoldFirst::<Out, _>::new(move |min, x| {
36            if (get_key)(x) < (get_key)(min) {
37                *min = x.clone()
38            }
39        });
40        self.add_window_operator("WindowMin", acc)
41    }
42
43    pub fn min_by<F: Fn(&Out, &Out) -> Ordering + Clone + Send + 'static>(
44        self,
45        compare: F,
46    ) -> KeyedStream<impl Operator<Out = (Key, Out)>> {
47        let acc = FoldFirst::<Out, _>::new(move |min, x| {
48            if (compare)(x, min).is_lt() {
49                *min = x.clone()
50            }
51        });
52        self.add_window_operator("WindowMin", acc)
53    }
54}