Streaming

neotypes lets you stream large results by consuming them lazily and emitting their elements into a stream. Currently, there are five supported implementations, each tied to an effect type: Akka Streams (for scala.concurrent.Future), Pekko Streams (for scala.concurrent.Future), FS2 (for cats.effect.Async[F]), Monix Observables (for monix.eval.Task), and ZIO ZStreams (for zio.Task).

Usage

Akka Streams (neotypes-akka-stream)

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import neotypes.{GraphDatabase, StreamDriver}
import neotypes.akkastreams.AkkaStream
import neotypes.akkastreams.implicits._ // Brings the implicit Stream[AkkaStream] instance into the scope.
import neotypes.mappers.ResultMapper // Allows to decode query results.
import neotypes.syntax.all._ // Provides the query extension method.
import org.neo4j.driver.AuthTokens
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("QuickStart")

val driver =
  GraphDatabase.streamDriver[AkkaStream]("bolt://localhost:7687", AuthTokens.basic("neo4j", "****"))

def query(driver: StreamDriver[AkkaStream, Future]): Source[String, NotUsed] =
  "MATCH (p: Person) RETURN p.name".query(ResultMapper.string).stream(driver)

val program: Future[Unit] = for {
  _ <- query(driver).runWith(Sink.foreach(println))
  _ <- driver.close
} yield ()

Await.ready(program, 5.seconds)

Pekko Streams (neotypes-pekko-stream)

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import neotypes.{GraphDatabase, StreamDriver}
import neotypes.pekkostreams.PekkoStream
import neotypes.pekkostreams.implicits._ // Brings the implicit Stream[PekkoStream] instance into the scope.
import neotypes.mappers.ResultMapper // Allows to decode query results.
import neotypes.syntax.all._ // Provides the query extension method.
import org.neo4j.driver.AuthTokens
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("QuickStart")

val driver =
  GraphDatabase.streamDriver[PekkoStream]("bolt://localhost:7687", AuthTokens.basic("neo4j", "****"))

def query(driver: StreamDriver[PekkoStream, Future]): Source[String, NotUsed] =
  "MATCH (p: Person) RETURN p.name".query(ResultMapper.string).stream(driver)

val program: Future[Unit] = for {
  _ <- query(driver).runWith(Sink.foreach(println))
  _ <- driver.close
} yield ()

Await.ready(program, 5.seconds)

FS2 (neotypes-fs2-stream)

With cats.effect.IO

import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import neotypes.{GraphDatabase, StreamDriver}
import neotypes.cats.effect.implicits._ // Brings the implicit Async[IO] instance into the scope.
import neotypes.fs2.Fs2IoStream
import neotypes.fs2.implicits._ // Brings the implicit Stream[Fs2IoStream] instance into the scope.
import neotypes.mappers.ResultMapper // Allows to decode query results.
import neotypes.syntax.all._ // Provides the query extension method.
import org.neo4j.driver.AuthTokens

val driverR: Resource[IO, StreamDriver[Fs2IoStream, IO]] =
  GraphDatabase.streamDriver[Fs2IoStream]("bolt://localhost:7687", AuthTokens.basic("neo4j", "****"))

val program: Stream[IO, Unit] =
  Stream.resource(driverR).flatMap { driver =>
    "MATCH (p: Person) RETURN p.name"
      .query(ResultMapper.string)
      .stream(driver)
      .foreach(IO.println)
  }

program.compile.drain.unsafeRunSync()

With other effect type

The code is essentially the same as above: replace IO with F (as long as there is an instance of cats.effect.Async[F]) and replace the neotypes.fs2.Fs2IoStream type alias with neotypes.fs2.Fs2FStream[F]#T.

Monix Observables (neotypes-monix-stream)

import cats.effect.Resource
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import neotypes.{GraphDatabase, StreamDriver}
import neotypes.mappers.ResultMapper // Allows to decode query results.
import neotypes.monix.implicits._ // Brings the implicit Async[Task] instance into the scope.
import neotypes.monix.stream.MonixStream
import neotypes.monix.stream.implicits._ // Brings the implicit Stream[MonixStream] instance into the scope.
import neotypes.syntax.all._ // Provides the query extension method.
import org.neo4j.driver.AuthTokens
import scala.concurrent.duration._

val driverR: Resource[Task, StreamDriver[MonixStream, Task]] =
  GraphDatabase.streamDriver[MonixStream]("bolt://localhost:7687", AuthTokens.basic("neo4j", "****"))

val program: Observable[Unit] =
  Observable.fromResource(driverR).flatMap { driver =>
    "MATCH (p: Person) RETURN p.name"
      .query(ResultMapper.string)
      .stream(driver)
      .mapEval(n => Task(println(n)))
  }

program.completedL.runSyncUnsafe(5.seconds)

ZIO ZStreams (neotypes-zio-stream)

import zio.{Runtime, Scope, Task, Unsafe, ZIO}
import zio.stream.ZStream
import neotypes.{GraphDatabase, StreamDriver}
import neotypes.mappers.ResultMapper // Allows to decode query results.
import neotypes.syntax.all._ // Provides the query extension method.
import neotypes.zio.implicits._ // Brings the implicit Async[Task] instance into the scope.
import neotypes.zio.stream.ZioStream
import neotypes.zio.stream.implicits._ // Brings the implicit Stream[ZioStream] instance into the scope.
import org.neo4j.driver.AuthTokens

val driverS: ZIO[Scope, Throwable, StreamDriver[ZioStream, Task]] =
  GraphDatabase.streamDriver[ZioStream]("bolt://localhost:7687", AuthTokens.basic("neo4j", "****"))

val program: ZStream[Scope, Throwable, String] =
  ZStream.fromZIO(driverS).flatMap { driver =>
    "MATCH (p: Person) RETURN p.name"
      .query(ResultMapper.string)
      .stream(driver)
  }


Unsafe.unsafe { implicit unsafe =>
  Runtime.default.unsafe.run {
    // ZStream.foreach returns a ZIO and we want to eliminate Scope from R
    ZIO.scoped {
      program.foreach(n => ZIO.succeed(println(n)))
    }
  }
}

Note that the type aliases used above are provided only for convenience.

You can always use one of the following instead:

  • Type lambdas:
val driver = GraphDatabase.streamDriver[({ type T[A] = fs2.Stream[IO, A] })#T](uri)
  • Type alias:
type Fs2Stream[T] = fs2.Stream[IO, T]
val driver = GraphDatabase.streamDriver[Fs2Stream](uri)
  • Kind projector (compiler plugin):
val driver = GraphDatabase.streamDriver[fs2.Stream[IO, ?]](uri)
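
The type alias and the type lambda are two spellings of the same one-parameter type constructor. The following is a minimal, library-free sketch of that equivalence in Scala 2 syntax. FakeStream, single, and StringStream are hypothetical names invented for this illustration; FakeStream only stands in for a two-parameter stream type such as fs2.Stream[F, A], and single only mimics a method that, like streamDriver, expects a type constructor of kind S[_].

```scala
object TypeConstructorSpellings {
  // Hypothetical stand-in for a two-parameter stream type such as fs2.Stream[F, A].
  final case class FakeStream[E, A](elements: List[A])

  // Hypothetical method that expects a one-parameter type constructor S[_].
  def single[S[_]](value: Int)(wrap: List[Int] => S[Int]): S[Int] =
    wrap(List(value))

  // Type alias: fixes the first parameter once and gives the result a name.
  type StringStream[A] = FakeStream[String, A]

  val viaAlias: StringStream[Int] =
    single[StringStream](1)(FakeStream[String, Int](_))

  // Type lambda: the same type constructor written inline, without a named alias.
  val viaLambda: FakeStream[String, Int] =
    single[({ type T[A] = FakeStream[String, A] })#T](1)(FakeStream[String, Int](_))
}
```

Both values have the same type and the same content, so the choice between the spellings is a matter of readability.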

The code snippets above retrieve data from Neo4j lazily: each element of the result is loaded only when it is requested, and the transaction is committed once all elements have been read.
This approach aims to reduce the memory footprint and improve performance with large volumes of data.
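
To make the idea of lazy retrieval concrete, here is a conceptual sketch that uses only the Scala standard library. It is not how neotypes is implemented: lazyRecords and log are hypothetical names, and a plain Iterator stands in for the database cursor. It shows that nothing is fetched until an element is requested and that the "commit" happens only once the consumer reaches the end.

```scala
import scala.collection.mutable.ListBuffer

object LazyConsumptionSketch {
  // Records each event so that the order of fetches and the commit is visible.
  val log: ListBuffer[String] = ListBuffer.empty

  // Hypothetical stand-in for a result cursor: a record is "fetched" only when
  // next() is called, and the commit is logged once, when the cursor is exhausted.
  def lazyRecords(names: List[String]): Iterator[String] =
    new Iterator[String] {
      private val underlying = names.iterator
      private var committed = false

      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !committed) {
          committed = true
          log += "commit"
        }
        more
      }

      def next(): String = {
        val name = underlying.next()
        log += s"fetch $name"
        name
      }
    }

  val records: Iterator[String] = lazyRecords(List("Alice", "Bob", "Carol"))
  val logBeforeConsuming: List[String] = log.toList // nothing has been fetched yet

  val first: String = records.next()                // fetches exactly one record
  val logAfterFirst: List[String] = log.toList

  val rest: List[String] = records.toList           // drains the cursor, then commits
  val logAtEnd: List[String] = log.toList
}
```

In this sketch, logAfterFirst contains a single fetch, and the commit entry appears in logAtEnd only after the last record has been read.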

Transaction management

There are two options for transaction management:

  • Manual: if you use neotypes.Transaction and call stream, you must ensure that the transaction is closed gracefully once reading has finished.
  • Auto-closing: the transaction is closed automatically once all elements are consumed. This behavior is provided by DeferredQuery.stream(StreamDriver[S, F]).
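
The difference between the two styles can be sketched without any library code. Everything below is hypothetical: FakeTransaction is not the neotypes.Transaction API, autoClosing is an invented helper, and the commit-on-success, rollback-on-failure policy is an assumption of this sketch rather than a statement about what neotypes does.

```scala
import scala.util.Try
import scala.util.control.NonFatal

object TransactionStylesSketch {
  // Hypothetical stand-in for a transaction; this is not the neotypes.Transaction API.
  final class FakeTransaction {
    var state: String = "open"
    def records: Iterator[String] = Iterator("Alice", "Bob")
    def commit(): Unit = { state = "committed" }
    def rollback(): Unit = { state = "rolled back" }
  }

  // Manual style: the caller holds the transaction and must close it itself.
  val manualTx = new FakeTransaction
  val manualNames: List[String] = manualTx.records.toList
  val stateBeforeClosing: String = manualTx.state
  manualTx.commit() // forgetting this call would leave the transaction open

  // Auto-closing style: a helper runs the consumer and closes the transaction for it.
  // Assumed policy for this sketch: commit on success, roll back on failure.
  def autoClosing[A](tx: FakeTransaction)(consume: Iterator[String] => A): A =
    try {
      val result = consume(tx.records)
      tx.commit()
      result
    } catch {
      case NonFatal(error) =>
        tx.rollback()
        throw error
    }

  val autoTx = new FakeTransaction
  val autoNames: List[String] = autoClosing(autoTx)(_.toList)

  // A failing consumer still leaves the transaction closed.
  val failedTx = new FakeTransaction
  val failure: Try[Nothing] = Try(autoClosing(failedTx)(_ => sys.error("boom")))
}
```

The manual style gives full control over when the transaction ends, at the cost of having to remember to close it on every code path; the auto-closing style trades that control for safety.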

Alternative stream implementations

If your stream type is not supported, you can provide your own implementation of the neotypes.Stream.Aux[S[_], F[_]] typeclass and add it to the implicit scope.

The type parameters in the signature indicate:

  • F[_] - the effect type in which each element is retrieved.
  • S[_] - the type of your stream.
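
The Aux shape, which pairs a stream type S[_] with an effect type F[_], can be illustrated with a deliberately simplified typeclass. MiniStream, its two methods, lazyListStream, and roundTrip are hypothetical names made up for this sketch; the real neotypes.Stream trait has a different and larger set of methods, so treat this only as an illustration of the pattern, using LazyList as the stream and scala.util.Try as the effect (Scala 2.13 or later).

```scala
import scala.util.{Success, Try}

object StreamTypeclassSketch {
  // Simplified, hypothetical stand-in for a stream typeclass.
  // The real neotypes.Stream trait declares different methods.
  trait MiniStream[S[_]] {
    type F[_]                            // effect in which elements are retrieved
    def fromList[A](as: List[A]): S[A]   // builds a stream
    def toListF[A](sa: S[A]): F[List[A]] // consumes the stream inside the effect
  }

  object MiniStream {
    // Aux exposes the abstract type member F as a second type parameter.
    type Aux[S[_], _F[_]] = MiniStream[S] { type F[A] = _F[A] }
  }

  // An instance placed in the implicit scope: LazyList as S, Try as F.
  implicit val lazyListStream: MiniStream.Aux[LazyList, Try] =
    new MiniStream[LazyList] {
      type F[A] = Try[A]
      def fromList[A](as: List[A]): LazyList[A] = as.to(LazyList)
      def toListF[A](sa: LazyList[A]): Try[List[A]] = Try(sa.toList)
    }

  // Generic code asks for the instance through Aux, so both S and G are visible.
  def roundTrip[S[_], G[_], A](as: List[A])(implicit ev: MiniStream.Aux[S, G]): G[List[A]] =
    ev.toListF(ev.fromList(as))

  val result: Try[List[Int]] = roundTrip[LazyList, Try, Int](List(1, 2, 3))
  val expected: Try[List[Int]] = Success(List(1, 2, 3))
}
```

Because the instance is implicit, roundTrip finds it from the two type arguments alone, which is the same mechanism that lets a custom stream implementation be picked up once it is in the implicit scope.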