ap package:streamly

In many situations, the liftM operations can be replaced by uses of ap, which promotes function application.
return f `ap` x1 `ap` ... `ap` xn
is equivalent to
liftMn f x1 x2 ... xn
Append the outputs of two streams, yielding all the elements from the first stream and then yielding all the elements from the second stream. IMPORTANT NOTE: This could be 100x faster than serial/<> for appending a few (say 100) streams because it can fuse via stream fusion. However, it does not scale for a large number of streams (say 1000s) and becomes qudartically slow. Therefore use this for custom appending of a few streams but use concatMap or 'concatMapWith serial' for appending n streams or infinite containers of streams. Pre-release
Same as |$. Internal
Split the input stream based on a hashable component of the key field and fold each split using the given fold. Useful for map/reduce, bucketizing the input in different bins or for generating histograms.
>>> import Data.HashMap.Strict (HashMap, fromList)

>>> import qualified Streamly.Data.Fold.Prelude as Fold

>>> import qualified Streamly.Data.Stream as Stream
Consider a stream of key value pairs:
>>> input = Stream.fromList [("k1",1),("k1",1.1),("k2",2), ("k2",2.2)]
Classify each key to a different hash bin and fold the bins:
>>> classify = Fold.toHashMapIO fst (Fold.lmap snd Fold.toList)

>>> Stream.fold classify input :: IO (HashMap String [Double])
fromList [("k2",[2.0,2.2]),("k1",[1.0,1.1])]
Pre-release
Apply an argument stream to a function stream concurrently. Uses a shared channel for all individual applications within a stream application.
Map each element of the input to a stream and then concurrently evaluate and concatenate the resulting streams. Multiple streams may be evaluated concurrently but earlier streams are perferred. Output from the streams are used as they arrive. Definition:
>>> parConcatMap modifier f stream = Stream.parConcat modifier $ fmap f stream
Examples:
>>> f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap cfg id $ Stream.fromList xs
The following streams finish in 4 seconds:
>>> stream1 = Stream.fromEffect (delay 4)

>>> stream2 = Stream.fromEffect (delay 2)

>>> stream3 = Stream.fromEffect (delay 1)

>>> f id [stream1, stream2, stream3]
1 sec
2 sec
4 sec
[1,2,4]
Limiting threads to 2 schedules the third stream only after one of the first two has finished, releasing a thread:
>>> f (Stream.maxThreads 2) [stream1, stream2, stream3]
...
[2,1,4]
When used with a Single thread it behaves like serial concatMap:
>>> f (Stream.maxThreads 1) [stream1, stream2, stream3]
...
[4,2,1]
>>> stream1 = Stream.fromList [1,2,3]

>>> stream2 = Stream.fromList [4,5,6]

>>> f (Stream.maxThreads 1) [stream1, stream2]
[1,2,3,4,5,6]
Schedule all streams in a round robin fashion over the available threads:
>>> f cfg xs = Stream.fold Fold.toList $ Stream.parConcatMap (Stream.interleaved True . cfg) id $ Stream.fromList xs
>>> stream1 = Stream.fromList [1,2,3]

>>> stream2 = Stream.fromList [4,5,6]

>>> f (Stream.maxThreads 1) [stream1, stream2]
[1,4,2,5,3,6]
Definition:
>>> parMapM modifier f = Stream.parConcatMap modifier (Stream.fromEffect . f)
For example, the following finishes in 3 seconds (as opposed to 6 seconds) because all actions run in parallel. Even though results are available out of order they are ordered due to the config option:
>>> f x = delay x >> return x

>>> Stream.fold Fold.toList $ Stream.parMapM (Stream.ordered True) f $ Stream.fromList [3,2,1]
1 sec
2 sec
3 sec
[3,2,1]
parTapCount predicate fold stream taps the count of those elements in the stream that pass the predicate. The resulting count stream is sent to a fold running concurrently in another thread. For example, to print the count of elements processed every second:
>>> rate = Stream.rollingMap2 (flip (-)) . Stream.delayPost 1

>>> report = Stream.fold (Fold.drainMapM print) . rate

>>> tap = Stream.parTapCount (const True) report

>>> go = Stream.fold Fold.drain $ tap $ Stream.enumerateFrom 0
Note: This may not work correctly on 32-bit machines because of Int overflow. Pre-release
Deprecated: Please use parTapCount instead.
Given a continuation based transformation from a to b and a continuation based transformation from [b] to c, make continuation based transformation from [a] to c. Pre-release
Adds an orphan HashMap instance for the IsMap type class from streamly-core package. This is useful for various combinators that use a map type. We cannot define this in streamly-core as it adds several non-boot library dependencies on streamly-core.
Adapt any specific stream type to any other specific stream type. Since: 0.1.0 (Streamly)
Map a stream producing function on each element of the stream and then flatten the results into a single stream.
>>> concatMap f = Stream.concatMapM (return . f)

>>> concatMap f = Stream.concatMapWith Stream.serial f

>>> concatMap f = Stream.concat . Stream.map f
A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream merge operation.
concatMapFoldableWith async return [1..3]
Equivalent to:
concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith) Since: 0.1.0 (Streamly)
Map a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike concatMap, it can produce an effect at the beginning of each iteration of the inner loop.
concatMapWith mixer generator stream is a two dimensional looping combinator. The generator function is used to generate streams from the elements in the input stream and the mixer function is used to merge those streams. Note we can merge streams concurrently by using a concurrent merge function. Since: 0.7.0 Since: 0.8.0 (signature change)
Like concatMapWith but carries a state which can be used to share information across multiple steps of concat.
concatSmapMWith combine f initial = concatMapWith combine id . smapM f initial
Pre-release