volttron.platform.dbutils package

Submodules

volttron.platform.dbutils.basedb module

exception volttron.platform.dbutils.basedb.ConnectionError[source]

Bases: Exception

Custom class for connection errors

class volttron.platform.dbutils.basedb.DbDriver(dbapimodule, **kwargs)[source]

Bases: object

Parent class used by sqlhistorian.historian.SQLHistorian to do the database operations. This class is inherited by:
  • volttron.platform.dbutils.mysqlfuncts.MySqlFuncts
  • volttron.platform.dbutils.sqlitefuncts.SqlLiteFuncts

bulk_insert()[source]

Function to meet bulk insert requirements. This function can be overridden by historian drivers to yield the required method for data insertion during bulk inserts in the respective historians. In this generic case it yields the single-insert method.

Yields: insert method
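A minimal sketch of the contract described above, with illustrative names (insert_data below is a stand-in, not the actual VOLTTRON implementation):

```python
rows = []

def insert_data(ts, topic_id, data):
    # Stand-in for the driver's single-row insert method.
    rows.append((ts, topic_id, data))
    return True

def bulk_insert():
    # Generic fallback: yield the plain single-insert method.
    # A concrete driver could yield a batching method instead.
    yield insert_data

insert = next(bulk_insert())
insert("2024-01-01T00:00:00", 7, 21.5)
```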

close()[source]

Close the connection to the database.

collect_aggregate(topic_ids, agg_type, start=None, end=None)[source]

Collect the aggregate data by querying the historian’s data store.

Parameters:
  • topic_ids – list of topic ids for which aggregation should be performed
  • agg_type – type of aggregation
  • start – start time for the query (inclusive)
  • end – end time for the query (exclusive)
Returns: a tuple of (aggregated value, count of records over which this aggregation was computed)
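The shape of the return value can be illustrated with an in-memory SQLite table (the schema and values below are hypothetical, not the historian's actual tables):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data (ts TEXT, topic_id INTEGER, value REAL)")
conn.executemany(
    "INSERT INTO data VALUES (?, ?, ?)",
    [("2024-01-01T00:00", 1, 10.0),
     ("2024-01-01T01:00", 1, 20.0),
     ("2024-01-01T02:00", 2, 30.0)],
)
# Aggregate over the requested topic ids and count the records covered.
agg_value, agg_count = conn.execute(
    "SELECT AVG(value), COUNT(value) FROM data WHERE topic_id IN (1, 2)"
).fetchone()
# (agg_value, agg_count) -> (20.0, 3)
```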

commit()[source]

Commit a transaction

Returns:True if successful, False otherwise
create_aggregate_store(agg_type, period)[source]

Create the data structure (table or collection) that is going to store the aggregate data for the given aggregation type and aggregation time period. The table name should be constructed as <agg_type>_<period>.

Parameters:
  • agg_type – the type of aggregation (avg, sum, etc.)
  • period – the time period of aggregation
Returns: True if successful, False otherwise
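The <agg_type>_<period> naming convention amounts to the following (values are illustrative):

```python
agg_type = "avg"   # type of aggregation
period = "1h"      # time period of aggregation
table_name = f"{agg_type}_{period}"  # -> "avg_1h"
```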

cursor()[source]
execute_many(stmt, args, commit=False)[source]

Execute a sql statement with multiple sets of arguments.

Parameters:
  • stmt – the statement to execute
  • args – optional arguments
  • commit – True if the transaction should be committed. Defaults to False
Returns: count of the number of affected rows

execute_stmt(stmt, args=None, commit=False)[source]

Execute a sql statement.

Parameters:
  • stmt – the statement to execute
  • args – optional arguments
  • commit – True if the transaction should be committed. Defaults to False
Returns: count of the number of affected rows
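The execute_stmt/execute_many contract mirrors the Python DB-API, as this sqlite3 sketch shows (the table is illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE topics (topic_id INTEGER, topic_name TEXT)")
cur = conn.executemany(
    "INSERT INTO topics VALUES (?, ?)",
    [(1, "campus/building/device/point1"),
     (2, "campus/building/device/point2")],
)
conn.commit()            # what commit=True would do
affected = cur.rowcount  # count of affected rows -> 2
```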

get_agg_topic_map()[source]

Get a map of aggregate_topics to aggregate_topic_id :return: dict of format {(agg_topic_name, agg_type, agg_time_period):agg_topic_id}

get_agg_topics()[source]

Get the list of aggregate topics available.

Returns: list of tuples containing (agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)
get_aggregation_list()[source]

Return the list of aggregations supported by the specific data store.

Returns: list of aggregations

get_topic_map()[source]

Returns details of topics in the database.

Returns: two dictionaries. The first maps topic_name.lower() to topic id; the second maps topic_name.lower() to topic name
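The two returned dictionaries can be illustrated as follows (the topic name is an example):

```python
topics = {1: "Campus/Building/Device/Point"}  # id -> stored topic name

# First map: lowercased topic name -> topic id.
id_map = {name.lower(): tid for tid, name in topics.items()}
# Second map: lowercased topic name -> topic name as stored.
name_map = {name.lower(): name for name in topics.values()}
```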

insert_agg_meta(topic_id, metadata)[source]

Inserts metadata for an aggregate topic.

Parameters:
  • topic_id – aggregate topic id for which metadata is inserted
  • metadata – metadata
Returns: True if execution completes. Raises an exception if the connection to the database fails

insert_agg_topic(topic, agg_type, agg_time_period)[source]

Insert a new aggregate topic.

Parameters:
  • topic – topic name to insert
  • agg_type – type of aggregation
  • agg_time_period – time period of aggregation
Returns: id of the inserted topic if the insert was successful. Raises an exception if unable to connect to the database

insert_agg_topic_stmt()[source]
Returns:query string to insert an aggregate topic into database
insert_aggregate(agg_topic_id, agg_type, period, ts, data, topic_ids)[source]

Insert aggregate data collected for a specific time period into the database. Data is inserted into the <agg_type>_<period> table.

Parameters:
  • agg_topic_id – aggregate topic id
  • agg_type – type of aggregation
  • period – time period of aggregation
  • ts – end time of the aggregation period (not inclusive)
  • data – computed aggregate
  • topic_ids – topic id or list of topic ids for which the aggregate was computed
Returns: True if execution was successful. Raises an exception in case of connection failures

insert_aggregate_stmt(table_name)[source]

The sql statement to insert the collected aggregate for a given time period into the database.

Parameters:
  • table_name – name of the table into which the aggregate data needs to be inserted
Returns: sql insert/replace statement to insert aggregate data for a specific time slice
Return type: str

insert_data(ts, topic_id, data)[source]

Inserts data for a topic.

Parameters:
  • ts – timestamp
  • topic_id – topic id for which data is inserted
  • data – data value
Returns: True if execution completes. Raises an exception if unable to connect to the database

insert_data_query()[source]
Returns:query string to insert data into database
insert_meta(topic_id, metadata)[source]

Inserts metadata for a topic.

Parameters:
  • topic_id – topic id for which metadata is inserted
  • metadata – metadata
Returns: True if execution completes. Raises an exception if unable to connect to the database

insert_meta_query()[source]
Returns:query string to insert metadata for a topic into database
insert_topic(topic)[source]

Insert a new topic.

Parameters:
  • topic – topic to insert
Returns: id of the inserted topic if the insert was successful. Raises an exception if unable to connect to the database

insert_topic_query()[source]
Returns:query string to insert a topic into database
manage_db_size(history_limit_timestamp, storage_limit_gb)[source]

Optional function to manage database size.

Parameters:
  • history_limit_timestamp – remove all data older than this timestamp
  • storage_limit_gb – remove the oldest data until the database is smaller than this value

query(topic_ids, id_name_map, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')[source]

Queries the raw historian data or aggregate data and returns the results of the query.

Parameters:
  • topic_ids – list of topic ids to query for
  • id_name_map – dictionary that maps topic id to topic name
  • start (datetime) – start of query timestamp as a datetime
  • end (datetime) – end of query timestamp as a datetime
  • agg_type – if this is a query for aggregate data, the type of aggregation (for example, sum, avg)
  • agg_period – if this is a query for aggregate data, the time period of aggregation
  • skip (int) – skip this number of results
  • count (int) – limit results to this value. When the query is for multiple topics, count applies to individual topics. For example, a query on 2 topics with count=5 will return 5 records for each topic
  • order (str) – how to order the results, either "FIRST_TO_LAST" or "LAST_TO_FIRST"
Returns: result of the query in the format:

    {topic_name: [(timestamp1, value1),
                  (timestamp2, value2), ...],
     topic_name: [(timestamp1, value1),
                  (timestamp2, value2), ...],
     ...}
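How skip, count, and order translate into a SQL query can be sketched with sqlite3 (the schema, topic id, and clause mapping are illustrative; the real drivers build these clauses internally):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data (ts TEXT, topic_id INTEGER, value REAL)")
conn.executemany(
    "INSERT INTO data VALUES (?, ?, ?)",
    [(f"2024-01-01T0{i}:00", 1, float(i)) for i in range(5)],
)
order = "ASC"        # "FIRST_TO_LAST" -> ASC, "LAST_TO_FIRST" -> DESC
count, skip = 2, 1   # count -> LIMIT, skip -> OFFSET
rows = conn.execute(
    f"SELECT ts, value FROM data WHERE topic_id = ? "
    f"ORDER BY ts {order} LIMIT ? OFFSET ?",
    (1, count, skip),
).fetchall()
result = {"example/topic": rows}
```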

query_topics_by_pattern(topic_pattern)[source]

Return a map of {topic_name.lower(): topic_id} for topics that match the given pattern.

Parameters:
  • topic_pattern – pattern to match against topic_name

read_tablenames_from_db(meta_table_name)[source]

Reads the names of the tables used by this historian to store data, topics, metadata, aggregate topics and aggregate metadata.

Parameters:
  • meta_table_name – the VOLTTRON metadata table in which table definitions are stored
Returns: table names in the format:

    {
     'data_table': name of the table that stores data,
     'topics_table': name of the table that stores the list of topics,
     'meta_table': name of the table that stores metadata,
     'agg_topics_table': name of the table that stores aggregate topics,
     'agg_meta_table': name of the table that stores aggregate metadata
    }
replace_agg_meta_stmt()[source]
Returns:query string to insert metadata for an aggregate topic into database

rollback()[source]

Rollback a transaction.

Returns: True if successful, False otherwise

select(query, args=None, fetch_all=True)[source]

Execute a select statement.

Parameters:
  • query – select statement
  • args – arguments for the where clause
  • fetch_all – set to True if the function should retrieve all the records from the cursor and return them; set to False to return the cursor
Returns: resultant rows if fetch_all is True; otherwise the cursor. It is up to the calling method to close the cursor
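The fetch_all switch maps onto the DB-API cursor like this (schema and data are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE topics (topic_id INTEGER, topic_name TEXT)")
conn.execute("INSERT INTO topics VALUES (1, 'campus/building/point')")

cur = conn.execute("SELECT topic_name FROM topics WHERE topic_id = ?", (1,))
rows = cur.fetchall()  # fetch_all=True: rows are returned, cursor handled here
cur.close()            # with fetch_all=False the *caller* must close the cursor
```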

setup_historian_tables()[source]

Create historian tables if necessary

update_agg_topic(agg_id, agg_topic_name)[source]

Update an aggregate topic name.

Parameters:
  • agg_id – topic id for which the update is done
  • agg_topic_name – new aggregate topic name
Returns: True if execution is complete. Raises an exception if unable to connect to the database

update_agg_topic_stmt()[source]
Returns:query string to update an aggregate topic in database
update_topic(topic, topic_id)[source]

Update a topic name.

Parameters:
  • topic – new topic name
  • topic_id – topic id for which the update is done
Returns: True if execution is complete. Raises an exception if unable to connect to the database

update_topic_query()[source]
Returns:query string to update a topic in database
volttron.platform.dbutils.basedb.closing(obj)[source]

volttron.platform.dbutils.crateutils module

volttron.platform.dbutils.crateutils.create_schema(connection, schema='historian', table_names={}, num_replicas='0-1', num_shards=6, use_v2=True)[source]
volttron.platform.dbutils.crateutils.drop_schema(connection, truncate_tables, schema=None, truncate=True)[source]
volttron.platform.dbutils.crateutils.insert_data_query(schema, table_name)[source]
volttron.platform.dbutils.crateutils.insert_topic_query(schema, table_name)[source]
volttron.platform.dbutils.crateutils.select_all_topics_query(schema, table_name)[source]
volttron.platform.dbutils.crateutils.select_topics_metadata_query(schema, table_name)[source]
volttron.platform.dbutils.crateutils.update_topic_query(schema, table_name)[source]

volttron.platform.dbutils.influxdbutils module

volttron.platform.dbutils.mongoutils module

volttron.platform.dbutils.mongoutils.get_agg_topic_map(client, agg_topics_collection)[source]
volttron.platform.dbutils.mongoutils.get_agg_topics(client, agg_topics_collection, agg_meta_collection)[source]
volttron.platform.dbutils.mongoutils.get_mongo_client(connection_params, **kwargs)[source]
volttron.platform.dbutils.mongoutils.get_tagging_queries_from_ast(tup, tag_refs, sub_queries)[source]
volttron.platform.dbutils.mongoutils.get_topic_map(client, topics_collection)[source]

volttron.platform.dbutils.mysqlfuncts module

class volttron.platform.dbutils.mysqlfuncts.MySqlFuncts(connect_params, table_names)[source]

Bases: volttron.platform.dbutils.basedb.DbDriver

collect_aggregate(topic_ids, agg_type, start=None, end=None)[source]

Collect the aggregate data by querying the historian’s data store.

Parameters:
  • topic_ids – list of topic ids for which aggregation should be performed
  • agg_type – type of aggregation
  • start – start time for the query (inclusive)
  • end – end time for the query (exclusive)
Returns: a tuple of (aggregated value, count of records over which this aggregation was computed)

create_aggregate_store(agg_type, agg_time_period)[source]

Create the data structure (table or collection) that is going to store the aggregate data for the given aggregation type and aggregation time period. The table name should be constructed as <agg_type>_<period>.

Parameters:
  • agg_type – the type of aggregation (avg, sum, etc.)
  • agg_time_period – the time period of aggregation
Returns: True if successful, False otherwise

get_agg_topic_map()[source]

Get a map of aggregate_topics to aggregate_topic_id :return: dict of format {(agg_topic_name, agg_type, agg_time_period):agg_topic_id}

get_agg_topics()[source]

Get the list of aggregate topics available.

Returns: list of tuples containing (agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)
get_aggregation_list()[source]

Return the list of aggregations supported by the specific data store.

Returns: list of aggregations

get_topic_map()[source]

Returns details of topics in the database.

Returns: two dictionaries. The first maps topic_name.lower() to topic id; the second maps topic_name.lower() to topic name

init_microsecond_support()[source]
insert_agg_topic_stmt()[source]
Returns:query string to insert an aggregate topic into database
insert_aggregate_stmt(table_name)[source]

The sql statement to insert the collected aggregate for a given time period into the database.

Parameters:
  • table_name – name of the table into which the aggregate data needs to be inserted
Returns: sql insert/replace statement to insert aggregate data for a specific time slice
Return type: str

insert_data_query()[source]
Returns:query string to insert data into database
insert_meta_query()[source]
Returns:query string to insert metadata for a topic into database
insert_topic_query()[source]
Returns:query string to insert a topic into database
query(topic_ids, id_name_map, start=None, end=None, skip=0, agg_type=None, agg_period=None, count=None, order='FIRST_TO_LAST')[source]

Queries the raw historian data or aggregate data and returns the results of the query.

Parameters:
  • topic_ids – list of topic ids to query for
  • id_name_map – dictionary that maps topic id to topic name
  • start (datetime) – start of query timestamp as a datetime
  • end (datetime) – end of query timestamp as a datetime
  • agg_type – if this is a query for aggregate data, the type of aggregation (for example, sum, avg)
  • agg_period – if this is a query for aggregate data, the time period of aggregation
  • skip (int) – skip this number of results
  • count (int) – limit results to this value. When the query is for multiple topics, count applies to individual topics. For example, a query on 2 topics with count=5 will return 5 records for each topic
  • order (str) – how to order the results, either "FIRST_TO_LAST" or "LAST_TO_FIRST"
Returns: result of the query in the format:

    {topic_name: [(timestamp1, value1),
                  (timestamp2, value2), ...],
     topic_name: [(timestamp1, value1),
                  (timestamp2, value2), ...],
     ...}

query_topics_by_pattern(topic_pattern)[source]

Return a map of {topic_name.lower(): topic_id} for topics that match the given pattern.

Parameters:
  • topic_pattern – pattern to match against topic_name

record_table_definitions(tables_def, meta_table_name)[source]
replace_agg_meta_stmt()[source]
Returns:query string to insert metadata for an aggregate topic into database

setup_aggregate_historian_tables(meta_table_name)[source]
setup_historian_tables()[source]

Create historian tables if necessary

update_agg_topic_stmt()[source]
Returns:query string to update an aggregate topic in database
update_topic_query()[source]
Returns:query string to update a topic in database

volttron.platform.dbutils.postgresqlfuncts module

volttron.platform.dbutils.redshiftfuncts module

volttron.platform.dbutils.sqlitefuncts module

class volttron.platform.dbutils.sqlitefuncts.SqlLiteFuncts(connect_params, table_names)[source]

Bases: volttron.platform.dbutils.basedb.DbDriver

Implementation of SQLite3 database operation for sqlhistorian.historian.SQLHistorian and sqlaggregator.aggregator.SQLAggregateHistorian For method details please refer to base class volttron.platform.dbutils.basedb.DbDriver

collect_aggregate(topic_ids, agg_type, start=None, end=None)[source]

This function should return the results of an aggregation query.

Parameters:
  • topic_ids – list of single topics
  • agg_type – type of aggregation
  • start – start time
  • end – end time
Returns: aggregate value, count of the number of records over which the aggregation was computed

create_aggregate_store(agg_type, period)[source]

Create the data structure (table or collection) that is going to store the aggregate data for the given aggregation type and aggregation time period. The table name should be constructed as <agg_type>_<period>.

Parameters:
  • agg_type – the type of aggregation (avg, sum, etc.)
  • period – the time period of aggregation
Returns: True if successful, False otherwise

get_agg_topic_map()[source]

Get a map of aggregate_topics to aggregate_topic_id :return: dict of format {(agg_topic_name, agg_type, agg_time_period):agg_topic_id}

get_agg_topics()[source]

Get the list of aggregate topics available.

Returns: list of tuples containing (agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)
get_aggregation_list()[source]

Return the list of aggregations supported by the specific data store.

Returns: list of aggregations

static get_tagging_query_from_ast(topic_tags_table, tup, tag_refs)[source]

Takes a query condition syntax tree and generates a sqlite query to query topic names by tags. It calls get_compound_query to parse the abstract syntax tree tuples and then fixes the precedence.

Example:

    # User input query string:
    campus.geoPostalCode="20500" and equip and boiler and "equip_tag 7" > 4

    # Example output sqlite query:
    SELECT topic_prefix FROM test_topic_tags WHERE tag="campusRef"
    AND value IN (
        SELECT topic_prefix FROM test_topic_tags WHERE tag="campus" AND value=1
        INTERSECT
        SELECT topic_prefix FROM test_topic_tags WHERE tag="geoPostalCode" AND value="20500"
    )
    INTERSECT SELECT topic_prefix FROM test_tags WHERE tag="equip" AND value=1
    INTERSECT SELECT topic_prefix FROM test_tags WHERE tag="boiler" AND value=1
    INTERSECT SELECT topic_prefix FROM test_tags WHERE tag="equip_tag 7" AND value > 4

Parameters:
  • topic_tags_table – table to query
  • tup – parsed query string (abstract syntax tree)
  • tag_refs – dictionary of ref tags and their parent tags
Returns: sqlite query
Return type: str
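The INTERSECT pattern in the generated query can be exercised directly (the table and tag values below are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE topic_tags (topic_prefix TEXT, tag TEXT, value)")
conn.executemany(
    "INSERT INTO topic_tags VALUES (?, ?, ?)",
    [("b1/boiler", "equip", 1),
     ("b1/boiler", "boiler", 1),
     ("b1/ahu", "equip", 1)],
)
# Topics tagged with both "equip" and "boiler":
rows = conn.execute(
    "SELECT topic_prefix FROM topic_tags WHERE tag='equip' AND value=1 "
    "INTERSECT "
    "SELECT topic_prefix FROM topic_tags WHERE tag='boiler' AND value=1"
).fetchall()
# rows -> [('b1/boiler',)]
```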

get_topic_map()[source]

Returns details of topics in the database.

Returns: two dictionaries. The first maps topic_name.lower() to topic id; the second maps topic_name.lower() to topic name

insert_agg_topic_stmt()[source]
Returns:query string to insert an aggregate topic into database
insert_aggregate_stmt(table_name)[source]

The sql statement to insert the collected aggregate for a given time period into the database.

Parameters:
  • table_name – name of the table into which the aggregate data needs to be inserted
Returns: sql insert/replace statement to insert aggregate data for a specific time slice
Return type: str

insert_data_query()[source]
Returns:query string to insert data into database
insert_meta_query()[source]
Returns:query string to insert metadata for a topic into database
insert_topic_query()[source]
Returns:query string to insert a topic into database
manage_db_size(history_limit_timestamp, storage_limit_gb)[source]

Manage database size.

Parameters:
  • history_limit_timestamp – remove all data older than this timestamp
  • storage_limit_gb – remove the oldest data until the database is smaller than this value

query(topic_ids, id_name_map, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')[source]

This function should return the results of a query in the form:

{"values": [(timestamp1, value1), (timestamp2, value2), ...],
 "metadata": {"key1": value1, "key2": value2, ...}}

metadata is not required (the caller will normalize this to {} for you).

Parameters:
  • topic_ids – topic ids to query data for
  • id_name_map – dictionary containing topic_id:topic_name
  • start – start of query timestamp as a datetime
  • end – end of query timestamp as a datetime
  • agg_type – type of aggregation, if querying aggregate data
  • agg_period – time period of aggregation, if querying aggregate data
  • skip – skip this number of results
  • count – limit results to this value
  • order – how to order the results, either "FIRST_TO_LAST" or "LAST_TO_FIRST"

query_topics_by_pattern(topic_pattern)[source]

Return a map of {topic_name.lower(): topic_id} for topics that match the given pattern.

Parameters:
  • topic_pattern – pattern to match against topic_name

record_table_definitions(table_defs, meta_table_name)[source]
regex_select(query, args, fetch_all=True, cache_size=None)[source]
static regexp(expr, item)[source]
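SQLite has no built-in REGEXP operator, so a Python function has to be registered on the connection; this is the role the static regexp helper plays. A minimal sketch (table and pattern are illustrative):

```python
import re
import sqlite3

def regexp(expr, item):
    # True if the pattern matches anywhere in the value.
    return re.search(expr, item) is not None

conn = sqlite3.connect(":memory:")
conn.create_function("REGEXP", 2, regexp)
conn.execute("CREATE TABLE topics (topic_name TEXT)")
conn.execute("INSERT INTO topics VALUES ('campus/building/device/point')")
rows = conn.execute(
    "SELECT topic_name FROM topics WHERE topic_name REGEXP ?",
    (r"device/.*",),
).fetchall()
```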
replace_agg_meta_stmt()[source]
Returns:query string to insert metadata for an aggregate topic into database

set_cache(cache_size)[source]
setup_aggregate_historian_tables(meta_table_name)[source]
setup_historian_tables()[source]

Create historian tables if necessary

update_agg_topic_stmt()[source]
Returns:query string to update an aggregate topic in database
update_topic_query()[source]
Returns:query string to update a topic in database

volttron.platform.dbutils.sqlutils module

volttron.platform.dbutils.sqlutils.get_dbfuncts_class(database_type)[source]
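A hypothetical sketch of the lookup get_dbfuncts_class performs: mapping a database_type string from the historian configuration to a driver module documented on this page (the dictionary and function below are illustrative, not the actual implementation):

```python
DBFUNCTS_MODULES = {
    "sqlite": "volttron.platform.dbutils.sqlitefuncts",
    "mysql": "volttron.platform.dbutils.mysqlfuncts",
    "postgresql": "volttron.platform.dbutils.postgresqlfuncts",
}

def resolve_dbfuncts(database_type):
    # Map the configured database type to its driver module path.
    module_path = DBFUNCTS_MODULES.get(database_type)
    if module_path is None:
        raise ValueError(f"No dbfuncts module for {database_type!r}")
    return module_path

module_path = resolve_dbfuncts("sqlite")
```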