FSharp.Core


Observable Module

Contains operations for working with first-class events and other observable objects.

Functions and values

Function or value Description

Observable.add callback source

Full Usage: Observable.add callback source

Parameters:
    callback : 'T -> unit - The function to be called on each observation.
    source : IObservable<'T> - The input Observable.

Creates an observer which permanently subscribes to the given observable and which calls the given function for each observation.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers
 let printDoubled = fun number -> printf $"{number * 2} "
 Observable.add printDoubled observableNumbers
The sample will output: 2 4 6 8 10

Observable.choose chooser source

Full Usage: Observable.choose chooser source

Parameters:
    chooser : 'T -> 'U option - The function that returns Some for observations to be propagated and None for observations to ignore.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U> An Observable that only propagates some of the observations from the source.

Returns an observable which chooses a projection of observations from the source using the given function. The returned object will trigger observations x for which the chooser returns Some x. The returned object also propagates all errors arising from the source and completes when the source completes.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers
 
 let getOddNumbers number =
     match number with
     | _ when number % 2 = 0 -> None
     | _ -> Some number
 
 let map = Observable.choose getOddNumbers observableNumbers
 
 map.Subscribe(fun x -> printf $"{x} ") |> ignore
The sample will output: 1 3 5

Observable.filter predicate source

Full Usage: Observable.filter predicate source

Parameters:
    predicate : 'T -> bool - The function applied to each observation to determine whether it should be kept.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'T> An Observable that filters observations based on the predicate.

Returns an observable which filters the observations of the source by the given function. Observers of the returned observable see only those observations for which the predicate returns true. The predicate is executed once for each subscribed observer. The returned object also propagates error observations arising from the source and completes when the source completes.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let getEvenNumbers = fun number -> number % 2 = 0
 let map = Observable.filter getEvenNumbers observableNumbers

 map.Subscribe(fun x -> printf $"{x} ") |> ignore
The sample will output: 2 4
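
The statement that the predicate is executed once for each subscribed observer can be checked without any external package. The following is a minimal sketch, assuming FSharp.Core's Event<'T> as the source in place of System.Reactive; the names numberEvent, predicateCalls, seenByFirst and seenBySecond are hypothetical and chosen only for illustration.

```fsharp
// Event<'T> is part of FSharp.Core; Publish exposes it as an IObservable<'T>.
let numberEvent = Event<int>()

// Counts how many times the predicate runs (a ref cell so the function can update it).
let predicateCalls = ref 0

let isEven number =
    predicateCalls.Value <- predicateCalls.Value + 1
    number % 2 = 0

let evens = numberEvent.Publish |> Observable.filter isEven

let seenByFirst = ResizeArray<int>()
let seenBySecond = ResizeArray<int>()

// Two independent observers of the same filtered observable.
evens |> Observable.add (fun x -> seenByFirst.Add x)
evens |> Observable.add (fun x -> seenBySecond.Add x)

for n in 1 .. 4 do
    numberEvent.Trigger n

printfn "Predicate calls: %d" predicateCalls.Value
printfn "First observer: %A" (List.ofSeq seenByFirst)
printfn "Second observer: %A" (List.ofSeq seenBySecond)
```

In this sketch, four triggered values and two observers lead to eight predicate calls, and each observer receives 2 and 4.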

Observable.map mapping source

Full Usage: Observable.map mapping source

Parameters:
    mapping : 'T -> 'U - The function applied to observations from the source.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U> An Observable of the type specified by mapping.

Returns an observable which transforms the observations of the source by the given function. The transformation function is executed once for each subscribed observer. The returned object also propagates error observations arising from the source and completes when the source completes.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let multiplyByTwo = fun number -> number * 2
 let map = Observable.map multiplyByTwo observableNumbers

 map.Subscribe(fun x -> printf $"{x} ") |> ignore
The sample will output: 2 4 6 8 10

Observable.merge source1 source2

Full Usage: Observable.merge source1 source2

Parameters:
    source1 : IObservable<'T> - The first Observable.
    source2 : IObservable<'T> - The second Observable.

Returns: IObservable<'T> An Observable that propagates information from both sources.

Returns an observable for the merged observations from the sources. The returned object propagates success and error values arising from either source and completes when both the sources have completed.

For each observer, the registered intermediate observing object is not thread safe. That is, observations arising from the sources must not be triggered concurrently on different threads.

Example

 open System.Reactive.Linq
 open System
 
 let createTimer (interval: float) =
     let timer = new Timers.Timer(interval)
     timer.AutoReset <- true
     timer.Enabled <- true
     Observable.Create(fun observer -> timer.Elapsed.Subscribe(observer))
 
 let observableFirstTimer = createTimer 1000.0
 let observableSecondTimer = createTimer 3000.0
 
 let result = Observable.merge observableFirstTimer observableSecondTimer
 
 result.Subscribe(fun output -> printfn $"Output - {output.SignalTime} ")
 |> ignore
 
 Console.ReadLine() |> ignore
The sample merges the events from both timers and prints each one as it arrives. Example output:
 Output - 2/5/2022 3:49:37 AM
 Output - 2/5/2022 3:49:38 AM
 Output - 2/5/2022 3:49:39 AM
 Output - 2/5/2022 3:49:39 AM
 Output - 2/5/2022 3:49:40 AM
 Output - 2/5/2022 3:49:41 AM
 Output - 2/5/2022 3:49:42 AM
 Output - 2/5/2022 3:49:42 AM
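
Because the timer-based sample depends on wall-clock timing, a deterministic variant can be easier to follow. The following is a minimal sketch, assuming two FSharp.Core Event<'T> values as sources instead of timers; leftEvent, rightEvent and received are hypothetical names used only for illustration.

```fsharp
// Two independent sources of the same element type.
let leftEvent = Event<string>()
let rightEvent = Event<string>()

// The merged observable propagates observations from either source.
let merged = Observable.merge leftEvent.Publish rightEvent.Publish

// Record observations in arrival order.
let received = ResizeArray<string>()
merged |> Observable.add (fun message -> received.Add message)

leftEvent.Trigger "left 1"
rightEvent.Trigger "right 1"
leftEvent.Trigger "left 2"

printfn "%A" (List.ofSeq received)
```

All triggers here happen on a single thread, in line with the thread-safety note above, and the observations arrive in the order they were triggered: "left 1", "right 1", "left 2".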

Observable.pairwise source

Full Usage: Observable.pairwise source

Parameters:
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'T * 'T> An Observable that triggers on successive pairs of observations from the input Observable.

Returns a new observable that triggers on the second and subsequent triggerings of the input observable. The Nth triggering of the input observable passes the arguments from the (N-1)th and Nth triggering as a pair. The argument passed to the (N-1)th triggering is held in hidden internal state until the Nth triggering occurs.

For each observer, the registered intermediate observing object is not thread safe. That is, observations arising from the source must not be triggered concurrently on different threads.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers
 
 let pairWise = Observable.pairwise observableNumbers
 
 pairWise.Subscribe(fun pair -> printf $"{pair} ")
 |> ignore
The sample will output: (1, 2) (2, 3) (3, 4) (4, 5)

Observable.partition predicate source

Full Usage: Observable.partition predicate source

Parameters:
    predicate : 'T -> bool - The function to determine which output Observable will trigger a particular observation.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'T> * IObservable<'T> A tuple of Observables. The first triggers when the predicate returns true, and the second triggers when the predicate returns false.

Returns two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers
 
 let isEvenNumber = fun number -> number % 2 = 0
 
 let leftPartition, rightPartition =
     Observable.partition isEvenNumber observableNumbers
 
 leftPartition.Subscribe(fun x -> printfn $"Left partition: {x}") |> ignore
 
 rightPartition.Subscribe(fun x -> printfn $"Right partition: {x}") |> ignore
The sample will output:
 Left partition: 2
 Left partition: 4
 Right partition: 1
 Right partition: 3
 Right partition: 5

Observable.scan collector state source

Full Usage: Observable.scan collector state source

Parameters:
    collector : 'U -> 'T -> 'U - The function to update the state with each observation.
    state : 'U - The initial state.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U> An Observable that triggers on the updated state values.

Returns an observable which, for each observer, allocates an item of state and applies the given accumulating function to successive values arising from the input. The returned object will trigger observations for each computed state value, excluding the initial value. The returned object propagates all errors arising from the source and completes when the source completes.

For each observer, the registered intermediate observing object is not thread safe. That is, observations arising from the source must not be triggered concurrently on different threads.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers
 
 let multiplyBy state number = state * number
 let initialState = 2
 let scan = Observable.scan multiplyBy initialState observableNumbers
 
 scan.Subscribe(fun x -> printf "%A " x) |> ignore
The sample will output: 2 4 12 48 240

Observable.split splitter source

Full Usage: Observable.split splitter source

Parameters:
    splitter : 'T -> Choice<'U1, 'U2> - The function that takes an observation and transforms it into one of the two output Choice types.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U1> * IObservable<'U2> A tuple of Observables. The first triggers when splitter returns Choice1Of2 and the second triggers when splitter returns Choice2Of2.

Returns two observables which split the observations of the source by the given function. The first will trigger observations x for which the splitter returns Choice1Of2 x. The second will trigger observations y for which the splitter returns Choice2Of2 y. The splitter is executed once for each subscribed observer. Both also propagate error observations arising from the source and each completes when the source completes.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers
 
 let getEvenNumbers number =
     match number % 2 = 0 with
     | true -> Choice1Of2 number
     | false -> Choice2Of2 $"{number} is not an even number"
 
 let evenSplit, printOddNumbers = Observable.split getEvenNumbers observableNumbers
 
 let printOutput observable functionName =
     // Return the live subscription; a use binding would dispose it before returning.
     Observable.subscribe
         (fun output -> printfn $"{functionName} - Split output: {output}. Type: {(box output).GetType()}")
         observable
 
 printOutput evenSplit (nameof evenSplit) |> ignore
 printOutput printOddNumbers (nameof printOddNumbers) |> ignore
The sample will output:
 evenSplit - Split output: 2. Type: System.Int32
 evenSplit - Split output: 4. Type: System.Int32
 printOddNumbers - Split output: 1 is not an even number. Type: System.String
 printOddNumbers - Split output: 3 is not an even number. Type: System.String
 printOddNumbers - Split output: 5 is not an even number. Type: System.String

Observable.subscribe callback source

Full Usage: Observable.subscribe callback source

Parameters:
    callback : 'T -> unit - The function to be called on each observation.
    source : IObservable<'T> - The input Observable.

Returns: IDisposable An object that will remove the callback if disposed.

Creates an observer which subscribes to the given observable and which calls the given function for each observation.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..3 }
 let observableNumbers = Observable.ToObservable numbers
 let printOutput observable =
     // Return the live subscription; a use binding would dispose it before returning.
     Observable.subscribe (fun x -> printfn "%A" x) observable
 printOutput observableNumbers |> ignore
The sample will output 1, 2, and 3, each on its own line.
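
The returned IDisposable is what distinguishes Observable.subscribe from Observable.add: disposing it removes the callback. The following is a minimal sketch of that behaviour, assuming an FSharp.Core Event<'T> as the source rather than System.Reactive; numberEvent and received are hypothetical names used only for illustration.

```fsharp
// Event<'T> is part of FSharp.Core; Publish exposes it as an IObservable<'T>.
let numberEvent = Event<int>()

// Collect observations so the effect of disposal can be inspected afterwards.
let received = ResizeArray<int>()

let subscription =
    numberEvent.Publish
    |> Observable.subscribe (fun x -> received.Add x)

numberEvent.Trigger 1
numberEvent.Trigger 2

// Disposing the subscription removes the callback.
subscription.Dispose()

// This observation is no longer delivered to the callback.
numberEvent.Trigger 3

printfn "%A" (List.ofSeq received)
```

Only the values triggered before Dispose, 1 and 2, reach the callback in this sketch.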