Struct renoir::StreamContext
pub struct StreamContext { /* private fields */ }
Streaming environment from which it's possible to register new streams and start the computation.
This is the entry point for the library: construct an environment by providing a RuntimeConfig, then request new streams by providing the source to read from.
If you want to use a distributed environment (i.e. using remote workers), you have to spawn them using spawn_remote_workers before requesting any stream.
When all the streams have been registered, call execute, which consumes the environment and starts the computation. This function returns when the computation ends.
TODO: example usage
Implementations
impl StreamContext
pub fn new(config: RuntimeConfig) -> Self
Construct a new environment from the config.
pub fn new_local() -> Self
pub fn stream<S>(&self, source: S) -> Stream<S>
Construct a new stream bound to this environment starting with the specified source.
pub fn execute_blocking(self)
Start the computation. Blocks until the computation is complete.
For a non-blocking alternative, run this on a separate thread or use the async version, execute.
pub fn parallelism(&self) -> CoordUInt
Get the total number of processing cores in the cluster.
impl StreamContext
pub fn stream_csv<T: Data + for<'a> Deserialize<'a>>( &self, path: impl Into<PathBuf> ) -> Stream<CsvSource<T>>
Convenience method: creates a CsvSource and makes a stream from it using StreamContext::stream.
impl StreamContext
pub fn stream_file<P: Into<PathBuf>>(&self, path: P) -> Stream<FileSource>
Convenience method: creates a FileSource and makes a stream from it using StreamContext::stream.
impl StreamContext
pub fn stream_iter<It>(&self, iterator: It) -> Stream<IteratorSource<It>>
Convenience method: creates an IteratorSource and makes a stream from it using StreamContext::stream.
impl StreamContext
pub fn stream_par_iter<Source>( &self, generator: Source ) -> Stream<ParallelIteratorSource<Source>>
Convenience method: creates a ParallelIteratorSource and makes a stream from it using StreamContext::stream.
Example:
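use renoir::prelude::*;

let env = StreamContext::new_local();

// A range can be passed directly as the generator.
env.stream_par_iter(0..10)
    .for_each(|q| println!("a: {q}"));

// A closure receives the replica id and the number of instances,
// and returns the iterator that this replica should produce.
let n = 10;
env.stream_par_iter(
    move |id, instances| {
        // Ceiling division: size of each replica's chunk of 0..n.
        let chunk_size = (n + instances - 1) / instances;
        let remaining = n - n.min(chunk_size * id);
        let range = remaining.min(chunk_size);
        let start = id * chunk_size;
        let stop = id * chunk_size + range;
        start..stop
    })
    .for_each(|q| println!("b: {q}"));

env.execute_blocking();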
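The chunking arithmetic in the closure above can be checked without the library. The following is a minimal standalone sketch, not part of renoir: `chunk_range` is a hypothetical helper name, and `u64` is an assumed stand-in for the integer type the closure actually receives. It copies the same formulas and verifies that the per-replica ranges cover `0..n` exactly once, in order.

```rust
use std::ops::Range;

/// Hypothetical helper (not a renoir API): same arithmetic as the closure
/// passed to `stream_par_iter`, extracted so it can be tested in isolation.
/// `u64` is an assumption standing in for the library's coordinate type.
/// Panics on division by zero if `instances` is 0.
fn chunk_range(n: u64, id: u64, instances: u64) -> Range<u64> {
    // Ceiling division: the largest chunk any replica has to handle.
    let chunk_size = (n + instances - 1) / instances;
    // Elements left from this replica's start; 0 if the start is past `n`.
    let remaining = n - n.min(chunk_size * id);
    // The last non-empty replica may get a shorter chunk.
    let range = remaining.min(chunk_size);
    let start = id * chunk_size;
    start..start + range
}

fn main() {
    let n = 10;
    let instances = 4;
    let mut seen = Vec::new();
    for id in 0..instances {
        let r = chunk_range(n, id, instances);
        println!("replica {id}: {r:?}");
        seen.extend(r);
    }
    // Every element of 0..n is produced exactly once, in order.
    assert_eq!(seen, (0..n).collect::<Vec<_>>());
}
```

When there are more replicas than needed (for example `n = 10` with 7 instances, giving a chunk size of 2), the surplus replicas get an empty range that starts past `n`, so they simply produce nothing.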