1. Introduction
The basic building blocks for concurrent, asynchronous, and non-blocking programming in Clojure.
2. Rationale
This project exists as a consequence of the need for a good abstraction to represent eventually available asynchronous values in the catacumba project.
There are already well-established libraries in the Clojure ecosystem, such as manifold, that intend to be a "lingua franca" for asynchronous computation results. But I think there is room for an alternative with a different vision. For more detailed information about the differences with manifold, see the FAQ entry (section 9.1).
The futura library currently provides four building blocks:
- Streams: I did not want to invent yet another streams abstraction, and luckily I found the Reactive Streams initiative, which already defines a strong, precise contract for how the abstraction should behave, with backpressure awareness at its core. This library uses those interfaces and brings powerful Clojure features (such as transducers) to a good, interoperable stream abstraction.
- Promises: Something similar happens with promises, but in this case JDK 8 comes with a good implementation under the wrong name: CompletableFuture. This library provides a promise primitive built on top of the JDK 8 CompletableFuture implementation. It also implements the monadic protocols of the cats library, which lets promises take part in monadic composition using generic syntactic abstractions such as a Haskell-like do syntax.
- Atomics: In addition to the abstractions above, this library provides a lightweight, Clojure-friendly syntax for working with JDK atomic types and references, which are often used as concurrency primitives in the streams implementation.
- Executors: A Clojure-friendly abstraction for Java/JVM executors.
3. Project Maturity
Since futura is a young project there may be some API breakage.
4. Install
The simplest way to use the futura library in a Clojure project is to include it as a dependency:
[funcool/futura "0.3.0"]
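For Leiningen users, the dependency vector above goes into the :dependencies key of project.clj. The following is only a sketch: the project name my-app and the Clojure version are assumptions chosen for illustration, not requirements stated by futura.

```clojure
;; project.clj (hypothetical project; name and Clojure version are assumptions)
(defproject my-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.7.0"]
                 [funcool/futura "0.3.0"]])
```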
5. Promises
A promise is an abstraction that represents the result of an asynchronous operation that will eventually be available.
Clojure comes with a built-in promise abstraction, but it is designed only for blocking operations, which makes it a very limited approach for our purposes. futura's promise abstraction uses the JDK 8 CompletableFuture as its underlying implementation, and every promise can easily be coerced to a JDK Future or CompletableFuture.
A promise is a container that will eventually hold a value, with built-in support for error handling. A promise is therefore in one of three states:
- pending: the promise does not contain any value yet.
- resolved: the promise contains a value.
- rejected: the promise contains an error.
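Since futura promises are backed by the JDK 8 CompletableFuture, the three states can be illustrated with plain Java interop. This is a minimal sketch, not futura's implementation; the state helper is a hypothetical name introduced only for this example.

```clojure
(import 'java.util.concurrent.CompletableFuture)

(defn state
  "Hypothetical helper: classify a CompletableFuture as
  :pending, :resolved or :rejected."
  [^CompletableFuture cf]
  (cond
    (not (.isDone cf))             :pending
    (.isCompletedExceptionally cf) :rejected
    :else                          :resolved))

;; One future in each state.
(def pending  (CompletableFuture.))
(def resolved (CompletableFuture/completedFuture 42))
(def rejected (doto (CompletableFuture.)
                (.completeExceptionally (ex-info "boom" {}))))

(def states (mapv state [pending resolved rejected]))
;; => [:pending :resolved :rejected]
```

Note that the JDK also reports a cancelled future as completed exceptionally, so this sketch would classify it as :rejected.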
5.1. Creating a promise
There are several ways to create a promise in the futura library. You can create it immediately resolved with an initial value or immediately rejected with an exception.
Let's start with a basic example using the familiar Clojure style of delivering a promise:
(require '[futura.promise :as p])

(def pr (p/promise))

;; Deliver the value from another thread after 200ms.
(future
  (Thread/sleep 200)
  (p/deliver pr 20))

(p/then pr (fn [v]
             (println v)))
;; After 200ms it will print `20`
Another way to create a promise is with a factory function that can resolve or reject the promise asynchronously. If you are familiar with JavaScript promises, you will find this very similar:
(def pr (p/promise
          (fn [deliver]
            (deliver 1))))
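To make the factory-function idea concrete, here is a rough sketch of how such a constructor could be written on top of a raw CompletableFuture. The promise-from helper is hypothetical and is not how futura implements p/promise; it only shows the pattern of handing a deliver callback to user code.

```clojure
(import 'java.util.concurrent.CompletableFuture)

(defn promise-from
  "Hypothetical helper: call `factory` with a one-argument `deliver`
  callback and return a CompletableFuture that completes when the
  callback is invoked. An exception thrown by `factory` rejects it."
  [factory]
  (let [cf (CompletableFuture.)]
    (try
      (factory (fn [v] (.complete cf v)))
      (catch Throwable e
        (.completeExceptionally cf e)))
    cf))

;; Delivered asynchronously from another thread.
(def ok (promise-from (fn [deliver]
                        (future
                          (Thread/sleep 50)
                          (deliver 1)))))

;; The factory fails before delivering anything.
(def bad (promise-from (fn [_] (throw (ex-info "failed" {})))))

(def ok-value @ok)                                  ; blocks briefly, then 1
(def bad-rejected? (.isCompletedExceptionally bad)) ; true
```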
5.2. Blocking operations
futura's promises can be used as a drop-in replacement for Clojure promises, because they also offer blocking operations:
@pr
;; => 1
If you try to deref a rejected promise, the exception will be rethrown in the calling thread.
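For comparison, this is what blocking on a failed CompletableFuture looks like with plain JDK interop. It is a sketch of the JDK behaviour only: the JDK wraps the original error in an ExecutionException, and whether futura unwraps it for you is not stated here, so treat the unwrapping step as an assumption about what you may need to do.

```clojure
(import '(java.util.concurrent CompletableFuture ExecutionException))

(def failed (doto (CompletableFuture.)
              (.completeExceptionally (ex-info "foobar" {:code 1}))))

(def outcome
  (try
    @failed                        ; deref blocks and rethrows in this thread
    (catch ExecutionException e
      ;; The original exception is available as the cause.
      (ex-data (.getCause e)))))
;; outcome => {:code 1}
```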
5.3. State checking
futura provides useful predicates that let you check the state of a promise at any time.
Let's see some examples:
(def pr (p/promise 2))
(p/promise? pr)
;; => true
(p/pending? pr)
;; => false
(p/resolved? pr)
;; => true
(p/rejected? pr)
;; => false
(p/done? pr)
;; => true
The done? predicate checks whether a promise is completed (no longer pending), regardless of whether it was resolved or rejected.
5.4. Promise chaining
futura also offers powerful chaining functions that make it easy to compose asynchronous computations. In previous examples we have seen the then function; let's look at a more complex example that uses it:
(def pr (-> (p/promise 2)
            (p/then inc)
            (p/then inc)))

(p/then pr (fn [v]
             (println v)))
;; It will print 4.
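The same chain can be sketched directly against CompletableFuture, which is the underlying implementation mentioned earlier. The then helper below is a hypothetical stand-in written for this example (it adapts a Clojure function to java.util.function.Function); it is not futura's p/then.

```clojure
(import 'java.util.concurrent.CompletableFuture
        'java.util.function.Function)

(defn then
  "Hypothetical helper: apply the Clojure function `f` to the value
  of `cf` once it is available, returning a new CompletableFuture."
  [^CompletableFuture cf f]
  (.thenApply cf (reify Function
                   (apply [_ v] (f v)))))

(def chained (-> (CompletableFuture/completedFuture 2)
                 (then inc)
                 (then inc)))

(def chained-value @chained)
;; => 4
```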
5.5. Error handling
It also exposes a chain method for error handling:
(def pr (-> (p/promise 2)
            (p/then (fn [v] (throw (ex-info "foobar" {}))))))

(p/catch pr (fn [error]
              (println "Error:" (.getMessage error))))
;; Will print something like "Error: foobar"
The catch chain function also returns a promise, which will be resolved or rejected depending on what happens inside the catch handler.
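The "handler decides the outcome" behaviour can be seen with the JDK's exceptionally method. This is a sketch under the assumption that the analogy with CompletableFuture holds; it does not use or describe futura's own catch implementation.

```clojure
(import 'java.util.concurrent.CompletableFuture
        'java.util.function.Function)

(def ^CompletableFuture failed
  (doto (CompletableFuture.)
    (.completeExceptionally (ex-info "foobar" {}))))

;; A handler that returns a value: the new future is resolved.
(def recovered
  (.exceptionally failed
                  (reify Function
                    (apply [_ error]
                      (str "Error: " (.getMessage ^Throwable error))))))

;; A handler that throws: the new future is rejected.
(def still-failed
  (.exceptionally failed
                  (reify Function
                    (apply [_ error]
                      (throw (IllegalStateException. "handler failed"))))))

(def recovered-value @recovered)
;; => "Error: foobar"
(def still-rejected? (.isCompletedExceptionally still-failed))
;; => true
```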
5.6. Working with collections
In some circumstances you will want to wait for several promises to complete at the same time; futura provides helpers for that as well:
@(p/all [(p/promise 1) (p/promise 2)])
;; => [1 2]
@(p/any [(p/promise 1) (p/promise (ex-info "error" {}))])
;; => 1
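As a rough illustration of what an "all" combinator does, the sketch below collects the results of several CompletableFuture instances using the JDK's allOf. The all function here is a hypothetical helper written for this example, not futura's p/all. The JDK's anyOf is deliberately not shown because it also completes on the first failure, which differs from the p/any result above.

```clojure
(import 'java.util.concurrent.CompletableFuture
        'java.util.function.Function)

(defn all
  "Hypothetical helper: return a CompletableFuture that resolves to a
  vector with the values of all `cfs`, in the same order."
  [cfs]
  (let [cfs (vec cfs)]
    (-> (CompletableFuture/allOf (into-array CompletableFuture cfs))
        (.thenApply (reify Function
                      (apply [_ _ignored]
                        ;; Every future is complete here, so join does not block.
                        (mapv #(.join ^CompletableFuture %) cfs)))))))

(def combined
  @(all [(CompletableFuture/completedFuture 1)
         (CompletableFuture/completedFuture 2)]))
;; => [1 2]
```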
6. Streams
Streams in the futura library follow the Reactive Streams initiative and its default interfaces for JVM languages. Additionally, they support powerful Clojure features such as transducers and the sequence abstraction, and interoperate well with existing Clojure libraries.
In summary, it is a Reactive Streams implementation on top of Clojure abstractions that works as an intermediary between Clojure and the Java world.
Reactive Streams defines four participants (Publisher, Subscriber, Subscription, and Processor), but on the Clojure side we only need one: the Publisher.
6.1. First contact
futura's streams API is very simple. It consists of very few functions.
Let's create our first publisher:
(require '[futura.stream :as stream])
(def s (stream/publisher))
The publisher function without additional parameters creates an empty, unbuffered publisher. Now you can put items into the publisher using the put! function:
(def prm (stream/put! s 1))
The return value of the put! function is a promise that will be resolved to true when the value is accepted by the publisher. In the case of an unbuffered publisher, that happens when a subscription requests a value.
Finally, to obtain values from the publisher, you should create a subscription:
(with-open [sub (stream/subscribe s)]
  (stream/take! sub))
The return value of the take! function is also a promise; it will be resolved with a value as soon as one is available.
Note that the subscribe function is not analogous to the .subscribe method of the publisher. Instead of creating an opaque object that acts as the relation between the publisher and a subscriber, the subscribe function creates an open object that has no direct relation to any subscriber.
Behind the scenes, of course, the subscribe function uses the publisher's .subscribe method, so the publisher implementation is completely interoperable with third-party libraries and the Java world.
This subscription strategy has some advantages over the one proposed by Reactive Streams, because you can treat the subscription as a stream. It implements convenient interfaces that let you treat the subscription as a Clojure sequence, a Java Iterable, or even a core.async channel.
You should know that everything in this implementation is lazy. Creating a subscription does not automatically request all items from the publisher. A subscription only requests an item from the publisher when you request one from it.
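The demand-driven contract behind this laziness can be sketched with java.util.concurrent.Flow, the JDK 9+ copy of the Reactive Streams interfaces. This example does not use futura at all and requires JDK 9 or newer; collect-one-by-one is a hypothetical helper. The subscriber asks for exactly one item at a time, so the publisher never pushes more than was requested.

```clojure
(import '(java.util.concurrent Flow$Subscriber Flow$Subscription
                               SubmissionPublisher))

(defn collect-one-by-one
  "Hypothetical helper: subscribe to `publisher`, requesting a single
  item at a time. Returns a Clojure promise that is delivered with a
  vector of all items once the publisher completes."
  [^SubmissionPublisher publisher]
  (let [items  (atom [])
        sub    (atom nil)
        result (promise)]
    (.subscribe publisher
                (reify Flow$Subscriber
                  (onSubscribe [_ subscription]
                    (reset! sub subscription)
                    ;; Nothing flows until we signal demand.
                    (.request ^Flow$Subscription subscription 1))
                  (onNext [_ item]
                    (swap! items conj item)
                    ;; Ask for the next item only after handling this one.
                    (.request ^Flow$Subscription @sub 1))
                  (onError [_ error]
                    (deliver result error))
                  (onComplete [_]
                    (deliver result @items))))
    result))

(def received
  (with-open [pub (SubmissionPublisher.)]
    (let [result (collect-one-by-one pub)]
      (doseq [n [1 2 3]]
        (.submit pub n))
      result)))   ; closing the publisher signals completion

(def received-items (deref received 5000 :timeout))
```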
Behind the scenes, publisher and subscription are implemented using core.async, so nil values are not allowed; nil represents that the publisher or the subscription is closed. As you can see, the subscription implements the .close() method, which allows it to be used with the with-open macro. But take care: the publisher can end early, in which case the subscription will be closed before with-open calls the .close() method.
6.2. Source the publisher
As we said previously, the real purpose of this abstraction is to serve as a connection with the Java world. If you only use Clojure, you may well prefer to use core.async or manifold directly, without an additional layer.
In fact, futura is currently used by catacumba for communicating asynchronous streams with ratpack/netty. This abstraction was chosen because it supports backpressure, which is very important in asynchronous network applications.
But to use it as an intermediary layer with third-party libraries, you need to be able to create a publisher from the existing abstractions in Clojure. Let's see how you can do it:
From an iterable:
(def pub (stream/publisher (take 5 (iterate inc 1))))
(into [] pub)
;; => [1 2 3 4 5]
As the previous example shows, a publisher can easily be converted to a sequence. That works because the publisher implements Clojure's Seqable interface, which behind the scenes uses a subscription and blocks while reading all items until the subscription is closed.
You can create a publisher from the following abstractions: promise, JDK 8 CompletableFuture, manifold deferred, manifold stream, sequences, and iterables.
6.3. Composable transformations
In addition to the previously mentioned abstractions, you can create a publisher from another publisher using the transform function. This kind of composition lets you attach a transducer that applies a transformation to the stream values.
Be aware of one implementation detail: the transducer is applied to subscriptions, not to the publisher. So if you use a (take N) transducer with a possibly infinite publisher, each subscription will be closed after N items, but the publisher will remain active and keep accepting new subscriptions.
(def pub (->> (stream/publisher (take 5 (iterate inc 1)))
              (stream/transform (map inc))))
(into [] pub)
;; => [2 3 4 5 6]
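The per-subscription behaviour mirrors how transducers work on ordinary Clojure collections: each application of a transducer gets its own fresh state, and the source is left untouched. The sketch below uses only clojure.core (no futura), with an infinite sequence standing in for an infinite publisher and each into call standing in for a separate subscription.

```clojure
;; A stateful transducer pipeline: increment, then stop after 3 items.
(def xf (comp (map inc) (take 3)))

;; An infinite source, playing the role of an infinite publisher.
(def source (iterate inc 1))

;; Each run is like a separate subscription with its own `take` counter.
(def first-run  (into [] xf source))
(def second-run (into [] xf source))
;; both => [2 3 4]
```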
6.4. Subscription as channel
One interesting thing is that open subscriptions can be used as channels, so they are fully compatible with core.async go blocks, like any other channel:
(require '[clojure.core.async :refer [<! go-loop]])

;; `do-something` stands for your own handler function.
(with-open [sub (stream/subscribe s)]
  (go-loop []
    (when-let [value (<! sub)]
      (do-something value)
      (recur))))
7. Atomics
This is a simple, Clojure-friendly syntax and lightweight abstraction, built on top of Clojure's protocols, for working with JDK atomic types and reference types.
This is a low-level, side-effecting primitive and should be used only when you really know what you are doing. If you are not sure whether you should use it, prefer standard Clojure primitives such as atoms, refs, and agents.
7.1. Atomic Reference
The atomic reference provides a lock-free, thread-safe object reference container. The real purpose of this type is to store a reference to an arbitrary object and to be able to mutate it in a thread-safe, lock-free way.
7.1.1. Creating an atomic reference
An atomic reference can be created with the ref function from the futura.atomic namespace:
(require '[futura.atomic :as atomic])
(atomic/ref :foo)
;; => #object[futura.atomic.AtomicRef 0x5e42bd13 {:status :ready, :val :foo}]
7.1.2. Get and set values
The atomic reference provides the standard way to set or get values, using the get and set! functions:
(def myref (atomic/ref :foo))
(atomic/set! myref :baz)
(atomic/get myref)
;; => :baz
Additionally, it implements Clojure's IDeref interface, which makes it easy to use with the @ reader macro or the deref function:
(def myref (atomic/ref :foo))
(deref myref)
;; => :foo
@myref
;; => :foo
7.1.3. Special operations
Atomic types usually offer some special operations, and this one is no exception. The atomic reference allows CAS (compare-and-set) operations:
(atomic/compare-and-set! myref :baz :foobar)
;; => false
(atomic/compare-and-set! myref :foo :bar)
;; => true
And the "get and set" operation:
(atomic/get-and-set! myref :foo)
;; => :bar
@myref
;; => :foo
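The typical use of compare-and-set is a retry loop that applies a function to the current value without taking a lock. The sketch below shows that pattern on the JDK's AtomicReference directly; update-ref! is a hypothetical helper written for this example and is not part of the futura.atomic API shown above.

```clojure
(import 'java.util.concurrent.atomic.AtomicReference)

(defn update-ref!
  "Hypothetical helper: atomically replace the value in `r` with
  (f old-value), retrying if another thread changed the reference
  between the read and the compare-and-set."
  [^AtomicReference r f]
  (loop []
    (let [old-val (.get r)
          new-val (f old-val)]
      (if (.compareAndSet r old-val new-val)
        new-val
        (recur)))))

(def ^AtomicReference items (AtomicReference. []))

;; Four threads append concurrently; no update is lost.
(def workers
  (doall (for [n (range 4)]
           (future (update-ref! items #(conj % n))))))
(run! deref workers)

(def final-items (sort (.get items)))
;; => (0 1 2 3)
```

This is essentially what Clojure's own swap! does for atoms, which is one reason the standard primitives are usually the better default.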
Additionally, it provides a way to set a value in a somewhat "asynchronous" way. But take care: this way of setting the value does not guarantee that the change is instantly visible to all threads:
(atomic/eventually-set! myref :foobar)
7.2. Atomic Boolean
This is a specialized version of the atomic reference that is highly optimized for boolean values. It has the same operations and implements the same abstractions as the atomic ref explained previously.
You can create an atomic boolean using the boolean function from the futura.atomic namespace:
(atomic/boolean false)
;; => #object[futura.atomic.AtomicBoolean 0x393bbfce {:status :ready, :val false}]
7.3. Atomic Long
This is a specialized version of the atomic reference that is highly optimized for numeric operations on longs. It has the same operations and implements the same abstractions as the ref and boolean atomic types explained previously.
In addition, it implements a complementary abstraction that brings some powerful operations that only make sense for numeric types, such as atomic counters and the like.
You can create an atomic long using the long function from the futura.atomic namespace:
(atomic/long 0)
;; => #object[futura.atomic.AtomicLong 0x393bbfce {:status :ready, :val 0}]
Here are some examples using the functions defined for numeric atomic types, such as the "get and increment/decrement" operations:
(def mylong (atomic/long 0))
(atomic/get-and-inc! mylong)
;; => 0
(atomic/get-and-dec! mylong)
;; => 1
@mylong
;; => 0
Optionally, you can increment it by a user-specified delta:
(atomic/get-and-add! mylong 10)
;; => 0
@mylong
;; => 10
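To show why such counters matter under concurrency, here is a small sketch using the JDK's AtomicLong directly (not the futura wrapper): several threads increment the same counter and no increment is lost.

```clojure
(import 'java.util.concurrent.atomic.AtomicLong)

(def ^AtomicLong counter (AtomicLong. 0))

;; Four threads, each incrementing the counter 1000 times.
(def workers
  (doall (repeatedly 4 #(future
                          (dotimes [_ 1000]
                            (.getAndIncrement counter))))))

;; Wait for all threads to finish before reading the total.
(run! deref workers)

(def total (.get counter))
;; => 4000
```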
8. Executors
This is a simple, Clojure-friendly syntax and lightweight abstraction for JVM executors.
There are two kinds of operations you can perform with the futura executor API:
- Execute a function in an executor.
- Execute a task in an executor and wait for the result of that operation using promises.
The first operation simply executes a Clojure function in another thread:
(require '[futura.executor :as exec])

(exec/execute (fn []
                (println "hello world")))
The second one executes some (possibly blocking) code in another thread and waits for the result using the promise abstraction:
(require '[futura.executor :as exec])
(require '[futura.promise :as p])
(def prom (exec/submit (fn []
                         (Thread/sleep 2000)
                         (+ 1 2))))

(p/then prom (fn [result]
               (println "Result:" result)))
Both functions are multi-arity and accept an additional argument that specifies a user-provided executor instance. So the first example is identical to this one:
(require '[futura.executor :as exec])
(exec/execute exec/*default* (fn []
                               (println "hello world")))
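For readers who want to see what these two operations correspond to on the JVM, the sketch below uses a plain ExecutorService and CompletableFuture. It is an approximation under stated assumptions, not futura's implementation: the submit helper is hypothetical, and the fixed thread pool of size 2 is an arbitrary choice for the example.

```clojure
(import '(java.util.concurrent CompletableFuture Executors ExecutorService)
        'java.util.function.Supplier)

(defn submit
  "Hypothetical helper: run the no-argument function `f` on `executor`
  and return a CompletableFuture with its result."
  [^ExecutorService executor f]
  (CompletableFuture/supplyAsync
    (reify Supplier
      (get [_] (f)))
    executor))

(def ^ExecutorService pool (Executors/newFixedThreadPool 2))

;; Fire and forget: Clojure functions implement Runnable.
(.execute pool (fn [] (println "hello world")))

;; Run a task and wait for its result.
(def result @(submit pool (fn [] (+ 1 2))))
;; => 3

;; Release the pool's threads once no more work will be submitted.
(.shutdown pool)
```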
9. FAQ
9.1. What is the difference with manifold
Both libraries offer similar abstractions: futura offers promises and streams, and manifold offers deferreds and streams. The main difference between the two libraries is philosophical:
- manifold builds its own abstraction for working with streams; futura uses an existing, interoperable abstraction.
- manifold implements its own deferred; futura uses the one already defined in JDK 8 (CompletableFuture).
- manifold introduces its own syntax abstraction (let-flow and similar); futura implements an existing monad abstraction from the cats library, which already offers a generic let-like syntax for composing asynchronous computations that look synchronous.
The futura library is obviously less mature than manifold, simply because manifold has existed for longer.
10. Contribute
Unlike Clojure and other Clojure contrib libs, futura does not have many restrictions on contributions. Just open an issue or pull request.
11. Get the Code
futura is open source and can be found on GitHub.
You can clone the public repository with this command:
git clone https://github.com/funcool/futura
12. Run tests
To run the tests, just execute:
lein test
13. License
futura is licensed under BSD (2-Clause) license:
Copyright (c) 2015 Andrey Antukh <niwi@niwi.be>
All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.