streaming
library
persistent
pipes-postgresql-simple
by Oliver Charles
[…] P J Landin’s original use for streams was to model the histories of loop variables, but he also observed that streams could have been used as a model for I/O in ALGOL 60.
– A Survey Of Stream Processing, R. Stephens, 1995
Stream processing defines a pipeline of operators that transform, combine, or reduce (even to a single scalar) large amounts of data. Characteristically, data is accessed strictly linearly rather than randomly and repeatedly – and processed uniformly. The upside of the limited expressiveness is the opportunity to process large amount of data efficiently, in constant and small space.
– Oleg Kiselyov, http://okmij.org/ftp/Streams.html
… with apologies to James Iry
english -XAllowAmbiguousGrammar
iteratee
enumerator
conduit
pipes
Enemies, as well as lovers, come to resemble each other over a period of time.
– Sydney J. Harris
machines
io-streams
quiver
streaming
I’m doing a (free) stream processing library (just a hobby, won’t be big and professional like conduit) for Haskell. This has been brewing since august, and is starting to get ready. I’d like any feedback on things people like/dislike in pipes, as my library is an attempt to implement FreeT in the style of Pipes.Internal.
It’s probably a terrible idea!
streaming is an attempt to implement FreeT in the style of Pipes.Internal, with a zillion more associated functions. There is a Prelude especially for the fundamental ‘Producer’ case, Stream ((,) a) m r, and its iterations, Stream (Stream ((,) a) m) m r.
pipes
data Proxy a' a b' b m r
= Request a' (a -> Proxy a' a b' b m r )
| Respond b (b' -> Proxy a' a b' b m r )
| M (m (Proxy a' a b' b m r))
| Pure r
conduit
newtype ConduitM i o m r = ConduitM
{ unConduitM :: forall b.
(r -> Pipe i i o () m b) -> Pipe i i o () m b }
data Pipe l i o u m r =
HaveOutput (Pipe l i o u m r) (m ()) o
| NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r)
| Done r
| PipeM (m (Pipe l i o u m r))
| Leftover (Pipe l i o u m r) l
streaming
data Stream f m r = Step !(f (Stream f m r))
| Effect (m (Stream f m r))
| Return r
Wait, that can’t be it, can it?
-- | A left-strict pair; the base functor
-- for streams of individual elements.
data Of a b = !a :> b
deriving (Functor)
Stream (Of a) m r ≈ m ([a], r)
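To see why Stream (Of a) m r ≈ m ([a], r), here is a standalone sketch that copies the Stream and Of definitions from the slides (it does not import the streaming package) and adds three hypothetical helpers: each, yieldM and streamToList. streamToList runs every Effect layer and collapses the stream into exactly the m ([a], r) shape.

```haskell
{-# LANGUAGE DeriveFunctor #-}
-- Standalone sketch: these definitions mirror the slides and are NOT
-- imported from the streaming package. Helper names are hypothetical.

data Stream f m r
  = Step !(f (Stream f m r))   -- one functor layer (e.g. an element)
  | Effect (m (Stream f m r))  -- a monadic action producing the rest
  | Return r                   -- the final result

-- A left-strict pair: the element, then the rest of the stream.
data Of a b = !a :> b
  deriving (Functor)

-- Turn a list into a stream of its elements, ending with ().
each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

-- Run an action once and emit its result as a single element.
yieldM :: Functor m => m a -> Stream (Of a) m ()
yieldM ma = Effect (fmap (\a -> Step (a :> Return ())) ma)

-- Collapse a stream into the m ([a], r) shape it approximates,
-- running every Effect layer along the way.
streamToList :: Monad m => Stream (Of a) m r -> m ([a], r)
streamToList = go id
  where
    go acc (Return r)         = return (acc [], r)
    go acc (Effect m)         = m >>= go acc
    go acc (Step (a :> rest)) = go (acc . (a :)) rest
```

streamToList has to hold the whole list before returning, which is the memory cost the real library lets consumers avoid by handling one Step at a time.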
Why f rather than Step !a (Stream f m r)?
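One answer to this question: keeping f abstract means the same type also describes a stream of streams. A minimal sketch, assuming chunksOf and mapped from the Streaming module alongside Streaming.Prelude (pairsUp and pairsOf are hypothetical names). chunksOf 2 turns Stream (Of Int) m r into Stream (Stream (Of Int) m) m r, and mapped S.toList collapses each inner stream into a single list element.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream, chunksOf, mapped)
import qualified Streaming.Prelude as S

-- Group a stream of Ints into lists of at most two elements.
-- chunksOf only needs a Functor f, so here the outer stream's f is
-- Stream (Of Int) m: each outer "step" is itself a whole inner stream.
pairsUp :: Monad m => Stream (Of Int) m r -> Stream (Of [Int]) m r
pairsUp = mapped S.toList . chunksOf 2

-- Pure usage: feed a list, run in Identity, collect the groups.
pairsOf :: [Int] -> [[Int]]
pairsOf = runIdentity . S.toList_ . pairsUp . S.each
```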
Stream (Of a) m r is analogous to a Source in conduit or a Producer in pipes.
No forall vs () vs Void confusion!
streaming
The Streaming module operates on streams over a generic (Functor f); its functions have unique names, so it can be imported unqualified. Streaming.Prelude typically uses Of and should be imported qualified.
import qualified Streaming.Prelude as S
import Text.Read (readMaybe)
double :: Int -> Int
double = (*2)
doubleLines :: IO ()
doubleLines = S.print
. S.map double
. S.mapMaybe readMaybe
. S.takeWhile (not . null)
$ S.repeatM getLine
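Because the pipeline is ordinary function composition, its middle stages can be checked without a terminal. A minimal sketch (doubleAll is a hypothetical name) that swaps S.repeatM getLine for S.each over a list and S.print for S.toList_, running in Identity:

```haskell
import Data.Functor.Identity (runIdentity)
import qualified Streaming.Prelude as S
import Text.Read (readMaybe)

double :: Int -> Int
double = (*2)

-- Same stages as doubleLines, but fed from a list and collected into one.
doubleAll :: [String] -> [Int]
doubleAll = runIdentity
          . S.toList_
          . S.map double
          . S.mapMaybe readMaybe
          . S.takeWhile (not . null)
          . S.each
```

For example, doubleAll ["1", "x", "3", "", "5"] stops at the empty string, skips "x", and yields [2, 6].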
streaming requires minimal mental switching.
f vs Of
S.map :: (Monad m) => (a -> b)
-> Stream (Of a) m r -> Stream (Of b) m r
S.mapM :: (Monad m) => (a -> m b)
-> Stream (Of a) m r -> Stream (Of b) m r
S.maps :: (Functor f, Monad m) => (forall x. f x -> g x)
-> Stream f m r -> Stream g m r
S.mapped :: (Functor f, Monad m) => (forall x. f x -> m (g x))
-> Stream f m r -> Stream g m r
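To make the f vs Of split concrete, a minimal sketch (showEach and showEach' are hypothetical names) that converts Ints to Strings twice: element-wise with S.map, and layer-wise with maps from the Streaming module. The argument to maps must work for every x, so it can reshape the Of layer but can only pass the rest of the stream through unchanged.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming (Of (..), Stream, maps)
import qualified Streaming.Prelude as S

-- Element level: S.map only ever sees the Int inside each step.
showEach :: Monad m => Stream (Of Int) m r -> Stream (Of String) m r
showEach = S.map show

-- Functor level: maps sees the whole `Of Int x` layer, where x is the
-- rest of the stream; the rank-2 type forces `rest` to be kept as is.
showEach' :: Monad m => Stream (Of Int) m r -> Stream (Of String) m r
showEach' = maps (\(n :> rest) -> show n :> rest)
```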
What if you want to handle parsing errors?
import qualified Streaming.Prelude as S
import Text.Read (readEither)
doubleLinesError :: IO ()
doubleLinesError = S.print
. S.map double
. S.stdoutLn
. S.map ("Not an Int: " ++)
. S.partitionEithers
. S.map readEither
. S.takeWhile (not . null)
$ S.repeatM getLine
S.partitionEithers :: (Monad m)
=> Stream (Of (Either a b)) m r
-> Stream (Of a) (Stream (Of b) m) r
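The nested result can be consumed one layer at a time: the first S.toList drains the outer stream of Lefts while its base monad (the inner stream) still yields the Rights, and a second S.toList drains those. A pure sketch with hypothetical names splitParses and parse; unlike the slide, it keeps the unparsable input itself rather than readEither's error message.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S
import Text.Read (readMaybe)

-- Keep the original text on failure so it can be reported later.
parse :: String -> Either String Int
parse s = maybe (Left s) Right (readMaybe s)

-- Returns (unparsable inputs, parsed Ints), each in input order.
splitParses :: [String] -> ([String], [Int])
splitParses inputs = (bad, good)
  where
    -- After partitionEithers: Stream (Of String) (Stream (Of Int) Identity) ()
    -- The first S.toList collects the Lefts, the second the Rights.
    good :> (bad :> ()) =
      runIdentity
        . S.toList
        . S.toList
        . S.partitionEithers
        . S.map parse
        $ S.each inputs
```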
Stream f m (Stream f m r)
Stream f (Stream g m) r
Stream (Stream f m) m r
Stream (Of (Stream f m v)) m r
Stream (ByteString m) m r
streaming-bytestring.
-- | Once authenticated, send the data through to the API.
sendData :: Client UploadAPI -> Options -> IO ()
sendData f opts =
withBinaryFileContents (dataFile opts) $
withErr
. withClientErrors -- Handle errors from sending the data
. tryStreamData f opts -- Send data to API
. withClientErrors -- Handle errors from parsing the CSV
. transformData -- Convert the CSV values to what we need
. decodeByName -- Convert the file contents into DBData
where
-- | If some high-level CSV parsing exception occurs, print it.
withErr :: ExceptT CsvParseException IO () -> IO ()
withErr = (either (liftIO . print) return =<<) . runExceptT
-- | Take a stream of values and convert it into a stream of streams,
-- each of which has no two values with the same result of the
-- provided function.
disjoint :: forall a b m r. (Eq b, Hashable b, Monad m)
=> (a -> b) -> Stream (Of a) m r
-> Stream (Stream (Of a) m) m r
disjoint f = loop
where
-- Keep finding disjoint streams until the stream is exhausted.
loop stream = S.effect $ do
e <- S.next stream
return $ case e of
Left r -> return r
Right (a, stream') -> S.wrap $
loop <$> (S.yield a *> nextDisjoint (f a) stream')
-- Get the next disjoint stream; i.e. split the stream when the
-- first duplicate value is found.
--
-- Provided is an initial seed for values to be compared against.
nextDisjoint :: b -> Stream (Of a) m r
-> Stream (Of a) m (Stream (Of a) m r)
nextDisjoint initB = S.breakWhen step (False, HS.singleton initB)
fst id
where
-- breakWhen does the test /after/ the step, so the state carries an
-- extra boolean: whether the latest value's key was already seen.
-- The stream is broken just before the first value for which it is True.
step (_, set) a = (HS.member b set, HS.insert b set)
where
b = f a
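To try disjoint without unordered-containers, here is a variant that swaps HashSet/Hashable for Data.Set/Ord (a substitution, not the original code) and takes effect and wrap from the Streaming module. The hypothetical helper disjointChunks collapses each inner stream with mapped S.toList so the split points are visible: the first repeated key starts a new chunk.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Data.Functor.Identity (runIdentity)
import qualified Data.Set as Set
import Streaming (Of, Stream, effect, mapped, wrap)
import qualified Streaming.Prelude as S

-- Split a stream into sub-streams with no repeated key (Ord-based variant).
disjointOrd :: forall a b m r. (Ord b, Monad m)
            => (a -> b) -> Stream (Of a) m r -> Stream (Stream (Of a) m) m r
disjointOrd f = loop
  where
    loop :: Stream (Of a) m r -> Stream (Stream (Of a) m) m r
    loop stream = effect $ do
      e <- S.next stream
      return $ case e of
        Left r -> return r
        Right (a, stream') ->
          wrap (loop <$> (S.yield a *> nextDisjoint (f a) stream'))

    -- Seeded with the key of the element already yielded by loop.
    nextDisjoint :: b -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
    nextDisjoint seed = S.breakWhen step (False, Set.singleton seed) fst id
      where
        step (_, seen) a =
          let key = f a in (Set.member key seen, Set.insert key seen)

-- Usage helper: show each disjoint sub-stream as a String.
disjointChunks :: String -> [String]
disjointChunks = runIdentity . S.toList_ . mapped S.toList . disjointOrd id . S.each
```

For example, disjointChunks "abcabd" splits at the second 'a', giving ["abc", "abd"].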
ResourceT support
bracket/withXXX/continuation idiom: streaming-with
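The continuation idiom keeps a resource's lifetime lexical: the stream is created and fully consumed inside the callback, so the handle is closed as soon as the callback returns and no ResourceT is involved. A minimal sketch using only base's withFile and Streaming.Prelude (countNonEmptyLines is a hypothetical name; streaming-with's own helpers are not used here):

```haskell
import System.IO (IOMode (ReadMode), withFile)
import qualified Streaming.Prelude as S

-- Count the non-empty lines of a file. The Handle only lives inside the
-- withFile continuation, so the stream must be fully consumed there.
countNonEmptyLines :: FilePath -> IO Int
countNonEmptyLines path =
  withFile path ReadMode $ \h ->
    S.length_ (S.filter (not . null) (S.fromHandle h))
```

Returning the stream itself from the callback would be a bug: it would be consumed after withFile had already closed the handle.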
-- | Treat a 'Conduit' as a function between 'Stream's.
asStream :: (Monad m) => Conduit i m o
-> Stream (Of i) m () -> Stream (Of o) m ()
-- | Treat a function between 'Stream's as a 'Conduit'.
asConduit :: (Monad m)
=> (Stream (Of i) m () -> Stream (Of o) m r)
-> ConduitM i o m r