Stream Processing!
… no, not parallel programming …
… and not just for Haskell …
[…] P J Landin’s original use for streams was to model the histories of loop variables, but he also observed that streams could have been used as a model for I/O in ALGOL 60.
– A Survey Of Stream Processing, R. Stephens, 1995
Stream processing defines a pipeline of operators that transform, combine, or reduce (even to a single scalar) large amounts of data. Characteristically, data is accessed strictly linearly rather than randomly and repeatedly – and processed uniformly. The upside of the limited expressiveness is the opportunity to process large amount of data efficiently, in constant and small space.
– Oleg Kiselyov, http://okmij.org/ftp/Streams.html
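The sketch below shows Kiselyov's three kinds of operator (transform, combine, reduce) using the `streaming` library that this talk is about. It is a minimal illustration. The names `squares`, `weighted` and `pipelineTotal` are hypothetical and do not come from the talk.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Transform: square each element.
squares :: Monad m => Stream (Of Int) m () -> Stream (Of Int) m ()
squares = S.map (^ (2 :: Int))

-- Combine: zip with the (infinite) index stream 1, 2, 3, ... and multiply.
-- zipWith stops as soon as either input stream ends.
weighted :: Monad m => Stream (Of Int) m () -> Stream (Of Int) m ()
weighted = S.zipWith (*) (S.each [1 ..])

-- Reduce: fold the whole pipeline down to a single scalar.
pipelineTotal :: Int -> Int
pipelineTotal n = runIdentity (S.sum_ (weighted (squares (S.each [1 .. n]))))

main :: IO ()
main = print (pipelineTotal 3) -- 1*1 + 2*4 + 3*9 = 36
```

Every element passes through all three stages before the next element is pulled. No intermediate list is built.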
iteratee (John Lato, 2009 – 2014)
enumerator (John Millikin, 2010 – 2011)
conduit (Michael Snoyman, 2011 –)
pipes (Gabriel Gonzalez, 2012 –)
machines (Edward Kmett, 2012 –)
io-streams (Gregory Collins, 2013 –)
quiver (Patryk Zadarnowski, 2015)
streaming (Michael Thompson, 2015 –)
streamly (Harendra Kumar, 2017 –)
streaming?
Stream (Of a) m r ≈ m ([a], r)
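`Streaming.Prelude.toList` makes the ≈ concrete. It collapses a stream into the list of yielded elements, paired via `Of` with the stream's return value. This is a minimal sketch, and the names `numbers` and `collected` are illustrative.

The approximation is loose. A real `Stream` interleaves effects with elements, whereas `m ([a], r)` has to run every effect before any of the list is available. `toList` also holds the whole list in memory.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- A stream that yields three numbers and then returns a summary string.
numbers :: Monad m => Stream (Of Int) m String
numbers = do
  S.yield 1
  S.yield 2
  S.yield 3
  return "done"

-- toList gives back `Of [a] r`, i.e. the elements alongside the result.
collected :: ([Int], String)
collected = case runIdentity (S.toList numbers) of
  xs :> r -> (xs, r)

main :: IO ()
main = print collected -- ([1,2,3],"done")
```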
-- | Once authenticated, send the data through to the API.
sendData :: Client UploadAPI -> Options -> IO ()
sendData f opts =
  withBinaryFileContents (dataFile opts) $
    withErr
    . withClientErrors     -- Handle errors from sending the data
    . tryStreamData f opts -- Send data to API
    . withClientErrors     -- Handle errors from parsing the CSV
    . transformData        -- Convert the CSV values to what we need
    . decodeByName         -- Convert the file contents into DBData
  where
    -- | If some high-level CSV parsing exception occurs, print it.
    withErr :: ExceptT CsvParseException IO () -> IO ()
    withErr = (either (liftIO . print) return =<<) . runExceptT
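`withErr` handles failures by running the `ExceptT` layer underneath the stream and dealing with a `Left` at the end. Below is a minimal, self-contained sketch of the same idea. `validate` and `total` are hypothetical stages, not part of the original program, and the error type is just a `String`.

```haskell
import Control.Monad.Trans.Except (ExceptT, runExceptT, throwE)
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical stage: abort the whole stream on the first negative value.
-- The error lives in the base monad, so later elements are never pulled.
validate :: Monad m
         => Stream (Of Int) (ExceptT String m) r
         -> Stream (Of Int) (ExceptT String m) r
validate = S.mapM check
  where
    check x
      | x < 0     = throwE ("negative: " ++ show x)
      | otherwise = return x

-- Reduce the validated stream; runExceptT exposes any failure as a Left.
total :: [Int] -> Either String Int
total xs = runIdentity (runExceptT (S.sum_ (validate (S.each xs))))

main :: IO ()
main = do
  print (total [1, 2, 3])  -- Right 6
  print (total [1, -2, 3]) -- Left "negative: -2"
```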
{-# LANGUAGE ScopedTypeVariables #-}

import           Data.Hashable     (Hashable)
import qualified Data.HashSet      as HS
import           Streaming         (Of, Stream)
import qualified Streaming         as S -- effect, wrap
import qualified Streaming.Prelude as S -- next, yield, breakWhen

-- | Take a stream of values and convert it into a stream of streams,
-- each of which has no two values with the same result of the
-- provided function.
disjoint :: forall a b m r. (Eq b, Hashable b, Monad m)
         => (a -> b) -> Stream (Of a) m r
         -> Stream (Stream (Of a) m) m r
disjoint f = loop
  where
    -- Keep finding disjoint streams until the stream is exhausted.
    loop stream = S.effect $ do
      e <- S.next stream
      return $ case e of
        Left r             -> return r
        Right (a, stream') -> S.wrap $
          loop <$> (S.yield a *> nextDisjoint (f a) stream')

    -- Get the next disjoint stream; i.e. split the stream when the
    -- first duplicate value is found.
    --
    -- Provided is an initial seed for values to be compared against.
    nextDisjoint :: b -> Stream (Of a) m r
                 -> Stream (Of a) m (Stream (Of a) m r)
    nextDisjoint initB = S.breakWhen step (False, HS.singleton initB)
                                     fst id
      where
        -- breakWhen does the test /after/ the step, so we use an
        -- extra boolean to denote if it's broken.
        -- PRECONDITION: before calling set, the boolean is True
        step (_, set) a = (HS.member b set, HS.insert b set)
          where
            b = f a
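The comment about `breakWhen` testing *after* the step can be checked directly. The predicate is evaluated once the element has been folded into the accumulator. When it first holds, that element is pushed back and starts the remaining stream. In this sketch the running-sum fold, the threshold 5 and the input list are arbitrary choices.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..))
import qualified Streaming.Prelude as S

-- breakWhen step begin done predicate:
-- fold with (+) from 0, extract with id, break once the sum exceeds 5.
-- Running sums are 1, 3, 6; the 3 that tips it over 5 goes to the rest.
splitAtRunningSum :: ([Int], [Int])
splitAtRunningSum =
  case runIdentity (S.toList (S.breakWhen (+) 0 id (> 5) (S.each [1 .. 10]))) of
    before :> rest -> (before, runIdentity (S.toList_ rest))

main :: IO ()
main = print splitAtRunningSum -- ([1,2],[3,4,5,6,7,8,9,10])
```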
Look at that convenient README!
Time for some basic Haskell!
… does anyone want to do this?
The Stream type
What’s with that f in Stream f m r?
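`Of a` is only one possible functor. `Stream f m r` accepts any `Functor f`. In the sketch below, `Labelled` is a hypothetical functor introduced purely for illustration. `maps` converts each step back to `Of` so that the usual `Streaming.Prelude` functions apply.

```haskell
{-# LANGUAGE DeriveFunctor #-}
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream, maps, yields)
import qualified Streaming.Prelude as S

-- Hypothetical functor: each step carries a label, a value, and the rest.
data Labelled a next = Labelled String a next
  deriving (Functor)

-- A stream whose steps are Labelled rather than Of.
labelled :: Monad m => Stream (Labelled Int) m ()
labelled = do
  yields (Labelled "first" 1 ())
  yields (Labelled "second" 2 ())

-- A natural transformation from Labelled to Of, as maps requires.
toOf :: Labelled a x -> Of (String, a) x
toOf (Labelled l a x) = (l, a) :> x

labelledList :: [(String, Int)]
labelledList = runIdentity (S.toList_ (maps toOf labelled))

main :: IO ()
main = print labelledList -- [("first",1),("second",2)]
```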
Stream can take any functor
Streams can contain other Streams
Streams compare to pipes and conduits.
Stream f m (Stream f m r)
Stream f (Stream g m) r
Stream (Stream f m) m r
Stream (Of (Stream f m v)) m r
Stream (ByteString m) m r
streaming-bytestring.
Streams in different ways.
Including the infamous infinite Fibonacci definition!
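The talk's own Fibonacci definition is not in this extract. The sketch below is a swapped-in formulation that iterates on pairs, rather than the self-referential `zipWith` list definition. It shows that an infinite `Stream` is safe to define because consumers only ever pull the prefix they ask for.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Infinite stream of Fibonacci numbers: iterate (a, b) -> (b, a + b)
-- starting from (0, 1), keeping the first component of each pair.
fibs :: Monad m => Stream (Of Integer) m r
fibs = S.map fst (S.iterate (\(a, b) -> (b, a + b)) (0, 1))

-- Only the requested prefix is ever computed.
firstFibs :: Int -> [Integer]
firstFibs n = runIdentity (S.toList_ (S.take n fibs))

main :: IO ()
main = print (firstFibs 10) -- [0,1,1,2,3,5,8,13,21,34]
```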
Streams are list-like.
The whole reason we started this!
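Because Streams are list-like, familiar list functions have direct counterparts in `Streaming.Prelude`. A stream of streams, such as the one `disjoint` produces, can be consumed inner stream by inner stream. The sketch below uses `chunksOf` as a simpler stand-in for `disjoint`, and the names `oddSquares` and `chunked` are illustrative.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (chunksOf, mapped)
import qualified Streaming.Prelude as S

-- List-like operations on an infinite stream: filter, map, take.
oddSquares :: [Int]
oddSquares =
  runIdentity (S.toList_ (S.take 4 (S.map (^ (2 :: Int)) (S.filter odd (S.each [1 ..])))))

-- chunksOf splits into a Stream of Streams without collecting anything;
-- mapped S.toList then turns each inner stream into one list element.
chunked :: [[Int]]
chunked =
  runIdentity (S.toList_ (mapped S.toList (chunksOf 3 (S.each [1 .. 7]))))

main :: IO ()
main = do
  print oddSquares -- [1,9,25,49]
  print chunked    -- [[1,2,3],[4,5,6],[7]]
```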
Streams.
Try to actually do something with what you’ve (hopefully) learnt today!