NetMQ: Breaking Down the Basics


NetMQ is a .NET library that provides a high-level API for several messaging patterns, allowing applications to communicate with each other over a network. It’s a 100% native C# port of the lightweight messaging library ZeroMQ.

NetMQ is transport-agnostic and works over several transports, such as inproc (between threads inside a single process), TCP, and PGM (reliable multicast). It’s designed to handle high load, is very flexible, and doesn’t require a dedicated message broker.
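
As a rough illustration of what transport-agnostic means in practice, the sketch below passes one message between two PairSocket instances over inproc. The endpoint name inproc://demo is an assumption made for this example; replacing it with a TCP address (for instance tcp://127.0.0.1:5555, also hypothetical) should leave the rest of the code unchanged.

```csharp
using System;
using NetMQ;
using NetMQ.Sockets;

class TransportDemo
{
    static void Main()
    {
        // Hypothetical endpoint name; inproc links sockets inside one process.
        const string endpoint = "inproc://demo";

        using (var first = new PairSocket())
        using (var second = new PairSocket())
        {
            // With inproc, bind before connecting.
            first.Bind(endpoint);
            second.Connect(endpoint);

            // The send and receive calls do not depend on the transport.
            first.SendFrame("ping");
            Console.WriteLine(second.ReceiveFrameString());
        }
    }
}
```

Only the endpoint string names the transport, so the choice can be kept in configuration rather than in code.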

NetMQ supports several messaging patterns:

  1. Request-Reply: This is a two-way communication pattern where a client sends a request and waits for a reply from the server.
  2. Publish-Subscribe: This pattern distributes data from one publisher to multiple subscribers. The publisher doesn’t need to know about the subscribers, and each subscriber chooses which messages it receives by subscribing to topics, which are matched as prefixes of the message.
  3. Push-Pull (Pipeline): This pattern is used for distributing tasks among multiple workers. The Push socket sends messages to Pull sockets in a round-robin fashion, load balancing the tasks among the workers.
  4. Dealer-Router: This pattern is used for complex routing scenarios. The Dealer socket is used for asynchronous request-reply messaging, allowing the sending of multiple requests without waiting for replies. The Router socket provides advanced routing capabilities.
  5. Pair: This pattern is used for connecting two sockets in a peer-to-peer fashion. It’s the simplest pattern, allowing one-to-one communication between two endpoints.
  6. Bus: NetMQ has no dedicated Bus socket type (that socket comes from other libraries such as nanomsg). A bus, in which every message sent by one member is received by all other members, has to be assembled from the socket types above, for example from Publisher and Subscriber sockets.
  7. XPub-XSub: This pattern is used for creating a proxy that connects multiple publishers to multiple subscribers. The XSub socket collects messages from the publishers, and the XPub socket distributes them to the subscribers.
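
To make the first pattern in the list concrete, here is a minimal Request-Reply sketch that runs the client and the server in a single process. The port 5556 and the class name are assumptions chosen for the example; in a real system the two sockets would normally live in separate programs.

```csharp
using System;
using NetMQ;
using NetMQ.Sockets;

class RequestReplyDemo
{
    static void Main()
    {
        // Assumed to be a free local port.
        const string endpoint = "tcp://127.0.0.1:5556";

        using (var server = new ResponseSocket())
        using (var client = new RequestSocket())
        {
            server.Bind(endpoint);
            client.Connect(endpoint);

            // The client sends first, then waits for the reply.
            client.SendFrame("Hello");
            Console.WriteLine($"Server got: {server.ReceiveFrameString()}");

            // The server must answer before it can receive the next request.
            server.SendFrame("World");
            Console.WriteLine($"Client got: {client.ReceiveFrameString()}");
        }
    }
}
```

The strict send/receive alternation shown here is what distinguishes Request-Reply from the Dealer-Router pair, which lifts that restriction.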

Sample implementation of the Publish-Subscribe model

This C# code defines a class EventPublisher that uses the NetMQ library to implement a publisher in a Publish-Subscribe pattern.

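using System.Threading;
using NetMQ;
using NetMQ.Sockets;

class Program
{
    internal class EventPublisher {
        private readonly NetMQSocket socket;
        // Shared lock so that only one thread sends at a time.
        private static readonly object synObj = new object();

        public EventPublisher() {
            const string serviceUri = "tcp://127.0.0.1:8501";
            socket = new PublisherSocket();
            // 0 means no limit on the number of queued outgoing messages.
            socket.Options.SendHighWatermark = 0;
            socket.Bind(serviceUri);
            // Brief pause before the first message is published.
            Thread.Sleep(200);
        }

        public void Publish(string message) {
            lock (synObj)
            {
                // The frame starts with the topic so subscribers can filter on it.
                socket.SendFrame("ImageStored" + "," + message);
            }
        }
    }
}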

Here’s a breakdown of what the code does:

  1. public EventPublisher(): This is the constructor of the EventPublisher class. It initializes the socket as a PublisherSocket, sets its SendHighWatermark option to 0 (which means there’s no limit on the number of outgoing messages the socket can queue), and binds the socket to the URI “tcp://127.0.0.1:8501”. It then pauses for 200 milliseconds. Since a subscriber only receives messages sent after its connection and subscription are in place, the pause gives subscribers that are already running a brief window to connect before the first message is published.
  2. public void Publish(string message): This method sends a message over the socket. The message is a single string made of the topic “ImageStored” and the message parameter, separated by a comma. The lock statement ensures that only one thread sends at a time, which matters because NetMQ sockets are not thread-safe.
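
A variation worth knowing about is to send the topic as its own frame instead of joining it to the payload with a comma. The sketch below is not the article's publisher, only an illustration of that alternative; the payload “image-001.png” and the class name are made up for the example.

```csharp
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

class MultipartPublisherDemo
{
    static void Main()
    {
        using (var publisher = new PublisherSocket())
        {
            publisher.Bind("tcp://127.0.0.1:8501");

            // Give subscribers that are already running time to connect.
            Thread.Sleep(200);

            // Frame 1 carries the topic, frame 2 the payload (hypothetical value).
            publisher.SendMoreFrame("ImageStored").SendFrame("image-001.png");
        }
    }
}
```

With this layout a subscriber would read two frames per message (for example, two calls to ReceiveFrameString), and the payload no longer needs to be split on a comma.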

Next, we will create a subscriber that receives the string message from the publisher.

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace Program {
    internal class EventSubscriber {
        private readonly SubscriberSocket socket;
        private static readonly object SyncReceiveMessages = new object();

        public EventSubscriber()
        {
            const string topic = "ImageStored";
            const string serviceUri = "tcp://127.0.0.1:8501";
            socket = new SubscriberSocket();
            // Queue at most 100000 incoming messages.
            socket.Options.ReceiveHighWatermark = 100000;
            socket.Connect(serviceUri);
            // Only messages that start with this topic are delivered.
            socket.Subscribe(topic);

            Thread.Sleep(100);
            // Receive on a background task so the constructor can return.
            Task.Factory.StartNew(FetchMessage, TaskCreationOptions.LongRunning);
        }

        private void FetchMessage() {
            while (true) {
                // Wait up to 100 ms per attempt instead of spinning on an empty queue.
                while (socket.TryReceiveFrameString(TimeSpan.FromMilliseconds(100), out var messageReceived)) {
                    lock (SyncReceiveMessages) {
                        Console.WriteLine($"Data received: {messageReceived}");
                    }
                }
            }
        }
    }
}

This C# code defines a class EventSubscriber that uses the NetMQ library to implement a subscriber in a Publish-Subscribe pattern. Here’s a breakdown of what the code does:

  1. public EventSubscriber(): This is the constructor of the EventSubscriber class. It initializes the socket as a SubscriberSocket, sets its ReceiveHighWatermark option to 100000 (which means the socket can queue up to 100000 messages if they are not processed), and connects the socket to the URI “tcp://127.0.0.1:8501”. It then subscribes to the topic “ImageStored”, pauses for 100 milliseconds to allow the socket to connect successfully before any messages are received, and starts a new task that runs the FetchMessage method.
  2. private void FetchMessage(): This is a method that continuously tries to receive messages over the socket. If a message is received, it is printed to the console. The lock statement is used to ensure that only one thread can print a message at a time, preventing race conditions. The method runs in an infinite loop, so it will keep trying to receive messages until the program is terminated.
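
To see both sides working together, the following sketch wires a publisher and a subscriber inside one process and passes a single message between them. It is a simplified stand-in for the two classes above, not a replacement; the payload, the 500 ms pause, and the one-second timeout are assumptions picked for the example.

```csharp
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

class PubSubDemo
{
    static void Main()
    {
        const string endpoint = "tcp://127.0.0.1:8501";

        using (var publisher = new PublisherSocket())
        using (var subscriber = new SubscriberSocket())
        {
            publisher.Bind(endpoint);
            subscriber.Connect(endpoint);
            subscriber.Subscribe("ImageStored");

            // Messages sent before the subscription is in place are lost,
            // so wait a moment before publishing.
            Thread.Sleep(500);

            // Same single-frame "topic,payload" layout as the article's publisher.
            publisher.SendFrame("ImageStored,image-001.png");

            // Wait up to one second for the message to arrive.
            if (subscriber.TryReceiveFrameString(TimeSpan.FromSeconds(1), out var received))
                Console.WriteLine($"Data received: {received}");
            else
                Console.WriteLine("No message arrived within the timeout.");
        }
    }
}
```

If the pause is removed, the message may be published before the subscription is active and never arrive, which is the reason both classes above sleep briefly after setting up their sockets.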
