# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2019, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
from __future__ import absolute_import
import errno
import inspect
import logging
import uuid
import weakref
from volttron.platform import jsonapi
import errno
from .base import SubsystemBase
from collections import defaultdict
import requests
from requests.packages.urllib3.connection import (ConnectionError,
NewConnectionError)
from volttron.platform import is_rabbitmq_available
from volttron.platform import jsonapi
from ..decorators import annotate, annotations, dualmethod, spawn
from ..errors import Unreachable
from ..results import ResultsDictionary
if is_rabbitmq_available():
import pika
__all__ = ['RMQPubSub']
min_compatible_version = '5.0'
max_compatible_version = ''
[docs]class RMQPubSub(SubsystemBase):
"""
Pubsub subsystem concrete class implementation for RabbitMQ message bus.
"""
def __init__(self, core, rpc_subsys, peerlist_subsys, owner):
self.core = weakref.ref(core)
self.rpc = weakref.ref(rpc_subsys)
self.peerlist = weakref.ref(peerlist_subsys)
self._owner = owner
self._logger = logging.getLogger(__name__)
self._results = ResultsDictionary()
self._message_number = 0
self._pubcount = dict()
self._isconnected = False
def subscriptions():
return defaultdict(set)
self._my_subscriptions = defaultdict(subscriptions)
def setup(sender, **kwargs):
# pylint: disable=unused-argument
core.onconnected.connect(self._connected)
def subscribe(member): # pylint: disable=redefined-outer-name
for peer, bus, prefix, all_platforms, queue_name in annotations(
member, set, 'pubsub.subscriptions'):
self._logger.debug("peer: {0}, prefix:{1}".format(peer, prefix))
routing_key = self._form_routing_key(prefix, all_platforms=all_platforms)
# If named queue, add "persistent" in the queue name
if queue_name:
queue_name = "{user}.pubsub.persistent.{queue_name}".format(user=self.core().rmq_user,
queue_name=queue_name)
else:
queue_name = "{user}.pubsub.{uid}".format(user=self.core().rmq_user,
uid=uuid.uuid4())
self._add_subscription(routing_key, member, queue_name)
# self._logger.debug("SYNC RMQ: all_platforms {}")
inspect.getmembers(owner, subscribe)
core.onsetup.connect(setup, self)
def _connected(self, sender, **kwargs):
"""
After connection to RMQ broker is established, synchronize local subscriptions with RMQ broker.
param sender: identity of sender
type sender: str
param kwargs: optional arguments
type kwargs: pointer to arguments
"""
# self.core().connection.channel.confirm_delivery(self.on_delivery_confirmation,nowait=True)
self._isconnected = True
self.synchronize()
[docs] def synchronize(self):
"""
Synchronize local subscriptions with RMQ broker.
:return:
"""
connection = self.core().connection
# self._logger.debug("Synchronize {}".format(self._my_subscriptions))
for prefix, subscriptions in self._my_subscriptions.items():
for queue_name, callback in subscriptions.items():
durable = False
auto_delete = True
# Check if queue needs to be persistent
if 'persistent' in queue_name:
durable = True
auto_delete = False
connection.channel.queue_declare(queue=queue_name,
durable=durable,
exclusive=True,
auto_delete=auto_delete,
callback=None)
connection.channel.queue_bind(exchange=connection.exchange,
queue=queue_name,
routing_key=prefix,
callback=None)
if prefix.startswith('__pubsub__.*.'):
original_prefix = self._get_original_topic(prefix)
self._send_proxy(original_prefix)
for cb in callback:
self._add_callback(connection, queue_name, cb)
return True
def _add_subscription(self, prefix, callback, queue_name):
"""
Store subscriptions so that it can used later
:param prefix: subscription prefix
:param callback: callback method
:param queue: queue name
:return:
"""
if not callable(callback):
raise ValueError('callback %r is not callable' % (callback,))
try:
self._my_subscriptions[prefix][queue_name].add(callback)
# _log.debug("SYNC: add subscriptions: {}".format(self._my_subscriptions['internal'][bus][prefix]))
except KeyError:
self._logger.error("PUBSUB something went wrong when adding subscriptions")
@dualmethod
@spawn
def subscribe(self, peer, prefix, callback, bus='', all_platforms=False, persistent_queue=None):
"""Subscribe to a prefix and register callback. If 'all_platforms' flag is set to True, then
agent subscribes to receive topic from all platforms. A named queue will set persistent
behavior to the topic subscriptions. That means even if the agent shutdowns and restarts, it
will receive all the messages during the shutdown/turn off period.
:param peer "pubsub" string
:type peer str
:param prefix prefix of the topic
:type prefix str
:param callback callback method
:type callback method
:param bus bus
:type bus str
:param all_platforms Flag indicating if type is 'local' or 'all'
:type all_platforms boolean
:param persistent_queue Name of the queue for persistent behavior
:type persistent_queue str
:returns: Subscribe is successful or not
:rtype: boolean
:Return Values:
Success or Failure
"""
result = None
connection = self.core().connection # bytes(uuid.uuid4())
routing_key = self._form_routing_key(prefix, all_platforms=all_platforms)
if all_platforms:
# Send message to proxy agent in order to subscribe with zmq message bus
self._send_proxy(prefix)
queue_name = ''
durable = False
auto_delete = True
if persistent_queue:
durable = True
auto_delete = False
queue_name = "{user}.pubsub.persistent.{queue_name}".format(user=self.core().rmq_user,
queue_name=persistent_queue)
else:
queue_name = "{user}.pubsub.{uid}".format(user=self.core().rmq_user, uid=str(uuid.uuid4()))
# Store subscriptions for later use
self._add_subscription(routing_key, callback, queue_name)
self._logger.debug("RMQ PUBSUB subscribing to {}".format(routing_key))
try:
connection.channel.queue_declare(callback=None,
queue=queue_name,
durable=durable,
exclusive=False,
auto_delete=auto_delete)
connection.channel.queue_bind(callback=None,
exchange=connection.exchange,
queue=queue_name,
routing_key=routing_key)
self._add_callback(connection, queue_name, callback)
except AttributeError as ex:
self._logger.error("Subscription will be added when agent gets connected to messagebus."
.format(self.core().identity))
return result
def _send_proxy(self, prefix, bus=''):
"""
Send the message to proxy router
:param prefix:
:param bus:
:return:
"""
connection = self.core().connection
rkey = self.core().instance_name + '.proxy.router.pubsub'
sub_msg = jsonapi.dumps(
dict(prefix=prefix, bus=bus, all_platforms=True)
)
# VIP format - [SENDER, RECIPIENT, PROTO, USER_ID, MSG_ID, SUBSYS, ARGS...]
frames = [self.core().identity, '', 'VIP1', '', '', 'pubsub', 'subscribe', sub_msg]
connection.channel.basic_publish(exchange=connection.exchange,
routing_key=rkey,
body=jsonapi.dumps(frames, ensure_ascii=False))
def _add_callback(self, connection, queue, callback):
"""
Register agent's callback method with RabbitMQ broker
:param connection: RabbitMQ connection object
:param queue: queue name
:param callback: callback method
:return:
"""
def rmq_callback(ch, method, properties, body):
# Strip prefix from routing key
topic = self._get_original_topic(str(method.routing_key))
try:
msg = jsonapi.loads(body)
headers = msg['headers']
message = msg['message']
bus = msg['bus']
sender = msg['sender']
self.core().spawn(callback, 'pubsub', sender, bus, topic, headers, message)
except KeyError as esc:
self._logger.error("Missing keys in pubsub message {}".format(esc))
connection.channel.basic_consume(rmq_callback,
queue=queue,
no_ack=True)
[docs] @subscribe.classmethod
def subscribe(cls, peer, prefix, bus='', all_platforms=False, persistent_queue=None):
"""
Class method for subscribe
:param peer: "pubsub" string
:param prefix: prefix of the topic
:param bus: bus
:param all_platforms: Flag indicating if type is 'local' or 'all'
:param persistent_queue: Name of the queue for persistent behavior
:return:
"""
def decorate(method):
annotate(method, set, 'pubsub.subscriptions', (peer, bus, prefix, all_platforms, persistent_queue))
return method
return decorate
[docs] def list(self, peer, prefix='', bus='', subscribed=True, reverse=False, all_platforms=False):
"""Gets list of subscriptions matching the prefix
param peer: peer
type peer: str
param prefix: prefix of a topic
type prefix: str
param bus: bus
type bus: bus
param subscribed: subscribed or not
type subscribed: boolean
param reverse: reverse
type reverse:
:returns: List of subscriptions, i.e, list of tuples of bus, topic and flag to indicate if peer is a
subscriber or not
:rtype: list of tuples
:Return Values:
List of tuples [(bus, topic, flag to indicate if peer is a subscriber or not)]
"""
async_result = next(self._results)
results = []
if reverse:
test = prefix.startswith
else:
test = lambda t: t.startswith(prefix)
try:
bindings = self.core().rmq_mgmt.get_bindings('volttron')
except (requests.exceptions.HTTPError, ConnectionError, NewConnectionError) as e:
self._logger.error("Error making request to RabbitMQ Management interface.\n"
"Check Connection Parameters: {} \n".format(e))
else:
try:
items = [(b['destination'], self._get_original_topic(b['routing_key']))
for b in bindings if b['routing_key'].startswith('__pubsub__')]
except KeyError as e:
return async_result
for item in items:
peer = item[0]
topic = item[1]
if test(topic):
member = self.core().identity in peer
if not subscribed or member:
results.append(('', topic, member))
self.core().spawn_later(0.01, self.set_result, async_result.ident, results)
return async_result
[docs] def publish(self, peer, topic, headers=None, message=None, bus=''):
"""Publish a message to a given topic via a peer.
Publish headers and message to all subscribers of topic on bus.
If peer is None, use self. Adds volttron platform version
compatibility information to header as variables
min_compatible_version and max_compatible version
param peer: peer
type peer: str
param topic: topic for the publish message
type topic: str
param headers: header info for the message
type headers: None or dict
param message: actual message
type message: None or any
param bus: bus
type bus: str
return: Number of subscribers the message was sent to.
:rtype: int
:Return Values:
Number of subscribers
"""
result = next(self._results)
self._pubcount[self._message_number] = result.ident
self._message_number += 1
routing_key = self._form_routing_key(topic)
connection = self.core().connection
self.core().spawn_later(0.01, self.set_result, result.ident, 1)
if headers is None:
headers = {}
headers['min_compatible_version'] = min_compatible_version
headers['max_compatible_version'] = max_compatible_version
# self._logger.debug("RMQ PUBSUB publish message To. {0}, {1}, {2}, {3} ".format(routing_key,
# self.core().identity,
# message,
# topic))
# VIP format - [SENDER, RECIPIENT, PROTO, USER_ID, MSG_ID, SUBSYS, ARGS...]
dct = {
# 'user_id': self.core().identity,
'app_id': connection.routing_key, # SENDER
'headers': dict(recipient='', # RECEIVER
proto='VIP', # PROTO
user=self.core().identity, # USER_ID
),
'message_id': result.ident, # MSG_ID
'type': 'pubsub', # SUBSYS
'content_type': 'application/json'
}
properties = pika.BasicProperties(**dct)
json_msg = dict(sender=self.core().identity, bus=bus, headers=headers, message=message)
try:
connection.channel.basic_publish(exchange=connection.exchange,
routing_key=routing_key,
properties=properties,
body=jsonapi.dumps(json_msg, ensure_ascii=False))
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as exc:
self._isconnected = False
raise Unreachable(errno.EHOSTUNREACH, "Connection to RabbitMQ is lost",
'rabbitmq broker', 'pubsub')
return result
[docs] def set_result(self, ident, value=None):
try:
result = self._results.pop(ident)
if result:
result.set(value)
except KeyError:
pass
[docs] def on_delivery_confirmation(self, method_frame):
"""Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
command, passing in either a Basic.Ack or Basic.Nack frame with
the delivery tag of the message that was published. The delivery tag
is an integer counter indicating the message number that was sent
on the channel via Basic.Publish. Here we're just doing house keeping
to keep track of stats and remove message numbers that we expect
a delivery confirmation of from the list used to keep track of messages
that are pending confirmation.
:param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame
"""
try:
delivery_number = method_frame.method.delivery_tag
self._logger.info("PUBSUB Delivery confirmation {0}, pending {1}, ".
format(method_frame.method.delivery_tag, len(self._pubcount)))
ident = self._pubcount.pop(delivery_number, None)
if ident:
result = None
try:
result = self._results.pop(ident)
if result:
result.set(delivery_number)
except KeyError:
pass
except TypeError:
pass
[docs] def unsubscribe(self, peer, prefix, callback, bus='', all_platforms=False):
"""Unsubscribe and remove callback(s).
Remove all handlers matching the given info - peer, callback and bus,
which was used earlier to subscribe as well. If all handlers for a
topic prefix are removed, the topic is also unsubscribed.
param peer: peer
type peer: str
param prefix: prefix that needs to be unsubscribed
type prefix: str
param callback: callback method
type callback: method
param bus: bus
type bus: bus
return: success or not
:rtype: boolean
:Return Values:
success or not
"""
routing_key = None
result = next(self._results)
if prefix is not None:
routing_key = self._form_routing_key(prefix, all_platforms=all_platforms)
topics = self._drop_subscription(routing_key, callback)
self.core().spawn_later(0.01, self.set_result, result.ident, topics)
# Send the message to proxy router to send it to external 'zmq' platforms
if all_platforms:
subscriptions = dict()
subscriptions['all'] = dict(prefix=topics, bus=bus)
rkey = self.core().instance_name + '.proxy.router.pubsub'
frames = [self.core().identity, '', 'VIP1', '', '', 'pubsub',
'unsubscribe', jsonapi.dumps(subscriptions)]
self.core().connection.channel.basic_publish(exchange=self.core().connection.exchange,
routing_key=rkey,
body=frames)
return result
def _drop_subscription(self, routing_key, callback):
"""
Utility method to remove subscription
:param routing_key: routing key
:param callback: callback method
:return:
"""
self._logger.debug("DROP subscriptions: {}".format(routing_key))
topics = []
remove = []
remove_topics = []
if routing_key is None:
if callback is None:
for prefix in self._my_subscriptions:
subscriptions = self._my_subscriptions[prefix]
for queue_name in list(subscriptions):
self.core().connection.channel.queue_delete(
callback=None, queue=queue_name)
subscriptions.pop(queue_name)
topics.append(prefix)
else:
# Traverse through all subscriptions to find the callback
for prefix in self._my_subscriptions:
subscriptions = self._my_subscriptions[prefix]
self._logger.debug("prefix: {0}, {1}".format(prefix, subscriptions))
for queue_name, callbacks in subscriptions.items():
try:
callbacks.remove(callback)
except KeyError:
pass
else:
topics.append(prefix)
if not callbacks:
# Delete queue
self.core().connection.channel.queue_delete(callback=None, queue=queue_name)
remove.append(queue_name)
for que in remove:
del subscriptions[que]
del remove[:]
if not subscriptions:
remove_topics.append(prefix)
for prefix in remove_topics:
del self._my_subscriptions[prefix]
if not topics:
raise KeyError('no such subscription')
self._logger.debug("my subscriptions: {0}".format(self._my_subscriptions))
else:
# Search based on routing key
if routing_key in self._my_subscriptions:
self._logger.debug("RMQ subscriptions {}".format(self._my_subscriptions))
topics.append(routing_key)
subscriptions = self._my_subscriptions[routing_key]
if callback is None:
for queue_name, callbacks in subscriptions.items():
self._logger.debug("RMQ queues {}".format(queue_name))
self.core().connection.channel.queue_delete(callback=None, queue=queue_name)
del self._my_subscriptions[routing_key]
else:
self._logger.debug("topics: {0}".format(topics))
for queue_name, callbacks in subscriptions.items():
try:
callbacks.remove(callback)
except KeyError:
pass
if not callbacks:
# Delete queue
self.core().connection.channel.queue_delete(callback=None, queue=queue_name)
remove.append(queue_name)
for que in remove:
del subscriptions[que]
if not subscriptions:
del self._my_subscriptions[routing_key]
self._logger.debug("my subscriptions: {0}".format(self._my_subscriptions))
orig_topics = []
# Strip '__pubsub__.<instance_name>' from the topic string
for topic in topics:
orig_topics.append(self._get_original_topic(topic))
# self._logger.debug("AFTER DROP topics: {}".format(orig_topics))
return orig_topics
def _get_original_topic(self, routing_key):
"""
Replace '.' delimiter with '/'
:param routing_key: routing_key string
:return: return original topic string
"""
try:
original_topic = routing_key.split('.')[2:]
original_topic = original_topic[:-1]
original_topic = '/'.join(original_topic)
return original_topic
except IndexError as exc:
return routing_key
def _form_routing_key(self, topic, all_platforms=False):
"""
Form routing key from the original topic
:param topic: Original topic
:param all_platforms: Flag indicating if it is intended for all platforms
:return: Routing key string
"""
routing_key = ''
topic = '#' if topic == '' else topic + '.#'
if all_platforms:
# Format is '__pubsub__.*.<prefix>.#'
routing_key = "__pubsub__.*.{}".format(topic.replace("/", "."))
else:
routing_key = "__pubsub__.{0}.{1}".format(self.core().instance_name, topic.replace("/", "."))
return routing_key