I Streamed a Stream

Ivan Lazar Miljenovic

25 October, 2017

The streaming library

About the library

  • Unfortunately named
  • Performs well compared to the competition
  • Whilst being a lot simpler!

Making the transition

  • Introduced to quiver by Patryk Zadarnowski
  • Adopted a package using pipes
    • Converted to quiver as I needed return values
  • New project: let’s pick a library that’s actually used by others.
    • Needed PostgreSQL support

Need PostgreSQL support

  • Conduit: all solutions used persistent
  • pipes-postgresql-simple by Oliver Charles
    • Wait, it’s deprecated?
    • What’s this “streaming” library?

Stream Processing

[…] P J Landin’s original use for streams was to model the histories of loop variables, but he also observed that streams could have been used as a model for I/O in ALGOL 60.

A Survey Of Stream Processing, R. Stephens, 1995

Stream processing defines a pipeline of operators that transform, combine, or reduce (even to a single scalar) large amounts of data. Characteristically, data is accessed strictly linearly rather than randomly and repeatedly – and processed uniformly. The upside of the limited expressiveness is the opportunity to process large amount of data efficiently, in constant and small space.

Oleg Kiselyov, http://okmij.org/ftp/Streams.html

A Brief, Incomplete, and Mostly Wrong History of Stream Processing

… with apologies to James Iry

Pre-History (aka pre-2000)

  • Lazy I/O
  • People grumbled, but accepted it.
  • After all: “avoid success at all costs”.
  • english -XAllowAmbiguousGrammar

Iteratees

  • By Oleg Kiselyov
  • Dates back to ~2008
  • Everyone praised it for its amazing promise…
  • … but being by Oleg, hardly anyone understood how it worked

iteratee

  • Implementation of Iteratees by John Lato
  • 2009 – 2014 (with Hackage update in 2016)

enumerator

  • Alternative implementation of Iteratees by John Millikin
  • 2010 – 2011
  • Often referred to as an “iteratee” still

conduit

  • By Michael Snoyman
  • First released 2011, still active
  • Declared as a more industrial/production-grade solution than pipes
    • Without defining either term
    • Even though pipes didn’t yet exist

pipes

  • By Gabriel Gonzalez
  • First released in 2012, still active
  • Tried to validate anti-Haskellers’ opinions of “too much maths” by basing it on Category Theory.

The great conduit-pipes war

  • Fought primarily on the battlefields of Twitter and Reddit
  • Devolved into a “best frenemies” situation
  • Enemies, as well as lovers, come to resemble each other over a period of time.

    Sydney J. Harris

machines

  • By Edward Kmett
    • As such, lots of great ideas with minimal documentation
  • First released in 2012, still active
  • “Networked stream transducers”
  • Allows for multiple inputs
  • Ceci n’est pas une pipe

io-streams

  • By Gregory Collins
  • First released in 2013, still active
  • Aims to be simpler (e.g. no monad transformers) than the others
  • Developed for use with Snap Framework

quiver

  • By Patryk Zadarnowski
  • All releases in 2015
  • What you get when you say “You know what pipes needs? More complexity!”
  • Return values actually useful!

streaming

  • By Michael Thompson
    • Now maintained by Andrew Martin and a GitHub organization
  • First released in 2015
  • “The freely generated stream on a streamable functor”

Streaming Announcement

I’m doing a (free) stream processing library (just a hobby, won’t be big and professional like conduit) for Haskell. This has been brewing since august, and is starting to get ready. I’d like any feedback on things people like/dislike in pipes, as my library is an attempt to implement FreeT in the style of Pipes.Internal.

Actual Streaming Announcement

It’s probably a terrible idea!

streaming is an attempt to implement FreeT in the style of Pipes.Internal, with a zillion more associated functions. There is a Prelude especially for the fundamental ‘Producer’ case - Stream ((,) a) m r and its iterations, Stream (Stream ((,)a) m) m r.

Compare the types

pipes

data Proxy a' a b' b m r
    = Request a' (a  -> Proxy a' a b' b m r )
    | Respond b  (b' -> Proxy a' a b' b m r )
    | M          (m    (Proxy a' a b' b m r))
    | Pure    r

conduit

newtype ConduitM i o m r = ConduitM
  { unConduitM :: forall b.
     (r -> Pipe i i o () m b) -> Pipe i i o () m b }

data Pipe l i o u m r =
    HaveOutput (Pipe l i o u m r) (m ()) o
  | NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r)
  | Done r
  | PipeM (m (Pipe l i o u m r))
  | Leftover (Pipe l i o u m r) l

streaming

data Stream f m r = Step !(f (Stream f m r))
                  | Effect (m (Stream f m r))
                  | Return r

Wait, that can’t be it, can it?

Outputting values

-- | A left-strict pair; the base functor
--   for streams of individual elements.
data Of a b = !a :> b
  deriving (Functor)

Stream (Of a) m r ≈ m ([a], r)

  • Why f rather than Step !a (Stream f m r)?
    • Patience!
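
To make the ≈ concrete, here is a minimal sketch that needs only base. It re-declares toy copies of the two types above rather than importing the library; collect and example are hypothetical names made up for this illustration, not part of streaming.

```haskell
{-# LANGUAGE DeriveFunctor #-}
module Main where

-- Local toy copies of the types from the slides (not the library's own).
data Of a b = !a :> b
  deriving (Functor)
infixr 5 :>

data Stream f m r = Step !(f (Stream f m r))
                  | Effect (m (Stream f m r))
                  | Return r

-- | Hypothetical helper: run every effect in order, gathering the
--   elements and the final return value.
collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r)         = return ([], r)
collect (Effect m)         = m >>= collect
collect (Step (a :> rest)) = do
  (as, r) <- collect rest
  return (a : as, r)

-- | A hand-built stream: one element, one effect, one more element.
example :: Stream (Of Int) IO String
example = Step (1 :> Effect later)
  where
    later = do
      putStrLn "effect runs here"
      return (Step (2 :> Return "done"))

main :: IO ()
main = collect example >>= print
-- prints "effect runs here", then ([1,2],"done")
```

Unlike m ([a], r), the stream interleaves its effects with its elements, so a consumer can stop early without running the rest.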

What this means

  • Stream (Of a) m r is analogous to a Source in conduit or a Producer in pipes
    • No bidirectional support, but how often is that needed?
  • No need for forall vs () vs Void confusion!
  • All other stream processors represent how to transform input to output.
    • Streaming instead uses normal functions and function composition!
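
A small sketch of that last point, assuming the streaming package is installed. The stage names (evenSquares, firstThree, pipeline) are invented for this example; each stage is an ordinary function from stream to stream, glued together with (.).

```haskell
import           Streaming         (Of, Stream)
import qualified Streaming.Prelude as S

-- | Hypothetical stage: keep the even numbers and square them.
evenSquares :: Monad m => Stream (Of Int) m r -> Stream (Of Int) m r
evenSquares = S.map (^ (2 :: Int)) . S.filter even

-- | Hypothetical stage: keep only the first three elements.
firstThree :: Monad m => Stream (Of a) m r -> Stream (Of a) m ()
firstThree = S.take 3

-- | Stages compose with plain (.); no special operators.
pipeline :: Monad m => Stream (Of Int) m r -> Stream (Of Int) m ()
pipeline = firstThree . evenSquares

main :: IO ()
main = S.print (pipeline (S.each [1 ..]))
-- prints 4, 16 and 36 on separate lines
```

The infinite source is fine here because S.take stops pulling once it has three elements.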

Using streaming

Modules

  • Streaming module operates on generic (Functor f); functions have unique names.
  • Streaming.Prelude typically uses Of and should be imported qualified.
    • Re-exports Streaming.

Example

import qualified Streaming.Prelude as S
import           Text.Read            (readMaybe)

double :: Int -> Int
double = (*2)

doubleLines :: IO ()
doubleLines = S.print
              . S.map double
              . S.mapMaybe readMaybe
              . S.takeWhile (not . null)
              $ S.repeatM getLine

Minimal tutorials required!

  • Notice the lack of streaming-specific operators!
  • If you’re used to Haskell and using lists, then using streaming requires minimal mental switching.
  • Though there are a few “gotchas”…
  • … that actually reflect the power of what it provides.
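
To illustrate how little changes coming from lists, this sketch puts a list version of the earlier example next to a streaming one. doubleAllList and doubleAllStream are hypothetical names; the only differences are the S. prefix and the type.

```haskell
import           Data.Maybe        (mapMaybe)
import           Streaming         (Of, Stream)
import qualified Streaming.Prelude as S
import           Text.Read         (readMaybe)

-- | Plain lists: stop at the first empty line, parse what we can, double it.
doubleAllList :: [String] -> [Int]
doubleAllList = map (* 2) . mapMaybe readMaybe . takeWhile (not . null)

-- | The same shape on streams.
doubleAllStream :: Monad m => Stream (Of String) m r -> Stream (Of Int) m ()
doubleAllStream = S.map (* 2) . S.mapMaybe readMaybe . S.takeWhile (not . null)

main :: IO ()
main = do
  let input = ["1", "two", "3", "", "4"]
  print (doubleAllList input)
  viaStream <- S.toList_ (doubleAllStream (S.each input))
  print viaStream
-- both lines print [2,6]
```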

f vs Of

S.map :: (Monad m) => (a -> b)
         -> Stream (Of a) m r -> Stream (Of b) m r

S.mapM :: (Monad m) => (a -> m b)
          -> Stream (Of a) m r -> Stream (Of b) m r

S.maps :: (Functor f, Monad m) => (forall x. f x -> g x)
          -> Stream f m r -> Stream g m r

S.mapped :: (Functor f, Monad m) => (forall x. f x -> m (g x))
            -> Stream f m r -> Stream g m r
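
A hedged sketch of how the functor-general versions relate to the Of-specific ones. mapViaMaps and chunkLists are hypothetical names; maps, mapped and chunksOf are imported here from the Streaming module.

```haskell
import           Streaming         (Of (..), Stream, chunksOf, mapped, maps)
import qualified Streaming.Prelude as S

-- | S.map rebuilt from maps: transform the functor layer and leave the
--   rest of the stream untouched.
mapViaMaps :: Monad m => (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r
mapViaMaps f = maps (\(a :> rest) -> f a :> rest)

-- | Split into sub-streams of n elements, then use mapped to collapse
--   each sub-stream (an effectful step) into a single list element.
chunkLists :: Monad m => Int -> Stream (Of a) m r -> Stream (Of [a]) m r
chunkLists n = mapped S.toList . chunksOf n

main :: IO ()
main = do
  S.print (mapViaMaps (+ 1) (S.each [1, 2, 3 :: Int]))
  S.print (chunkLists 2 (S.each "abcde"))
```

Here the functor handed to mapped is itself a stream, which is the case the f parameter exists for.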

Example redux

What if you want to handle parsing errors?

import qualified Streaming.Prelude as S
import           Text.Read            (readEither)

doubleLinesError :: IO ()
doubleLinesError = S.print
                   . S.map double
                   . S.stdoutLn
                   . S.map ("Not an Int: " ++)
                   . S.partitionEithers
                   . S.map readEither
                   . S.takeWhile (not . null)
                   $ S.repeatM getLine

Pop Quiz

Will this:

  1. Print all error cases first?
  2. Print error cases as they occur?

What black magic is this?

S.partitionEithers :: (Monad m)
                      => Stream (Of (Either a b)) m r
                      -> Stream (Of a) (Stream (Of b) m) r
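
One way to read this type: the Left values form the outer stream, and the Right values are emitted in the monad underneath it, which is itself a stream. The sketch below peels the two layers off one at a time with S.toList; splitResults is a hypothetical helper name.

```haskell
import           Streaming         (Of (..), Stream)
import qualified Streaming.Prelude as S
import           Text.Read         (readEither)

-- | Hypothetical helper: the inner S.toList consumes the outer stream
--   of Lefts, the outer S.toList then consumes the stream of Rights.
splitResults :: Monad m
             => Stream (Of (Either a b)) m r
             -> m (Of [b] (Of [a] r))
splitResults = S.toList . S.toList . S.partitionEithers

main :: IO ()
main = do
  let parsed :: Stream (Of (Either String Int)) IO ()
      parsed = S.map readEither (S.each ["1", "x", "2"])
  ints :> (errs :> ()) <- splitResults parsed
  print ints           -- [1,2]
  print (length errs)  -- 1
```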

Streams of Streams

  • Stream f m (Stream f m r)
    • Leftovers (à la Conduit)
  • Stream f (Stream g m) r
    • A stream created as a monadic effect
  • Stream (Stream f m) m r
    • An actual stream of streams (e.g. grouping).
  • Stream (Of (Stream f m v)) m r
    • Don’t think this is useful
  • Stream (ByteString m) m r
    • Using streaming-bytestring.
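
As a sketch of the first shape, S.splitAt returns the unconsumed remainder as the return value of the stream it produces, so "leftovers" are just another stream to carry on with. headAndRest is a hypothetical helper name.

```haskell
import           Streaming         (Of (..), Stream)
import qualified Streaming.Prelude as S

-- | Hypothetical helper: collect the first n elements, then collect
--   whatever is left over from the returned remainder stream.
headAndRest :: Monad m => Int -> Stream (Of a) m r -> m ([a], [a])
headAndRest n stream = do
  front :> rest <- S.toList (S.splitAt n stream)
  back :> _ <- S.toList rest
  return (front, back)

main :: IO ()
main = headAndRest 2 (S.each [1 .. 5 :: Int]) >>= print
-- prints ([1,2],[3,4,5])
```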

Production Example

-- | Once authenticated, send the data through to the API.
sendData :: Client UploadAPI -> Options -> IO ()
sendData f opts =
  withBinaryFileContents (dataFile opts) $
    withErr
    . withClientErrors     -- Handle errors from sending the data
    . tryStreamData f opts -- Send data to API
    . withClientErrors     -- Handle errors from parsing the CSV
    . transformData        -- Convert the CSV values to what we need
    . decodeByName         -- Convert the file contents into DBData
  where
    -- | If some high-level CSV parsing exception occurs, print it.
    withErr :: ExceptT CsvParseException IO () -> IO ()
    withErr = (either (liftIO . print) return =<<) . runExceptT

Manual Streaming

-- | Take a stream of values and convert it into a stream of streams,
--   each of which has no two values with the same result of the
--   provided function.
disjoint :: forall a b m r. (Eq b, Hashable b, Monad m)
            => (a -> b) -> Stream (Of a) m r
            -> Stream (Stream (Of a) m) m r
disjoint f = loop
  where
    -- Keep finding disjoint streams until the stream is exhausted.
    loop stream = S.effect $ do
      e <- S.next stream
      return $ case e of
                 Left r -> return r
                 Right (a, stream') -> S.wrap $
                   loop <$> (S.yield a *> nextDisjoint (f a) stream')

    -- Get the next disjoint stream; i.e. split the stream when the
    -- first duplicate value is found.
    --
    -- Provided is an initial seed for values to be compared against.
    nextDisjoint :: b -> Stream (Of a) m r
                    -> Stream (Of a) m (Stream (Of a) m r)
    nextDisjoint initB = S.breakWhen step (False, HS.singleton initB)
                                     fst id
      where
        -- breakWhen does the test /after/ the step, so we use an
        -- extra boolean to denote if it's broken.
        -- PRECONDITION: before calling step, the boolean is False
        step (_, set) a = (HS.member b set, HS.insert b set)
          where
            b = f a
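
The same manual pattern (pull one element with S.next, decide, recurse) can be seen in a much smaller sketch. dropRepeats is a hypothetical function written for this illustration, not something the library provides under that name.

```haskell
import           Control.Monad.Trans.Class (lift)
import           Streaming                 (Of, Stream)
import qualified Streaming.Prelude         as S

-- | Hypothetical example: drop elements equal to their predecessor.
dropRepeats :: (Eq a, Monad m) => Stream (Of a) m r -> Stream (Of a) m r
dropRepeats = go Nothing
  where
    -- prev is the last element we emitted, if any.
    go prev stream = do
      step <- lift (S.next stream)
      case step of
        Left r -> return r                  -- source exhausted
        Right (a, rest)
          | Just a == prev -> go prev rest  -- repeat: skip it
          | otherwise      -> S.yield a >> go (Just a) rest

main :: IO ()
main = S.print (dropRepeats (S.each [1, 1, 2, 2, 2, 3, 1 :: Int]))
-- prints 1, 2, 3 and 1 on separate lines
```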

How does it compare?

Resource Handling

  • Currently has ResourceT support
  • Found not to work very well in practice
  • Consensus on using bracket/withXXX/continuation idiom
  • My solution: streaming-with
    • Helper class to make it easier to write and use brackets
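
A minimal sketch of the bracket/with idiom, using withFile from base rather than streaming-with (whose API is not shown here). countLines and the file name demo.txt are assumptions for the example. The key point is that the stream is fully consumed inside the continuation, before the handle is closed.

```haskell
import qualified Streaming.Prelude as S
import           System.IO         (IOMode (ReadMode), withFile)

-- | Hypothetical helper: count the lines of a file. The stream never
--   escapes the withFile continuation, so the handle is still open
--   whenever the stream is being read.
countLines :: FilePath -> IO Int
countLines path =
  withFile path ReadMode $ \h ->
    S.length_ (S.fromHandle h)

main :: IO ()
main = do
  writeFile "demo.txt" "a\nb\nc\n"  -- demo.txt is a made-up file name
  countLines "demo.txt" >>= print
-- prints 3
```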

Available Packages

streaming streaming-utils streaming-wai streaming-png streaming-osm streaming-with streaming-binary

streaming-bytestring streaming-postgresql-simple streaming-conduit streaming-eversion streaming-cassava streaming-concurrency streaming-process

Usage

  • Not used in as many packages as conduit or pipes.
  • It is used in Sparkle by Tweag though.

Just remember

  • Pipes, Conduits, etc. are “functions” on how to transform inputs to outputs
  • Streams are just how to get values.

-- | Treat a 'Conduit' as a function between 'Stream's.
asStream :: (Monad m) => Conduit i m o
            -> Stream (Of i) m () -> Stream (Of o) m ()

-- | Treat a function between 'Stream's as a 'Conduit'.
asConduit :: (Monad m)
             => (Stream (Of i) m () -> Stream (Of o) m r)
             -> ConduitM i o m r

Stream on!