Scheduling & Executors

Introduction

In addition to the promise abstraction, the promesa library comes with many helpers and factories for executing and scheduling asynchronous tasks.

Although this API works in the JS runtime and some of the functions have general utility, the main target is the JVM platform.

Async Tasks

First, let’s define an async task: a function that is executed outside the current flow, on a different thread. For this, the promesa library exposes two main functions:

  • promesa.exec/run!: useful when you want to run a function in a different thread and you don’t care about the return value; it returns a promise that will be fulfilled when the callback terminates.
  • promesa.exec/submit!: useful when you want to run a function in a different thread and you need the return value; it returns a promise that will be fulfilled with the return value of the function.

Let’s see some examples:

(require '[promesa.exec :as px])


@(px/run! (fn []
            (prn "I'm running in different thread")
            1))
;; => nil

@(px/submit! (fn []
               (prn "I'm running in different thread")
               1))
;; => 1

Both functions optionally accept an executor instance as the first argument, which lets you specify the executor on which the function is executed. If no executor is provided, the default one is used (bound to the promesa.exec/*default-executor* dynamic var).
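As a minimal sketch of the executor argument described above, the snippet below passes a dedicated pool to px/submit!. It assumes a JVM runtime. px/fixed-executor and its :parallelism option are taken from the factories section of this document; the var names pool and result are illustrative, and the shutdown goes through plain java.util.concurrent interop on the assumption that the pool is a regular ExecutorService.

```clojure
(require '[promesa.exec :as px])

;; A dedicated pool with two worker threads (illustrative name).
(def pool (px/fixed-executor :parallelism 2))

;; The executor goes first, the function second.
(def result
  @(px/submit! pool (fn [] (+ 1 2))))

(println result)
;; => 3

;; Assumption: on the JVM the pool is a java.util.concurrent.ExecutorService,
;; so its threads can be released once the pool is no longer needed.
(.shutdown ^java.util.concurrent.ExecutorService pool)
```

Creating the pool once and reusing it for many calls is usually preferable to creating one executor per task.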

Also, in both cases, the returned promise is cancellable: if the function has not started executing yet, cancelling the promise prevents its execution. You can cancel a cancellable promise with the p/cancel! function (p being the usual alias for promesa.core).

Delayed Tasks

This is a simple helper that schedules the execution of a function after a given amount of time (in milliseconds).

(require '[promesa.exec :as exec])
(exec/schedule! 1000 (fn []
                       (println "hello world")))

This example shows you how you can schedule a function call to be executed 1 second in the future. It works the same way for both Clojure and ClojureScript.

A scheduled task can be cancelled through the value returned by schedule!:

(require '[promesa.core :as p])

;; do-stuff stands in for your own function
(def task (exec/schedule! 1000 #(do-stuff)))

(p/cancel! task)
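To make the effect of cancellation observable, here is a small sketch for the JVM. It only uses px/schedule! and p/cancel! as described in this document; the ran? atom and the chosen delays are illustrative.

```clojure
(require '[promesa.core :as p]
         '[promesa.exec :as px])

;; Flag flipped by the task, so we can observe whether it ever ran.
(def ran? (atom false))

;; Schedule the task 500 ms in the future...
(def task (px/schedule! 500 (fn [] (reset! ran? true))))

;; ...and cancel it right away, long before the delay elapses.
(p/cancel! task)

;; Wait past the original deadline, then inspect the flag.
;; It is expected to still be false if the cancellation took effect.
(Thread/sleep 1000)
(println "task ran?" @ran?)
```

Cancelling only helps while the task has not started; a function that is already running is not covered by this sketch.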

The execution model depends on the platform: on the JVM a default scheduled executor is used and the scheduled function is executed in a different thread; on the JS runtime the function is executed in a microtask.
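The JVM half of this description can be checked with a short sketch that records which thread runs the scheduled function. It uses a plain clojure.core promise to hand the thread name back; the var names and the 2 second timeout are illustrative, and the actual thread names depend on the promesa version and runtime.

```clojure
(require '[promesa.exec :as px])

;; Name of the thread that schedules the task.
(def caller-thread (.getName (Thread/currentThread)))

;; A plain clojure.core promise used to hand the worker's thread name back.
(def scheduled-thread (promise))

(px/schedule! 100 (fn []
                    (deliver scheduled-thread
                             (.getName (Thread/currentThread)))))

;; Wait up to 2 seconds for the task; :timeout signals that it never ran.
(def worker-thread (deref scheduled-thread 2000 :timeout))

(println "scheduled from:" caller-thread)
(println "executed on:   " worker-thread)
```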

Executors Factories

A collection of factory functions for creating executor instances (JVM only):

  • px/cached-executor: creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
  • px/fixed-executor: creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
  • px/single-executor: creates an Executor that uses a single worker thread operating off an unbounded queue.
  • px/scheduled-executor: creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
  • px/forkjoin-executor: creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention.

Since v9.0.x there are new factories that use the JDK >= 19 preview API:

  • px/thread-per-task-executor: creates an Executor that starts a new Thread for each task.
  • px/vthread-per-task-executor: creates an Executor that starts a new virtual Thread for each task.
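As an illustration of how a factory-created executor is used, the following sketch runs three tasks on px/single-executor. It assumes the factory can be called without arguments and that the resulting executor is a java.util.concurrent.ExecutorService; the var names are illustrative. Because there is a single worker thread, the tasks are expected to run one at a time in submission order.

```clojure
(require '[promesa.exec :as px])

;; One worker thread: tasks run one at a time, in submission order.
(def executor (px/single-executor))

;; Records the order in which the tasks actually ran.
(def order (atom []))

;; Submit three tasks eagerly and keep their promises.
(def promises
  (mapv (fn [i]
          (px/submit! executor (fn [] (swap! order conj i) i)))
        (range 3)))

;; Block until every task has finished.
(doseq [pr promises] @pr)

(println @order)
;; => [0 1 2]

;; Assumption: the executor is an ExecutorService, so release its thread.
(.shutdown ^java.util.concurrent.ExecutorService executor)
```

With a multi-threaded pool such as px/fixed-executor the recorded order is not guaranteed, which is why the single-thread variant is used here.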

Helpers

pmap (experimental)

This is a simplified analogue of clojure.core/pmap that executes potentially computationally expensive or I/O-bound functions in parallel.

It returns a lazy chunked seq (using Clojure’s default chunk size of 32), and the maximum parallelism is determined by the provided executor.

Example:

(defn slow-inc
  [x]
  (Thread/sleep 1000)
  (inc x))

(time
 (doall
  (px/pmap slow-inc (range 10))))

;; "Elapsed time: 2002.724345 msecs"
;; => (1 2 3 4 5 6 7 8 9 10)

(time
 (doall
  (map slow-inc (range 10))))

;; "Elapsed time: 10001.912614 msecs"
;; => (1 2 3 4 5 6 7 8 9 10)

with-executor macro (experimental)

This macro runs the scoped code with px/*default-executor* bound to the provided executor. The provided executor can also be a function, in which case the executor is instantiated lazily.

It optionally accepts metadata on the executor expression to indicate what happens to the executor when the scope exits:

  • ^:shutdown: shut down the pool before returning
  • ^:interrupt: shut down the pool and interrupt running tasks before returning

Here is an example of how you can customize the executor for pmap:

(time
 (px/with-executor ^:shutdown (px/fixed-executor :parallelism 2)
   (doall (px/pmap slow-inc (range 10)))))

;; "Elapsed time: 5004.506274 msecs"
;; => (1 2 3 4 5 6 7 8 9 10)