1. Introduction

Basic building blocks for concurrent, asynchronous and non-blocking programming in Clojure.

2. Rationale

This project exists as a consequence of the need for a good abstraction to represent eventually available asynchronous values in the catacumba project.

There are already libraries in the Clojure ecosystem, such as manifold, that are well established and intend to be a "lingua franca" for asynchronous computation results. But I think there is room for an alternative with a different vision. For more detailed information about the differences with manifold, see the FAQ entry on manifold (section 9.1).

The futura library currently provides four different building blocks:

  • Streams
    I did not want to invent yet another streams abstraction, and luckily I found the reactive streams initiative, which has already defined a strong, well-specified contract for how the abstraction should behave, with backpressure awareness at its core.
    This library uses those interfaces and brings powerful Clojure features (such as transducers) to a good, interoperable stream abstraction.

  • Promises
    Something similar happens with promises, but in this case JDK8 ships a good implementation with the wrong name: CompletableFuture. This library provides a promise primitive built on top of the JDK8 CompletableFuture implementation. It also implements the monadic protocols of the cats library, so promises can take part in monadic composition through generic syntactic abstractions such as a Haskell-like do syntax.

  • Atomics
    In addition to the abstractions mentioned above, this library provides a lightweight, Clojure-friendly syntax for working with JDK atomic types and references, which are often used as concurrency primitives in the streams implementation.

  • Executors
    A Clojure-friendly abstraction for the Java/JVM executors.

3. Project Maturity

Since futura is a young project there may be some API breakage.

4. Install

The simplest way to use the futura library in a Clojure project is to include it as a dependency:

[funcool/futura "0.3.0"]

5. Promises

A promise is an abstraction that represents the result of an asynchronous operation that will eventually be available.

Clojure comes with a builtin promise abstraction, but it is designed only for blocking operations, which makes it a very limited approach for our purposes. The futura promise abstraction uses the JDK8 CompletableFuture as its underlying implementation, and any promise can easily be coerced to a JDK Future or CompletableFuture.

A promise is a container that will eventually hold a value, with builtin support for error handling. A promise therefore has three different states:

  • pending: the promise does not contain any value yet.

  • resolved: the promise contains a value.

  • rejected: the promise contains an error.
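
As a rough illustration of these three states, here is a minimal sketch that works directly on the JDK CompletableFuture (the type futura builds on) rather than on the futura API. The state function is a hypothetical helper introduced only for this example; it is not part of futura.

```clojure
(import 'java.util.concurrent.CompletableFuture)

(defn state
  "Hypothetical helper: classify a CompletableFuture into one of the
  three promise states described above."
  [^CompletableFuture cf]
  (cond
    (not (.isDone cf))             :pending
    (.isCompletedExceptionally cf) :rejected
    :else                          :resolved))

;; Never completed: stays pending.
(def pending-cf (CompletableFuture.))

;; Completed with a value.
(def resolved-cf (CompletableFuture/completedFuture 42))

;; Completed with an exception.
(def rejected-cf (doto (CompletableFuture.)
                   (.completeExceptionally (ex-info "boom" {}))))

(state pending-cf)  ;; => :pending
(state resolved-cf) ;; => :resolved
(state rejected-cf) ;; => :rejected
```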

5.1. Creating a promise

There are several different ways to create a promise in the futura library. You can create it immediately resolved with an initial value, or immediately rejected with an exception.

Let's start with a basic example using the commonly known promise delivery style from Clojure:

(require '[futura.promise :as p])

(def pr (p/promise))

(future
  (Thread/sleep 200)
  (p/deliver pr 20))

(p/then pr (fn [v]
             (println v)))

;; After 200ms it will print `20`

Another way to create a promise is to use a factory function that can resolve or reject the promise asynchronously. If you are familiar with JavaScript promises, you will find this very similar:

(def pr (p/promise
          (fn [deliver]
            (deliver 1))))

5.2. Blocking operations

futura promises can be used as a drop-in replacement for Clojure promises, because they also offer blocking operations:

@pr
;; => 1

If you try to deref a promise that is rejected, the exception will be rethrown in the calling thread.
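
To show what rethrowing on deref looks like in practice, here is a small sketch on a bare JDK CompletableFuture, not on a futura promise. On the JDK type the original exception arrives wrapped in an ExecutionException; whether futura unwraps it for you is not covered here, so treat the wrapping as a property of this sketch only.

```clojure
(import '(java.util.concurrent CompletableFuture ExecutionException))

;; A future that has been completed with an error.
(def failed
  (doto (CompletableFuture.)
    (.completeExceptionally (ex-info "boom" {}))))

;; Blocking deref rethrows in the calling thread, so it can be
;; handled with an ordinary try/catch.
(def outcome
  (try
    @failed
    (catch ExecutionException e
      (.getMessage (.getCause e)))))

outcome ;; => "boom"
```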

5.3. State checking

futura provides useful predicates that let you check the state of a promise at any time.

Let's see some examples:

(def pr (p/promise 2))

(p/promise? pr)
;; => true

(p/pending? pr)
;; => false

(p/resolved? pr)
;; => true

(p/rejected? pr)
;; => false

(p/done? pr)
;; => true

The done? predicate checks whether a promise is fulfilled, regardless of whether it is resolved or rejected.

5.4. Promise chaining

futura also offers powerful chaining functions that make it easy to compose asynchronous computations. In the previous examples we have seen the then function; let's look at a slightly more complex example using it:

(def pr (-> (p/promise 2)
            (p/then inc)
            (p/then inc)))

(p/then pr (fn [v]
             (println v)))

;; It will print 4.
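
For readers who want to relate this to the underlying JDK type, the same chain can be sketched with CompletableFuture and its thenApply method. This is only an illustration of the idea, not how futura implements then; jfn is a hypothetical helper that adapts a Clojure function to java.util.function.Function.

```clojure
(import 'java.util.concurrent.CompletableFuture
        'java.util.function.Function)

(defn jfn
  "Hypothetical helper: wrap a Clojure function as a java.util.function.Function."
  [f]
  (reify Function
    (apply [_ v] (f v))))

;; Each thenApply returns a new future holding the transformed value.
(def chained
  (-> (CompletableFuture/completedFuture 2)
      (.thenApply (jfn inc))
      (.thenApply (jfn inc))))

@chained ;; => 4
```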

5.5. Error handling

It also exposes a chaining function for error handling:

(def pr (-> (p/promise 2)
            (p/then (fn [v] (throw (ex-info "foobar" {}))))))

(p/catch pr (fn [error]
              (println "Error:" (.getMessage error))))
;; Will print something like "Error: foobar"

The catch chain function also returns a promise, which will be resolved or rejected depending on what happens inside the catch handler.
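
The "resolved or rejected depending on the handler" behaviour can be sketched with the JDK CompletableFuture method exceptionally, which behaves in a comparable way. This is an analogy on the JDK type, not the futura implementation; jfn is a hypothetical helper defined only for the example.

```clojure
(import 'java.util.concurrent.CompletableFuture
        'java.util.function.Function)

(defn jfn
  "Hypothetical helper: wrap a Clojure function as a java.util.function.Function."
  [f]
  (reify Function
    (apply [_ v] (f v))))

;; A stage that fails because its function throws.
(def failing
  (-> (CompletableFuture/completedFuture 2)
      (.thenApply (jfn (fn [_] (throw (ex-info "foobar" {})))))))

;; A handler that returns normally yields a resolved stage.
(def recovered
  (.exceptionally failing
                  (jfn (fn [^Throwable e]
                         ;; the JDK may wrap the original error, so look at the cause first
                         (str "recovered from: "
                              (.getMessage (or (.getCause e) e)))))))

;; A handler that throws yields a rejected stage.
(def still-rejected
  (.exceptionally failing
                  (jfn (fn [_] (throw (ex-info "handler failed" {}))))))

@recovered                                 ;; => "recovered from: foobar"
(.isCompletedExceptionally still-rejected) ;; => true
```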

5.6. Working with collections

In some circumstances you will want to wait for several promises to complete at the same time, and futura provides helpers for that:

@(p/all [(p/promise 1) (p/promise 2)])
;; => [1 2]

@(p/any [(p/promise 1) (p/promise (ex-info "error" {}))])
;; => 1
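
To give an idea of what a helper like all has to do, here is a minimal sketch built on the JDK CompletableFuture/allOf. The all-of function is a hypothetical name for this example and makes no claim about how futura implements p/all.

```clojure
(import 'java.util.concurrent.CompletableFuture
        'java.util.function.Function)

(defn all-of
  "Hypothetical helper: return a future that resolves to a vector with
  the results of all the given futures, in order."
  [futures]
  (let [futures (vec futures)
        barrier (CompletableFuture/allOf
                  (into-array CompletableFuture futures))]
    (.thenApply barrier
                (reify Function
                  (apply [_ _ignored]
                    ;; every input is complete here, so join does not block
                    (mapv #(.join ^CompletableFuture %) futures))))))

@(all-of [(CompletableFuture/completedFuture 1)
          (CompletableFuture/completedFuture 2)])
;; => [1 2]
```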

6. Streams

Streams in the futura library follow the reactive streams initiative and its default interfaces for JVM languages. Additionally, they come with support for powerful Clojure features such as transducers and the sequence abstraction, and with great interoperability with existing Clojure libraries.

In summary, it is a reactive-streams implementation on top of Clojure abstractions that works as an intermediary between Clojure and the Java world.

Reactive streams has four participants, but on the Clojure side we only need one: the Publisher.

6.1. First contact

The futura streams API is very simple. It consists of very few functions.

Let's create our first publisher:

(require '[futura.stream :as stream])

(def s (stream/publisher))

The publisher function, called without additional parameters, creates an empty, unbuffered publisher. Now you can put items into it using the put! function:

(def prm (stream/put! s 1))

The return value of the put! function is a promise that will be resolved to true when the value is accepted by the publisher. For an unbuffered publisher, that happens when a subscription requests a value.

Finally, to obtain values from the publisher, you should create a subscription:

(with-open [sub (stream/subscribe s)]
  (stream/take! sub))

The return value of the take! function is also a promise, which will be resolved with a value as soon as one is available.

Note that the subscribe function is not analogous to the .subscribe method of the publisher. Instead of creating an opaque object that acts as the relation between the publisher and a subscriber, the subscribe function creates an open object that has no direct relation with any subscriber.

Behind the scenes, of course, the subscribe function uses the publisher's .subscribe method, so the publisher implementation is completely interoperable with third-party libraries and the Java world.

This subscription strategy has some advantages over the one proposed by reactive-streams, because you can treat the subscription as a stream. It implements convenient interfaces that let you treat the subscription like a Clojure sequence, a Java Iterable or even a core.async channel.

You should know that everything in this implementation is lazy. Creating a subscription does not automatically request all items from the publisher. A subscription only requests an item from the publisher when you request one.

Behind the scenes, publisher and subscription are implemented using core.async, so nil values are not allowed; a nil means that the publisher or the subscription is closed. As you can see, the subscription implements the .close() method, which allows it to be used with the with-open macro. But take care: the publisher can end early, in which case the subscription will be closed before with-open calls the .close() method.

6.2. Source the publisher

As said previously, the real purpose of this abstraction is to serve as a connection with the Java world. If you only use Clojure, you may well want to use core.async or manifold directly, without an additional layer.

In fact, futura is currently used by catacumba for communicating asynchronous streams with ratpack/netty. This abstraction was chosen because it supports backpressure, which is very important in asynchronous network applications.

But to use it as an intermediary layer with third-party libraries, you need to be able to create a publisher from the existing abstractions in Clojure. Let's see how you can do it.

Example creating a publisher instance from any object that implements Iterable:

(def pub (stream/publisher (take 5 (iterate inc 1))))

(into [] pub)
;; => [1 2 3 4 5]

As the previous example shows, the publisher can easily be converted to a sequence. That works because the publisher implements Clojure's Seqable interface, which behind the scenes uses a subscription and blocks while reading all items until the subscription is closed.

You can create a publisher from the following kinds of abstractions: promises, JDK8 completable futures, manifold deferreds, manifold streams, sequences and iterables.

6.3. Composable transformations

In addition to the abstractions mentioned above, you can create a publisher from another publisher using the transform function. This kind of composition lets you attach a transducer that applies a transformation to the stream values.

Be aware of one implementation detail: the transducer is applied per subscription, not to the publisher. So if you use something like a (take N) transducer with a potentially infinite publisher, each subscription will be closed after N items, but the publisher will remain active and keep accepting new subscriptions.

(def pub (->> (stream/publisher (take 5 (iterate inc 1)))
              (stream/transform (map inc))))

(into [] pub)
;; => [2 3 4 5 6]
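
The per-subscription behaviour described above is loosely analogous to running the same transducer over one source several times in plain Clojure: each run gets its own transducer state, and the source itself is not consumed or closed. The sketch below uses only clojure.core, with an infinite sequence standing in for the publisher; it is an analogy and does not use the futura stream API.

```clojure
;; A conceptually infinite source, standing in for the publisher.
(def source (iterate inc 1))

;; A stateful transducer: (take 3) keeps its own counter per run.
(def xform (comp (map inc) (take 3)))

;; Two independent consumers, standing in for two subscriptions.
(def first-consumer  (into [] xform source))
(def second-consumer (into [] xform source))

first-consumer  ;; => [2 3 4]
second-consumer ;; => [2 3 4]
```

Each consumer stops after three items, yet the source is still available to the next one, which mirrors a publisher that keeps accepting subscriptions.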

6.4. Subscription as channel

One interesting thing is that open subscriptions can be used as channels, so they are fully compatible with core.async go blocks, like any other channel:

(require '[clojure.core.async :refer [<! go-loop]])

(with-open [sub (stream/subscribe s)]
  (go-loop []
    (when-let [value (<! sub)]
      (do-something value)
      (recur))))

7. Atomics

This is a simple, Clojure-friendly syntax and a lightweight abstraction, built on top of Clojure's protocols, for dealing with JDK atomic types and reference types.

This is a low-level, side-effecting primitive and should be used only when you really know what you are doing. If you are not sure whether you should use it, prefer the standard Clojure primitives such as atoms, refs and agents.

7.1. Atomic Reference

The atomic reference provides a lock-free, thread-safe object reference container. Its purpose is to store a reference to an arbitrary object and to let you mutate it in a thread-safe, lock-free way.

7.1.1. Creating an atomic reference

The atomic reference can be created with ref function from the futura.atomic namespace:

(require '[futura.atomic :as atomic])

(atomic/ref :foo)
;; => #object[futura.atomic.AtomicRef 0x5e42bd13 {:status :ready, :val :foo}]

7.1.2. Get and set values

The atomic reference provides the standard way to set or get values using the get and set! functions:

(def myref (atomic/ref :foo))

(atomic/set! myref :baz)

(atomic/get myref)
;; => :baz

Additionally, it implements Clojure's IDeref interface, which makes it easy to use with the @ reader macro or the deref function:

(def myref (atomic/ref :foo))

(deref myref)
;; => :foo

@myref
;; => :foo

7.1.3. Special operations

Atomic types usually offer some special operations, and this one is no exception. The atomic reference supports CAS (compare-and-set) operations:

(atomic/compare-and-set! myref :baz :foobar)
;; => false

(atomic/compare-and-set! myref :foo :bar)
;; => true

And the "get and set" operation:

(atomic/get-and-set! myref :foo)
;; => :bar

@myref
;; => :foo
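
To show why compare-and-set matters, here is a sketch of the classic lock-free retry loop, written against the JDK AtomicReference directly rather than the futura wrapper. The update-ref! function is a hypothetical helper for this example only.

```clojure
(import 'java.util.concurrent.atomic.AtomicReference)

(defn update-ref!
  "Hypothetical helper: apply f to the current value and retry until
  compareAndSet succeeds. Returns the value that was stored."
  [^AtomicReference r f]
  (loop []
    (let [old-val (.get r)
          new-val (f old-val)]
      (if (.compareAndSet r old-val new-val)
        new-val
        ;; another thread changed the value in between: try again
        (recur)))))

(def counter (AtomicReference. 0))

;; Four threads, 1000 increments each.
(let [work    (fn [] (dotimes [_ 1000] (update-ref! counter inc)))
      threads (doall (repeatedly 4 #(Thread. ^Runnable work)))]
  (run! #(.start ^Thread %) threads)
  (run! #(.join ^Thread %) threads))

(.get counter) ;; => 4000
```

Because a failed compareAndSet simply triggers a retry, no increment is lost even though no lock is taken.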

Additionally, it provides a way to set a value in an "eventual" way. But take care: this way of setting the value does not guarantee that the change is immediately visible to all threads:

(atomic/eventually-set! myref :foobar)
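
On the JDK side, the operation with these semantics is AtomicReference's lazySet. The sketch below assumes that eventually-set! corresponds to lazySet; that mapping is an assumption made for illustration and is not confirmed by this document.

```clojure
(import 'java.util.concurrent.atomic.AtomicReference)

(def lazy-ref (AtomicReference. :foo))

;; lazySet: a cheaper write that may become visible to other threads
;; slightly later than a regular set would.
(.lazySet lazy-ref :foobar)

;; The writing thread itself always observes its own write.
(.get lazy-ref) ;; => :foobar
```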

7.2. Atomic Boolean

This is a specialized version of the atomic reference that is highly optimized for boolean values. It has the same operations and implements the same abstractions as the atomic ref explained above.

You can create an atomic boolean using the boolean function from the futura.atomic namespace:

(atomic/boolean false)
;; => #object[futura.atomic.AtomicBoolean 0x393bbfce {:status :ready, :val false}]

7.3. Atomic Long

This is a specialized version of the atomic reference that is highly optimized for numeric operations on longs. It has the same operations and implements the same abstractions as the ref and boolean atomic types explained above.

Additionally, it implements a complementary abstraction that brings some powerful operations that only make sense for numeric types, for use cases such as atomic counters.

You can create an atomic long using the long function from the futura.atomic namespace:

(atomic/long 0)
;; => #object[futura.atomic.AtomicLong 0x393bbfce {:status :ready, :val 0}]

Here are some examples using the functions defined for numeric atomic types, such as the "get and increment/decrement" operations:

(def mylong (atomic/long 0))

(atomic/get-and-inc! mylong)
;; => 0

(atomic/get-and-dec! mylong)
;; => 1

@mylong
;; => 0

Optionally, you can increment it by a user-specified delta:

(atomic/get-and-add! mylong 10)
;; => 0

@mylong
;; => 10
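
For comparison, the same sequence of operations can be written against the JDK AtomicLong directly. This sketch only shows the underlying JDK methods; it assumes, without confirmation from this document, that the futura functions above map onto them one to one.

```clojure
(import 'java.util.concurrent.atomic.AtomicLong)

(def ^AtomicLong jdk-long (AtomicLong. 0))

;; Each "get and ..." operation returns the value held before the update.
(.getAndIncrement jdk-long) ;; => 0
(.getAndDecrement jdk-long) ;; => 1
(.get jdk-long)             ;; => 0

(.getAndAdd jdk-long 10)    ;; => 0
(.get jdk-long)             ;; => 10
```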

8. Executors

This is a simple, Clojure-friendly syntax and a lightweight abstraction for the JVM executors.

There are two kinds of operations you can perform with the futura executor API:

  • Execute a function in an executor.

  • Execute a task in an executor and wait for the result of that operation using promises.

The first operation simply executes a Clojure function in another thread:

(require '[futura.executor :as exec])

(exec/execute (fn []
                (println "hello world")))

The second one consists of executing some (possibly) blocking code in another thread and waiting for the result using the promise abstraction:

(require '[futura.executor :as exec])
(require '[futura.promise :as p])

(def prom (exec/submit (fn []
                         (Thread/sleep 2000)
                         (+ 1 2))))

(p/then prom (fn [result]
               (println "Result:" result)))

Both functions are multi-arity and accept an additional argument that specifies a user-provided executor instance. So the first example is equivalent to this one:

(require '[futura.executor :as exec])

(exec/execute exec/*default* (fn []
                               (println "hello world")))
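
If you want to supply your own executor, a plain JDK thread pool is the obvious candidate. The sketch below stays entirely on the JDK side and shows the idea behind a submit-style function: run a function on a given executor and get back a CompletableFuture with its result. The submit function defined here is a hypothetical stand-in for illustration, not the futura exec/submit implementation.

```clojure
(import '(java.util.concurrent CompletableFuture Executors ExecutorService)
        'java.util.function.Supplier)

(defn submit
  "Hypothetical helper: run f on executor and return a CompletableFuture
  holding its result."
  [^ExecutorService executor f]
  (CompletableFuture/supplyAsync
    (reify Supplier (get [_] (f)))
    executor))

;; A user-provided executor with two worker threads.
(def pool (Executors/newFixedThreadPool 2))

(def result (submit pool (fn [] (+ 1 2))))

@result ;; => 3

;; Release the worker threads once the pool is no longer needed.
(.shutdown ^ExecutorService pool)
```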

9. FAQ

9.1. What is the difference with manifold

Both libraries offer similar abstractions: futura offers promises and streams, and manifold offers deferreds and streams. The main difference between the two libraries is philosophical:

  • manifold builds its own abstraction for working with streams; futura uses an existing, interoperable abstraction.

  • manifold implements its own deferred; futura uses the one already defined in JDK8 (CompletableFuture).

  • manifold introduces its own syntax abstraction (let-flow and similar); futura implements an existing monad abstraction from the cats library, which already offers a generic let-like syntax for composing asynchronous computations that look synchronous.

The futura library is obviously less mature than manifold, for the simple reason that manifold has existed for longer.

10. Contribute

Unlike Clojure and other Clojure contrib libs, futura does not have many restrictions for contributions. Just open an issue or pull request.

11. Get the Code

futura is open source and can be found on github.

You can clone the public repository with this command:

git clone https://github.com/funcool/futura

12. Run tests

To run the tests, just execute:

lein test

13. License

futura is licensed under BSD (2-Clause) license:

Copyright (c) 2015 Andrey Antukh <niwi@niwi.be>

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.