在分布式系统中,用到同步锁是常见场景。本文同步锁适用于,对某一个相同参数(如订单号)进行加锁,如果同一个订单号出现并发,那么要依次等待上一个执行完成再进行下一个请求的执行,同时设置等待超时时间。当然还有一种锁是直接丢弃第二次请求,本文不是这种锁。
1. 定义注解
定义一个自定义注解来标记需要加锁的方法:
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {String key();
long timeout() default 60; // 默认超时时间 60 秒}
2. 创建分布式锁工具类
创建一个工具类来处理 Redis 中的分布式锁操作:
更新 DistributedLock 工具类,使用单个 timeout 参数:
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Component
public class DistributedLock {
private final StringRedisTemplate stringRedisTemplate;
public DistributedLock(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}
public boolean tryLock(String key, String value, long timeout) {Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
return result != null && result;
}
public void unlock(String key, String value) {String currentValue = stringRedisTemplate.opsForValue().get(key);
if (value.equals(currentValue)) {stringRedisTemplate.delete(key);
}
}
public boolean tryLockWithWait(String key, String value, long timeout) throws InterruptedException {long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < timeout) {if (tryLock(key, value, timeout)) {return true;}
Thread.sleep(100); // 等待 100 毫秒后重试
}
return false;
}
}
3. 创建 AOP 切面
创建一个 AOP 切面来自动处理分布式锁:
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.UUID;
@Aspect
@Component
public class DistributedLockAspect {
@Autowired
private DistributedLock distributedLock;
@Around("@annotation(com.yourpackage.DistributedLock)")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
DistributedLock lockAnnotation = method.getAnnotation(DistributedLock.class);
// 构建锁的 key
String className = joinPoint.getSignature().getDeclaringTypeName();
String methodName = method.getName();
String businessKey = extractBusinessKey(lockAnnotation.key(), joinPoint.getArgs(), method);
String key = className + ":" + methodName + ":" + businessKey;
String value = UUID.randomUUID().toString();
long timeout = lockAnnotation.timeout();
if (distributedLock.tryLockWithWait(key, value, timeout)) {
try {return joinPoint.proceed();
} finally {distributedLock.unlock(key, value);
}
} else {throw new RuntimeException("Failed to acquire lock after waiting");
}
}
private String extractBusinessKey(String keyExpression, Object[] args, Method method) {StandardEvaluationContext context = new StandardEvaluationContext();
for (int i = 0; i < args.length; i++) {context.setVariable("arg" + i, args[i]);
}
return parser.parseExpression(keyExpression).getValue(context, String.class);
}
}
4. 使用注解
在你的服务方法上使用这个注解:
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/orders")
public class OrderController {@PostMapping("/pay/{orderId}")
@DistributedLock(key = "#arg0.id", timeout = 60) // 从第一个参数中提取 id 字段
public ResponseEntity<String> payOrder(@RequestBody OrderRequest request) {
// 处理支付逻辑
return ResponseEntity.ok("Payment processed successfully for order: " + request.getId());
}
@PostMapping("/cancel/{orderId}")
@DistributedLock(key = "#arg1", timeout = 60) // 直接使用第二个参数
public ResponseEntity<String> cancelOrder(@PathVariable String orderId, @RequestParam String userId) {
// 处理取消逻辑
return ResponseEntity.ok("Order cancelled successfully for user: " + userId);
}
}
发表至: 代码脚本
2024-12-16