FSharp.Core


Observable Module

Contains operations for working with first-class events and other observable objects.

Functions and values

Function or value Description

Observable.add callback source

Full Usage: Observable.add callback source

Parameters:
    callback : 'T -> unit - The function to be called on each observation.
    source : IObservable<'T> - The input Observable.

Creates an observer which permanently subscribes to the given observable and which calls the given function for each observation.

callback : 'T -> unit

The function to be called on each observation.

source : IObservable<'T>

The input Observable.

Example


 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers
 let multiplyByTwo = fun number -> printf $"{number * 2} "
 Observable.add multiplyByTwo observableNumbers
The sample evaluates to: 2 4 6 8 10
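
As a complementary sketch that needs only FSharp.Core, the following uses an F# `Event<int>` as the source instead of System.Reactive. The names `numberEvent` and `received` are hypothetical and chosen for illustration. It shows that `Observable.add` returns `unit`, so the caller gets no handle with which to unsubscribe later.

```fsharp
// Hypothetical event standing in for any IObservable<int> source.
let numberEvent = Event<int>()
let received = ResizeArray<int>()

// Observable.add returns unit: the subscription cannot be disposed afterwards.
numberEvent.Publish |> Observable.add (fun n -> received.Add(n * 2))

numberEvent.Trigger 1
numberEvent.Trigger 2
numberEvent.Trigger 3

printfn "%A" (List.ofSeq received) // [2; 4; 6]
```

When the callback has to be removed later, `Observable.subscribe` is the better fit because it returns an `IDisposable`.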

Observable.choose chooser source

Full Usage: Observable.choose chooser source

Parameters:
    chooser : 'T -> 'U option - The function that returns Some for observations to be propagated and None for observations to ignore.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U> An Observable that only propagates some of the observations from the source.

Returns an observable which chooses a projection of observations from the source using the given function. The returned object will trigger observations x for which the chooser returns Some x. The returned object also propagates all errors arising from the source and completes when the source completes.

chooser : 'T -> 'U option

The function that returns Some for observations to be propagated and None for observations to ignore.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'U>

An Observable that only propagates some of the observations from the source.

Example


 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let getOddNumbers number =
     match number with
     | _ when number % 2 = 0 -> None
     | _ -> Some number

 let map = Observable.choose getOddNumbers observableNumbers

 map.Subscribe(fun x -> printf $"{x} ") |> ignore
The sample will output: 1 3 5

Observable.filter predicate source

Full Usage: Observable.filter predicate source

Parameters:
    predicate : 'T -> bool - The function applied to each observation to determine whether it should be kept.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'T> An Observable that propagates only the observations for which the predicate returns true.

Returns an observable which filters the observations of the source by the given function. The observable will see only those observations for which the predicate returns true. The predicate is executed once for each subscribed observer. The returned object also propagates error observations arising from the source and completes when the source completes.

predicate : 'T -> bool

The function applied to each observation to determine whether it should be kept.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'T>

An Observable that propagates only the observations for which the predicate returns true.

Example


 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let getEvenNumbers = fun number -> number % 2 = 0
 let map = Observable.filter getEvenNumbers observableNumbers

 map.Subscribe(fun x -> printf $"{x} ") |> ignore
The sample will output: 2 4

Observable.map mapping source

Full Usage: Observable.map mapping source

Parameters:
    mapping : 'T -> 'U - The function applied to observations from the source.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U> An Observable of the type specified by mapping.

Returns an observable which transforms the observations of the source by the given function. The transformation function is executed once for each subscribed observer. The returned object also propagates error observations arising from the source and completes when the source completes.

mapping : 'T -> 'U

The function applied to observations from the source.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'U>

An Observable of the type specified by mapping.

Example


 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let multiplyByTwo = fun number -> number * 2
 let map = Observable.map multiplyByTwo observableNumbers

 map.Subscribe(fun x -> printf $"{x} ") |> ignore
The sample will output: 2 4 6 8 10
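
The statement that the transformation function is executed once for each subscribed observer can be illustrated with a small sketch. It assumes an F# `Event<int>` as the source rather than System.Reactive, and `source`, `mappingCalls`, and `doubled` are hypothetical names. Two observers are attached to the same mapped observable and the number of mapping invocations is counted.

```fsharp
let source = Event<int>()
let mappingCalls = ref 0

let doubled =
    source.Publish
    |> Observable.map (fun n ->
        // Count every invocation of the mapping function.
        mappingCalls.Value <- mappingCalls.Value + 1
        n * 2)

// Two independent observers of the same mapped observable.
doubled |> Observable.add (fun n -> printfn "first observer: %d" n)
doubled |> Observable.add (fun n -> printfn "second observer: %d" n)

// A single source observation...
source.Trigger 5

// ...runs the mapping once per observer.
printfn "mapping ran %d times" mappingCalls.Value // mapping ran 2 times
```

A mapping function with side effects or a high cost will therefore repeat that work for every subscriber.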

Observable.merge source1 source2

Full Usage: Observable.merge source1 source2

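Parameters:
    source1 : IObservable<'T> - The first Observable.
    source2 : IObservable<'T> - The second Observable.

Returns: IObservable<'T> An Observable that propagates information from both sources.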

Returns an observable for the merged observations from the sources. The returned object propagates success and error values arising from either source and completes when both the sources have completed.

For each observer, the registered intermediate observing object is not thread safe. That is, observations arising from the sources must not be triggered concurrently on different threads.

source1 : IObservable<'T>

The first Observable.

source2 : IObservable<'T>

The second Observable.

Returns: IObservable<'T>

An Observable that propagates information from both sources.

Example


 open System.Reactive.Linq
 open System

 let createTimer interval =
     let timer = new Timers.Timer(interval)
     timer.AutoReset <- true
     timer.Enabled <- true
     Observable.Create(fun observer -> timer.Elapsed.Subscribe(observer))

 let observableFirstTimer = createTimer 1000
 let observableSecondTimer = createTimer 3000

 let result = Observable.merge observableFirstTimer observableSecondTimer

 result.Subscribe(fun output -> printfn $"Output - {output.SignalTime} ")
 |> ignore

 Console.ReadLine() |> ignore
The sample will merge all events at a given interval and output it to the stream:
Output - 2/5/2022 3:49:37 AM
Output - 2/5/2022 3:49:38 AM
Output - 2/5/2022 3:49:39 AM
Output - 2/5/2022 3:49:39 AM
Output - 2/5/2022 3:49:40 AM
Output - 2/5/2022 3:49:41 AM
Output - 2/5/2022 3:49:42 AM
Output - 2/5/2022 3:49:42 AM
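
For a deterministic illustration that does not depend on timers, the sketch below merges two F# events. This assumes `Event<string>` sources instead of System.Reactive, and `leftEvent`, `rightEvent`, and `merged` are hypothetical names. Observations are triggered sequentially on one thread, which respects the thread-safety note above.

```fsharp
let leftEvent = Event<string>()
let rightEvent = Event<string>()
let merged = ResizeArray<string>()

// Observations from either source flow into the same observer.
Observable.merge leftEvent.Publish rightEvent.Publish
|> Observable.add (fun s -> merged.Add s)

leftEvent.Trigger "a"
rightEvent.Trigger "b"
leftEvent.Trigger "c"

printfn "%A" (List.ofSeq merged) // ["a"; "b"; "c"]
```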

Observable.pairwise source

Full Usage: Observable.pairwise source

Parameters:
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'T * 'T> An Observable that triggers on successive pairs of observations from the input Observable.

Returns a new observable that triggers on the second and subsequent triggerings of the input observable. The Nth triggering of the input observable passes the arguments from the N-1th and Nth triggering as a pair. The argument passed to the N-1th triggering is held in hidden internal state until the Nth triggering occurs.

For each observer, the registered intermediate observing object is not thread safe. That is, observations arising from the source must not be triggered concurrently on different threads.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'T * 'T>

An Observable that triggers on successive pairs of observations from the input Observable.

Example


 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let pairWise = Observable.pairwise observableNumbers

 pairWise.Subscribe(fun pair -> printf $"{pair} ")
 |> ignore
The sample evaluates to: (1, 2) (2, 3) (3, 4) (4, 5)
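
The behavior on the first triggering can be seen more directly with a hot source. The following sketch assumes an F# `Event<int>` in place of System.Reactive, and `readings` and `pairs` are hypothetical names. The first observation is only stored, and pairs start to appear from the second observation onward.

```fsharp
let readings = Event<int>()
let pairs = ResizeArray<int * int>()

readings.Publish
|> Observable.pairwise
|> Observable.add (fun pair -> pairs.Add pair)

readings.Trigger 10 // nothing emitted yet: there is no previous value
readings.Trigger 15 // emits (10, 15)
readings.Trigger 12 // emits (15, 12)

printfn "%A" (List.ofSeq pairs) // [(10, 15); (15, 12)]
```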

Observable.partition predicate source

Full Usage: Observable.partition predicate source

Parameters:
    predicate : 'T -> bool - The function to determine which output Observable will trigger a particular observation.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'T> * IObservable<'T> A tuple of Observables. The first triggers when the predicate returns true, and the second triggers when the predicate returns false.

Returns two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.

predicate : 'T -> bool

The function to determine which output Observable will trigger a particular observation.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'T> * IObservable<'T>

A tuple of Observables. The first triggers when the predicate returns true, and the second triggers when the predicate returns false.

Example


 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let isEvenNumber = fun number -> number % 2 = 0

 let leftPartition, rightPartition =
     Observable.partition isEvenNumber observableNumbers

 leftPartition.Subscribe(fun x -> printfn $"Left partition: {x}") |> ignore

 rightPartition.Subscribe(fun x -> printfn $"Right partition: {x}") |> ignore
The sample evaluates to: Left partition: 2, 4, Right partition: 1, 3, 5
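
As a further sketch, the example below partitions a hot source so that both outputs receive values in a single pass. It assumes an F# `Event<int>` rather than System.Reactive, and `numberSource`, `evens`, and `odds` are hypothetical names.

```fsharp
let numberSource = Event<int>()
let evens = ResizeArray<int>()
let odds = ResizeArray<int>()

let evenStream, oddStream =
    numberSource.Publish |> Observable.partition (fun n -> n % 2 = 0)

// The first observable receives values for which the predicate is true.
evenStream |> Observable.add (fun n -> evens.Add n)
// The second observable receives the remaining values.
oddStream |> Observable.add (fun n -> odds.Add n)

for n in 1 .. 5 do
    numberSource.Trigger n

printfn "%A" (List.ofSeq evens) // [2; 4]
printfn "%A" (List.ofSeq odds)  // [1; 3; 5]
```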

Observable.scan collector state source

Full Usage: Observable.scan collector state source

Parameters:
    collector : 'U -> 'T -> 'U - The function to update the state with each observation.
    state : 'U - The initial state.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U> An Observable that triggers on the updated state values.

Returns an observable which, for each observer, allocates an item of state and applies the given accumulating function to successive values arising from the input. The returned object will trigger observations for each computed state value, excluding the initial value. The returned object propagates all errors arising from the source and completes when the source completes.

For each observer, the registered intermediate observing object is not thread safe. That is, observations arising from the source must not be triggered concurrently on different threads.

collector : 'U -> 'T -> 'U

The function to update the state with each observation.

state : 'U

The initial state.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'U>

An Observable that triggers on the updated state values.

Example


 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let multiplyBy number = fun y -> number * y
 let initialState = 2
 let scan = Observable.scan multiplyBy initialState observableNumbers

 scan.Subscribe(fun x -> printf "%A " x) |> ignore
The sample evaluates to: 2 4 12 48 240
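
To make the point about the initial value concrete, here is a small running-total sketch. It assumes an F# `Event<int>` as the source instead of System.Reactive, and `deposits` and `balances` are hypothetical names. The initial state of 100 seeds the accumulation but is never emitted itself.

```fsharp
let deposits = Event<int>()
let balances = ResizeArray<int>()

deposits.Publish
|> Observable.scan (fun balance amount -> balance + amount) 100
|> Observable.add (fun balance -> balances.Add balance)

deposits.Trigger 20 // emits 120
deposits.Trigger 5  // emits 125

printfn "%A" (List.ofSeq balances) // [120; 125]
```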

Observable.split splitter source

Full Usage: Observable.split splitter source

Parameters:
    splitter : 'T -> Choice<'U1, 'U2> - The function that takes an observation and transforms it into one of the two output Choice types.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U1> * IObservable<'U2> A tuple of Observables. The first triggers when splitter returns Choice1Of2 and the second triggers when splitter returns Choice2Of2.

Returns two observables which split the observations of the source by the given function. The first will trigger observations x for which the splitter returns Choice1Of2 x. The second will trigger observations y for which the splitter returns Choice2Of2 y. The splitter is executed once for each subscribed observer. Both also propagate error observations arising from the source and each completes when the source completes.

splitter : 'T -> Choice<'U1, 'U2>

The function that takes an observation and transforms it into one of the two output Choice types.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'U1> * IObservable<'U2>

A tuple of Observables. The first triggers when splitter returns Choice1Of2 and the second triggers when splitter returns Choice2Of2.

Example


 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let getEvenNumbers number =
     match number % 2 = 0 with
     | true -> Choice1Of2 number
     | false -> Choice2Of2 $"{number} is not an even number"

 let evenSplit, printOddNumbers = Observable.split getEvenNumbers observableNumbers

 let printOutput observable functionName =
     let subscription =
         Observable.subscribe
             (fun output -> printfn $"{functionName} - Split output: {output}. Type: {output.GetType()}")
             observable

     subscription

 printOutput evenSplit (nameof evenSplit) |> ignore
 printOutput printOddNumbers (nameof printOddNumbers) |> ignore
The sample evaluates to:
evenSplit - Split output: 2. Type: System.Int32
evenSplit - Split output: 4. Type: System.Int32
printOddNumbers - Split output: 1 is not an even number. Type: System.String
printOddNumbers - Split output: 3 is not an even number. Type: System.String
printOddNumbers - Split output: 5 is not an even number. Type: System.String

Observable.subscribe callback source

Full Usage: Observable.subscribe callback source

Parameters:
    callback : 'T -> unit - The function to be called on each observation.
    source : IObservable<'T> - The input Observable.

Returns: IDisposable An object that will remove the callback if disposed.

Creates an observer which subscribes to the given observable and which calls the given function for each observation.

callback : 'T -> unit

The function to be called on each observation.

source : IObservable<'T>

The input Observable.

Returns: IDisposable

An object that will remove the callback if disposed.

Example


 open System.Reactive.Linq
 let numbers = seq { 1..3 }
 let observableNumbers = Observable.ToObservable numbers
 let printOutput observable =
     let subscription = Observable.subscribe (fun x -> printfn "%A" x) observable
     subscription
 printOutput observableNumbers |> ignore
The sample evaluates to: 1, 2, 3
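
The role of the returned `IDisposable` is easier to see with a source that keeps producing values. The sketch below assumes an F# `Event<int>` rather than System.Reactive, and `ticks`, `observed`, and `tickSubscription` are hypothetical names. Once the subscription is disposed, later observations no longer reach the callback.

```fsharp
let ticks = Event<int>()
let observed = ResizeArray<int>()

let tickSubscription =
    ticks.Publish |> Observable.subscribe (fun n -> observed.Add n)

ticks.Trigger 1
ticks.Trigger 2

// Disposing removes the callback from the source.
tickSubscription.Dispose()

ticks.Trigger 3 // not observed

printfn "%A" (List.ofSeq observed) // [1; 2]
```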