项目介绍
<p>[TOC]</p>
<p><strong>RMQ</strong>(reliable-message-queue)是<strong>基于可靠消息的最终一致性</strong>的分布式事务解决方案。</p>
<h3>框架定位</h3>
<p>RMQ本身不生产消息队列,只是消息的搬运工。
RMQ框架提供消息预发送、消息发送、消息确认、消息恢复、消息管理等功能,结合成熟的消息中间件,解决分布式事务,达到数据最终一致性。</p>
<hr />
<h3>流程说明</h3>
<p>假设两个业务系统的两个业务AB需要在一个事务内处理。基于RMQ实现分布式事务流程图如下:</p>
<h5>正常流程</h5>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/d45c0bbfe8a87e86d898c9a000426bab?showdoc=.jpg" alt="" /></p>
<ol>
<li>
<p>在上层系统处理业务前,先向RMQ发送一条消息,RMQ收到后将该条消息持久化,但不向下投递。此时下层系统仍然不知道该条消息的存在。持久化成功后,向上层系统返回持久化成功。</p>
</li>
<li>上层系统收到应答后,执行业务A。业务A处理完成后,请求RMQ确认消息接口(异步)。该请求发出后,对上层系统而言,该事务的处理过程就结束了,此时它可以处理别的任务了。
为什么要异步请求确认消息接口?</li>
</ol>
<pre><code>(1) 假如使用同步请求,RMQ确认消息成功,已经把MQ消息发送给下层业务系统。但是响应时由于网络波动或其他原因抛出异常,使已经正常执行的业务A回滚,但是消息已经发出,下层业务已处理,就会造成数据不一致。
(2) 异步请求可以忽略确认消息请求的异常。假如确认请求异常,由消息确认子系统来处理,下文会介绍。</code></pre>
<ol>
<li>
<p>RMQ收到确认消息请求后,标记该消息为已确认,向下层系统投递该消息,从而执行业务B。</p>
</li>
<li>业务B执行完成后,下层系统调用RMQ确认消费消息,通知RMQ消息已经成功消费,RMQ会删除已成功消费的消息。</li>
</ol>
<p>上述过程可以得出以下结论:</p>
<pre><code>(1) RMQ扮演着分布式事务协调者的角色。
(2) 业务A完成后,到业务B完成之间,会存在一定的时间差。在这个时间差内,整个系统处于数据不一致的状态,但这短暂的不一致性是可以接受的,因为经过短暂的时间后,系统又可以保持数据一致性,满足BASE理论。</code></pre>
<hr />
<h5>消息发送阶段异常流程</h5>
<p>假设在消息投递给下层系统前,每一步都有可能因为网络或其他原因,发生异常。</p>
<ul>
<li>如果异常不影响数据一致性,则无需处理。</li>
<li>如果异常影响数据一致性,则由RMQ的<strong>消息确认子系统</strong>,定时回查长时间未确认的消息。然后再根据业务处理结果投递或删除消息。如果回查请求异常(上层系统Down机或其他原因),则下次继续回查,直至上层系统返回明确结果。</li>
</ul>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/f15bf90667a6e65e7edfd202a06ca7c4?showdoc=.jpg" alt="" /></p>
<table>
<thead>
<tr>
<th>异常情况</th>
<th>数据状态</th>
<th>数据一致性</th>
<th>处理</th>
</tr>
</thead>
<tbody>
<tr>
<td>预发送消息异常</td>
<td>消息未存储,业务未执行</td>
<td>一致</td>
<td>无需处理</td>
</tr>
<tr>
<td>消息持久化异常</td>
<td>消息未存储,业务未执行</td>
<td>一致</td>
<td>无需处理</td>
</tr>
<tr>
<td>返回应答异常</td>
<td>消息已存储,业务未执行</td>
<td>不一致</td>
<td>回查</td>
</tr>
<tr>
<td>执行业务异常</td>
<td>消息已存储,业务未执行</td>
<td>不一致</td>
<td>回查</td>
</tr>
<tr>
<td>确认发送消息异常</td>
<td>消息已存储(消息未确认),业务已执行</td>
<td>不一致</td>
<td>回查</td>
</tr>
<tr>
<td>标记消息已确认异常</td>
<td>消息已存储(消息未确认),业务已执行</td>
<td>不一致</td>
<td>回查</td>
</tr>
</tbody>
</table>
<hr />
<h5>消息消费阶段异常流程</h5>
<p>假设在消息投递给下层系统后,每一步都有可能因为网络或其他原因,发生异常。</p>
<ul>
<li>RMQ的<strong>消息恢复子系统</strong>,定时重发长时间未消费的消息。由于消息可能会重新投递,因此需要下层业务系统实现业务的<strong>幂等性</strong>。</li>
<li>消息恢复子系统重发消息有次数限制,超过一定次数,消息将会被标记为<strong>死亡</strong>,不再重发。可在消息管理子系统<strong>人工干预</strong>(删除或继续重发)。
<img src="https://www.showdoc.cc/server/api/common/visitfile/sign/a4c0465f0b8c89e5f42faf6054b2dca0?showdoc=.jpg" alt="" /></li>
</ul>
<hr />
<h3>架构图</h3>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/0ec8c7ec81c8e42990640a10caaf7de6?showdoc=.jpg" alt="架构图" /></p>
<h5>节点角色说明</h5>
<table>
<thead>
<tr>
<th>节点</th>
<th>角色说明</th>
</tr>
</thead>
<tbody>
<tr>
<td>消息服务子系统</td>
<td>暴露服务的服务提供方,业务服务实现(预发送消息、确认发送消息等)</td>
</tr>
<tr>
<td>消息确认子系统</td>
<td>对于长时间未确认的消息,与上层业务系统确认消息是否发送</td>
</tr>
<tr>
<td>消息恢复子系统</td>
<td>对于长时间未消费的消息,重新发送消息给下层业务系统</td>
</tr>
<tr>
<td>消息管理子系统</td>
<td>消息可视化管理后台,消息队列配置、消息查看、重发、删除等</td>
</tr>
<tr>
<td>消息中间件</td>
<td>提供消息队列功能,ActiveMQ、RocketMQ等</td>
</tr>
</tbody>
</table>