分布式事务解决方案ServiceComb - Omega源码阅读与分享

2019-05-19 05:57发布

概览

ServiceComb已经是Apache的顶级项目,包含两个组件,即 alpha 和 omega。

  • alpha是协调者的角色,需要单独部署的服务,主要负责对事务的事件进行持久化存储以及协调子事务的状态,保证全局事务的状态保持一致。
  • omega是微服务中的一个agent,负责对网络请求进行拦截并向alpha上报事务事件,并在异常情况下根据alpha下发的指令执行相应的补偿操作

一、代码导入

源码地址:https://github.com/apache/servicecomb-pack

  • 第一步当然是下载代码并导入:git clone https://github.com/apache/servicecomb-pack.git

  • 主要模块说明

alpha 是事务协调中心,保存事务日志,通过日志协调各个分支

demo 里面项目的各框架的例子:spring和dubbo saga tcc

docs 设计文档,最先应该熟悉的。

omega 负责与alpha通讯,子事务逻辑

pack-contracts gRPC通讯接口定义文件,通过中间文件生成客户端与服务端面代码,让开发者不必关心通讯过程

web 用angular写的web界面,可以查看事务的状态。

我们主要关注的alpha和omega的代码,gRPC知识是通讯基础非常重要,最好先了解gRPC和probuf、Kyro序列化对阅读源码

还是很有帮助的。但通讯部分只是简单带过。

服务之间的事务传递与OmegaContext

GlobalTxId全局事务ID标记子事务是否同属性一个事务中

ParentTxId 父类的事务ID

localTxId子事务Id

Omega会面向切面编程的方式,向程序中注入相应的逻辑,初始化事务上下文OmegaContext,在事务处理的过程中向alpha报告事务状态,

public class OmegaContext {
//在微服务之间相互传递作为 globalTxId和localTxId的Key
public static final String GLOBAL_TX_ID_KEY = "X-Pack-Global-Transaction-Id";
public static final String LOCAL_TX_ID_KEY = "X-Pack-Local-Transaction-Id";
private final ThreadLocal globalTxId = new InheritableThreadLocal<>();
private final ThreadLocal localTxId = new InheritableThreadLocal<>();
private final IdGenerator idGenerator;//事务Id生成类,用UUID实现
public OmegaContext(IdGenerator idGenerator) {
this.idGenerator = idGenerator;
}
public String newGlobalTxId() {
String id = idGenerator.nextId();
globalTxId.set(id);
return id;
}
}

omega配置与初始化

实现saga协调协议和TCC协调协议,下面就是omega客户端要配置了三个信息

  • alpha集群的通讯地址
  • 服务注册中心的,这个是可选的,可以使用eureka、consul
  • omega.enable=true开启omage,还有@EnableOmega不过这种方式已经被标记抛弃,下个版本也许就不支持注解方式开启了
alpha:
cluster:
address: 127.0.0.1:8081
eureka:
client:
enabled: true
service-url:
defaultZone: http://127.0.0.1:8761/eureka
omega:
enable: true

omega->omega-spring-starter Omega与SpringBoot框架的结合

omega.enable=true或@EnableOmega的作用只标记开启Omega,Omega在SpringBoot上初始化过程:

OmegaSpringAutoConfiguration通过@Configuration,在Spring框架启动时加载并配置

@Configuration
@Import({OmegaSpringConfig.class,TransactionAspectConfig.class})
@ConditionalOnProperty(value = {"omega.enabled"}, matchIfMissing = true)
public class OmegaSpringAutoConfiguration {
//@Configuration用于定义配置类,可替换xml配置文件
//@Import({OmegaSpringConfig.class,TransactionAspectConfig.class})在当前类起作用时,
//也初始化OmegaSpringConfig和TransactionAspectConfig类里的Spring Bean
//@ConditionalOnProperty(value = {"omega.enabled"}, matchIfMissing = true)
//条件满足则初始化OmegaSpringAutoConfiguration 有omega.enabled就初始化,默认值为true
}

OmegaSpringConfig

作用:初始化各Bean,IdGenerator用来生成子事务Id Saga的事件Sender Tcc事件Sender回调CallbackContext

@Configuration
public class OmegaSpringConfig {
@Bean(name = {"omegaUniqueIdGenerator"})
IdGenerator idGenerator() {
return new UniqueIdGenerator();
}
@Bean
OmegaContext omegaContext(@Qualifier("omegaUniqueIdGenerator") IdGenerator idGenerator) {
//分支事务的上下文,OmegaContext里存放globaTxId localTxId
return new OmegaContext(idGenerator);
}
@Bean(name = {"compensationContext"})
CallbackContext compensationContext(OmegaContext omegaContext) {
//补偿者回调上下文
return new CallbackContext(omegaContext);
}
@Bean(name = {"coordinateContext"})
CallbackContext coordinateContext(OmegaContext omegaContext) {
//协调者回调上下文
return new CallbackContext(omegaContext);
}
@Bean
SagaMessageSender sagaLoadBalanceSender(@Qualifier("sagaLoadContext") LoadBalanceContext loadBalanceSenderContext) {
....saga协议消息发送者,初始化,可以使用负载均衡策略给alpha集群发送transaction的事件。
return sagaMessageSender;
}
...省略了Tcc协议的发送者

TransactionAspectConfig

对@SagaStart @Compensable注解AOP的切面编程对象初始

@Configuration
@EnableAspectJAutoProxy
public class TransactionAspectConfig {
@Bean
MessageHandler messageHandler(SagaMessageSender sender,
@Qualifier("compensationContext") CallbackContext context, OmegaContext omegaContext) {
return new CompensationMessageHandler(sender, context);
}
@Bean
SagaStartAspect sagaStartAspect(SagaMessageSender sender, OmegaContext context) {
return new SagaStartAspect(sender, context);
}
@Bean
TransactionAspect transactionAspect(SagaMessageSender sender, OmegaContext context) {
return new TransactionAspect(sender, context);
}
@Bean
CompensableAnnotationProcessor compensableAnnotationProcessor(OmegaContext omegaContext,
@Qualifier("compensationContext") CallbackContext compensationContext) {
return new CompensableAnnotationProcessor(omegaContext, compensationContext);
}
...省略TCC协议部分代码
}

Omega内部机制:SagaStartAspect @SagaStart的AOP切面编程

@Aspect
public class SagaStartAspect {
...
public SagaStartAspect(SagaMessageSender sender, OmegaContext context) {
this.context = context;
this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
}
@Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
initializeOmegaContext();
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
//发送SagaStart事件到alpha
sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout());
LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
try {
//调用@SagaStart的函数
Object result = joinPoint.proceed();
//发送SagaStart函数结束事件到alpha
sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
LOG.debug("Transaction with context {} has finished.", context);
return result;
} catch (Throwable throwable) {
// We don't need to handle the OmegaException here
if (!(throwable instanceof OmegaException)) {
//@SagaStart函数抛异常,向alpha发送异常事件
sagaStartAnnotationProcessor.onError(method.toString(), throwable);
LOG.error("Transaction {} failed.", context.globalTxId());
}
throw throwable;
} finally {
context.clear();
}
}
}

TransactionAspect对@Compensable注解AOP切面编程

成功场景下,每个事务都会有开始和对应的结束事件。

TransactionAspect=>DefaultRecovery=>CompensableInterceptor

@Aspect
public class TransactionAspect {
@Around("execution(@org.apache.servicecomb.pack.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
//反射的方法
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
String localTxId = context.localTxId();
context.newLocalTxId();
LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
//方法重试次数
int retries = compensable.retries();
//重试调用
RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(retries);
try {
return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, retries);
} finally {
context.setLocalTxId(localTxId);
LOG.debug("Restored context back to {}", context);
}
}
}
-------------DefaultRecovery----------------
public class DefaultRecovery extends AbstractRecoveryPolicy {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
public Object applyTo(ProceedingJoinPoint joinPoint, Compensable compensable, CompensableInterceptor interceptor,
OmegaContext context, String parentTxId, int retries) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
LOG.debug("Intercepting compensable method {} with context {}", method.toString(), context);
String compensationSignature = compensable.compensationMethod().isEmpty() ? "" : compensationMethodSignature(joinPoint, compensable, method);
String retrySignature = (retries != 0 || compensationSignature.isEmpty()) ? method.toString() : "";
//
AlphaResponse response = interceptor.preIntercept(parentTxId, compensationSignature, compensable.timeout(), retrySignature, retries, joinPoint.getArgs());
if (response.aborted()) {
String abortedLocalTxId = context.localTxId();
context.setLocalTxId(parentTxId);
throw new InvalidTransactionException("Abort sub transaction " + abortedLocalTxId +" because global transaction " + context.globalTxId() + " has already aborted.");
}
try {
Object result = joinPoint.proceed();
interceptor.postIntercept(parentTxId, compensationSignature);
return result;
} catch (Throwable throwable) {
interceptor.onError(parentTxId, compensationSignature, throwable);
throw throwable;
}
}
}
-------------CompensableInterceptor---------------
class CompensableInterceptor implements EventAwareInterceptor {
private final OmegaContext context;
private final SagaMessageSender sender;
CompensableInterceptor(OmegaContext context, SagaMessageSender sender) {
this.sender = sender;
this.context = context;
}
@Override
public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, String retriesMethod,
int retries, Object... message) {
return sender.send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod,
timeout, retriesMethod, retries, message));
}
@Override
public void postIntercept(String parentTxId, String compensationMethod) {
sender.send(new TxEndedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod));
}
@Override
public void onError(String parentTxId, String compensationMethod, Throwable throwable) {
sender.send(
new TxAbortedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, throwable));
}
}

服务间通信:omega-transport

封装了通讯组件:dubbo fegin resttemplate servicecomb实现的通讯

这些组件各自通讯的基础上,在服务之间相互调用时,把globalTxId和localTxId传递过去,并注入Context中

如:omega-transport->omega-transport-resttemplate

RestTemplateConfig 配置拦截器TransactionClientHttpRequestInterceptor

TransactionClientHttpRequestInterceptor:把当前的上下文的globalTxId和localTxId放到请求里

TransactionHandlerInterceptor 服务提供者,把resttemplate传递过来的globalTxId和localTxId放到当前上下文里,

@Configuration
public class RestTemplateConfig {
@Bean(name = "omegaRestTemplate")
public RestTemplate omegaRestTemplate(@Autowired(required=false) OmegaContext context) {
RestTemplate template = new RestTemplate();
List interceptors = template.getInterceptors();
interceptors.add(new TransactionClientHttpRequestInterceptor(context));
template.setInterceptors(interceptors);
return template;
}
}
class TransactionClientHttpRequestInterceptor implements ClientHttpRequestInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OmegaContext omegaContext;
TransactionClientHttpRequestInterceptor(OmegaContext omegaContext) {
this.omegaContext = omegaContext;
}
@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body,
ClientHttpRequestExecution execution) throws IOException {
if (omegaContext!= null && omegaContext.globalTxId() != null) {
request.getHeaders().add(GLOBAL_TX_ID_KEY, omegaContext.globalTxId());
request.getHeaders().add(LOCAL_TX_ID_KEY, omegaContext.localTxId());
LOG.debug("Added {} {} and {} {} to request header",
GLOBAL_TX_ID_KEY,
omegaContext.globalTxId(),
LOCAL_TX_ID_KEY,
omegaContext.localTxId());
} else {
LOG.debug("Cannot inject transaction ID, as the OmegaContext is null or cannot get the globalTxId.");
}
return execution.execute(request, body);
}
}
class TransactionHandlerInterceptor implements HandlerInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OmegaContext omegaContext;
TransactionHandlerInterceptor(OmegaContext omegaContext) {
this.omegaContext = omegaContext;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
if (omegaContext != null) {
String globalTxId = request.getHeader(GLOBAL_TX_ID_KEY);
if (globalTxId == null) {
LOG.debug("Cannot inject transaction ID, no such header: {}", GLOBAL_TX_ID_KEY);
} else {
omegaContext.setGlobalTxId(globalTxId);
omegaContext.setLocalTxId(request.getHeader(LOCAL_TX_ID_KEY));
}
} else {
LOG.debug("Cannot inject transaction ID, as the OmegaContext is null.");
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object o, ModelAndView mv) {
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object o, Exception e) {
if (omegaContext != null) {
omegaContext.clear();
}
}
}

Omega与Alpha底层通讯使用gRPC

pack-contracts->pack-contract-grpc

gRPC的接口服务定义文件:GrpcCommon.proto GrpcTccEvent.protogrpcTxEvent.proto:这些文件在protobuf命令直接编译成java代码。

如何使用请看:proto文件gRpc基础

TCC
service TccEventService {
rpc OnConnected (GrpcServiceConfig) returns (stream GrpcTccCoordinateCommand) {
}
rpc OnParticipationStarted(GrpcParticipationStartedEvent) returns (GrpcAck) {}
rpc OnParticipationEnded(GrpcParticipationEndedEvent) returns (GrpcAck) {}
rpc OnTccTransactionStarted (GrpcTccTransactionStartedEvent) returns (GrpcAck) {}
rpc OnTccTransactionEnded (GrpcTccTransactionEndedEvent) returns (GrpcAck) {}
rpc OnTccCoordinated(GrpcTccCoordinatedEvent) returns(GrpcAck) {}
rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck) {
}
}
Saga: OnTxEvent函数
service TxEventService {
rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {}
rpc OnTxEvent (GrpcTxEvent) returns (GrpcAck) {}
rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck) {
}
}

谢谢能看到最后的人:我分享我是怎么阅读源码的。源码阅读不能一上来就找到main入口一行一行的看。最先应该了解基本的组成架构、和用到了哪些技术栈,如果还用了你从来没见的技术,建议先去学习这门新的技术,再回头来看代码,熟悉了各模块相对应的功能后。我会找到一个切入口,猜一下它的实现方式,再根据猜测,带着疑问,去找答案。如果对整个项目的模块不是很清楚,最好先把源码里的Demo正常的运行。通过这样的简单学习,一步步的深入。有的代码的抽象是比较复杂的。可先跳过,当你对整个结构都非常了解了,这时再回头去读剩下的难点。最后有个总结有个对比就是最好结果。

文章来源: https://www.toutiao.com/group/6692483505940595212/