MANAGING RABBITMQ MESSAGES WITH F# AND AKKA.NET

Durable message queues can complement an actor-based system well. While persistent actors can protect the system against message loss, the same can often be achieved using durable queues and explicit message acknowledgement. RabbitMQ is one of the products that fully support such scenarios and can be installed and consumed on different platforms, including .NET. This post presents a RabbitMQ actor that uses Akka.NET and is written in F#.

SUPPORTED SCENARIOS

The RabbitMQ website lists the following main message exchange patterns:

  • Work queues
  • Publish/subscribe
  • Routing
  • Topics
  • RPC

For the F# queue actor I decided to skip RPC support, because the RPC communication style doesn't really fit the message-driven nature of actors. So I focused on work queues, pub/sub, routing and topics.

Another important decision was to identify scenarios requiring message acknowledgement from consumers. Message acknowledgement changes communication from a simple fire-and-forget scenario (represented by the Message pattern and its variations in Gregor Hohpe and Bobby Woolf's "Enterprise Integration Patterns" catalog) into a request-reply scenario with a return address (Request-Reply and Return Address in Hohpe and Woolf's catalog). I decided to support acknowledgement for any type of queue but use it only in examples for work queues, because the other scenarios (except for RPC, which is outside the scope of this work) are all variations of message broadcasting to multiple subscribers, and notifying a queue with a proof of receipt is often of less concern.

PREREQUISITES AND REFERENCED WORK

The RabbitMQ actor was developed as part of a project at the Norwegian Broadcasting Corporation (NRK). The actor was built using the following libraries and NuGet packages (which in turn include other dependencies):

  • FSharp.Core 4.3.1
  • Akka 1.0.5
  • Akka.FSharp 1.0.5
  • RabbitMQ.Client 3.6.0

Part of the inspiration for this work came from a series of blog posts by Panos and Jorge Fioranelle's article "Reactive Messaging Patterns with F# and Akka.NET". I also found Vaughn Vernon's book "Reactive Messaging Patterns with the Actor Model" to be an invaluable resource for anyone building an actor-based system.

IMPLEMENTATION PLAN

Here is the sequence of steps that guides us through the implementation:

  • Server actor and domain messages
  • Server actor and queue message acknowledgement
  • Client actor
  • Queue actor

SERVER ACTOR AND DOMAIN MESSAGES

Our server will respond to simple requests consisting of single strings. It will reply with single strings too. So our DomainMessage discriminated union is tiny:

F#
module DomainMessages

type DomainMessage =
    | Request of string
    | Reply of string

Note that we don't need to include the sender's identity in the message body: Akka provides it through the mailbox (mailbox.Sender()). The first Server implementation is simple too. It's an F# actor that uses the Akka.NET F# API:

F#
module Server

open System
open Akka.Actor
open Akka.FSharp

open DomainMessages

let serverActor (mailbox: Actor<_>) =

    let rec loop () = actor {
        let! message = mailbox.Receive ()
        match message with
        | Request what -> 
            printfn "Server: received request: %s" what
            mailbox.Sender() <! Reply what
        return! loop ()
    }
    loop ()

SERVER ACTOR AND QUEUE MESSAGE ACKNOWLEDGEMENT

The above implementation will work fine as long as messages don't require acknowledgement to be sent to the queueing system. If there is such a requirement (and there is one in a work queue scenario), DomainMessage should carry the acknowledgement ID. One straightforward way to achieve this is to extend the DomainMessage definition:

F#
type AckId = uint64

type DomainMessage =
    | Request of string
    | RequestWithAck of IActorRef * AckId * string
    | Reply of string

While this will certainly work, I don't like polluting our domain definitions with communication infrastructure details. Instead we will define a generic MessageEnvelope type that will effectively wrap domain-specific message content in a container that will be passed between actors:

F#
type AckId = uint64

type MessageEnvelope<'a> =
    | Message of 'a
    | MessageWithAck of 'a * IActorRef * AckId
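
To illustrate how the envelope keeps infrastructure details out of the domain type, here is a minimal sketch that pattern-matches on enveloped domain messages. So that it runs without Akka.NET, it replaces IActorRef with a stand-in QueueRef type; both QueueRef and the describe helper are assumptions made for illustration and are not part of the actor code in this post.

```fsharp
// Stand-in for Akka's IActorRef so the sketch runs without Akka.NET (assumption).
type QueueRef = QueueRef of string
type AckId = uint64

type DomainMessage =
    | Request of string
    | Reply of string

type MessageEnvelope<'a> =
    | Message of 'a
    | MessageWithAck of 'a * QueueRef * AckId

// Hypothetical helper: inspects the envelope, the domain type stays untouched.
let describe (envelope: MessageEnvelope<DomainMessage>) =
    match envelope with
    | Message (Request what) -> sprintf "request '%s', no ack needed" what
    | MessageWithAck (Request what, QueueRef name, tag) ->
        sprintf "request '%s', ack %d goes to %s" what tag name
    | Message (Reply what) -> sprintf "reply '%s'" what
    | MessageWithAck (Reply what, _, _) -> sprintf "reply '%s' (ack ignored)" what

printfn "%s" (describe (Message (Request "Hi!")))
printfn "%s" (describe (MessageWithAck (Request "Hi!", QueueRef "work", 1UL)))
```

The acknowledgement data travels next to the domain message rather than inside it, so DomainMessage keeps only its Request and Reply cases.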

Then, instead of sending instances of Request and Reply, actors will exchange instances of MessageEnvelope<DomainMessage> that can be matched with Message of Request, MessageWithAck of Request and Message of Reply.

Now we need to extend the Server implementation. Vaughn Vernon in his book suggests using a dedicated worker actor to handle the Return Address message pattern, so let's follow his advice; here's a queue acknowledgement actor:

F#
module QueueActors

open System
open System.Text
open Akka.Actor
open Akka.FSharp

type AcknowledgementMessage = 
    | QueueAck of IActorRef * AckId
    | QueueNack of IActorRef * AckId

let queueAcknowledgeActor (mailbox: Actor<_>) =
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        match message with
        | QueueAck (queue, tag) -> 
            printfn "Acknowledger: received ack: %d" tag
            queue <! Ack tag
        | QueueNack (queue, tag) -> 
            printfn "Acknowledger: received nack: %d" tag
            queue <! Nack tag
        return! loop ()
    }
    loop ()

Equipped with an actor implementing queue acknowledgement, we can now write a final version of our server actor:

F#
module Server

open System
open Akka.Actor
open Akka.FSharp

open DomainMessages
open QueueActors


let serverActor (mailbox: Actor<_>) =

    let acknowledger = spawn mailbox.Context "acknowledge" queueAcknowledgeActor
    let rec loop () = actor {
        let! message = mailbox.Receive ()
        match message with
        | Message (Request what) -> 
            printfn "Server: received request: %s" what
            mailbox.Sender() <! Message (Reply what)
        | MessageWithAck (Request what, queue, tag) -> 
            printfn "Server: received request: %s" what
            mailbox.Sender() <! Message (Reply what)
            acknowledger.Forward <| QueueAck (queue, tag)
        | _ -> printfn "Server: unsupported message: %A" message
        return! loop ()
    }
    loop ()

CLIENT ACTOR

Now that we have finalized the implementation of our server (and defined the domain messages that it responds to), it's time to look at the client side. Looking at the server implementation, it's clear that the client should be able to send two types of messages to it:

F#
server <! Message (Request what)

and

F#
server <! MessageWithAck (Request what, queue, tag)

Here's the implementation:

F#
let clientActor (server: IActorRef) (mailbox: Actor<_>) =

    let rec loop () =
        actor {
            let! message = mailbox.Receive ()
            match message with
            | Message (Request what) ->
                printfn "Client: sending request: %s" what
                server <! message
            | MessageWithAck (Request what, queue, tag) -> 
                printfn "Client: sending request: %s" what
                server <! message
            | Message (Reply what) -> 
                printfn "Client: received reply: %s" what
            | _ -> printfn "Client: unsupported message: %A" message
            return! loop ()
        }

    loop ()

Our client will be queue message driven, so both of these requests should come as actions triggered upon receipt of messages from RabbitMQ. We can't handle such messages inside the same mailbox (those messages will be of a different type and come from a different source).

If we were only dealing with work queues, it could be practical to bind the client to the associated queue for the whole function scope (i.e. pass the queue reference as a function parameter). But the pub/sub, routing and topic exchange patterns imply a greater level of decoupling between clients and queues, so we will let the client start and stop its communication with a particular queue. Here's a SubscriberMessage definition:

F#
type SubscriberMessage =
    | StartWith of IActorRef
    | Stop
    | QueueMessage of string
    | QueueMessageWithAck of AckId * string

The choice of the StartWith name follows Vaughn Vernon's convention for the Request-Reply pattern. A client starts listening to a queue on the StartWith command, receives messages with or without acknowledgement requirements, and stops receiving messages on the Stop command. Effectively a client lives in one of two states: connected or not connected to a queue. We can reflect these states in the implementation by defining two recursive message loop functions. Here's a definition of a queue message reader actor that uses this approach and forwards incoming MQ messages to the clientActor's mailbox:

F#
let queueReaderActor<'a> (deserializeMessage : (string -> 'a)) (messageHandler: IActorRef) (mailbox: Actor<_>) =
    let rec starting() =
        actor {
            let! message = mailbox.Receive ()
            match message with
            | StartWith queue ->
                return! listening (queue)
            | _ -> 
                printfn "QueueReader: invalid operation in starting state: %A" message
            return! starting ()
        }
    and listening (queue: IActorRef) = 
        actor {
            let! message = mailbox.Receive ()
            match message with
            | StartWith queue ->
                return! listening (queue)
            | Stop ->
                return! starting ()
            | QueueMessage msg ->
                let message : MessageEnvelope<'a> = Message(deserializeMessage msg)
                messageHandler <! message
            | QueueMessageWithAck (tag, msg) ->
                let message : MessageEnvelope<'a> = MessageWithAck(deserializeMessage msg, queue, tag)
                messageHandler <! message
            return! listening (queue)
        }

    starting ()

Note that queueReaderActor is generic - it doesn't drag any domain-specific definitions so it can be reused in a wide range of scenarios.

What I like about writing actors in F# is that actors fit very well with the immutable data principle that forms a foundation of functional languages. Note that queueReaderActor effectively changes state, but it doesn't define any mutable variables: instead it changes its behavior by switching between its two mailbox processing functions (starting and listening). This method is a cornerstone of the actor implementation, and we will use it in the implementation of the main actor of this blog post: the queueActor presented in the next section.
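
The behavior-switching idea can be tried out without Akka.NET. The following sketch is only an approximation of queueReaderActor: it replaces the mailbox with a plain list of messages and uses a queue name instead of an IActorRef (both are assumptions made for illustration), but it keeps the two mutually recursive functions that stand for the two states.

```fsharp
type ReaderMessage =
    | StartWith of string      // a queue name stands in for IActorRef (assumption)
    | Stop
    | QueueMessage of string

// Processes a list of messages and returns the (queue, text) pairs
// that would have been forwarded, in the order they were received.
let run (messages: ReaderMessage list) =
    let rec starting inbox forwarded =
        match inbox with
        | [] -> List.rev forwarded
        | StartWith queue :: rest -> listening queue rest forwarded
        | _ :: rest -> starting rest forwarded      // ignored while not started
    and listening queue inbox forwarded =
        match inbox with
        | [] -> List.rev forwarded
        | StartWith other :: rest -> listening other rest forwarded
        | Stop :: rest -> starting rest forwarded
        | QueueMessage text :: rest ->
            listening queue rest ((queue, text) :: forwarded)
    starting messages []

let result =
    run [ QueueMessage "lost"; StartWith "work"; QueueMessage "Hi!"; Stop; QueueMessage "late" ]
printfn "%A" result
```

Only the message that arrives between StartWith and Stop is forwarded. The current state is simply the function that is running at the moment, and the queue it listens to is a function argument, so no mutable field is needed.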

QUEUE ACTOR

Finally we can move on to the RabbitMQ actor implementation. In addition to earlier definitions we need to define a few other types and messages.

We will use ExchangeType and QueueExchange to specify the type and name of RabbitMQ exchange when establishing queue communication ("exchange" is a concept RabbitMQ uses to abstract away actual queues):

F#
type ExchangeType =
    | Default
    | Direct
    | Fanout
    | Topic

type QueueExchange = 
    | Queue of string
    | Exchange of ExchangeType * string

For the sake of simplicity we will be dealing with text messages, but we will refer to them as Payload so it should be easy to change the content type. For the same reason we will define RoutingKey.

F#
type Payload = string
type RoutingKey = string

Now we can move on to higher level definitions and introduce a MessageContent:

F#
type MessageContent =
    | Content of Payload
    | ContentWithRouting of Payload * RoutingKey

And this brings us to one of the most essential definitions for queue actor – QueueCommand:

F#
type QueueCommand =
    | Connect
    | Disconnect
    | Subscribe of IActorRef
    | Unsubscribe
    | Route of RoutingKey
    | Unroute of RoutingKey
    | Publish of MessageContent
    | Receive of AckId * Payload
    | Ack of AckId
    | Nack of AckId

QueueCommand is a discriminated union of 10 cases, or 5 pairs that cover all possible communication subjects:

  • Connecting and disconnecting an underlying actor to/from a queue
  • Subscribing and unsubscribing a client to/from an actor
  • Routing and unrouting messages with certain routing keys to a subscriber
  • Publishing and receiving domain messages
  • Acking and nacking received messages

To keep better control of active queue connections, we will instantiate queue actors from another actor type: a queue factory. It will accept a FactoryCommand consisting of an actor name, exchange details and a boolean flag to disable acknowledgement:

F#
type FactoryCommand =
    | CreateExchange of string * QueueExchange * bool

Finally, the queueActor and queueFactoryActor definitions:

F#
module QueueActors

open System
open System.Text
open Akka.Actor
open Akka.FSharp
open RabbitMQ.Client
open RabbitMQ.Client.Events
open RabbitMQ.Client.Exceptions

let queueActor (factory : ConnectionFactory) (queueExchange : QueueExchange * bool) (mailbox: Actor<_>) =

    let (exchangeType, exchangeOrQueueName, noAck) = 
        match queueExchange with
        | (Queue queueName, noAck) -> (ExchangeType.Default, queueName, noAck)
        | (Exchange (exchangeType, exchangeName), noAck) -> (exchangeType, exchangeName, noAck)

    let disconnect (connection : IConnection) (channel : IModel) =
        printfn "Queue: Disconnecting..."
        channel.Dispose()
        connection.Dispose()
        printfn "Queue: Disconnected."

    let publishMessage (channel : IModel) (message : MessageContent) =
        let (what, routingKey) =
            match message with
            | Content what -> (what, "")
            | ContentWithRouting (what, routingKey) -> (what, routingKey)
        let body = Encoding.UTF8.GetBytes(what)
        printfn "Queue: Publish message: %s" what
        match exchangeType with
        | ExchangeType.Default -> channel.BasicPublish("", exchangeOrQueueName, null, body)
        | _ -> channel.BasicPublish(exchangeOrQueueName, routingKey, null, body)

    let receiveMessage (e : BasicDeliverEventArgs) =
        printfn "Received queue message"
        let message = Encoding.UTF8.GetString(e.Body)
        mailbox.Self <! Receive (e.DeliveryTag, message)
        printfn "Message is forwarded to Self"

    let rec disconnected () = 
        actor {
            let! message = mailbox.Receive ()
            match message with
            | Connect -> 
                printfn "Queue: Connecting..."
                let connection = factory.CreateConnection()
                let channel = connection.CreateModel()
                match exchangeType with
                | Default ->
                    channel.QueueDeclare(exchangeOrQueueName, true, false, false, null) |> ignore
                | _ -> 
                    channel.ExchangeDeclare(exchangeOrQueueName, (sprintf "%A" exchangeType).ToLower())
                printfn "Queue: Connected."
                return! connected (connection, channel)

            | _ -> 
                printfn "Queue: invalid operation in disconnected state: %A" message
                return! disconnected ()
        }
    and connected (connection : IConnection, channel : IModel) = 
        actor {
            let! message = mailbox.Receive ()
            match message with
            | Subscribe subscriber ->
                let consumer = new EventingBasicConsumer(channel)
                consumer.Received.Add(receiveMessage)
                let queueName =
                    match exchangeType with
                    | Default -> exchangeOrQueueName
                    | _ -> 
                        let queueName = channel.QueueDeclare().QueueName
                        let exchangeName = exchangeOrQueueName
                        channel.QueueBind(queueName, exchangeName, "")
                        queueName
                channel.BasicConsume(queueName, noAck, consumer) |> ignore
                subscriber <! StartWith mailbox.Self
                return! subscribed (connection, channel, queueName, subscriber)

            | Disconnect ->
                disconnect connection channel
                return! disconnected ()

            | Publish content -> publishMessage channel content

            | Receive _ -> ()

            | Ack tag -> 
                printfn "Queue: Ack: %d" tag
                channel.BasicAck(tag, false)

            | Nack tag -> 
                printfn "Queue: Nack: %d" tag
                channel.BasicNack(tag, false, false)

            | _ -> 
                printfn "Queue: invalid operation in connected state: %A" message

            return! connected (connection, channel)
        }
    and subscribed (connection : IConnection, channel : IModel, queueName : string, subscriber : IActorRef) = 
        actor {
            let! message = mailbox.Receive ()
            match message with
            | Unsubscribe ->
                return! connected (connection, channel)

            | Route routingKey ->
                match exchangeType with
                | Default -> printfn "Queue: Can not set routing for Default %A exchange type." exchangeType
                | _ -> 
                    channel.QueueBind(queueName, exchangeOrQueueName, routingKey)

            | Unroute routingKey ->
                match exchangeType with
                | Default -> printfn "Queue: Can not set routing for Default %A exchange type." exchangeType
                | _ -> 
                    channel.QueueUnbind(queueName, exchangeOrQueueName, routingKey, null)

            | Disconnect ->
                subscriber <! Stop
                disconnect connection channel
                return! disconnected ()

            | Publish content -> publishMessage channel content

            | Receive (tag, what) ->
                printfn "Queue: Received message with tag: %d %s" tag what
                match queueExchange with
                | (_, false) -> subscriber <! QueueMessageWithAck (tag, what)
                | _ -> subscriber <! QueueMessage what

            | Ack tag -> 
                printfn "Queue: Ack: %d" tag
                channel.BasicAck(tag, false)

            | Nack tag -> 
                printfn "Queue: Nack: %d" tag
                channel.BasicNack(tag, false, false)

            | _ -> 
                printfn "Queue: invalid operation in subscribed state: %A" message

            return! subscribed (connection, channel, queueName, subscriber)
        }

    disconnected ()

let queueFactoryActor (connectionString : string) (mailbox: Actor<_>) =
    // The connectionString parameter is treated as the RabbitMQ host name here.
    let connectionFactory = new ConnectionFactory(HostName = connectionString)
    connectionFactory.AutomaticRecoveryEnabled <- true

    let rec loop (connectionFactory : ConnectionFactory) =
        actor {
            let! message = mailbox.Receive ()
            match message with
            | CreateExchange (actorName, queueExchange, noAck) ->
                spawn mailbox.Context actorName (queueActor connectionFactory (queueExchange, noAck)) |> ignore

            return! loop (connectionFactory)
        }

    loop (connectionFactory)
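
The three message loops of queueActor (disconnected, connected and subscribed) form a small state machine. As a rough summary, the sketch below encodes which commands each state handles as a pure function. The State type, the payload-free Command type and the transition function are simplifications introduced here for illustration; they are not part of the actor code.

```fsharp
type State = Disconnected | Connected | Subscribed

// Command names without payloads (assumption: payloads don't affect validity).
type Command =
    | Connect | Disconnect | Subscribe | Unsubscribe
    | Route | Unroute | Publish | Receive | Ack | Nack

// Returns Some nextState when the command is handled in the given state,
// and None where the actor would log an "invalid operation" message.
let transition state command =
    match state, command with
    | Disconnected, Connect -> Some Connected
    | Disconnected, _ -> None
    | Connected, Subscribe -> Some Subscribed
    | Connected, Disconnect -> Some Disconnected
    | Connected, (Publish | Receive | Ack | Nack) -> Some Connected
    | Connected, _ -> None
    | Subscribed, Unsubscribe -> Some Connected
    | Subscribed, Disconnect -> Some Disconnected
    | Subscribed, (Route | Unroute | Publish | Receive | Ack | Nack) -> Some Subscribed
    | Subscribed, _ -> None

// Replays a command sequence; unhandled commands leave the state unchanged.
let finalState =
    [ Connect; Subscribe; Publish; Unsubscribe; Disconnect ]
    |> List.fold (fun state cmd -> defaultArg (transition state cmd) state) Disconnected
printfn "%A" finalState
```

Note that in the connected state a Receive is accepted but silently dropped, because there is no subscriber to forward it to.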

PUTTING IT ALL TOGETHER

Let's now test our implementation in all main scenarios. For all tests we will need some common actors: system, server, a couple of clients and a queue factory:

F#
let system = System.create "system" <| Configuration.load ()
let server = spawn system "server" serverActor
let client = spawn system "client" (clientActor server)
let another_client = spawn system "another_client" (clientActor server)
let queue_factory = spawn system "queues" (queueFactoryActor "localhost")
let queue_reader = spawn system "queue_reader" (queueReaderActor<DomainMessage> Request client)
let another_queue_reader = spawn system "another_queue_reader" (queueReaderActor<DomainMessage> Request another_client)

Output:

val system : ActorSystem
val server : IActorRef = [akka://system/user/server]
val client : IActorRef = [akka://system/user/client]
val another_client : IActorRef = [akka://system/user/another_client]
val queue_factory : IActorRef = [akka://system/user/queues]
val queue_reader : IActorRef = [akka://system/user/queue_reader]
val another_queue_reader : IActorRef = [akka://system/user/another_queue_reader]

TESTING WORK QUEUES

To test work queues we need a queue actor to publish messages and a queue actor to subscribe a client to it, but in fact we can manage with a single queue actor being used as both queue publisher and consumer:

F#
queue_factory <! CreateExchange ("work", (Queue "hello"), false)
let queue = select "akka://system/user/queues/work" system
queue <! Connect
queue <! Subscribe queue_reader

Output (some non-informative lines are removed for clarity):

val queue : ActorSelection
Queue: Connecting...
Queue: Connected.

Now we can publish a message and check that the queue acknowledgement actor sends back a message ack with a RabbitMQ delivery tag:

F#
queue <! Publish (Content "Hi!")

Output:

Queue: Publish message: Hi!
Received queue message
Message is forwarded to Self
Queue: Received message with tag: 1 Hi!
Client: request with ack: 1 Hi!
Server: received request: Hi!
Acknowledger: received ack: 1
Queue: Ack: 1

… and disconnect:

F#
queue <! Disconnect

Output:

Queue: Disconnecting...
Queue: Disconnected.

TESTING PUBLISH/SUBSCRIBE

For the pub/sub scenario we will need to use a different RabbitMQ exchange type (Fanout). We'll also need to create two different instances of queue actors: one to publish the message and one to subscribe clients to it. To demonstrate multiple message recipients we will subscribe both client and another_client to the same exchange.

F#
queue_factory <! CreateExchange ("publisher", (Exchange (Fanout, "logs")), true)
let publisher = select "akka://system/user/queues/publisher" system
queue_factory <! CreateExchange ("subscriber", (Exchange (Fanout, "logs")), true)
let subscriber = select "akka://system/user/queues/subscriber" system

Output:

val publisher : ActorSelection
val subscriber : ActorSelection

Now we'll subscribe a single client and watch the first message delivered to it with no acknowledgement:

F#
publisher <! Connect
publisher <! Subscribe queue_reader
publisher <! Publish (Content "Hi 1!")

Output:

Queue: Connecting...
Queue: Connected.
Queue: Publish message: Hi 1!
Received queue message
Message is forwarded to Self
Queue: Received message with tag: 1 Hi 1!
Client: request: Hi 1!
Server: received request: Hi 1!

Subscribe another client and check that the second message is delivered to both of them:

F#
subscriber <! Connect
subscriber <! Subscribe another_queue_reader
publisher <! Publish (Content "Hi 2!")

Output:

Queue: Connecting...
Queue: Connected.
Queue: Publish message: Hi 2!
Received queue message
Message is forwarded to Self
Queue: Received message with tag: 2 Hi 2!
Received queue message
Client: request: Hi 2!
Message is forwarded to Self
Queue: Received message with tag: 1 Hi 2!
Client: request: Hi 2!
Server: received request: Hi 2!
Server: received request: Hi 2!

Now disconnect the subscriber queue actor and publish a message. The message should be delivered only once:

F#
subscriber <! Disconnect
publisher <! Publish (Content "Hi 3!")
publisher <! Disconnect

Output:

Queue: Disconnecting...
Queue: Disconnected.
Queue: Publish message: Hi 3!
Received queue message
Message is forwarded to Self
Queue: Received message with tag: 3 Hi 3!
Client: request: Hi 3!
Server: received request: Hi 3!
Queue: Disconnecting...
Queue: Disconnected.

TESTING MESSAGE ROUTING

Message routing is a more advanced pub/sub scenario where publishers can attach routing keys to their messages and consumers can selectively receive only messages with the routing keys they are interested in. The routing exchange cannot use the Fanout exchange type; instead it uses Direct, as shown below:

F#
queue_factory <! CreateExchange ("routing_publisher", (Exchange (Direct, "direct_logs")), true)
queue_factory <! CreateExchange ("routing_subscriber1", (Exchange (Direct, "direct_logs")), true)
queue_factory <! CreateExchange ("routing_subscriber2", (Exchange (Direct, "direct_logs")), true)

Output:

val routing_publisher : ActorSelection
val routing_subscriber1 : ActorSelection
val routing_subscriber2 : ActorSelection

If we now publish a message, it will not reach the server due to lack of subscribers:

F#
let routing_publisher = select "akka://system/user/queues/routing_publisher" system

routing_publisher <! Connect
routing_publisher <! Publish (ContentWithRouting ("Hi 1!", "a"))

Output:

Queue: Connecting...
Queue: Connected.
Queue: Publish message: Hi 1!

Now we will create a couple of subscribers and start routing messages to them with certain keys:

F#
let routing_subscriber1 = select "akka://system/user/queues/routing_subscriber1" system
let routing_subscriber2 = select "akka://system/user/queues/routing_subscriber2" system

routing_subscriber1 <! Connect
routing_subscriber1 <! Subscribe queue_reader
routing_subscriber1 <! Route "a"

routing_subscriber2 <! Connect
routing_subscriber2 <! Subscribe another_queue_reader
routing_subscriber2 <! Route "a"
routing_subscriber2 <! Route "b"

If we now start publishing messages, they will be routed accordingly:

F#
routing_publisher <! Publish (ContentWithRouting ("Hi 2!", "a"))
routing_publisher <! Publish (ContentWithRouting ("Hi 3!", "b"))
routing_subscriber1 <! Unsubscribe
routing_subscriber2 <! Unroute "b"

routing_publisher <! Publish (ContentWithRouting ("Hi 4!", "a"))
routing_publisher <! Publish (ContentWithRouting ("Hi 5!", "b"))
routing_subscriber2 <! Unsubscribe

Output:

Queue: Connecting...
Queue: Connected.
Queue: Connecting...
Queue: Connected.
Queue: Publish message: Hi 2!
Received queue message
Message is forwarded to Self
Received queue message
Message is forwarded to Self
Queue: Received message with tag: 1 Hi 2!
Queue: Received message with tag: 1 Hi 2!
Client: request: Hi 2!
Client: request: Hi 2!
Server: received request: Hi 2!
Server: received request: Hi 2!
Queue: Publish message: Hi 3!
Received queue message
Message is forwarded to Self
Queue: Received message with tag: 2 Hi 3!
Client: request: Hi 3!
Server: received request: Hi 3!
Queue: Publish message: Hi 4!
Received queue message
Message is forwarded to Self
Queue: Received message with tag: 3 Hi 4!
Client: request: Hi 4!
Received queue message
Server: received request: Hi 4!
Message is forwarded to Self
Queue: Publish message: Hi 5!

... and disconnect:

F#
routing_publisher <! Disconnect
routing_subscriber1 <! Disconnect
routing_subscriber2 <! Disconnect

Output:

Queue: Disconnecting...
Queue: Disconnected.
Queue: Disconnecting...
Queue: Disconnected.
Queue: Disconnecting...
Queue: Disconnected.

TESTING TOPIC EXCHANGE

If the routing key scenario is an advanced form of pub/sub communication, then topic exchange is an advanced form of routing where subscribers can use a special wildcard notation to subscribe to messages with routing keys that match a certain pattern. As with previous examples, we start by creating queue actors:

F#
queue_factory <! CreateExchange ("topic_publisher", (Exchange (Topic, "topic_logs")), true)
let topic_publisher = select "akka://system/user/queues/topic_publisher" system
queue_factory <! CreateExchange ("topic_subscriber", (Exchange (Topic, "topic_logs")), true)
let topic_subscriber = select "akka://system/user/queues/topic_subscriber" system

Output:

val topic_publisher : ActorSelection
val topic_subscriber : ActorSelection

And the first published message doesn't reach any subscribers as there are none:

F#
topic_publisher <! Connect
topic_publisher <! Publish (ContentWithRouting ("Hi 1!", "x.y"))

Output:

Queue: Connecting...
Queue: Connected.
Queue: Publish message: Hi 1!

Let's now subscribe to topics using the wildcard notation:

F#
topic_subscriber <! Connect
topic_subscriber <! Subscribe queue_reader
topic_subscriber <! Route "x.*"
topic_publisher <! Publish (ContentWithRouting ("Hi 2!", "x"))
topic_publisher <! Publish (ContentWithRouting ("Hi 3!", "x.y"))
topic_subscriber <! Route "x"
topic_publisher <! Publish (ContentWithRouting ("Hi 4!", "x"))
topic_subscriber <! Route "#"
topic_publisher <! Publish (ContentWithRouting ("Hi 5!", "y"))
topic_subscriber <! Disconnect

Output:

Queue: Connecting...
Queue: Connected.
Queue: Publish message: Hi 2!
Queue: Publish message: Hi 3!
Received queue message
Message is forwarded to Self
Queue: Received message with tag: 1 Hi 3!
Client: request: Hi 3!
Server: received request: Hi 3!
Queue: Publish message: Hi 4!
Received queue message
Message is forwarded to Self
Queue: Received message with tag: 2 Hi 4!
Client: request: Hi 4!
Server: received request: Hi 4!
Queue: Publish message: Hi 5!
Received queue message
Message is forwarded to Self
Queue: Received message with tag: 3 Hi 5!
Client: request: Hi 5!
Server: received request: Hi 5!
Queue: Disconnecting...
Queue: Disconnected
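
The output shows that "Hi 2!" (routing key "x") was not delivered while the only binding was "x.*". In topic exchanges routing keys are split into dot-separated words: "*" stands for exactly one word and "#" for zero or more words. The function below is a small sketch of that matching rule written for illustration; topicMatches is a hypothetical helper, not a RabbitMQ API, and the real matching happens inside the broker.

```fsharp
// Checks whether a routing key matches a topic binding pattern.
// '*' matches exactly one word, '#' matches zero or more words.
let topicMatches (pattern: string) (routingKey: string) =
    let rec go (p: string list) (k: string list) =
        match p, k with
        | [], [] -> true
        | "#" :: restP, _ ->
            // '#' either consumes nothing, or consumes one word and stays active
            go restP k
            || (match k with
                | _ :: restK -> go p restK
                | [] -> false)
        | "*" :: restP, _ :: restK -> go restP restK
        | word :: restP, w :: restK when word = w -> go restP restK
        | _ -> false
    go (List.ofArray (pattern.Split '.')) (List.ofArray (routingKey.Split '.'))

printfn "%b" (topicMatches "x.*" "x")     // "Hi 2!": no second word, so no match
printfn "%b" (topicMatches "x.*" "x.y")   // "Hi 3!"
printfn "%b" (topicMatches "x" "x")       // "Hi 4!"
printfn "%b" (topicMatches "#" "y")       // "Hi 5!"
```

Under this rule the four checks evaluate to false, true, true and true, which matches the deliveries in the output above.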

... and disconnect after publishing a last message that won't be fetched:

F#
topic_publisher <! Publish (ContentWithRouting ("Hi 6!", "x.y"))
topic_publisher <! Disconnect

Output:

Queue: Publish message: Hi 6!
Queue: Disconnecting...
Queue: Disconnected.

CONCLUSION

I hope that the code snippets above demonstrated that F# is a very efficient language for wrapping infrastructure code and building finite state machines: our queue actor is an FSM that moves from state to state without a single mutable piece of data. I was also very pleased with the F# Akka.NET API. Unlike some other API language wrappers, the Akka.NET F# API is significantly different from its C# counterpart, letting you declare and use actors in an idiomatic F# manner, with functions and computation expressions.

Published 08.02.2016 by

Vagif Abilov