본문 바로가기
실전 프로젝트/프로젝트 과정

실전 프로젝트 17 - 카프카 이벤트 처리 (Product Consumer)

by 구너드 2023. 8. 22.

상품 관련 재고 수정 이벤트 처리의 Consumer 부분 처리

 

@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.address}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, String> stockConsumerFactory() {
        HashMap<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_3");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> stockKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stockConsumerFactory());

        return factory;
    }
}

Producer에서 했던 것과 유사하게 기본 설정들을 해준다. 다만 Consumer는 그룹을 생성할 수 있기 때문에 팀원들과 미리 합의한 대로 group_3을 생성하였다. 카프카 템플릿이 아닌 ConcurrentKafkaListenerContainerFactory를 이용하였는데 이는 다수의 카프카 리스너 컨테이너를 생성하고 구성할 수 있는 팩토리 클래스로, 멀티스레드 환경에서 카프카 리스너를 생성하고 관리하는데 도움을 주는 역할을 한다. 여러 개의 리스너 컨테이너를 사용하면, 여러 스레드에서 병렬로 카프카 메시지를 처리할 수 있다. 이를 통해 카프카의 다량의 메시지를 효율적으로 처리하고 응답성을 향상시킬 수 있다.


@RequiredArgsConstructor
@Service
@Slf4j
public class StockEventConsumer {

    private final ProductService productService;
    private final ObjectMapper objectMapper;

    @Transactional
    @KafkaListener(topics = "${kafka.topic.stock}", containerFactory = "stockKafkaListenerContainerFactory")
    public void listener(String message) throws JsonProcessingException {
        log.info("message = {}", message);
        RequestStockDto requestStockDto = objectMapper.readValue(message, RequestStockDto.class);
        log.info("requestBidDto = {}, {}", requestStockDto.getProductId());
        productService.updateStockCount(requestStockDto.getProductId());
    }
}

재고 관련 이벤트를 담당하는 Consumer. Producer를 작성할 때와 달리 이번 Consumer 작성에서는 application.yml 파일에 Bootstrap server와 topic의 이름을 작성하는 것으로 실제 코드에서 실수가 발생할 수 있는 부분들을 최소화해보는 방식으로 진행했다.


재고 관련 비즈니스 로직

    @Override
    public void updateStockCount(Long productId) {
        Product findProduct = productRepository.findById(productId).orElseThrow(() -> new IllegalArgumentException("상품이 없습니다."));

        findProduct.updateStockCount();
    }

 

@Entity
@Table(name = "products")
@Getter
@Builder
@NoArgsConstructor(access = PROTECTED)
@AllArgsConstructor(access = PROTECTED)
public class Product {

	///

    public Long updateStockCount() {

        if (stockCount <= 0) {
            throw new IllegalStateException("상품의 재고가 부족합니다");
        }

        return this.stockCount--;
    }
}