Intro

info

As of now, Workflows4s doesn't have a release. The guide below showcases the basic ideas behind the library, but to follow it you need to either release the code locally or write it inside the workflows4s-example project.

Let's model a simplified pull request process that looks like this:

  1. Run CI/CD pipeline
  2. Close the PR if a critical issue is detected in the pipeline
  3. Await approval
  4. Merge if approved, close otherwise

Modeling the workflow

We will start by defining our workflow context. It controls the types internal to the workflow: its state and the events it uses for persistence. We set both to Unit for now because they are not important in the early phase of designing the workflow.

object Context extends WorkflowContext {
override type Event = Unit
override type State = Unit
}
import Context.*

Now we can define the shape of our workflow.

val createPR: WIO.Draft    = WIO.draft.signal()
val runPipeline: WIO.Draft = WIO.draft.step(error = "Critical Issue")
val awaitReview: WIO.Draft = WIO.draft.signal(error = "Rejected")

val mergePR: WIO.Draft = WIO.draft.step()
val closePR: WIO.Draft = WIO.draft.step()

val workflow: WIO.Draft = (
createPR >>>
runPipeline >>>
awaitReview >>>
mergePR
).handleErrorWith(closePR)

This is enough to generate the graphical representation!

val bpmnModel = BPMNConverter.convert(workflow.getModel, "process")
Bpmn.writeModelToFile(new File("pr-draft.bpmn").getAbsoluteFile, bpmnModel)

Tada!

[Image: run-io.svg]

Implementing the workflow

Let's now implement our workflow. We have to start by defining a few underlying ADTs: state, events, errors and signals. Normally, you will define those as you go through the process of defining the steps, but for the sake of this tutorial, we are defining them upfront.

sealed trait PRState
object PRState {
case object Empty extends PRState
case class Initiated(commit: String) extends PRState
case class Checked(commit: String, pipelineResults: String) extends PRState
case class Reviewed(commit: String, pipelineResults: String, approved: Boolean) extends PRState
type Merged = Reviewed
case class Closed(state: PRState, reason: PRError) extends PRState
}

Workflows4s was designed to be as type-safe as possible. It means we can express the state as an ADT and the compiler will allow us to compose steps transitioning through those states only in the correct order.
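
To illustrate the ordering guarantee without depending on the library, here is a minimal sketch that uses plain Scala functions in place of workflow steps. The names `SketchState`, `check`, `review` and `pipeline` are hypothetical and not part of Workflows4s. The real `WIO` type carries more information (for example the error type), so treat this only as an analogy for how an output state has to match the next step's input state.

```scala
// Hypothetical, library-free sketch: plain functions stand in for workflow steps.
sealed trait SketchState
object SketchState {
  final case class Initiated(commit: String) extends SketchState
  final case class Checked(commit: String, pipelineResults: String) extends SketchState
  final case class Reviewed(commit: String, pipelineResults: String, approved: Boolean) extends SketchState
}
import SketchState.*

// Each step declares the state it requires and the state it produces.
val check: Initiated => Checked =
  in => Checked(in.commit, "ok")
val review: Checked => Reviewed =
  in => Reviewed(in.commit, in.pipelineResults, approved = true)

// Compiles: the output type of `check` is the input type of `review`.
val pipeline: Initiated => Reviewed = check.andThen(review)

// Does not compile: `review` produces Reviewed, but `check` requires Initiated.
// val wrongOrder = review.andThen(check)
```

Uncommenting the last line yields a type mismatch at compile time, which is the kind of mistake the typed state ADT is meant to catch.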

sealed trait PREvent
object PREvent {
case class Created(commit: String) extends PREvent
case class Checked(pipelineResults: String) extends PREvent
case class Reviewed(approved: Boolean) extends PREvent
}

Workflows4s is built on the idea of event-sourcing, and each non-deterministic action (e.g. IO) is memoized through an event. Those events are used to recompute the workflow state upon recovery.
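
The recovery idea can be sketched in plain Scala as a left fold over persisted events. This is a simplified illustration under our own assumptions, not the library's implementation: `DemoEvent`, `DemoState`, `applyEvent` and `recover` are hypothetical names.

```scala
// Hypothetical sketch of event-sourced recovery; not the Workflows4s API.
sealed trait DemoEvent
object DemoEvent {
  final case class Created(commit: String) extends DemoEvent
  final case class Checked(pipelineResults: String) extends DemoEvent
}

final case class DemoState(commit: Option[String], pipelineResults: Option[String])

// Pure, deterministic event handler: (state, event) => next state.
def applyEvent(state: DemoState, event: DemoEvent): DemoState =
  event match {
    case DemoEvent.Created(commit)  => state.copy(commit = Some(commit))
    case DemoEvent.Checked(results) => state.copy(pipelineResults = Some(results))
  }

// Recovery replays the persisted events, in order, over the initial state.
def recover(events: List[DemoEvent]): DemoState =
  events.foldLeft(DemoState(None, None))(applyEvent)
```

Because `applyEvent` is pure, replaying the same events always produces the same state, no matter how many times recovery runs.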

object Signals {
val createPR: SignalDef[CreateRequest, Unit] = SignalDef()
val reviewPR: SignalDef[ReviewRequest, Unit] = SignalDef()
case class CreateRequest(commit: String)
case class ReviewRequest(approve: Boolean)
}

Signals are the API of a workflow: they allow delivering information into the workflow and getting a response back. In our example, we don't use the response part.

sealed trait PRError
object PRError {
case object CommitNotFound extends PRError
case object PipelineFailed extends PRError
case object ReviewRejected extends PRError
}

Workflows4s supports short-circuiting operations with domain errors. The mechanism is similar to Either or a bi-functor IO, and different parts of the workflow can use different error types. In this example, we use a single error ADT. All errors will be visible in type signatures.
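
For readers less familiar with this style, the short-circuiting behaviour of `Either` from the Scala standard library looks roughly like the sketch below. The names `DemoError`, `runChecks`, `awaitApproval`, `process` and `processOrClose` are made up for illustration, and the sketch says nothing about how Workflows4s implements error handling internally.

```scala
// Hypothetical sketch of Either-style short-circuiting with standard-library types only.
sealed trait DemoError
object DemoError {
  case object PipelineFailed extends DemoError
  case object ReviewRejected extends DemoError
}

def runChecks(commit: String): Either[DemoError, String] =
  if (commit.isEmpty) Left(DemoError.PipelineFailed)
  else Right(s"$commit: checked")

def awaitApproval(checked: String, approved: Boolean): Either[DemoError, String] =
  if (approved) Right(s"$checked, approved")
  else Left(DemoError.ReviewRejected)

// The first Left stops the chain; the remaining steps are skipped.
def process(commit: String, approved: Boolean): Either[DemoError, String] =
  for {
    checked  <- runChecks(commit)
    reviewed <- awaitApproval(checked, approved)
  } yield reviewed

// Handling the error converts both branches into a single final value.
def processOrClose(commit: String, approved: Boolean): String =
  process(commit, approved).fold(err => s"closed: $err", ok => s"merged: $ok")
```

The error type appears in every signature, so a caller can see which failures are possible and has to handle them before reaching a plain value.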

Now that we have it covered, we can plug state and events into our context and start defining the steps.

object Context extends WorkflowContext {
override type Event = PREvent
override type State = PRState
}
import Context.*

Our first step creates the PR in reaction to the previously defined signal.

val createPR: WIO[Any, PRError.CommitNotFound.type, PRState.Initiated] =
WIO
.handleSignal(Signals.createPR)
.using[Any]
.purely((in, req) => PREvent.Created(req.commit))
.handleEventWithError((in, evt) =>
if (evt.commit.length > 8) Left(PRError.CommitNotFound)
else Right(PRState.Initiated(evt.commit)),
)
.voidResponse
.autoNamed()

We handled the signal without side effects and added dummy validation logic in the event handler. In real life, we could do a lookup in git to verify the commit's existence and emit different events based on the outcome of that lookup.

voidResponse produces a unit value as the signal response. autoNamed fills in the name of the step based on the variable name.

Step definitions are verbose by design, because they are not lightweight things. Each step in the workflow adds complexity and mental load. The workflow should have as few steps as possible.

In the next two steps, we run a fake side-effectful computation and handle the review signal.

val runPipeline: WIO[PRState.Initiated, PRError.PipelineFailed.type, PRState.Checked] =
WIO
.runIO[PRState.Initiated](in => IO(PREvent.Checked("<Some tests results>")))
.handleEventWithError((in, evt) =>
if (evt.pipelineResults.contains("error")) Left(PRError.PipelineFailed)
else Right(PRState.Checked(in.commit, evt.pipelineResults)),
)
.autoNamed

val awaitReview: WIO[PRState.Checked, PRError.ReviewRejected.type, PRState.Reviewed] =
WIO
.handleSignal(Signals.reviewPR)
.using[PRState.Checked]
.purely((in, req) => PREvent.Reviewed(req.approve))
.handleEventWithError((in, evt) =>
if (evt.approved) Right(PRState.Reviewed(in.commit, in.pipelineResults, evt.approved))
else Left(PRError.ReviewRejected),
)
.voidResponse
.autoNamed()

With this being done, we can finish the workflow.

val mergePR: WIO[PRState.Reviewed, Nothing, PRState.Merged]   =
WIO.pure[PRState.Reviewed].make(in => in).autoNamed()
val closePR: WIO[(PRState, PRError), Nothing, PRState.Closed] =
WIO.pure[(PRState, PRError)].make((state, err) => PRState.Closed(state, err)).autoNamed()

val workflow: WIO[Any, Nothing, PRState] = (
createPR >>>
runPipeline >>>
awaitReview >>>
mergePR
).handleErrorWith(closePR)

Done! We defined the last two simple steps as pure deterministic computations and composed the steps exactly the same way as in the draft.

Let's generate the diagram again.

val bpmnModel = BPMNConverter.convert(workflow.getModel, "process")
Bpmn.writeModelToFile(new File("pr.bpmn").getAbsoluteFile, bpmnModel)

[Image: run-io.svg]

Has anything changed? Yes, because the error names are now generated automatically from the defined ADT. Otherwise, it's the same process we defined initially.

Running the workflow

Let's now see how to run the workflow. To do that, we need a runtime. For simplicity's sake, we will go with a synchronous in-memory runtime.

import cats.effect.unsafe.implicits.global
val wfInstance = InMemorySyncRuntime.runWorkflow[Context.type, PRState.Empty.type](
behaviour = workflow,
state = PRState.Empty
)

wfInstance.deliverSignal(Signals.createPR, Signals.CreateRequest("some-sha"))
println(wfInstance.queryState())
// Checked(some-sha,<Some tests results>)

wfInstance.deliverSignal(Signals.reviewPR, Signals.ReviewRequest(approve = false))
println(wfInstance.queryState())
// Closed(Checked(some-sha,<Some tests results>),ReviewRejected)

And that's pretty much it. The runtime gives us a way to interact with a workflow by delivering signals and querying the state.

One more thing we can do is recover the workflow.

Recovering the workflow

Fetching the events and reconstructing the instance from them is usually the responsibility of the runtime, but because we are using the in-memory runtime here, we can do this programmatically.

val recoveredInstance = InMemorySyncRuntime.runWorkflow[Context.type, PRState.Empty.type](
workflow,
PRState.Empty,
events = wfInstance.getEvents,
)
assert(wfInstance.queryState() == recoveredInstance.queryState())

We created a new instance and provided the initial events taken from the previous instance. The workflow processed them and recovered the state without executing any side-effectful operations.
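
Why recovery can skip side effects is easiest to see in a toy model where a step consults its recorded result before running its effect. The sketch below is an assumption-laden simplification: `demoReplay`, `pipeline` and `runStep` are hypothetical names, and the real runtime works with events and `IO` rather than an `Option[String]`.

```scala
// Hypothetical sketch: a step whose effect result is memoized, as an event would be.
// Returns (live result, recovered result, number of times the effect actually ran).
def demoReplay(): (String, String, Int) = {
  var effectRuns = 0

  // Stand-in for a side-effectful computation such as running a CI pipeline.
  def pipeline(): String = {
    effectRuns += 1
    "tests passed"
  }

  // getOrElse takes its argument by name, so the effect runs only when nothing was recorded.
  def runStep(recorded: Option[String]): String =
    recorded.getOrElse(pipeline())

  val live      = runStep(None)       // first execution: the effect runs and its result is kept
  val recovered = runStep(Some(live)) // recovery: the recorded result is reused
  (live, recovered, effectRuns)
}
```

Both runs end up with the same result while the effect counter stays at one, which mirrors the assertion on `queryState()` in the recovery code.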

You can find the whole code here.