跳转到: 导航, 搜索

Ceilometer/blueprints/move-listener-framework-oslo

  • Launchpad Entry: CeilometerSpec:move-listener-framework-oslo
  • 创建时间: 2013年02月05日
  • 贡献者: Doug Hellmann

总结

Ceilometer 和 Quantum 使用 RPC 连接对象的私有方法来配置自身监听共享于一组 worker 之间的队列。此蓝图讨论了如何将该 API 公开。

发布说明

原理

Ceilometer 和 Quantum 正在使用 RPC Connection 类的私有方法,这限制了它们只能使用 kombu 或 qpid 实现。我们希望公开该 API,以便促进其他 RPC 驱动程序中的实现。

用户故事

Ceilometer 希望监听来自所有其他 OpenStack 组件的通知,在 collector 进程中。部署者需要能够运行 collector 的多个副本,以分散传入通知消息的处理负载。不应多次处理任何消息,因此现有的扇出订阅模型不起作用。消息没有被封装在 RPC 信封中,因此 ProxyCallback 类(由 create_worker() API 使用)不起作用(我们也不能简单地创建一个不同的调度器)。

前提条件

  1. 该 API 不应对传入消息的格式做出任何假设,除了所有其他消息所做的通常的签名和 JSON 序列化假设之外。
  2. 该 API 不应对用于监听传入消息的控制交换机做出任何假设。
  3. 该 API 不应对用于监听传入消息的队列名称做出任何假设。

设计

Connection 类上的新方法将避免向后兼容性问题。


def join_consumer_pool(self, callback, pool_name, topic, exchange_name=cfg.CONF.control_exchange):
    """Register as a member of a group of consumers for a
    given topic from the specified routing exchange.

    Exactly one member of a given pool will receive each message.

    A message will be delivered to multiple pools, if more than one is created.
    """


在 AMQP 驱动程序中,回调将使用基于 ProxyCallback 类但与特定调度行为不太直接相关的 CallbackWrapper 包装。


class CallbackWrapper(object):
    def __init__(self, conf, callback, connection_pool):
        ...

    def __call__(self, message_data):
        """Spawn a GreenPool thread to invoke the callback with the message_data."""

    def wait(self):
        """Wait for all callback threads to exit."""


应该能够为 common.ProxyCallback 和 common.CallbackWrapper 创建一个基类,以共享等待和池管理行为。

ZMQ 驱动程序现在没有使用 ProxyCallback,但具有一组类似的类。需要为 ZMQ 驱动程序创建一个 CallbackWrapper 版本。

可以通过单独的 NotificationDispatcher 类来实现基于通知类型的调度,该类还具有 call 方法,因此可以将实例传递给 join_consumer_pool(),就像它是一个普通的 callback 一样。 NotificationDispatcher 的用户将对其进行子类化,以提供基于通知名称的方法,类似于当前在 Quantum 中找到的版本 (https://github.com/openstack/quantum/blob/master/quantum/agent/rpc.py#L90)。 该实现不完全适合重用,因为它还在处理订阅和消费逻辑。

实现

UI 变更

代码变更

AMQP 实现(kombu 和 qpid)将基于其 declare_topic_consumer() 方法(这是 Ceilometer 和 Quantum 一直使用的私有方法)。将使用池名称创建一个队列,并将其附加到交换机,以便接收指定主题的消息。当收到每条消息时,将使用整个消息作为参数调用回调。

迁移

  1. Ceilometer 将更新为使用新方法,因为它正在开发中,作为一种测试方法。
  2. 新方法将添加到 oslo-incubator。
  3. Ceilometer 中来自 oslo 的 rpc 库将被更新。

测试/演示计划

这不必在规范接近 Beta 之前添加或完成。

未解决的问题

  1. 我不确定如何为 ZMQ 实现这一点。