Ceilometer/blueprints/move-listener-framework-oslo
- Launchpad Entry: CeilometerSpec:move-listener-framework-oslo
- 创建时间: 2013年02月05日
- 贡献者: Doug Hellmann
总结
Ceilometer 和 Quantum 使用 RPC 连接对象的私有方法来配置自身监听共享于一组 worker 之间的队列。此蓝图讨论了如何将该 API 公开。
发布说明
原理
Ceilometer 和 Quantum 正在使用 RPC Connection 类的私有方法,这限制了它们只能使用 kombu 或 qpid 实现。我们希望公开该 API,以便促进其他 RPC 驱动程序中的实现。
用户故事
Ceilometer 希望监听来自所有其他 OpenStack 组件的通知,在 collector 进程中。部署者需要能够运行 collector 的多个副本,以分散传入通知消息的处理负载。不应多次处理任何消息,因此现有的扇出订阅模型不起作用。消息没有被封装在 RPC 信封中,因此 ProxyCallback 类(由 create_worker() API 使用)不起作用(我们也不能简单地创建一个不同的调度器)。
前提条件
- 该 API 不应对传入消息的格式做出任何假设,除了所有其他消息所做的通常的签名和 JSON 序列化假设之外。
- 该 API 不应对用于监听传入消息的控制交换机做出任何假设。
- 该 API 不应对用于监听传入消息的队列名称做出任何假设。
设计
Connection 类上的新方法将避免向后兼容性问题。
def join_consumer_pool(self, callback, pool_name, topic, exchange_name=cfg.CONF.control_exchange):
"""Register as a member of a group of consumers for a
given topic from the specified routing exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than one is created.
"""
在 AMQP 驱动程序中,回调将使用基于 ProxyCallback 类但与特定调度行为不太直接相关的 CallbackWrapper 包装。
class CallbackWrapper(object):
def __init__(self, conf, callback, connection_pool):
...
def __call__(self, message_data):
"""Spawn a GreenPool thread to invoke the callback with the message_data."""
def wait(self):
"""Wait for all callback threads to exit."""
应该能够为 common.ProxyCallback 和 common.CallbackWrapper 创建一个基类,以共享等待和池管理行为。
ZMQ 驱动程序现在没有使用 ProxyCallback,但具有一组类似的类。需要为 ZMQ 驱动程序创建一个 CallbackWrapper 版本。
可以通过单独的 NotificationDispatcher 类来实现基于通知类型的调度,该类还具有 call 方法,因此可以将实例传递给 join_consumer_pool(),就像它是一个普通的 callback 一样。 NotificationDispatcher 的用户将对其进行子类化,以提供基于通知名称的方法,类似于当前在 Quantum 中找到的版本 (https://github.com/openstack/quantum/blob/master/quantum/agent/rpc.py#L90)。 该实现不完全适合重用,因为它还在处理订阅和消费逻辑。
实现
UI 变更
无
代码变更
AMQP 实现(kombu 和 qpid)将基于其 declare_topic_consumer() 方法(这是 Ceilometer 和 Quantum 一直使用的私有方法)。将使用池名称创建一个队列,并将其附加到交换机,以便接收指定主题的消息。当收到每条消息时,将使用整个消息作为参数调用回调。
迁移
- Ceilometer 将更新为使用新方法,因为它正在开发中,作为一种测试方法。
- 新方法将添加到 oslo-incubator。
- Ceilometer 中来自 oslo 的 rpc 库将被更新。
测试/演示计划
这不必在规范接近 Beta 之前添加或完成。
未解决的问题
- 我不确定如何为 ZMQ 实现这一点。