跳转到: 导航, 搜索

Oslo/Messaging

Oslo 消息传递库提供了两个独立的 API

  1. oslo.messaging.rpc 用于实现客户端-服务器远程过程调用
  2. oslo.messaging.notify 用于发出和处理事件通知

它们属于同一个库,主要是出于历史原因——oslo.messaging.notify 的最常见的传输机制是 oslo.messaging.rpc 使用的传输机制。

注意:以下文章是一份设计文档,早于库的实际实现。实现与设计密切对应,因此该文章仍然是有用的背景信息,但请参阅 完整的 API 文档以获取更多详细信息

目标

随着 OpenStack 的发展,消息传递对于该项目越来越重要,我们继续围绕 RPC 的使用来设计项目的架构。

然而,RPC API 当前严重依赖 AMQP 概念,并且对推荐用法几乎没有限制。对于那些设计组件之间关系但又不希望深入了解 AMQP 的人来说,这令人沮丧。设计不良的服务内接口可能会成为我们面临的真正问题,因为这些接口必须以向后兼容的方式演进,否则可能会给操作员带来升级时的严重问题。

此外,虽然 API 的两个最常用的实现——kombu/rabbitmq 和 qpid——都是 AMQP 实现,但我们预计 zmq 实现会随着时间的推移被其他非 AMQP 实现加入。每次项目以 AMQP 特定的方式使用 RPC API 时,非 AMQP 实现都会进一步落后。我们需要一个清晰的抽象,以便服务希望采用的每种新的消息传递模式都必须以抽象形式建模,以便非 AMQP 实现可以支持它。

最后,我们希望将这些 API 作为适当的独立库发布,但又不希望锁定自己以保留当前 API 的向后兼容性。

本提案最初不会涵盖例如 ceilometer 和 cells 所需的所有用例,但最终会涵盖。我们只需要首先专注于为一些更常见的用例设计正确。

当前状态

API 设计已从以下描述中有所发展,并在 oslo.messaging 仓库中实现。另请参阅从内联文档字符串生成的 API 文档

此时,API 设计已经相当成熟,但我们还没有实现任何传输驱动程序。

oslo.messaging.rpc

RPC API 定义了寻址、过程调用、广播调用、返回值和错误处理的语义。

有多个后端传输驱动程序实现 API 语义,使用不同的消息传递系统——例如 RabbitMQ、Qpid、ZeroMQ。虽然连接双方必须使用以相同方式配置的相同传输驱动程序,但 API 避免暴露传输细节,以便使用一种传输编写的代码应该可以与任何其他传输一起工作。

对此原则的例外是,API 暴露了传输特定的配置,用于如何连接到消息传递系统本身。

概念

  • 服务器:服务器使 RPC 接口可供客户端使用
  • 客户端:客户端在服务器上调用方法
  • 交换器:项目中的每个主题的范围所在的容器
  • 主题:主题是 RPC 接口的标识符;服务器侦听主题上的方法调用;客户端在主题上调用方法
  • 命名空间:服务器可以在一个主题上公开多个方法集,其中每个方法集都在一个命名空间下限定。有一个默认的空命名空间。
  • 方法:方法具有名称和多个命名(即“关键字”)参数
  • API 版本:每个命名空间都与一个版本号相关联,每次接口更改时都会增加该版本号。向后兼容的更改表示为增加的次要编号,不兼容的更改表示为增加的主要编号。服务器可以同时支持多个不兼容的版本。客户端可以在调用方法时请求它们需要的最低版本。
  • 传输:一种底层的消息传递系统,它将 RPC 请求传递给服务器并将回复返回给客户端,而无需客户端和服务器了解消息传递系统的任何具体细节

用例

注意:如果列表中的某个用例缺失,请考虑该用例是否由 oslo.messaging.notify API 涵盖。如果是这样,那么 oslo.messaging 必须在内部支持该用例,但不一定由公共面向的 oslo.messaging.rpc API 支持。是的,这很令人困惑。

在多个服务器中的一个上调用方法

这是我们有多个服务器侦听交换器中的主题上的方法调用的情况,并且这两个交换器和主题名称都为客户端所知。当客户端在此主题上调用方法时,该调用将以轮询方式分派到正在侦听的服务器之一。

此用例的示例

  • nova-api 调用 'scheduler' 主题在 'nova' 交换器中的 'run_instance' 方法,nova-scheduler 服务之一处理它
  • nova-compute 调用 'nova' 交换器中的 'conductor' 主题上的 'instance_update' 方法,nova-conductor 服务之一处理它
  • nova-api 调用 'nova' 交换器中的 'network' 主题上的 'add_dns_entry' 方法,nova-network 服务之一处理它
  • cinder-api 调用 'cinder' 交换器中的 'scheduler'[1] 主题上的 'run_instance' 方法,cinder-scheduler 服务之一处理它
  • quantum-linuxbridge-agent 调用 'quantum' 交换器中的 'plugin'[2] 主题上的 'update_device_up' 方法,linuxbridge 插件中的 quantum-server 服务之一处理它
  • quantum-dhcp-agent 调用 'quantum' 交换器中的 'q-plugin' 主题上的 'get_dhcp_port' 方法,quantum-server 服务之一处理它
  • quantum-lbaas-agent 调用 'quantum' 交换器中的 'q-loadbalancer-plugin' 主题上的 'plug_vip_port' 方法,quantum-server 服务之一处理它
  • heat-api 调用 'heat' 交换器中的 'engine' 主题上的 'identify_stack' 方法,heat-engine 服务处理它
[1] - cinder 当前使用主题名称 'cinder-scheduler',因为 bug #1173552
[2] - quantum 当前使用主题名称 'q-plugin',但可能没有理由像这样为其加前缀
[3] - 这将在 Heat 支持 多个 heat-engine 服务器时更改

在特定的服务器上调用方法

这是我们有多个服务器侦听交换器中的主题上的方法调用的情况,并且这两个交换器和主题名称都为客户端所知。但是,在这种情况下,客户端希望在这些服务器中的特定服务器上调用方法。

此用例的示例

  • nova-scheduler 选择 'foobar' 作为运行新实例的主机,因此它在 'nova' 交换器中的 'compute' 主题上调用 'foobar' 服务器上的 'run_instance' 方法,nova-compute 服务在 'foobar' 上启动实例
  • nova-api 收到终止在主机 'foobar' 上运行的实例的请求,因此它在 'nova' 交换器中的 'compute' 主题上调用 'foobar' 服务器上的 'terminate_instance' 方法,nova-compute 服务在 'foobar' 上处理它
  • nova-dhcpbridge dnsmasq 助手在主机 'foobar' 上运行,当 DHCP 租约到期时,它在 'nova' 交换器中的 'network' 主题上调用 'foobar' 服务器上的 'release_fixed_ip',同一主机上的 nova-network 服务处理它
  • cinder-scheduler 选择 'foobar' 作为创建新卷的卷节点,因此它在 'cinder' 交换器中的 'volume'[1] 主题上调用 'foobar' 服务器上的 'create_volume' 方法,'foobar' 上运行的 cinder-volume 服务处理它
  • quantum-server 在 'quantum' 交换器中的 'dhcp_agent' 主题上调用 'foobar' 服务器上的 'port_delete_end','foobar' 上运行的 quantum-dhcp-agent 处理它
[1] - 同样,cinder 当前使用主题名称 'cinder-volume',因为 bug #1173552

在多个服务器的所有服务器上调用方法

这是我们有多个服务器侦听交换器中的主题上的方法调用的情况,并且这两个交换器和主题名称都为客户端所知。在这种情况下,客户端希望在这些服务器的所有服务器上调用方法。

  • nova-compute 定期在 'nova' 交换器中的 'scheduler' 主题上以扇出模式调用 'update_service_capabilities' 方法,所有 nova-scheduler 服务都处理它
  • 在启动时,nova-scheduler 在 'nova' 交换器中的 'compute' 主题上以扇出模式调用 'publish_service_capabilities' 方法,所有 nova-compute 服务都处理它
  • 当需要更新 DNS 条目时,nova-network 在 'nova' 交换器中的 'network' 主题上以扇出模式调用 'update_dns' 方法,所有 nova-network 服务都处理它
  • quantum-server 在 'quantum' 交换器中的 'q-agent-notifier-network-delete' 主题上以扇出模式调用 'network_delete' 方法,所有 quantum-linuxbridge-agent 服务都处理它

方法调用类型

有两种方法可以调用方法

  1. cast - 该方法异步调用,不将结果返回给调用者
  2. call - 该方法同步调用,并将结果返回给调用者

请注意,调用者不必知道方法是使用 cast 还是 call 调用。

另请注意,这里没有提到 multicall——“multicall”是方法调用目标的一个属性,而不是调用类型,但是不可能执行扇出 call。

最后,请注意,我们之前有一个 multicall 概念,其中该方法同步调用,并返回多个结果给调用者,每个结果在准备好时返回。但是,由于我们目前没有此用例,因此我们停止支持它。

传输

API 足够抽象,可以由多个消息传递系统实现。目前,我们有基于 AMQP 的两个实现(kombu/rabbitmq 和 qpid)以及一个 ZeroMQ 实现。这些消息传递系统可以被认为是传输。

API 不会暴露任何传输特定的语义,除了如何配置传输特定的配置,该配置描述了客户端或服务器应“连接到”传输的方式。

让我们看一下我们当前支持的每个传输的配置参数。

Kombu/RabbitMQ

配置参数主要描述了如何连接到 RabbitMQ 代理

  • host
  • port
  • userid
  • password
  • virtual_host

我们有一些参数来控制如果连接到代理失败,我们的重试行为

  • retry_interval
  • retry_backoff
  • max_retries

我们有一些 SSL 相关的配置参数

  • use_ssl
  • ssl_version
  • keyfile
  • certfile
  • ca_certs

我们有一些高度 AMQP 相关的参数,允许配置持久的和/或镜像的 AMQP 队列

  • durable_queues
  • ha_queues

最终有趣的细节是,我们支持配置多个 RabbitMQ 代理主机名,以防您正在使用代理集群

  • hosts = [$host]

Qpid

Qpid 具有非常相似的传输配置参数。

基本的代理连接信息

  • hostname
  • port
  • username
  • password
  • protocol (tcp 或 ssl)

更多晦涩的配置参数

  • sasl_mechanisms
  • heartbeat
  • nodelay

并且,再次支持集群中的多个主机

  • hosts = [$host]

ZeroMQ

描述我们应该如何侦听 zmq 连接

  • bind_address
  • host
  • port
  • ipc_dir

调整/微调

  • contexts
  • topic_backlog

Matchmaker 配置

  • matchmaker(即使用哪个驱动程序)

特定于 ringfile matchmaker

  • ringfile(指向 json 文件的路径)

特定于 redis matchmaker

  • host
  • port
  • password
  • heartbeat_freq
  • heartbeat_ttl

传输 URL

有趣的是,kombu 支持以 URL 的形式提供传输配置参数

amqp://$userid:$password@$host:$port/$virtual_host

它还使用查询参数来设置一些选项,例如

amqp://$userid:$password@$host:$port/$virtual_host?ssl=1

我们将使用类似的模式来描述传输配置。在标准 URL 属性对特定传输没有任何意义的退化情况下,我们可以执行

weirdo:///?param1=foo&param2=bar

由于某些传输将支持配置多个 RabbitMQ 代理主机名的集群场景,因此我们将支持使用一组 URL 来配置传输。

目标

方法在目标上调用。目标由以下参数描述

  • exchange(默认为 CONF.control_exchange)
  • topic
  • server(可选)
  • fanout(默认为 False)
  • namespace(可选)
  • API 版本(可选)

在主题名称中使用星号在 AMQP 中具有特殊含义,其中服务器可以绑定到例如 'topic.subtopic.*' 并接收发送到 'topic.subtopic.foo' 主题的消息。我们似乎没有在 OpenStack 中使用这种 AMQP 特定的通配符主题匹配,因此我们不应该允许在服务器端 API 中使用这样的主题名称。

注意:能够将一些目标参数包含在传输 URL 中(交换器?主题?)可能并非荒谬,尤其是在你想要一个“如何联系此其他服务”配置参数的情况下,例如

glance_transport = kombu://me:secret@foobar:3232//glance/notifications

客户端 API

我们使用一个客户端代理类,它封装了将方法调用发送到特定服务器端接口的过程

# in nova
class BaseAPIClient(messaging.RPCClient):

    def __init__(self, transport):
        target = rpc.Target(topic='blaa', version='1.0', namespace='baseapi')
        super(BaseAPIClient, self).__init__(transport, target)

    def ping(self, context, arg, timeout=None):
        cctxt = self.prepare(timeout=timeout)
        return cctxt.call('ping', context, arg)

    def get_backdoor_port(self, context, host):
        cctxt = self.prepare(version='1.1', server=host)
        return self.call('get_backdoor_port', context)
# in oslo.messaging.rpc.client
class _CallContext(object):

    def __init__(self, transport, target, timeout=None, ...):
        ...
        self.transport = transport
        self.target = target
        self.timeout = timeout
        ...

    def call(self, method, ...):
        print('calling %(topic)s.%(method)s version %(version)s' %
              dict(topic=self.target.topic, method=method,
                   version=self.target.version))
        msg = self._make_message(method, ...)
        return self.transport.send(self.target, msg, wait_for_reply=True)
    ...

class RPCClient(object):

    def __init__(self, transport, target, ...):
        self.transport = transport
        self.target = target
        ...
        super(RPCClient, self).__init__()

    def prepare(self, server=None, version=None,
                timeout=None, ...):
        target = self.target(server=server, version=version)
        ...
        return _CallContext(self.transport, target, timeout, ...)

    def call(self, method, **kwargs):
        return self.prepare().call(method, **kwargs)

    def cast(self, method, **kwargs):
        self.prepare().cast(method, **kwargs)

我们在这里避免的主要问题是调用者永远不会看到将在网络上传输的消息负载。 消息在网络上的格式应完全由传输驱动程序决定,API 用户不应考虑消息本身。 如果我们可以将 call() 参数限制为方法和方法参数,那么我们就不需要像现在这样使用 make_msg() 方法了。

服务器端 API

在服务器端,我们有一个服务器对象,它封装了侦听目标(交换器、主题、主机)部分的过程,并允许为每个暴露的(命名空间、版本)注册 API 端点。

请注意,无论客户端通过将目标缩小到特定主机还是将其扩展到扇出模式来调用方法,服务器都可以透明地处理。

server = eventlet.EventletRPCServer(transport, target, [manager, baseapi])
server.start()
...
server.stop()
server.close()

基本的服务器类看起来像

# in oslo.messaging
class MessageHandlingServer(object):

    def __init__(self, transport, target, dispatcher, executor_cls):
        ...
        self.transport = transport
        self.target = target
        self.dispatcher = dispatcher

        self._executor_cls = executor_cls
        self._executor = None

        super(MessageHandlingServer, self).__init__()

    def start(self):
        if self._executor is not None:
            return
        listener = self.transport._listen(self.target)
        self._executor = self._executor_cls(..., listener, self.dispatcher)
        self._executor.start()

    def stop(self):
        if self._executor is not None:
            self._executor.stop()

    def wait(self):
        if self._executor is not None:
            self._executor.wait()
        self._executor = None


# in oslo.messaging.rpc
class RPCServer(server.MessageHandlingServer):

    def __init__(self, transport, target, endpoints, executor_cls):
	super(RPCServer, self).__init__(transport,
					target,
					dispatcher.RPCDispatcher(endpoints),
					executor_cls)


class BlockingRPCServer(RPCServer):

    def __init__(self, transport, target, endpoints):
	executor_cls = impl_blocking.BlockingExecutor
	super(BlockingRPCServer, self).__init__(transport,
                                                target,
                                                endpoints,
                                                executor_cls)

# in oslo.eventlet
class EventletRPCServer(server.RPCServer):

    def __init__(self, transport, target, endpoints):
        executor_cls = impl_eventlet.EventletExecutor
	super(EventletRPCServer, self).__init__(transport,
                                                target,
                                                endpoints,
                                                executor_cls)

这里的想法是服务器使用两个内部概念实现:调度器和执行器。 调度器查看传入的消息负载并调用适当的方法。 执行器代表轮询传输以获取传入消息并将它们传递给调度器的策略。 这两种抽象允许我们使用相同的服务器实现多个调度器(例如,用于 RPC 和通知)和多个执行器(例如,阻塞和 eventlet)。

这里特别重要的是,我们没有在传输驱动程序中编码对 eventlet 的依赖,这为我们将来切换到其他东西留下了空间。

方法调用被分派到 API 端点,这些端点封装了一个远程可调用接口(即一组方法),位于命名空间下并与版本相关联

# in nova
class BaseAPI(object):

    target = rpc.Target(namespace='baseapi', version='1.1')

    def __init__(self, server, service_name, backdoor_port):
        self.conf = server.conf
        self.server = server
        ...
        self.service_name = service_name
        self.backdoor_port = backdoor_port

      @rpc.expose
      def ping(self, context, arg):
          return dict(service=self.service_name, arg=arg)

      @rpc.expose
      def get_backdoor_port(self, context):
          return self.backdoor_port

注意:建议使用一个(可选?)rpc.expose 装饰器来显式标记可用于 RPC 接口的方法。 我们有很多例子表明,我们受益于对我们打算远程调用的方法有更清晰的了解。

另请注意:如果 API 端点必须能够引用服务器 - 例如,引用表示我们使用的配置文件(ConfigOpts 对象),或者在阻塞服务器的情况下,从回调中停止服务器 - 那么你将如上所示将服务器对象传递给 API 端点构造函数。

API 版本协商

描述客户端/服务器 API 的版本控制方式,以及如果服务器未实现客户端所需的版本,则会引发 UnsupportedRpcEnvelopeVersion 异常。

传输驱动程序 API

我们应该摆脱消息传递库中的全局传输驱动程序对象,而是要求 API 用户构造一个驱动程序并将其传递给库中需要的地方。 API 用户可以选择拥有自己的全局驱动程序对象以方便使用。

传输驱动程序类应作为入口点注册

# in oslo.messaging.drivers
NAMESPACE = 'oslo.messaging.drivers'

TRANSPORT_DRIVERS = [
    'rabbit = oslo.messaging._drivers.impl_rabbit:RabbitDriver',
    'qpid = oslo.messaging._drivers.impl_qpid:QpidDriver',
    'zmq = oslo.messaging._drivers.impl_zmq:ZmqDriver',

    # To avoid confusion
    'kombu = oslo.messaging._drivers.impl_rabbit:RabbitDriver',

    # For backwards compat
    'openstack.common.rpc.impl_kombu ='
    ' oslo.messaging._drivers.impl_rabbit:RabbitDriver',
    'openstack.common.rpc.impl_qpid ='
    ' oslo.messaging._drivers.impl_qpid:QpidDriver',
    'openstack.common.rpc.impl_zmq ='
    ' oslo.messaging._drivers.impl_zmq:ZmqDriver',
]

# in setup.py
entry_points = {
    drivers.NAMESPACE: drivers.TRANSPORT_DRIVERS,
}

并通过“rpc_backend”配置选项查找该类

  from stevedore import driver

  def get_transport(conf, url=None):
      if url is not None:
          rpc_backend = urlparse.urlparse(url).scheme
      else
          rpc_backend = conf.rpc_backend
      ...
      kwargs = ...
      ...
      mgr = driver.DriverManager(drivers.NAMESPACE,
                                 rpc_backend,
                                 invoke_on_load=True,
                                 invoke_args=[conf],
                                 invoke_kwds=kwargs)
      return Transport(mgr.driver)

返回的驱动程序将是 BaseDriver 抽象基类的实现

class BaseDriver(object):

    __metaclass__ = abc.ABCMeta

    def __init__(self, conf, url=None, default_exchange=None):
	self.conf = conf
        self._url = url
	self._default_exchange = default_exchange

    @abc.abstractmethod
    def send(self, target, message, wait_for_reply=None, timeout=None):
        pass

    @abc.abstractmethod
    def listen(self, target):
        pass

上下文

在哈瓦那设计峰会上,关于上下文是否应该在 API 中保持其当前状态进行了一些辩论。 我还没有意见 :)

错误处理

描述从 API 用户的角度来看,服务器端引发异常时会发生什么

需要考虑的细节

  • allowed_rpc_exception_modules 配置
  • 如果无法反序列化,则回退到 RemoteError
  • 将类型设置为 Remote_$type 的怪异之处
  • @client_exceptions() 装饰器和日志记录
  • 超时 - 例如,什么会引发 rpc_common.Timeout 异常

配置

库注册的配置选项是 API 的一部分。 例如,如果我们决定将“control_exchange”重命名为“topic_container”,那么任何引用 CONF.control_exchange 的代码都会中断。 我们可能可以安全地说,特定于传输驱动程序的配置选项是私有的,并说我们不支持 API 用户依赖这些选项? 或者我们只是记录哪些选项是 API 的一部分,并警告人们不要依赖其他选项?

我们有一些与传输无关的配置选项

  • rpc_backend - 选择传输驱动程序
  • control_exchange - 允许例如,使用相同的消息代理进行两个 heat 部署,只需将 control_exchange = 'heat1' 即可

然后是一些调整参数

  • rpc_thread_pool_size
  • rpc_conn_pool_size
  • rpc_response_timeout
  • rpc_cast_timeout

然后我们有一个应该只是 API 的一部分的配置选项,因为用户永远不应该配置它

  • allowed_rpc_exception_modules

还有这种怪异之处

  • fake_rabbit

oslo.notify

通知 API 定义了发出和处理事件通知的语义。

与 RPC API 类似,有多个后端驱动程序实现使用不同传输机制的 API 语义。 到目前为止,最常用的后端驱动程序通知是 RPC 驱动程序,它通过与 RPC 消息相同的消息传递系统发送通知。 但是,完全有可能我们将来会有一个使用不适合用于 RPC 消息的传输的通知驱动程序。

用例

重要的是要对比通知与 RPC 的用例。 历史上,我们将其使用限制在希望异步向第三方发出信号的情况下。

Ceilometer 计量消息

Doug 的用例

http://lists.openstack.org/pipermail/openstack-dev/2013-April/008109.html
6. 一个应用程序必须能够侦听任何对等方发送的消息,以及任何服务(即,指定不同的“交换器”,尽管我们希望将其扩展到包括不同的代理)。

可以描述为

在多个服务器池的每个服务器上调用方法

或者

插入一个 Tap 以侦听服务器池上的方法调用

Ceilometer 中的用例似乎是

  • ceilometer-collector 处理来自 glance-api 的通知,并在“ceilometer”交换器中的“metering”主题上调用“record_metering_data”,并且其中一个 ceilometer-collector 服务处理该方法……但另一个第三方服务也处理它?

似乎这里与通知有些重叠 - 从 ceilometer 代码(无论是在 ceilometer-collector 中运行的代码还是插入到例如 nova-compute 中的代码)使用内部 ceilometer RPC API 发送计量数据是正确的。 但是,将计量数据发送给未知的第三方侦听器,我们应该使用通知。

即,对于将计量数据发送到 ceilometer-collector,我们有 ceilometer 代码使用内部 ceilometer RPC API。 但是,为了使计量数据可供第三方使用,为什么不使用通知发布它们?

正如 讨论的,Ceilometer 将仅发送计量数据作为通知。

Searchlight 资源索引

Searchlight 索引来自各种 Openstack 服务的信息(当前到 Elasticsearch),为 Horizon 和其他消费者提供统一的搜索和缓存层。 它侦听通知以确定资源何时发生更改。 在可能的情况下,仅使用通知来索引资源,并且我们正在努力鼓励服务将其尽可能多的信息放入其通知中。

发出通知

通知消息具有定义明确的格式,通知消费者依赖于该格式。 一个示例通知消息可能是

{
  'message_id': '5c329c20-d435-444c-8fa2-e1b54592219c',
  'publisher_id': 'compute.host1',
  'event_type': 'compute.instance.exists',
  'priority': 'INFO',
  'payload': {'user_id': ..., 'tenant_id': ..., 'instance_id', ..., ...}
  '_context_user_id': None,
  '_context_project_id': None,
  '_context_is_admin': True,
  ...
}

我们已经承诺了此消息格式的稳定性,因此此 API 的第一个目标必须是能够发送此确切的格式。

我们应该努力鼓励针对发送相关通知的项目特定 API,例如

class ImageNotifier(notify.Notifier):

    def __init__(self, conf):
        super(ImageNotifier, self).__init__(conf, publisher_id=conf.host)

  def add(self, context, image):
      payload = self._generate_image_payload(mage)
      self.notifier.info(context, 'image.update', payload)

一个基类将促进这一点

class Notifier(object):

    def __init__(self, conf, publisher_id, driver=None,
                 transport=None, topic=None):
        self.conf = conf
        self.publisher_id = publisher_id
        self._drivers = drivers if driver is not None else conf.notification_driver
        self._topics = [topic] if topic is not None else conf.notification_topics

    def debug(self, context, event_type, payload):
        ...

    def info(self, context, event_type, payload):
        ...

    def warn(self, context, event_type, payload):
        ...

通过为每个优先级级别添加一个方法,我们可以更明确地说明哪些级别可用。 我们允许 API 用户指定通知驱动程序、消息传递传输和/或主题,但这些通常来自配置。

我们将继续支持现有的配置选项

  • notification_driver:应用于发出通知的驱动程序列表。 当前,此列表中的值是完整的 python 模块路径,但我们将切换到使用入口点名称,同时仍然支持以前的名称。 当前可用的驱动程序是“rpc”、“rpc2”、“log”、“no-op”和“test”。 两个“rpc”驱动程序是最常用的,可能更有意义将它们重命名为例如“messaging”。 请注意,“rpc2”驱动程序使用新的网络消息格式,并且“rpc”驱动程序已被弃用,但将在可预见的未来继续受到支持。
  • notification_topics:将发送通知消息的主题名称列表。 默认值为“notifications”,实际上,主题名称是例如“notifications.info”、“notifications.error”,因此这更多是关于配置主题前缀而不是实际主题名称。

处理通知

子主题

RPC 通知驱动程序当前使用类似于“notifications.info”的主题,这意味着这些通知的消费者需要使用基于通配符的主题来获取所有通知。 我们可能希望允许 RPC 驱动程序没有基于通配符的主题匹配机制,因此我们将显式地将主题和子主题传递给驱动程序,并在发送时传递。

问题是当前队列名称与路由键之间存在 1:1 的映射。 理想情况下,我们希望有一个队列 并根据通知优先级向其发出不同的路由键。 这个单个队列应该能够拥有 .info、.error、.warn 等事件(与每个优先级不同的队列相反)。 这样,我们可以使用单个消费者绑定来处理所有优先级。 当我们需要发布到多个下游消费者(例如,ceilometer 和 billing)时,问题会加剧。 现在我们每个下游有 3 个队列,或者 6 个队列。 通配符主题匹配实际上是一件好事。 --Sandy

进一步讨论

Cells

我们是否涵盖了 cells 的用例? 看起来我们只需要能够连接到另一个代理并发送我们希望邻居单元的 nova-cells 服务调用的方法描述?

参考文献