
amarita muscaria
don’t eat this! ☠️
val x = ???
val y = ???
(
x.doStuff(),
y.doOtherStuff()
).parTupled // run in parallel
What can we say about which effects happen first?
Two events are
concurrent
if we cannot tell by
looking at the program
which will happen first.
you know which will happen first
☟
you know the sequence
☟
events are sequential
How do they coordinate their behavior?
Normal case: producers offer items, consumers take them.
Producers are blocked from offering items when the queue is full.
Consumers are blocked from taking items when the queue is empty.
queue | producers offering items vs. consumers taking items |
circuit-breaker | clients requesting from (potentially failing) servers |
lock | lockers race for exclusive access |
latch | waiters vs. notifiers |
barrier | group members await each other being ready, then proceed |
and more….
How to build
a coordinating component
with a concurrent state machine
using Cats Effect.
Synchronization ☜ conceptual background
Synchronization with Cats Effect ☜ practical stuff
Concurrent state machines ☜ the big idea
Many thanks to 🎉Fabio Labella🎉, who taught this idea to many people.
Computer programmers are often concerned with
synchronization constraints,
which are requirements pertaining to
the order of events.
That is, coordination in time.
etc.
Two concurrent effects
are mutually-exclusive
if they
must not happen at the same time.
Two concurrent effects
are serialized
if one must
happen before
the other.
var x = 0
// thread 1
x =
x + 1
//
//
//
//
// thread 2
x = x + 1
x
gets set to 1
, but should be 2
!
effects not allowed to
synchronized
[sic]
volatile
AtomicReference[A]
???
one effect must another
If x
cannot start y
,
y
must wait for a message from x
to start.
concurrent: when you can’t tell from looking at the program which will happen first
coordination: controlling the behavior of concurrent components
synchronization constraints: requirements about the order of events
mutual exclusion:
events not allowed to happen at the same time
avoids: non-determinism of concurrent writes; lost updates due to non-determinism of concurrent updates
serialization: one event must happen before another
Cats Effect is a
high-performance,
asynchronous,
composable
framework for building
real-world applications
in a purely functional style
within the Typelevel ecosystem.
Cats Effect 3 ()
Mutual exclusion
☟
Ref[F[_], A]
An asynchronous,
concurrent
mutable reference.
Serialization
☟
Deferred[F[_], A]
A purely functional
synchronization primitive
which represents a
single value
which may not yet be available.
Ref
def run(args: List[String]): IO[ExitCode] =
for {
counter <- Ref[IO].of(0L) (1)
_ <- (
worker(counter), worker(counter),
printCounter(counter)
).parTupled (2)
} yield ExitCode.Success
1 | Create a Ref that holds a Long value, initialized to 0 . |
2 | In parallel, start a number of workers that all update the shared counter, and another effect that prints the counter’s value on an interval. |
Ref
def worker(counter: Ref[IO, Long]): IO[Unit] =
for {
_ <- counter.update(_ + 1)
_ <- worker(counter)
} yield ()
def printCounter(counter: Ref[IO, Long]): IO[Unit] =
for {
_ <- IO.sleep(5.seconds)
n <- counter.get
_ <- IO.println(s"counter: $n")
_ <- printCounter(counter)
} yield ()
Ref
trait Ref[F[_], A] {
def get(): F[A]
def set(value: A): F[Unit]
def getAndSet(value: A): F[A]
def update(f: A => A): F[Unit]
def getAndUpdate(f: A => A): F[A]
def updateAndGet(f: A => A): F[A]
def modify[B](f: A => (A, B)): F[B]
}
the type F
could be IO
, for example
Deferred
trait Deferred[F[_], A] {
def get(): F[A] (1)
def complete(a: A): F[Unit] (2)
}
1 | Blocks until the value is produced. |
2 | Unblocks any waiters, providing the given value to them. |
the type F
could be IO
, for example
Cats Effect provides two primitives for synchronization, Ref
and Deferred
.
Ref
effectfully manages a value that can be read, written, and updated concurrently (mutual exclusion).
Deferred
provides a write-once value that may not be available yet. Other effects can be serialized to happen after the value arrives.
Effects from Cats Effect are safe: they are referentially transparent. This means they compose, reducing the number of bugs you can encounter and making your code easier to understand.
Ref
+ Deferred
+ 🧠
☟
concurrent state machine
☟
coordination component
🎉
(S, A) ⇒ (S, B)
a.k.a, objects!
class Machine(var s: S) {
def act(a: A): B
}
(S, A) ⇒ (S, B)
- class Machine(var s: S) {
+ class Machine(s: AtomicReference[S]) {
def act(a: A): B
}
avoids lost updates of s
under concurrency
(S, A) ⇒ F[(S, B)]
- class Machine(s: AtomicReference[S]) {
+ class Machine(s: Ref[S]) {
- def act(a: A): B
+ def act(a: A): F[B]
}
the type F
could be IO
, for example
Define the coordination interface:
enumerate the roles of the coordinating components, along with each role’s methods; and
define the behavior of each method, which may be state-dependent.
Implement the interface by building a state machine where:
we model the state as an algebraic data type S
;
we manage the state as a Ref
value;
each interface method is implemented as:
a state transition function affecting the Ref
; and
any state-dependent blocking behavior is controlled via Deferred
values.
🙀
Ensure n
events happen before subsequent events.
CountdownLatch(3).flatMap { latch =>
val waiter = latch.get >> IO.println("Finally!")
val notifier = IO.println("Go!") >> latch.decrement
(
notifier, notifier, notifier,
waiter, waiter
).parTupled
}
Go!
Go!
Go!
Finally!
Finally!
Define the coordination interface:
enumerate the roles of the coordinating components, along with each role’s methods; and
define the behavior of each method, which may be state-dependent.
Waiters | block until latch opens |
Notifiers | notifies the latch that work is complete |
trait CountdownLatch {
def await(): IO[Unit] (1)
def decrement(): IO[Unit] (2)
}
1 | Block until the latch opens. |
2 | Decrement the counter and open the latch if 0 . |
Implement the interface by building a state machine where:
we model the state as an algebraic data type S
;
sealed trait State
object State {
case class Outstanding(
n: Int, (1)
whenDone: Deferred[IO, Unit] (2)
) extends State
case class Done() extends State
}
1 | Number of notifications we are waiting for. |
2 | When n reaches 0 , we complete the Deferred . |
Implement the interface by building a state machine where:
we manage the state as a Ref
value;
object CountdownLatch {
def apply(n: Long): IO[CountdownLatch] =
for {
whenDone <- Deferred[IO, Unit]
state <- Ref[IO].of[State](Outstanding(n, whenDone))
} yield ???
}
Implement the interface by building a state machine where:
each interface method is implemented as:
a state transition function affecting the Ref
; and
any state-dependent blocking behavior is controlled via Deferred
values.
def await: IO[Unit] =
state.get.flatMap {
case Outstanding(_, whenDone) =>
whenDone.get
case Done() =>
IO.unit
}
Implement the interface by building a state machine where:
each interface method is implemented as:
a state transition function affecting the Ref
; and
any state-dependent blocking behavior is controlled via Deferred
values.
def decrement: IO[Unit] = state.modify {
case Outstanding(1, whenDone) =>
Done() -> whenDone.complete(())
case Outstanding(n, whenDone) =>
Outstanding(n - 1, whenDone) -> IO.unit
case Done() =>
Done() -> IO.unit
}.flatten
We did it! 🎉
trait CountdownLatch {
def await(): IO[Unit]
def decrement(): IO[Unit]
}
CountdownLatch(3).flatMap { latch =>
val waiter = latch.get >> IO.println("Finally!")
val notifier = IO.println("Go!") >> latch.decrement
(
notifier, notifier, notifier,
waiter, waiter
).parTupled
}
Concurrent coordination can be provided by synchronization: constraints on the ordering of events.
Concurrent mediators manage and enforce those constraints among the coordinating components.
We can build coordinating interfaces using a concurrent state machine, composed of standard primitives, which decompose the coordination problem into cases, easing implementation.
Take a look at the source of many Cats-Effect-based libraries: you’ll better understand how they work! This technique is used everywhere.
Cats Effect. https://typelevel.org/cats-effect
Allen B. Downey. The Little Book of Semaphores. https://greenteapress.com/wp/semaphores
Fabio Labella. https://systemfw.org/
Adam Rosien. Essential Effects essentialeffects.dev ☜ my book
Adam Rosien @arosien
Inner Product LLC @InnerProductLLC
Hire us to help with your projects, or train your staff!