Source code for volttron.platform.web.authenticate_endpoint

import logging
import os
import re
from urllib.parse import urlparse

import jwt
from jinja2 import Environment, FileSystemLoader, select_autoescape
from passlib.hash import argon2
#from watchdog_gevent import Observer

from volttron.platform import get_home
from volttron.platform.agent.web import Response
from volttron.utils import VolttronHomeFileReloader
from volttron.utils.persistance import PersistentDict

_log = logging.getLogger(__name__)

__PACKAGE_DIR__ = os.path.dirname(os.path.abspath(__file__))
__TEMPLATE_DIR__ = os.path.join(__PACKAGE_DIR__, "templates")
__STATIC_DIR__ = os.path.join(__PACKAGE_DIR__, "static")


# Our admin interface will use Jinja2 templates based upon the above paths
# reference api for using Jinja2 http://jinja.pocoo.org/docs/2.10/api/
# Using the FileSystemLoader instead of the package loader in this case however.
tplenv = Environment(
    loader=FileSystemLoader(__TEMPLATE_DIR__),
    autoescape=select_autoescape(['html', 'xml'])
)


[docs]class AuthenticateEndpoints(object): def __init__(self, ssl_private_key): self._ssl_private_key = ssl_private_key self._userdict = None self.reload_userdict() # TODO Add back reload capability # self._observer = Observer() # self._observer.schedule( # FileReloader("web-users.json", self.reload_userdict), # get_home() # ) # self._observer.start()
[docs] def reload_userdict(self): webuserpath = os.path.join(get_home(), 'web-users.json') self._userdict = PersistentDict(webuserpath)
[docs] def get_routes(self): """ Returns a list of tuples with the routes for authentication. Tuple should have the following: - regular expression for calling the endpoint - 'callable' keyword specifying that a method is being specified - the method that should be used to call when the regular expression matches code: return [ (re.compile('^/csr/request_new$'), 'callable', self._csr_request_new) ] :return: """ return [ (re.compile('^/authenticate'), 'callable', self.get_auth_token) ]
[docs] def get_auth_token(self, env, data): """ Creates an authentication token to be returned to the caller. The response will be a text/plain encoded user :param env: :param data: :return: """ if env.get('REQUEST_METHOD') != 'POST': _log.warning("Authentication must use POST request.") return Response('', status='401 Unauthorized') assert len(self._userdict) > 0, "No users in user dictionary, set the master password first!" if not isinstance(data, dict): _log.debug("data is not a dict, decoding") decoded = dict((k, v if len(v) > 1 else v[0]) for k, v in urlparse.parse_qs(data).iteritems()) username = decoded.get('username') password = decoded.get('password') else: username = data.get('username') password = data.get('password') _log.debug("Username is: {}".format(username)) error = "" if username is None: error += "Invalid username passed" if not password: error += "Invalid password passed" if error: _log.error("Invalid parameters passed: {}".format(error)) return Response(error, status='401') user = self.__get_user(username, password) if user is None: _log.error("No matching user for passed username: {}".format(username)) return Response('', status='401') encoded = jwt.encode(user, self._ssl_private_key, algorithm='RS256').encode('utf-8') return Response(encoded, '200 OK', content_type='text/plain')
def __get_user(self, username, password): """ Retrieve user from the user store based upon username/password The hashed_password will not be returned with the value in the user object. If there is not a username/password that match return None. :param username: :param password: :return: """ user = self._userdict.get(username) if user is not None: hashed_pass = user.get('hashed_password') if hashed_pass and argon2.verify(password, hashed_pass): usr_cpy = user.copy() del usr_cpy['hashed_password'] return usr_cpy return None