ZeroMQ vs. Serialization: a match made in...

Another day, another opportunity to bring together code libraries that really should work well with each other, but the authors have never collaborated. This is my role, and I love it, except when I'm tearing my hair out trying to figure out how to get these two very different technologies to work well together.

This week, it's object serialization and ZeroMQ queues that I'm bashing into each other. Before delving into the solution I came up with, let's enumerate what my system architect wanted to get. Requirements are always good, right?

We are creating some interfaces that we hope to publish to a small number of industry partners. At this time, we are creating code within our systems to manage configuring and status tracking on some fairly common equipment. Ideally, we'd rather have the equipment just have a standard API for configuration and status tracking, so what we are doing now is creating a standard API, and writing code to make each of the several competing devices able to use the standard API. That said, some day we would like to publish the API, work with the vendors to make sure it works for their needs, and have the vendors implement this API directly in their devices. Great idea, right?

This means several things to us. First and foremost, we have to make a good API. It has to be designed well so the vendors won't just laugh and move on. It has to use technologies that are within their understanding, and will fit within their system footprints. Because these are embedded devices, this means the communications should be based on open standards, and on well-known, functioning software.

We picked ZeroMQ as our messaging platform because it's pretty much ideal for our needs. It supports local communications via fast, on-box resources (UNIX domain sockets), network transports for off-box communications (TCP sockets), and multi-cast communications for when that is most effective.
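To make those three transports concrete, here is a minimal sketch of the endpoint strings ZeroMQ expects for each one. The helper names (`IpcEndpoint`, `TcpEndpoint`, `EpgmEndpoint`) are hypothetical and not part of ZeroMQ; only the `ipc://`, `tcp://` and `epgm://` endpoint formats come from the library. Multicast also assumes a libzmq build with PGM support.

```cpp
#include <cassert>
#include <string>

// Hypothetical helpers (not part of ZeroMQ) that build endpoint strings
// in the transport://address form that bind() and connect() accept.

// On-box: UNIX domain socket at the given filesystem path.
inline std::string IpcEndpoint(const std::string & path)
{
  assert(!path.empty());
  return "ipc://" + path;
}

// Off-box: TCP host (or interface) and port.
inline std::string TcpEndpoint(const std::string & host, unsigned port)
{
  assert(port > 0 && port <= 65535);
  return "tcp://" + host + ":" + std::to_string(port);
}

// Multicast: PGM encapsulated in UDP, written as interface;group:port.
inline std::string EpgmEndpoint(const std::string & iface,
                                const std::string & group,
                                unsigned port)
{
  assert(port > 0 && port <= 65535);
  return "epgm://" + iface + ";" + group + ":" + std::to_string(port);
}
```

Because the transport lives entirely in the endpoint string, the same socket code can move from on-box to off-box communication by changing a single string.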

ZeroMQ is a message transport, but says nothing about the format of the messages themselves. To quote directly from the ZeroMQ FAQ:

Does ØMQ include APIs for serializing data to/from the wire representation?

No. This design decision adheres to the UNIX philosophy of "do one thing and do it well". In the case of ØMQ, that one thing is moving messages, not marshalling data to/from binary representations.

So we're left to our own devices in serializing messages through our ZeroMQ queues. The Architect is a big fan of XML, because it is readable, lends itself to debugging, and because we can clearly specify the API -- perfect for a published API, right? I should note that the interfaces we will be exposing are low-volume APIs; during normal usage of the system these APIs may be exercised tens of times per hour, and then only if the operator is feeling energetic.

I've used gSOAP to create SOAP interfaces in C++ in the past, so when the Architect mentioned XML, gSOAP leaped immediately to mind. I quickly set about searching for an example of how to get gSOAP to generate just the XML de/serialization code for a set of messages based on an XML specification, because we obviously didn't need all the SOAP socket-handling stuff when we are using ZeroMQ for message transport.

This took a bit of doing. I finally stumbled across a promising-looking blog post by Bojan Komazec that provided a perfect example. It met my goals for this demonstration:

The messages are specified in an XML Schema Definition
gSOAP generates objects for the messages
gSOAP generates the de/serialization code for these objects
The objects can be serialized to/from buffers I can use with ZeroMQ queues

So I worked up this example program to demonstrate this ability. We'll start off with the messages.

<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:gt="http://www.bk.com/gSOAP/test" attributeFormDefault="unqualified" elementFormDefault="qualified" targetNamespace="http://www.bk.com/gSOAP/test" xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="RegisterSignalConditionRequest">
    <xs:complexType>
      <xs:attribute name="topic" type="xs:string" use="required" />
      <xs:attribute name="interval" type="xs:unsignedInt" use="required" />
    </xs:complexType>
  </xs:element>
  <xs:element name="RegisterSignalConditionResponse">
    <xs:complexType>
      <xs:attribute name="success" type="xs:boolean" use="required" />
    </xs:complexType>
  </xs:element>
  <xs:element name="SignalConditionReport">
    <xs:complexType>
      <xs:attribute name="signalLevel" type="xs:float" use="required" />
      <xs:attribute name="signalToNoiseRatio" type="xs:float" use="optional" />
    </xs:complexType>
  </xs:element>
</xs:schema>


As you can see, we are creating Request, Response, and Report messages. The first two will be used on a Request/Response queue to ask a code module to generate reports. The Report will be published periodically on a Publish/Subscribe queue, as directed by the Request(s).

The first program is a demonstrator that pulls together the gSOAP serializers and a ZeroMQ pub/sub queue. We'll start with its Makefile, because there are a couple of gSOAP-related steps to get through before the code compiles.


CXXFLAGS= -g # -Wall
LIBS= -lgsoap++ -lzmq

.PHONY: all
all: soapRegister

soapC.cpp: register.h
	soapcpp2 -I $(HOME)/src/gsoap-2.8/gsoap/import register.h

register.h: register.xsd
	wsdl2h -t $(HOME)/src/gsoap-2.8/gsoap/typemap.dat -N tsp register.xsd

clean:
	$(RM) soapRegister register.h soapC* soapH* soapStub.* tsp.nsmap *.o

#-- First example program --
RSRCS= soapRegister.cpp soapC.cpp
ROBJS= $(RSRCS:.cpp=.o)

soapRegister: $(ROBJS)
	$(CXX) -o $@ $^ $(LIBS)

soapRegister.cpp: soapC.cpp ZeeMQ.h # soapC.h actually, but this works better


The first step is to use the gSOAP wsdl2h program to generate the object classes from the XSD. The second gSOAP step is to generate the de/serialization code using the soapcpp2 program. Then, finally, we compile our application in soapRegister.cpp with the generated code to get a test program.

Here, then, is the test program, soapRegister.cpp:

#include "soapH.h"
#include "tsp.nsmap"
#include <iostream>
#include <fstream>
#include <string>
#include <sstream>

#include "ZeeMQ.h"

using std::string;
using std::stringstream;

// RAII support
class CScopedSoap
{
  // gSOAP runtime context
  struct soap m_soap;

public:
  CScopedSoap()
  {
    // initialize runtime context
    soap_init(&m_soap);

    // Set input and output modes
    soap_imode(&m_soap, SOAP_ENC_XML);
    soap_set_omode(&m_soap, SOAP_XML_INDENT);

    // start new (de)serialization phase
    soap_begin(&m_soap);
  }

  ~CScopedSoap()
  {
    // remove deserialized class instances (C++ objects)
    soap_destroy(&m_soap);

    // clean up and remove deserialized data
    soap_end(&m_soap);

    // detach context (last use and no longer in scope)
    soap_done(&m_soap);
  }

  struct soap & GetSoap()
  {
    return m_soap;
  } 
};


bool ImportFromXML(struct soap & soap, 
                   _tsp__RegisterSignalConditionRequest & request,
                   std::istream & istream)
{
  soap.is = &istream;

  if (soap_read__tsp__RegisterSignalConditionRequest(&soap, &request) != SOAP_OK)
  { 
    std::cerr << "soap_read__tsp__RegisterSignalConditionRequest() failed"
              << std::endl;
    return false;
  } 
  return true;
}


bool ExportToXML(struct soap & soap, 
                 _tsp__RegisterSignalConditionRequest & request, 
                 std::ostream & ostream)
{
  soap.os = &ostream;

  // calls soap_begin_send, soap_serialize, soap_put and soap_end_send
  if (soap_write__tsp__RegisterSignalConditionRequest(&soap, &request) != SOAP_OK)
  { 
    std::cerr << "soap_write__tsp__RegisterSignalConditionRequest() failed" 
              << std::endl;
    return false;
  } 
  return true;
}


int main()
{
  std::stringstream cmd;
  CScopedSoap scopedSoap;

  _tsp__RegisterSignalConditionRequest request;

  request.topic = "STEP";
  request.interval = 200;
  if (ExportToXML(scopedSoap.GetSoap(), request, cmd))
  { 
    std::cout << "Serialized to: " << cmd.str() << std::endl;
  } 
  else
  { 
    std::cerr << "Write exception caught" << std::endl;
    return -1;
  }

  // Open up Sub, then Pub queues, so comms will go through.
  // Publish the serialized message.

  std::string addr("ipc:///tmp/trash.ipc");
  std::string topic("TRASH");
  ZeeMQ::SubSocket sub(addr, topic);
  ZeeMQ::PubSocket pub(addr, topic);

  pub.Send(cmd.str());

  // Let's try to get it and print it out.

  _tsp__RegisterSignalConditionRequest readBack;
  if (ImportFromXML(scopedSoap.GetSoap(), readBack, cmd))
  {
    std::cout << "Client wants us to publish on '" 
              << readBack.topic << "' every " 
              << readBack.interval << " msec." << std::endl;
  }
  else
  {
    std::cerr << "Read exception caught" << std::endl;
    return -2;
  }

  std::string message = sub.Recv();
  std::cout << "Raw XML is:\n" << message << std::endl;

  return 0;
}
The follow-on programs to this simple example interact with each other using both a Req/Rep queue for command input and a Pub/Sub queue for reporting, and demonstrate more of the interaction we'll be looking for in our final design, but this example was my proof-of-concept for pairing gSOAP serialization with ZeroMQ sockets. I hope it helps someone else as well.

The first generation of this program simply serialized a request into the cmd stream, then deserialized it back from the command string. Once this worked (a very straightforward quest given my starting point) I added the pub/sub queues. The ZeroMQ queue I/O is performed using a wrapper I wrote around the ZMQ interface to make it more C++-aware; the standard zmq.hpp provides a very C-like interface to the library.
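The same `cmd` stream can be handed to both ExportToXML and ImportFromXML because a `std::stringstream` keeps separate write and read positions. The sketch below shows only that standard-library behavior; `WriteThenReadBack` is a hypothetical helper, and the XML-looking text in the usage note is a placeholder rather than real gSOAP output.

```cpp
#include <sstream>
#include <string>

// Writes one line into the stream, then reads it back from the same stream.
// The put and get positions are independent, so the read starts at the
// beginning of the buffer without an explicit seekg().
inline std::string WriteThenReadBack(std::stringstream & stream,
                                     const std::string & text)
{
  stream << text << '\n';

  std::string readBack;
  std::getline(stream, readBack);
  return readBack;
}
```

Calling `WriteThenReadBack(cmd, "<request topic=\"STEP\"/>")` returns the text that was just written, and `cmd.str()` still returns the whole buffer afterwards. That is why the serialized request can be both published and deserialized from the one stream.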

Currently this is based on both zmq.hpp and another C++ header file I encountered a while ago, zhelpers.hpp; at some point I will make ZeeMQ rely only on the zmq C API. One other deficiency I discovered while contemplating this blog post in the shower this morning is that PubSocket::Send needs an overload where the caller can specify the topic, since the publisher in my follow-on program will definitely need to publish multiple topics on the same publish queue. I think I'll leave the concept of the "bound" topic in place, since publishing on a single topic is pretty common.
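One possible shape for that per-topic overload is sketched below. To keep the sketch compilable on its own, the ZeroMQ socket is replaced by a hypothetical `FrameRecorder` that only records frames; `PubSocketSketch` is likewise a made-up name. In ZeeMQ itself the two frames would presumably go out through `s_sendmore` and `s_send`, as the bound-topic Send already does.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the ZeroMQ socket (an assumption for this sketch): it records
// each frame together with its "more frames follow" flag.
struct FrameRecorder
{
  std::vector<std::pair<std::string, bool> > frames;

  bool Send(const std::string & frame, bool more)
  {
    frames.push_back(std::make_pair(frame, more));
    return true;
  }
};

class PubSocketSketch
{
public:
  explicit PubSocketSketch(const std::string & boundTopic)
    : mTopic(boundTopic)
  {
  }

  // Publish on the bound topic, the common single-topic case.
  bool Send(const std::string & message)
  {
    return Send(mTopic, message);
  }

  // Publish on a caller-supplied topic: topic frame first, then the message.
  bool Send(const std::string & topic, const std::string & message)
  {
    assert(!topic.empty());
    return mSink.Send(topic, true) && mSink.Send(message, false);
  }

  const FrameRecorder & Sink() const { return mSink; }

private:
  std::string   mTopic;
  FrameRecorder mSink;
};
```

Routing the one-argument Send through the two-argument overload keeps the bound topic as a convenience while leaving a single code path that builds the two-frame message.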

Here's ZeeMQ.h:

#ifndef __ZEEMQ_H__
#define __ZEEMQ_H__

#include <string>
#include <zmq.hpp>
#include "zhelpers.hpp"

namespace ZeeMQ
{
  enum ErrorType { ZMQ_ERROR, UNKNOWN_TOPIC };

  class Error
  {
  public:
    Error()
      : mType(ZMQ_ERROR)
    {
    }

    Error(ErrorType type)
      : mType(type)
    {
    }

  private:
    ErrorType mType;
  };

  class Socket
  {
  public:
    virtual ~Socket() {}

  protected:
    Socket(int io_threads, int sock_type)
      : mContext(io_threads)
      , mSocket(mContext, sock_type)
    {
    }

    zmq::context_t mContext;
    zmq::socket_t  mSocket;
  };

  class WritableSocket : virtual public Socket
  {
  public:
    virtual ~WritableSocket() {}

    // const reference so callers can pass temporaries such as cmd.str()
    bool Send(const std::string & message)
    {
      return s_send(mSocket, message);
    }

  protected:
    WritableSocket(int io_threads, int sock_type)
      : Socket(io_threads, sock_type)
    {
    }
  };

  class ReadableSocket : virtual public Socket
  {
  public:
    virtual ~ReadableSocket() {}

    std::string Recv()
    {
      return s_recv(mSocket);
    }

  protected:
    ReadableSocket(int io_threads, int sock_type)
      : Socket(io_threads, sock_type)
    {
    }
  };

  class PubSocket : public WritableSocket
  {
  public:
    PubSocket(const std::string & addr, const std::string & topic, int io_threads = 1)
      : Socket(io_threads, ZMQ_PUB)  // virtual base: this initializer sets the socket type
      , WritableSocket(io_threads, ZMQ_PUB)
      , mTopic(topic)
    {
      mSocket.bind(addr.c_str());
    }

    bool Send(const std::string & message)
    {
      // Publishing requires sending the bound topic
      // then the message.

      return (s_sendmore(mSocket, mTopic) &&
          s_send(mSocket, message));
    }

  private:
    std::string mTopic;
  };

  class SubSocket : public ReadableSocket
  {
  public:
    SubSocket(const std::string & addr, const std::string & topic, int io_threads = 1)
      : Socket(io_threads, ZMQ_SUB)  // virtual base: this initializer sets the socket type
      , ReadableSocket(io_threads, ZMQ_SUB)
      , mTopic(topic)
    {
      mSocket.connect(addr.c_str());
      mSocket.setsockopt(ZMQ_SUBSCRIBE, mTopic.c_str(), mTopic.size());
    }

    std::string Recv()
    {
      // We will receive topic and then message on
      // subscribe queues

      std::string topic = s_recv(mSocket);
      std::string message = s_recv(mSocket);

      // Ensure we asked for this message
      // @todo:wes can this ever actually fail?

      if (0 != mTopic.compare(topic))
      {
        throw Error(UNKNOWN_TOPIC);
      }

      return message;
    }

  private:
    std::string mTopic;
  };

  class ReqSocket : public WritableSocket, public ReadableSocket
  {
  public:
    ReqSocket(const std::string & addr, int io_threads = 1)
      : Socket(io_threads, ZMQ_REQ)  // virtual base: this initializer sets the socket type
      , WritableSocket(io_threads, ZMQ_REQ)
      , ReadableSocket(io_threads, ZMQ_REQ)
    {
      Socket::mSocket.connect(addr.c_str());
    }
  };

  class RepSocket : public WritableSocket, public ReadableSocket
  {
  public:
    RepSocket(const std::string & addr, int io_threads = 1)
      : Socket(io_threads, ZMQ_REP)  // virtual base: this initializer sets the socket type
      , WritableSocket(io_threads, ZMQ_REP)
      , ReadableSocket(io_threads, ZMQ_REP)
    {
      Socket::mSocket.bind(addr.c_str());
    }
  };

} // namespace ZeeMQ


#endif // __ZEEMQ_H__
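On the `@todo` in SubSocket::Recv: the check can fail, because ZMQ_SUBSCRIBE filters by prefix rather than by exact match. A subscriber bound to "TRASH" will also be handed a message published on "TRASHCAN". The sketch below models the two comparisons with plain strings; both function names are hypothetical and nothing here calls into ZeroMQ.

```cpp
#include <string>

// Models the ZMQ_SUBSCRIBE filter: a message is delivered when its first
// frame begins with the subscription bytes (an empty subscription matches
// everything).
inline bool MatchesSubscription(const std::string & subscription,
                                const std::string & topicFrame)
{
  return topicFrame.compare(0, subscription.size(), subscription) == 0;
}

// The stricter test SubSocket::Recv applies after delivery.
inline bool IsExactTopic(const std::string & subscription,
                         const std::string & topicFrame)
{
  return subscription == topicFrame;
}
```

With a subscription of "TRASH", a topic frame of "TRASHCAN" passes `MatchesSubscription` but fails `IsExactTopic`, which is the case where Recv would throw `Error(UNKNOWN_TOPIC)`.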
This library has the nice behavior of encapsulating the fact that Pub queues are write-only and Sub queues are read-only. The C way to handle this is to simply have read and write API entries, and return an error if you call the one that doesn't apply to the socket type. C++ gives us a better, more expressive way of handling this, once you've revisited the nuances of the diamond inheritance problem.
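The nuance that matters most here is that a virtual base class is constructed by the most-derived class, not by the intermediate classes. The stand-alone sketch below uses hypothetical names (`Base`, `Writable`, `Readable`, `Duplex`) in place of the ZeeMQ classes and a string in place of the socket type, so it runs without ZeroMQ.

```cpp
#include <string>

// Virtual base standing in for ZeeMQ::Socket; it just remembers its type.
class Base
{
public:
  virtual ~Base() {}
  const std::string & Type() const { return mType; }

protected:
  explicit Base(const std::string & type) : mType(type) {}

private:
  std::string mType;
};

class Writable : virtual public Base
{
protected:
  explicit Writable(const std::string & type) : Base(type) {}
};

class Readable : virtual public Base
{
protected:
  explicit Readable(const std::string & type) : Base(type) {}
};

// Most-derived class: its Base(...) initializer is the one that runs.
// The Base(...) initializers inside Writable and Readable are skipped
// when they are constructed as part of a Duplex.
class Duplex : public Writable, public Readable
{
public:
  Duplex()
    : Base("REQ")
    , Writable("skipped")
    , Readable("skipped")
  {
  }
};
```

A `Duplex` object reports `"REQ"` from `Type()`, and both the `Writable` and `Readable` views of it share one `Base` subobject. The flip side is that every concrete socket class has to name the virtual base in its own initializer list and pass it the right socket type.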

I finally found a solution to the code snippet problem. Thanks Luka!

Comments

  1. I've since found a much better tool for generating serialization code from XSDs: CodeSynthesis/XSD. I'll blog about this in a future post.
