>>> import Data.HashMap.Strict (HashMap, fromList) >>> import qualified Streamly.Data.Fold.Prelude as Fold >>> import qualified Streamly.Data.Stream as StreamConsider a stream of key value pairs:
>>> input = Stream.fromList [("k1",1),("k1",1.1),("k2",2), ("k2",2.2)]Classify each key to a different hash bin and fold the bins:
>>> classify = Fold.toHashMapIO fst (Fold.lmap snd Fold.toList) >>> Stream.fold classify input :: IO (HashMap String [Double]) fromList [("k2",[2.0,2.2]),("k1",[1.0,1.1])]Pre-release
>>> parConcatMap modifier f stream = Stream.parConcat modifier $ fmap f streamExamples:
>>> f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap cfg id $ Stream.fromList xsThe following streams finish in 4 seconds:
>>> stream1 = Stream.fromEffect (delay 4) >>> stream2 = Stream.fromEffect (delay 2) >>> stream3 = Stream.fromEffect (delay 1) >>> f id [stream1, stream2, stream3] 1 sec 2 sec 4 sec [1,2,4]Limiting threads to 2 schedules the third stream only after one of the first two has finished, releasing a thread:
>>> f (Stream.maxThreads 2) [stream1, stream2, stream3] ... [2,1,4]When used with a Single thread it behaves like serial concatMap:
>>> f (Stream.maxThreads 1) [stream1, stream2, stream3] ... [4,2,1]
>>> stream1 = Stream.fromList [1,2,3] >>> stream2 = Stream.fromList [4,5,6] >>> f (Stream.maxThreads 1) [stream1, stream2] [1,2,3,4,5,6]Schedule all streams in a round robin fashion over the available threads:
>>> f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap (Stream.interleaved True . cfg) id $ Stream.fromList xs
>>> stream1 = Stream.fromList [1,2,3] >>> stream2 = Stream.fromList [4,5,6] >>> f (Stream.maxThreads 1) [stream1, stream2] [1,4,2,5,3,6]
>>> parMapM modifier f = Stream.parConcatMap modifier (Stream.fromEffect . f)For example, the following finishes in 3 seconds (as opposed to 6 seconds) because all actions run in parallel. Even though results are available out of order they are ordered due to the config option:
>>> f x = delay x >> return x >>> Stream.fold Fold.toList $ Stream.parMapM (Stream.ordered True) f $ Stream.fromList [3,2,1] 1 sec 2 sec 3 sec [3,2,1]
>>> rate = Stream.rollingMap2 (flip (-)) . Stream.delayPost 1 >>> report = Stream.fold (Fold.drainMapM print) . rate >>> tap = Stream.parTapCount (const True) report >>> go = Stream.fold Fold.drain $ tap $ Stream.enumerateFrom 0Note: This may not work correctly on 32-bit machines because of Int overflow. Pre-release
>>> concatMap f = Stream.concatMapM (return . f) >>> concatMap f = Stream.concatMapWith Stream.serial f >>> concatMap f = Stream.concat . Stream.map f
concatMapFoldableWith async return [1..3]Equivalent to:
concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith) Since: 0.1.0 (Streamly)
concatSmapMWith combine f initial = concatMapWith combine id . smapM f initialPre-release