[F#] Tail Recursion y el Stack

En la entrada anterior veíamos cómo, de manera sencilla, podíamos razonar y entender las funciones recursivas. Pero ¿Cómo funcionan estas llamadas recursivas en .NET?

Empecemos por aclarar algo. Cuando se llama a una función, el computador debe recordar el lugar desde donde fue llamada, la dirección de retorno, de modo que pueda volver a ese lugar con el resultado una vez la llamada se ha completado¹. Estas direcciones son almacenadas en el call stack. Una vez la función retorna un valor la pila de llamadas se borra y el programa continúa desde la función llamadora.

Si una función recursiva se llama a si misma una y otra vez, como un loop, el stack se irá llenando con dichas direcciones (las llamadas a si mismo). El problema es que el stack no es infinito y podemos recibir una conocida excepción, si, la Stack Overflow Exception.

Podemos comprobar este comportamiento si escribimos una función que calcule el factorial de un número de manera recursiva, algo de la forma:

    let rec factorial =
        function
        | x when x > 1 -> x * factorial (x-1)
        | _ -> 1

Usando Visual Studio, podemos poner un breakpoint en el otherwise, donde esta el “1“, llamar la función y ver que pasa:

factorial

Ahora podemos probar calculando el factorial de Int32.MaxValue y ver que pasa… Process is terminated due to StackOverflowException.

Entonces, ¿Es posible escribir funciones recursivas que no llenen el call stack? La respuesta es si, y es lo que llamamos tail recursion (lo dejo en inglés porque recursión de cola no me gusta). Este tipo de recursión se basa en que todas las llamadas recursivas aplicarán la optimización de tail call. Esta optimización la obtenemos cuando la función llamadora retorna directamente el valor de la función llamada sin aplicar ninguna transformación/operación/reducción sobre dicho valor de retorno. Entonces, si todas las llamadas de la función recursiva son tail calls tendremos tail call recursion.

¿Pero cómo podemos lograr esto? Si analizamos de nuevo el tail call la respuesta pasa por retornar directamente la llamada a la función sin aplicar la función x*. Pero… ¿Y por cual valor multiplicamos? pues tendremos que usar un valor inicial (seed/acumulador/…). Con lo que la función factorial, tail recursive, puede ser definida así:

    let factorial' =
        let rec fact acc = 
            function
            | x when x > 1 -> fact (acc*x) (x-1)
            | _ -> 1
        fact 1

Cómo podemos ver ahora hay una nueva función dentro de la definición de factorial'. Esta función recibirá el valor del acumulador, que será 1 y el parámetro n. Si ponemos nuevamente un breakpoint en el otherwise, donde está el “acc“. Y usamos la función con Int32.MaxValue, esta vez veremos un call stack como el siguiente:

factorial1

De esta forma retornamos directamente la función sin hacer ninguna operación (x*) sobre la función.

En lenguajes funcionales, como F#, es común que el compilador optimice este código, y lo convierta a loops cómo los que tendríamos en lenguajes imperativos, logrando así el mismo comportamiento y performance.

Por el contrario, si queremos aplicar esta tipo de recursión en lenguajes como C# veremos que no hay ninguna optimización y el call stack se irá llenando con cada llamada a la función. El por qué en C# no se hace esta optimización es un misterio para mi, pero para eso esta F#.

Espero les sea de utilidad.

Hasta el próximo post.

Referencias

¹ Véase Wikipedia, Tail call, consultado: 7/11/2015.

MSDN Blog, Tail calls in F#

MSDN Blog, Understanding Tail Recursion

StackOverflow, Why doesn’t .NET/C# optimize for tail-call recursion?

[F#] Tail Recursion y el Stack

[AutoMapper] Convertir de Utc a determinada zona horaria

Dado un modelo como el siguiente:

public class User
{
	//Otras propiedades...
	public string TimeZoneId { get; set; }		
}

public class UserAction
{
	//Otras propiedades...
	public DateTime DateInUtc { get; set; }
}

Al registrar una acción del usuario registraremos la fecha en utc, pero a la hora de recuperarla y mostrarla al usuario, esta deberá ser de nuevo convertida, de Utc a la zona horaria del usuario. Si recuperamos una colección de acciones del usuario sería necesario crear una colección del ViewModel y en un bucle construir las fechas:

var timeZoneId = GetTimeZoneId();
var timeZoneInfo = TimeZoneInfo.FindSystemTimeZoneById(timeZoneId);
var userActions = GetUserActions();
var viewModel = new List<UserActionViewModel>();
foreach(var userAction in userActions)
{
	viewModel.Add(new UserActionViewModel()
		{
			ValidDate = TimeZoneInfo.ConvertTimeFromUtc(userAction.DateInUtc, timeZoneInfo)
		});
}

Podemos extraer el código y reutilizarlo para construir los ViewModels que necesitemos y hagan uso de las fechas y las zonas horarias. Pero si ya hemos incluido AutoMapper como una dependencia más de nuestro proyecto… ¿cómo lo podemos hacer?

IMappingOperationOptions y ValueResolver

AutoMapper ofrece la opción de “pasar información” al custom map haciendo uso de la segunda sobrecarga del método Map. Algo de la forma:

var timeZoneId = GetTimeZoneId();
var viewModel = Mapper.Map<UserActionViewModel>(userAction, opts=>opts.Items.Add("timezone", timeZoneId));

Y ahora ¿Cómo recuperamos dicho valor en el custom map?. Debemos convertir de alguna forma el valor de origen y recuperar el TimeZoneId. Para esto haremos uso de un ValueResolver, pues nos ofrece la posibilidad de acceder al contexto. Algo de la forma:

        class LocalizedDateTimeResolver : IValueResolver
        {
            public ResolutionResult Resolve(ResolutionResult source)
            {
                var timeZoneId = source.Context.Options.Items["timezone"].ToString();
                var utc = (DateTime)source.Value;
                var timezoneInfo = TimeZoneInfo.FindSystemTimeZoneById(timeZoneId);
                var dateTime = TimeZoneInfo.ConvertTimeFromUtc(utc, timezoneInfo);
                return source.New(dateTime);
            }
        }

Con esto, en el custom map podemos usar el método ResolveUsing, de la forma:

            Mapper.CreateMap<UserAction, UserActionViewModel>()
                .ForMember(des => des.ValidDate,
                    opts => opts.ResolveUsing<LocalizedDateTimeResolver>().FromMember(src => src.DateInUtc));

Espero les sea de utilidad. Hasta el próximo  post.

[AutoMapper] Convertir de Utc a determinada zona horaria

[Entity Framework] Encapsular las Fk de las entidades

La ultima vez que use Entity Framework quedé con un disgusto por como terminé modelando los objetos, en parte por ignorancia y en parte por imposiciones y convenciones propias del ORM.

En esta ocasión, me encontré con un caso de propiedades de navegación y las propiedades “{*}Id” que usa Entity Framework para construir las relaciones. Esto me sirvió para recordar como funcionan estas convenciones y ver si puedo crear un modelo menos anémico y más ajustado a la realidad.

Dado el siguiente modelo:

    public class Entity
    {
        protected Entity()
        {
            Id = Guid.NewGuid();
        }

        public Entity(Guid id)
        {
            Id = id;
        }

        public Guid Id { get; private set; }
    }

    public class User : Entity
    {
        //EF
        protected User() { }

        public User(string name, string lastName)
        {
            LastName = lastName;
            Name = name;
        }

        public void FinishRegister(Country country)
        {
            Country = country;
        }

        public string Name { get; private set; }

        public string LastName { get; private set; }

        public Country Country { get; private set; }
    }

    public class Country : Entity
    {
        //EF
        protected Country() { }

        public Country(string name, string iso)
        {
            Iso = iso;
            Name = name;
        }

        public string Name { get; private set; }

        public string Iso { get; private set; }
    }

Vemos en el modelo ya una convención de EF, el uso de la propiedad virtual para poder generar los proxies que serán usados en el lazy loading y change tracking… pero continuemos.

La propiedad virtual es una convención en EF que creará una columna FK, uniqueidentifier, allow nulls con el nombre de la propiedad y el sufijo “_Id”. Lo que se ajusta perfecto a nuestro modelo.

Supongamos ahora que el usuario se registra con su nombre y apellido, y luego finalizará su proceso de registro. En este proceso solicitamos toda la información restante. Por practicidad del ejemplo solo será el país.

Este modelo se ajusta bien a los requerimientos pero si lo llevamos a la practica no será muy “optimo“, es decir, suponiendo un entorno web: en una petición POST, para finalizar el registro, en los datos de la petición solo enviaremos el Id del país y será necesario entonces consultar la base de datos para recuperar el el usuario y el país (por su Id) y luego si finalizar el registro. Desde mi punto de vista la consulta que sobraría es la del país.

            using (var context = new ApplicationDbContext())
            {
                var user = context.Users.Find(viewModel.UserId);
                var country = context.Countries.Find(viewModel.SelectedCountry);
                user.FinishRegister(country);
                context.SaveChanges();
            }

Entonces la solución pasaría por contaminar el modelo usando una propiedad Id que represente la identidad del país y aprovechar las bondades del orm, que para eso lo estamos usando! El modelo queda de la forma:

    public class User : Entity
    {
        //EF
        protected User() { }

        public User(string name, string lastName)
        {
            LastName = lastName;
            Name = name;
        }

        public void FinishRegister(Guid countryId)
        {
            CountryId = countryId;
        }

        public string Name { get; private set; }

        public string LastName { get; private set; }

        public Guid? CountryId { get; private set; }

        public Country Country { get; private set; }
    }

En este caso usamos la convención de llaves foraneas de EF y ya podremos usar solo el Id del país para finalizar el proceso de registro.

Lo que no me gusta de esta solución es tener que exponer al publico la propiedad CountryId  pues esto no representa nada en el exterior, para eso está la propiedad del tipo País que ya definimos. Entonces lo mejor sería ocultarla del exterior quitándole su modificador de acceso publico. Pero al quitarle el acceso publico EF no entenderá la convención de llaves foráneas y  el código que finaliza el registro no servirá, porque no entiende que ese GUID es la clave de país.

Ahora la pregunta es: ¿Cómo mapear propiedades no publicas en Entity Framework?… y Stack Overflow sale al rescate.

En síntesis agregamos otro contaminante al modelo, el atributo Column, y generaremos una convención que se encargue de hacer el mapping de estas propiedades (no publicas y marcadas con dicho atributo) vía reflection. El código del contexto está en este gist

De esta forma ya podemos Finalizar el registro haciendo uso del Id pero sin exponer propiedades extra al exterior.

Espero les sea de utilidad.

Hasta el próximo post.

[Entity Framework] Encapsular las Fk de las entidades

Azure Service Bus Queue Client + Reactive Extensions

Azure Service Bus soporta dos modelos de mensajería asíncrona basados en colas, éstos son: las colas y los tópicos. Las colas ofrecen una comunicación 1:1, es decir, se publica un mensaje en la cola y de los n receptores sólo uno podrá trabajar el mensaje. Por el contrario, los tópicos, responden de una manera más fiel a un modelo pub/sub, y nos permiten una comunicación 1:n, es decir, el mensaje que se publica en un tópico puede ser recibido por uno o varios  subscritores que podrán o no hacer uso de filtros para recibir solo los mensajes que les interesan; esto se logra haciendo uso de un sistema de colas virtuales para cada subscripción, sistema que controla Azure por nosotros.

Cada modelo responde a necesidades distintas y no son excluyentes, es decir, en un mismo namespace podemos estar trabajando con colas y tópicos al mismo tiempo y usarlas para responder a las necesidades de nuestra solución. Aunque, si lo deseamos, podemos agregar a las colas un comportamiento similar al de los tópicos, interactuando con las propiedades de los mensajes y creando filtros para responder de forma distinta a cada uno de los mensajes que encontremos en la cola. Digo similar, pero esto no llega ni a la mitad de los tópicos, pues aquí no estaríamos empleando ningún sistema de colas distinto al provisto por Azure y se sigue compitiendo por recibir los mensajes.

Desde el SDK 2 tenemos la posibilidad de recibir mensajes de una forma event-driven a través del OnMessage, método que encapsula todo el loop infinito que sondea la cola en espera de nuevos mensajes. Con este modelo dirigido por eventos ya podemos imaginarnos el por qué de los Reactive Extensions. Los Reactive Extensions nos permiten crear secuencias observables de distintas formas, así podemos trabajar con estos datos asíncronos de una forma más sencilla y escribiendo un código más declarativo, fácil de leer y mantener. Así que manos a la obra.

Lo primero es crear la secuencia observable a partir del metodo OnMessage, aquí, no sabía bien cual era el mejor enfoque (aún me pierdo con todos los operadores de Rx), así que pregunté a los colegas y terminé con un código de la forma:

            IObservable<BrokeredMessage> observable = Observable.Create<BrokeredMessage>(
                observer =>
                {
                    _queueClient.OnMessage(observer.OnNext, new OnMessageOptions());
                    return Disposable.Empty;
                }).Publish().RefCount();

El código anterior tiene tres partes importantes, lo primero es acudir al método Create (En la sobrecarga con IDisposale) para crear la secuencia observable y devolver un IDisposable. Esta secuencia observable se creará a partir de la implementación de una función “subscriptora” (El primer parámetro del método). Si dejamos hasta ahí y empezamos a vincular subscriptores (más de uno) a la secuencia observable, el método OnAction se ejecutará cuantas veces se agregué un subscriptor, lo que provocaría una excepción con el mensaje: The method ‘OnMessage’ or ‘OnMessageAsync’ has already been called. Lo que nos lleva al segundo punto importante del código, el operador Publish, éste retornará un IConnectableObservale<T>, que nos permitirá compartir la subscripción (y no solo la secuencia observable), si! compartir el OnMessage que registramos en el Create. Finalmente, hacemos uso del RefCount, que nos retorna un IObservable<T> a partir del IConnectableObservable<T> y manejará por nosotros la conexión y desconexión de los subscriptores.

Teniendo “claro” como se genera la secuencia observable solo queda usar los conocimientos de linq para crear nuevas secuencias observables y subscripciones. En este ejemplo separaremos comandos y consultas y ejecutaremos algo (impresión en pantalla) con esta información. Algo de la forma:

            var comandos = observable.Where(
                message =>
                {
                    object obj;
                    return message.Properties.TryGetValue("IsCommand", out obj) && (bool)obj;
                }).Subscribe(
                x => //OnNext
                {
                    Console.ForegroundColor = ConsoleColor.DarkYellow;
                    Console.WriteLine(x.GetBody<string>());
                },
                x => Console.WriteLine(x.Message), //OnError
                () => Console.WriteLine("Complete")); //OnComplete

Y tenemos como salida:

image

El código completo esta en este gist.

Como vemos la implementación con Reactive Extensions nos permite escribir un código más declarativo, un poco más fácil de mantener y de leer, con lo que ya sabemos de Linq.

Espero les sea de utilidad.

Hasta el próximo post

Azure Service Bus Queue Client + Reactive Extensions

Compilar con Roslyn en Sublime Text

Con el anuncio hecho ayer durante el //Build las inquietudes sobre Roslyn se incrementaron, pues, pese a que ya llevan varios años desarrollándolo y anunciándolo muchos aun no hemos estudiado a fondo de que va esto de un compilador como servicio.

Una de las inquietudes que me manifestó un colega ayer fue: “Pero entonces ¿ahora estamos atados a Visual Studio? yo usaba el csc.exe por fuera de éste sin problema” La respuesta es que no. Y es una de las cosas que más me gusta de poder compilar los fuentes de Roslyn.

Obtener y compilar los fuentes

Esto, por fortuna, ya esta muy bien documentado en la wiki de Roslyn en CodePlex. Así que aquí solo mostraré unas capturas de pantalla para recrear el proceso :P:

  1. Descargar e instalar el .NET Compiler Platform (“Roslyn”) End User Preview
  2. Obtener los fuentes:
  3. image
  4. Abrir el Developer Command Prompt for VS2013 como administrador (Click derecho –> correr como administrador). Esta utilidad la encontramos en: C:\Program Files (x86)\Microsoft Visual Studio 12.0\Common7\Tools\Shortcuts
  5. Restaurar paquetes (en la carpeta de Roslyn, donde se clonó el repo): Src\.nuget\nuget restore Src\Roslyn.sln
  6. Correr el MSBuild: msbuild Roslyn.sln
  7. Si todo va bien, verás algo como:

image

NOTA: No quise caer en el error de transcribir lo que ya esta documentado en la Wiki del proyecto, si hay alguna duda, allí esta descrito en detalle cada uno de los pasos. Si no entiendes bien, y quieres aclarar algo no dudes en comentar.

¿Cuantos Compiladores Tengo?

Si exploramos la carpeta Src\Compilers\CSharp\rcsc\obj\Debug veremos el .exe que nos permitirá realizar las compilaciones usando nuestra  versión del compilador. En la documentación recomiendan no usar este, por problemas con el performance (aun no entiendo bien por qué).

Así pues, tenemos un ejecutable nativo en la dirección Src\Compilers\CSharp\rcsc2\obj\Debug y este llamará a nuestra versión del compilador.

Cabe anotar que el output de la compilación esta en la ruta roslyn\Binaries\Debug :

image

Entonces, podemos usar la línea de comandos para compilar nuestros archivos:

image

Agregar Build System en Sublime Text

Hechas este par de aclaraciones, solo basta con ir al menú: Tools –> Build System –> New Build System y escribir:

{
    “cmd”: [“Unidad donde se clonó el repo\\roslyn\\Binaries\\Debug\\rcsc2.exe /utf8output /nologo”, “$file_name”, “&&”, “$file_base_name.exe”],
      “shell”: true
}

 

Guardamos con el nombre que deseemos y lo seleccionamos como nuestro Buidl System:

image

Con esto, ya podemos ejecutar nuestras aplicaciones con la versión de nuestro compilador por fuera de Visual Studio:

image

 

Espero les sea de utilidad.

Hasta el próximo post.

Compilar con Roslyn en Sublime Text

Los test de integración y el contenedor de IoC

A diferencia de los test unitarios, en los que testeamos una unidad definida y de manera aislada, los test de integración están pensados para probar la interacción, no solo de distintas unidades, sino que además interactúan con dependencias externas reales como la base de datos.

Cuando escribo este tipo de pruebas me gusta tener un entorno los más similar a la realidad que se tiene en producción, es decir, si manejo soluciones con bases de datos relacionales y no relacionales, no trataré de crear ningún stub para emular el comportamiento de alguno de sus contextos.

Pero entonces, si queremos testear varios componentes del sistema y no debemos crear mocks o stubs para las dependencias, ¿qué usamos para resolverlas? pues, como lo hacemos en el entorno de producción! con el contenedor de IoC.

NOTA: No soy un gran amante de los contenedores de IoC, por distintas razones que darían para otro articulo, pero aquí expongo un caso especifico que si hace uso de un IoC container

No siempre usar la misma configuración del IoC para los Test

Aunque suene rara la contradicción (mantener todo lo más fiel al entorno de producción), en ocasiones es necesario re-escribir la configuración del contenedor de IoC, ¿para qué? para no tener que realizar peticiones a servicios que tienen algún tipo de throttling, por ejemplo, o por algún otro motivo que esconda alguna implementación concreta.

Un ejemplo concreto, SimpleInjector, VSTest y Entity Framework

En un proyecto en el que colaboro empleamos el SimpleInjector como contenedor de IoC, la parte de tests la realizamos con la suite de VSTest y como ORM tenemos Entity Framework.

La parte que hace referencia a la configuración del contenedor de dependencias es muy sencilla, simplemente registramos las dependencias como las necesitamos, tratando de ser lo más fieles a lo que tenemos en producción. Algo de la forma:

    public class IntegrationTestIocConfiguration : BaseIocConfiguration
    {
        public override void InitLifetimeDependantTypes(Container container)
        { 
            //AzureItems	  	
            //...
            //...

            //Infrastructure.Data Base
            //...
            //...

            //Some Repositories
            //...
            //...		
        }
    }

Ahora bien, en un entorno web de asp.net el punto de entrada lo localizamos en el Application_Start del Gloal.asax y es aquí donde se inicializan estos contenedores de dependencias. Esto es importante! porqué el contenedor solo se inicializa una vez (en dicho punto de entrada) y, si queremos mantener los test lo más similar al entorno de producción, dicho contenedor deberá ser inicializado una única vez para todas las pruebas que se ejecuten.

Pero en un proyecto de test, que no tiene un método main 😛 ¿donde ubicamos el punto de entrada? En este caso concreto, en el espacio de nombres Microsoft.VisualStudio.TestTools.UnitTesting tenemos el atributo AssemblyInitializeAttribute que nos servirá para realizar la inicialización, algo de la forma:

    [TestClass]
    public class IntegrationTestEntryPoint
    {
        public static Container IocContainer = null;

        [AssemblyInitialize]
        public static void Sartup(TestContext context)
        {
            Database.SetInitializer(new DropCreateDatabaseAlways<YourContext>());
            IocContainer = new Container();
            var configuration = new IntegrationTestIocConfiguration();
            configuration.Initialize(IocContainer);
            IocContainer.Verify();
        }
    }

Puede darse el caso que queramos que dichas pruebas usen una base de datos diferente, se conecten a servicios diferentes, o incluso que el Entity Framework use un seed diferente para los datos de estas pruebas, etc. Si lo has tenido en cuenta, y son parámetros configurables, puedes valerte del app.config para cambiar dichas direcciones.

Ahora, podemos empezar a escribir nuestros test, algo de la forma:

        [TestMethod]
        public void Should_Save_YourCaseToTest()
        {
            var service = IntegrationTestEntryPoint.IocContainer.GetInstance<IYourService>();
            var yourObj = new YourOject
                           {
                               //blah, blah, blah
                           };
            service.YourMethodSaveSuperTransacation(yourObj);
            var ctx = IntegrationTestEntryPoint.IocContainer.GetInstance<IAnyContext>();
            var obj = ctx.Get<YourObject>().FirstOrDefault(x => x.Property == "Value");
            Assert.IsNotNull(cmp);
            Assert.AreEqual(obj.Status, ObjectsStatus.Created);
            //Other interesting Asserts here
        }

Conclusión

No esta mal (en mi muy humilde opinión) usar el contenedor de dependencias para realizar los test de integración.

Resulta útil mantener estas pruebas lo más fieles posible al entorno de producción.

Pueden presentarse casos en los que simplemente es necesario sobrescribir las configuraciones del contenedor (como siempre, de todo depende!)

 

Cualquier feedback es bienvenido, no duden en comentar.

Hasta el próximo post.

Los test de integración y el contenedor de IoC

Reactive Extensions – TPL Dataflow

En la entrada anterior decíamos que los Reactive Extension emplean una secuencia de datos y que esta secuencia de datos pueden ser eventos.

Esta es una de las características de los Rx que más gusta! la posibilidad de combinar los eventos y su asincronía con el poder de LINQ, reduciendo la cantidad de código que debemos escribir y facilitándonos la mantenibilidad del mismo.

Para éste ejemplo usaremos un FileSystemWatcher que estará pendiente de la creación de archivos sobre uno de las carpetas del disco, pero solo los archivos que cumplan con cierta condición serán procesados por una red de de flujo de trabajo provista por la TPL Dataflow, ésta, en si, se basa en una serie de bloques que procesaran datos de múltiples orígenes en varios bloques (Bloques de acción, bloques de unión, bloques de transformación, bloques de buffer…) enlazados entre si a través del método  LinkTo() y devuelven un resultado de dicho proceso. Hablar sobre TDL da para una serie completa, así que nos quedaremos con esta definición por ahora.

Dejemos que sea el código quien termine de contar la historia.

    public partial class Form1 : Form
    {
        //Declaración del watcher
        private readonly FileSystemWatcher _watcher;

        public Form1()
        {
            InitializeComponent();
            //Inicialización del Watcher indicando el path e indicando que lance los eventos asociados
            _watcher = new FileSystemWatcher(@"D:\Test") { EnableRaisingEvents = true };
        }

        private void Form1Load(object sender, EventArgs e)
        {
            //Declaración de un bloque de transformación
            //Se basa en un Func, este recibirá un string y devolverá un Image.
            //El código que realizará dicha transformación esta en el método Transform(string path)
            var transformation = new TransformBlock((Func)Transform);

            //Declaración de un bloque de acción
            //Se basa en un Action, es decir que ejecutara el código del delegado 
            //y no retornará ningún valor, en este caso solo pintara en pantalla los Image que se le pasen
            //a traves de un control picturebox sobre un control TableLayoutPanel
            var action = new ActionBlock(img =&gt; tableLayoutPanel.Controls.Add(new PictureBox
                                                                                     {
                                                                                         Image = img,
                                                                                         Width = 50,
                                                                                         Height = 50,
                                                                                     }),

                //Los TDL no realizan procesamiento en paralelo de forma predeterminada
                //para esto es necesario indicarlo mediante uno de los DataflowBlockOptions
                //como es el ExecutionDataflowBlockOptions
                new ExecutionDataflowBlockOptions
                {
                    TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(), //Sincronización de contexto (violación de thread principal)
                    MaxDegreeOfParallelism = 5 // Numero de mensajes que se pueden procesar al mismo tiempo
                });

            //Enlazar el bloque de transformación con el bloque de acción
            transformation.LinkTo(action);

            //Inicializará una secuencia de eventos observables a partir del patrón de eventos de .NET
            //con una administración automática de "desuscripción" de eventos, dado que la subscripción 
            // a estas secuencias es un tipo de IDisposable
            var observable = Observable.FromEventPattern
                (
                    fsHandler =&gt; _watcher.Created += fsHandler,
                    fsHandler =&gt; _watcher.Created -= fsHandler)
                    //Consulta directa sobre los la secuencia observable de delegados
                    //a partir del eventargs (Nótese el patrón de eventos de .NET)
                    //En este caso todos los que empiecen por "a" serán mostrados en pantalla
                .Where(x =&gt; x.EventArgs.Name.ToLower().StartsWith("a"));
            //La suscripcion a esta secuencia observable invocara el bloque de transformación
            //que esta enlazado al bloque de acción
            observable.Subscribe(x =&gt; transformation.Post(x.EventArgs.FullPath));
        }

        //Crear un image a partir de su path
        private static Image Transform(string path)
        {
            Image image = Image.FromFile(path);
            return image;
        }
    }

Como vemos, es posible generar una secuencia observable de delegados de eventos que cumplan con el patrón descrito por los eventos de .NET, pero… y si nuestras clases no usan esto? si no usamos un EventHandler? ¿Hay operadores para trabajar con estos? este es el interrogante a resolver en el siguiente post.

Espero que les sea de utilidad.

Hasta el próximo post.

Reactive Extensions – TPL Dataflow