본문 바로가기
실전 프로젝트/프로젝트 과정

실전 프로젝트 16 - 카프카 이벤트 처리(Auction Producer)

by 구너드 2023. 8. 21.

독립적으로 서비스하는 경매 도메인에서 입찰 이벤트를 회원 도메인으로 보내기 위해서 카프카 관련 설정 및 코드를 작성하는 과제를 하게 되었다. 주키퍼와 카프카 서버를 로컬에서 띄운뒤, 콘솔 컨슈머를 이용해 실제로 발생하는 입찰 이벤트가 콘솔 컨슈머에서 정상적으로 나타나는지 확인하는 게 오늘의 목표.


@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

카프카와 관련된 Bootstrap server와 직렬화 설정을 하는 클래스.

여기서 사용되는 카프카 템플릿은 애플리케이션 개발에 사용되는 패턴이다. 이 템플릿은 카프카에 연결하고 데이터를 보내거나 받는 과정을 간소화하며, 일반적인 개발 작업을 단순화하고 일관성을 유지하는 데 도움을 준다.

 

카프카 템플릿을 이용함으로써 얻을 수 있는 효과

 

1.생산자와 소비자 작업 단순화: 카프카 템플릿은 카프카로 데이터를 전송하는 생산자와 데이터를 소비하는 소비자의 작업을 단순화할 수 있다. 필요한 설정을 최소화하고, 일반적인 사용 사례를 위한 미리 구성된 기능을 제공한다.

2.메시지 송수신 단순화: 템플릿을 사용하면 메시지를 보내고 받는 작업을 추상화하여 간편하게 처리할 수 있다. 이를 통해 개발자는 데이터를 처리하는 코드에 집중할 수 있다.

 

작성된 카프카 관련 설정들을 카프카 템플릿을 이용하면 손쉽게 카프카 메세지 처리를 할 수 있다는 게 큰 장점.

 

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendBidDto(String topic, RequestBidDto bidDto) {
        ObjectMapper objectMapper = new ObjectMapper();
        String jsonInString = "";

        try {
            jsonInString = objectMapper.writeValueAsString(bidDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("memberId:{}, point:{}", bidDto.getBid());
    }
}

Json을 String 타입으로 변환하기 위해 objectMapper를 이용하였다. 기본적인 Producer 설정이 되었으니까 이를 이용해서 실제 입찰이 실행되는 Service 계층에 넣어주면 된다.

 

    @Transactional
    public ResponseWinningPriceDto bid(Long auctionId, RequestAuctionDto requestAuctionDto, Long memberId) {

        Auction findAuction = auctionRepository.findById(auctionId).orElseThrow(() -> new IllegalArgumentException("진행중인 경매가 없습니다"));

        RequestBidDto bidDto = RequestBidDto.builder()
                .memberId(memberId)
                .bid(requestAuctionDto.getPoint())
                .build();

        kafkaProducer.sendBidDto("bid-topic", bidDto);

        ResponsePointDto responsePointDto = memberServiceClient.getPoint(memberId).getBody();

        validateAuctionCondition(requestAuctionDto, findAuction, responsePointDto);

        findAuction.update(requestAuctionDto.getPoint(), memberId);

        return createResponseWinningPriceDto(findAuction);
    }

Service 계층의 코드는 순수한 자바 코드로 이루어지는 게 좋고, 그 변동성이 적어야 한다는 점도 인지하고 있지만 카프카와 이벤트 발송 처리를 Controller에서 하는 것은 클라이언트와 연결을 담당하는 역할을 하는 Controller와 거리가 먼 듯 했다. 그렇다고 해당 코드를 Repository에 넣을 수는 없을 것 같아서 우선은 Service 계층에서 카프카 이벤트처리까지 하는 걸로 생각했다. 나중에 시간이 돼서 아키텍처에 변화가 생긴다면 보다 순수한 비즈니스 로직이 있는 계층을 만드는 데 주력해야하지 않나 싶다.


수정된 테스트코드. 

@ExtendWith(MockitoExtension.class)
class AuctionServiceTest {

    @Mock
    private AuctionRepository auctionRepository;

    @Mock
    private KafkaProducer kafkaProducer;

    @Mock
    private MemberServiceClient memberServiceClient;

    @InjectMocks
    private AuctionService auctionService;

    private ResponsePointDto responsePointDto;

    private ResponsePointDto responsePointDtoSub;

    private Auction auction;


    @BeforeEach
    public void beforeEach() {

        auction = Auction.builder()
                .id(1L)
                .productId(1L)
                .productName("product")
                .imageUrl("image")
                .memberId(1L)
                .openingPrice(3000L)
                .openingTime(LocalDateTime.now().withHour(15))
                .closingTime(LocalDateTime.now().withHour(16).withMinute(59))
                .winningPrice(6000L)
                .build();

        responsePointDto = ResponsePointDto.builder()
                .point(30000L)
                .availablePoint(30000L)
                .deposit(0L)
                .build();

        responsePointDtoSub = ResponsePointDto.builder()
                .point(40000L)
                .availablePoint(40000L)
                .deposit(0L)
                .build();
    }

    @Test
    @DisplayName("경매 정보 조회")
    void getAuctionTest() throws Exception {
        //given

        when(auctionRepository.findByCurrentTime(any()))
                .thenReturn(Optional.of(auction));

        //when
        ResponseAuctionDto responseDto = auctionService.getAuction();
        //then
        assertThat(responseDto.getProductName()).isEqualTo("product");

    }

    @Test
    @DisplayName("입찰 성공 테스트1")
    public void bidSuccess1() {

        when(memberServiceClient.getPoint(any()))
                .thenReturn(ResponseEntity.ok(responsePointDto));

        doNothing().when(kafkaProducer).sendBidDto(any(), any());

        when(auctionRepository.findById(any()))
                .thenReturn(Optional.of(auction));

        RequestAuctionDto request = new RequestAuctionDto(12000L, LocalDateTime.now().withHour(15).withMinute(10));

        ResponseWinningPriceDto response = auctionService.bid(1L, request, 1L);

        Assertions.assertThat(response.getWinningPrice()).isEqualTo(12000L);
    }


    @Test
    @DisplayName("현재 입찰가 실패시 발생하는 예외 테스트")
    public void bidFailCausedByWinningPrice() {

        when(auctionRepository.findById(any()))
                .thenReturn(Optional.of(auction));

        when(memberServiceClient.getPoint(any()))
                .thenReturn(ResponseEntity.ok(responsePointDto));

        doNothing().when(kafkaProducer).sendBidDto(any(), any());

        RequestAuctionDto request = new RequestAuctionDto(5000L, LocalDateTime.now().withHour(16));

        assertThatThrownBy(() -> auctionService.bid(1L, request, 1L))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("현재 입찰가보다 부족한 입찰 금액입니다");
    }

    @Test
    @DisplayName("기본 입찰가 실패시 발생하는 예외 테스트")
    public void bidFailCausedByOpeningPrice() {

        when(auctionRepository.findById(any()))
                .thenReturn(Optional.of(auction));

        when(memberServiceClient.getPoint(any()))
                .thenReturn(ResponseEntity.ok(responsePointDto));

        doNothing().when(kafkaProducer).sendBidDto(any(), any());

        RequestAuctionDto request = new RequestAuctionDto(2000L, LocalDateTime.now().withHour(16));

        assertThatThrownBy(() -> auctionService.bid(1L, request, 1L))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("기본 입찰가보다 부족한 입찰 금액입니다");
    }

    @Test
    @DisplayName("마감시간 초과시 발생하는 예외 테스트")
    public void bidFailCausedByClosingTime() {

        when(auctionRepository.findById(any()))
                .thenReturn(Optional.of(auction));

        when(memberServiceClient.getPoint(any()))
                .thenReturn(ResponseEntity.ok(responsePointDto));

        doNothing().when(kafkaProducer).sendBidDto(any(), any());


        RequestAuctionDto request = new RequestAuctionDto(10000L, LocalDateTime.now().withHour(17));

        assertThatThrownBy(() -> auctionService.bid(1L, request, 1L))
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("경매가 종료되었습니다");
    }

    @Test
    @DisplayName("보유 포인트 부족시 발생하는 예외 테스트")
    public void bidFailCausedByPoint() {

        when(auctionRepository.findById(any()))
                .thenReturn(Optional.of(auction));

        when(memberServiceClient.getPoint(any()))
                .thenReturn(ResponseEntity.ok(responsePointDto));

        doNothing().when(kafkaProducer).sendBidDto(any(), any());


        RequestAuctionDto request = new RequestAuctionDto(40000L, LocalDateTime.now().withHour(16));

        assertThatThrownBy(() -> auctionService.bid(1L, request, 1L))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("가지고 있는 포인트보다 많은 금액을 입찰할 수 없습니다");
    }

    @Test
    @DisplayName("입찰 경쟁 및 예치금 초기화 테스트")
    public void bidCompetitionAndInitDeposit() {

        when(auctionRepository.findById(any()))
                .thenReturn(Optional.of(auction));

        when(memberServiceClient.getPoint(1L))
                .thenReturn(ResponseEntity.ok(responsePointDto));

        when(memberServiceClient.getPoint(2L))
                .thenReturn(ResponseEntity.ok(responsePointDtoSub));

        doNothing().when(kafkaProducer).sendBidDto(any(), any());

        RequestAuctionDto request1 = new RequestAuctionDto(12000L, LocalDateTime.now().withHour(15));
        RequestAuctionDto request2 = new RequestAuctionDto(20000L, LocalDateTime.now().withHour(15));

        auctionService.bid(1L, request1, 1L);
        auctionService.bid(1L, request2, 2L);

        Assertions.assertThat(auction.getMemberId()).isEqualTo(2L);
    }
}