Azure Service Bus Queue Client + Reactive Extensions

Azure Service Bus soporta dos modelos de mensajería asíncrona basados en colas, éstos son: las colas y los tópicos. Las colas ofrecen una comunicación 1:1, es decir, se publica un mensaje en la cola y de los n receptores sólo uno podrá trabajar el mensaje. Por el contrario, los tópicos, responden de una manera más fiel a un modelo pub/sub, y nos permiten una comunicación 1:n, es decir, el mensaje que se publica en un tópico puede ser recibido por uno o varios  subscritores que podrán o no hacer uso de filtros para recibir solo los mensajes que les interesan; esto se logra haciendo uso de un sistema de colas virtuales para cada subscripción, sistema que controla Azure por nosotros.

Cada modelo responde a necesidades distintas y no son excluyentes, es decir, en un mismo namespace podemos estar trabajando con colas y tópicos al mismo tiempo y usarlas para responder a las necesidades de nuestra solución. Aunque, si lo deseamos, podemos agregar a las colas un comportamiento similar al de los tópicos, interactuando con las propiedades de los mensajes y creando filtros para responder de forma distinta a cada uno de los mensajes que encontremos en la cola. Digo similar, pero esto no llega ni a la mitad de los tópicos, pues aquí no estaríamos empleando ningún sistema de colas distinto al provisto por Azure y se sigue compitiendo por recibir los mensajes.

Desde el SDK 2 tenemos la posibilidad de recibir mensajes de una forma event-driven a través del OnMessage, método que encapsula todo el loop infinito que sondea la cola en espera de nuevos mensajes. Con este modelo dirigido por eventos ya podemos imaginarnos el por qué de los Reactive Extensions. Los Reactive Extensions nos permiten crear secuencias observables de distintas formas, así podemos trabajar con estos datos asíncronos de una forma más sencilla y escribiendo un código más declarativo, fácil de leer y mantener. Así que manos a la obra.

Lo primero es crear la secuencia observable a partir del metodo OnMessage, aquí, no sabía bien cual era el mejor enfoque (aún me pierdo con todos los operadores de Rx), así que pregunté a los colegas y terminé con un código de la forma:

            IObservable<BrokeredMessage> observable = Observable.Create<BrokeredMessage>(
                observer =>
                {
                    _queueClient.OnMessage(observer.OnNext, new OnMessageOptions());
                    return Disposable.Empty;
                }).Publish().RefCount();

El código anterior tiene tres partes importantes, lo primero es acudir al método Create (En la sobrecarga con IDisposale) para crear la secuencia observable y devolver un IDisposable. Esta secuencia observable se creará a partir de la implementación de una función “subscriptora” (El primer parámetro del método). Si dejamos hasta ahí y empezamos a vincular subscriptores (más de uno) a la secuencia observable, el método OnAction se ejecutará cuantas veces se agregué un subscriptor, lo que provocaría una excepción con el mensaje: The method ‘OnMessage’ or ‘OnMessageAsync’ has already been called. Lo que nos lleva al segundo punto importante del código, el operador Publish, éste retornará un IConnectableObservale<T>, que nos permitirá compartir la subscripción (y no solo la secuencia observable), si! compartir el OnMessage que registramos en el Create. Finalmente, hacemos uso del RefCount, que nos retorna un IObservable<T> a partir del IConnectableObservable<T> y manejará por nosotros la conexión y desconexión de los subscriptores.

Teniendo “claro” como se genera la secuencia observable solo queda usar los conocimientos de linq para crear nuevas secuencias observables y subscripciones. En este ejemplo separaremos comandos y consultas y ejecutaremos algo (impresión en pantalla) con esta información. Algo de la forma:

            var comandos = observable.Where(
                message =>
                {
                    object obj;
                    return message.Properties.TryGetValue("IsCommand", out obj) && (bool)obj;
                }).Subscribe(
                x => //OnNext
                {
                    Console.ForegroundColor = ConsoleColor.DarkYellow;
                    Console.WriteLine(x.GetBody<string>());
                },
                x => Console.WriteLine(x.Message), //OnError
                () => Console.WriteLine("Complete")); //OnComplete

Y tenemos como salida:

image

El código completo esta en este gist.

Como vemos la implementación con Reactive Extensions nos permite escribir un código más declarativo, un poco más fácil de mantener y de leer, con lo que ya sabemos de Linq.

Espero les sea de utilidad.

Hasta el próximo post

Anuncios
Azure Service Bus Queue Client + Reactive Extensions

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión / Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión / Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión / Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión / Cambiar )

Conectando a %s