En el post pasado les comencé a hablar de las Reactive Extensions y cómo es que introducían un paradigma de programación distinto. Ahí mostré un ejemplo muy sencillo de la programación reactiva... pero sé que no siempre todo el código que escribiremos será así de sencillo, así que en este post crearé y mostraré un caso más complejo en el que podemos utilizar Rx.

El río

Piensa en un río en el cual hay un conjunto de peces que viajan a través de él, es por eso que los pescadores se acercan al río para tratar de conseguir un poco de alimento. El río produce una cantidad x de peces que pasan por él uno a uno cada y milisegundos, además de que hay ocasiones en que el agua del río hace muy peligroso pescar en él.

River

Para recrear las condiciones anteriores, crea una clase llamada River que reciba y almacene las especificaciones de la cantidad de peces, el tiempo entre ellos y si es o no peligroso:

public class River
{
    private readonly int _fishAmount;
    private readonly int _waitTime;
    private readonly bool _dangerousRiver;


    public River(int fishAmount, int waitTime = 500,  bool dangerousRiver = false)
    {
        _fishAmount = fishAmount;
        _waitTime = waitTime;
        _dangerousRiver = dangerousRiver;
    }

Fish

Pero aguarda, no te olvides de los peces: estos tendrán una especie (FishSpecies), un color (Color) y un peso (Weight), todo representado dentro de una clase:

    public class Fish
    {
        public double Weight { get; set; }
        public Color Color { get; set; }
        public FishSpecies Species { get; set; }

        #region Random fish

        private static readonly Random R = new Random();
        private const double MinWeight = 500; // Gramos
        private const double MaxWeight = 5000; // Gramos

        public static Fish RandomFish()
        {
            var fish = new Fish
            {
                Weight = (R.NextDouble() * (MaxWeight - MinWeight)) + MinWeight,
                Species = (FishSpecies) R.Next(4), // There are four species
                Color = (Color) R.Next(4) // There are four colors
            };

            return fish;
        }

        #endregion
    }

Oh, además en esta clase introduje un método para crear peces aleatoriamente.

River (cont)

Bueno, volviendo al diseño del río. Ahora toca escribir la forma en que representaremos el paso de los peces a través de él. ¿Estás de acuerdo en que es un flujo? sí, un flujo de peces. Y al ser un flujo es el un buen candidato para convertirse en un IObservablede Fish.

Para acceder al flujo de peces usaremos un método que retorne un IOBservable<Fish>. Acá es una buena oportunidad para decir que no es recomendable que tu mismo implementes la interfaz, lo mejor en muchos casos es hacer uso de las Reactive Extensions para crear un objeto de ese tipo:

    public IObservable<Fish> Stream()
    {
        return Observable.Create<Fish>(observer =>
        {
            for (int i = 0; i < _fishAmount; i++)
            {
                var fish = Fish.RandomFish();
                observer.OnNext(fish);
                Thread.Sleep(_waitTime);

                if (_dangerousRiver && i == _fishAmount / 3)
                    throw new Exception("Uh, hubo un derrame en el río");
            }
            observer.OnCompleted();
            return () => { };
        });
    }

Vamos a revisar qué es lo que está ocurriendo en ese método:

1.
Lo primero que nos damos cuenta es el tipo de retorno: IObservable<Fish>
2.
Inmediatamente después tenemos la creación de nuestro observable mediante el método estático Observable.Create, este método recibe un `Func<IObserver<Fish>, Action>`. El parámetro de entrada (IObserver<Fish>) es una referencia al suscriptor que está al tanto de lo que publiquemos. El valor de retorno (Action) es un delegado que en teoría deberíamos usar para limpiar los recursos que usamos en el flujo.
3.
Vamos a generar tantos peces como sean necesarios, en este caso creándolos de forma aleatoria, después, publicaremos cada uno de los peces a nuestro Observer mediante su método OnNext. Esperarémos después el tiempo indicado entre cada pez. Antes de repetir el ciclo comprobaremos que el río es seguro, y si no lo es, lanzaremos una excepción.
4.
Apenas terminar el ciclo enviaremos el mensaje OnCompleted al observer
5.
Regresaremos una Action vacía, recuerda que deberíamos usar este para liberar los recursos que usó nuestro flujo de datos. En este caso, no debemos liberar nada.

Usar el método Observable.Create<T> es solo una de las muchas maneras en las que podemos crear un flujo, en este sitio web puedes ver 16 formas de crearlos sin necesidad de heredar tú mismo de IObservable.

Usando el río

Vamos a crear un par de ríos para después poder suscribirnos a ellos:

var rioNormal = new River(10);
var rioRapido = new River(10, waitTime: 10);
var dangerousRiver = new River(fishAmount: 10, dangerousRiver: true);

Entonces ahora podemos suscribirnos y estar al tanto de qué peces pasan por el río:

rioNormal.Stream()
    .Subscribe(
        onNext: fish => Console.WriteLine($"{DateTime.Now:HH:mm:ss.ffff}: {fish}"),
        onCompleted: () => Console.WriteLine("¡Terminé de pescar!")
    );
13:35:40.3923: Anglerfish, Green, 3,787.68
13:35:40.9770: Anglerfish, Green, 1,462.02
13:35:41.4831: ClownFish, Black, 1,469.41
13:35:41.9871: Blowfish, Red, 1,539.82
13:35:42.4926: Tuna, Black, 2,540.65
13:35:42.9931: Blowfish, Green, 2,905.58
13:35:43.4939: Blowfish, Red, 3,611.61
13:35:43.9994: Tuna, Black, 2,029.88
13:35:44.5048: Tuna, Yellow, 1,672.38
13:35:45.0102: Anglerfish, Black, 4,194.68
¡Terminé de pescar!

Como vimos anteriormente, también podemos filtrar los elementos:

rioNormal.Stream()
    .Where(fish => fish.Color == Color.Green && fish.Weight > 3000) // Filtramos los elementos
    .Subscribe(
        onNext: fish => Console.WriteLine($"{DateTime.Now:HH:mm:ss.ffff}: {fish}"),
        onCompleted: () => Console.WriteLine("¡Terminé de pescar!")
    );
13:35:48.5546: Blowfish, Green, 3,420.28
13:35:50.0690: Anglerfish, Green, 3,974.44
¡Terminé de pescar!

Podemos agrupar los elementos con el método Buffer, que permitirá agrupar los elementos del flujo. Por ejemplo, imagina que el pescador está usando una red para 3 peces:

rioNormal.Stream()
    .Buffer(3) // Agrupamos 3 elementos
    .Subscribe(
        onNext: fishCollection =>
        {
            var f = String.Join(", ", fishCollection.Select(fi => fi.Species));
            Console.WriteLine($"Atrapé {fishCollection.Count} peces ({f})");
        },
        onCompleted: () => Console.WriteLine("¡Terminé de pescar usando una red para 3 peces!")
    );
Atrapé 3 peces (Tuna, Blowfish, Tuna)
Atrapé 3 peces (Tuna, Tuna, Blowfish)
Atrapé 3 peces (Tuna, Blowfish, Blowfish)
Atrapé 1 peces (Anglerfish)
¡Terminé de pescar usando una red para 3 peces!

Y no olvides que podemos especificarle al código que hacer en caso de que ocurra un error en el flujo, esto mediante el método (o lambda en este caso) OnError:

dangerousRiver.Stream()
    .Subscribe(
        onNext: fish => Console.WriteLine($"{DateTime.Now:HH:mm:ss.ffff}: {fish}"),
        onError: ex => Console.WriteLine($"Ocurrió un problema en el río: {ex.Message}"),
        onCompleted: () => Console.WriteLine("¡Terminé de pescar en el río peligroso!")
    );
13:35:45.5200: Anglerfish, Black, 3,171.37
13:35:46.0216: Anglerfish, Red, 3,041.97
13:35:46.5272: Anglerfish, Black, 4,589.28
13:35:47.0297: Blowfish, Black, 3,128.05
Ocurrió un problema en el río: Uh, hubo un derrame en el río

El pescador o IObserver<Fish>

Hasta el momento hemos estado empleando lambdas para reaccionar a los mensajes de un publicador. Pero si recuerdas, en el post pasado hablé de otra interfaz que representaba al observador, recordarás también que esta interfaz especifica tres métodos. Nosotros vamos a crear una clase Fisher que representará al pescador:

public class Fisher : IObserver<Fish>
{
    private readonly string _name;

    public Fisher(string name)
    {
        _name = name;
    }
    public void OnNext(Fish value)
    {
        Console.WriteLine($"{_name}: atrapé un {value.Species} de color {value.Color} a las {DateTime.Now:HH:mm:ss.ffff}");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine($"{_name}: Oops, algo pasó {error.Message}");
    }

    public void OnCompleted()
    {
        Console.WriteLine($"{_name}: ¡Terminó la pesca!");
    }
}

Y una vez que tenemos a un pescador (observador) podemos suscribirlo al flujo de peces para que esté al tanto de lo que ocurre:

var erik = new Fisher("Erik");
rioNormal.Stream()
    .Subscribe(erik);
Erik: atrapé un Anglerfish de color Red a las 13:35:57.6664
Erik: atrapé un Blowfish de color Yellow a las 13:35:57.6795
Erik: atrapé un Anglerfish de color Black a las 13:35:57.6923
Erik: atrapé un Blowfish de color Green a las 13:35:57.7051
Erik: atrapé un Anglerfish de color Red a las 13:35:57.7172
Erik: atrapé un Tuna de color Yellow a las 13:35:57.7301
Erik: atrapé un ClownFish de color Yellow a las 13:35:57.7429
Erik: atrapé un Blowfish de color Yellow a las 13:35:57.7558
Erik: atrapé un ClownFish de color Green a las 13:35:57.7686
Erik: atrapé un Blowfish de color Red a las 13:35:57.7815
Erik: ¡Terminó la pesca!

Y listo, ese fue todo el ejemplo. Recuerda que el código está en GitHub para qe lo descargues y juegues con él. En el futuro estaré creando otro par de ejemplos sobre las bondades de la programación reactiva, entre tanto, si tienes preguntas, no dudes en hacérmelas llegar.