Introduction

beicon is a small, concise library that provides a reactive streams API for Clojure and ClojureScript.

Warning

This documentation does not cover the whole API, so if you miss some function, contributions are very welcome. You can see the full API documentation here.

Project Maturity

Since beicon is a young project, some API breakage can be expected.

Install

The simplest way to use beicon in a Clojure project is to include it in the dependency vector of your project.clj file:

[funcool/beicon "4.1.0"]

Creating Streams

This section describes the available ways to create observable streams.

From a collection

The most basic way to create a stream is to just take a collection and convert it into an observable sequence:

(require '[beicon.core :as rx])

(def stream (rx/from-coll [1 2 3]))

(rx/on-value stream #(println "v:" %))
;; ==> v: 1
;; ==> v: 2
;; ==> v: 3

From range

Another way to create an observable stream is the range constructor, which is analogous to Clojure's own range:

(def stream (rx/range 3))

(rx/on-value stream #(println "v:" %))
;; ==> v: 0
;; ==> v: 1
;; ==> v: 2

From Atom

Atoms in Clojure are watchable, so you can listen for their changes. This constructor converts those changes into an infinite observable sequence of atom values:

(def a (atom 1))

(def stream (rx/from-atom a))

(rx/on-value stream #(println "v:" %))
(swap! a inc)
;; ==> v: 2
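
For context, the watch mechanism mentioned above can be tried with plain clojure.core, independently of beicon. This is only a sketch of what "watchable" means, not a description of how from-atom is implemented. The names counter and seen are illustrative.

```clojure
;; Plain clojure.core: register a watch on an atom and record each new value.
(def counter (atom 1))
(def seen (atom []))

(add-watch counter :recorder
           (fn [_key _ref _old-value new-value]
             (swap! seen conj new-value)))

(swap! counter inc)
(swap! counter inc)

;; Once the watch is removed, further changes are no longer recorded.
(remove-watch counter :recorder)
(swap! counter inc)

@seen
;; => [2 3]
```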

From Value

Creates an observable sequence of a single value:

(def stream (rx/just 10))

(rx/on-value stream #(println "v:" %))
;; ==> v: 10

Note
just and once are simple aliases for the of function.

From multiple values

There is a way to create an observable sequence from multiple values, using the of constructor:

(def stream (rx/of 1 2 3))

(rx/on-value stream #(println "v:" %))
;; ==> v: 1
;; ==> v: 2
;; ==> v: 3

Empty

Sometimes you just want a stream that is already terminated:

(def stream (rx/empty))

This stream does not yield any value and just terminates.

With timeout

This creates an observable sequence of a single value that is emitted after a specified amount of time (in milliseconds):

(def stream (rx/timeout 1000 10))

(rx/on-value stream #(println "v:" %))
;; After 1 sec...
;; ==> v: 10

From factory

This is the most advanced and flexible way to create an observable sequence. It gives you control over termination and errors, and is intended for building other kinds of constructors.

(def stream
  (rx/create (fn [sink]
               (sink 1)          ;; next with `1` as value
               (sink (rx/end 2)) ;; next with `2` as value and end the stream
               (fn []
                 ;; function called on unsubscription
                 ))))

(rx/on-value stream #(println "v:" %))
;; ==> v: 1
;; ==> v: 2

This is implemented using protocols to make it flexible and easily extensible by the user. This is how the default implementation behaves:

  • Exceptions or exception instances trigger the error termination of the stream.

  • (rx/end value) sends the unwrapped value to the stream, then terminates the stream.

  • rx/end as value triggers the stream termination.

  • nil triggers stream validation.

  • any other values are valid values for sending to the stream.

On the JVM, there are other factory methods that allow the creation of backpressure-aware observables:

(def stream
  (rx/generate (fn [state sink]
                 (let [nextval (inc state)]
                   (sink nextval)
                   nextval))
               (constantly 0)))

Unlike create, generate executes the factory function multiple times, and you need to call sink once per execution. Additionally, you can maintain state between executions by returning the desired value; that value is passed as the first argument on the next execution. You can optionally provide an initial state using the second argument, setup, and a dispose callback as the third argument.
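
The calling convention described above can be simulated in plain Clojure. The helper simulate-generate below is hypothetical and not part of beicon; it only assumes that the factory is called once per value, receives the previous return value as state, and that setup is a zero-argument function producing the initial state. It ignores backpressure and disposal entirely.

```clojure
;; Hypothetical helper, NOT part of beicon: mimics how a generate-style
;; factory is invoked once per value, threading its return value as state.
(defn simulate-generate
  [factory setup n]
  (let [out  (atom [])
        sink #(swap! out conj %)]
    (loop [state (setup)
           i     0]
      (when (< i n)
        ;; The value returned by the factory becomes the next state.
        (recur (factory state sink) (inc i))))
    @out))

(def generated
  (simulate-generate (fn [state sink]
                       (let [nextval (inc state)]
                         (sink nextval)
                         nextval))
                     (constantly 0)
                     3))

generated
;; => [1 2 3]
```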

Many simple observables can be converted to backpressure-aware ones using the to-flowable function. Be aware that the :buffer strategy (see below) is used by default to handle backpressure (overproduction).

(->> (rx/from-coll [1 2 3])
     (rx/to-flowable))

Optionally, you can specify the backpressure strategy as the first argument:

(->> (rx/from-coll [1 2 3])
     (rx/to-flowable :error))

This is a list of available strategies:

  • :buffer - Buffers all values until the downstream consumes them.

  • :error - Signals a MissingBackpressureException in case the downstream can’t keep up.

  • :drop - Drops the most recent value, if the downstream can’t keep up.

  • :latest - Keeps only the latest value, overwriting any previous value, if the downstream can’t keep up.

Consuming streams

The stream states

An observable sequence can be in one of three states: alive, errored, or ended. If an error is emitted, the stream is considered ended with an error, so both the error and end states are termination states.

For convenience, you can subscribe to any of those states of an observable sequence.

General purpose

A general purpose subscription is a single subscription that watches all the possible states of an observable sequence:

(def sub (rx/subscribe stream
                       #(println "on-value:" %)
                       #(println "on-error:" %)
                       #(println "on-end:")))

The subscribe function returns a subscription object that identifies the current subscription. It can be cancelled by executing (rx/cancel! sub).

Consume values

In most circumstances, however, you only want to consume values, regardless of any error or termination. For this purpose there is the on-value function:

(def sub (rx/on-value stream #(println "val:" %)))

Like the subscribe function, the on-value function returns a subscription object that can be used to cancel the created subscription.

Note
Take care: calling any of these helper functions creates a separate subscription, which can behave unexpectedly if you are not aware of whether you are using hot or cold observables.

Consume successful termination

With the on-end function you can watch the successful termination of an observable sequence:

(def sub (rx/on-end stream #(println "end!")))

Consume error termination

With the on-error function you can watch the error termination of an observable sequence:

(def sub (rx/on-error stream #(println "error:" %)))

Backpressure-aware consumption

Streams created using the generate function are backpressure-aware and can be consumed in a backpressure-aware way. For this, you need to pass an object that implements the ISubscriber protocol. Let's see an example:

(def stream
  (->> (rx/from-coll [1 2 3])
       (rx/to-flowable :buffer)))

(rx/subscribe-with stream (reify rx/ISubscriber
                            (-on-init [_ s]
                              ;; request the first value
                              (rx/request! s 1))

                            (-on-next [_ s v]
                              (println "on-next:" v)
                              ;; request the next value once this one is handled
                              (rx/request! s 1))

                            (-on-error [_ e]
                              (println "on-error"))

                            (-on-end [_]
                              (println "on-end"))))
;; => on-next: 1
;; => on-next: 2
;; => on-next: 3
;; => on-end

Like the subscribe function, subscribe-with returns a cancellable subscription object.

Transformations

Filter

The main advantage of using reactive streams is that you may treat them like normal sequences, and in this case filter them with a predicate:

(def stream (->> (rx/from-coll [1 2 3 4 5])
                 (rx/filter #(> % 3))))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 4
;; ==> on-value: 5
;; ==> on-end

Map

Also, you can apply a function over each value in the stream:

(def stream (->> (rx/from-coll [1 2])
                 (rx/map inc)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 2
;; ==> on-value: 3
;; ==> on-end

Flat Map

Converts an observable sequence that contains other observable sequences into a new observable sequence that emits just plain values.

The result is similar to concatenating all the underlying sequences.

(def stream (->> (rx/from-coll [1 2])
                 (rx/map #(rx/from-coll (range % (+ % 2))))
                 (rx/flat-map)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 2
;; ==> on-value: 2
;; ==> on-value: 3
;; ==> on-end
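
For readers who know Clojure's sequence functions, the same shape of result can be seen with mapcat from clojure.core. This is only an analogy on ordinary collections, not a claim about how flat-map is implemented. The name flattened is illustrative.

```clojure
;; Plain clojure.core analogy: map each value to a sub-sequence,
;; then concatenate the sub-sequences into one flat sequence.
(def flattened
  (mapcat #(range % (+ % 2)) [1 2]))

flattened
;; => (1 2 2 3)
```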

Skip

Also, sometimes you just want to skip values from stream by different criteria.

You can skip the first N values:

(def stream (->> (rx/from-coll [1 2 3 4 5 6])
                 (rx/skip 4)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 5
;; ==> on-value: 6
;; ==> on-end

Skip while some predicate evaluates to true:

(def stream (->> (rx/from-coll [1 1 1 1 2 3])
                 (rx/skip-while odd?)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 2
;; ==> on-value: 3
;; ==> on-end

Or skip until another observable yields a value with skip-until (no example at this moment).

Take

You can also limit the observable sequence to a specified number of elements:

(def stream (->> (rx/from-coll [1 1 1 1 2 3])
                 (rx/take 2)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 1
;; ==> on-end

Or take values for as long as a predicate evaluates to true:

(def stream (->> (rx/from-coll [1 1 1 1 2 3])
                 (rx/take-while odd?)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 1
;; ==> on-value: 1
;; ==> on-value: 1
;; ==> on-end
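
As a point of comparison on ordinary collections, clojure.core's split-with returns both halves at once: the part that take-while would keep and the part that drop-while would leave. The input is the same as in the skip-while and take-while examples above. The name parts is illustrative.

```clojure
;; Plain clojure.core analogy for take-while / skip-while on a collection.
;; split-with returns [(take-while pred coll) (drop-while pred coll)].
(def parts (split-with odd? [1 1 1 1 2 3]))

(first parts)
;; => (1 1 1 1)

(second parts)
;; => (2 3)
```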

Slice

This is a combination of skip and take. It returns an observable sequence that represents the portion of the source observable sequence between start and end.

(def stream (->> (rx/from-coll [1 2 3 4])
                 (rx/slice 1 3)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 2
;; ==> on-value: 3
;; ==> on-end
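
The output above suggests that start is inclusive and end is exclusive, like subvec in clojure.core. A minimal sketch of the same idea on ordinary collections follows; the names sliced and sliced-vec are illustrative.

```clojure
;; Plain clojure.core analogy: skipping 1 element and then taking 2
;; yields the same portion as a [1, 3) index range.
(def sliced
  (->> [1 2 3 4]
       (drop 1)
       (take 2)))

sliced
;; => (2 3)

;; subvec uses an inclusive start index and an exclusive end index.
(def sliced-vec (subvec [1 2 3 4] 1 3))

sliced-vec
;; => [2 3]
```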

Reduce

Allows combining all results of an observable sequence using a combining function (also called reducing function):

(def stream (->> (rx/from-coll [1 2 3 4])
                 (rx/reduce + 0)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 10
;; ==> on-end

Scan

Like reduce (see above), but returns a stream of each intermediate result instead (similar to reductions in Clojure):

(def stream (->> (rx/from-coll [1 2 3 4])
                 (rx/scan + 0)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 3
;; ==> on-value: 6
;; ==> on-value: 10
;; ==> on-end
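
One difference from reductions is worth noting: with an initial value, reductions also yields that initial value first, whereas the scan output shown above starts at the first combined result. A short comparison in plain clojure.core follows; the names with-seed and without-seed are illustrative.

```clojure
;; reductions with an initial value includes that value as its first element.
(def with-seed (reductions + 0 [1 2 3 4]))

with-seed
;; => (0 1 3 6 10)

;; Dropping the first element gives the same values as the scan example above.
(def without-seed (rest with-seed))

without-seed
;; => (1 3 6 10)
```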

Buffer

This transformer function accumulates N values in a buffer and then emits them as one value (similar to partition in Clojure):

(def stream (->> (rx/from-coll [1 2 3 4])
                 (rx/buffer 2)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: [1 2]
;; ==> on-value: [3 4]
;; ==> on-end
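
When the number of values is not a multiple of N, Clojure offers two behaviours: partition drops the incomplete trailing group, while partition-all keeps it. This documentation does not say which of the two buffer resembles in that case, so check it before relying on either. The sketch below only shows the clojure.core side; the names are illustrative.

```clojure
;; partition drops a trailing group that has fewer than n elements.
(def full-chunks (partition 2 [1 2 3 4 5]))

full-chunks
;; => ((1 2) (3 4))

;; partition-all keeps the incomplete trailing group.
(def all-chunks (partition-all 2 [1 2 3 4 5]))

all-chunks
;; => ((1 2) (3 4) (5))
```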

Combinators

Choice

Performs an arbitrary choice between two or more observable sequences and returns the first value available from any provided observables.

This kind of combinator works very well with operations that can timeout:

(def stream (rx/choice
              (rx/timeout 1000 :timeout)
              (rx/timeout 900 :value)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: :value
;; ==> on-end

Zip

This combinator combines two observable sequences into one, pairing their values by position.

(def stream (rx/zip
              (rx/from-coll [1 2 3])
              (rx/from-coll [2 3 4])))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: [1 2]
;; ==> on-value: [2 3]
;; ==> on-value: [3 4]
;; ==> on-end
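
On ordinary collections, the same pairing can be written with map and vector from clojure.core. This is an analogy only. In particular, map stops at the shortest input, and this documentation does not state how rx/zip treats inputs of different lengths. The names zipped and uneven are illustrative.

```clojure
;; Plain clojure.core analogy: pair up values by position.
(def zipped (map vector [1 2 3] [2 3 4]))

zipped
;; => ([1 2] [2 3] [3 4])

;; With collections of different lengths, map stops at the shortest one.
(def uneven (map vector [1 2 3] [2 3]))

uneven
;; => ([1 2] [2 3])
```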

Concat

This combinator concatenates two or more observable sequences in order.

(def stream (rx/concat
              (rx/from-coll [1 2])
              (rx/from-coll [3 4])))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 2
;; ==> on-value: 3
;; ==> on-value: 4
;; ==> on-end

Merge

This combinator merges two or more observable sequences, emitting values as they arrive with no guaranteed order between the sources (see concat for an ordered combination).

(def stream (rx/merge
              (rx/from-coll [1 2])
              (rx/from-coll [3 4])))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 3
;; ==> on-value: 2
;; ==> on-value: 4
;; ==> on-end

Subject

This is an abstraction that combines an observable sequence with an observer, so you can push values into it, transform it, and subscribe to it like any other sequence.

Creating a subject

You can create a subject instance using the subject constructor function.

This is an example of using subject for two things: push values and subscribe to it.

(def subject (rx/subject))
(def stream (->> subject
                (rx/skip 1)
                (rx/map inc)
                (rx/take 2)))

(rx/subscribe stream
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

(rx/push! subject 1)
(rx/push! subject 2)
(rx/push! subject 1)
(rx/push! subject 2)

;; ==> on-value: 3
;; ==> on-value: 2
;; ==> on-end

Ending a subject

You can end a subject at any moment just by executing the end! function:

(def subject (rx/subject))

(rx/subscribe subject
              #(println "on-value:" %)
              #(println "on-error:" %)
              #(println "on-end"))

(rx/end! subject)
;; ==> on-end

Developers Guide

Philosophy

Five most important rules:

  • Beautiful is better than ugly.

  • Explicit is better than implicit.

  • Simple is better than complex.

  • Complex is better than complicated.

  • Readability counts.

All contributions to beicon should keep these important rules in mind.

Contributing

Unlike Clojure and other Clojure contrib libraries, beicon does not have many restrictions for contributions. Just open an issue or pull request.

Source Code

beicon is open source and can be found on GitHub.

You can clone the public repository with this command:

git clone https://github.com/funcool/beicon

Run tests

For running tests just execute this:

ClojureScript:

./scripts/build
node ./out/tests.js

Clojure:

lein test

License

beicon is licensed under BSD (2-Clause) license:

Copyright (c) 2015-2016 Andrey Antukh <niwi@niwi.nz>

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.