Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,19 @@ ocs.client_t
:undoc-members:
:show-inheritance:

ocs.common
----------

The ``common/`` directory contains driver code shared among agents.

ocs.common.influxdb_drivers
```````````````````````````

.. automodule:: ocs.common.influxdb_drivers
:members:
:undoc-members:
:show-inheritance:

.. _ocs_agent_api:

ocs.ocs_agent
Expand Down
30 changes: 30 additions & 0 deletions docs/developer/feeds.rst
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,36 @@ Attempting to publish an invalid field name should raise an error by the agent.
However, if invalid field names somehow make it to the aggregator, the
aggregator will attempt to correct them before writing to disk.

Using InfluxDB Tags
'''''''''''''''''''
ocs v0.12.1 introduced improved InfluxDB tagging. This changes the structure
with which the InfluxDB Publisher agents writes to InfluxDB. In order to take
advantage of this improved tagging agents must supply tag information when
publishing data to their feeds.

This data extends the existing message structure using the 'influxdb_tags'
key::

message = {
'block_name': <Key to identify group of co-sampled data>
'timestamps': [ctime1, ctime2, ... ]
'influxdb_tags': {
'field_name_1': {'tag1': tag1_1, 'tag2': tag1_2, '_field': 'value'},
'field_name_2': {'tag1': tag2_1, 'tag2': tag2_2, '_field': 'value'}
}
'data': {
'field_name_1': [data1_1, data1_2, ...],
'field_name_2': [data2_1, data2_2, ...]
}
}

The field names within the 'data' and 'influxdb_tags' dicts must match, and
there must be a set of tags for every field. Each set of tags must also contain
the special '_field' key, which determine the field name within InfluxDB. In
this example it is just the word 'value', but a more practical example would
be 'temperature', 'resistance', or 'voltage' for a device measuring
temperatures in a cryostat.

Subscribing to a Feed
---------------------

Expand Down
9 changes: 8 additions & 1 deletion ocs/agents/fake_data/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,14 @@ def acq(self, session, params):
next_timestamp += n_data / self.sample_rate

# self.log.info('Sending %i data on %i channels.' % (len(t), len(T)))
session.app.publish_to_feed('false_temperatures', block.encoded())
tags = {}
for channel in self.channel_names:
_channel_tag = {channel: {'channel': int(channel.split('_')[1]),
'_field': 'temperature'}}
tags.update(_channel_tag)
publish_block = block.encoded()
publish_block.update(influxdb_tags=tags)
session.app.publish_to_feed('false_temperatures', publish_block)

# Update session.data
data_cache = {"fields": {}, "timestamp": None}
Expand Down
158 changes: 130 additions & 28 deletions ocs/common/influxdb_drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ def timestamp2influxtime(time, protocol):

Args:
time:
ctime timestamp
Unix 'ctime' timestamp, i.e. ``1775500953.5108523``
protocol:
'json' or line'
InfluxDB protocol to format timestamp for. Either 'json' or line'.

"""
if protocol == 'json':
Expand All @@ -35,6 +35,23 @@ def _format_field_line(field_key, field_value):
return line


@dataclass
class InfluxTags:
"""Stores tags to apply to a set of data within an InfluxBlock.

Examples:
>>> tags = InfluxTags(shared_tags={'feed': 'example_fed'},
... field_tags={'key1': 1, '_field': 'value'})


"""
#: Tags to apply to all data points.
shared_tags: dict

#: Tags to apply per field, along with '_field' value to use.
field_tags: dict = None


@dataclass
class InfluxBlock:
"""Holds and can convert the data and feed information into a format
Expand All @@ -54,7 +71,7 @@ class InfluxBlock:
measurement: str

#: Tags to apply to the measurements.
tags: dict
tags: InfluxTags

def _group_data(self):
"""Takes the block structured data and groups each data point in a set
Expand Down Expand Up @@ -135,8 +152,23 @@ def _encode_line(self, fields, timestamp):
"""
# Convert json format tags to line format
tag_list = []
for k, v in self.tags.items():
tag_list.append(f"{k}={v}")
for k, v in self.tags.shared_tags.items():
tag_list.append(f'{k}={v}')

# Add unique field tags to the list and overwrite the field key
if self.tags.field_tags:
field_name = fields.split('=')[0]
tags_to_add = self.tags.field_tags.get(field_name)
for k, v in tags_to_add.items():
if k == '_field':
continue
tag_list.append(f'{k}={v}')

# Overwrite field name with _field from tags_to_add (influxdb_tags)
new_field_key = tags_to_add.get('_field')
field_value = fields.split('=')[1]
fields = f'{new_field_key}={field_value}'

tags = ','.join(tag_list)

try:
Expand Down Expand Up @@ -170,6 +202,24 @@ def _encode_json(self, fields, timestamp):
'tags': {'feed': 'false_temperatures'}}

"""
# Add unique field tags to the list and overwrite the field key
if self.tags.field_tags:
(field_name, field_value), = fields.items() # Unpack single (k, v)
tags_to_add = self.tags.field_tags.get(field_name)
tags = {}
for k, v in tags_to_add.items():
if k == '_field':
continue
tags[k] = v

tags.update(self.tags.shared_tags)

# Overwrite field name with _field from tags_to_add (influxdb_tags)
new_field_key = tags_to_add.get('_field')
fields = {new_field_key: field_value}
else:
tags = self.tags.shared_tags

try:
t_influx = timestamp2influxtime(timestamp, protocol='json')
except OverflowError:
Expand All @@ -181,7 +231,7 @@ def _encode_json(self, fields, timestamp):
"measurement": self.measurement,
"time": t_influx,
"fields": fields,
"tags": self.tags,
"tags": tags,
}
return json

Expand All @@ -201,18 +251,42 @@ def encode(self, protocol='line'):
"""
encoded_list = []
if protocol == 'line':
fields_lines = self._group_fields_lines()
for fields, time_ in zip(fields_lines, self.timestamps):
line = self._encode_line(fields, time_)
if line is not None:
encoded_list.append(line)
# If we don't have unique field tags, group the fields together
if self.tags.field_tags is None:
fields_lines = self._group_fields_lines()
for fields, time_ in zip(fields_lines, self.timestamps):
line = self._encode_line(fields, time_)
if line is not None:
encoded_list.append(line)

# If we do have field_tags, encode each line separately
else:
grouped_data = self._group_data()
for fields, time_ in zip(grouped_data, self.timestamps):
for (field, value) in fields.items():
f_line = _format_field_line(field, value)
line = self._encode_line(f_line, time_)
if line is not None:
encoded_list.append(line)

elif protocol == 'json':
grouped_data = self._group_data()
for fields, time_ in zip(grouped_data, self.timestamps):
text = self._encode_json(fields, time_)
if text is not None:
encoded_list.append(text)
# If we don't have unique field tags, group the fields together
if self.tags.field_tags is None:
grouped_data = self._group_data()
for fields, time_ in zip(grouped_data, self.timestamps):
text = self._encode_json(fields, time_)
if text is not None:
encoded_list.append(text)

# If we do have field_tags, encode each line separately
else:
grouped_data = self._group_data()
for fields, time_ in zip(grouped_data, self.timestamps):
for (field, value) in fields.items():
single_field_dict = {field: value}
text = self._encode_json(single_field_dict, time_)
if text is not None:
encoded_list.append(text)
else:
print(f"Protocol '{protocol}' not supported.")

Expand Down Expand Up @@ -243,38 +317,66 @@ def _convert_single_to_group(message):
def format_data(data, feed, protocol):
"""Format the data from an OCS feed for publishing to InfluxDB.

The scheme here is as follows:
- agent_address is the "measurement" (conceptually like an SQL
table)
- feed names are an indexed "tag" on the data structure
(effectively a table column)
- keys within an OCS block's 'data' dictionary are the field names
(effectively a table column)
The scheme used depends on whether 'influxdb_tags' are published to the Feed.

Without 'influxdb_tags' the measurement consists of the agent address, i.e.
``address_root.instance_id``, there is a single tag for the feed name, and
each data field from the OCS feed is used directly as the field name in
InfluxDB. This structure, however, is not ideal for InfluxDB query
performance.

When 'influxdb_tags' are provided by the agent then the measurement becomes
the agent class, and the address root and instance-id are added as tags.
The 'influxdb_tags' are also used to add additional tags and provide a
simple field name. See the examples below.

Args:
data (dict):
data from the OCS Feed subscription
Data from the OCS Feed subscription.
feed (dict):
feed from the OCS Feed subscription, contains feed information
used to structure our influxdb query
Feed from the OCS Feed subscription, contains feed information
used to structure our influxdb query.
protocol (str):
Protocol for writing data. Either 'line' or 'json'.

Returns:
list: Data ready to publish to influxdb, in the specified protocol.

Examples:
>>> # without 'influxdb_tags'
>>> format_data(data, feed, protocol='line')
['observatory.fake-data1,feed=false_temperatures channel_00=0.20307 1775502374078489088',
'observatory.fake-data1,feed=false_temperatures channel_01=0.35795 1775502374078489088',
'observatory.fake-data1,feed=false_temperatures channel_00=0.20548 1775502375078489088',
'observatory.fake-data1,feed=false_temperatures channel_01=0.36313 1775502375078489088']

>>> # with 'influxdb_tags'
>>> format_data(data, feed, protocol='line')
['FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20307 1775502374078489088',
'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.35795 1775502374078489088',
'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20548 1775502375078489088',
'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.36313 1775502375078489088']

"""
# Load data into InfluxBlock objects.
blocks = []
for _, bv in data.items():
tags = {'feed': feed['feed_name']}
shared_tags = {'feed': feed['feed_name']}
measurement = feed.get('agent_address')
if bv.get('influxdb_tags'):
measurement = feed.get('agent_class')
shared_tags['address_root'] = feed['agent_address'].split('.')[0]
shared_tags['instance_id'] = feed['agent_address'].split('.')[1]
tags = InfluxTags(
shared_tags=shared_tags,
field_tags=bv.get('influxdb_tags'))
if 'timestamp' in bv:
bv = _convert_single_to_group(bv)
block = InfluxBlock(
block_name=bv['block_name'],
data=bv['data'],
timestamps=bv['timestamps'],
measurement=feed['agent_address'],
measurement=measurement,
tags=tags)
blocks.append(block)

Expand Down
41 changes: 41 additions & 0 deletions ocs/ocs_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def __init__(self, name, keys):
"""
self.name = name
self.timestamps = []
self.tags = None
self.data = {
k: [] for k in keys
}
Expand All @@ -37,6 +38,7 @@ def append(self, d):
raise Exception("Block structure does not match: {}".format(self.name))

self.timestamps.append(d['timestamp'])
self.tags = d.get('influxdb_tags')

for k in self.data:
self.data[k].append(d['data'][k])
Expand All @@ -49,6 +51,7 @@ def extend(self, block):
raise Exception("Block structure does not match: {}".format(self.name))

self.timestamps.extend(block['timestamps'])
self.tags = block.get('influxdb_tags')
for k in self.data:
self.data[k].extend(block['data'][k])

Expand All @@ -58,6 +61,7 @@ def encoded(self):
return {
'block_name': self.name,
'data': {k: self.data[k] for k in self.data.keys()},
'influxdb_tags': self.tags,
'timestamps': self.timestamps,
}

Expand Down Expand Up @@ -213,6 +217,9 @@ def publish_message(self, message, timestamp=None):
Feed.verify_data_field_string(k)
Feed.verify_message_data_type(v)

# check influxdb_tags
Feed.verify_influxdb_tags(message)

# Data is stored in Block objects
block_name = message['block_name']
try:
Expand Down Expand Up @@ -244,6 +251,40 @@ def publish_message(self, message, timestamp=None):
self.agent.log.error('Could not publish to Feed. TransportLost. '
+ 'crossbar server likely unreachable.')

@staticmethod
def verify_influxdb_tags(message):
"""Check the 'influxdb_tags' to make sure all the needed information is
provided.

This checks to make sure each tag has a '_field' provided, and that
each field has a corresponding tag.

Args:
message (dict):
'message' dictionary value published (see Feed.publish_message for details).

Raises:
ValueError: If the 'influxdb_tags' provided in the message do not
meet the required format.

"""
tags = message.get('influxdb_tags')
if tags is None:
return

# check '_field' supplied with each tag
for v in tags.values():
if '_field' not in v:
error_msg = f"'_field' not supplied with 'influxdb_tags' for tag set {v}"
raise ValueError(error_msg)

# check that all fields have a corresponding tag
tag_fields = tags.keys()
for k in message['data'].keys():
if k not in tag_fields:
error_msg = f"'influxdb_tags' does not contain tags for '{k}'"
raise ValueError(error_msg)

@staticmethod
def verify_message_data_type(value):
"""Aggregated Feeds can only store certain types of data. Here we check
Expand Down
Loading