Source code for volttron.utils

import re

from watchdog.events import PatternMatchingEventHandler

from volttron.platform import get_home
import logging

_log = logging.getLogger(__name__)


[docs]def is_ip_private(vip_address): """ Determines if the passed vip_address is a private ip address or not. :param vip_address: A valid ip address. :return: True if an internal ip address. """ ip = vip_address.strip().lower().split("tcp://")[1] # https://en.wikipedia.org/wiki/Private_network priv_lo = re.compile("^127\.\d{1,3}\.\d{1,3}\.\d{1,3}$") priv_24 = re.compile("^10\.\d{1,3}\.\d{1,3}\.\d{1,3}$") priv_20 = re.compile("^192\.168\.\d{1,3}.\d{1,3}$") priv_16 = re.compile("^172.(1[6-9]|2[0-9]|3[0-1]).[0-9]{1,3}.[0-9]{1,3}$") return priv_lo.match(ip) is not None or priv_24.match( ip) is not None or priv_20.match(ip) is not None or priv_16.match( ip) is not None
[docs]def get_hostname(): with open('/etc/hostname') as fp: hostname = fp.read().strip() assert hostname return hostname
[docs]class VolttronHomeFileReloader(PatternMatchingEventHandler): """ Extends PatternMatchingEvent handler to watch changes to a singlefile/file pattern within volttron home. filetowatch should be path relative to volttron home. For example filetowatch auth.json with watch file <volttron_home>/auth.json. filetowatch *.json will watch all json files in <volttron_home> """ def __init__(self, filetowatch, callback): super(VolttronHomeFileReloader, self).__init__([get_home() + '/' + filetowatch]) _log.debug("patterns is {}".format([get_home() + '/' + filetowatch])) self._callback = callback
[docs] def on_any_event(self, event): _log.debug("Calling callback on event {}. Calling {}".format(event, self._callback)) try: self._callback() except BaseException as e: _log.error("Exception in callback: {}".format(e)) _log.debug("After callback on event {}".format(event))
[docs]class AbsolutePathFileReloader(PatternMatchingEventHandler): """ Extends PatternMatchingEvent handler to watch changes to a singlefile/file pattern within volttron home. filetowatch should be path relative to volttron home. For example filetowatch auth.json with watch file <volttron_home>/auth.json. filetowatch *.json will watch all json files in <volttron_home> """ def __init__(self, filetowatch, callback): super(VolttronHomeFileReloader, self).__init__([filetowatch]) self._callback = callback self._filetowatch = filetowatch @property def watchfile(self): return self._filetowatch
[docs] def on_any_event(self, event): _log.debug("Calling callback on event {}. Calling {}".format(event, self._callback)) try: self._callback() except BaseException as e: _log.error("Exception in callback: {}".format(e)) _log.debug("After callback on event {}".format(event))