跳转到: 导航, 搜索

Python 客户端库 (Marconi)

本文档旨在建立使用 python Zaqar 客户端的工作感受。它既是愿景,也是实现者的方向。请查看蓝图以获取更多详细信息。

如果您有任何问题,请在 freenode.irc.net #openstack-zaqar 上联系我们!

特性

客户端库在 PyPI 上可用: https://pypi.python.org/pypi/python-marconiclient/

  • 证书验证
  • Token 过期时的重新认证
  • 异步操作
  • 完全覆盖 Zaqar API

Etherpads

控制器

  • 客户端:处理账户范围内的操作 - 队列检索等。
  • 队列:提供对一些元数据操作的访问,以及消息处理
  • 消息:提供对消息属性的访问,以及删除
  • 声明:声明的消息集合 - 处理查询、更新和删除

Core

  • 连接:处理与 Keystone 的身份验证,获取 Zaqar 端点,处理请求和网络逻辑

实用工具

  • ErrorBase:Zaqar 客户端特定错误的基类

API 概要

请参阅 Zaqar/specs/api/v1

客户端 API 概要

# Given the following vars defined with the following types:
# - Client:client
# - Subscriber:sub
# - Queue:queue
# - Message:message
# - Claim:claim
>>> dir(client)
['async', 'create_queue', 'home', 'queues']
>>> dir(sub)
['channels', 'listen', 'subscribe', 'unsubscribe']
>>> dir(queue)
['claim', 'delete', 'href', 'messages', 'metadata', 'name', 'post', 'stats']
>>> dir(metadata)
['content', 'reload', 'update']
>>> dir(claim)
['delete', 'grace', 'href', 'messages', 'patch', 'release', 'ttl']
>>> dir(message)
['age', 'body', 'delete', 'href', 'reload', 'status', 'ttl']

用法

使用连接

由于包含通用的 lib BaseClient,本节已得到简化。Openstack 所说的“客户端”我们将称之为“连接”。原因是连接处理网络层级的细节:put、patch、delete、post 等。我们所说的客户端关注更高层级的细节,例如队列和声明。

接口如下,基于代码阅读

    >>> # given a class zaqarclient.connection implemented using zaqarclient.common.apiclient.client
    >>> from zaqarclient import connection
    >>> conn = connection.Connection(auth_plugin=...,
                                     username='tacocat', password='queue_master')
    >>> dir(conn)
    ['client_request', 'head', 'get', 'post', 'put', 'delete', 'patch', 'get_class']

希望能够更新 Http 客户端以使用 enum34,如下面详细说明。目前也足够了。

有关 Python 枚举的更多信息,请参阅 enum34PEP 435

客户端操作

    >>> client = Connection(async=False)
    >>> client.queues(marker=..., limit=10, detailed=False)
    <generator object <genexpr> ar 0x7fd3ef1ed730>
    >>> client.create_queue(name='wot')
    <ZaqarQueue [wot]>
    >>> client.home
    <HomeDoc ...>
    # affects all operations for objects acquired from the client
    >>> client.async
    False

设置发布/订阅连接

这借鉴了一些 redis-py 的想法

    >>> sub = client.subscriber()
    >>> sub.<TAB>
    sub.channels  sub.listen    sub.subscribe    sub.unsubscribe
    >>> sub.channels
    set([])
    >>> sub.subscribe('darn_good_queue')
    >>> sub.channels
    set(['darn_good_queue'])
    >>> for msg in sub.listen():
            # blocks until a message arrives in any of the subscribed queues
            # polling implementation by default
    <Ctrl-C>
    >>> 

队列处理

    >>> queue = next(q for q in client.queues if q.name == 'tacocat')
    >>> queue.name
    u'tacocat'
    >>> queue.href
    u'/v1/queues/tacocat'
    >>> queue.stats
    # a dictionary derived from a JSON response
    >>> queue.messages(include_claimed=False)
    <generator object <genexpr> ar 0x7fd3ef1ed742>
    >>> queue.messages(ids=[50b68a50d6f5b8c8a7c62b01, 50b68a50d6f5b8c8a7c62b02],
                               claim=a28ee94e-6cb4-11e2-b4d5-7703267a7926, limit=1)
    <generator object <genexpr> ar 0x7fd3ef1ed742>
    >>> queue.post_messages(messages=...)
    >>> queue.metadata
    <Metadata ...>  # fetches metadata from API, returns a Metadata controller
    >>> queue.claim(limit=10)
    <Claim size:8 ...>  # size: actual number of messages received
    >>> queue.delete()

队列元数据处理

    >>> meta = queue.metadata
    >>> meta.update({'max_size': 1000})  # communicate with API, replaces
    >>> meta.reload()  # gets most recent attributes from API

消息处理

    >>> message = next(queue.messages(...))
    >>> message.age
    90
    >>> message.ttl
    120
    >>> message.href
    u'/v1/queues/darn_good_queue/messages/91wqe9bqwsbq98'
    >>> message.body
    {u'action': u'win'}
    >>> message.reload()
    >>> message.delete()
    >>> message.status
    <EnumValue: Message.Free [value=1]>

声明管理

>>> claim = queue.claim(limit=10)
>>> claim.messages(...)
<generator object <genexpr> ar 0x7fd3ef1ed742>
>>> msg = next(claim.messages(...))
>>> msg.status
<EnumValue: Message.Claimed [value=2]>
>>> claim.href
u'/v1/queues/tacocat/claims/a28ee94e-6cb4-11e2-b4d5-7703267a7926'
>>> claim.ttl
90
>>> claim.grace
30
>>> claim.patch(ttl=..., grace=...)
>>> claim.delete()

错误管理

Wiki 对 Zaqar 错误 进行了详尽的解释。客户端层级的错误处理在于将 Zaqar 返回的响应转换为对用户有意义的异常。

以下是客户端层级错误使用的快速模拟

    >> error = zaqar.error.ErrorBase()
    >>> error.title
    u'...'
    >>> error.description
    u'...'
    >>> error.code
    1092
    >>> raise error
    ErrorBase (error.code): error.title
    error.description

工作流程

本节更偏向于自上而下。请将其视为在 ipython 环境中启动 python-zaqarclient 的感受

    >>> from zaqarclient.connection import Connection
    >>> from zaqarclient.client import Client
    >>> client = Connection(auth_url='https://keystone.example.com/', username='me', password='win')
    >>> client.create_queue('wot')
    >>> queue = next(client.queues())
    >>> queue.post_messages(messages=[{'event': {'data': 'winning', 'score': 10}})
    >>> message = next(queue.messages())
    >>> message.body
    {'event': {'data': 'winning', 'score': 10}}
    >>> message.status
    <Free...>
    >>> queue.stats
    {...}
    >>> claim = queue.claim(1)
    >>> message = next(claim.messages())
    >>> message.status
    <Claimed...>
    >>> message
    <Message ttl:120>
    >>> message.delete()
    >>> claim
    <Claim size:1>
    >>> claim.delete()
    >>> queue
    <Queue [wot]>
    >>> queue.delete()
    >>> client.async
    False