renoir/operator/window/mod.rs
1//! The types related to the windowed streams.
2
3use std::collections::{HashMap, VecDeque};
4use std::fmt::Display;
5use std::marker::PhantomData;
6
7pub use descr::*;
8// pub use aggregator::*;
9// pub use description::*;
10
11use crate::block::{GroupHasherBuilder, OperatorStructure, Replication};
12use crate::operator::{Data, DataKey, ExchangeData, Operator, StreamElement, Timestamp};
13use crate::stream::{KeyedStream, Stream, WindowedStream};
14
15mod aggr;
16mod descr;
17
18/// Trait for a window description that can be used to instantiate windows.
19/// The struct implementing this trait specifies the kind of [`WindowManager`] that will be instantiated by
20/// it and provides a method through which the
21/// Convention: WindowAccumulator expects output to be called after at least one element has been processed.
22/// Violating this convention may result in panics.
23pub trait WindowDescription<T> {
24 /// WindowManager corresponding to the WindowDescription
25 type Manager<A: WindowAccumulator<In = T>>: WindowManager<In = T, Out = A::Out> + 'static;
26 /// Build a window manager that dispatches elements of each window to a clone of the
27 /// accumulator passed as parameter
28 fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A>;
29}
30
31/// Trait for operations that can be performed on windows. Operations must be incremental
32/// processing on element at a time. If the operation requires accessing elements in random
33/// order, they should first be collected then the operation can be finizalized on the collection
34///
35/// Convention: output will always be called after at least one element has been processed
36pub trait WindowAccumulator: Clone + Send + 'static {
37 type In: Data;
38 type Out: Data;
39
40 /// Process a single input element updating the state of the accumulator
41 fn process(&mut self, el: &Self::In);
42 /// Finalize the accumulator and produce a result
43 fn output(self) -> Self::Out;
44}
45
46#[derive(Clone)]
47pub(crate) struct KeyedWindowManager<Key, In, Out, W: WindowManager> {
48 windows: HashMap<Key, W, GroupHasherBuilder>,
49 init: W,
50 _in: PhantomData<In>,
51 _out: PhantomData<Out>,
52}
53
54/// Window Managers handle the windowing logic for a single partition (group) of the stream.
55///
56/// Elements passing on the stream partition will be fed to the manager through the `process`
57/// method, the manager should then instantiate any new window if needed depending on its
58/// logic and on the element passed as input. The input elements should be forwarded to any
59/// relevant active window and the outputs for any window that has been closed after the
60/// input event should be output as return value of the `process` function.
61pub trait WindowManager: Clone + Send {
62 /// Type of the input elements
63 type In: Data;
64 /// Type of the output produced by each window
65 type Out: Data;
66 /// Type of the output of a call to `process`, it may be
67 /// a single [`WindowResult`] or an iterable collection, depending
68 /// on the windowing logic. (A single input may trigger the simultanous closure of
69 /// multiple windows)
70 type Output: IntoIterator<Item = WindowResult<Self::Out>>;
71 /// Process an input element updating any interest window.
72 /// Output the results that have become ready after processing this element.
73 fn process(&mut self, el: StreamElement<Self::In>) -> Self::Output;
74 /// Return true if the manager has no active windows and can be dropped
75 fn recycle(&self) -> bool {
76 false
77 }
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub enum WindowResult<T> {
82 Item(T),
83 Timestamped(T, Timestamp),
84}
85
86impl<T> WindowResult<T> {
87 #[inline]
88 pub fn new(item: T, timestamp: Option<Timestamp>) -> Self {
89 match timestamp {
90 Some(ts) => WindowResult::Timestamped(item, ts),
91 None => WindowResult::Item(item),
92 }
93 }
94
95 #[inline]
96 pub fn item(&self) -> &T {
97 match self {
98 WindowResult::Item(item) => item,
99 WindowResult::Timestamped(item, _) => item,
100 }
101 }
102
103 #[inline]
104 pub fn unwrap_item(self) -> T {
105 match self {
106 WindowResult::Item(item) => item,
107 WindowResult::Timestamped(item, _) => item,
108 }
109 }
110}
111
112impl<T> From<WindowResult<T>> for StreamElement<T> {
113 #[inline]
114 fn from(value: WindowResult<T>) -> Self {
115 match value {
116 WindowResult::Item(item) => StreamElement::Item(item),
117 WindowResult::Timestamped(item, ts) => StreamElement::Timestamped(item, ts),
118 }
119 }
120}
121
122/// This operator abstracts the window logic as an operator and delegates to the
123/// `KeyedWindowManager` and a `ProcessFunc` the job of building and processing the windows,
124/// respectively.
125#[derive(Clone)]
126pub(crate) struct WindowOperator<Key, In, Out, Prev, W>
127where
128 W: WindowManager,
129{
130 /// The previous operators in the chain.
131 prev: Prev,
132 /// The name of the actual operator that this one abstracts.
133 ///
134 /// It is used only for tracing purposes.
135 name: String,
136 /// The manager that will build the windows.
137 manager: KeyedWindowManager<Key, In, Out, W>,
138 /// A buffer for storing ready items.
139 output_buffer: VecDeque<StreamElement<(Key, Out)>>,
140}
141
142impl<Key, In, Out, Prev, W> Display for WindowOperator<Key, In, Out, Prev, W>
143where
144 W: WindowManager,
145 Prev: Display,
146{
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 write!(
149 f,
150 "{} -> {} -> WindowOperator[{}]<{}>",
151 self.prev,
152 std::any::type_name::<W>(),
153 self.name,
154 std::any::type_name::<Out>(),
155 )
156 }
157}
158
159impl<Key, In, Out, Prev, W> Operator for WindowOperator<Key, In, Out, Prev, W>
160where
161 W: WindowManager<In = In, Out = Out> + Send,
162 Prev: Operator<Out = (Key, In)>,
163 Key: DataKey,
164 In: Data,
165 Out: Data,
166{
167 type Out = (Key, Out);
168
169 fn setup(&mut self, metadata: &mut crate::ExecutionMetadata) {
170 self.prev.setup(metadata);
171 }
172
173 fn next(&mut self) -> StreamElement<(Key, Out)> {
174 loop {
175 if let Some(item) = self.output_buffer.pop_front() {
176 return item;
177 }
178
179 let el = self.prev.next();
180 match el {
181 el @ (StreamElement::Item(_) | StreamElement::Timestamped(_, _)) => {
182 let (key, el) = el.take_key();
183 let key = key.unwrap();
184
185 let mgr = self
186 .manager
187 .windows
188 .entry(key.clone())
189 .or_insert_with(|| self.manager.init.clone());
190
191 let ret = mgr.process(el);
192 self.output_buffer.extend(
193 ret.into_iter()
194 .map(|e| StreamElement::from(e).add_key(key.clone())),
195 );
196 }
197 StreamElement::FlushBatch => return StreamElement::FlushBatch,
198 el => {
199 let (_, el) = el.take_key();
200
201 self.manager.windows.retain(|key, mgr| {
202 let ret = mgr.process(el.clone());
203 self.output_buffer.extend(
204 ret.into_iter()
205 .map(|e| StreamElement::from(e).add_key(key.clone())),
206 );
207 !mgr.recycle()
208 });
209
210 // Forward system messages and watermarks
211 let msg = match el {
212 StreamElement::Watermark(w) => StreamElement::Watermark(w),
213 StreamElement::Terminate => StreamElement::Terminate,
214 StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
215 _ => unreachable!(),
216 };
217 self.output_buffer.push_back(msg);
218 }
219 }
220 }
221 }
222
223 fn structure(&self) -> crate::block::BlockStructure {
224 self.prev
225 .structure()
226 .add_operator(OperatorStructure::new::<(Key, Out), _>(&self.name))
227 }
228}
229
230impl<Key, In, Out, Prev, W> WindowOperator<Key, In, Out, Prev, W>
231where
232 W: WindowManager,
233{
234 pub(crate) fn new(
235 prev: Prev,
236 name: String,
237 manager: KeyedWindowManager<Key, In, Out, W>,
238 ) -> Self {
239 Self {
240 prev,
241 name,
242 manager,
243 output_buffer: Default::default(),
244 }
245 }
246}
247
248impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
249where
250 WindowDescr: WindowDescription<Out>,
251 OperatorChain: Operator<Out = (Key, Out)> + 'static,
252 Key: DataKey,
253 Out: Data,
254{
255 /// Add a new generic window operator to a `KeyedWindowedStream`,
256 /// after adding a Reorder operator.
257 /// This should be used by every custom window aggregator.
258 pub(crate) fn add_window_operator<A, NewOut>(
259 self,
260 name: &str,
261 accumulator: A,
262 ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>>
263 where
264 NewOut: Data,
265 A: WindowAccumulator<In = Out, Out = NewOut>,
266 {
267 let stream = self.inner;
268 let init = self.descr.build::<A>(accumulator);
269
270 let manager: KeyedWindowManager<Key, Out, NewOut, WindowDescr::Manager<A>> =
271 KeyedWindowManager {
272 windows: HashMap::default(),
273 init,
274 _in: PhantomData,
275 _out: PhantomData,
276 };
277
278 stream // .add_operator(Reorder::new)
279 .add_operator(|prev| WindowOperator::new(prev, name.into(), manager))
280 }
281}
282
283impl<Key: DataKey, Out: Data, OperatorChain> KeyedStream<OperatorChain>
284where
285 OperatorChain: Operator<Out = (Key, Out)> + 'static,
286{
287 /// Apply a window to the stream.
288 ///
289 /// Returns a [`WindowedStream`], with windows created following the behavior specified
290 /// by the passed [`WindowDescription`].
291 ///
292 /// ## Example
293 /// ```
294 /// # use renoir::{StreamContext, RuntimeConfig};
295 /// # use renoir::operator::source::IteratorSource;
296 /// # use renoir::operator::window::CountWindow;
297 /// # let mut env = StreamContext::new_local();
298 /// let s = env.stream_iter(0..9);
299 /// let res = s
300 /// .group_by(|&n| n % 2)
301 /// .window(CountWindow::sliding(3, 2))
302 /// .sum()
303 /// .collect_vec();
304 ///
305 /// env.execute_blocking();
306 ///
307 /// let mut res = res.get().unwrap();
308 /// res.sort_unstable();
309 /// assert_eq!(res, vec![(0, 0 + 2 + 4), (0, 4 + 6 + 8), (1, 1 + 3 + 5)]);
310 /// ```
311 pub fn window<WinOut: Data, WinDescr: WindowDescription<Out>>(
312 self,
313 descr: WinDescr,
314 ) -> WindowedStream<impl Operator<Out = (Key, Out)>, WinOut, WinDescr> {
315 WindowedStream {
316 inner: self,
317 descr,
318 _win_out: PhantomData,
319 }
320 }
321}
322
323impl<Out: ExchangeData, OperatorChain> Stream<OperatorChain>
324where
325 OperatorChain: Operator<Out = Out> + 'static,
326{
327 /// Send all elements to a single node and apply a window to the stream.
328 ///
329 /// Returns a [`WindowedStream`], with key `()` with windows created following the behavior specified
330 /// by the passed [`WindowDescription`].
331 ///
332 /// **Note**: this operator cannot be parallelized, so all the stream elements are sent to a
333 /// single node where the creation and aggregation of the windows are done.
334 ///
335 /// ## Example
336 /// ```
337 /// # use renoir::{StreamContext, RuntimeConfig};
338 /// # use renoir::operator::source::IteratorSource;
339 /// # use renoir::operator::window::CountWindow;
340 /// # let mut env = StreamContext::new_local();
341 /// let s = env.stream_iter(0..5usize);
342 /// let res = s
343 /// .window_all(CountWindow::tumbling(2))
344 /// .sum::<usize>()
345 /// .drop_key()
346 /// .collect_vec();
347 ///
348 /// env.execute_blocking();
349 ///
350 /// let mut res = res.get().unwrap();
351 /// assert_eq!(res, vec![0 + 1, 2 + 3]);
352 /// ```
353 pub fn window_all<WinOut: Data, WinDescr: WindowDescription<Out>>(
354 self,
355 descr: WinDescr,
356 ) -> WindowedStream<impl Operator<Out = ((), Out)>, WinOut, WinDescr> {
357 // replication and key_by are used instead of group_by so that there is exactly one
358 // replica, since window_all cannot be parallelized
359 self.replication(Replication::new_one())
360 .key_by(|_| ())
361 .window(descr)
362 }
363}