diff --git a/ocs/common/influxdb_drivers.py b/ocs/common/influxdb_drivers.py index e9c40fe6..864c4ac2 100644 --- a/ocs/common/influxdb_drivers.py +++ b/ocs/common/influxdb_drivers.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass from datetime import datetime, timezone @@ -34,8 +35,213 @@ def _format_field_line(field_key, field_value): return line +@dataclass +class InfluxBlock: + """Holds and can convert the data and feed information into a format + suitable for publishing to InfluxDB. + + """ + #: OCS Block name. + block_name: str + + #: OCS data, as it comes across the feed. + data: dict + + #: Corresponding timestamps for the data. + timestamps: list + + #: Measurement name to publish to in InfluxDB. + measurement: str + + #: Tags to apply to the measurements. + tags: dict + + def _group_data(self): + """Takes the block structured data and groups each data point in a set + of fields so they can be combined with the corresponding timestamp. + + Example: + This takes something of the form:: + + {'channel_00': [-0.0943, -0.0965], + 'channel_01': [-0.0082, -0.0086]} + + and shapes it into the form:: + + [{'channel_00': -0.0943, 'channel_01': -0.0082}, + {'channel_00': -0.0965, 'channel_01': -0.0086}] + + This is the form needed to pass directly to the 'json' format. + + """ + grouped_data = [] + for i in range(len(self.timestamps)): + grouped_dict = {} + for k, v in self.data.items(): + grouped_dict[k] = v[i] + grouped_data.append(grouped_dict) + + return grouped_data + + def _group_fields_lines(self): + """Takes the block structured data and groups each data point into a + set of fields formatted for the 'line' protocol that can be combined + with the corresponding timestamp. + + Example: + This takes something of the form:: + + {'channel_00': [-0.1170, -0.1180, -0.1153], + 'channel_01': [-0.0241, -0.0267, -0.0226]} + + and shapes it into the form:: + + ['channel_00=-0.1170,channel_01=-0.0241', + 'channel_00=-0.1180,channel_01=-0.0267', + 'channel_00=-0.1153,channel_01=-0.0226'] + + This is the form needed to pass directly to the 'line' format. + + """ + grouped_data = self._group_data() + fields_lines = [] + for fields in grouped_data: + fields_line = [] + for mk, mv in fields.items(): + f_line = _format_field_line(mk, mv) + fields_line.append(f_line) + + field_line = ','.join(fields_line) + fields_lines.append(field_line) + return fields_lines + + def _encode_line(self, fields, timestamp): + """Given the fields and timestamps, encode them for use in the 'line' + protocol. + + Args: + fields (str): Comma separated list of fields. + timestamp (float): Unix timestamp. + + Returns: + str: Complete 'line' protocol string. + + Example: + An example of the formatting:: + + >>> block._encode_line('channel_00=-0.0341,channel_01=0.0612', 1775162753.7786078) + observatory.fake-data1,feed=false_temperatures channel_00=-0.0341,channel_01=0.0612 1775162753778607872 + + """ + # Convert json format tags to line format + tag_list = [] + for k, v in self.tags.items(): + tag_list.append(f"{k}={v}") + tags = ','.join(tag_list) + + try: + t_influx = timestamp2influxtime(timestamp, protocol='line') + except OverflowError: + print(f"Warning: Cannot convert {timestamp} to an InfluxDB compatible time. " + + "Dropping this data point.") + return + + line = f"{self.measurement},{tags} {fields} {t_influx}" + return line + + def _encode_json(self, fields, timestamp): + """Given the fields and timestamps, encode them for use in the 'json' + protocol. + + Args: + fields (dict): Dictionary with the fields and their associated values. + timestamp (float): Unix timestamp. + + Returns: + dict: Complete 'json' protocol dict. + + Example: + An example of the formatting:: + + >>> block._encode_json({'channel_00': -0.1149, 'channel_01': -0.0038}, 1775163570.7786078) + {'measurement': 'observatory.fake-data1', + 'time': '2026-04-02T20:59:30.778608', + 'fields': {'channel_00': -0.1149, 'channel_01': -0.0038}, + 'tags': {'feed': 'false_temperatures'}} + + """ + try: + t_influx = timestamp2influxtime(timestamp, protocol='json') + except OverflowError: + print(f"Warning: Cannot convert {timestamp} to an InfluxDB compatible time. " + + "Dropping this data point.") + return + + json = { + "measurement": self.measurement, + "time": t_influx, + "fields": fields, + "tags": self.tags, + } + return json + + def encode(self, protocol='line'): + """Encode Block data into InfluxDB compatible format for the given + protocol. + + Args: + protocol (str): Protocol to use to publish to InfluxDB. Either + 'line' or 'json'. Defaults to 'line'. + + Returns: + list: + List of encoded data points, formatted to be used by the + InfluxDB client. + + """ + encoded_list = [] + if protocol == 'line': + fields_lines = self._group_fields_lines() + for fields, time_ in zip(fields_lines, self.timestamps): + line = self._encode_line(fields, time_) + if line is not None: + encoded_list.append(line) + + elif protocol == 'json': + grouped_data = self._group_data() + for fields, time_ in zip(grouped_data, self.timestamps): + text = self._encode_json(fields, time_) + if text is not None: + encoded_list.append(text) + else: + print(f"Protocol '{protocol}' not supported.") + + return encoded_list + + +def _convert_single_to_group(message): + """Convert a single sample 'timestamp' message to the co-sampled group + 'timestamps' format, which we can handle already. + + Args: + message (dict): Single sample 'timestamp' message from the OCS feed. + + Notes: + This doesn't take the data directly from the feed, but the block dict + only. + + """ + message['timestamps'] = [message['timestamp']] + new_data = {} + for field, value in message['data'].items(): + new_data[field] = [value] + message['data'] = new_data + message.pop('timestamp') + return message + + def format_data(data, feed, protocol): - """Format the data from an OCS feed into a dict for pushing to InfluxDB. + """Format the data from an OCS feed for publishing to InfluxDB. The scheme here is as follows: - agent_address is the "measurement" (conceptually like an SQL @@ -58,56 +264,23 @@ def format_data(data, feed, protocol): list: Data ready to publish to influxdb, in the specified protocol. """ - measurement = feed['agent_address'] - feed_tag = feed['feed_name'] + # Load data into InfluxBlock objects. + blocks = [] + for _, bv in data.items(): + tags = {'feed': feed['feed_name']} + if 'timestamp' in bv: + bv = _convert_single_to_group(bv) + block = InfluxBlock( + block_name=bv['block_name'], + data=bv['data'], + timestamps=bv['timestamps'], + measurement=feed['agent_address'], + tags=tags) + blocks.append(block) - json_body = [] + # There is typically only one block, but just in case. + formatted_data = [] + for block in blocks: + formatted_data.extend(block.encode(protocol=protocol)) - # Reshape data for query - for _, bv in data.items(): - grouped_data_points = [] - times = bv['timestamps'] - num_points = len(bv['timestamps']) - for i in range(num_points): - grouped_dict = {} - for data_key, data_value in bv['data'].items(): - grouped_dict[data_key] = data_value[i] - grouped_data_points.append(grouped_dict) - - for fields, time_ in zip(grouped_data_points, times): - if protocol == 'line': - fields_line = [] - for mk, mv in fields.items(): - f_line = _format_field_line(mk, mv) - fields_line.append(f_line) - - measurement_line = ','.join(fields_line) - try: - t_line = timestamp2influxtime(time_, protocol='line') - except OverflowError: - print(f"Warning: Cannot convert {time_} to an InfluxDB compatible time. " - + "Dropping this data point.") - continue - line = f"{measurement},feed={feed_tag} {measurement_line} {t_line}" - json_body.append(line) - elif protocol == 'json': - try: - t_json = timestamp2influxtime(time_, protocol='json') - except OverflowError: - print(f"Warning: Cannot convert {time_} to an InfluxDB compatible time. " - + "Dropping this data point.") - continue - json_body.append( - { - "measurement": measurement, - "time": t_json, - "fields": fields, - "tags": { - "feed": feed_tag - } - } - ) - else: - print(f"Protocol '{protocol}' not supported.") - - return json_body + return formatted_data diff --git a/tests/test_influxdb_publisher.py b/tests/test_influxdb_publisher.py index 51124a43..a62a345d 100644 --- a/tests/test_influxdb_publisher.py +++ b/tests/test_influxdb_publisher.py @@ -14,10 +14,10 @@ def test_timestamp2influxtime(t, protocol, expected): assert timestamp2influxtime(t, protocol) == expected -def test_format_data(): +def test_format_data_line(): """Test passing int, float, string to InfluxDB line protocol.""" - # Not a real feed, but this is all we need for Publisher.format_data + # Not a real feed, but this is all we need for influxdb_drivers.format_data feed = {'agent_address': 'test_address', 'feed_name': 'test_feed'} data = {'test': {'block_name': 'test', @@ -31,6 +31,58 @@ def test_format_data(): expected = 'test_address,feed=test_feed key1=1i,key2=2.3,key3="test" 1615394417359038720' assert format_data(data, feed, 'line')[0] == expected + # Now test single timestamp structure + feed = {'agent_address': 'test_address', + 'feed_name': 'test_feed'} + data = {'test': {'block_name': 'test', + 'timestamp': 1615394417.3590388, + 'data': {'key1': 1, + 'key2': 2.3, + 'key3': "test"}, + } + } + + expected = 'test_address,feed=test_feed key1=1i,key2=2.3,key3="test" 1615394417359038720' + assert format_data(data, feed, 'line')[0] == expected + + +def test_format_data_json(): + """Test passing int, float, string to InfluxDB json protocol.""" + + # Not a real feed, but this is all we need for influxdb_drivers.format_data + feed = {'agent_address': 'test_address', + 'feed_name': 'test_feed'} + data = {'test': {'block_name': 'test', + 'timestamps': [1615394417.3590388], + 'data': {'key1': [1], + 'key2': [2.3], + 'key3': ["test"]}, + } + } + + expected = {'fields': {'key1': 1, 'key2': 2.3, 'key3': 'test'}, + 'measurement': 'test_address', + 'tags': {'feed': 'test_feed'}, + 'time': '2021-03-10T16:40:17.359039'} + assert format_data(data, feed, 'json')[0] == expected + + # Now test single timestamp structure + feed = {'agent_address': 'test_address', + 'feed_name': 'test_feed'} + data = {'test': {'block_name': 'test', + 'timestamp': 1615394417.3590388, + 'data': {'key1': 1, + 'key2': 2.3, + 'key3': "test"}, + } + } + + expected = {'fields': {'key1': 1, 'key2': 2.3, 'key3': 'test'}, + 'measurement': 'test_address', + 'tags': {'feed': 'test_feed'}, + 'time': '2021-03-10T16:40:17.359039'} + assert format_data(data, feed, 'json')[0] == expected + def test_format_data_inf_time(): """Test passing unrealistically large time."""