streaming library
persistent
pipes-postgresql-simple by Oliver Charles
[…] P J Landin’s original use for streams was to model the histories of loop variables, but he also observed that streams could have been used as a model for I/O in ALGOL 60.
– A Survey Of Stream Processing, R. Stephens, 1995
Stream processing defines a pipeline of operators that transform, combine, or reduce (even to a single scalar) large amounts of data. Characteristically, data is accessed strictly linearly rather than randomly and repeatedly – and processed uniformly. The upside of the limited expressiveness is the opportunity to process large amount of data efficiently, in constant and small space.
– Oleg Kiselyov, http://okmij.org/ftp/Streams.html
… with apologies to James Iry
english -XAllowAmbiguousGrammar
iteratee
enumerator
conduit
pipes
Enemies, as well as lovers, come to resemble each other over a period of time.
– Sydney J. Harris
machines
io-streams
quiver
streaming
I’m doing a (free) stream processing library (just a hobby, won’t be big and professional like conduit) for Haskell. This has been brewing since august, and is starting to get ready. I’d like any feedback on things people like/dislike in pipes, as my library is an attempt to implement FreeT in the style of Pipes.Internal.
It’s probably a terrible idea!
streaming is an attempt to implement FreeT in the style of Pipes.Internal, with a zillion more associated functions. There is a Prelude especially for the fundamental ‘Producer’ case - Stream ((,) a) m r and its iterations, Stream (Stream ((,) a) m) m r.
pipes
data Proxy a' a b' b m r
  = Request a' (a  -> Proxy a' a b' b m r)
  | Respond b  (b' -> Proxy a' a b' b m r)
  | M (m (Proxy a' a b' b m r))
  | Pure r

conduit
newtype ConduitM i o m r = ConduitM
  { unConduitM :: forall b.
      (r -> Pipe i i o () m b) -> Pipe i i o () m b }

data Pipe l i o u m r =
    HaveOutput (Pipe l i o u m r) (m ()) o
  | NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r)
  | Done r
  | PipeM (m (Pipe l i o u m r))
  | Leftover (Pipe l i o u m r) l

streaming
data Stream f m r = Step !(f (Stream f m r))
                  | Effect (m (Stream f m r))
                  | Return r

Wait, that can’t be it, can it?
-- | A left-strict pair; the base functor
--   for streams of individual elements.
data Of a b = !a :> b
  deriving (Functor)
Stream (Of a) m r ≈ m ([a], r)
f rather than Step !a (Stream f m r)?
Stream (Of a) m r is analogous to a Source or Producer in conduit and pipes
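To make the correspondence Stream (Of a) m r ≈ m ([a], r) concrete, here is a minimal sketch that uses only base. The two types are simplified local copies of the definitions above, and the helper names fromListWith and toListWithResult are hypothetical, invented for this illustration rather than taken from the library.

```haskell
{-# LANGUAGE DeriveFunctor #-}
module Main where

import Data.Functor.Identity (Identity (..))

-- Simplified local copies of the types shown on the slides.
data Of a b = !a :> b
  deriving (Functor)

data Stream f m r
  = Step !(f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

-- Hypothetical helper: emit every element of a list, then return r.
fromListWith :: r -> [a] -> Stream (Of a) m r
fromListWith r = foldr (\a rest -> Step (a :> rest)) (Return r)

-- Hypothetical helper: one direction of the correspondence.
-- Runs all effects, collects the elements, and keeps the final result.
toListWithResult :: Monad m => Stream (Of a) m r -> m ([a], r)
toListWithResult (Return r) = return ([], r)
toListWithResult (Effect m) = m >>= toListWithResult
toListWithResult (Step (a :> rest)) = do
  (as, r) <- toListWithResult rest
  return (a : as, r)

main :: IO ()
main = do
  -- A pure stream, run in Identity.
  let pureStream = fromListWith "done" [1, 2, 3 :: Int]
  print (runIdentity (toListWithResult pureStream))
  -- prints ([1,2,3],"done")

  -- An effectful stream: the Effect layer runs before any element is seen.
  let noisy = Effect (putStrLn "effect runs first" >> return (fromListWith () "ab"))
  toListWithResult noisy >>= print
  -- prints "effect runs first", then ("ab",())
```

Collecting into a list like this gives up the constant-space property, so it is only meant to show what the type encodes, not how a stream should normally be consumed.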
forall vs () vs Void confusion!
streaming
Streaming module operates on generic (Functor f); functions have unique names.
Streaming.Prelude typically uses Of and should be imported qualified.
Streaming.
import qualified Streaming.Prelude as S
import Text.Read (readMaybe)

double :: Int -> Int
double = (*2)

doubleLines :: IO ()
doubleLines = S.print
            . S.map double
            . S.mapMaybe readMaybe
            . S.takeWhile (not . null)
            $ S.repeatM getLine
streaming requires minimal mental switching.
f vs Of
S.map :: (Monad m) => (a -> b)
      -> Stream (Of a) m r -> Stream (Of b) m r
S.mapM :: (Monad m) => (a -> m b)
       -> Stream (Of a) m r -> Stream (Of b) m r
S.maps :: (Functor f, Monad m) => (forall x. f x -> g x)
       -> Stream f m r -> Stream g m r
S.mapped :: (Functor f, Monad m) => (forall x. f x -> m (g x))
         -> Stream f m r -> Stream g m r
What if you want to handle parsing errors?
import qualified Streaming.Prelude as S
import Text.Read (readEither)

doubleLinesError :: IO ()
doubleLinesError = S.print
                 . S.map double
                 . S.stdoutLn
                 . S.map ("Not an Int: " ++)
                 . S.partitionEithers
                 . S.map readEither
                 . S.takeWhile (not . null)
                 $ S.repeatM getLine
S.partitionEithers :: (Monad m)
                   => Stream (Of (Either a b)) m r
                   -> Stream (Of a) (Stream (Of b) m) r
Stream f m (Stream f m r)
Stream f (Stream g m) r
Stream (Stream f m) m r
Stream (Of (Stream f m v)) m r
Stream (ByteString m) m r
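Two of the nested shapes listed above can be seen in a short sketch against the streaming package (assumed to be installed). chunksOf produces a Stream (Stream f m) m r, and splitAt produces a Stream f m (Stream f m r). The inputs are made up for illustration.

```haskell
module Main where

import Streaming (chunksOf, mapped)
import qualified Streaming.Prelude as S

main :: IO ()
main = do
  -- Stream (Stream (Of Int) IO) IO (): a stream whose "elements" are
  -- themselves streams. mapped S.toList collapses each inner stream
  -- into a list, one chunk at a time.
  S.print . mapped S.toList . chunksOf 3 $ S.each [1 .. 7 :: Int]
  -- prints [1,2,3] then [4,5,6] then [7]

  -- Stream (Of Char) IO (Stream (Of Char) IO ()): the rest of the
  -- stream is handed back as the return value of the first part.
  rest <- S.print (S.splitAt 2 (S.each "abcd"))
  -- prints 'a' then 'b'
  S.print rest
  -- prints 'c' then 'd'
```

Because the inner streams are consumed as they are produced, neither example needs to hold the whole input in memory, apart from the per-chunk lists built by S.toList.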
streaming-bytestring.
-- | Once authenticated, send the data through to the API.
sendData :: Client UploadAPI -> Options -> IO ()
sendData f opts =
  withBinaryFileContents (dataFile opts) $
    withErr
    . withClientErrors     -- Handle errors from sending the data
    . tryStreamData f opts -- Send data to API
    . withClientErrors     -- Handle errors from parsing the CSV
    . transformData        -- Convert the CSV values to what we need
    . decodeByName         -- Convert the file contents into DBData
  where
    -- | If some high-level CSV parsing exception occurs, print it.
    withErr :: ExceptT CsvParseException IO () -> IO ()
    withErr = (either (liftIO . print) return =<<) . runExceptT

-- | Take a stream of values and convert it into a stream of streams,
--   each of which has no two values with the same result of the
--   provided function.
disjoint :: forall a b m r. (Eq b, Hashable b, Monad m)
         => (a -> b) -> Stream (Of a) m r
         -> Stream (Stream (Of a) m) m r
disjoint f = loop
  where
    -- Keep finding disjoint streams until the stream is exhausted.
    loop stream = S.effect $ do
      e <- S.next stream
      return $ case e of
        Left r -> return r
        Right (a, stream') -> S.wrap $
          loop <$> (S.yield a *> nextDisjoint (f a) stream')

    -- Get the next disjoint stream; i.e. split the stream when the
    -- first duplicate value is found.
    --
    -- Provided is an initial seed for values to be compared against.
    nextDisjoint :: b -> Stream (Of a) m r
                 -> Stream (Of a) m (Stream (Of a) m r)
    nextDisjoint initB = S.breakWhen step (False, HS.singleton initB)
                                     fst id
      where
        -- breakWhen does the test /after/ the step, so we use an
        -- extra boolean to denote if it's broken.
        -- PRECONDITION: before calling step, the boolean is False
        step (_, set) a = (HS.member b set, HS.insert b set)
          where
            b = f a
ResourceT support
bracket/withXXX/continuation idiom
streaming-with
-- | Treat a 'Conduit' as a function between 'Stream's.
asStream :: (Monad m) => Conduit i m o
         -> Stream (Of i) m () -> Stream (Of o) m ()

-- | Treat a function between 'Stream's as a 'Conduit'.
asConduit :: (Monad m)
          => (Stream (Of i) m () -> Stream (Of o) m r)
          -> ConduitM i o m r