// renoir/operator/window/mod.rs

//! The types related to the windowed streams.
2
3use std::collections::{HashMap, VecDeque};
4use std::fmt::Display;
5use std::marker::PhantomData;
6
7pub use descr::*;
8// pub use aggregator::*;
9// pub use description::*;
10
11use crate::block::{GroupHasherBuilder, OperatorStructure, Replication};
12use crate::operator::{Data, DataKey, ExchangeData, Operator, StreamElement, Timestamp};
13use crate::stream::{KeyedStream, Stream, WindowedStream};
14
15mod aggr;
16mod descr;
17
18/// Trait for a window description that can be used to instantiate windows.
19/// The struct implementing this trait specifies the kind of [`WindowManager`] that will be instantiated by
20/// it and provides a method through which the
21/// Convention: WindowAccumulator expects output to be called after at least one element has been processed.
22/// Violating this convention may result in panics.
23pub trait WindowDescription<T> {
24    /// WindowManager corresponding to the WindowDescription
25    type Manager<A: WindowAccumulator<In = T>>: WindowManager<In = T, Out = A::Out> + 'static;
26    /// Build a window manager that dispatches elements of each window to a clone of the
27    /// accumulator passed as parameter
28    fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A>;
29}
30
31/// Trait for operations that can be performed on windows. Operations must be incremental
32/// processing on element at a time. If the operation requires accessing elements in random
33/// order, they should first be collected then the operation can be finizalized on the collection
34///
35/// Convention: output will always be called after at least one element has been processed
36pub trait WindowAccumulator: Clone + Send + 'static {
37    type In: Data;
38    type Out: Data;
39
40    /// Process a single input element updating the state of the accumulator
41    fn process(&mut self, el: &Self::In);
42    /// Finalize the accumulator and produce a result
43    fn output(self) -> Self::Out;
44}
45
46#[derive(Clone)]
47pub(crate) struct KeyedWindowManager<Key, In, Out, W: WindowManager> {
48    windows: HashMap<Key, W, GroupHasherBuilder>,
49    init: W,
50    _in: PhantomData<In>,
51    _out: PhantomData<Out>,
52}
53
54/// Window Managers handle the windowing logic for a single partition (group) of the stream.
55///
56/// Elements passing on the stream partition will be fed to the manager through the `process`
57/// method, the manager should then instantiate any new window if needed depending on its
58/// logic and on the element passed as input. The input elements should be forwarded to any
59/// relevant active window and the outputs for any window that has been closed after the
60/// input event should be output as return value of the `process` function.
61pub trait WindowManager: Clone + Send {
62    /// Type of the input elements
63    type In: Data;
64    /// Type of the output produced by each window
65    type Out: Data;
66    /// Type of the output of a call to `process`, it may be
67    /// a single [`WindowResult`] or an iterable collection, depending
68    /// on the windowing logic. (A single input may trigger the simultanous closure of
69    /// multiple windows)
70    type Output: IntoIterator<Item = WindowResult<Self::Out>>;
71    /// Process an input element updating any interest window.
72    /// Output the results that have become ready after processing this element.
73    fn process(&mut self, el: StreamElement<Self::In>) -> Self::Output;
74    /// Return true if the manager has no active windows and can be dropped
75    fn recycle(&self) -> bool {
76        false
77    }
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub enum WindowResult<T> {
82    Item(T),
83    Timestamped(T, Timestamp),
84}
85
86impl<T> WindowResult<T> {
87    #[inline]
88    pub fn new(item: T, timestamp: Option<Timestamp>) -> Self {
89        match timestamp {
90            Some(ts) => WindowResult::Timestamped(item, ts),
91            None => WindowResult::Item(item),
92        }
93    }
94
95    #[inline]
96    pub fn item(&self) -> &T {
97        match self {
98            WindowResult::Item(item) => item,
99            WindowResult::Timestamped(item, _) => item,
100        }
101    }
102
103    #[inline]
104    pub fn unwrap_item(self) -> T {
105        match self {
106            WindowResult::Item(item) => item,
107            WindowResult::Timestamped(item, _) => item,
108        }
109    }
110}
111
112impl<T> From<WindowResult<T>> for StreamElement<T> {
113    #[inline]
114    fn from(value: WindowResult<T>) -> Self {
115        match value {
116            WindowResult::Item(item) => StreamElement::Item(item),
117            WindowResult::Timestamped(item, ts) => StreamElement::Timestamped(item, ts),
118        }
119    }
120}
121
122/// This operator abstracts the window logic as an operator and delegates to the
123/// `KeyedWindowManager` and a `ProcessFunc` the job of building and processing the windows,
124/// respectively.
125#[derive(Clone)]
126pub(crate) struct WindowOperator<Key, In, Out, Prev, W>
127where
128    W: WindowManager,
129{
130    /// The previous operators in the chain.
131    prev: Prev,
132    /// The name of the actual operator that this one abstracts.
133    ///
134    /// It is used only for tracing purposes.
135    name: String,
136    /// The manager that will build the windows.
137    manager: KeyedWindowManager<Key, In, Out, W>,
138    /// A buffer for storing ready items.
139    output_buffer: VecDeque<StreamElement<(Key, Out)>>,
140}
141
142impl<Key, In, Out, Prev, W> Display for WindowOperator<Key, In, Out, Prev, W>
143where
144    W: WindowManager,
145    Prev: Display,
146{
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        write!(
149            f,
150            "{} -> {} -> WindowOperator[{}]<{}>",
151            self.prev,
152            std::any::type_name::<W>(),
153            self.name,
154            std::any::type_name::<Out>(),
155        )
156    }
157}
158
159impl<Key, In, Out, Prev, W> Operator for WindowOperator<Key, In, Out, Prev, W>
160where
161    W: WindowManager<In = In, Out = Out> + Send,
162    Prev: Operator<Out = (Key, In)>,
163    Key: DataKey,
164    In: Data,
165    Out: Data,
166{
167    type Out = (Key, Out);
168
169    fn setup(&mut self, metadata: &mut crate::ExecutionMetadata) {
170        self.prev.setup(metadata);
171    }
172
173    fn next(&mut self) -> StreamElement<(Key, Out)> {
174        loop {
175            if let Some(item) = self.output_buffer.pop_front() {
176                return item;
177            }
178
179            let el = self.prev.next();
180            match el {
181                el @ (StreamElement::Item(_) | StreamElement::Timestamped(_, _)) => {
182                    let (key, el) = el.take_key();
183                    let key = key.unwrap();
184
185                    let mgr = self
186                        .manager
187                        .windows
188                        .entry(key.clone())
189                        .or_insert_with(|| self.manager.init.clone());
190
191                    let ret = mgr.process(el);
192                    self.output_buffer.extend(
193                        ret.into_iter()
194                            .map(|e| StreamElement::from(e).add_key(key.clone())),
195                    );
196                }
197                StreamElement::FlushBatch => return StreamElement::FlushBatch,
198                el => {
199                    let (_, el) = el.take_key();
200
201                    self.manager.windows.retain(|key, mgr| {
202                        let ret = mgr.process(el.clone());
203                        self.output_buffer.extend(
204                            ret.into_iter()
205                                .map(|e| StreamElement::from(e).add_key(key.clone())),
206                        );
207                        !mgr.recycle()
208                    });
209
210                    // Forward system messages and watermarks
211                    let msg = match el {
212                        StreamElement::Watermark(w) => StreamElement::Watermark(w),
213                        StreamElement::Terminate => StreamElement::Terminate,
214                        StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
215                        _ => unreachable!(),
216                    };
217                    self.output_buffer.push_back(msg);
218                }
219            }
220        }
221    }
222
223    fn structure(&self) -> crate::block::BlockStructure {
224        self.prev
225            .structure()
226            .add_operator(OperatorStructure::new::<(Key, Out), _>(&self.name))
227    }
228}
229
230impl<Key, In, Out, Prev, W> WindowOperator<Key, In, Out, Prev, W>
231where
232    W: WindowManager,
233{
234    pub(crate) fn new(
235        prev: Prev,
236        name: String,
237        manager: KeyedWindowManager<Key, In, Out, W>,
238    ) -> Self {
239        Self {
240            prev,
241            name,
242            manager,
243            output_buffer: Default::default(),
244        }
245    }
246}
247
248impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
249where
250    WindowDescr: WindowDescription<Out>,
251    OperatorChain: Operator<Out = (Key, Out)> + 'static,
252    Key: DataKey,
253    Out: Data,
254{
255    /// Add a new generic window operator to a `KeyedWindowedStream`,
256    /// after adding a Reorder operator.
257    /// This should be used by every custom window aggregator.
258    pub(crate) fn add_window_operator<A, NewOut>(
259        self,
260        name: &str,
261        accumulator: A,
262    ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>>
263    where
264        NewOut: Data,
265        A: WindowAccumulator<In = Out, Out = NewOut>,
266    {
267        let stream = self.inner;
268        let init = self.descr.build::<A>(accumulator);
269
270        let manager: KeyedWindowManager<Key, Out, NewOut, WindowDescr::Manager<A>> =
271            KeyedWindowManager {
272                windows: HashMap::default(),
273                init,
274                _in: PhantomData,
275                _out: PhantomData,
276            };
277
278        stream // .add_operator(Reorder::new)
279            .add_operator(|prev| WindowOperator::new(prev, name.into(), manager))
280    }
281}
282
283impl<Key: DataKey, Out: Data, OperatorChain> KeyedStream<OperatorChain>
284where
285    OperatorChain: Operator<Out = (Key, Out)> + 'static,
286{
287    /// Apply a window to the stream.
288    ///
289    /// Returns a [`WindowedStream`], with windows created following the behavior specified
290    /// by the passed [`WindowDescription`].
291    ///
292    /// ## Example
293    /// ```
294    /// # use renoir::{StreamContext, RuntimeConfig};
295    /// # use renoir::operator::source::IteratorSource;
296    /// # use renoir::operator::window::CountWindow;
297    /// # let mut env = StreamContext::new_local();
298    /// let s = env.stream_iter(0..9);
299    /// let res = s
300    ///     .group_by(|&n| n % 2)
301    ///     .window(CountWindow::sliding(3, 2))
302    ///     .sum()
303    ///     .collect_vec();
304    ///
305    /// env.execute_blocking();
306    ///
307    /// let mut res = res.get().unwrap();
308    /// res.sort_unstable();
309    /// assert_eq!(res, vec![(0, 0 + 2 + 4), (0, 4 + 6 + 8), (1, 1 + 3 + 5)]);
310    /// ```
311    pub fn window<WinOut: Data, WinDescr: WindowDescription<Out>>(
312        self,
313        descr: WinDescr,
314    ) -> WindowedStream<impl Operator<Out = (Key, Out)>, WinOut, WinDescr> {
315        WindowedStream {
316            inner: self,
317            descr,
318            _win_out: PhantomData,
319        }
320    }
321}
322
323impl<Out: ExchangeData, OperatorChain> Stream<OperatorChain>
324where
325    OperatorChain: Operator<Out = Out> + 'static,
326{
327    /// Send all elements to a single node and apply a window to the stream.
328    ///
329    /// Returns a [`WindowedStream`], with key `()` with windows created following the behavior specified
330    /// by the passed [`WindowDescription`].
331    ///
332    /// **Note**: this operator cannot be parallelized, so all the stream elements are sent to a
333    /// single node where the creation and aggregation of the windows are done.
334    ///
335    /// ## Example
336    /// ```
337    /// # use renoir::{StreamContext, RuntimeConfig};
338    /// # use renoir::operator::source::IteratorSource;
339    /// # use renoir::operator::window::CountWindow;
340    /// # let mut env = StreamContext::new_local();
341    /// let s = env.stream_iter(0..5usize);
342    /// let res = s
343    ///     .window_all(CountWindow::tumbling(2))
344    ///     .sum::<usize>()
345    ///     .drop_key()
346    ///     .collect_vec();
347    ///
348    /// env.execute_blocking();
349    ///
350    /// let mut res = res.get().unwrap();
351    /// assert_eq!(res, vec![0 + 1, 2 + 3]);
352    /// ```
353    pub fn window_all<WinOut: Data, WinDescr: WindowDescription<Out>>(
354        self,
355        descr: WinDescr,
356    ) -> WindowedStream<impl Operator<Out = ((), Out)>, WinOut, WinDescr> {
357        // replication and key_by are used instead of group_by so that there is exactly one
358        // replica, since window_all cannot be parallelized
359        self.replication(Replication::new_one())
360            .key_by(|_| ())
361            .window(descr)
362    }
363}