상품 관련 재고 수정 이벤트 처리의 Consumer 부분 처리
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.address}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, String> stockConsumerFactory() {
HashMap<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_3");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> stockKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(stockConsumerFactory());
return factory;
}
}
Producer에서 했던 것과 유사하게 기본 설정들을 해준다. 다만 Consumer는 그룹을 생성할 수 있기 때문에 팀원들과 미리 합의한 대로 group_3을 생성하였다. 카프카 템플릿이 아닌 ConcurrentKafkaListenerContainerFactory를 이용하였는데 이는 다수의 카프카 리스너 컨테이너를 생성하고 구성할 수 있는 팩토리 클래스로, 멀티스레드 환경에서 카프카 리스너를 생성하고 관리하는데 도움을 주는 역할을 한다. 여러 개의 리스너 컨테이너를 사용하면, 여러 스레드에서 병렬로 카프카 메시지를 처리할 수 있다. 이를 통해 카프카의 다량의 메시지를 효율적으로 처리하고 응답성을 향상시킬 수 있다.
@RequiredArgsConstructor
@Service
@Slf4j
public class StockEventConsumer {
private final ProductService productService;
private final ObjectMapper objectMapper;
@Transactional
@KafkaListener(topics = "${kafka.topic.stock}", containerFactory = "stockKafkaListenerContainerFactory")
public void listener(String message) throws JsonProcessingException {
log.info("message = {}", message);
RequestStockDto requestStockDto = objectMapper.readValue(message, RequestStockDto.class);
log.info("requestBidDto = {}, {}", requestStockDto.getProductId());
productService.updateStockCount(requestStockDto.getProductId());
}
}
재고 관련 이벤트를 담당하는 Consumer. Producer를 작성할 때와 달리 이번 Consumer 작성에서는 application.yml 파일에 Bootstrap server와 topic의 이름을 작성하는 것으로 실제 코드에서 실수가 발생할 수 있는 부분들을 최소화해보는 방식으로 진행했다.
재고 관련 비즈니스 로직
@Override
public void updateStockCount(Long productId) {
Product findProduct = productRepository.findById(productId).orElseThrow(() -> new IllegalArgumentException("상품이 없습니다."));
findProduct.updateStockCount();
}
@Entity
@Table(name = "products")
@Getter
@Builder
@NoArgsConstructor(access = PROTECTED)
@AllArgsConstructor(access = PROTECTED)
public class Product {
///
public Long updateStockCount() {
if (stockCount <= 0) {
throw new IllegalStateException("상품의 재고가 부족합니다");
}
return this.stockCount--;
}
}
'실전 프로젝트 > 프로젝트 과정' 카테고리의 다른 글
실전 프로젝트 19 - 분산 추적 (0) | 2023.08.26 |
---|---|
실전 프로젝트 18 - FeignClient (1) | 2023.08.24 |
실전 프로젝트 16 - 카프카 이벤트 처리(Auction Producer) (0) | 2023.08.21 |
실전 프로젝트 15 - MVP 중간발표 (0) | 2023.08.19 |
실전 프로젝트 14 - 동기, 비동기 (0) | 2023.08.19 |