Source code for volttron.platform.web.csr_endpoints

import re
import logging
import weakref

from volttron.platform import jsonapi
from volttron.platform.agent.utils import get_platform_instance_name
from volttron.platform.agent.web import Response
from volttron.platform.certs import Certs

_log = logging.getLogger(__name__)


[docs]class CSREndpoints(object): def __init__(self, core): self._core = weakref.ref(core) self._certs = Certs() self._auto_allow_csr = False @property def auto_allow_csr(self): return self._auto_allow_csr @auto_allow_csr.setter def auto_allow_csr(self, value): self._auto_allow_csr = value
[docs] def get_routes(self): """ Returns a list of tuples with the routes for authentication. Tuple should have the following: - regular expression for calling the endpoint - 'callable' keyword specifying that a method is being specified - the method that should be used to call when the regular expression matches code: return [ (re.compile('^/csr/request_new$'), 'callable', self._csr_request_new) ] :return: """ return [ (re.compile('^/csr/request_new$'), 'callable', self._csr_request_new) ]
def _csr_request_new(self, env, data): _log.debug("New csr request") if not isinstance(data, dict): try: request_data = jsonapi.loads(data) except: _log.error("Invalid data for csr request. Must be json serializable") return Response() else: request_data = data.copy() csr = request_data.get('csr') identity = self._certs.get_csr_common_name(str(csr)) # The identity must start with the current instances name or it is a failure. if not identity.startswith(get_platform_instance_name() + "."): json_response = dict(status="ERROR", message="CSR must start with instance name: {}".format( get_platform_instance_name())) Response(json.dumps(json_response), content_type='application/json', headers={'Content-type': 'application/json'}) csr_file = self._certs.save_pending_csr_request(env.get('REMOTE_ADDR'), identity, csr) if self._auto_allow_csr: _log.debug("Creating cert and permissions for user: {}".format(identity)) status = self._certs.get_csr_status(identity) json_response = dict(status=status) if status == 'APPROVED': json_response['cert'] = self._certs.get_cert_from_csr(identity) else: cert = self._certs.approve_csr(identity) permissions = self._core().rmq_mgmt.get_default_permissions(identity) self._core().rmq_mgmt.create_user_with_permissions(identity, permissions, True) json_response = dict( status="SUCCESSFUL", cert=cert ) else: status = self._certs.get_csr_status(identity) cert = self._certs.get_cert_from_csr(identity) json_response = dict(status=status) if status == "APPROVED": json_response['cert'] = self._certs.get_cert_from_csr(identity) elif status == "PENDING": json_response['message'] = "The request is pending admininstrator approval." elif status == "DENIED": json_response['message'] = "The request has been denied by the administrator." elif status == "UNKNOWN": json.response['message'] = "An unknown common name was specified to the server {}".format(identity) else: json_response['message'] = "An unkonwn error has occured during the respons phase" return Response(jsonapi.dumps(json_response), content_type='application/json', headers={'Content-type': 'application/json'})