跳转到: 导航, 搜索

Zaqar/用例

< Zaqar(重定向自 Use Cases (Zaqar))

模式

Marconi 被设计用来支持多种消息模式,包括许多 企业集成模式,通过 API 语义和消息管道驱动程序的组合实现。我们列出了一些更常见的模式如下。


任务分发

在此用例中,您使用队列(类似于 SQS)来供给工作池。

主要流程

1. 生产者将消息推送到队列

2. 工作者获取消息

3. 工作者处理消息

4. 消息删除(确认)

变体

1a. 一个或多个生产者将多个消息推送到队列。在这种情况下,工作者可以批量获取消息,或者一次获取单个消息,由其自行决定。

2a. 工作者在消息过期之前没有获取消息。生产者应该调整为使用更高的消息 TTL,或者工作者应该更频繁地轮询。

2b. 没有消息可获取,因为所有消息都已被此工作者或其他工作者消费。工作者继续定期发送请求以获取消息,直到有新消息可用。或者,一个 governor 进程可以选择根据负载自动扩展工作池(可以通过获取队列的统计信息和/或监控单个工作者是否空闲来检查负载)。

3a. 工作者在获取消息后崩溃,但在处理之前。在这种情况下,消息上的声明最终会过期,之后消息将再次可用,可以被另一个工作者获取。

3b. 工作者在处理消息后崩溃,但在删除之前。在这种情况下,下一个获取消息的工作者会检查消息是否已经被处理过,然后再进行处理。在某些情况下,罕见的重复处理消息是可以接受的,在这种情况下,不需要进行检查。


广播

在这种模式下,客户端将事件广播给所有感兴趣的观察者。

主要流程

1. 发布者将消息 X 推送到队列

2. 观察者 A 列出队列中的消息,获取消息 X

3. 观察者 B 列出队列中的消息,获取消息 X

4. 观察者 A 和 B 分别处理消息 X

变体

2a. 观察者在之前的轮次中已经列出过消息。在这种情况下,观察者会提交一个“next”标记,告诉服务器它已经看到哪些消息,以便服务器只返回新的消息给观察者。

2b. 观察者在消息 X 过期之前没有列出消息。在这种情况下,生产者应该调整为在发布消息时使用更高的 TTL,和/或观察者应该更频繁地轮询。

2b. 所有消息都已被列出。在这种情况下,观察者会得到一个空响应,并将继续定期使用队列的最后一个已知标记列出消息,直到它得到一个非空响应。

2c. 观察者在获取消息 X 之前崩溃。在这种情况下,一个进程监控器会简单地重启观察者,并且观察者只要能够在发布者设置的 TTL 期间内轮询,就可以获取消息 X。


点对点

这有点像一个松散耦合的 RPC 模式。在这种情况下,每个代理都有自己的队列。在 Cloud Queues 中,“队列”资源非常轻量级,因此您可以根据需要创建数十万个队列。

请注意,双向通信只需要一个队列。

主要流程

1. 控制器将消息推送到队列

2. 代理列出队列中的消息,获取消息

3. 代理执行请求的操作

4. 代理将结果消息推送到队列

5. 控制器列出队列中的消息,获取结果消息

变体

2a. 代理可以获取消息,但比简单地列出消息要慢,而且由于只有单个客户端在读取队列,因此获取消息也没有必要。

2b. 代理在获取消息之前崩溃。在这种情况下,只要代理在控制器设置的 TTL 期间内启动,它仍然可以获取消息。

4a. 代理在发布结果消息之前崩溃。控制器需要有一个超时期限的概念,在此期限之后它不再期望从其请求收到响应。

4b. 如果不需要结果,则跳过这些步骤。


审计

这种模式实际上是一个混合用例。想法是添加一个额外的观察者,该观察者会不断列出和记录队列中的消息。这可以是一个 CLI “tail” 类型的脚本,或者是一个被动服务器进程记录所有传入的内容。

这有助于诊断消息生产者中的问题/错误。它还可以确保消息已正确处理,例如,如果您将队列用作计量解决方案的一部分,并且希望审计记录以确保所有计费事件都已提交到计费系统。

也就是说,能够延迟消息被获取一段时间的功能是我们仍然需要的功能,以便使审计真正有效。毕竟,您需要给审计员一个列出消息的机会,然后再由工作者删除它。今天,工作者*可以*在获取消息后暂停 X 秒再删除它,但我们认为在生产者端处理会更优雅。