Concurrent State Machines with Cats Effect

Adam Rosien @arosien

Inner Product LLC @InnerProductLLC

ScalaCon, 2 Nov 2021

inner product logo with url 320

Don’t panic!

happyautomata 1
amanita muscaria

amarita muscaria
don’t eat this! ☠️


val x = ???
val y = ???

).parTupled // run in parallel

What can we say about which effects happen first?


Two events are
if we cannot tell by
looking at the program
which will happen first.
— The Little Book of Semaphores

you know which will happen first

you know the sequence

events are sequential



How do they coordinate their behavior?


Coordination: queue


Normal case: producers offer items, consumers take them.

Producers are blocked from offering items when the queue is full.

Consumers are blocked from taking items when the queue is empty.

Coordination: components


producers offering items vs. consumers taking items


clients requesting from (potentially failing) servers


lockers race for exclusive access


waiters vs. notifiers


group members await each other being ready, then proceed

and more…​.

Concurrent State Machines with Cats Effect

How to build
a coordinating component
with a concurrent state machine
using Cats Effect.

  1. Synchronization ☜ conceptual background

  2. Synchronization with Cats Effect ☜ practical stuff

  3. Concurrent state machines ☜ the big idea

Many thanks to 🎉Fabio Labella🎉, who taught this idea to many people.

1. Synchronization

happyautomata 3


Computer programmers are often concerned with
synchronization constraints,
which are requirements pertaining to
the order of events.
— The Little Book of Semaphores

That is, coordination in time.

  • etc.

Kinds of synchronization

Two concurrent effects
are mutually-exclusive
if they
must not happen at the same time.

Two concurrent effects
are serialized
if one must
happen before
the other.


var x = 0

// thread 1
x =

     x + 1
// thread 2
x = x + 1

x gets set to 1, but should be 2!



effects not allowed to

  • synchronized [sic]

  • volatile

  • AtomicReference[A]

  • ???



one effect must another


If x cannot start y,
y must wait for a message from x to start.

Synchronization: summary

  • concurrent: when you can’t tell from looking at the program which will happen first

  • coordination: controlling the behavior of concurrent components

  • synchronization constraints: requirements about the order of events

    • mutual exclusion: events not allowed to happen at the same time

      avoids: non-determinism of concurrent writes; lost updates due to non-determinism of concurrent updates

    • serialization: one event must happen before another

2. Synchronization with Cats Effect

happyautomata 2

Cats Effect

Cats Effect is a
framework for building
real-world applications
in a purely functional style
within the Typelevel ecosystem.

cats effect logo

Cats Effect 3 ()

Synchronization in Cats Effect

Mutual exclusion

Ref[F[_], A]

An asynchronous,
mutable reference.


Deferred[F[_], A]

A purely functional
synchronization primitive
which represents a
single value
which may not yet be available.

Atomic updates with Ref

def run(args: List[String]): IO[ExitCode] =
  for {
    counter <- Ref[IO].of(0L) (1)
    _ <- (
      worker(counter), worker(counter),
    ).parTupled (2)
  } yield ExitCode.Success
1Create a Ref that holds a Long value, initialized to 0.
2In parallel, start a number of workers that all update the shared counter, and another effect that prints the counter’s value on an interval.

Atomic updates with Ref

def worker(counter: Ref[IO, Long]): IO[Unit] =
  for {
    _ <- counter.update(_ + 1)
    _ <- worker(counter)
  } yield ()
def printCounter(counter: Ref[IO, Long]): IO[Unit] =
  for {
    _ <- IO.sleep(5.seconds)
    n <- counter.get
    _ <- IO.println(s"counter: $n")
    _ <- printCounter(counter)
  } yield ()

Atomic updates with Ref

trait Ref[F[_], A] {
  def get(): F[A]

  def set(value: A): F[Unit]
  def getAndSet(value: A): F[A]

  def update(f: A => A): F[Unit]
  def getAndUpdate(f: A => A): F[A]
  def updateAndGet(f: A => A): F[A]

  def modify[B](f: A => (A, B)): F[B]

the type F could be IO, for example

Write-once effect serialization with Deferred

trait Deferred[F[_], A] {
  def get(): F[A] (1)
  def complete(a: A): F[Unit] (2)
1Blocks until the value is produced.
2Unblocks any waiters, providing the given value to them.

the type F could be IO, for example

Synchronization with Cats Effect: summary

  • Cats Effect provides two primitives for synchronization, Ref and Deferred.

  • Ref effectfully manages a value that can be read, written, and updated concurrently (mutual exclusion).

  • Deferred provides a write-once value that may not be available yet. Other effects can be serialized to happen after the value arrives.

  • Effects from Cats Effect are safe: they are referentially transparent. This means they compose, reducing the number of bugs you can encounter and making your code easier to understand.

3. Concurrent state machines

happyautomata 4

The big idea

Ref + Deferred + 🧠

concurrent state machine

coordination component


State machines

(S, A) ⇒ (S, B)

a.k.a, objects!

class Machine(var s: S) {
  def act(a: A): B

Concurrent state machines

(S, A) ⇒ (S, B)

- class Machine(var s: S) {
+ class Machine(s: AtomicReference[S]) {
    def act(a: A): B

avoids lost updates of s under concurrency

Concurrent state machines with effects

(S, A) ⇒ F[(S, B)]

- class Machine(s: AtomicReference[S]) {
+ class Machine(s: Ref[S]) {
-   def act(a: A): B
+   def act(a: A): F[B]

the type F could be IO, for example

Concurrent state machines: the recipe

  1. Define the coordination interface:

    1. enumerate the roles of the coordinating components, along with each role’s methods; and

    2. define the behavior of each method, which may be state-dependent.

  2. Implement the interface by building a state machine where:

    1. we model the state as an algebraic data type S;

    2. we manage the state as a Ref value;

    3. each interface method is implemented as:

      1. a state transition function affecting the Ref; and

      2. any state-dependent blocking behavior is controlled via Deferred values.

Concurrent state machines: the recipe


Example: countdown latch

Ensure n events happen before subsequent events.

CountdownLatch(3).flatMap { latch =>
  val waiter = latch.get >> IO.println("Finally!")
  val notifier = IO.println("Go!") >> latch.decrement

    notifier, notifier, notifier,
    waiter, waiter


Example: countdown latch

  1. Define the coordination interface:

    1. enumerate the roles of the coordinating components, along with each role’s methods; and

    2. define the behavior of each method, which may be state-dependent.


block until latch opens


notifies the latch that work is complete

trait CountdownLatch {
  def await(): IO[Unit] (1)
  def decrement(): IO[Unit] (2)
1Block until the latch opens.
2Decrement the counter and open the latch if 0.

Example: countdown latch

  1. Implement the interface by building a state machine where:

    1. we model the state as an algebraic data type S;

sealed trait State

object State {
  case class Outstanding(
    n: Int, (1)
    whenDone: Deferred[IO, Unit] (2)
  ) extends State
  case class Done() extends State
1Number of notifications we are waiting for.
2When n reaches 0, we complete the Deferred.

Example: countdown latch

  1. Implement the interface by building a state machine where:

    1. we manage the state as a Ref value;

object CountdownLatch {
  def apply(n: Long): IO[CountdownLatch] =
    for {
      whenDone <- Deferred[IO, Unit]
      state <- Ref[IO].of[State](Outstanding(n, whenDone))
    } yield ???

Example: countdown latch

  1. Implement the interface by building a state machine where:

    1. each interface method is implemented as:

      1. a state transition function affecting the Ref; and

      2. any state-dependent blocking behavior is controlled via Deferred values.

def await: IO[Unit] =
  state.get.flatMap {
    case Outstanding(_, whenDone) =>
    case Done() =>

Example: countdown latch

  1. Implement the interface by building a state machine where:

    1. each interface method is implemented as:

      1. a state transition function affecting the Ref; and

      2. any state-dependent blocking behavior is controlled via Deferred values.

def decrement: IO[Unit] = state.modify {
  case Outstanding(1, whenDone) =>
    Done() -> whenDone.complete(())
  case Outstanding(n, whenDone) =>
    Outstanding(n - 1, whenDone) -> IO.unit
  case Done() =>
    Done() -> IO.unit

Example: countdown latch

We did it! 🎉

trait CountdownLatch {
  def await(): IO[Unit]
  def decrement(): IO[Unit]

CountdownLatch(3).flatMap { latch =>
  val waiter = latch.get >> IO.println("Finally!")
  val notifier = IO.println("Go!") >> latch.decrement

    notifier, notifier, notifier,
    waiter, waiter

Summary and next steps

happyautomata 5


  • Concurrent coordination can be provided by synchronization: constraints on the ordering of events.

  • Concurrent mediators manage and enforce those constraints among the coordinating components.

  • We can build coordinating interfaces using a concurrent state machine, composed of standard primitives, which decompose the coordination problem into cases, easing implementation.

Take a look at the source of many Cats-Effect-based libraries: you’ll better understand how they work! This technique is used everywhere.

Concurrent State Machines with Cats Effect

Adam Rosien @arosien
Inner Product LLC @InnerProductLLC

Hire us to help with your projects, or train your staff!

inner product logo with url 320