Struct renoir::Stream

source ·
pub struct Stream<Op>
where Op: Operator,
{ /* private fields */ }

A Stream represents a chain of operators that work on a flow of data. The type of the elements leaving the stream is Out.

Internally, a stream is composed of a chain of blocks, each of which can be seen as a simpler stream with its own input and output types.

A block is internally composed of a chain of operators, nested like the Iterator adapters from std. The type of the chain inside the block is OperatorChain and it is required as a type argument of the stream. This type represents only the chain inside the last block of the stream, not all the blocks it contains.

Implementations§

source§

impl<Out: ExchangeData, OperatorChain> Stream<OperatorChain>
where OperatorChain: Operator<Out = Out> + 'static,

source

pub fn iterate<Body, StateUpdate, State, L, G, C, OperatorChain2>( self, num_iterations: usize, initial_state: State, body: Body, local_fold: L, global_fold: G, loop_condition: C ) -> (Stream<impl Operator<Out = State>>, Stream<impl Operator<Out = Out>>)
where Body: FnOnce(Stream<Iterate<Out, State>>, IterationStateHandle<State>) -> Stream<OperatorChain2>, OperatorChain2: Operator<Out = Out> + 'static, L: Fn(&mut StateUpdate, Out) + Send + Clone + 'static, G: Fn(&mut State, StateUpdate) + Send + Clone + 'static, C: Fn(&mut State) -> bool + Send + Clone + 'static, StateUpdate: ExchangeData + Default, State: ExchangeData + Sync,

Construct an iterative dataflow where the input stream is fed inside a cycle: what comes out of the loop is fed back at the next iteration.

This iteration is stateful: all the replicas have read-only access to the iteration state. The initial value of the state is given as a parameter. When an iteration ends, all the elements are reduced locally at each replica, producing a StateUpdate. Those updates are later reduced on a single node that, using the global_fold function, computes the state for the next iteration. This state is also used in loop_condition to check whether the next iteration should start or not. loop_condition is also allowed to mutate the state.

The initial value of StateUpdate is obtained with Default::default().

The content of the loop has a new scope: it is defined by the body function, which takes as parameters the stream of data coming into the iteration and a reference to the state. This function should return the stream of the data that exits from the loop (and will be fed back).

This construct produces two streams:

  • the first is a stream with a single item: the final state of the iteration
  • the second is the set of elements that exited the loop during the last iteration (i.e. the ones that would have been fed back in the next iteration).

Note: due to an internal limitation, it’s not currently possible to add an iteration operator when the stream has limited parallelism. This means, for example, that after a non-parallel source you have to add a shuffle.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..3).shuffle();
let (state, items) = s.iterate(
    3, // at most 3 iterations
    0, // the initial state is zero
    |s, state| s.map(|n| n + 10),
    |delta: &mut i32, n| *delta += n,
    |state, delta| *state += delta,
    |_state| true,
);
let state = state.collect_vec();
let items = items.collect_vec();
env.execute_blocking();

assert_eq!(state.get().unwrap(), vec![10 + 11 + 12 + 20 + 21 + 22 + 30 + 31 + 32]);
let mut sorted = items.get().unwrap();
sorted.sort();
assert_eq!(sorted, vec![30, 31, 32]);
source§

impl<Out: Data, OperatorChain> Stream<OperatorChain>
where OperatorChain: Operator<Out = Out> + 'static,

source

pub fn replay<Body, DeltaUpdate: ExchangeData + Default, State: ExchangeData + Sync, OperatorChain2>( self, num_iterations: usize, initial_state: State, body: Body, local_fold: impl Fn(&mut DeltaUpdate, Out) + Send + Clone + 'static, global_fold: impl Fn(&mut State, DeltaUpdate) + Send + Clone + 'static, loop_condition: impl Fn(&mut State) -> bool + Send + Clone + 'static ) -> Stream<impl Operator<Out = State>>
where Body: FnOnce(Stream<Replay<Out, State, OperatorChain>>, IterationStateHandle<State>) -> Stream<OperatorChain2>, OperatorChain2: Operator<Out = Out> + 'static,

Construct an iterative dataflow where the input stream is repeatedly fed inside a cycle, i.e. what comes into the cycle is replayed at every iteration.

This iteration is stateful: all the replicas have read-only access to the iteration state. The initial value of the state is given as a parameter. When an iteration ends, all the elements are reduced locally at each replica, producing a DeltaUpdate. Those delta updates are later reduced on a single node that, using the global_fold function, computes the state for the next iteration. This state is also used in loop_condition to check whether the next iteration should start or not. loop_condition is also allowed to mutate the state.

The initial value of DeltaUpdate is obtained with Default::default().

The content of the loop has a new scope: it is defined by the body function, which takes as parameters the stream of data coming into the iteration and a reference to the state. This function should return the stream of the data that exits from the loop (and will be fed back).

This construct produces a single stream with a single element: the final state of the iteration.

Note: due to an internal limitation, it’s not currently possible to add an iteration operator when the stream has limited parallelism. This means, for example, that after a non-parallel source you have to add a shuffle.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..3).shuffle();
let state = s.replay(
    3, // at most 3 iterations
    0, // the initial state is zero
    |s, state| s.map(|n| n + 10),
    |delta: &mut i32, n| *delta += n,
    |state, delta| *state += delta,
    |_state| true,
);
let state = state.collect_vec();
env.execute_blocking();

assert_eq!(state.get().unwrap(), vec![3 * (10 + 11 + 12)]);
source§

impl<Out: ExchangeData, OperatorChain> Stream<OperatorChain>
where OperatorChain: Operator<Out = Out> + 'static,

source

pub fn join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>( self, rhs: Stream<OperatorChain2>, keyer1: Keyer1, keyer2: Keyer2 ) -> KeyedStream<impl Operator<Out = (Key, InnerJoinTuple<Out, Out2>)>>
where Key: DataKey, OperatorChain2: Operator<Out = Out2> + 'static, Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>, Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,

Given two streams, create a stream with all the pairs (left item from the left stream, right item from the right stream) such that the key obtained with keyer1 on an item from the left is equal to the key obtained with keyer2 on an item from the right.

This is an inner join, very similar to SELECT a, b FROM a JOIN b ON keyer1(a) = keyer2(b).

This is a shortcut for: self.join_with(...).ship_hash().local_hash().inner().

Note: this operator will split the current block.

§Example
let s1 = env.stream_iter(0..5u8);
let s2 = env.stream_iter(0..5i32);
let res = s1.join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 2), (0, 4), (1, 1), (1, 3)]);
source

pub fn left_join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>( self, rhs: Stream<OperatorChain2>, keyer1: Keyer1, keyer2: Keyer2 ) -> KeyedStream<impl Operator<Out = (Key, LeftJoinTuple<Out, Out2>)>>
where Key: DataKey, OperatorChain2: Operator<Out = Out2> + 'static, Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>, Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,

Given two streams, create a stream with all the pairs (left item from the left stream, right item from the right stream) such that the key obtained with keyer1 on an item from the left is equal to the key obtained with keyer2 on an item from the right.

This is a left join, meaning that if an item from the left does not find an element from the right with which to make a pair, an extra pair (left, None) is generated. If you need a right join, simply swap the two sides and use a left join.

This is very similar to SELECT a, b FROM a LEFT JOIN b ON keyer1(a) = keyer2(b).

This is a shortcut for: self.join_with(...).ship_hash().local_hash().left().

Note: this operator will split the current block.

§Example
let s1 = env.stream_iter(0..5u8);
let s2 = env.stream_iter(0..5i32);
let res = s1.left_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, Some(0)), (0, Some(2)), (0, Some(4)), (1, Some(1)), (1, Some(3)), (2, None), (3, None), (4, None)]);
source

pub fn outer_join<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>( self, rhs: Stream<OperatorChain2>, keyer1: Keyer1, keyer2: Keyer2 ) -> KeyedStream<impl Operator<Out = (Key, OuterJoinTuple<Out, Out2>)>>
where Key: DataKey, OperatorChain2: Operator<Out = Out2> + 'static, Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>, Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,

Given two streams, create a stream with all the pairs (left item from the left stream, right item from the right stream) such that the key obtained with keyer1 on an item from the left is equal to the key obtained with keyer2 on an item from the right.

This is a full outer join, meaning that if an item from the left does not find an element from the right with which to make a pair, an extra pair (left, None) is generated. Similarly, if an element from the right does not appear in any pair, a new pair (None, right) is generated.

This is very similar to SELECT a, b FROM a FULL OUTER JOIN b ON keyer1(a) = keyer2(b).

This is a shortcut for: self.join_with(...).ship_hash().local_hash().outer().

Note: this operator will split the current block.

§Example
let s1 = env.stream_iter(0..5u8);
let s2 = env.stream_iter(0..5i32);
let res = s1.outer_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(Some(0), Some(0)), (Some(0), Some(2)), (Some(0), Some(4)), (Some(1), Some(1)), (Some(1), Some(3)), (Some(2), None), (Some(3), None), (Some(4), None)]);
source

pub fn join_with<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>( self, rhs: Stream<OperatorChain2>, keyer1: Keyer1, keyer2: Keyer2 ) -> JoinStream<Key, Out, Out2, OperatorChain, OperatorChain2, Keyer1, Keyer2>
where OperatorChain2: Operator<Out = Out2>, Keyer1: Fn(&Out) -> Key + KeyerFn<Key, Out>, Keyer2: Fn(&Out2) -> Key + KeyerFn<Key, Out2>,

Given two streams, start building a join operator.

The returned type allows you to customize the behaviour of the join. You can select which ship strategy and which local strategy to use.

Ship strategies

  • hash: the hash of the key is used to select where to send the elements
  • broadcast right: the left stream is kept locally, while the right stream is broadcast

Local strategies

  • hash: build a hashmap to match the tuples
  • sort and merge: collect all the tuples, sort them by key and merge them

The ship strategy must be selected first; after that, you select the local strategy.

§Example
let s1 = env.stream_iter(0..5u8);
let s2 = env.stream_iter(0..5i32);
let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_hash();
let s1 = env.stream_iter(0..5u8);
let s2 = env.stream_iter(0..5i32);
let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_broadcast_right();
source§

impl<Op> Stream<Op>
where Op: Operator + 'static, Op::Out: ExchangeData,

source

pub fn merge<Op2>( self, oth: Stream<Op2> ) -> Stream<impl Operator<Out = Op::Out>>
where Op: 'static, Op2: Operator<Out = Op::Out> + 'static,

Merge the items of this stream with the items of another stream with the same type.

Note: the order of the resulting items is not specified.

Note: this operator will split the current block.

§Example
let s1 = env.stream_iter(0..10);
let s2 = env.stream_iter(10..20);
let res = s1.merge(s2).collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, (0..20).collect::<Vec<_>>());
source§

impl<Out: ExchangeData, OperatorChain> Stream<OperatorChain>
where OperatorChain: Operator<Out = Out> + 'static,

source

pub fn window_all<WinOut: Data, WinDescr: WindowDescription<Out>>( self, descr: WinDescr ) -> WindowedStream<impl Operator<Out = ((), Out)>, WinOut, WinDescr>

Send all elements to a single node and apply a window to the stream.

Returns a WindowedStream with key (), whose windows are created following the behavior specified by the passed WindowDescription.

Note: this operator cannot be parallelized, so all the stream elements are sent to a single node where the creation and aggregation of the windows are done.

§Example
let s = env.stream_iter(0..5usize);
let res = s
    .window_all(CountWindow::tumbling(2))
    .sum::<usize>()
    .drop_key()
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
assert_eq!(res, vec![0 + 1, 2 + 3]);
source§

impl<Op> Stream<Op>
where Op: Operator + 'static,

source

pub fn add_timestamps<F, G>( self, timestamp_gen: F, watermark_gen: G ) -> Stream<AddTimestamp<F, G, Op>>
where F: FnMut(&Op::Out) -> Timestamp + Clone + Send + 'static, G: FnMut(&Op::Out, &Timestamp) -> Option<Timestamp> + Clone + Send + 'static,

Given a stream with neither timestamps nor watermarks, tag each item with a timestamp and insert watermarks.

The two functions given to this operator are the following:

  • timestamp_gen returns the timestamp assigned to the provided element of the stream
  • watermark_gen returns an optional watermark to add after the provided element

Note that the two functions must follow the watermark semantics. TODO: link to watermark semantics

§Example

In this example the stream contains the integers from 0 to 9; each will be tagged with a timestamp whose value is the item itself (in milliseconds), and a watermark will be inserted after each even number.

use renoir::operator::Timestamp;

let s = env.stream_iter(0..10);
s.add_timestamps(
    |&n| n,
    |&n, &ts| if n % 2 == 0 { Some(ts) } else { None }
);
source

pub fn drop_timestamps(self) -> Stream<DropTimestamp<Op>>
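
No description is provided in the source; from the name and return type this presumably removes the timestamps and watermarks previously attached with Stream::add_timestamps.

§Example

A hedged sketch, assuming that behaviour:

let s = env.stream_iter(0..10);
let untimed = s
    .add_timestamps(|&n| n, |&n, &ts| if n % 2 == 0 { Some(ts) } else { None })
    .drop_timestamps(); // assumed: strips the timestamps and watermarks added above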

source

pub fn batch_mode(self, batch_mode: BatchMode) -> Self

Change the batch mode for this stream.

This change will be propagated to all the following operators, even in the next blocks, until it is changed again.

§Example
use renoir::BatchMode;

let s = env.stream_iter(0..10);
s.batch_mode(BatchMode::fixed(1024));
source

pub fn filter_map<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
where F: Fn(Op::Out) -> Option<O> + Send + Clone + 'static, O: Data,

Remove from the stream all the elements for which the provided function returns None and keep the elements that returned Some(_).

Note: this is very similar to Iterator::filter_map.

§Example
let s = env.stream_iter(0..10);
let res = s.filter_map(|n| if n % 2 == 0 { Some(n * 3) } else { None }).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0, 6, 12, 18, 24])
source

pub fn filter<F>(self, predicate: F) -> Stream<impl Operator<Out = Op::Out>>
where F: Fn(&Op::Out) -> bool + Clone + Send + 'static,

Remove from the stream all the elements for which the provided predicate returns false.

Note: this is very similar to Iterator::filter.

§Example
let s = env.stream_iter(0..10);
let res = s.filter(|&n| n % 2 == 0).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0, 2, 4, 6, 8])
source

pub fn reorder(self) -> Stream<impl Operator<Out = Op::Out>>

Reorder timestamped items

§Example
§TODO
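
The example is still marked as TODO in the source; the following is a hedged sketch, assuming that reorder emits timestamped items sorted by timestamp once the received watermarks guarantee that no earlier item can still arrive.

let s = env.stream_iter(0..10);
let reordered = s
    .add_timestamps(
        |&n| n ^ 1,                                      // timestamps arrive slightly out of order: 1, 0, 3, 2, ...
        |&n, _| if n % 2 == 1 { Some(n) } else { None }, // conservative watermark after each pair
    )
    .reorder();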
source

pub fn rich_filter_map<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
where F: FnMut(Op::Out) -> Option<O> + Send + Clone + 'static, O: Data,

Remove from the stream all the elements for which the provided function returns None and keep the elements that returned Some(_). The mapping function can be stateful.

This is equivalent to Stream::filter_map but with a stateful function.

Since the mapping function can be stateful, it is a FnMut. This allows expressing simple algorithms with very few lines of code (see examples).

The mapping function is cloned inside each replica, and the replicas will not share state with each other. If you want a single replica to handle all the items, you may want to change the parallelism of this operator with Stream::replication.

§Examples

This will emit only the positive prefix-sums.

let s = env.stream_iter([1, 2, -5, 3, 1].into_iter());
let res = s.rich_filter_map({
    let mut sum = 0;
    move |x| {
        sum += x;
        if sum >= 0 {
            Some(sum)
        } else {
            None
        }
    }
}).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![1, 1 + 2, /* 1 + 2 - 5, */ 1 + 2 - 5 + 3, 1 + 2 - 5 + 3 + 1]);
source

pub fn rich_map<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
where F: FnMut(Op::Out) -> O + Send + Clone + 'static, O: Send + 'static,

Map the elements of the stream into new elements. The mapping function can be stateful.

This is equivalent to Stream::map but with a stateful function.

Since the mapping function can be stateful, it is a FnMut. This allows expressing simple algorithms with very few lines of code (see examples).

The mapping function is cloned inside each replica, and the replicas will not share state with each other. If you want a single replica to handle all the items, you may want to change the parallelism of this operator with Stream::replication.

§Examples

This is a simple implementation of the prefix-sum using a single replica (i.e. each element is mapped to the sum of all the elements up to that point). Note that this won't work correctly if there is more than one replica.

let s = env.stream_iter(1..=5);
let res = s.rich_map({
    let mut sum = 0;
    move |x| {
        sum += x;
        sum
    }
}).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![1, 1 + 2, 1 + 2 + 3, 1 + 2 + 3 + 4, 1 + 2 + 3 + 4 + 5]);

This will enumerate all the elements that reach a replica. This is basically equivalent to the enumerate function in Python.

let s = env.stream_iter(1..=5);
let res = s.rich_map({
    let mut id = 0;
    move |x| {
        id += 1;
        (id - 1, x)
    }
}).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]);
source

pub fn map<O: Send, F>(self, f: F) -> Stream<impl Operator<Out = O>>
where F: Fn(Op::Out) -> O + Send + Clone + 'static,

Map the elements of the stream into new elements.

Note: this is very similar to Iterator::map.

§Example
let s = env.stream_iter(0..5);
let res = s.map(|n| n * 10).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0, 10, 20, 30, 40]);
source

pub fn map_memo_by<K: DataKey + Sync, O: Clone + Send + Sync + 'static, F, Fk>( self, f: F, fk: Fk, capacity: usize ) -> Stream<impl Operator<Out = O>>
where F: Fn(Op::Out) -> O + Send + Clone + 'static, Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,

Map the elements of the stream into new elements. Use memoization to cache outputs for previously seen inputs.

The cache is implemented through a per-process [quick_cache::sync::Cache]. The maximum number of elements to be cached is passed as the capacity parameter.

The outputs are cached according to the key produced by the fk function.

§Example
let s = env.stream_iter(5..15);
let res = s.map_memo_by(|n| (n * n) % 7, |n| n % 7, 5).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![4, 1, 0, 1, 4, 2, 2, 4, 1, 0]);
source

pub fn fold<O, F>(self, init: O, f: F) -> Stream<impl Operator<Out = O>>
where F: Fn(&mut O, Op::Out) + Send + Clone + 'static, Op::Out: ExchangeData, O: Send + Clone,

Fold the stream into a stream that emits a single value.

Folding consists of combining the current accumulation value (initially the value provided as init) with each item of the stream.

The folding function is provided with a mutable reference to the current accumulator and the owned item of the stream. The function should modify the accumulator without returning anything.

Note that the output type may be different from the input type. Consider using Stream::reduce if the output type is the same as the input type.

Note: this operator will retain all the messages of the stream and emit the values only when the stream ends. Therefore this is not properly streaming.

Note: this operator is not parallelized: it creates a bottleneck to which all the stream elements are sent, and the folding is done by a single thread.

Note: this is very similar to Iterator::fold.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s.fold(0, |acc, value| *acc += value).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
source

pub fn fold_assoc<O, F, G>( self, init: O, local: F, global: G ) -> Stream<impl Operator<Out = O>>
where F: Fn(&mut O, Op::Out) + Send + Clone + 'static, G: Fn(&mut O, O) + Send + Clone + 'static, O: ExchangeData,

Fold the stream into a stream that emits a single value.

Folding consists of combining the current accumulation value (initially the value provided as init) with each item of the stream.

This method is very similar to Stream::fold, but performs the folding in a distributed fashion. To do so, the folding function must be associative; in particular, the folding is performed in 2 steps:

  • local: the local function is used to fold the elements present in each replica of the stream independently. All those replicas will start with the same init value.
  • global: all the partial results (the elements produced by the local step) have to be aggregated into a single result. This is done using the global folding function.

Note that the output type may be different from the input type, therefore requiring two different functions for the aggregation. Consider using Stream::reduce_assoc if the output type is the same as the input type.

Note: this operator will retain all the messages of the stream and emit the values only when the stream ends. Therefore this is not properly streaming.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s.fold_assoc(0, |acc, value| *acc += value, |acc, value| *acc += value).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
source

pub fn group_by_fold<K, O, Fk, F, G>( self, keyer: Fk, init: O, local: F, global: G ) -> KeyedStream<impl Operator<Out = (K, O)>>
where Fk: Fn(&Op::Out) -> K + Send + Clone + 'static, F: Fn(&mut O, Op::Out) + Send + Clone + 'static, G: Fn(&mut O, O) + Send + Clone + 'static, K: ExchangeDataKey, O: ExchangeData, Op::Out: Clone,

Perform the folding operation separately for each key.

This is equivalent to partitioning the stream using the keyer function and then applying Stream::fold_assoc to each partition separately.

Note however that there is a difference between stream.group_by(keyer).fold(...) and stream.group_by_fold(keyer, ...). The former shuffles every item in the stream over the network and then performs the folding (i.e. nearly all the elements are sent over the network). The latter avoids sending the items by first performing a local fold on each replica and then sending only the locally folded results (i.e. one message per replica, per key); the global step then aggregates those results.

The resulting stream will still be keyed and will contain only a single message per key (the final result).

Note that the output type may be different from the input type, therefore requiring two different functions for the aggregation. Consider using Stream::group_by_reduce if the output type is the same as the input type.

Note: this operator will retain all the messages of the stream and emit the values only when the stream ends. Therefore this is not properly streaming.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s
    .group_by_fold(|&n| n % 2, 0, |acc, value| *acc += value, |acc, value| *acc += value)
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
source

pub fn fold_scan<O, SL, SG, L, G, F>( self, local_fold: L, global_fold: G, global_init: SG, map: F ) -> Stream<impl Operator<Out = O>>
where Op::Out: ExchangeData, L: Fn(&mut SL, Op::Out) + Send + Clone + 'static, G: Fn(&mut SG, SL) + Send + Clone + 'static, F: Fn(Op::Out, &SG) -> O + Send + Clone + 'static, SL: ExchangeData + Default, SG: ExchangeData + Sync, O: ExchangeData,
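
No description is provided in the source. From the signature, this appears to fold the whole stream into a global state SG (a local step per replica followed by a global step, as in Stream::fold_assoc) and then emit each input element mapped together with that final state.

§Example

A hedged sketch under that assumption:

let s = env.stream_iter(0..5);
let res = s
    .fold_scan(
        |local: &mut i32, x| *local += x,           // local fold of each replica's items
        |global: &mut i32, local| *global += local, // merge the partial sums
        0,                                          // initial global state
        |x, sum| (x, *sum),                         // pair each item with the global sum (assumed semantics)
    )
    .collect_vec();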

source

pub fn reduce_scan<O, S, F1, F2, R>( self, first_map: F1, reduce: R, second_map: F2 ) -> Stream<impl Operator<Out = O>>
where Op::Out: ExchangeData, F1: Fn(Op::Out) -> S + Send + Clone + 'static, F2: Fn(Op::Out, &S) -> O + Send + Clone + 'static, R: Fn(S, S) -> S + Send + Clone + 'static, S: ExchangeData + Sync, O: ExchangeData,
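
No description is provided in the source. From the signature, this appears to map each element into a state S with first_map, reduce all those states into a single value, and then emit each original element mapped together with the reduced value via second_map.

§Example

A hedged sketch under that assumption:

let s = env.stream_iter(0..5);
let res = s
    .reduce_scan(
        |x| x,              // first_map: project each item to the state type
        |a, b| a + b,       // reduce: combine the partial states (here, a sum)
        |x, sum| (x, *sum), // second_map: pair each item with the reduced value (assumed semantics)
    )
    .collect_vec();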

source

pub fn unique_assoc(self) -> Stream<impl Operator<Out = Op::Out>>
where Op::Out: Hash + Eq + Clone + ExchangeData + Sync,

Deduplicate elements: the resulting stream will contain exactly one occurrence of each distinct element of the input stream.

The current implementation requires Hash and Eq, and it will repartition the stream, setting the replication to Unlimited.
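
§Example

A hedged example, not present in the original docs:

let s = env.stream_iter([1, 2, 2, 3, 1, 3].into_iter());
let res = s.unique_assoc().collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![1, 2, 3]);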

source

pub fn key_by<K, Fk>( self, keyer: Fk ) -> KeyedStream<impl Operator<Out = (K, Op::Out)>>
where Fk: Fn(&Op::Out) -> K + Send + Clone + 'static, K: DataKey,

Construct a KeyedStream from a Stream without shuffling the data.

Note: this violates the semantics of KeyedStream: since the values with the same key are not all sent to the same replica, some of the following operators may misbehave. You probably want to use Stream::group_by instead.

§Example
let s = env.stream_iter(0..5);
let res = s.key_by(|&n| n % 2).collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (0, 2), (0, 4), (1, 1), (1, 3)]);
source

pub fn inspect<F>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
where F: FnMut(&Op::Out) + Send + Clone + 'static,

Apply the given function to each element of the stream, forwarding the elements downstream unchanged.

§Example
let s = env.stream_iter(0..5);
s.inspect(|n| println!("Item: {}", n)).for_each(std::mem::drop);

env.execute_blocking();
source

pub fn rich_flat_map<It, F>(self, f: F) -> Stream<impl Operator<Out = It::Item>>
where It: IntoIterator + Send + 'static, <It as IntoIterator>::IntoIter: Send + 'static, <It as IntoIterator>::Item: Send, F: FnMut(Op::Out) -> It + Send + Clone + 'static,

Apply a mapping operation to each element of the stream; the resulting stream will contain the flattened values produced by the mapping. The mapping function can be stateful.

This is equivalent to Stream::flat_map but with a stateful function.

Since the mapping function can be stateful, it is a FnMut. This allows expressing simple algorithms with very few lines of code (see examples).

The mapping function is cloned inside each replica, and the replicas will not share state with each other. If you want a single replica to handle all the items, you may want to change the parallelism of this operator with Stream::replication.

§Examples

This will emit, for each element, all the pairs it forms with the elements seen before it.

let s = env.stream_iter(0..=3);
let res = s.rich_flat_map({
    let mut elements = Vec::new();
    move |y| {
        let new_pairs = elements
            .iter()
            .map(|&x: &u32| (x, y))
            .collect::<Vec<_>>();
        elements.push(y);
        new_pairs
    }
}).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![(0, 1), (0, 2), (1, 2), (0, 3), (1, 3), (2, 3)]);
source

pub fn rich_map_custom<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
where F: FnMut(ElementGenerator<'_, Op>) -> StreamElement<O> + Clone + Send + 'static, O: Send,

Map the elements of the stream into new elements. The mapping function can be stateful.

This is a lower-level primitive that gives full control over the inner types used in streams. It can be used to define custom unary operators.

The closure must follow these rules to ensure the correct behaviour of renoir:

  • Watermark messages must be sent when no more items with lower timestamp will ever be produced
  • FlushBatch messages must be forwarded if received
  • For each FlushAndRestart and Terminate message received, the operator must generate one and only one message of the same kind. No other messages of this kind should be created

The mapping function is cloned inside each replica, and the replicas will not share state with each other. If you want a single replica to handle all the items, you may want to change the parallelism of this operator with Stream::replication.

§Examples

TODO
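
As a hedged sketch only (it assumes ElementGenerator exposes a next() method yielding the upstream StreamElement, and that StreamElement lives in renoir::operator with Item and Timestamped variants), this doubles the items and forwards every other message kind unchanged:

use renoir::operator::StreamElement;

let s = env.stream_iter(0..5);
let doubled = s.rich_map_custom(|mut eg| match eg.next() {
    StreamElement::Item(x) => StreamElement::Item(x * 2),
    StreamElement::Timestamped(x, ts) => StreamElement::Timestamped(x * 2, ts),
    // forward watermarks, FlushBatch, FlushAndRestart and Terminate unchanged
    other => other,
});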

source

pub fn flat_map<It, F>(self, f: F) -> Stream<impl Operator<Out = It::Item>>
where It: IntoIterator, It::IntoIter: Send, It::Item: Send, F: Fn(Op::Out) -> It + Send + Clone,

Apply a mapping operation to each element of the stream; the resulting stream will contain the flattened values produced by the mapping.

Note: this is very similar to Iterator::flat_map.

§Example
let s = env.stream_iter(0..3);
let res = s.flat_map(|n| vec![n, n]).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0, 0, 1, 1, 2, 2]);
source

pub fn for_each<F>(self, f: F)
where F: FnMut(Op::Out) + Send + Clone + 'static,

Apply the given function to all the elements of the stream, consuming the stream.

§Example
let s = env.stream_iter(0..5);
s.for_each(|n| println!("Item: {}", n));

env.execute_blocking();
source

pub fn flatten( self ) -> Stream<impl Operator<Out = <Op::Out as IntoIterator>::Item>>
where Op::Out: IntoIterator, <Op::Out as IntoIterator>::IntoIter: Send, <Op::Out as IntoIterator>::Item: Send,

Transform this stream of containers into a stream of all the contained values.

Note: this is very similar to Iterator::flatten.

§Example
let s = env.stream_iter((vec![
    vec![1, 2, 3],
    vec![],
    vec![4, 5],
].into_iter()));
let res = s.flatten().collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![1, 2, 3, 4, 5]);
source§

impl<I, Op> Stream<Op>
where I: ExchangeData, Op: Operator<Out = I> + 'static,

source

pub fn broadcast(self) -> Stream<impl Operator<Out = Op::Out>>

Duplicate each element of the stream and forward it to all the replicas of the next block.

Note: this will duplicate the elements of the stream, this is potentially a very expensive operation.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..10);
s.broadcast();
source

pub fn group_by<K, Fk>( self, keyer: Fk ) -> KeyedStream<impl Operator<Out = (K, I)>>
where Fk: Fn(&Op::Out) -> K + Send + Clone + 'static, K: DataKey,

Given a stream, make a KeyedStream partitioning the values according to a key generated by the keyer function provided.

The returned KeyedStream is partitioned by key, and all the operators added to it will be evaluated after the network shuffle. Therefore all the items are sent over the network (unless their destination is the local host). In many cases this behaviour can be avoided by using the associative variant of the operators (e.g. Stream::group_by_reduce, Stream::group_by_sum, …).

Note: the keys are not sent to the network, they are built on the sending side, and rebuilt on the receiving side.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let keyed = s.group_by(|&n| n % 2); // partition even and odd elements
source

pub fn group_by_max_element<K, V, Fk, Fv>( self, keyer: Fk, get_value: Fv ) -> KeyedStream<impl Operator<Out = (K, I)>>
where Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K, Fv: KeyerFn<V, Op::Out> + Fn(&Op::Out) -> V, K: ExchangeDataKey, V: Ord,

Find, for each partition of the stream, the item with the largest value.

The stream is partitioned using the keyer function and the value to compare is obtained with get_value.

This operation is associative, therefore the computation is done in parallel before sending all the elements to the network.

Note: the comparison is done using the value returned by get_value, but the resulting items have the same type as the input.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s
    .group_by_max_element(|&n| n % 2, |&n| n)
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 4), (1, 3)]);
source

pub fn group_by_sum<K, V, Fk, Fv>( self, keyer: Fk, get_value: Fv ) -> KeyedStream<impl Operator<Out = (K, V)>>
where Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K, Fv: Fn(Op::Out) -> V + Clone + Send + 'static, V: ExchangeData + AddAssign, K: ExchangeDataKey,

Find, for each partition of the stream, the sum of the values of the items.

The stream is partitioned using the keyer function and the value to sum is obtained with get_value.

This operation is associative, therefore the computation is done in parallel before sending all the elements to the network.

Note: this is similar to the SQL: SELECT SUM(value) ... GROUP BY key

Note: the type of the result does not have to be a number, any type that implements AddAssign is accepted.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s
    .group_by_sum(|&n| n % 2, |n| n)
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
source

pub fn group_by_avg<K, V, Fk, Fv>( self, keyer: Fk, get_value: Fv ) -> KeyedStream<impl Operator<Out = (K, V)>>
where Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K, Fv: KeyerFn<V, Op::Out> + Fn(&Op::Out) -> V, V: ExchangeData + AddAssign + Div<f64, Output = V>, K: ExchangeDataKey,

Find, for each partition of the stream, the average of the values of the items.

The stream is partitioned using the keyer function and the value to average is obtained with get_value.

This operation is associative, therefore the computation is done in parallel before sending all the elements to the network.

Note: this is similar to the SQL: SELECT AVG(value) ... GROUP BY key

Note: the type of the result does not have to be a number, any type that implements AddAssign and can be divided by f64 is accepted.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s
    .group_by_avg(|&n| n % 2, |&n| n as f64)
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_by_key(|(k, _)| *k);
assert_eq!(res, vec![(0, (0.0 + 2.0 + 4.0) / 3.0), (1, (1.0 + 3.0) / 2.0)]);
source

pub fn group_by_count<K, Fk>( self, keyer: Fk ) -> KeyedStream<impl Operator<Out = (K, usize)>>
where Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K, K: ExchangeDataKey,

Count, for each partition of the stream, the number of items.

The stream is partitioned using the keyer function.

This operation is associative, therefore the computation is done in parallel before sending all the elements to the network.

Note: this is similar to the SQL: SELECT COUNT(*) ... GROUP BY key

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s
    .group_by_count(|&n| n % 2)
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_by_key(|(k, _)| *k);
assert_eq!(res, vec![(0, 3), (1, 2)]);
source

pub fn group_by_min_element<K, V, Fk, Fv>( self, keyer: Fk, get_value: Fv ) -> KeyedStream<impl Operator<Out = (K, I)>>
where Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K, Fv: KeyerFn<V, Op::Out> + Fn(&Op::Out) -> V, K: ExchangeDataKey, V: Ord,

Find, for each partition of the stream, the item with the smallest value.

The stream is partitioned using the keyer function and the value to compare is obtained with get_value.

This operation is associative, therefore the computation is done in parallel before sending all the elements to the network.

Note: the comparison is done using the value returned by get_value, but the resulting items have the same type as the input.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s
    .group_by_min_element(|&n| n % 2, |&n| n)
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0), (1, 1)]);
source

pub fn group_by_reduce<K, Fk, F>( self, keyer: Fk, f: F ) -> KeyedStream<impl Operator<Out = (K, I)>>
where Fk: Fn(&Op::Out) -> K + Send + Clone + 'static, F: Fn(&mut I, I) + Send + Clone + 'static, K: ExchangeDataKey,

Perform the reduction operation separately for each key.

This is equivalent to partitioning the stream using the keyer function and then applying Stream::reduce_assoc to each partition separately.

Note however that there is a difference between stream.group_by(keyer).reduce(...) and stream.group_by_reduce(keyer, ...). The former shuffles every item in the stream over the network and then performs the reduction (i.e. nearly all the elements are sent over the network). The latter avoids sending the items by first performing a local reduction on each replica and then sending only the locally reduced results (i.e. one message per replica, per key); the global step then aggregates those results.

The resulting stream will still be keyed and will contain only a single message per key (the final result).

Note that the output type must be the same as the input type, if you need a different type consider using Stream::group_by_fold.

Note: this operator will retain all the messages of the stream and emit the values only when the stream ends. Therefore this is not properly streaming.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s
    .group_by_reduce(|&n| n % 2, |acc, value| *acc += value)
    .collect_vec();

env.execute_blocking();

let mut res = res.get().unwrap();
res.sort_unstable();
assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
source

pub fn interval_join<I2, Op2>( self, right: Stream<Op2>, lower_bound: Timestamp, upper_bound: Timestamp ) -> Stream<impl Operator<Out = (I, I2)>>
where I2: ExchangeData, Op2: Operator<Out = I2> + 'static,

Given two streams with timestamps, join them according to an interval centered around the timestamp of the left side.

This means that an element on the left side with timestamp T will be joined to all the elements on the right with timestamp Q such that T - lower_bound <= Q <= T + upper_bound.

Note: this operator is not parallelized, all the elements are sent to a single node to perform the join.

Note: this operator will split the current block.

§Example

TODO: example
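
The example is still marked as TODO in the source; a hedged sketch, assuming the timestamps are attached with Stream::add_timestamps first:

let left = env.stream_iter(0..5).add_timestamps(|&n| n, |_, &ts| Some(ts));
let right = env.stream_iter(0..5).add_timestamps(|&n| n, |_, &ts| Some(ts));
// Pair each left item with the right items whose timestamp Q satisfies T - 1 <= Q <= T + 1.
let joined = left.interval_join(right, 1, 1);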

source

pub fn replication( self, replication: Replication ) -> Stream<impl Operator<Out = Op::Out>>

Change the maximum parallelism of the following operators.

Note: this operator is pretty advanced, some operators may need to be fully replicated and will fail otherwise.
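
§Example

A hedged sketch, not present in the original docs; it assumes the Replication enum is exported from the crate root and has a One variant:

use renoir::Replication;

let s = env.stream_iter(0..10);
// Run the following operators on a single replica (assumed variant name).
let res = s.replication(Replication::One).map(|n| n + 1).collect_vec();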

source

pub fn repartition_by<Fk: KeyerFn<u64, Op::Out>>( self, replication: Replication, partition_fn: Fk ) -> Stream<impl Operator<Out = Op::Out>>

Advanced operator that allows changing the replication and the forwarding strategy.

Note: this operator is advanced and is only intended to add functionality that is not achievable with other operators. Use with care.
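
§Example

A hedged sketch, not present in the original docs; it assumes Replication is exported from the crate root:

use renoir::Replication;

let s = env.stream_iter(0..10u64);
// Keep unlimited replication but route each element according to a custom partitioning function.
let repartitioned = s.repartition_by(Replication::Unlimited, |&n| n % 4);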

source

pub fn reduce<F>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
where F: Fn(I, I) -> I + Send + Clone + 'static,

Reduce the stream into a stream that emits a single value.

Reducing consists of combining the current accumulation value with each item of the stream.

The reducing function takes two values (the current accumulator and an item of the stream) and returns the combined value.

Note that the output type must be the same as the input type, if you need a different type consider using Stream::fold.

Note: this operator will retain all the messages of the stream and emit the values only when the stream ends. Therefore this is not properly streaming.

Note: this operator is not parallelized: it creates a bottleneck to which all the stream elements are sent, and the reduction is done by a single thread.

Note: this is very similar to Iterator::reduce.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s.reduce(|a, b| a + b).collect::<Vec<_>>();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
source

pub fn reduce_assoc<F>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
where F: Fn(I, I) -> I + Send + Clone + 'static,

Reduce the stream into a stream that emits a single value.

Reducing consists of combining the current accumulation value with each item of the stream.

This method is very similar to Stream::reduce, but performs the reduction in a distributed fashion. To do so, the reducing function must be associative; in particular, the reduction is performed in 2 steps:

  • local: the reducing function is used to reduce the elements present in each replica of the stream independently.
  • global: all the partial results (the elements produced by the local step) have to be aggregated into a single result.

Note that the output type must be the same as the input type, if you need a different type consider using Stream::fold_assoc.

Note: this operator will retain all the messages of the stream and emit the values only when the stream ends. Therefore this is not properly streaming.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s.reduce_assoc(|a, b| a + b).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
source

pub fn route(self) -> RouterBuilder<I, Op>

Route each element depending on its content.

  • Routes are created with the add_route method, a new stream is created for each route.
  • Each element is routed to the first stream for which the routing condition evaluates to true.
  • If no route condition is satisfied, the element is dropped.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..10);
let mut routes = s.route()
    .add_route(|&i| i < 5)
    .add_route(|&i| i % 2 == 0)
    .build()
    .into_iter();
assert_eq!(routes.len(), 2);
// 0 1 2 3 4
routes.next().unwrap().for_each(|i| eprintln!("route1: {i}"));
// 6 8
routes.next().unwrap().for_each(|i| eprintln!("route2: {i}"));
// 5 7 9 ignored
env.execute_blocking();
source

pub fn shuffle(self) -> Stream<impl Operator<Out = Op::Out>>

Perform a network shuffle sending the messages to a random replica.

This can be useful if for some reason the load is very unbalanced (e.g. after a very unbalanced Stream::group_by).

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let res = s.shuffle();
source

pub fn split(self, splits: usize) -> Vec<Stream<impl Operator<Out = Op::Out>>>

Split the stream into splits streams, each containing all the elements of the original stream.

This will effectively duplicate every item in the stream into the newly created streams.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..5);
let mut splits = s.split(3);
let a = splits.pop().unwrap();
let b = splits.pop().unwrap();
let c = splits.pop().unwrap();
source

pub fn zip<I2, Op2>( self, oth: Stream<Op2> ) -> Stream<impl Operator<Out = (I, I2)>>
where Op2: Operator<Out = I2> + 'static, I2: ExchangeData,

Given two Streams, zip their elements together: the resulting stream will be a stream of pairs, each containing one element from each of the two input streams.

Note: all the elements after the end of one of the streams are discarded (i.e. the resulting stream will have a number of elements that is the minimum between the lengths of the two input streams).

Note: this operator will split the current block.

§Example
let s1 = env.stream_iter((vec!['A', 'B', 'C', 'D'].into_iter()));
let s2 = env.stream_iter((vec![1, 2, 3].into_iter()));
let res = s1.zip(s2).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![('A', 1), ('B', 2), ('C', 3)]);
source

pub fn collect_channel(self) -> Receiver<I>

Close the stream and send resulting items to a channel on a single host.

If the stream is distributed among multiple replicas, the parallelism will be set to 1 to gather all the results.

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..10u32);
let rx = s.collect_channel();

env.execute_blocking();
let mut v = Vec::new();
while let Ok(x) = rx.recv() {
    v.push(x)
}
assert_eq!(v, (0..10u32).collect::<Vec<_>>());
source

pub fn collect_channel_parallel(self) -> Receiver<I>

Close the stream and send the resulting items to a channel on each host.

Each host sends its outputs to the channel without repartitioning: elements are sent to the channel on the same host that produced them.

Note: the order of items and keys is unspecified.

§Example
let s = env.stream_iter(0..10u32);
let rx = s.collect_channel_parallel();

env.execute_blocking();
let mut v = Vec::new();
while let Ok(x) = rx.recv() {
    v.push(x)
}
v.sort_unstable();
assert_eq!(v, (0..10u32).collect::<Vec<_>>());
source

pub fn collect_count(self) -> StreamOutput<usize>

Close the stream and count the resulting items, collecting the total count on a single host.

If the stream is distributed among multiple replicas, a bottleneck is placed to which all the replicas send their items.

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..10);
let res = s.collect_count();

env.execute_blocking();

assert_eq!(res.get().unwrap(), 10);
source

pub fn collect_vec(self) -> StreamOutput<Vec<I>>

Close the stream and store all the resulting items into a Vec on a single host.

If the stream is distributed among multiple replicas, a bottleneck is placed to which all the replicas send their items.

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..10);
let res = s.collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
source

pub fn collect_vec_all(self) -> StreamOutput<Vec<I>>

Close the stream and store all the resulting items into a Vec on each host.

If the stream is distributed among multiple replicas, a bottleneck is placed to which all the replicas send their items.

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..10);
let res = s.collect_vec_all();

env.execute_blocking();

assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
source

pub fn collect<C: FromIterator<I> + Send + 'static>(self) -> StreamOutput<C>

Close the stream and store all the resulting items into a collection on a single host.

If the stream is distributed among multiple replicas, the parallelism will be set to 1 to gather all the results.

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..10);
let res = s.collect::<Vec<_>>();

env.execute_blocking();

assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
source

pub fn collect_all<C: FromIterator<I> + Send + 'static>(self) -> StreamOutput<C>

Close the stream and store all the resulting items into a collection on each host.

Partitioning will be set to Host and the results will be replicated on every host.

Note: the order of items and keys is unspecified.

Note: this operator will split the current block.

§Example
let s = env.stream_iter(0..10);
let res = s.collect_all::<Vec<_>>();

env.execute_blocking();

assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
source§

impl<Op> Stream<Op>
where Op::Out: Clone + Hash + Eq + Sync, Op: Operator + 'static,

source

pub fn map_memo<O: Data + Sync, F>( self, f: F, capacity: usize ) -> Stream<impl Operator<Out = O>>
where F: Fn(Op::Out) -> O + Send + Clone + 'static,

Map the elements of the stream into new elements. Use memoization to cache outputs for previously seen inputs.

The cache is implemented through a per-process [quick_cache::sync::Cache]. The maximum number of elements to be cached is passed as the capacity parameter.

§Example
let s = env.stream_iter((0..4).cycle().take(10));
let res = s.map_memo(|n| n * n, 5).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0, 1, 4, 9, 0, 1, 4, 9, 0, 1]);
source§

impl<Op> Stream<Op>
where Op: Operator,

source

pub fn add_operator<Op2, GetOp>(self, get_operator: GetOp) -> Stream<Op2>
where Op2: Operator, GetOp: FnOnce(Op) -> Op2,

Add a new operator to the current chain inside the stream. This consumes the stream and returns a new one with the operator added.

get_operator is a function that is given the previous chain of operators and should return the new chain of operators. The new chain cannot simply be passed as an argument since a partial move of the InnerBlock structure is required.

Note: this is an advanced function that manipulates the block structure; it is probably not what you are looking for.

source§

impl<OperatorChain> Stream<OperatorChain>
where OperatorChain: Operator, OperatorChain::Out: KeyedItem,

source

pub fn to_keyed(self) -> KeyedStream<OperatorChain>

TODO DOCS
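
Judging from the OperatorChain::Out: KeyedItem bound, this presumably reinterprets a stream of (key, value) items as a KeyedStream without moving any data, similarly to Stream::key_by.

§Example

A hedged sketch, assuming tuples of (key, value) implement KeyedItem:

let s = env.stream_iter(0..5);
// Build (key, value) pairs and wrap them into a KeyedStream without a shuffle.
let keyed = s.map(|n| (n % 2, n)).to_keyed();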

Auto Trait Implementations§

§

impl<Op> !RefUnwindSafe for Stream<Op>

§

impl<Op> Send for Stream<Op>

§

impl<Op> Sync for Stream<Op>
where Op: Sync,

§

impl<Op> Unpin for Stream<Op>
where Op: Unpin,

§

impl<Op> !UnwindSafe for Stream<Op>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more