Zaqar/用例
模式
Marconi 被设计用来支持多种消息模式,包括许多 企业集成模式,通过 API 语义和消息管道驱动程序的组合实现。我们列出了一些更常见的模式如下。
任务分发
在此用例中,您使用队列(类似于 SQS)来供给工作池。
主要流程
1. 生产者将消息推送到队列
2. 工作者声明消息
3. 工作者处理消息
4. 消息删除(确认)消息
变体
1a. 一个或多个生产者将多个消息推送到队列。在这种情况下,工作者可以声明一批消息,或者一次声明一条消息,具体取决于其自行决定。
2a. 工作者在消息过期之前没有声明消息。生产者应该调整为使用更高的消息 TTL,或者工作者应该更频繁地轮询。
2b. 没有消息可声明,因为所有消息都已被此工作者或其他工作者消费。工作者继续定期发送请求以声明消息,直到有新消息可用。或者,一个管理进程可以选择根据负载自动扩展工作池(可以通过获取队列的统计信息和/或监控单个工作者是否空闲来检查负载)。
3a. 工作者在声明消息后崩溃,但在处理之前。在这种情况下,消息上的声明最终会过期,之后消息将再次可用,可以被另一个工作者声明。
3b. 工作者在处理消息后崩溃,但在删除之前。在这种情况下,下一个声明消息的工作者将检查消息是否已经被处理过,然后再继续。在某些情况下,罕见的重复处理消息是可以接受的,在这种情况下,不需要进行检查。
广播
在这种模式下,客户端将事件广播给所有感兴趣的观察者。
主要流程
1. 发布者将消息 X 推送到队列
2. 观察者 A 列出队列中的消息,获取消息 X
3. 观察者 B 列出队列中的消息,获取消息 X
4. 观察者 A 和 B 单独处理消息 X
变体
2a. 观察者已经在之前的轮次中列出了消息。在这种情况下,观察者将提交一个“next”标记,告诉服务器它已经看到哪些消息,以便服务器只返回新的消息给观察者。
2b. 观察者在消息 X 过期之前没有列出消息。在这种情况下,生产者应该调整为在发布消息时使用更高的 TTL,和/或观察者应该更频繁地轮询。
2b. 所有消息都已列出。在这种情况下,观察者会得到一个空响应,并将继续定期使用队列的最后一个已知标记列出消息,直到它得到一个非空响应。
2c. 观察者在能够获取消息 X 之前崩溃。在这种情况下,一个进程监视器将简单地重新启动观察者,并且观察者将能够获取消息 X,只要它能够在发布者设置的 TTL 期间内轮询即可。
点对点
这有点像一个松散耦合的 RPC 模式。在这种情况下,每个代理都有自己的队列。在 Cloud Queues 中,“队列”资源非常轻量级,因此您可以根据需要创建数十万个队列。
请注意,双向通信只需要一个队列。
主要流程
1. 控制器将消息推送到队列
2. 代理列出队列中的消息,获取消息
3. 代理执行请求的操作
4. 代理将结果消息推送到队列
5. 控制器列出队列中的消息,获取结果消息
变体
2a. 代理可以声明消息,但它比简单地列出消息要慢,而且由于只有单个客户端在读取队列,因此声明在这种情况下是不必要的。
2b. 代理在获取消息之前崩溃。在这种情况下,只要代理在控制器设置的 TTL 期间内启动,它仍然可以获取消息。
4a. 代理在发布结果消息之前崩溃。控制器需要有一个超时期限的概念,在此期限之后它不再期望从其请求收到响应。
4b. 如果不期望结果,则跳过这些步骤。
审计
这种模式实际上是一个混合用例。想法是添加一个额外的观察者,该观察者不断地列出和记录队列中的消息。这可以是一个 CLI “tail” 类型的脚本,或者是一个被动服务器进程记录所有传入的内容。
这有助于诊断消息生产者中的问题/错误。它还可以确保消息已正确处理,例如,如果您将队列用作计量解决方案的一部分,并且希望审计您的记录以确保所有可计费事件都已提交到计费系统。
也就是说,能够延迟消息被声明一段时间的功能是我们仍然需要的功能,以便使审计真正有效。毕竟,您需要给审计员一个在工作者删除它之前列出消息的机会。今天,工作者*可以*在声明消息后暂停 X 秒再删除它,但我们认为在生产者端处理它会更优雅。