도입

동시성 제어는 여러 사용자가 동시에 시스템을 사용할 때 발생할 수 있는 데이터 무결성과 성능 문제를 해결하는 데 중요한 역할을 한다. 본 글은 Redis와 Redisson 라이브러리를 활용하여 분산 환경에서 발생하는 동시성 이슈를 해결하기 위한 방안을 탐구하고자 한다. 본 글에서는 사내에서 발생했던 동시성 문제와 비슷한 상황을 예시 코드로 재현하여 문제를 정의하고, 이를 해결하기 위한 방법을 설명한다.


이론적 배경

  1. 동시성 제어와 분산 락의 중요성
    • 동시성 이슈는 분산 환경에서 동일한 데이터에 여러 프로세스가 동시에 접근할 때 발생한다. 이를 해결하기 위해 락(lock) 메커니즘이 널리 사용된다.
    • 일반적인 락(mutex lock, spin lock 등)은 단일 시스템 내에서만 작동하며, 분산 락은 여러 시스템 간의 경쟁 상태를 관리한다.
      • 단일 시스템의 예로는 하나의 WAS(Tomcat)에서 실행되는 애플리케이션이 있고, 이 환경에서는 스레드 간의 동기화가 주로 필요하다.
      • 여러 시스템의 예로는 여러 개의 WAS(Tomcat) 인스턴스가 로드 밸런서를 통해 분산 처리하는 환경이 있으며, 이러한 경우 서버 간 데이터 동기화를 위한 분산 락이 필요하다.
  1. Redis와 Redisson
    • Redis는 인메모리 데이터베이스로 높은 처리 속도와 다양한 데이터 구조를 지원한다. 이를 활용한 분산 락 구현은 효율적인 동시성 제어를 제공한다.
    • Redisson은 Redis 클라이언트 라이브러리로, 분산 락을 포함한 다양한 동시성 제어 기능을 제공한다.

연구방법

1. 문제 정의

  • 챗봇 대화 서비스를 운영중이었고, 유저가 챗봇과 대화 시 대화에 필요한 서비스 내 재화가 소모 되는 방식이었다.
  • 서버 구조는 다음과 같았다:
    • 메시지큐를 이용해 채팅 서버로 메시지를 전달하고, 답장 서버에서 답장을 메시지큐를 통해 다시 수신하여 처리하는 구조였다.
    • 답장 서버는 8대가 운영 중이었고, 유저는 동시에 여러 챗봇과 대화가 가능했다.
    • 메시지큐에 동일한 유저가 여러 챗봇과의 대화를 진행하며 생성된 메시지가 각각 저장되고, 여러 서버에서 이러한 메시지를 동시에 처리하면서 동시성 문제가 발생하여, 유저가 소비해야 하는 재화가 올바르게 차감되지 않는 현상이 나타났다.
    • 이를 재현하기 위해 아래와 같은 코드를 작성하여 실험을 설계하였다.

2. 실험 설계

  • 문제를 재현하기 위해 아래 코드를 작성하여 실험을 설계하였다.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public ResponseEntity<String> consumeResource(int userId) {
    Optional<User> userOpt = resourceRepository.findById(userId);
    try {
    Thread.sleep(100); // 로직 수행에 100ms가 걸린다고 가정하고, 100ms 지연
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    }

    User user = userOpt.get();
    user.consumeResource(1);

    resourceRepository.save(user);
    log.info("User {} 자원 1 사용, 남은 자원: {}", userId, user.getRemainingResources());
    return ResponseEntity.ok("자원 소비. 남은 자원: " + user.getRemainingResources());
    }
    • 자원 소비 중 동시성 문제가 발생하는 현상을 확인하기 위한 예시 코드로, 여러 스레드에서 동일한 유저의 데이터를 동시에 처리할 경우 재화가 예상보다 적게 차감되는 동작을 확인하였다.
    • 이 코드는 단일 시스템에서 동작하는 코드로, 분산 락이 아닌 일반적인 락으로도 해결할 수 있는 상황이다. 하지만 실제 문제와 동일하게 여러 스레드나 서버가 하나의 자원에 동시 접근하는 상황을 재현할 수 있으므로, 실험 결과의 신뢰성에는 문제가 없을 것으로 보인다.

2.1. 실험 결과

  • wrk를 사용하여 동일 자원에 대해 다수의 요청을 보냈으며, 다음과 같은 결과가 확인되었다.
  • 테스트에 사용된 lua script
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    -- 요청 수 제한
    local max_requests = 10 -- 요청 수 제한
    local request_count = 0 -- 현재 요청 수

    request = function()
    if request_count >= max_requests then
    wrk.thread:stop() -- 요청 제한에 도달하면 쓰레드 중지
    end

    request_count = request_count + 1
    local user_id = 1
    local path = "/resources/" .. user_id .. "/consume"
    return wrk.format("POST", path)
    end
  • 실험 명령어:
    1
    wrk -t5 -c20 -d1s -s test_consume.lua http://localhost:8080
  • 실험 결과 요약:
    • 로그에서 동일 자원에 대해 여러 요청이 동시에 처리되며 자원의 남은 개수가 정확히 감소하지 않는 현상이 나타났다.
    • 이는 요청 간 자원 상태가 정확히 반영되지 않고 동일한 초기 상태로 처리된 결과이다.
    • 예를 들어, 로그에서 “User 1 자원 1 사용, 남은 자원: 9”가 반복 출력되었으며, 이후 일부 요청만 자원이 감소된 상태를 올바르게 반영하였다.
  • 문제 분석:
    • 요청이 짧은 시간에 집중적으로 발생하며, 트랜잭션 커밋이 완료되기 전에 다른 요청이 동일 자원의 상태를 조회하고 처리하였다.
    • 이러한 동시성 이슈는 예시 코드에서 자원 접근 시 락을 사용하지 않았기 때문에 발생한 것으로 보인다.
    • 결과적으로, 여러 요청이 자원을 중복해서 처리하며 데이터의 무결성이 훼손되었다.

3. 문제 해결

  • 분산 락 설계:

    • RedissontryLock 메서드를 활용하여 특정 리소스에 대한 락을 획득한다.
    • 락 획득에 실패한 요청은 대기하거나 재시도하도록 구현한다.
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      @PostMapping("{userId}/consume")
      public ResponseEntity<String> consumeResource(@PathVariable("userId") int userId) {
      String lockName = "user:" + userId + ":resource:lock";

      RLock lock = redissonClient.getLock(lockName);
      int remainingResources = 0;
      try {
      boolean isLocked = lock.tryLock(3, 10, TimeUnit.SECONDS);

      if (isLocked) {
      remainingResources = resourceService.consumeResource(userId);
      }
      } catch (RuntimeException | InterruptedException e) {
      throw new RuntimeException("Lock 획득 실패", e);
      } finally {
      if (lock.isHeldByCurrentThread()) { // 락 소유 여부 확인
      lock.unlock();
      }
      }

      return ResponseEntity.ok("자원 소비. 남은 자원: " + remainingResources);
      }
  • 락 해제 매커니즘:
    Redisson 내부적으로 락을 관리하고 해제하는 메커니즘은 Redis Pub/Sub 시스템과 AsyncSemaphore를 활용하여 다음과 같은 방식으로 동작한다:

    Redisson tryLock() Code
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 1. 최초 락 획득 시도
    Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {
    return true;
    } else {
    // 2. 타이머 갱신 및 대기
    time -= System.currentTimeMillis() - current;
    if (time <= 0L) {
    this.acquireFailed(waitTime, unit, threadId);
    return false;
    } else {
    current = System.currentTimeMillis();
    // 3. Pub/Sub 채널 구독
    CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);

    try {
    subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException var21) {
    if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
    subscribeFuture.whenComplete((res, ex) -> {
    if (ex == null) {
    this.unsubscribe(res, threadId);
    }

    });
    }

    this.acquireFailed(waitTime, unit, threadId);
    return false;
    } catch (ExecutionException var22) {
    ExecutionException e = var22;
    LOGGER.error(e.getMessage(), e);
    this.acquireFailed(waitTime, unit, threadId);
    return false;
    }

    try {
    time -= System.currentTimeMillis() - current;
    if (time <= 0L) {
    this.acquireFailed(waitTime, unit, threadId);
    boolean var25 = false;
    return var25;
    } else {
    boolean var16;
    do {
    long currentTime = System.currentTimeMillis();
    ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {
    var16 = true;
    return var16;
    }

    time -= System.currentTimeMillis() - currentTime;
    if (time <= 0L) {
    this.acquireFailed(waitTime, unit, threadId);
    var16 = false;
    return var16;
    }

    // 4. Latch 대기와 재시도
    currentTime = System.currentTimeMillis();
    if (ttl >= 0L && ttl < time) {
    ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
    ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }

    time -= System.currentTimeMillis() - currentTime;
    } while(time > 0L);

    this.acquireFailed(waitTime, unit, threadId);
    var16 = false;
    return var16;
    }
    } finally {
    // 5. 구독 해제
    this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
    }
    }
    }
    }

    1. 최초 락 획득 시도

    • tryAcquire 메서드를 통해 락 획득 가능 여부를 확인하며, null을 반환하면 현재 락을 바로 획득할 수 있음을 의미한다.
    • 락이 이미 선점된 경우, ttl(잠금의 남은 유효 시간)을 반환하여 대기할 시간을 계산한다.

    2. 타이머 갱신 및 대기

    • 대기 중 경과된 시간을 waitTime에서 차감하며 남은 시간이 0 이하일 경우 대기를 종료하고 false를 반환한다.

    3. Pub/Sub 채널 구독

    • 락 해제 이벤트를 감지하기 위해 Redis Pub/Sub 채널에 구독을 요청한다.
    • subscribeFuture를 통해 락이 해제되었거나 구독에 실패한 경우 작업이 비동기적으로 처리된다.

    4. latch 대기와 재시도

    • Pub/Sub 채널 구독이 성공한 상태에서 tryAcquire를 반복 호출하며 락 획득을 재시도한다.
    • TTL 값에 따라 latch 대기 시간이 설정되며, 다른 스레드에서 락이 해제되면 latch가 해제되어 작업이 이어진다.

    5. 구독 해제

    • 락을 획득하든 못 하든 메서드를 종료하기 전에 Pub/Sub 구독 리소스를 해제하여 시스템 자원을 정리한다.

3.1. Redisson 적용 후 실험 결과:

  • 2.1. 실험과 동일한 script와 wrk 명령어를 이용해 실험을 진행했으며, 다음과 같은 결과가 확인되었다.

  • 실험 결과 요약:

    • Redisson을 사용하여 분산 락을 적용한 결과, 모든 요청이 순차적으로 처리되었으며 데이터의 무결성이 유지되었다.
    • 로그에서 확인할 수 있듯이 동일한 자원에 대해 락이 적용되어 요청이 병렬적으로 처리되지 않고 순차적으로 처리되었다.
    • 예를 들어, “User 1 자원 1 사용, 남은 자원: 9”에서 시작하여 요청이 처리될 때마다 자원이 정확히 감소하는 모습을 확인할 수 있었다.
    • 실험 중에는 잘못된 자원 감소나 동시성 문제가 발생하지 않았다.
  • 문제 해결 확인:

    • Redisson의 분산 락을 적용함으로써 동시성 문제로 인한 데이터 무결성 훼손이 해결되었다.
    • 요청량이 많을 때에도 대기 상태로 처리되며 자원의 상태가 정확히 갱신되었다.

결과

  • 테스트 결과
    • Redisson 기반 분산 락을 적용한 후, 중복 트랜잭션 문제가 해결되었다.
    • 부하 테스트(wrk)에서 동일한 유저의 데이터를 처리할 때, 데이터 무결성이 유지됨을 확인하였다.
    • 이로써 분산 서버 환경에서 발생하던 자원 동시성 문제를 해결할 수 있었다.

논의

Pub/Sub 기반 락 메커니즘은 성능과 자원 활용면에서 유리하며, 이를 사용하는 Redisson을 사용하여 분산 환경에서도 효율적으로 동시성 제어가 가능하지만,
Redis를 따로 구축하여야 한다는 점, Redis의 장애 시 락 메커니즘이 정상적으로 동작하지 않을 수 있다는 점과 락 해제 실패 또는 락 대기 시간 초과 시 추가적 오류 처리가 필요하다는 한계점 등이 존재한다.
해당 한계점들을 개선하기 위해 이후에 Redis Cluster를 활용해 고가용성을 확보하여야하고, Redis 외에 기타 다른 도구(Zookeeper, Etcd 등)와의 성능 비교가 필요하다.

결론

본 글에서는 Redis와 Redisson을 활용한 분산 락 메커니즘을 통해 동시성 이슈를 해결하는 방법을 제시하였다.
실험 결과, 제안된 방법은 데이터 무결성을 효과적으로 유지하며, 부하 테스트에서도 우수한 성능을 보였다.
본 글이 분산 환경에서의 동시성 문제를 해결하려는 개발자들에게 실질적인 가이드라인을 제공하길 기대하며, 본 글에서 사용된 tryLock()의 내부 작동원리는 동시성 제어를 위한 Redisson tryLock 메서드의 작동 원리에서 확인 할 수 있다.

댓글 공유

  • page 1 of 1

Junggu Ji

author.bio


author.job

PrepScheduler

Note over Admin, Worker: ... 시간 경과 (T-0 min, 09:00) ...

Note over Publisher, MQ: [Phase 3] '발사' - 준비된 데이터를 신속하게 발행
activate Publisher
loop
Publisher->>OutboxTable: 7. 메시지 선점 (SELECT... FOR UPDATE SKIP LOCKED)
activate OutboxTable
OutboxTable-->>Publisher: Chunk of Messages
deactivate OutboxTable

Publisher->>MQ: 8. Publish to Message Queue
Publisher->>OutboxTable: 9. (In TX) UPDATE messages status to 'PUBLISHED'
end
deactivate Publisher

Note over Worker, DLQ: [Phase 4] 메시지 '소비', '검증', 그리고 '실패 처리'
activate Worker
Worker->>Redis: 10. GET user_setting (Final Check)
activate Redis
Redis-->>Worker: Setting is 'ON'
deactivate Redis

%% 알림 설정이 켜져 있을 경우
alt Setting is 'ON'
loop 3 Retries with Exponential Backoff
Worker->>PushSvc: 11. Send Push Request
activate PushSvc
PushSvc-->>Worker: 5xx Error (Temporary Failure)
deactivate PushSvc
end

%% 재시도 후에도 최종 실패 시
opt After Retries, Final Failure
Worker->>DLQ: 12. Send Message to DLQ
end

%% 알림 설정이 꺼져 있을 경우
else Setting is 'OFF'
Worker->>Worker: Drop Message (발송하지 않고 폐기)
end
deactivate Worker

5. 결론 및 논의 (Conclusion & Discussion)

본 글에서 제안한 비동기 메시지 기반 아키텍처는 push_outbox를 체크포인트로 활용하고 메시지 큐를 통해 작업을 분산함으로써 기존 시스템의 DB 병목 문제를 해결하고 안정적인 확장성의 기틀을 마련한다. 또한, 지수 백오프 기반 재시도, DLQ, 최종 검증(Final Check) 메커니즘을 통해 시스템의 견고성과 데이터 정확성을 보장한다.

현실적인 제약과 점진적인 아키텍처 진화

이 아키텍처의 모든 컴포넌트에 고가용성(HA)을 즉시 적용하는 것은 현실적으로 복잡성과 비용 부담이 클 수 있다. 따라서 우리는 ‘점진적인 아키텍처 진화(Architectural Evolution)’ 전략을 취해야 한다. push_outbox 테이블은 인프라 장애 시 데이터 유실을 방지하는 핵심적인 안전장치 역할을 하므로, 단일 노드 환경에서도 데이터 정합성과 회복 가능성을 확보하는 현실적인 트레이드오프를 제공한다. 이후 서비스 규모에 맞춰 Redis와 MQ를 클러스터로 전환하는 방식으로 시스템의 가용성을 점진적으로 높여나가는 것이 바람직하다.

미래 비전: 하이퍼스케일을 위한 스트림 처리 아키텍처

제안된 push_outbox 모델은 대부분의 환경에서 안정성과 성능을 보장하지만, 수천만 건 이상의 데이터를 1분 이내에 처리해야 하는 극한의 하이퍼스케일 환경에서는 push_outbox에 데이터를 쓰고 읽는 I/O 비용 자체가 새로운 병목이 될 수 있다.

최종적인 모델은 중간 저장소를 제거하고 ‘스트림 처리(Stream Processing)’ 로 전환하는 것이다. 이는 DB 조회 결과를 테이블에 저장하는 대신, 데이터베이스 커서(Cursor)를 이용해 Kafka와 같은 메시지 큐에 직접 스트리밍하는 방식을 의미한다. 이 모델은 중간 I/O 단계를 완전히 제거하여 최고의 성능을 달성하지만, 데이터 정합성과 실패 시 복구 로직을 DB 트랜잭션 대신 Kafka의 트랜잭션 기능과 Consumer Offset 관리에 의존해야 하므로 시스템의 복잡도가 극단적으로 높아진다.

결국, 이 시스템 설계의 핵심은 현재의 비즈니스 제약과 타협하되, 미래의 성장과 극한의 성능을 위한 확장 로드맵까지 명확히 확보하는 유연성에 있다. 본 글이 이상적인 설계와 현실적인 제약을 조율하며 시스템을 성숙시키는 과정에 대한 실질적인 가이드라인을 제공하길 기대한다.

댓글 공유

도입

동시성 제어는 여러 사용자가 동시에 시스템을 사용할 때 데이터 무결성을 유지하고 성능 저하를 방지하는 데 중요한 역할을 한다. 특히 분산 환경에서는 이러한 문제가 더욱 두드러지며, 이를 해결하기 위한 방법으로 Redis와 Redisson 라이브러리가 자주 사용된다. 본 포스팅에서는 동시성 제어를 위해 사용되는 Redisson의 tryLock 메서드가 어떻게 작동하는지 소개한다.
본 포스팅은 Redisson 3.41.0 버전을 기준으로 작성되었다.


이론적 배경

1. 동시성 제어의 필요성

  • 동시성 이슈는 여러 프로세스가 동시에 동일한 자원에 접근할 때 발생한다. 이를 해결하기 위해 락(lock) 메커니즘이 사용된다.
  • 단일 시스템에서는 일반적인 락으로 해결할 수 있지만, 분산 시스템에서는 분산 락이 필요하다.

2. Redis와 Redisson

  • Redis는 인메모리 데이터베이스로, 높은 성능과 다양한 데이터 구조를 제공한다.
  • Redisson은 Redis 기반의 클라이언트 라이브러리로, 분산 락 및 다양한 동시성 제어 기능을 지원한다.
  • Redisson을 이용한 실제적인 분산락 적용 방법은 이전 포스팅 동시성 이슈와 Redis( Redisson )를 이용한 해결방법에서 확인할 수 있다.

tryLock 메서드의 작동 원리

Redisson의 tryLock 메서드는 분산 락을 구현하는 데 핵심 역할을 한다. 아래는 주요 작동 원리와 코드에 대한 설명이며, 아래 코드는 Redisson 3.41.0 기준으로 동작한다. 특정 버전에서는 내부 구현이 달라질 수 있으니 공식 문서를 참고하기 바란다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 1. 최초 락 획득 시도
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
} else {
// 2. 타이머 갱신
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
// 3. 락에 대한 pub/sub 채널 구독
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);

try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException var21) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
this.unsubscribe(res, threadId);
}

});
}

this.acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException var22) {
ExecutionException e = var22;
LOGGER.error(e.getMessage(), e);
this.acquireFailed(waitTime, unit, threadId);
return false;
}

boolean var16;
try {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var25 = false;
return var25;
}

do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}

currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);

this.acquireFailed(waitTime, unit, threadId);
var16 = false;
} finally {
this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
}

return var16;
}
}
}

1. 최초 락 획득 시도

1
2
3
4
5
6
7
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
} else {
// ttl != null이면 락이 선점되어 있음을 의미
...
}
  • this.tryAcquire()는 락 획득 가능 여부를 확인한다.
    • 반환값이 null인 경우, 현재 락을 바로 획득할 수 있음을 의미한다.
    • 반환값이 null이 아닌 경우, 이미 락이 선점되었거나 대기가 필요함을 나타낸다.

2. 타이머 갱신

1
2
3
4
5
6
7
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
...
}
  • 남은 대기 시간을 갱신하며, 시간이 0 이하인 경우 락 획득을 포기한다.

3. 락에 대한 pub/sub 채널 구독

1
2
3
4
5
6
7
8
9
10
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);

try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException var21) {
...
} catch (ExecutionException var22) {
...
}
  • subscribe() 는 Pub/Sub를 통해 락 이벤트를 감지할 준비를 한다.
  • subscribeFuture.get()을 호출해 설정된 시간 동안 구독이 성공하기를 기다린다.
    • 시간이 초과되면(TimeoutException) 구독을 종료하고 false를 반환한다.
    • ExecutionException이 발생하면 에러를 로깅하고 false를 반환한다.
1
2
3
protected CompletableFuture<RedissonLockEntry> subscribe(long threadId) {
return this.pubSub.subscribe(this.getEntryName(), this.getChannelName());
}
  • threadId는 구독 채널 생성에 직접 사용되지 않고, 락 이름(lock name)과 채널 ID를 기반으로 Pub/Sub 채널이 생성된다.
  • 이 과정은 락 해제 이벤트를 감지하기 위해 필수적이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public CompletableFuture<E> subscribe(String entryName, String channelName) {
// a. Semaphore 생성 및 acquire 호출
AsyncSemaphore semaphore = this.service.getSemaphore(new ChannelName(channelName));
CompletableFuture<E> newPromise = new CompletableFuture();
semaphore.acquire().thenAccept((c) -> {
if (newPromise.isDone()) {
semaphore.release();
} else {
// b. Pub/Sub 채널 확인 및 생성
E entry = (PubSubEntry)this.entries.get(entryName);
if (entry != null) {
// (1) 기존 엔트리가 있는 경우
entry.acquire();
semaphore.release();
entry.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
} else {
newPromise.complete(r);
}
});
} else {
// (2) 새로운 엔트리를 생성
E value = this.createEntry(newPromise);
value.acquire();
E oldValue = (PubSubEntry)this.entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
} else {
newPromise.complete(r);
}
});
} else {
// (3) Redis Pub/Sub 연결 로직 수행
RedisPubSubListener<Object> listener = this.createListener(channelName, value);
CompletableFuture<PubSubConnectionEntry> s = this.service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, new RedisPubSubListener[]{listener});
newPromise.whenComplete((r, e) -> {
if (e != null) {
s.completeExceptionally(e);
}

});
// (4) Pub/Sub 구독 상태 관리 및 실패 처리
s.whenComplete((r, e) -> {
if (e != null) {
this.entries.remove(entryName);
value.getPromise().completeExceptionally(e);
} else {
if (!value.getPromise().complete(value) && value.getPromise().isCompletedExceptionally()) {
this.entries.remove(entryName);
}

}
});
}
}
}
});
return newPromise;
}

a. Semaphore 생성 및 acquire 호출

1
2
3
4
5
6
7
8
9
10
AsyncSemaphore semaphore = this.service.getSemaphore(new ChannelName(channelName));
CompletableFuture<E> newPromise = new CompletableFuture();
semaphore.acquire().thenAccept((c) -> {
if (newPromise.isDone()) {
semaphore.release();
} else {
...
}
...
});
  • getSemaphore()를 통해 해당 채널 이름에 대한 비동기 세마포어를 가져온다.
    • 여러 스레드나 인스턴스에서 동일한 채널에 구독 요청이 들어올 수 있으므로, 세마포어를 사용해 동시성을 제어한다.
  • acquire()를 호출하여 락 획득을 시도한다.
    • 이 작업은 비동기로 수행되며, 락 획득이 완료되면 thenAccept 블록이 실행된다.
    • 이 때 획득 시도하는 락은 채널 구독을 위한 락이다.
    • 구독 요청은 AsyncSemaphore 내부의 큐(FastRemovalQueue)를 통해 순서대로 처리된다.
  • 작업 완료 상태 확인 후 락 해제:
    • 만약 newPromise.isDone() 상태라면, 작업이 이미 완료된 것이므로 락을 해제한다.
    • 이는 동일한 채널을 구독 중인 다른 스레드가 작업을 이미 완료했음을 감지하기 위함이다.

b. Pub/Sub 채널 확인 및 생성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
E entry = (PubSubEntry)this.entries.get(entryName);
if (entry != null) {
// (1) 기존 엔트리가 있는 경우
entry.acquire();
semaphore.release();
entry.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
} else {
newPromise.complete(r);
}
});
} else {
// (2) 새로운 엔트리를 생성
E value = this.createEntry(newPromise);
value.acquire();
E oldValue = (PubSubEntry)this.entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
} else {
newPromise.complete(r);
}
});
} else {
// (3) Redis Pub/Sub 연결 로직 수행
RedisPubSubListener<Object> listener = this.createListener(channelName, value);
CompletableFuture<PubSubConnectionEntry> s = this.service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, new RedisPubSubListener[]{listener});
newPromise.whenComplete((r, e) -> {
if (e != null) {
s.completeExceptionally(e);
}

});
// (4) Pub/Sub 구독 상태 관리 및 실패 처리
s.whenComplete((r, e) -> {
if (e != null) {
this.entries.remove(entryName);
value.getPromise().completeExceptionally(e);
} else {
if (!value.getPromise().complete(value) && value.getPromise().isCompletedExceptionally()) {
this.entries.remove(entryName);
}

}
});
}
}
  • lock key(entryName)로 Pub/Sub 채널을 확인하거나 생성한다.
    • entries 맵에서 entryName에 해당하는 채널을 가져오고, 이미 존재하면 재사용한다.
    • 채널이 없을 경우 새로 생성해 등록한다.

(1) 기존 채널이 있는 경우:

1
2
3
4
5
6
7
8
9
entry.acquire();
semaphore.release();
entry.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
} else {
newPromise.complete(r);
}
});
  • entry.acquire()를 호출해 채널을 재사용한다.
  • 이후 semaphore.release()로 채널 구독을 위한 락을 해제한다.
  • 작업 완료 시 entry.getPromise()로 결과를 처리한다.

(2) 새 채널을 생성하는 경우:

1
2
3
4
5
6
7
8
9
10
11
12
13
E value = this.createEntry(newPromise);
value.acquire();
E oldValue = (PubSubEntry)this.entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().whenComplete((r, e) -> {
if (e != null) {
newPromise.completeExceptionally(e);
} else {
newPromise.complete(r);
}
});
  • createEntry(newPromise)로 새로운 엔트리를 생성한다.
  • putIfAbsent(entryName, value)를 통해 이미 생성된 엔트리가 있는지 확인한다.
  • 기존 엔트리가 있으면 이를 사용하고, 없으면 새로 생성한 엔트리를 등록한다.

(3) Redis Pub/Sub 채널 연결:

1
2
3
4
5
6
7
8
RedisPubSubListener<Object> listener = this.createListener(channelName, value);
CompletableFuture<PubSubConnectionEntry> s = this.service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, new RedisPubSubListener[]{listener});
newPromise.whenComplete((r, e) -> {
if (e != null) {
s.completeExceptionally(e);
}

});
  • createListener(channelName, value)를 통해 실제 Redis Pub/Sub 채널을 구독한다.
  • subscribeNoTimeout()로 비동기적으로 연결을 시도한다.

(4) Pub/Sub 구독 상태 관리 및 실패 처리:

1
2
3
4
5
6
7
8
9
10
11
s.whenComplete((r, e) -> {
if (e != null) {
this.entries.remove(entryName);
value.getPromise().completeExceptionally(e);
} else {
if (!value.getPromise().complete(value) && value.getPromise().isCompletedExceptionally()) {
this.entries.remove(entryName);
}

}
});
  • 구독 실패 처리:
    • e != null 조건은 Pub/Sub 구독 중 예외가 발생했음을 의미한다.
    • 실패한 경우, 다음 작업을 수행한다:
      • this.entries.remove(entryName): 실패한 구독에 해당하는 entryNameentries 맵에서 제거한다.
      • value.getPromise().completeExceptionally(e): 예외를 호출자에게 전달하여 구독 실패 원인을 알린다.
  • 구독 성공 상태 관리:
    • 구독이 성공적으로 완료되었더라도, 추가적인 상태 확인을 수행한다:
    • value.getPromise().complete(value): valuePromise로 완료하고 성공 여부를 반환한다.
    • Promise가 이미 완료되었거나(isCompletedExceptionally로 확인) 정상적으로 완료되지 않은 경우:
      • this.entries.remove(entryName): 일관성을 유지하기 위해 해당 엔트리를 제거한다.
  • 상태 확인의 목적:
    • Redis Pub/Sub 구독 실패 및 이상 상태를 감지하고, 관련 자원을 정리하여 시스템 일관성을 보장한다.
    • 실패나 이상 상태 시 적절한 정리 작업이 이루어지도록 설계되어 있다.

c. 락에 대한 Pub/Sub 채널 구독 정리

1. Semaphore 생성 및 acquire 호출

  • AsyncSemaphore는 채널 단위의 subscribe 작업에 대한 동시성을 제어한다.
  • acquire() 호출 시, 내부적으로 대기 큐가 생성되어 락이 해제될 때 대기 중인 작업이 순차적으로 처리된다.
  • 이를 통해 여러 스레드가 동일한 채널에 대해 동시에 subscribe 요청을 보내더라도 안전하게 작업이 처리된다.
  • 작업 완료 상태(newPromise.isDone()) 확인 후, 이미 완료된 작업에 대한 락을 해제한다.

2. Pub/Sub 채널 확인 및 생성

  • entries 맵에서 entryName에 해당하는 채널을 가져오고, 이미 존재하면 이를 재사용한다.
  • 존재하지 않을 경우:
    • createEntry(newPromise)로 새로운 엔트리를 생성하여 등록한다.
    • putIfAbsent(entryName, value)를 통해 중복 등록을 방지하고, 기존 엔트리가 있으면 이를 재사용한다.
    • 기존 엔트리와 새 엔트리는 동일한 처리 로직으로 이어지며, 작업이 완료되면 결과를 호출자에게 전달한다.

3. Redis Pub/Sub 채널 연결

  • 새로운 엔트리에 대해 createListener(channelName, value)를 통해 Redis Pub/Sub 채널을 구독한다.
  • subscribeNoTimeout() 를 사용하여 비동기적으로 구독을 시도한다.
  • 구독 중 예외 발생 시, 구독 결과를 처리하는 CompletableFuture에 예외 상태를 전달한다.

4. Pub/Sub 구독 상태 관리 및 실패 처리

  • Pub/Sub 구독이 성공했는지 여부를 확인하고, 실패 또는 이상 상태가 발생하면 관련 자원을 정리한다:
    • e != null(구독 실패)일 경우:
      • entries 맵에서 해당 entryName을 제거한다.
      • value.getPromise().completeExceptionally(e)로 구독 실패 원인을 호출자에게 전달한다.
    • 구독이 성공했더라도:
      • value.getPromise().complete(value) 호출로 성공 여부를 확인한다.
      • 성공하지 못했거나(isCompletedExceptionally) 예외 상태라면, entries에서 해당 엔트리를 제거한다.
  • 이를 통해 시스템의 데이터 일관성을 유지하고, 자원 누수를 방지한다.

결과적으로

  • Semaphore를 사용해 subscribe 요청의 동시성을 관리한다.
  • Pub/Sub 채널의 상태를 확인하거나 생성하고, Redis 채널과 비동기적으로 연결한다.
  • 구독 성공 및 실패 상태를 처리하고, 필요시 자원을 정리하여 시스템의 안정성을 보장한다.

여기까지가 락 이벤트를 감지하기 위한 Pub/Sub 채널에 구독하기 위한 로직이다.

4. 락 획득 재시도 및 Latch 대기 로직

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}

currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);

this.acquireFailed(waitTime, unit, threadId);
var16 = false;

a. 락 획득 재시도

  • Pub/Sub 채널 구독이 성공한 상태에서 다시 tryAcquire를 반복 호출해 락 획득을 재시도한다.
  • ttl == null일 경우, 즉시 락을 획득할 수 있으므로 true를 반환하며 종료한다.

b. 남은 시간 갱신

  • 락 획득을 재시도하기 전에 현재 시간을 기준으로 남은 대기 시간을 갱신한다(time -= ...).
  • 남은 대기 시간이 없으면(time <= 0), acquireFailed() 를 호출해 실패 처리를 진행한 후 false를 반환한다.

c. Latch 대기

  • ttl 값에 따라 Latch를 사용해 일정 시간 대기한다.
    • ttl >= 0인 경우: 락 해제까지의 남은 시간 동안 대기한다.
    • ttl < 0 또는 현재 남은 시간 이하인 경우: 설정된 대기 시간까지 대기한다.
    • Latch다른 스레드/프로세스가 락을 해제하거나 갱신하는 이벤트가 발생하면 해제되는 동기화 장치로 사용되며, Redisson에서는 Semaphore를 Latch를 사용한다.

d. 반복 조건

  • do-while 루프를 통해 남은 대기 시간이 소진될 때까지 락 획득을 반복 시도한다.
  • 대기 시간이 소진되면 acquireFailed()를 호출하고 false를 반환하며 종료한다.

5. 구독 해제

1
2
3
} finally {
this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
}
  • Pub/Sub 채널을 구독했으므로, 메서드를 종료하기 전에 반드시 unsubscribe 메서드를 호출해 리소스를 해제한다.
  • 이는 락 획득 여부와 관계없이 실행되며, Pub/Sub 구독을 통해 사용된 자원을 정리하는 역할을 한다.

tryLock()의 장점 및 한계

장점

  1. 효율적인 이벤트 기반 처리

    • Pub/Sub 메커니즘을 활용하여 락 해제 이벤트를 감지함으로써, 불필요한 대기 시간을 최소화한다.
    • 락이 해제될 가능성이 있는 경우 반복적으로 대기하거나 재시도하지 않고 이벤트 기반으로 처리하므로 자원을 효율적으로 사용한다.
  2. 정교한 타이머 관리

    • 남은 대기 시간을 지속적으로 갱신하며, 설정된 제한 시간 내에 락 획득 여부를 결정한다.
    • 대기 시간이 초과될 경우 즉시 실패 처리를 진행하므로, 클라이언트 응답 지연을 방지한다.
  3. 안정적인 자원 정리

    • 락 획득 여부와 관계없이 finally 블록을 통해 모든 구독 리소스를 해제하여 시스템 자원의 안정성을 유지한다.
  4. Latch를 활용한 동기화

    • 락 획득 시도가 실패하더라도, Latch를 통해 TTL 동안 대기하며 락 해제 가능성을 최대한 활용한다.

한계

  1. 락 획득 시도와 Pub/Sub의 비동기 처리 간의 간격

    • 락 획득 재시도와 Pub/Sub 이벤트 처리 사이의 간격에서 레이스 컨디션이 발생할 가능성이 있다. 락이 해제된 후 새로 락이 설정되는 순간에 간격이 생길 수 있으므로, 타이밍에 민감한 경우 신중한 설계가 필요하다.

      레이스 컨디션 발생 원인
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      do {
      long currentTime = System.currentTimeMillis();
      ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
      if (ttl == null) {
      var16 = true;
      return var16;
      }

      time -= System.currentTimeMillis() - currentTime;
      if (time <= 0L) {
      this.acquireFailed(waitTime, unit, threadId);
      var16 = false;
      return var16;
      }

      currentTime = System.currentTimeMillis();
      if (ttl >= 0L && ttl < time) {
      ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
      } else {
      ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
      }

      time -= System.currentTimeMillis() - currentTime;
      } while(time > 0L);

      a. 첫 번째 tryAcquire 호출

      1
      ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
      • 현재 스레드에서 락을 획득하려 시도하지만, 락이 이미 다른 스레드에 의해 점유된 상태라 TTL(남은 유효 시간)을 반환받는다.
      • 이 시점에서 현재 스레드는 락 해제 이벤트를 기다릴 준비한다.

      b. 락 반납 발생

      • 락을 점유하고 있던 스레드(또는 프로세스)가 락을 반납한다.
      • 이 시점에서 락은 잠깐 동안 해제된 상태가 된다.

      c. 다른 스레드가 락을 채감

      • 락 해제 이벤트가 Pub/Sub를 통해 현재 스레드에 전달되기 전에, 다른 스레드가 tryAcquire()를 호출하여 락을 빠르게 획득한다.
      • 이로 인해 현재 스레드는 락 해제 이벤트를 감지하지 못하게 된다.

      d. 현재 스레드의 Pub/Sub 대기

      1
      2
      ((RedissonLockEntry) this.commandExecutor.getNow(subscribeFuture))
      .getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
      • 현재 스레드는 여전히 락 해제 이벤트를 기다리지만, 락은 이미 다른 스레드에 의해 점유된 상태이다.
      • 이로 인해 현재 스레드는 대기 시간을 낭비하거나, 최종적으로 time이 초과되어 락 획득에 실패할 수 있다.
  1. Pub/Sub 메시지 손실 가능성

    • Pub/Sub 시스템 특성상 구독자와 퍼블리셔 간 네트워크 이슈가 발생하면 락 해제 메시지를 놓칠 가능성이 있다. 이 경우 락을 획득하지 못하고 대기 시간을 낭비할 위험이 있다.
  2. 복잡한 로직으로 인한 디버깅 난이도

    • tryLock()의 로직이 복잡하여, 장애 발생 시 원인을 파악하거나 디버깅하는 데 시간이 소요될 수 있다.

결론

Redisson의 tryLock()은 분산 환경에서 동시성 제어를 효과적으로 구현하기 위해 설계된 복잡한 로직을 제공한다.

tryLock()은 동시성 제어에서 효율성과 안정성을 동시에 추구하는 설계로, 분산 환경에서의 락 처리 문제를 효과적으로 해결한다. 그러나 복잡한 로직 구조와 Pub/Sub 시스템의 특성을 고려하여 사용 환경에 적합한 설정을 적용하고, 잠재적인 한계를 관리할 수 있는 보완책을 함께 설계해야 한다.

본 글이 tryLock() 내부 로직에 대한 깊은 이해를 돕고, 동시성 제어가 필요한 시스템 설계 시 참고 자료가 되길 바라며, 실제적으로 Redisson을 활용하여 분산락을 적용하고자 하는 사용자는 이전 글 동시성 이슈와 Redis( Redisson )를 이용한 해결방법을 참고하길 바란다.

댓글 공유

Junggu Ji

author.bio


author.job