跳转到: 导航, 搜索

Zaqar/用例

模式

Marconi 被设计用来支持多种消息模式,包括许多 企业集成模式,通过 API 语义和消息管道驱动程序的组合实现。我们列出了一些更常见的模式如下。


任务分发

在此用例中,您使用队列(类似于 SQS)来供给工作池。

主要流程

1. 生产者将消息推送到队列

2. 工作者声明消息

3. 工作者处理消息

4. 消息删除(确认)消息

变体

1a. 一个或多个生产者将多个消息推送到队列。在这种情况下,工作者可以声明一批消息,或者一次声明一条消息,具体取决于其自行决定。

2a. 工作者在消息过期之前没有声明消息。生产者应该调整为使用更高的消息 TTL,或者工作者应该更频繁地轮询。

2b. 没有消息可声明,因为所有消息都已被此工作者或其他工作者消费。工作者继续定期发送请求以声明消息,直到有新消息可用。或者,一个管理进程可以选择根据负载自动扩展工作池(可以通过获取队列的统计信息和/或监控单个工作者是否空闲来检查负载)。

3a. 工作者在声明消息后崩溃,但在处理之前。在这种情况下,消息上的声明最终会过期,之后消息将再次可用,可以被另一个工作者声明。

3b. 工作者在处理消息后崩溃,但在删除之前。在这种情况下,下一个声明消息的工作者将检查消息是否已经被处理过,然后再继续。在某些情况下,罕见的重复处理消息是可以接受的,在这种情况下,不需要进行检查。


广播

在这种模式下,客户端将事件广播给所有感兴趣的观察者。

主要流程

1. 发布者将消息 X 推送到队列

2. 观察者 A 列出队列中的消息,获取消息 X

3. 观察者 B 列出队列中的消息,获取消息 X

4. 观察者 A 和 B 单独处理消息 X

变体

2a. 观察者已经在之前的轮次中列出了消息。在这种情况下,观察者将提交一个“next”标记,告诉服务器它已经看到哪些消息,以便服务器只返回新的消息给观察者。

2b. 观察者在消息 X 过期之前没有列出消息。在这种情况下,生产者应该调整为在发布消息时使用更高的 TTL,和/或观察者应该更频繁地轮询。

2b. 所有消息都已列出。在这种情况下,观察者会得到一个空响应,并将继续定期使用队列的最后一个已知标记列出消息,直到它得到一个非空响应。

2c. 观察者在能够获取消息 X 之前崩溃。在这种情况下,一个进程监视器将简单地重新启动观察者,并且观察者将能够获取消息 X,只要它能够在发布者设置的 TTL 期间内轮询即可。


点对点

这有点像一个松散耦合的 RPC 模式。在这种情况下,每个代理都有自己的队列。在 Cloud Queues 中,“队列”资源非常轻量级,因此您可以根据需要创建数十万个队列。

请注意,双向通信只需要一个队列。

主要流程

1. 控制器将消息推送到队列

2. 代理列出队列中的消息,获取消息

3. 代理执行请求的操作

4. 代理将结果消息推送到队列

5. 控制器列出队列中的消息,获取结果消息

变体

2a. 代理可以声明消息,但它比简单地列出消息要慢,而且由于只有单个客户端在读取队列,因此声明在这种情况下是不必要的。

2b. 代理在获取消息之前崩溃。在这种情况下,只要代理在控制器设置的 TTL 期间内启动,它仍然可以获取消息。

4a. 代理在发布结果消息之前崩溃。控制器需要有一个超时期限的概念,在此期限之后它不再期望从其请求收到响应。

4b. 如果不期望结果,则跳过这些步骤。


审计

这种模式实际上是一个混合用例。想法是添加一个额外的观察者,该观察者不断地列出和记录队列中的消息。这可以是一个 CLI “tail” 类型的脚本,或者是一个被动服务器进程记录所有传入的内容。

这有助于诊断消息生产者中的问题/错误。它还可以确保消息已正确处理,例如,如果您将队列用作计量解决方案的一部分,并且希望审计您的记录以确保所有可计费事件都已提交到计费系统。

也就是说,能够延迟消息被声明一段时间的功能是我们仍然需要的功能,以便使审计真正有效。毕竟,您需要给审计员一个在工作者删除它之前列出消息的机会。今天,工作者*可以*在声明消息后暂停 X 秒再删除它,但我们认为在生产者端处理它会更优雅。