The CachedDataServer accepts timestamped field:value data, holds it in an in-memory cache, and serves it to clients over WebSockets. It is used to feed display widgets, to provide intermediate caching for derived-data transforms, and as a general-purpose data bus between OpenRVDAS components.

In the default OpenRVDAS installation a CachedDataServer is already running and listening for WebSocket connections on port 8766.


Running the Server

Default installation

The default installation uses supervisord to start and maintain the server:

server/cached_data_server.py --port 8766 \
    --disk_cache /var/tmp/openrvdas/disk_cache \
    --max_records 86400 -v

This serves WebSocket connections on port 8766, retains at most 86400 records per field (one day at 1 Hz), and maintains a disk cache at /var/tmp/openrvdas/disk_cache that is used to warm the in-memory cache on restart. It does not listen on a UDP port — all data arrives via WebSocket publish messages.

Stdout and stderr are written to /var/log/openrvdas/cached_data_server.std{out,err}. The full supervisord spec is in /etc/supervisor/conf.d/openrvdas.conf (Ubuntu) or /etc/supervisord.d/openrvdas.ini (CentOS/Redhat).

To start, stop, or restart the server, use the supervisord web interface at http://openrvdas:9001 or the command line:

root@openrvdas:~# supervisorctl
cached_data_server               RUNNING   pid 5641, uptime 1:35:54
logger_manager                   RUNNING   pid 5646, uptime 1:35:53

supervisor> stop cached_data_server
cached_data_server: stopped

supervisor> start cached_data_server
cached_data_server: started

supervisor> exit

Manual invocation

You can also run the server directly. If you are running a LoggerManager, pass --start_data_server to have it start its own CachedDataServer automatically.

For a standalone server that also listens for data on a UDP port:

server/cached_data_server.py \
  --udp 6225 \
  --port 8766 \
  --disk_cache /var/tmp/openrvdas/disk_cache \
  --back_seconds 3600 \
  --cleanup_interval 60 \
  -v

Server command-line flags

Flag                Default     Description
--port              (required)  WebSocket port to serve clients on
--udp               (none)      Comma-separated UDP port(s) to listen for incoming data on.
                                Prefix with a multicast group to use multicast, e.g. 239.0.0.1:6225
--disk_cache        (none)      Directory for the disk-backed cache. On restart, data is
                                reloaded from here to warm the in-memory cache
--back_seconds      86400       Maximum age (seconds) of data to retain
--max_records       2880        Maximum number of records to retain per field. Set to 0 for unlimited
--min_back_records  64          Minimum number of records to keep per field even when purging old data
--cleanup_interval  60          How often (seconds) to purge old data and flush the disk cache
--interval          0.5         How often (seconds) the server pushes updates to subscribed clients

The WebSocket Protocol

All interaction with the CachedDataServer — reading and writing — happens over a single WebSocket connection. Connect to ws://host:8766 and exchange JSON messages. Every message you send has a "type" field; every response from the server has "type", "status" (HTTP-style, e.g. 200), and "data" fields.

fields — list available fields

{"type": "fields"}

Returns a list of all field names currently held in the cache:

{"type": "data", "status": 200, "data": ["S330CourseTrue", "S330SpeedKt", ...]}

describe — get field metadata

{"type": "describe", "fields": ["S330CourseTrue", "S330SpeedKt"]}

Returns a dict of metadata (units, description, device, etc.) for each named field. Omit "fields" to get metadata for every field in the cache:

{
  "type": "data", "status": 200,
  "data": {
    "S330CourseTrue": {"description": "True course", "units": "degrees", ...},
    "S330SpeedKt":   {"description": "Speed in knots", "units": "kt", ...}
  }
}

subscribe — stream field updates

Subscribing is a two-step loop: send a subscribe message once to register interest, then send ready repeatedly to receive successive batches of updates.

{
  "type": "subscribe",
  "fields": {
    "S330CourseTrue": {"seconds": 30},
    "S330SpeedKt":    {"seconds": 0},
    "S330HeadingTrue":{"seconds": -1}
  }
}

The "seconds" value controls how much historical data is returned in the first response:

seconds value  Meaning
0              Only new values that arrive after the subscription
-1             The single most recent value, then all future new values
N (positive)   Up to N seconds of historical data, then all future new values

If "seconds" is omitted, 0 is used.

back_records (optional, per-field): guarantee a minimum number of historical records regardless of the seconds window. Useful when data arrives irregularly:

{"S330CourseTrue": {"seconds": 60, "back_records": 10}}

interval (optional, top-level): override the server’s default push interval (seconds). Useful for low-bandwidth clients or slow-changing data:

{
  "type": "subscribe",
  "fields": {"S330CourseTrue": {"seconds": 0}},
  "interval": 15
}

Wildcards: field names may contain * to match multiple fields:

{"type": "subscribe", "fields": {"S330*": {"seconds": -1}}}

format (optional, top-level): controls the shape of the data payload in each response.

"field_dict" (default) — a dict mapping each field name to a list of [timestamp, value] pairs:

{
  "S330CourseTrue": [[1714000000.0, 219.6], [1714000001.0, 219.7], ...],
  "S330SpeedKt":   [[1714000000.0, 8.9],   [1714000001.0, 8.9],   ...]
}

"record_list" — collated by timestamp into a list of DASRecord-like dicts. Useful when processing records in time order across multiple fields:

{
  "type": "subscribe",
  "fields": {"S330CourseTrue": {"seconds": 30}, "S330SpeedKt": {"seconds": 30}},
  "format": "record_list"
}

Response data:

[
  {"timestamp": 1714000000.0, "fields": {"S330CourseTrue": 219.6, "S330SpeedKt": 8.9}},
  {"timestamp": 1714000001.0, "fields": {"S330CourseTrue": 219.7, "S330SpeedKt": 8.9}},
  ...
]
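The two formats carry the same information; collating a field_dict payload by timestamp yields the record_list shape. A minimal sketch of that collation in Python (illustrative only, not the server's own implementation):

```python
from collections import defaultdict

def field_dict_to_record_list(field_dict):
    """Collate a field_dict payload ({field: [[ts, value], ...]}) into
    record_list shape ([{'timestamp': ts, 'fields': {...}}, ...])."""
    by_timestamp = defaultdict(dict)
    for field, pairs in field_dict.items():
        for timestamp, value in pairs:
            by_timestamp[timestamp][field] = value
    # One record per distinct timestamp, in time order
    return [{'timestamp': ts, 'fields': fields}
            for ts, fields in sorted(by_timestamp.items())]
```

Fields that have no value at a given timestamp are simply absent from that record's fields dict, as in the response example above.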

ready — acknowledge and receive the next batch

After the initial subscribe response, send ready each time you are prepared to receive the next update:

{"type": "ready"}

The server responds with a data message containing all field values that have arrived since the previous ready. This back-pressure mechanism prevents a slow client from being overwhelmed with buffered data.

publish — write data into the cache

Any WebSocket client can push data into the cache using a publish message:

{
  "type": "publish",
  "data": {
    "timestamp": 1555468528.452,
    "fields": {
      "field_1": "value_1",
      "field_2": "value_2"
    }
  }
}

This is the mechanism used by the CachedDataWriter component to feed data into the server.
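A client-side helper that builds such a publish message from a plain dict of field values might look like the following sketch (the helper name is hypothetical; only the message shape comes from the protocol above):

```python
import json
import time

def make_publish_message(fields, timestamp=None, data_id=None):
    """Build the JSON text of a CachedDataServer 'publish' message.

    fields: dict mapping field names to values.
    timestamp: seconds since epoch; defaults to the current time.
    """
    record = {
        'timestamp': time.time() if timestamp is None else timestamp,
        'fields': fields,
    }
    if data_id is not None:
        record['data_id'] = data_id
    return json.dumps({'type': 'publish', 'data': record})

# Over an open WebSocket connection `ws`, the message would be sent with:
#   await ws.send(make_publish_message({'field_1': 'value_1'}))
```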


Reading from the Server

listen.py

logger/listener/listen.py accepts a --cached_data argument that subscribes to one or more fields and prints received records to stdout. Useful for quick inspection and for piping CachedDataServer (CDS) data into other command-line tools.

logger/listener/listen.py --cached_data field_1,field_2,field_3

Connects to localhost:8766 by default. To target a different host or port, append @host:port:

logger/listener/listen.py --cached_data S330CourseTrue,S330SpeedKt@192.168.1.10:8766

All subscribed fields use seconds: 0 — only values that arrive after the subscription starts are returned. The --cached_data flag can be combined with other listen.py transforms and writers in the usual way:

logger/listener/listen.py \
  --cached_data S330CourseTrue,S330SpeedKt \
  --transform_prefix vessel \
  --write_logfile /var/tmp/log/s330

Interactive exploration

For ad-hoc queries or protocol debugging, any interactive WebSocket client works. Two popular command-line options are wscat (Node.js) and websocat (Rust).

List all cached fields:

$ wscat -c ws://localhost:8766
Connected (press CTRL+C to quit)
> {"type":"fields"}
< {"type": "data", "status": 200, "data": ["S330CourseTrue", "S330SpeedKt", ...]}

Get the most recent value of a field and exit:

$ echo '{"type":"subscribe","fields":{"S330CourseTrue":{"seconds":-1}}}' \
  | websocat ws://localhost:8766

CachedDataReader in a YAML config file

CachedDataReader is the standard component for pulling data from the CDS inside a logger configuration.

readers:
  class: CachedDataReader
  kwargs:
    data_server: localhost:8766
    subscription:
      fields:
        S330CourseTrue:
          seconds: 0
        S330SpeedKt:
          seconds: -1
        S330HeadingTrue:
          seconds: 60

All subscription options from the WebSocket protocol — wildcards, back_records, interval, format — are available inside the subscription dict.
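For example, a subscription combining a wildcard, a slower push interval, and record_list output might look like the following sketch (the field pattern is illustrative):

```yaml
readers:
  class: CachedDataReader
  kwargs:
    data_server: localhost:8766
    subscription:
      fields:
        S330*:
          seconds: -1
      interval: 15
      format: record_list
```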

As a convenience, fields may be given as a list instead of a dict; all fields are then subscribed with seconds: 0:

readers:
  class: CachedDataReader
  kwargs:
    data_server: localhost:8766
    subscription:
      fields:
        - S330CourseTrue
        - S330SpeedKt
        - S330HeadingTrue

CachedDataReader parameters:

Parameter          Default         Description
data_server        localhost:8766  Host and port of the CachedDataServer
subscription       (required)      Subscription dict (see above)
bundle_seconds     0               If > 0, accumulate records for this many seconds
                                   and return them as a list
return_das_record  False           If True, wrap results in DASRecord objects
data_id            None            data_id to assign to returned DASRecord objects
                                   (requires return_das_record=True)
use_wss            False           Connect using secure WebSockets (wss://)
check_cert         False           Verify the server's TLS certificate; may be a path to a .pem file

Example — read two fields and write them to a logfile:

readers:
  class: CachedDataReader
  kwargs:
    data_server: localhost:8766
    subscription:
      fields:
        S330CourseTrue:
          seconds: 0
        S330SpeedKt:
          seconds: 0
writers:
  class: LogfileWriter
  kwargs:
    filebase: /var/tmp/log/s330_derived

CachedDataReader in Python

from logger.readers.cached_data_reader import CachedDataReader

subscription = {
    'fields': {
        'S330CourseTrue': {'seconds': 30},
        'S330SpeedKt':    {'seconds': -1},
    }
}

reader = CachedDataReader(subscription=subscription, data_server='localhost:8766')

while True:
    record = reader.read()  # blocks until data arrives
    print(record)
    # {'timestamp': ..., 'fields': {'S330CourseTrue': ..., 'S330SpeedKt': ...}}

To return DASRecord objects instead of plain dicts:

reader = CachedDataReader(
    subscription=subscription,
    data_server='localhost:8766',
    return_das_record=True,
    data_id='s330_consumer',
)
record = reader.read()  # returns a DASRecord

To accumulate a time window of records before returning:

reader = CachedDataReader(
    subscription=subscription,
    data_server='localhost:8766',
    bundle_seconds=5,
)
records = reader.read()  # returns a list of dicts

External Python (asyncio + websockets)

Any Python program can talk to the CachedDataServer directly using the websockets library:

import asyncio
import json
import websockets

async def read_fields():
    async with websockets.connect('ws://localhost:8766') as ws:
        # Subscribe
        await ws.send(json.dumps({
            'type': 'subscribe',
            'fields': {
                'S330CourseTrue': {'seconds': 60},
                'S330SpeedKt':    {'seconds': -1},
            }
        }))
        await ws.recv()  # discard subscribe acknowledgement

        # Poll for updates
        while True:
            await ws.send(json.dumps({'type': 'ready'}))
            response = json.loads(await ws.recv())
            if response.get('status') == 200:
                for field, values in response.get('data', {}).items():
                    for timestamp, value in values:
                        print(f'{field}: {value} @ {timestamp}')
            await asyncio.sleep(1)

asyncio.run(read_fields())

One-shot query for the most recent value of all fields matching a pattern:

async def latest_values(pattern='S330*'):
    async with websockets.connect('ws://localhost:8766') as ws:
        await ws.send(json.dumps({
            'type': 'subscribe',
            'fields': {pattern: {'seconds': -1}},
        }))
        await ws.recv()  # discard subscribe acknowledgement
        await ws.send(json.dumps({'type': 'ready'}))
        response = json.loads(await ws.recv())
        return response.get('data', {})

data = asyncio.run(latest_values())

JavaScript (browser)

The OpenRVDAS WidgetServer class in display/js/widgets/widget_server.js handles the subscribe/ready loop automatically and dispatches incoming data to a list of display widgets:

var widgets = [
  new TextWidget('course_div', {'S330CourseTrue': {'seconds': 0}}, 'Degrees'),
  new TextWidget('speed_div',  {'S330SpeedKt':    {'seconds': 0}}, 'kt'),
];

var server = new WidgetServer(widgets, 'ws://localhost:8766');
server.serve();

For custom pages that don’t use the widget framework, the raw browser WebSocket API works the same way:

const ws = new WebSocket('ws://localhost:8766');

ws.onopen = () => {
  ws.send(JSON.stringify({
    type: 'subscribe',
    fields: {
      S330CourseTrue: {seconds: 60},
      S330SpeedKt:    {seconds: -1},
    },
    format: 'record_list',
  }));
};

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  if (msg.type === 'data' && msg.status === 200) {
    // msg.data is an array of {timestamp, fields} objects
    console.log(msg.data);
  }
  ws.send(JSON.stringify({type: 'ready'}));
};

Writing to the Server

CachedDataWriter in a YAML config file

CachedDataWriter is the standard component for pushing data into the CDS from a logger pipeline:

writers:
  class: CachedDataWriter
  kwargs:
    data_server: localhost:8766

It accepts DASRecord or dict-format records and forwards them to the server via WebSocket publish messages. If the connection is lost, it buffers up to max_backup records (default: 86400) locally until it can reconnect.

UDP input

If the server is started with one or more --udp ports, it will also accept JSON-encoded records broadcast on those ports:

server/cached_data_server.py --port 8766 --udp 6225

The default supervisord installation does not enable UDP input. To enable it, uncomment the relevant line in scripts/start_openrvdas.sh:

#DATA_SERVER_LISTEN_ON_UDP='--udp $DATA_SERVER_UDP_PORT'

Direct Python API

Code that instantiates a CachedDataServer object in-process (rather than connecting to a running server) can feed it by calling cache_record():

import time

from server.cached_data_server import CachedDataServer

server = CachedDataServer(port=8766)
server.cache_record({
    'timestamp': time.time(),
    'fields': {'field_1': 'value_1', 'field_2': 'value_2'}
})

Input Data Formats

Whether data arrives via UDP, WebSocket publish, or direct cache_record() call, the server expects records in one of the following formats.

Standard format — a dict with an optional data_id and timestamp, and a mandatory fields key:

{
  "data_id": "s330",
  "timestamp": 1555468528.452,
  "fields": {
    "S330CourseMag":  244.29,
    "S330CourseTrue": 219.61,
    "S330SpeedKt":    8.9
  }
}

data_id is optional. timestamp is optional — time.time() is used if absent.

Pre-timestamped format — field values may themselves be lists of (timestamp, value) pairs, in which case the top-level timestamp is ignored:

{
  "fields": {
    "S330CourseMag":  [[1555468527.1, 244.1], [1555468528.4, 244.29]],
    "S330CourseTrue": [[1555468527.1, 219.5], [1555468528.4, 219.61]]
  }
}
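The two shapes can be handled uniformly by flattening either form into (field, timestamp, value) triples. A sketch of the interpretation described above, assuming the formats exactly as shown (not the server's actual code):

```python
import time

def flatten_record(record):
    """Flatten a standard or pre-timestamped record into a list of
    (field, timestamp, value) tuples."""
    default_ts = record.get('timestamp', time.time())
    results = []
    for field, value in record.get('fields', {}).items():
        if isinstance(value, list) and all(
                isinstance(pair, (list, tuple)) and len(pair) == 2
                for pair in value):
            # Pre-timestamped: a list of (timestamp, value) pairs;
            # the top-level timestamp is ignored.
            results.extend((field, ts, val) for ts, val in value)
        else:
            # Standard: a bare value stamped with the record's timestamp
            results.append((field, default_ts, value))
    return results
```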

With metadata — a record may include a metadata key. The server extracts the fields dict inside it and caches it as per-field metadata, which is then returned by describe requests. This metadata is emitted at intervals by RecordParser/ParseTransform when metadata_interval is set:

{
  "data_id": "s330",
  "fields": {"S330CourseMag": 244.29, "S330CourseTrue": 219.61},
  "metadata": {
    "fields": {
      "S330CourseMag":  {"description": "Magnetic course", "units": "degrees", ...},
      "S330CourseTrue": {"description": "True course",     "units": "degrees", ...}
    }
  }
}

Quick Reference

Reading:

Method                      When to use
listen.py --cached_data     Command-line inspection; piping into other tools
wscat / websocat            Interactive protocol debugging
CachedDataReader in YAML    Reading CDS data inside a logger or derived-data pipeline
CachedDataReader in Python  Scripted consumers within the OpenRVDAS codebase
asyncio + websockets        External Python programs or services
JavaScript WidgetServer     Browser display pages using the built-in widget framework
Raw WebSocket (JS/other)    Custom browser pages or other language bindings

Writing:

Method                     When to use
CachedDataWriter in YAML   Feeding the CDS from a logger pipeline
WebSocket publish message  Any WebSocket client pushing data directly
--udp port                 Legacy UDP broadcast from instruments or other processes
cache_record() in Python   In-process use when directly instantiating a CachedDataServer
