Header menu logo FSharp.Core

Observable Module

Contains operations for working with first class event and other observable objects.

Functions and values

Function or value Description

Observable.add callback source

Full Usage: Observable.add callback source

Parameters:
    callback : 'T -> unit - The function to be called on each observation.
    source : IObservable<'T> - The input Observable.

Creates an observer which permanently subscribes to the given observable and which calls the given function for each observation.

callback : 'T -> unit

The function to be called on each observation.

source : IObservable<'T>

The input Observable.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers
 let multiplyByTwo = fun number -> printf $"{number * 2} "
 Observable.add multiplyByTwo observableNumbers
namespace System
val numbers: int seq
Multiple items
val seq: sequence: 'T seq -> 'T seq

--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
val observableNumbers: System.IObservable<int>
module Observable from Microsoft.FSharp.Control
val multiplyByTwo: number: int -> unit
val number: int
val printf: format: Printf.TextWriterFormat<'T> -> 'T
val add: callback: ('T -> unit) -> source: System.IObservable<'T> -> unit
The sample evaluates to: 2 4 6 8 10

Observable.choose chooser source

Full Usage: Observable.choose chooser source

Parameters:
    chooser : 'T -> 'U option - The function that returns Some for observations to be propagated and None for observations to ignore.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U> An Observable that only propagates some of the observations from the source.

Returns an observable which chooses a projection of observations from the source using the given function. The returned object will trigger observations x for which the splitter returns Some x. The returned object also propagates all errors arising from the source and completes when the source completes.

chooser : 'T -> 'U option

The function that returns Some for observations to be propagated and None for observations to ignore.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'U>

An Observable that only propagates some of the observations from the source.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let getOddNumbers number =
     match number with
     | _ when number % 2 = 0 -> None
     | _ -> Some number

 let map = Observable.choose getOddNumbers observableNumbers

 map.Subscribe(fun x -> printf $"{x} ") |> ignore
namespace System
val numbers: int seq
Multiple items
val seq: sequence: 'T seq -> 'T seq

--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
val observableNumbers: System.IObservable<int>
module Observable from Microsoft.FSharp.Control
val getOddNumbers: number: int -> int option
val number: int
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
val map: System.IObservable<int>
val choose: chooser: ('T -> 'U option) -> source: System.IObservable<'T> -> System.IObservable<'U>
member System.IObservable.Subscribe: callback: ('T -> unit) -> System.IDisposable
System.IObservable.Subscribe(observer: System.IObserver<int>) : System.IDisposable
val x: int
val printf: format: Printf.TextWriterFormat<'T> -> 'T
val ignore: value: 'T -> unit
The sample will output: 1 3 5

Observable.filter predicate source

Full Usage: Observable.filter predicate source

Parameters:
    predicate : 'T -> bool - The function to apply to observations to determine if it should be kept.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'T> An Observable that filters observations based on filter.

Returns an observable which filters the observations of the source by the given function. The observable will see only those observations for which the predicate returns true. The predicate is executed once for each subscribed observer. The returned object also propagates error observations arising from the source and completes when the source completes.

predicate : 'T -> bool

The function to apply to observations to determine if it should be kept.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'T>

An Observable that filters observations based on filter.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let getEvenNumbers = fun number -> number % 2 = 0
 let map = Observable.filter multiplyByTwo observableNumbers

 map.Subscribe(fun x -> printf $"{x} ") |> ignore
namespace System
val numbers: int seq
Multiple items
val seq: sequence: 'T seq -> 'T seq

--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
val observableNumbers: System.IObservable<obj>
module Observable from Microsoft.FSharp.Control
val getEvenNumbers: number: int -> bool
val number: int
val map: System.IObservable<obj>
val filter: predicate: ('T -> bool) -> source: System.IObservable<'T> -> System.IObservable<'T>
member System.IObservable.Subscribe: callback: ('T -> unit) -> System.IDisposable
System.IObservable.Subscribe(observer: System.IObserver<obj>) : System.IDisposable
val x: obj
val printf: format: Printf.TextWriterFormat<'T> -> 'T
val ignore: value: 'T -> unit
The sample will output: 2 4

Observable.map mapping source

Full Usage: Observable.map mapping source

Parameters:
    mapping : 'T -> 'U - The function applied to observations from the source.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U> An Observable of the type specified by mapping.

Returns an observable which transforms the observations of the source by the given function. The transformation function is executed once for each subscribed observer. The returned object also propagates error observations arising from the source and completes when the source completes.

mapping : 'T -> 'U

The function applied to observations from the source.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'U>

An Observable of the type specified by mapping.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let multiplyByTwo = fun number -> number * 2
 let map = Observable.map multiplyByTwo observableNumbers

 map.Subscribe(fun x -> printf $"{x} ") |> ignore
namespace System
val numbers: int seq
Multiple items
val seq: sequence: 'T seq -> 'T seq

--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
val observableNumbers: System.IObservable<int>
module Observable from Microsoft.FSharp.Control
val multiplyByTwo: number: int -> int
val number: int
val map: System.IObservable<int>
val map: mapping: ('T -> 'U) -> source: System.IObservable<'T> -> System.IObservable<'U>
member System.IObservable.Subscribe: callback: ('T -> unit) -> System.IDisposable
System.IObservable.Subscribe(observer: System.IObserver<int>) : System.IDisposable
val x: int
val printf: format: Printf.TextWriterFormat<'T> -> 'T
val ignore: value: 'T -> unit
The sample will output: 2 4 6 8 10

Observable.merge source1 source2

Full Usage: Observable.merge source1 source2

Parameters:
Returns: IObservable<'T> An Observable that propagates information from both sources.

Returns an observable for the merged observations from the sources. The returned object propagates success and error values arising from either source and completes when both the sources have completed.

For each observer, the registered intermediate observing object is not thread safe. That is, observations arising from the sources must not be triggered concurrently on different threads.

source1 : IObservable<'T>

The first Observable.

source2 : IObservable<'T>

The second Observable.

Returns: IObservable<'T>

An Observable that propagates information from both sources.

Example

 open System.Reactive.Linq
 open System

 let createTimer interval =
     let timer = new Timers.Timer(interval)
     timer.AutoReset <- true
     timer.Enabled <- true
     Observable.Create(fun observer -> timer.Elapsed.Subscribe(observer))

 let observableFirstTimer = createTimer 1000
 let observableSecondTimer = createTimer 3000

 let result = Observable.merge observableFirstTimer observableSecondTimer

 result.Subscribe(fun output -> printfn $"Output - {output.SignalTime} ")
 |> ignore

 Console.ReadLine() |> ignore
namespace System
val createTimer: interval: 'a -> 'b
val interval: 'a
val timer: Timers.Timer
namespace System.Timers
Multiple items
type Timer = inherit Component interface ISupportInitialize new: unit -> unit + 2 overloads member BeginInit: unit -> unit member Close: unit -> unit member EndInit: unit -> unit member Start: unit -> unit member Stop: unit -> unit member AutoReset: bool member Enabled: bool ...
<summary>Generates an event after a set interval, with an option to generate recurring events.</summary>

--------------------
Timers.Timer() : Timers.Timer
Timers.Timer(interval: float) : Timers.Timer
Timers.Timer(interval: TimeSpan) : Timers.Timer
property Timers.Timer.AutoReset: bool with get, set
<summary>Gets or sets a Boolean indicating whether the <see cref="T:System.Timers.Timer" /> should raise the <see cref="E:System.Timers.Timer.Elapsed" /> event only once (<see langword="false" />) or repeatedly (<see langword="true" />).</summary>
<returns><see langword="true" /> if the <see cref="T:System.Timers.Timer" /> should raise the <see cref="E:System.Timers.Timer.Elapsed" /> event each time the interval elapses; <see langword="false" /> if it should raise the <see cref="E:System.Timers.Timer.Elapsed" /> event only once, after the first time the interval elapses. The default is <see langword="true" />.</returns>
property Timers.Timer.Enabled: bool with get, set
<summary>Gets or sets a value indicating whether the <see cref="T:System.Timers.Timer" /> should raise the <see cref="E:System.Timers.Timer.Elapsed" /> event.</summary>
<exception cref="T:System.ObjectDisposedException">This property cannot be set because the timer has been disposed.</exception>
<exception cref="T:System.ArgumentException">The <see cref="P:System.Timers.Timer.Interval" /> property was set to a value greater than <see cref="F:System.Int32.MaxValue">Int32.MaxValue</see> before the timer was enabled.</exception>
<returns><see langword="true" /> if the <see cref="T:System.Timers.Timer" /> should raise the <see cref="E:System.Timers.Timer.Elapsed" /> event; otherwise, <see langword="false" />. The default is <see langword="false" />.</returns>
module Observable from Microsoft.FSharp.Control
event Timers.Timer.Elapsed: IEvent<Timers.ElapsedEventHandler,Timers.ElapsedEventArgs>
member IObservable.Subscribe: callback: ('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<Timers.ElapsedEventArgs>) : IDisposable
val observableFirstTimer: IObservable<obj>
val observableSecondTimer: IObservable<obj>
val result: IObservable<obj>
val merge: source1: IObservable<'T> -> source2: IObservable<'T> -> IObservable<'T>
member IObservable.Subscribe: callback: ('T -> unit) -> IDisposable
IObservable.Subscribe(observer: IObserver<obj>) : IDisposable
val output: obj
val printfn: format: Printf.TextWriterFormat<'T> -> 'T
val ignore: value: 'T -> unit
type Console = static member Beep: unit -> unit + 1 overload static member Clear: unit -> unit static member GetCursorPosition: unit -> struct (int * int) static member MoveBufferArea: sourceLeft: int * sourceTop: int * sourceWidth: int * sourceHeight: int * targetLeft: int * targetTop: int -> unit + 1 overload static member OpenStandardError: unit -> Stream + 1 overload static member OpenStandardInput: unit -> Stream + 1 overload static member OpenStandardOutput: unit -> Stream + 1 overload static member Read: unit -> int static member ReadKey: unit -> ConsoleKeyInfo + 1 overload static member ReadLine: unit -> string ...
<summary>Represents the standard input, output, and error streams for console applications. This class cannot be inherited.</summary>
Console.ReadLine() : string
The sample will merge all events at a given interval and output it to the stream: Output - 2/5/2022 3:49:37 AM Output - 2/5/2022 3:49:38 AM Output - 2/5/2022 3:49:39 AM Output - 2/5/2022 3:49:39 AM Output - 2/5/2022 3:49:40 AM Output - 2/5/2022 3:49:41 AM Output - 2/5/2022 3:49:42 AM Output - 2/5/2022 3:49:42 AM

Observable.pairwise source

Full Usage: Observable.pairwise source

Parameters:
Returns: IObservable<'T * 'T> An Observable that triggers on successive pairs of observations from the input Observable.

Returns a new observable that triggers on the second and subsequent triggerings of the input observable. The Nth triggering of the input observable passes the arguments from the N-1th and Nth triggering as a pair. The argument passed to the N-1th triggering is held in hidden internal state until the Nth triggering occurs.

For each observer, the registered intermediate observing object is not thread safe. That is, observations arising from the source must not be triggered concurrently on different threads.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'T * 'T>

An Observable that triggers on successive pairs of observations from the input Observable.

Example

 /// open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let pairWise = Observable.pairwise observableNumbers

 pairWise.Subscribe(fun pair -> printf $"{pair} ")
 |> ignore
val numbers: int seq
 open System.Reactive.Linq
Multiple items
val seq: sequence: 'T seq -> 'T seq

--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
val observableNumbers: System.IObservable<obj>
module Observable from Microsoft.FSharp.Control
val pairWise: System.IObservable<obj * obj>
val pairwise: source: System.IObservable<'T> -> System.IObservable<'T * 'T>
member System.IObservable.Subscribe: callback: ('T -> unit) -> System.IDisposable
System.IObservable.Subscribe(observer: System.IObserver<obj * obj>) : System.IDisposable
val pair: obj * obj
val printf: format: Printf.TextWriterFormat<'T> -> 'T
val ignore: value: 'T -> unit
The sample evaluates to: (1, 2), (2, 3), (3, 4), (4, 5)

Observable.partition predicate source

Full Usage: Observable.partition predicate source

Parameters:
    predicate : 'T -> bool - The function to determine which output Observable will trigger a particular observation.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'T> * IObservable<'T> A tuple of Observables. The first triggers when the predicate returns true, and the second triggers when the predicate returns false.

Returns two observables which partition the observations of the source by the given function. The first will trigger observations for those values for which the predicate returns true. The second will trigger observations for those values where the predicate returns false. The predicate is executed once for each subscribed observer. Both also propagate all error observations arising from the source and each completes when the source completes.

predicate : 'T -> bool

The function to determine which output Observable will trigger a particular observation.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'T> * IObservable<'T>

A tuple of Observables. The first triggers when the predicate returns true, and the second triggers when the predicate returns false.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let isEvenNumber = fun number -> number % 2 = 0

 let leftPartition, rightPartition =
     Observable.partition isEvenNumber observableNumbers

 leftPartition.Subscribe(fun x -> printfn $"Left partition: {x}") |> ignore

 rightPartition.Subscribe(fun x -> printfn $"Right partition: {x}") |> ignore
namespace System
val numbers: int seq
Multiple items
val seq: sequence: 'T seq -> 'T seq

--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
val observableNumbers: System.IObservable<int>
module Observable from Microsoft.FSharp.Control
val isEvenNumber: number: int -> bool
val number: int
val leftPartition: System.IObservable<int>
val rightPartition: System.IObservable<int>
val partition: predicate: ('T -> bool) -> source: System.IObservable<'T> -> System.IObservable<'T> * System.IObservable<'T>
member System.IObservable.Subscribe: callback: ('T -> unit) -> System.IDisposable
System.IObservable.Subscribe(observer: System.IObserver<int>) : System.IDisposable
val x: int
val printfn: format: Printf.TextWriterFormat<'T> -> 'T
val ignore: value: 'T -> unit
The sample evaluates to: Left partition: 2, 4, Right partition: 1, 3, 5

Observable.scan collector state source

Full Usage: Observable.scan collector state source

Parameters:
    collector : 'U -> 'T -> 'U - The function to update the state with each observation.
    state : 'U - The initial state.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U> An Observable that triggers on the updated state values.

Returns an observable which, for each observer, allocates an item of state and applies the given accumulating function to successive values arising from the input. The returned object will trigger observations for each computed state value, excluding the initial value. The returned object propagates all errors arising from the source and completes when the source completes.

For each observer, the registered intermediate observing object is not thread safe. That is, observations arising from the source must not be triggered concurrently on different threads.

collector : 'U -> 'T -> 'U

The function to update the state with each observation.

state : 'U

The initial state.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'U>

An Observable that triggers on the updated state values.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let multiplyBy number = fun y -> number * y
 let initialState = 2
 let scan = Observable.scan multiplyBy initialState observableNumbers

 scan.Subscribe(fun x -> printf "%A " x) |> ignore
namespace System
val numbers: int seq
Multiple items
val seq: sequence: 'T seq -> 'T seq

--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
val observableNumbers: System.IObservable<int>
module Observable from Microsoft.FSharp.Control
val multiplyBy: number: int -> y: int -> int
val number: int
val y: int
val initialState: int
val scan: System.IObservable<int>
val scan: collector: ('U -> 'T -> 'U) -> state: 'U -> source: System.IObservable<'T> -> System.IObservable<'U>
member System.IObservable.Subscribe: callback: ('T -> unit) -> System.IDisposable
System.IObservable.Subscribe(observer: System.IObserver<int>) : System.IDisposable
val x: int
val printf: format: Printf.TextWriterFormat<'T> -> 'T
val ignore: value: 'T -> unit
The sample evaluates to: 2 4 12 48 240

Observable.split splitter source

Full Usage: Observable.split splitter source

Parameters:
    splitter : 'T -> Choice<'U1, 'U2> - The function that takes an observation an transforms it into one of the two output Choice types.
    source : IObservable<'T> - The input Observable.

Returns: IObservable<'U1> * IObservable<'U2> A tuple of Observables. The first triggers when splitter returns Choice1of2 and the second triggers when splitter returns Choice2of2.

Returns two observables which split the observations of the source by the given function. The first will trigger observations x for which the splitter returns Choice1Of2 x. The second will trigger observations y for which the splitter returns Choice2Of2 y The splitter is executed once for each subscribed observer. Both also propagate error observations arising from the source and each completes when the source completes.

splitter : 'T -> Choice<'U1, 'U2>

The function that takes an observation an transforms it into one of the two output Choice types.

source : IObservable<'T>

The input Observable.

Returns: IObservable<'U1> * IObservable<'U2>

A tuple of Observables. The first triggers when splitter returns Choice1of2 and the second triggers when splitter returns Choice2of2.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..5 }
 let observableNumbers = Observable.ToObservable numbers

 let getEvenNumbers number =
     match number % 2 = 0 with
     | true -> Choice1Of2 number
     | false -> Choice2Of2 $"{number} is not an even number"

 let evenSplit, printOddNumbers = Observable.split getEvenNumbers observableNumbers

 let printOutput observable functionName =
     use subscription =
         Observable.subscribe
             (fun output -> printfn $"{functionName} - Split output: {output}. Type: {output.GetType()}")
             observable

     subscription

 printOutput evenSplit (nameof evenSplit) |> ignore
 printOutput printOddNumbers (nameof printOddNumbers) |> ignore
namespace System
val numbers: int seq
Multiple items
val seq: sequence: 'T seq -> 'T seq

--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
val observableNumbers: System.IObservable<int>
module Observable from Microsoft.FSharp.Control
val getEvenNumbers: number: int -> Choice<int,string>
val number: int
union case Choice.Choice1Of2: 'T1 -> Choice<'T1,'T2>
union case Choice.Choice2Of2: 'T2 -> Choice<'T1,'T2>
val evenSplit: System.IObservable<int>
val printOddNumbers: System.IObservable<string>
val split: splitter: ('T -> Choice<'U1,'U2>) -> source: System.IObservable<'T> -> System.IObservable<'U1> * System.IObservable<'U2>
val printOutput: observable: System.IObservable<'a> -> functionName: 'b -> System.IDisposable
val observable: System.IObservable<'a>
val functionName: 'b
val subscription: System.IDisposable
val subscribe: callback: ('T -> unit) -> source: System.IObservable<'T> -> System.IDisposable
val output: 'a
val printfn: format: Printf.TextWriterFormat<'T> -> 'T
val nameof: 'T -> string
val ignore: value: 'T -> unit
The sample evaluates to: evenSplit - Split output: 2. Type: System.Int32 evenSplit - Split output: 4. Type: System.Int32 printOddNumbers - Split output: 1 is not an even number. Type: System.String printOddNumbers - Split output: 3 is not an even number. Type: System.String printOddNumbers - Split output: 5 is not an even number. Type: System.String

Observable.subscribe callback source

Full Usage: Observable.subscribe callback source

Parameters:
    callback : 'T -> unit - The function to be called on each observation.
    source : IObservable<'T> - The input Observable.

Returns: IDisposable An object that will remove the callback if disposed.

Creates an observer which subscribes to the given observable and which calls the given function for each observation.

callback : 'T -> unit

The function to be called on each observation.

source : IObservable<'T>

The input Observable.

Returns: IDisposable

An object that will remove the callback if disposed.

Example

 open System.Reactive.Linq
 let numbers = seq { 1..3 }
 let observableNumbers = Observable.ToObservable numbers
 let printOutput observable =
     use subscription = Observable.subscribe (fun x -> printfn "%A" x) observable
     subscription
 printOutput observableNumbers |> ignore
namespace System
val numbers: int seq
Multiple items
val seq: sequence: 'T seq -> 'T seq

--------------------
type 'T seq = System.Collections.Generic.IEnumerable<'T>
val observableNumbers: System.IObservable<obj>
module Observable from Microsoft.FSharp.Control
val printOutput: observable: System.IObservable<'a> -> System.IDisposable
val observable: System.IObservable<'a>
val subscription: System.IDisposable
val subscribe: callback: ('T -> unit) -> source: System.IObservable<'T> -> System.IDisposable
val x: 'a
val printfn: format: Printf.TextWriterFormat<'T> -> 'T
val ignore: value: 'T -> unit
The sample evaluates to: 1, 2, 3

Type something to start searching.