Concurrent State Machines with Cats Effect

Adam Rosien @arosien

Inner Product LLC @InnerProductLLC

ScalaCon, 2 Nov 2021


Don’t panic!

amanita muscaria

don’t eat this! ☠️

Concurrency

val x = ???
val y = ???

(
  x.doStuff(),
  y.doOtherStuff()
).parTupled // run in parallel

What can we say about which effects happen first?

Concurrency

Two events are
concurrent
if we cannot tell by
looking at the program
which will happen first.
— The Little Book of Semaphores

Conversely, if you know which will happen first, you know the sequence: the events are sequential.

Coordination

coordination

How do they coordinate their behavior?


Coordination: queue


Normal case: producers offer items, consumers take them.

Producers are blocked from offering items when the queue is full.

Consumers are blocked from taking items when the queue is empty.
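The blocking behavior described above can be sketched with the bounded queue from `cats.effect.std`. This is a minimal illustration, assuming Cats Effect 3 is on the classpath; the names `QueueSketch`, `producer`, and `consumer` are hypothetical and not from the talk.

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.all._

object QueueSketch {
  // Offer each item in turn; `offer` semantically blocks while the queue is full.
  def producer(queue: Queue[IO, Int], items: List[Int]): IO[Unit] =
    items.traverse_(queue.offer)

  // Take `count` items in order; `take` semantically blocks while the queue is empty.
  def consumer(queue: Queue[IO, Int], count: Int): IO[List[Int]] =
    queue.take.replicateA(count)

  // The capacity (2) is smaller than the number of items (5),
  // so the producer has to wait for the consumer to make room.
  val program: IO[List[Int]] =
    for {
      queue <- Queue.bounded[IO, Int](2)
      taken <- (producer(queue, List(1, 2, 3, 4, 5)), consumer(queue, 5))
                 .parMapN((_, items) => items)
    } yield taken
}
```

With a single producer and a single consumer the items come out in the order they were offered; neither side needs to know how fast the other one runs.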

Coordination: components

queue

producers offering items vs. consumers taking items

circuit-breaker

clients requesting from (potentially failing) servers

lock

lockers race for exclusive access

latch

waiters vs. notifiers

barrier

group members await each other being ready, then proceed

and more…

Concurrent State Machines with Cats Effect

How to build
a coordinating component
with a concurrent state machine
using Cats Effect.

  1. Synchronization ☜ conceptual background

  2. Synchronization with Cats Effect ☜ practical stuff

  3. Concurrent state machines ☜ the big idea

Many thanks to 🎉Fabio Labella🎉, who taught this idea to many people.

1. Synchronization


Synchronization

Computer programmers are often concerned with
synchronization constraints,
which are requirements pertaining to
the order of events.
— The Little Book of Semaphores

That is, coordination in time.


Kinds of synchronization

Two concurrent effects
are mutually-exclusive
if they
must not happen at the same time.

Two concurrent effects
are serialized
if one must
happen before
the other.

Mutual-exclusion

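var x = 0

// thread 1 and thread 2 both run `x = x + 1`, interleaved in time:
//
//   thread 1             thread 2
//   --------             --------
//   read x (0)
//                        read x (0)
//                        write x = 1
//   write x = 1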

x gets set to 1, but should be 2!

Mutual-exclusion

mutual-exclusion

effects not allowed to happen at the same time

  • synchronized [sic]

  • volatile

  • AtomicReference[A]

  • ???
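As a rough illustration of the `AtomicReference[A]` option, the sketch below runs the `x = x + 1` update from several JVM threads without losing any of them. It uses only the standard library; `AtomicCounterSketch`, `hammer`, and `atomicTotal` are hypothetical names chosen for this example.

```scala
import java.util.concurrent.atomic.AtomicReference

object AtomicCounterSketch {
  // Start `threads` threads that each call `increment` `perThread` times,
  // then wait for all of them to finish.
  def hammer(threads: Int, perThread: Int)(increment: () => Unit): Unit = {
    val workers = List.fill(threads)(
      new Thread(() => (1 to perThread).foreach(_ => increment()))
    )
    workers.foreach(_.start())
    workers.foreach(_.join())
  }

  // Every update goes through `updateAndGet`, which retries its
  // compare-and-set until it wins, so no increment is lost.
  def atomicTotal(threads: Int, perThread: Int): Int = {
    val x = new AtomicReference[Int](0)
    hammer(threads, perThread) { () =>
      x.updateAndGet(n => n + 1)
      ()
    }
    x.get()
  }
}
```

Replacing the `AtomicReference` with a plain `var` and `x = x + 1` would make the total unpredictable, which is the lost-update problem from the previous slide.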

Serialization

serialization

one effect must happen before another


If x cannot start y,
y must wait for a message from x to start.

Synchronization: summary

  • concurrent: when you can’t tell from looking at the program which will happen first

  • coordination: controlling the behavior of concurrent components

  • synchronization constraints: requirements about the order of events

    • mutual exclusion: events not allowed to happen at the same time

      avoids: non-determinism of concurrent writes; lost updates due to non-determinism of concurrent updates

    • serialization: one event must happen before another

2. Synchronization with Cats Effect


Cats Effect

Cats Effect is a
high-performance,
asynchronous,
composable
framework for building
real-world applications
in a purely functional style
within the Typelevel ecosystem.


Cats Effect 3

Synchronization in Cats Effect

Mutual exclusion

Ref[F[_], A]

An asynchronous,
concurrent
mutable reference.

Serialization

Deferred[F[_], A]

A purely functional
synchronization primitive
which represents a
single value
which may not yet be available.

Atomic updates with Ref

import cats.effect._
import cats.syntax.all._ // for parTupled

def run(args: List[String]): IO[ExitCode] =
  for {
    counter <- Ref[IO].of(0L) // (1)
    _ <- (
      worker(counter), worker(counter),
      printCounter(counter)
    ).parTupled // (2)
  } yield ExitCode.Success
(1) Create a Ref that holds a Long value, initialized to 0.
(2) In parallel, start a number of workers that all update the shared counter, and another effect that prints the counter’s value on an interval.

Atomic updates with Ref

def worker(counter: Ref[IO, Long]): IO[Unit] =
  for {
    _ <- counter.update(_ + 1)
    _ <- worker(counter)
  } yield ()

def printCounter(counter: Ref[IO, Long]): IO[Unit] =
  for {
    _ <- IO.sleep(5.seconds)
    n <- counter.get
    _ <- IO.println(s"counter: $n")
    _ <- printCounter(counter)
  } yield ()

Atomic updates with Ref

trait Ref[F[_], A] {
  def get: F[A]

  def set(value: A): F[Unit]
  def getAndSet(value: A): F[A]

  def update(f: A => A): F[Unit]
  def getAndUpdate(f: A => A): F[A]
  def updateAndGet(f: A => A): F[A]

  def modify[B](f: A => (A, B)): F[B]
}

the type F could be IO, for example
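Of these methods, `modify` is the one the rest of the talk leans on: it updates the value and returns a result computed from the old value in one atomic step. A small sketch, assuming Cats Effect 3; the ticket-dispenser names (`RefModifySketch`, `nextTicket`, `drawTickets`) are hypothetical.

```scala
import cats.effect.{IO, Ref}
import cats.syntax.all._

object RefModifySketch {
  // Atomically hand out the current number and advance the counter.
  // The function returns (new state, result for the caller).
  def nextTicket(counter: Ref[IO, Long]): IO[Long] =
    counter.modify(n => (n + 1, n))

  // Draw `count` tickets in parallel. Because each `modify` is atomic,
  // every caller receives a distinct number.
  def drawTickets(count: Int): IO[List[Long]] =
    for {
      counter <- Ref[IO].of(0L)
      tickets <- List.fill(count)(nextTicket(counter)).parSequence
    } yield tickets
}
```

Doing the same with a separate `get` followed by `set` would reintroduce the lost-update problem, since another fiber could run between the two calls.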

Write-once effect serialization with Deferred

trait Deferred[F[_], A] {
  def get: F[A] // (1)
  def complete(a: A): F[Boolean] // (2)
}
(1) Blocks until the value is produced.
(2) Unblocks any waiters, providing the given value to them. In Cats Effect 3 the result is false if the Deferred was already completed.

the type F could be IO, for example
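To connect this back to serialization, the sketch below makes one effect wait for a message from another, using a `Deferred` as the message. It assumes Cats Effect 3; `DeferredSketch`, `x`, `y`, and `signal` are hypothetical names for this example.

```scala
import cats.effect.{Deferred, IO}
import cats.syntax.all._

object DeferredSketch {
  val program: IO[String] =
    for {
      signal <- Deferred[IO, String]
      // y: semantically blocks on `get` until x delivers its message.
      y = signal.get.map(msg => s"y started after: $msg")
      // x: does its work, then completes the Deferred.
      x = IO.println("x is working") >> signal.complete("x is done")
      // Run both in parallel and keep only y's result.
      result <- (x, y).parMapN((_, fromY) => fromY)
    } yield result
}
```

Even though `x` and `y` are started in parallel, `y` cannot produce its result before `x` has called `complete`, so the two are serialized.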

Synchronization with Cats Effect: summary

  • Cats Effect provides two primitives for synchronization, Ref and Deferred.

  • Ref effectfully manages a value that can be read, written, and updated concurrently (mutual exclusion).

  • Deferred provides a write-once value that may not be available yet. Other effects can be serialized to happen after the value arrives.

  • Effects from Cats Effect are safe: they are referentially transparent. This means they compose, reducing the number of bugs you can encounter and making your code easier to understand.

3. Concurrent state machines


The big idea

Ref + Deferred + 🧠

concurrent state machine

coordination component

🎉

State machines

(S, A) ⇒ (S, B)

a.k.a. objects!

class Machine(var s: S) {
  def act(a: A): B
}
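To make the `(S, A) ⇒ (S, B)` shape concrete, here is a small non-concurrent state machine written as a pure function. The turnstile domain and all names in it (`TurnstileSketch`, `transition`, `runAll`) are hypothetical; the talk does not use this example.

```scala
object TurnstileSketch {
  sealed trait State
  case object Locked extends State
  case object Unlocked extends State

  sealed trait Input
  case object Coin extends Input
  case object Push extends Input

  // (S, A) => (S, B): the next state plus an output describing what happened.
  def transition(state: State, input: Input): (State, String) =
    (state, input) match {
      case (Locked, Coin)   => (Unlocked, "unlocked")
      case (Locked, Push)   => (Locked, "blocked")
      case (Unlocked, Coin) => (Unlocked, "coin returned")
      case (Unlocked, Push) => (Locked, "passed through")
    }

  // Thread the state through a sequence of inputs, collecting the outputs.
  def runAll(start: State, inputs: List[Input]): (State, List[String]) =
    inputs.foldLeft((start, List.empty[String])) { case ((s, outs), in) =>
      val (next, out) = transition(s, in)
      (next, outs :+ out)
    }
}
```

The `class Machine(var s: S)` version is the same function with the state hidden in a mutable field instead of being passed in and returned.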

Concurrent state machines

(S, A) ⇒ (S, B)

- class Machine(var s: S) {
+ class Machine(s: AtomicReference[S]) {
    def act(a: A): B
  }

avoids lost updates of s under concurrency

Concurrent state machines with effects

(S, A) ⇒ F[(S, B)]

- class Machine(s: AtomicReference[S]) {
+ class Machine(s: Ref[F, S]) {
-   def act(a: A): B
+   def act(a: A): F[B]
  }

the type F could be IO, for example
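One possible reading of the effectful version, with `F` fixed to `IO`: keep the transition pure and let `Ref.modify` apply it atomically. This is a sketch assuming Cats Effect 3; the running-total domain and the names `EffectfulMachineSketch`, `transition`, `create`, and `current` are hypothetical.

```scala
import cats.effect.{IO, Ref}
import cats.syntax.all._

object EffectfulMachineSketch {
  // Hypothetical pure transition: the state is a running total, the input is
  // an amount to add, and the output reports the new total.
  def transition(total: Long, amount: Long): (Long, String) = {
    val next = total + amount
    (next, s"total is now $next")
  }

  final class Machine(state: Ref[IO, Long]) {
    // `modify` applies the transition atomically and returns its output.
    def act(amount: Long): IO[String] =
      state.modify(total => transition(total, amount))

    def current: IO[Long] = state.get
  }

  object Machine {
    // Allocating the Ref is itself an effect, so construction returns IO.
    def create(initial: Long): IO[Machine] =
      Ref[IO].of(initial).map(new Machine(_))
  }

  // 1000 concurrent calls to `act`; none of the updates should be lost.
  val program: IO[Long] =
    for {
      machine <- Machine.create(0L)
      _       <- List.fill(1000)(machine.act(1L)).parSequence_
      total   <- machine.current
    } yield total
}
```

Here the transition function itself has no effects. When a transition needs to trigger an effect, such as waking up a waiter, the function can return that effect as its output to be run after `modify`, which is the pattern the countdown latch example uses.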

Concurrent state machines: the recipe

  1. Define the coordination interface:

    1. enumerate the roles of the coordinating components, along with each role’s methods; and

    2. define the behavior of each method, which may be state-dependent.

  2. Implement the interface by building a state machine where:

    1. we model the state as an algebraic data type S;

    2. we manage the state as a Ref value;

    3. each interface method is implemented as:

      1. a state transition function affecting the Ref; and

      2. any state-dependent blocking behavior is controlled via Deferred values.

Concurrent state machines: the recipe

🙀

Example: countdown latch

Ensure n events happen before subsequent events.

CountdownLatch(3).flatMap { latch =>
  val waiter = latch.await >> IO.println("Finally!")
  val notifier = IO.println("Go!") >> latch.decrement

  (
    notifier, notifier, notifier,
    waiter, waiter
  ).parTupled
}

Go!
Go!
Go!
Finally!
Finally!

Example: countdown latch

  1. Define the coordination interface:

    1. enumerate the roles of the coordinating components, along with each role’s methods; and

    2. define the behavior of each method, which may be state-dependent.

Roles
Waiters

block until latch opens

Notifiers

notify the latch that work is complete

trait CountdownLatch {
  def await: IO[Unit] // (1)
  def decrement: IO[Unit] // (2)
}
(1) Block until the latch opens.
(2) Decrement the counter, and open the latch if it reaches 0.

Example: countdown latch

  2. Implement the interface by building a state machine where:

    1. we model the state as an algebraic data type S;

sealed trait State

object State {
  case class Outstanding(
    n: Int, // (1)
    whenDone: Deferred[IO, Unit] // (2)
  ) extends State
  case class Done() extends State
}
(1) Number of notifications we are waiting for.
(2) When n reaches 0, we complete the Deferred.

Example: countdown latch

  2. Implement the interface by building a state machine where:

    2. we manage the state as a Ref value;

object CountdownLatch {
  import State._

  // n is an Int so that it matches Outstanding(n: Int, ...)
  def apply(n: Int): IO[CountdownLatch] =
    for {
      whenDone <- Deferred[IO, Unit]
      state <- Ref[IO].of[State](Outstanding(n, whenDone))
    } yield ???
}

Example: countdown latch

  2. Implement the interface by building a state machine where:

    3. each interface method is implemented as:

      1. a state transition function affecting the Ref; and

      2. any state-dependent blocking behavior is controlled via Deferred values.

def await: IO[Unit] =
  state.get.flatMap {
    case Outstanding(_, whenDone) =>
      whenDone.get
    case Done() =>
      IO.unit
  }

Example: countdown latch

  2. Implement the interface by building a state machine where:

    3. each interface method is implemented as:

      1. a state transition function affecting the Ref; and

      2. any state-dependent blocking behavior is controlled via Deferred values.

def decrement: IO[Unit] = state.modify[IO[Unit]] {
  case Outstanding(1, whenDone) =>
    // complete returns IO[Boolean] in Cats Effect 3, so discard the result
    Done() -> whenDone.complete(()).void
  case Outstanding(n, whenDone) =>
    Outstanding(n - 1, whenDone) -> IO.unit
  case Done() =>
    Done() -> IO.unit
}.flatten

Example: countdown latch

We did it! 🎉

trait CountdownLatch {
  def await: IO[Unit]
  def decrement: IO[Unit]
}

CountdownLatch(3).flatMap { latch =>
  val waiter = latch.await >> IO.println("Finally!")
  val notifier = IO.println("Go!") >> latch.decrement

  (
    notifier, notifier, notifier,
    waiter, waiter
  ).parTupled
}
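For reference, the pieces from the previous slides can be assembled into one compilable unit roughly as follows. This is a sketch assuming Cats Effect 3, not the talk's published source: the state types are nested in the companion object for brevity, and `LatchDemo` with its `log` Ref is a hypothetical addition that records events instead of printing them.

```scala
import cats.effect.{Deferred, IO, Ref}
import cats.syntax.all._

trait CountdownLatch {
  def await: IO[Unit]
  def decrement: IO[Unit]
}

object CountdownLatch {
  sealed trait State
  final case class Outstanding(n: Int, whenDone: Deferred[IO, Unit]) extends State
  final case class Done() extends State

  def apply(n: Int): IO[CountdownLatch] =
    for {
      whenDone <- Deferred[IO, Unit]
      state    <- Ref[IO].of[State](Outstanding(n, whenDone))
    } yield new CountdownLatch {
      // Waiters block on the Deferred while notifications are outstanding.
      def await: IO[Unit] =
        state.get.flatMap {
          case Outstanding(_, whenDone) => whenDone.get
          case Done()                   => IO.unit
        }

      // The transition returns the next state plus an effect to run afterwards;
      // `flatten` runs that effect once the state change has been committed.
      def decrement: IO[Unit] =
        state.modify[IO[Unit]] {
          case Outstanding(1, whenDone) =>
            // `complete` returns IO[Boolean] in Cats Effect 3, hence `.void`.
            (Done(), whenDone.complete(()).void)
          case Outstanding(n, whenDone) =>
            (Outstanding(n - 1, whenDone), IO.unit)
          case Done() =>
            (Done(), IO.unit)
        }.flatten
    }
}

object LatchDemo {
  // Same shape as the slide's example, but appending to a log so that the
  // order of events can be inspected afterwards.
  val events: IO[List[String]] =
    for {
      log   <- Ref[IO].of(List.empty[String])
      latch <- CountdownLatch(3)
      waiter   = latch.await >> log.update(_ :+ "Finally!")
      notifier = log.update(_ :+ "Go!") >> latch.decrement
      _      <- List(notifier, notifier, notifier, waiter, waiter).parSequence_
      result <- log.get
    } yield result
}
```

Each notifier logs before it decrements and each waiter logs only after `await` returns, so all three "Go!" entries are recorded before either "Finally!" regardless of how the fibers are scheduled.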

Summary and next steps


Summary

  • Concurrent coordination can be provided by synchronization: constraints on the ordering of events.

  • Concurrent mediators manage and enforce those constraints among the coordinating components.

  • We can build coordinating interfaces using a concurrent state machine, composed of standard primitives, which decompose the coordination problem into cases, easing implementation.

Take a look at the source of many Cats-Effect-based libraries: you’ll better understand how they work! This technique is used everywhere.

Concurrent State Machines with Cats Effect

Adam Rosien @arosien
Inner Product LLC @InnerProductLLC

Hire us to help with your projects, or train your staff!
