실전 프로젝트 트래픽 처리 1
이번 프로젝트에서 트래픽 처리와 관련해서는 대략적인 설계 과정에 참여만 하고 실질적으로 구현은 다른 팀원들이 담당했다. 따라서 트래픽처리와 관련된 구체적인 코드를 살펴보고 싶다고 생각해서 프로젝트 코드들을 개인적으로 체크해보는 시간을 가져보고자 한다.
쿠폰 발급과 관련된 트래픽을 처리를 위한 세 가지의 서버를 구성하였다. 쿠폰 서버, 대기열 서버, 스케쥴링 서버다. 대기열 서버의 구성 방식에 대해서는 다양한 논의가 오갔지만 레디스를 활용하여 요청들을 저장하고 스케쥴링 서버가 해당 레디스에서 요청들을 일정한 개수를 가져와 쿠폰 서버에 발급을 요청하도록 하는 것이 가장 안정적이라고 결론을 내릴 수 있었다. 단순히 레디스의 분산락을 이용하여 트래픽 처리를 하게 된다면 서비스 요구사항인 순서보장이 지켜지지 않기 때문에 추가적으로 카프카를 도입하여 요청들은 들어온 순서대로 대기열에 저장하고 이를 빼내와 카프카로 쿠폰서버에 보내는 방식으로 구현하였다.
대기열 서버
@RestController
@RequestMapping("/events")
public class WaitingController {
private final WaitingService waitingService;
public WaitingController(WaitingService waitingService) {
this.waitingService = waitingService;
}
@PostMapping("/{eventId}")
public ResponseEntity<ResponseMessageDto> joinEvent(@PathVariable("eventId") Long eventId,
@RequestBody EventJoinRequestDto requestDto) {
ResponseMessageDto responseDto = waitingService.addWaiting(eventId, requestDto.getMemberId());
return ResponseEntity.status(HttpStatus.OK.value()).body(responseDto);
}
}
"/events/{eventId}" 엔드포인트에 대한 POST 요청 처리
-> 요청된 eventId와 EventJoinRequestDto를 사용하여 waitingService를 호출하고, 그 결과를 클라이언트에게 응답으로 반환
@Service
public class WaitingService {
private final GlobalVariables globalVariables;
private final WaitingRepository waitingRepository;
private final KafkaProducer kafkaProducer;
private final RedissonClient redisson;
public WaitingService(GlobalVariables globalVariables,
WaitingRepository waitingRepository,
KafkaProducer kafkaProducer,
RedissonClient redissonClient) {
this.globalVariables = globalVariables;
this.waitingRepository = waitingRepository;
this.kafkaProducer = kafkaProducer;
this.redisson = redissonClient;
}
public ResponseMessageDto addWaiting(Long eventId, Long memberId) {
RLock lock = redisson.getLock("waiting-lock");
try {
boolean isLocked = lock.tryLock(5000, 1000, TimeUnit.MILLISECONDS);
if (!isLocked) {
throw new Exception("대기 시간 초과");
}
// 크리티컬 섹션 - 락으로 보호되어야 하는 코드
waitingValidate(eventId, memberId);
waitingRepository.add(eventId, memberId);
kafkaProducer.send(eventId, memberId);
lock.unlock();
return new ResponseMessageDto("Success");
} catch (Exception e) {
return new ResponseMessageDto(e.getMessage());
}
}
public void waitingValidate(Long eventId, Long memberId) throws Exception {
Long maxMember = globalVariables.getEventMaxMember();
Long currentSize = waitingRepository.size(eventId);
if (currentSize >= maxMember) {
throw new Exception("정원이 초과되어 이벤트에 참여할 수 없습니다.");
}
if (waitingRepository.isMember(eventId, memberId)) {
throw new Exception("이미 참여한 이벤트 입니다.");
}
}
}
public ResponseMessageDto addWaiting(Long eventId, Long memberId)
이벤트에 대한 대기 목록에 참여하는 로직을 처리하는 메서드.
Long eventId와 Long memberId는 해당 이벤트와 회원을 식별.
RLock lock = redisson.getLock("waiting-lock")
Redisson 라이브러리를 사용하여 "waiting-lock"이라는 이름의 분산 락을 사용
boolean isLocked = lock.tryLock(5000, 1000, TimeUnit.MILLISECONDS)
최대 5000 밀리초(5초) 동안 락 획득 시도
1000 밀리초(1초)마다 획득 재시도
락을 얻지 못하면 isLocked 변수에 false가 할당
if (!isLocked) { throw new Exception("대기 시간 초과"); }
락을 얻지 못했을 경우, 예외를 던짐
public void waitingValidate(eventId, memberId)
크리티컬 섹션에 해당하는 부분으로, 락으로 보호되어야 하는 코드
waitingValidate 메서드를 호출하여 대기 목록에 대한 유효성을 검사
waitingRepository.add(eventId, memberId)
대기 목록에 추가로 들어온 요청을 추가합니다.
->이벤트에 대한 대기 목록을 관리하고, Redis 락을 사용하여 동시성 문제 해결. 또 Kafka를 사용하여 이벤트 참여 정보를 전송
@Repository
public class WaitingRepository {
private final RedisTemplate<String, String> redisTemplate;
@Value("${redis.key.event}")
private String key;
public WaitingRepository(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void add(Long eventId, Long memberId) {
SetOperations<String, String> opsForSet = redisTemplate.opsForSet();
opsForSet.add(eventId.toString(), memberId.toString());
}
public Long size(Long eventId) {
return redisTemplate.opsForSet().size(eventId.toString());
}
public boolean isMember(Long eventId, Long memberId) {
return Boolean.TRUE.equals(redisTemplate.opsForSet().isMember(eventId.toString(), memberId.toString()));
}
}
private final RedisTemplate<String, String> redisTemplate
Redis와 상호 작용하기 위한 RedisTemplate 객체
RedisTemplate은 Redis에 대한 기본적인 액세스를 제공
@Value("${redis.key.event}")
redis.key.event 프로퍼티를 가져와서 key 변수에 할당
이 프로퍼티는 Redis에서 이벤트 관련 데이터를 저장하기 위한 키
public WaitingRepository(RedisTemplate<String, String> redisTemplate)
WaitingRepository의 생성자, redisTemplate을 초기화.
public void add(Long eventId, Long memberId)
이 메서드는 대기 목록에 새로운 참여자를 추가
SetOperations을 사용하여 Redis의 Set 자료구조에 이벤트 ID와 회원 ID를 추가.
public Long size(Long eventId):
이 메서드는 특정 이벤트의 대기 목록 크기를 반환.
redisTemplate.opsForSet().size(eventId.toString()) 는 Redis Set의 크기를 반환.
public boolean isMember(Long eventId, Long memberId)
이 메서드는 특정 이벤트에 특정 회원이 속해 있는지 여부를 확인
redisTemplate.opsForSet().isMember(eventId.toString(), memberId.toString()) 는 해당 회원이 이벤트의 대기 목록에 속해 있는지 여부를 반환.
스케쥴링 서버
@RequiredArgsConstructor
@Repository
public class RedisRepository {
private final RedisTemplate<String, String> redisTemplate;
public Set<String> zPopMin(String eventId, long count) {
Set<ZSetOperations.TypedTuple<String>> result = redisTemplate.opsForZSet().popMin(eventId, count);
if (result != null) {
Set<String> castedResult = result.stream()
.map(typedTuple -> typedTuple.getValue())
.collect(Collectors.toSet());
return castedResult;
}
return null;
}
public Set<String> zRange(String eventId, long count) {
return redisTemplate.opsForZSet().range(eventId,0,count - 1);
}
}
private final RedisTemplate<String, String> redisTemplate
Redis와 상호 작용하기 위한 RedisTemplate 객체.
RedisTemplate은 Redis에 대한 기본적인 액세스를 제공.
public Set<String> zPopMin(String eventId, long count)
Sorted Set에서 score가 가장 낮은 멤버들을 최대 count 개 가져오는 메서드.
redisTemplate.opsForZSet().popMin(eventId, count)를 호출하여 Sorted Set에서 데이터를 가져온다
결과는 Set<ZSetOperations.TypedTuple<String>> 형태로 반환된다.
if (result != null) { ... }
결과가 null이 아면 아래의 작업을 수행
Set<String> castedResult = result.stream()...:
결과를 스트림으로 변환하고, 각 TypedTuple에서 값을 추출하여 Set 형태로 변환
return castedResult
최종적으로 변환된 Set을 반환
return null
결과가 null이면 null을 반환
public Set<String> zRange(String eventId, long count)
Sorted Set에서 인덱스 범위 내의 멤버들을 가져오는 메서드
redisTemplate.opsForZSet().range(eventId, 0, count - 1)를 호출하여 데이터를 가져온다.
eventId: Sorted Set의 이름이며, 이벤트를 식별하는 고유한 키
0: 시작 인덱스. Sorted Set의 가장 낮은 순위에 해당하는 멤버부터 가져올 것을 지정. 여기서는 가장 낮은 순위인 첫 번째 멤버를 의미
count - 1: 가져올 멤버의 개수. count는 가져올 멤버의 최대 개수를 지정하는데, 여기서는 1을 빼는 이유는 인덱스는 0부터 시작하기 때문. 예를 들어, count가 5이면 실제로는 6개의 멤버가 반환. 즉, 이 메서드는 Sorted Set에서 특정 범위의 멤버를 가져온다. 시작 인덱스부터 count 개수만큼의 멤버를 가져와 Set 형태로 반환한다.
-> Redis의 Sorted Set에서 데이터를 가져오거나 조작하는 기능을 제공. Sorted Set은 멤버와 각 멤버의 순위(score)를 관리하는 Redis의 자료구조
@Slf4j
@RequiredArgsConstructor
@Service
public class JobDetails {
private final KafkaProducer kafkaProducer;
private final ObjectMapper objectMapper;
private final RedisRepository redisRepository;
public void transferWinner() {
try {
Set<String> strings = redisRepository.zPopMin(String.valueOf(1L), 10L);
for (String string : strings) {
log.info("zPopMin = {}", string);
RequestCouponMessage requestCouponMessage = new RequestCouponMessage(1L, Long.parseLong(string));
kafkaProducer.sendEventMessage("coupon-topic", objectMapper.writeValueAsString(requestCouponMessage));
}
} catch (Exception e) {
log.error("An error occurred during transferWinner: {}", e.getMessage(), e);
}
}
}
KafkaProducer kafkaProducer: Kafka 프로듀서를 관리하는 클래스의 인스턴스
ObjectMapper objectMapper: JSON 데이터를 객체로 변환하거나, 객체를 JSON으로 변환하는데 사용
RedisRepository redisRepository: Redis와 상호 작용하는 클래스의 인스턴스
public void transferWinner()
이 메서드는 대기열의 순차적인 요청 정보를 처리하고 Kafka를 통해 해당 정보를 전송한다.
Set<String> strings = redisRepository.zPopMin(String.valueOf(1L), 10L)
Redis의 Sorted Set에서 score가 가장 낮은 멤버들을 최대 10개 가져온다.
여기서 1L은 Sorted Set의 key를 나타내며, 10L은 redisRepository에서 최대로 가져올 멤버 수이다
for (String string : strings) { ... }
가져온 각각의 멤버에 대해서 아래의 로직을 수행한다.
RequestCouponMessage requestCouponMessage = new RequestCouponMessage(1L, Long.parseLong(string))
RequestCouponMessage 객체를 생성한다. 여기서 1L은 이벤트 ID를, Long.parseLong(string)은 멤버 ID를 나타낸다.
kafkaProducer.sendEventMessage("coupon-topic", objectMapper.writeValueAsString(requestCouponMessage))
Kafka 프로듀서를 사용하여 생성한 requestCouponMessage를 "coupon-topic" 토픽으로 전송한다
-> Redis의 Sorted Set에서 순차적인 요청 정보를 가져와 Kafka를 통해 해당 요청을 구폰 서버에 전송하는 기능을 수행.
@Slf4j
@RequiredArgsConstructor
@Component
@EnableSchedulerLock(defaultLockAtMostFor = "PT1S")
public class EventScheduleJob {
private final JobDetails jobDetails;
@Scheduled(cron = "0/1 * * * * *")
@SchedulerLock(name = "event-scheduler", lockAtMostFor = "PT1S", lockAtLeastFor = "PT1S")
public void eventSchedule() throws JsonProcessingException {
log.info("scheduler running");
jobDetails.transferWinner();
}
}
private final JobDetails jobDetails
JobDetails 클래스의 인스턴스인 jobDetails. 이를 통해 EventScheduleJob 클래스는 JobDetails의 기능을 사용.
@Scheduled(cron = "0/1 * * * * *")
스케줄링 작업을 정의. 여기서는 Cron 표현식을 사용하여 1초마다 실행되도록 설정되어 있다. "0/1 * * * * *"는 초(0-59)에 대한 wildcard를 사용하여 매 초마다 작업을 수행하도록 지정.
@SchedulerLock(name = "event-scheduler", lockAtMostFor = "PT1S", lockAtLeastFor = "PT1S")
스케줄링 작업에 대한 락을 설정.
name = "event-scheduler"는 이 락의 이름을 "event-scheduler"로 지정.
lockAtMostFor = "PT1S"는 최대 1초 동안 락을 소유할 수 있도록 설정.
lockAtLeastFor = "PT1S"는 최소 1초 동안 락을 유지해야 함을 나타냄.
public void eventSchedule() throws JsonProcessingException
eventSchedule 메서드는 스케줄링 작업의 진입점. 매 초마다 실행.
jobDetails.transferWinner()
jobDetails를 통해 transferWinner 메서드를 호출. 이는 JobDetails 클래스에서 정의된 메서드를 실행하여 승자 정보를 처리하고 Kafka를 통해 해당 정보를 전송하는 작업을 수행.
->스케줄링 작업을 정의하고 매 초마다 실행되도록 설정. 스케줄링 작업 내부에서는 JobDetails를 통해 실제 작업을 수행
가장 주의깊게 살펴본 포인트
1. Sorted Set을 사용한 Redis
Sorted Set은 중복된 값이 없으며 원소들의 순서가 지정된 집합을 의미한다. 이벤트 트래픽 처리에서 정해진 수만큼 Sorted Set에서 대기열에 들어온 유저들을 뽑아내어 쿠폰을 발급해주는데 이 때, Set 내에서의 순서는 보장되지 않는다. 즉 1등부터 50등까지의 참여자에게 쿠폰을 발급하더라도 그 안에서의 순서보장은 제대로 되지 않을 수 있다. 선착순이라는 순서가 보장되어야 하는 요구사항 특성상 사실 좀 헷갈렸던 부분인데, 어쨌든 특정 개수의 이벤트 쿠폰이 존재하고 이 개수가 초과되면 더 이상 발급이 불가능한 조건이므로 해당 개수까지의 순서는 유지하되 이들을 순차적으로 발급해주는 쿠폰 매커니즘 안에서는 순서보장이 필요없을 것이라고 생각했다. 만약 순위, 랭킹과 같은 그 순서가 매우 민감한 주제라면 이러한 방식은 사용되어선 안 되겠지만, 정해진 순서 내에 들어온 사람들에게는 쿠폰을 발급해주기 때문에 이러한 방식의 사용도 괜찮다고 생각했다.
다만 다른 팀원의 생각은 레디스 내에서도 순서보장이 되는 게 좀 더 요구사항에 맞지 않을까라는 의견을 제시하였다. 주어진 수량 내에 요청이 들어가도록 끊어도 누락이 발생하게 되면 이 후 레디스에서 꺼내오는 요청의 순서가 보장되어야 해당 누락된 쿠폰을 다시 순서대로 배정할 수 있기 때문에라는 근거를 들어 zpop에서 변화가 필요하다고 얘기하였는데, 어느 정도 일리가 있다 생각하여 해당 의견을 따르기로 했다. 최종적으로는 ZRANGE(O(log(N) + M)$ 명령어로 일정 개수만큼 순서가 보장된 데이터를 조회 후 쿠폰 발급 서버로 전달한 후, ZREMRANGEBYRANK(O(log(N) + M))명령어로 일정 개수만큼 삭제시키는 로직으로 변경하였다.
2. 스케쥴링 서버 분리와 스케쥴러 락
스케쥴링 서버 분리 이유 - 대기열 서버는 높은 트래픽을 처리해야하는 서버로서, 요청을 안정적으로 저장하고 관리하는 역할을 수행해야 한다. 따라서, 부가적인 작업을 최소화하고, 주 업무에만 집중하는 것이 더 나을 것으로 생각했다..
스케줄링 서버를 단일로 구성할 시 장애에 취약하므로 다중 서버로 구성. 다만 이렇게 스케일 아웃되 각 스케줄링 서버마다 스케줄러가 중복 실행되므로, ScheduleLock을 이용하여 Lock을 선점한 스케줄러가 동작한다.