[Tistory] MSA 환경에서 Kafka 이벤트 기반 주문 처리와 트랜잭션 관리

원글 페이지 : 바로가기

서론 MSA 환경에서 상품 주문 처리 기능을 구현하는 과정에서 트랜잭션과 이벤트 발행의 실행 시점 차이로 인해 문제가 발생하였다. MSA 환경에서 트랜잭션과 이벤트 발행의 중요성 트랜잭션과 이벤트 간의 일관성을 보장하는 것은 시스템의 안전성, 데이터 일관성, 그리고 서비스 간 느슨한 결합을 유지하는 데 중요하다. 데이터 일관성 유지: 트랜잭션이 성공적으로 커밋된 경우에만 이벤트를 발행함으로써, 데이터 일관성을 보장해야 한다. ex) 주문 처리가 성공적으로 이루어진 경우에만 결제 처리 이벤트를 발행해야 한다. 분산 시스템에서의 느슨한 결합: 이벤트 기반 아키텍처는 서비스 간의 느슨한 결합을 지원한다. 트랜잭션 결과에 따라 이벤트를 발행하면, 각 서비스는 독립적으로 작동할 수 있으며, 이벤트를 구독하는 방식으로만 다른 서비스와 상호 작용한다. 복구 용이성: 트랜잭션 결과에 기반해 이벤트를 발행하면, 시스템의 오류나 장애가 발생했을 경우 복구가 용이하다. 상품 주문 플로우차트 상품 주문 플로우차트 상품 서비스에서 상품 재고를 감소시킨다. 주문 요청 이벤트를 발행한다. 주문 서비스에서 주문 로직 처리 후 결제 요청 이벤트를 발행한다. 결제 서비스는 결제 로직을 처리한다. 여기서 문제는 로직 처리 후 이벤트 발행 과정에서 발생한다. 이 글에서는 주문 서비스를 예시로 설명한다. 문제 상황 코드 상품 서비스에서 주문 이벤트가 발행하면 다음 코드가 실행된다 가정한다. @Override
@Transactional
public void processOrder(OrderRequestEventDto orderRequestEventDto) {
Order order = orderRepository.save(new Order(orderRequestEventDto.getUserId()));
OrderDetail orderDetail = orderDetailRepository.save(new OrderDetail(order, orderRequestEventDto));
kafkaSender.sendPaymentRequestEvent(KafkaVO.PAYMENT_REQUEST_TOPIC,
PaymentRequestEventDto.fromOrderRequest(orderRequestEventDto, order.getId()));
// 복잡하고 오래 걸리는 작업 …
} @Transactional 어노테이션을 추가해 메서드가 하나의 트랜잭션으로 관리되도록 한다. Order, OrderDetail 객체를 생성하고 저장한다. 위 과정을 마치고 Kafka 토픽에 메시지와 함께 이벤트를 발행한다. 그 후 복잡하고 오래 걸리는 작업이 존재한다 가정한다. 문제 상황 상품 주문 시, 주문 서비스의 로그를 살펴본다. 상품 서비스 Transaction, Kafka 로그 2024-05-17T22:49:08.042+09:00 INFO 48626 — [order-service] [ntainer#0-0-C-1] c.m.o.service.kafka.KafkaReceiver : traceId=5d51710f-b1f1-4bf4-aa38-9b7efecaf1ae, 상품 주문 이벤트 소비
2024-05-17T22:49:08.043+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [com.miri.orderservice.service.order.OrderServiceImpl.processOrder]
2024-05-17T22:49:08.043+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2024-05-17T22:49:08.044+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2024-05-17T22:49:08.044+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2024-05-17T22:49:08.046+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2024-05-17T22:49:08.046+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2024-05-17T22:49:08.047+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2024-05-17T22:49:08.047+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=payment-request, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=PaymentRequestEventDto(userId=10001, orderId=2, goodsId=1, quantity=2, goodsPrice=75000, traceId=5d51710f-b1f1-4bf4-aa38-9b7efecaf1ae), timestamp=null)
2024-05-17T22:49:08.047+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@29fbd707] send(ProducerRecord(topic=payment-request, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=PaymentRequestEventDto(userId=10001, orderId=2, goodsId=1, quantity=2, goodsPrice=75000, traceId=5d51710f-b1f1-4bf4-aa38-9b7efecaf1ae), timestamp=null))
2024-05-17T22:49:08.047+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=payment-request, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 109, 105, 114, 105, 46, 99, 111, 114, 101, 109, 111, 100, 117, 108, 101, 46, 100, 116, 111, 46, 107, 97, 102, 107, 97, 46, 80, 97, 121, 109, 101, 110, 116, 82, 101, 113, 117, 101, 115, 116, 69, 118, 101, 110, 116, 68, 116, 111])], isReadOnly = true), key=null, value=PaymentRequestEventDto(userId=10001, orderId=2, goodsId=1, quantity=2, goodsPrice=75000, traceId=5d51710f-b1f1-4bf4-aa38-9b7efecaf1ae), timestamp=null)
2024-05-17T22:49:08.047+09:00 TRACE 48626 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [com.miri.orderservice.service.order.OrderServiceImpl.processOrder]
2024-05-17T22:49:08.049+09:00 TRACE 48626 — [order-service] [ad | producer-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=payment-request, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 109, 105, 114, 105, 46, 99, 111, 114, 101, 109, 111, 100, 117, 108, 101, 46, 100, 116, 111, 46, 107, 97, 102, 107, 97, 46, 80, 97, 121, 109, 101, 110, 116, 82, 101, 113, 117, 101, 115, 116, 69, 118, 101, 110, 116, 68, 116, 111])], isReadOnly = true), key=null, value=PaymentRequestEventDto(userId=10001, orderId=2, goodsId=1, quantity=2, goodsPrice=75000, traceId=5d51710f-b1f1-4bf4-aa38-9b7efecaf1ae), timestamp=null), metadata: payment-request-0@83061
2024-05-17T22:49:08.050+09:00 TRACE 48626 — [order-service] [ad | producer-1] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@29fbd707] close(PT5S) 주문 서비스의 로그를 확인해 보면, 트랜잭션 완료(Completing transaction) 전에 결제 요청 이벤트가 발행(Sent: ProducerRecord)되는 것을 확인할 수 있다. 이는 다음과 같은 상황에서 문제가 될 수 있다. 해결 방안 탐색(Kafka Transaction vs. TransactionalEventListener) Kafka Transaction과 Spring의 @TransactionalEventListener는 모두 트랜잭션과 관련된 이벤트를 처리하는 기능을 제공하지만, 사용 목적과 처리 방식에서 차이가 있다. 기준 Kafka Transaction @TransactionalEventListener 기술 스택 Kafka Spring Framework 주요 기능 여러 메시지 전송을 하나의 트랜잭션으로 묶음 트랜잭션의 특정 단계(ex: 커밋, 롤백)에서 이벤트 발생 처리 운용 환경 분산 시스템 내의 서비스 간 메시지 전송 및 처리 트랜잭션 관리 및 이벤트 처리 트랜잭션 관리 범위 Kafka 내 메시지의 생산과 소비에 대한 트랜잭션 관리 데이터베이스 트랜잭션에 연계된 내부 이벤트의 처리 데이터 일관성 유지 방식 분산 트랜잭션을 통한 메시지 원자성(Atomicity) 보장 스프링 트랜잭션의 성공/실패 여부에 따른 조건부 이벤트 발행 사용 사례 분산 시스템 간 데이터 동기화, 높은 처리량이 요구되는 이벤트 스트리밍 애플리케이션 내 데이터 처리 및 비지니스 로직의 이벤트 기반 실행 장점 분산 시스템에서의 데이터 일관성과 신뢰성 보장 트랜잭션 생명 주기에 따른 이벤트 제어 단점 설정과 관리가 복잡하며 트랜잭션 처리로 인한 성능 저하 발생 가능 메시지 큐 시스템과 직접적으로 통합되지 않으며, Spring Framework 내에서만 사용됨 해결 방안 선정 @TransactionalEventListener를 사용하도록 하였다. 현재 내 프로젝트의 경우, 시스템 간의 메시지 원자성을 보장하고자 하는 것이 아닌 트랜잭션의 상태(ex: 커밋, 롤백)에 따른 이벤트 발생 여부이다. 이를 고려했을 때, 데이터베이스 트랜잭션과 관련된 강력한 기능을 제공해 주는 @TransactionalEventListener가 적절하다 판단하였다. 또한 Kafka Transaction을 활용하는 경우 트랜잭션의 상태와 상관없이 무조건 이벤트가 발행된다는 특징을 가진다. 즉, Spring Transaction 예외 발생 시에도 이벤트가 발행된다. 문제 상황을 해결하기 위해서는 Producer가 아닌 Consumer가 isolation.level 값을 read_committed로 설정하여 Kafka Transaction이 Commit 된 이벤트만 구독하도록 설정해야 한다. 이러한 특징은 현재 필요한 기능이 아니며 오히려 성능 저하를 발생시킬 수 있을 것이라 판단하였다. @TransactionalEventListener 도입 코드 @Getter
public class ProcessOrderEvent extends ApplicationEvent {

private final OrderRequestEventDto orderRequestEventDto;
private final Long orderId;

public ProcessOrderEvent(Object source, OrderRequestEventDto orderRequestEventDto, Long orderId) {
super(source);
this.orderRequestEventDto = orderRequestEventDto;
this.orderId = orderId;
}
} @Slf4j
@Component
public class TransactionEventListener {

private final KafkaSender kafkaSender;

public TransactionEventListener(KafkaSender kafkaSender) {
this.kafkaSender = kafkaSender;
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onOrderSuccess(ProcessOrderEvent event) {
OrderRequestEventDto orderRequestEventDto = event.getOrderRequestEventDto();
log.info(“traceId={}, 카프카 결제 요청 이벤트 발행”, orderRequestEventDto.getTraceId());
kafkaSender.sendPaymentRequestEvent(KafkaVO.PAYMENT_REQUEST_TOPIC,
PaymentRequestEventDto.fromOrderRequest(orderRequestEventDto, event.getOrderId()));
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void onOrderFailure(ProcessOrderEvent event) {
OrderRequestEventDto orderRequestEventDto = event.getOrderRequestEventDto();
log.info(“traceId={}, 주문 실패 이벤트 발행”, orderRequestEventDto.getTraceId());
kafkaSender.sendRollbackRequestEvent(KafkaVO.STOCK_ROLLBACK_TOPIC,
StockRollbackEventDto.fromOrderRequest(orderRequestEventDto));
}
} 주문 처리 이벤트(ProcessOrderEvent)에 대한 리스너를 정의한다. onOrderSuccess 메서드는 트랜잭션이 성공적으로 커밋된 후에 실행되며, 결제 요청 이벤트를 카프카로 발행한다. onOrderFailure 메서드는 트랜잭션이 롤백된 후 실행되며, 주문 실패와 관련된 이벤트를 카프카로 발행한다. @Override
@Transactional
public void processOrder(OrderRequestEventDto orderRequestEventDto) {
Order order = orderRepository.save(new Order(orderRequestEventDto.getUserId()));
OrderDetail orderDetail = orderDetailRepository.save(new OrderDetail(order, orderRequestEventDto));
applicationEventPublisher.publishEvent(new ProcessOrderEvent(this, orderRequestEventDto, order.getId()));
} applicationEventPublisher.publishEvent를 호출하여 ProcessOrderEvent 이벤트를 발행한다. @TransactionalEventListener 적용 결과 상품 서비스 Transaction, Kafka 로그 2024-05-17T22:36:51.937+09:00 INFO 48083 — [order-service] [ntainer#0-0-C-1] c.m.o.service.kafka.KafkaReceiver : traceId=3ddb0690-f8b6-4e6f-ae88-f8822c0d10e6, 상품 주문 이벤트 소비
2024-05-17T22:36:51.938+09:00 TRACE 48083 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [com.miri.orderservice.service.order.OrderServiceImpl.processOrder]
2024-05-17T22:36:51.938+09:00 TRACE 48083 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2024-05-17T22:36:51.940+09:00 TRACE 48083 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2024-05-17T22:36:51.940+09:00 TRACE 48083 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2024-05-17T22:36:51.942+09:00 TRACE 48083 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2024-05-17T22:36:51.942+09:00 TRACE 48083 — [order-service] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [com.miri.orderservice.service.order.OrderServiceImpl.processOrder]
2024-05-17T22:36:51.944+09:00 INFO 48083 — [order-service] [ntainer#0-0-C-1] c.m.o.event.TransactionEventListener : traceId=3ddb0690-f8b6-4e6f-ae88-f8822c0d10e6, 카프카 결제 요청 이벤트 발행
2024-05-17T22:36:51.945+09:00 TRACE 48083 — [order-service] [ntainer#0-0-C-1] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=payment-request, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=PaymentRequestEventDto(userId=10001, orderId=4, goodsId=1, quantity=2, goodsPrice=75000, traceId=3ddb0690-f8b6-4e6f-ae88-f8822c0d10e6), timestamp=null)
2024-05-17T22:36:51.945+09:00 TRACE 48083 — [order-service] [ntainer#0-0-C-1] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@66db7bc2] send(ProducerRecord(topic=payment-request, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=PaymentRequestEventDto(userId=10001, orderId=4, goodsId=1, quantity=2, goodsPrice=75000, traceId=3ddb0690-f8b6-4e6f-ae88-f8822c0d10e6), timestamp=null))
2024-05-17T22:36:51.945+09:00 TRACE 48083 — [order-service] [ntainer#0-0-C-1] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=payment-request, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 109, 105, 114, 105, 46, 99, 111, 114, 101, 109, 111, 100, 117, 108, 101, 46, 100, 116, 111, 46, 107, 97, 102, 107, 97, 46, 80, 97, 121, 109, 101, 110, 116, 82, 101, 113, 117, 101, 115, 116, 69, 118, 101, 110, 116, 68, 116, 111])], isReadOnly = true), key=null, value=PaymentRequestEventDto(userId=10001, orderId=4, goodsId=1, quantity=2, goodsPrice=75000, traceId=3ddb0690-f8b6-4e6f-ae88-f8822c0d10e6), timestamp=null)
2024-05-17T22:36:51.948+09:00 TRACE 48083 — [order-service] [ad | producer-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=payment-request, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 109, 105, 114, 105, 46, 99, 111, 114, 101, 109, 111, 100, 117, 108, 101, 46, 100, 116, 111, 46, 107, 97, 102, 107, 97, 46, 80, 97, 121, 109, 101, 110, 116, 82, 101, 113, 117, 101, 115, 116, 69, 118, 101, 110, 116, 68, 116, 111])], isReadOnly = true), key=null, value=PaymentRequestEventDto(userId=10001, orderId=4, goodsId=1, quantity=2, goodsPrice=75000, traceId=3ddb0690-f8b6-4e6f-ae88-f8822c0d10e6), timestamp=null), metadata: payment-request-0@83055 주문 서비스의 로그를 확인해보면, 트랜잭션이 완료(Completing transaction)된 이후, 결제 요청 이벤트가 발행(Sent: ProducerRecord)되는 것을 확인할 수 있다. 결론적으로, TransactionalEventListener를 도입하여 스프링 환경에서 결제 처리와 관련된 이벤트를 트랜잭션 결과에 따라 조건적으로 발행할 수 있게 되었다. 참고 https://velog.io/@youmakemesmile/Spring-KafkaTransaction-Kafka-Producer-%EC%A0%95%EB%A6%AC-%EA%B7%BC%EB%8D%B0-%EC%9D%B4%EC%A0%9C-%EC%8B%A4%EC%A0%9C-%EC%97%85%EB%AC%B4%EC%99%80-Transaction%EC%9D%84-%EA%B3%81%EB%93%A4%EC%9D%B8#1-kafka-transaction-%ED%99%9C%EC%9A%A9%EB%B0%A9%EC%8B%9D

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다