mongodb.historian module

class mongodb.historian.MongodbHistorian(connection, tables_def=None, initial_rollup_start_time=None, rollup_query_start=None, rollup_topic_pattern=None, rollup_query_end=1, periodic_rollup_frequency=1, periodic_rollup_initial_wait=0.25, **kwargs)[source]

Bases: volttron.platform.agent.base_historian.BaseHistorian

Historian that stores the data into mongodb collections.

add_metadata_to_query_result(agg_type, multi_topic_query, topic, topic_ids, values)[source]

Adds metadata to query results. If query is based on multiple topics does not add any metadata. If query is based on single topic return the metadata of it. If topic is an aggregate topic, returns the metadata of underlying topic :param agg_type: :param multi_topic_query: :param topic: :param topic_ids: :param values: :return:

add_raw_data_results(db, topic_name, values, pipeline, add_to_beginning)[source]
static bulk_write_rolled_up_data(collection_name, requests, ids, db)[source]

Handle bulk inserts into daily or hourly roll up table. :param collection_name: name of the collection on which the bulk operation should happen :param requests: array of bulk write requests :param ids: array of data collection _ids that are part of the bulk write requests :param db: handle to database :return: emptied request array, ids array, and True if there were errors during write operation or False if there was none

closing_mongo_client(sender, **kwargs)[source]
get_last_updated_data(db, collection)[source]
historian_setup()[source]

Optional setup routine, run in the processing thread before main processing loop starts. Gives the Historian a chance to setup connections in the publishing thread.

initialize_daily(topic_id, ts)[source]
initialize_hourly(topic_id, ts)[source]
static insert_to_daily(db, data_id, topic_id, ts, value)[source]
static insert_to_hourly(db, data_id, topic_id, ts, value)[source]
json_string_to_dict(value)[source]

Verify if the value was converted to json string at the time of storing into db. If so, convert it back to dict and return :param value: :return:

manage_db_size(history_limit_timestamp, storage_limit_gb)[source]

Remove documents older than history_limit_timestamp from all collections when history_limit_days is specified in the agent configuration. storage_limit_gb is ignored.

periodic_rollup()[source]
publish_to_historian(to_publish_list)[source]

Main publishing method for historian Agents.

Parameters

to_publish_list (list) – List of records

to_publish_list takes the following form:

[
    {
        'timestamp': timestamp1.replace(tzinfo=pytz.UTC),
        'source': 'scrape',
        'topic': "pnnl/isb1/hvac1/thermostat",
        'value': 73.0,
        'meta': {"units": "F", "tz": "UTC", "type": "float"}
    },
    {
        'timestamp': timestamp2.replace(tzinfo=pytz.UTC),
        'source': 'scrape',
        'topic': "pnnl/isb1/hvac1/temperature",
        'value': 74.1,
        'meta': {"units": "F", "tz": "UTC", "type": "float"}
    },
    ...
]

The contents of meta is not consistent. The keys in the meta data values can be different and can change along with the values of the meta data. It is safe to assume that the most recent value of the “meta” dictionary are the only values that are relevant. This is the way the cache treats meta data.

Once one or more records are published either BaseHistorianAgent.report_all_handled() or BaseHistorianAgent.report_handled() must be called to report records as being published.

query_aggregate_topics()[source]

This function is called by BaseQueryHistorianAgent.get_aggregate_topics() to find out the available aggregates in the data store

Returns

List of tuples containing (topic_name, aggregation_type, aggregation_time_period, metadata)

Return type

list

query_historian(topic, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')[source]

Returns the results of the query from the mongo database.

This historian stores data to the nearest second. It will not store subsecond resolution data. This is an optimisation based upon storage for the database. Please see volttron.platform.agent.base_historian.BaseQueryHistorianAgent.query_historian() for input parameters and return value details

query_topic_data(topic_id, id_name_map, collection_name, start, end, query_start, query_end, count, skip_count, order_by, use_rolled_up_data, values)[source]
query_topic_list()[source]

This function is called by BaseQueryHistorianAgent.get_topic_list() to actually topic list from the data store.

Returns

List of topics in the data store.

Return type

list

query_topics_by_pattern(topics_pattern)[source]

Find the list of topics and its id for a given topic_pattern

Returns

returns list of dictionary object {topic_name:id}

query_topics_metadata(topics)[source]

This function is called by BaseQueryHistorianAgent.get_topics_metadata() to find out the metadata for the given topics

Parameters

topics (str or list) – single topic or list of topics

Returns

dictionary with the format

{topic_name: {metadata_key:metadata_value, ...},
topic_name: {metadata_key:metadata_value, ...} ...}
Return type

dict

record_table_definitions(meta_table_name)[source]

Record the table or collection names in which data, topics and metadata are stored into the metadata table. This is essentially information from information from configuration item ‘table_defs’. The metadata table contents will be used by the corresponding aggregate historian(if any)

Parameters

meta_table_name – table name into which the table names and

table name prefix for data, topics, and meta tables should be inserted

starting_mongo(sender, **kwargs)[source]
update_values(data, topic_name, start, end, values)[source]
static value_to_sumable(value)[source]
verify_use_of_rolledup_data(start, end, topics_list)[source]

See if we can use rolled up data only be done with version >2, with valid time period verify start is >= from when rolled up data is available verify end date is < current time - configured lag time (config.rollup_query_end) this is to account for any lag between the main historian thread and the thread that periodically rolls up data. Also check rolled up data exists for the topics queried :param start: query start time :param end: query end time :param topics_list: list of topics in queried :return:

version()[source]

Return the current version number of the historian :return: version number

mongodb.historian.dumps(data)[source]
mongodb.historian.historian(config_path, **kwargs)[source]

This method is called by the mongodb.historian.main() to parse the passed config file or configuration dictionary object, validate the configuration entries, and create an instance of MongodbHistorian

Parameters
  • config_path – could be a path to a configuration file or can be a dictionary object

  • kwargs – additional keyword arguments if any

Returns

an instance of MongodbHistorian

mongodb.historian.loads(data_string)[source]
mongodb.historian.main(argv=['/home/docs/checkouts/readthedocs.org/user_builds/cs-volttron/envs/rtd_video_embed/lib/python3.6/site-packages/sphinx/__main__.py', '-T', '-b', 'html', '-d', '_build/doctrees', '-D', 'language=en', '.', '_build/html'])[source]

Main method called by the eggsecutable. @param argv: