Streaming data over ZeroMQ

Karabo Bridge provides access to live data during the experiment over a ZeroMQ socket. The karabo_data Python package can stream data from files using the same protocol. You can use this to test code which expects to receive data from Karabo Bridge, or use the same code for analysing live data and stored data.

To stream the data from a file or run unmodified, use the command:

karabo-bridge-serve-files /gpfs/exfel/exp/SPB/201830/p900022/raw/r0034 4545

The number (4545) must be an unused TCP port above 1024. The command binds to this port and streams the data to any connected clients.

We provide Karabo bridge clients as Python and C++ libraries.
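For example, here is a minimal sketch of a Python client reading from the server started above, assuming the karabo_bridge package is installed and the server runs on the same machine (the hostname and port are placeholders):

from karabo_bridge import Client

# Connect to the socket opened by karabo-bridge-serve-files above.
# 'localhost' and 4545 are placeholders for your own host and port.
client = Client('tcp://localhost:4545')

# Request one train; data is a nested dict of {source_name: {key: value}}
data = client.next()

Depending on the karabo_bridge version installed, next() may also return the train metadata alongside the data.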

If you want to do some processing on the data before streaming it, you can use this Python interface to send it out:

class karabo_data.export.ZMQStreamer(port, maxlen=10, protocol_version='2.2', dummy_timestamps=False)

ZeroMQ interface sending data over a TCP socket.

# Server:
from karabo_data.export import ZMQStreamer

serve = ZMQStreamer(1234)
serve.start()

# 'run' is a karabo_data run, e.g. opened with RunDirectory()
for tid, data in run.trains():
    result = important_processing(data)
    serve.feed(result)

# Client:
from karabo_bridge import Client
client = Client('tcp://server.hostname:1234')
data = client.next()
Parameters
  • port (int) – Local TCP port to bind socket to

  • maxlen (int, optional) – How many trains to cache before sending (default: 10)

  • protocol_version (('1.0' | '2.2')) – Which version of the bridge protocol to use. Defaults to the latest version implemented.

  • dummy_timestamps (bool) – Some tools (such as OnDA) expect the timestamp information to be in the messages. We can’t give accurate timestamps where these are not in the file, so this option generates fake timestamps from the time the data is fed in.

start()

Start a zmq.REP socket.

feed(data, metadata=None)

Push data to the sending queue.

This blocks if the queue already has maxlen items waiting to be sent.

Parameters
  • data (dict) –

    Contains train data. The dictionary has to follow the karabo_bridge protocol structure:

    • keys are source names

    • values are dicts, where the keys are parameter names and the values must be Python built-in types or numpy.ndarray.

  • metadata (dict, optional) –

    Contains train metadata. The dictionary has to follow the karabo_bridge protocol structure:

    • keys are (str) source names

    • values (dict) should contain the following items:

      • 'timestamp': Unix time with subsecond resolution

      • 'timestamp.sec': Unix time with second resolution

      • 'timestamp.frac': fractional part with attosecond resolution

      • 'timestamp.tid': the unique European XFEL train ID

    {
        'source': 'sourceName',  # str
        'timestamp': 1234.567890,  # float
        'timestamp.sec': '1234',  # str
        'timestamp.frac': '567890000000000000',  # str
        'timestamp.tid': 1234567890,  # int
    }
    

    If the metadata dict is not provided, it is extracted from 'data' (from each source's 'metadata' key), falling back to an empty dict for sources without one. See the example sketch below.
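
As a sketch of what a call to feed() with explicitly constructed metadata might look like, following the structure described above (the port, source name, keys, and values are purely illustrative, not a real detector source):

import numpy as np
from karabo_data.export import ZMQStreamer

streamer = ZMQStreamer(4545)
streamer.start()

source = 'SPB_DET_AGIPD1M-1/DET/0CH0:xtdf'  # hypothetical source name
data = {
    source: {
        'image.data': np.zeros((64, 512, 128), dtype=np.float32),
        'image.pulseCount': 64,
    }
}
metadata = {
    source: {
        'source': source,
        'timestamp': 1526464869.4109,
        'timestamp.sec': '1526464869',
        'timestamp.frac': '410900000000000000',
        'timestamp.tid': 10000000001,
    }
}
streamer.feed(data, metadata=metadata)

Any client connected to the chosen port then receives this train in the same format as data streamed directly from files.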