Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
Tags
- stream
- StringBuilder
- 인덱스
- 생성자 주입
- 열 속성
- jwt
- 테스트 코드
- docker
- hashcode
- DDL
- 조합
- static
- equals
- VUE
- select_type
- jpa
- AOP
- MSA
- 바이너리 카운팅
- KEVISS
- SQL
- DI
- cache
- java
- lambda
- 재정의
- Spring
- Test
- 필드 주입
- redis
Archives
- Today
- Total
백엔드 개발자 블로그
Redisson trylock 내부로직 본문
redisson 이란??
- redisson은 자바 언어로 구현된 레디스 분산락 클라이언트이다
- 레디스 분산락은 서로 다른 프로세스가 서로 베타적인 방식으로 공유 리소스와 함께 작동해야 하는 많은 환경에서 매우 유용한 기본 기능이다
- 자바 언어 이외에 ruby, python, php 등 다양한 클라이언트 라이브러리가 존재한다
https://redis.io/docs/manual/patterns/distributed-locks/
분산락을 구현하는 로직
- 구현 자체는 간단하다
- 획득하고자 하는 이름의 락을 정의하고 유효 시간까지 락 획득을 시도하게 된다
- 만약 유효 시간이 지나면 락 획득은 실패하게 된다
- 단 여기서 주의할 점은 unlock할 경우에, 해당 세션에서 잠금을 생성한 락인지 확인해야 한다
- 그렇지 않으면 다른 세션에서 수행중인 잠금이 해제될수도 있다
- isLocked() : 잠금이 되었는지 확인
- isHeldByCurrentThread() : 해당 세션에서 잠금을 생성했는지 확인
// 특정 이름으로 락 정의
RLock lock = redissonClient.getLock(key.toString());
try {
// 락 획득을 시도한다(20초동안 시도를 할 예정이며 획득할 경우 1초안에 해제할 예정이다)
boolean available = lock.tryLock(20, 1, TimeUnit.SECONDS);
if (!available) {
System.out.println("lock 획득 실패");
return;
}
// 트랜잭션 로직(ex. orderService.createOrder(), stockService.increase())
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
락을 점유를 시도하는 tryRock은 어떻게 구현되어 있나?
전체 코드
- 전체 코드를 확인하고 아래는 각 단계별 수행하는 내용에 대해서 추가적으로 설명하려고 한다
- redisson은 일반적으로 lettuce로 동시성을 해결하기 위한 차선책으로 많이 제시된다
- 그 이유는 재시도가 필요한 경우에 lettuce는 직접 스핀락으로 구현해서 락 점유를 시도하기 때문이다
- redisson은 스핀락으로 락을 점유하지 않고 pub/sub 구조로 레디스에 부하를 줄인다고 이야기하지만 실제 내부 로직을 살펴보면 스핀락 개념이 아예 없지는 않다
- redisson 구현의 특징은 lua script와 세마포어의 사용이라고 할 수 있다
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
1. lock 획득을 시도한다
- 획득을 시도하려는 lock의 유지시간을 확인한다
- 아래에서 tryAcuiqre 로직을 더 살펴보겠지만 우선 ttl값이 null이면 lock을 점유했다고 간주한다
// 1. 락 획득을 시도한다
// (락 점유시간이 null이라면 획득이 가능하다고 판단하여 true를 리턴한다)
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
- tryAcquire 내부 로직을 살펴보면 lua script를 사용해서 setnx를 실행하는 것을 확인할 수 있다
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
redis.call('exists', KEYS[1]) == 0
}
-
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // LOCK KEY가 존재하는지 확인한다(없으면 0, 있으면 1) // LOCK KEY가 존재하지 않으면 LOCK KEY와 현재 쓰레드 아이디를 기반으로 값을 1 증가시켜준다 // LOCK KEY에 유효시간을 설정한다 // null 값을 리턴한다
-
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 해시맵 기반으로 LOCK KEY와 쓰레드 아이디로 존재하면 0이고, 존재하지 않으면 저장하고 1을 리턴한다 // LOCK KEY가 존재하지 않으면 LOCK KEY와 현재 쓰레드 아이디를 기반으로 값을 1 증가시켜준다 // LOCK KEY에 유효시간을 설정한다 // null 값을 리턴한다
-
return redis.call('pttl', KEYS[1]); // 위의 조건들이 모두 false 이면 현재 존재하는 LOCK KEY의 TTL 시간을 리턴한다
2. waitTime이 초과되었는지 확인한다
- lock 에 대한 점유시간이 아직 남아있다면 다시 lock에 대한 획득을 시도하기 이전에 waitTime(lock 획득 시간)이 초과되지는 않았는지 확인한다
- 만약 이미 초과되었다면 lock 획득은 실패로 리턴한다
// 2. 락 획득 대기 시간보다 초과되었는지 확인하고 초과되었으면 false를 리턴한다
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
3. 고유 Thread Id를 채널로 구독하여 lock이 available할때까지 대기한다
- CompleteFuture.get() 메서드를 호출하여 thread id로 구독한 채널로 lock 획득이 유효할때까지 대기한다
- 만약에 사용자가 설정한 waitTime을 초과할 경우 TimeoutException이 발생하여 lock 획득에 실패한다
- 여기서 중요한 점은 subscribe 내부에는 세마포어를 사용해서 공유자원에 대한 점유를 수행한다는 것이다
- 세마포어를 사용하여 공유자원을 점유하기 때문에 스핀락 보다는 레디스 I/O에 대한 부하를 줄일 수 있다
// 3. threadId를 채널로 구독하여 waitTime까지 대기한다.(block 처리)
// (만약 설정한 waitTime 보다 구독에 대한 응답이 없을 경우엔 TimeoutException이 발생하여 락 획득 결과를 false로 리턴한다)
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
- subscribe 내부 로직을 간단히 살펴보면 세마포어를 사용하는 것을 확인할 수 있다
public CompletableFuture<E> subscribe(String entryName, String channelName) {
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
CompletableFuture<E> newPromise = new CompletableFuture<>();
semaphore.acquire(() -> {
if (newPromise.isDone()) {
semaphore.release();
return;
}
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
return;
}
newPromise.complete(r);
});
return;
}
E value = createEntry(newPromise);
value.acquire();
...
}
- 아래는 CompletableFuture.get 메소드에서 exception이 발생하는 케이스를 확인한 내용이다
/**
* Waits if necessary for at most the given time for this future
* to complete, and then returns its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
@SuppressWarnings("unchecked")
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
long nanos = unit.toNanos(timeout);
Object r;
if ((r = result) == null)
r = timedGet(nanos);
return (T) reportGet(r);
}
redisson은 왜 세마포어를 사용했는가?
공식문서를 살펴 보면 세마포어를 사용한 이유를 파악할 수 있다. 레디스는 싱글 쓰레드로 동작하기 때문에 공유 자원에 대해서 쓰레드 세이프하게 동작하기 위해 동기화 매커니즘을 수행하기 위한 용도로 사용되었다고 설명하고 있다
공식문서 참고 : https://redisson.org/glossary/java-semaphore.html
4. waitTime 이전까지 무한루프를 수행하면서 lock 점유시간을 한번 더 확인한다
- lock을 획득하여 아직 점유 유효시간이 남아있는지 한번 더 체크한다
while (true) {
long currentTime = System.currentTimeMillis();
// 6. 락 획득을 시도한다
// (락 점유시간이 null이라면 획득이 가능하다고 판단하여 true를 리턴한다)
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
...
5. thread id로 구독한 객체로 유효시간 또는 남은시간까지 lock이 avaliable한지 구독한다
- ttl은 이전에 lock이 점유되어 남아있던 시간을 의미한다
- time은 시도할 수 있는 남은 시간을 의미한다
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
6. lock을 시도할 수 있는 시간이 남아있는지 체크한다
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
7. 그 이후에는 유효시간 동안 while문으로 4~6번 과정을 반복하게 된다
결론
- 처음엔 redisson은 spin lock 로직이 없이 내부적으로 pub/sub 구조만 가지고 있는줄 알았다
- 하지만 살펴보니 spin lock 개념은 완전히 걷어내지는 못했지만 그래도 lettuce로 spin lock을 구현하는 것보단 훨씬 적은 부하로 락을 획득할 수 있다고 보여진다