독립적으로 서비스하는 경매 도메인에서 입찰 이벤트를 회원 도메인으로 보내기 위해서 카프카 관련 설정 및 코드를 작성하는 과제를 하게 되었다. 주키퍼와 카프카 서버를 로컬에서 띄운뒤, 콘솔 컨슈머를 이용해 실제로 발생하는 입찰 이벤트가 콘솔 컨슈머에서 정상적으로 나타나는지 확인하는 게 오늘의 목표.
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
카프카와 관련된 Bootstrap server와 직렬화 설정을 하는 클래스.
여기서 사용되는 카프카 템플릿은 애플리케이션 개발에 사용되는 패턴이다. 이 템플릿은 카프카에 연결하고 데이터를 보내거나 받는 과정을 간소화하며, 일반적인 개발 작업을 단순화하고 일관성을 유지하는 데 도움을 준다.
카프카 템플릿을 이용함으로써 얻을 수 있는 효과
1.생산자와 소비자 작업 단순화: 카프카 템플릿은 카프카로 데이터를 전송하는 생산자와 데이터를 소비하는 소비자의 작업을 단순화할 수 있다. 필요한 설정을 최소화하고, 일반적인 사용 사례를 위한 미리 구성된 기능을 제공한다.
2.메시지 송수신 단순화: 템플릿을 사용하면 메시지를 보내고 받는 작업을 추상화하여 간편하게 처리할 수 있다. 이를 통해 개발자는 데이터를 처리하는 코드에 집중할 수 있다.
작성된 카프카 관련 설정들을 카프카 템플릿을 이용하면 손쉽게 카프카 메세지 처리를 할 수 있다는 게 큰 장점.
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendBidDto(String topic, RequestBidDto bidDto) {
ObjectMapper objectMapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = objectMapper.writeValueAsString(bidDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("memberId:{}, point:{}", bidDto.getBid());
}
}
Json을 String 타입으로 변환하기 위해 objectMapper를 이용하였다. 기본적인 Producer 설정이 되었으니까 이를 이용해서 실제 입찰이 실행되는 Service 계층에 넣어주면 된다.
@Transactional
public ResponseWinningPriceDto bid(Long auctionId, RequestAuctionDto requestAuctionDto, Long memberId) {
Auction findAuction = auctionRepository.findById(auctionId).orElseThrow(() -> new IllegalArgumentException("진행중인 경매가 없습니다"));
RequestBidDto bidDto = RequestBidDto.builder()
.memberId(memberId)
.bid(requestAuctionDto.getPoint())
.build();
kafkaProducer.sendBidDto("bid-topic", bidDto);
ResponsePointDto responsePointDto = memberServiceClient.getPoint(memberId).getBody();
validateAuctionCondition(requestAuctionDto, findAuction, responsePointDto);
findAuction.update(requestAuctionDto.getPoint(), memberId);
return createResponseWinningPriceDto(findAuction);
}
Service 계층의 코드는 순수한 자바 코드로 이루어지는 게 좋고, 그 변동성이 적어야 한다는 점도 인지하고 있지만 카프카와 이벤트 발송 처리를 Controller에서 하는 것은 클라이언트와 연결을 담당하는 역할을 하는 Controller와 거리가 먼 듯 했다. 그렇다고 해당 코드를 Repository에 넣을 수는 없을 것 같아서 우선은 Service 계층에서 카프카 이벤트처리까지 하는 걸로 생각했다. 나중에 시간이 돼서 아키텍처에 변화가 생긴다면 보다 순수한 비즈니스 로직이 있는 계층을 만드는 데 주력해야하지 않나 싶다.
수정된 테스트코드.
@ExtendWith(MockitoExtension.class)
class AuctionServiceTest {
@Mock
private AuctionRepository auctionRepository;
@Mock
private KafkaProducer kafkaProducer;
@Mock
private MemberServiceClient memberServiceClient;
@InjectMocks
private AuctionService auctionService;
private ResponsePointDto responsePointDto;
private ResponsePointDto responsePointDtoSub;
private Auction auction;
@BeforeEach
public void beforeEach() {
auction = Auction.builder()
.id(1L)
.productId(1L)
.productName("product")
.imageUrl("image")
.memberId(1L)
.openingPrice(3000L)
.openingTime(LocalDateTime.now().withHour(15))
.closingTime(LocalDateTime.now().withHour(16).withMinute(59))
.winningPrice(6000L)
.build();
responsePointDto = ResponsePointDto.builder()
.point(30000L)
.availablePoint(30000L)
.deposit(0L)
.build();
responsePointDtoSub = ResponsePointDto.builder()
.point(40000L)
.availablePoint(40000L)
.deposit(0L)
.build();
}
@Test
@DisplayName("경매 정보 조회")
void getAuctionTest() throws Exception {
//given
when(auctionRepository.findByCurrentTime(any()))
.thenReturn(Optional.of(auction));
//when
ResponseAuctionDto responseDto = auctionService.getAuction();
//then
assertThat(responseDto.getProductName()).isEqualTo("product");
}
@Test
@DisplayName("입찰 성공 테스트1")
public void bidSuccess1() {
when(memberServiceClient.getPoint(any()))
.thenReturn(ResponseEntity.ok(responsePointDto));
doNothing().when(kafkaProducer).sendBidDto(any(), any());
when(auctionRepository.findById(any()))
.thenReturn(Optional.of(auction));
RequestAuctionDto request = new RequestAuctionDto(12000L, LocalDateTime.now().withHour(15).withMinute(10));
ResponseWinningPriceDto response = auctionService.bid(1L, request, 1L);
Assertions.assertThat(response.getWinningPrice()).isEqualTo(12000L);
}
@Test
@DisplayName("현재 입찰가 실패시 발생하는 예외 테스트")
public void bidFailCausedByWinningPrice() {
when(auctionRepository.findById(any()))
.thenReturn(Optional.of(auction));
when(memberServiceClient.getPoint(any()))
.thenReturn(ResponseEntity.ok(responsePointDto));
doNothing().when(kafkaProducer).sendBidDto(any(), any());
RequestAuctionDto request = new RequestAuctionDto(5000L, LocalDateTime.now().withHour(16));
assertThatThrownBy(() -> auctionService.bid(1L, request, 1L))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("현재 입찰가보다 부족한 입찰 금액입니다");
}
@Test
@DisplayName("기본 입찰가 실패시 발생하는 예외 테스트")
public void bidFailCausedByOpeningPrice() {
when(auctionRepository.findById(any()))
.thenReturn(Optional.of(auction));
when(memberServiceClient.getPoint(any()))
.thenReturn(ResponseEntity.ok(responsePointDto));
doNothing().when(kafkaProducer).sendBidDto(any(), any());
RequestAuctionDto request = new RequestAuctionDto(2000L, LocalDateTime.now().withHour(16));
assertThatThrownBy(() -> auctionService.bid(1L, request, 1L))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("기본 입찰가보다 부족한 입찰 금액입니다");
}
@Test
@DisplayName("마감시간 초과시 발생하는 예외 테스트")
public void bidFailCausedByClosingTime() {
when(auctionRepository.findById(any()))
.thenReturn(Optional.of(auction));
when(memberServiceClient.getPoint(any()))
.thenReturn(ResponseEntity.ok(responsePointDto));
doNothing().when(kafkaProducer).sendBidDto(any(), any());
RequestAuctionDto request = new RequestAuctionDto(10000L, LocalDateTime.now().withHour(17));
assertThatThrownBy(() -> auctionService.bid(1L, request, 1L))
.isInstanceOf(IllegalStateException.class)
.hasMessage("경매가 종료되었습니다");
}
@Test
@DisplayName("보유 포인트 부족시 발생하는 예외 테스트")
public void bidFailCausedByPoint() {
when(auctionRepository.findById(any()))
.thenReturn(Optional.of(auction));
when(memberServiceClient.getPoint(any()))
.thenReturn(ResponseEntity.ok(responsePointDto));
doNothing().when(kafkaProducer).sendBidDto(any(), any());
RequestAuctionDto request = new RequestAuctionDto(40000L, LocalDateTime.now().withHour(16));
assertThatThrownBy(() -> auctionService.bid(1L, request, 1L))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("가지고 있는 포인트보다 많은 금액을 입찰할 수 없습니다");
}
@Test
@DisplayName("입찰 경쟁 및 예치금 초기화 테스트")
public void bidCompetitionAndInitDeposit() {
when(auctionRepository.findById(any()))
.thenReturn(Optional.of(auction));
when(memberServiceClient.getPoint(1L))
.thenReturn(ResponseEntity.ok(responsePointDto));
when(memberServiceClient.getPoint(2L))
.thenReturn(ResponseEntity.ok(responsePointDtoSub));
doNothing().when(kafkaProducer).sendBidDto(any(), any());
RequestAuctionDto request1 = new RequestAuctionDto(12000L, LocalDateTime.now().withHour(15));
RequestAuctionDto request2 = new RequestAuctionDto(20000L, LocalDateTime.now().withHour(15));
auctionService.bid(1L, request1, 1L);
auctionService.bid(1L, request2, 2L);
Assertions.assertThat(auction.getMemberId()).isEqualTo(2L);
}
}
'실전 프로젝트 > 프로젝트 과정' 카테고리의 다른 글
실전 프로젝트 18 - FeignClient (1) | 2023.08.24 |
---|---|
실전 프로젝트 17 - 카프카 이벤트 처리 (Product Consumer) (0) | 2023.08.22 |
실전 프로젝트 15 - MVP 중간발표 (0) | 2023.08.19 |
실전 프로젝트 14 - 동기, 비동기 (0) | 2023.08.19 |
실전 프로젝트 13 - 서비스 아키텍처 (0) | 2023.08.17 |