원글 페이지 : 바로가기
해당글에 앞서 백킹서비스 개념을 먼저 알고 가는게 좋다 https://ssmyefrin.tistory.com/56 [MSA] Backing service – MOM ✔ Backing service Backing Service란, 어플리케이션이 실행되는 가운데 네트워크를 통해서 사용할 수 있는 모든 서비스를 말하며 My SQL과 같은 데이터베이스, 캐쉬 시스템, SMTP 서비스 등 어플리케이션 ssmyefrin.tistory.com 카프카를 설치했으니, 이제 Spring Boot 를 연동하여 구현해보자. ✔ 실행환경 – MacOS(Intel) – Kafka + Zookeeper (Docker) – IntelliJ + Gradle + SpringBoot 3.1.5 + JDK17 – Spring Cloud Eureka 적용 – WebClient 비동기통신 – 카카오모먼트 API ✔ Description 나는 기존에 했던 프로젝트에서 리펙토링을 한다고 생각하고 구성을 했다. 카카오 매체에 내가 원하는 광고를 집행시키려면 캠페인,광고그룹,광고를 등록하는 과정을 거쳐야 한다. (이개념은 그냥이렇다 하고 넘어가자) 그리고 광고그룹을 등록시 캠페인ID 가 필요하고, 광고를 등록시엔 광고그룹ID 가 필요하다.(앞단계의 결과값이 필요함) 카카오모먼트는 API 초당 콜수 제한이 있기에 메시지 큐(Redis)서버를 이용하여 대기열에 쌓아두고 배치를 돌려 상호작용하였다. 발그림ㅠ ㅠ ✔ 전제 : 카카오모먼트 API 콜수제한이 없다고 가정 기존프로젝트 => 쓰로틀링 개선(큐서버 상호작용) 카카오모먼트 : 캠페인등록 -> 광고그룹등록 -> 광고그룹등록 기존통신모듈 : WebClient.block() 동기모듈 ✔ Repactoring 쓰로틀링 => 카프카 메시지브로커 카카오모먼트 : 캠페인 Topic / 광고그룹 Topic 캠페인 등록후 결과(캠페인ID)를 발행 -> 구독하고 있던 그룹서비스가 ID 받아서 광고그룹등록 (…) 통신모듈 : WebClient 비동기 ✔ 발행(Pub)서버 구축 1. 의존성추가 implementation ‘org.springframework.boot:spring-boot-starter-web’
// for apache kafka
implementation ‘org.springframework.kafka:spring-kafka’
implementation ‘org.springframework.boot:spring-boot-starter-webflux’ 2. Properties 설정(application.yml) server:
port: 8000
spring:
kafka:
bootstrap-servers: localhost:9092 서버포트와 카프카서버(9092) 쪽 설정해준다. 3. KafkaConfig 설정 @EnableKafka // @KafkaListener 사용을 위한 설정
@Configuration
public class KafkaConfig {
@Value(value = “${spring.kafka.bootstrap-servers}”)
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
// ———————— Publish 설정 ————————————-
/**
* 캠페인토픽
* @return
*/
@Bean
public NewTopic campaignTopic() {
return new NewTopic(“campaign”, 1, (short) 1);
}
/**
* 광고그룹토픽
* @return
*/
@Bean
public NewTopic adGroupTopic() {
return new NewTopic(“adgroup”, 1, (short) 1);
}
/**
* 광고토픽
* @return
*/
@Bean
public NewTopic adTopic() {
return new NewTopic(“ad”, 1, (short) 1);
}
/**
* 카프카 데이터전송에 필요한 설정
* @return
*/
@Bean
public ProducerFactory
Map
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
/**
* 카프카 데이터를 전송을 위한 템플릿생성
* @return
*/
@Bean
public KafkaTemplate
return new KafkaTemplate<>(producerFactory());
}
} 토픽생성 및 전송을 위한 설정 생성(데이터직렬화를 위한 Factory 생성, 템플릿등) 4. 컨트롤러 생성 @RestController
@RequiredArgsConstructor
@RequestMapping(“campaign/kakao”)
@Slf4j
public class CampaignProducerController {
private final KafkaTemplate
private final NewTopic campaignTopic;
private final KakaoService kakaoService;
private final SendService sendService;
@GetMapping(“/search”)
public ResponseEntity> publishKakaoCampaign(){
log.info(“[카카오] 캠페인조회”);
Mono
kakaoService.campaignSubscribe(monoResult);
return ResponseEntity.ok().body(null);
}
@GetMapping(“/regist”)
public ResponseEntity> publicKakaoRegist() {
log.info(“[카카오] 캠페인등록”);
Mono
kakaoService.campaignSubscribe(monoResult);
return ResponseEntity.ok().body(null);
}
} 5. 서비스 생성 @Service
@RequiredArgsConstructor
@Slf4j
public class KakaoService {
private final WebClientConfig webClientConfig;
private final SendService sendService;
/**
* 캠페인조회
* @return
*/
public Mono
Mono
try{
mono = webClientConfig.getKakaoCampaignList();
} catch (Exception e){
log.error(“[카카오 API] 캠페인조회 EXCEPTION:”+e.getMessage(),e);
}
return mono;
}
/**
* 캠페인생성
* @return
*/
public Mono
Mono
try{
JSONObject param = new JSONObject();
param.put(“name”,”20231127_디스플레이방문”);
JSONObject goal = new JSONObject();
goal.put(“campaignType”,”DISPLAY”);
goal.put(“goal”,”VISITING”);
param.put(“campaignTypeGoal”, goal);
mono = webClientConfig.createKakaoCampaign(param);
} catch (Exception e){
log.error(“[카카오 API] 캠페인등록 EXCEPTION:”+e.getMessage(),e);
}
return mono;
}
/**
* 비동기작업콜백
* @param monoResult
*/
public void campaignSubscribe(Mono
monoResult.subscribe(
// 비동기 작업이 성공적으로 완료된 경우 실행되는 코드
result -> {
log.info(“[카카오 API] Result => {}”,result);
// 카프카로 메시지전송
sendService.kafkaSend(result);
},
// 에러가 발생한 경우 실행되는 코드
error -> {
log.error(“[카카오 API] Exception:”+error.getMessage());
},
// 비동기 작업이 완료되었을 때 실행되는 코드
() -> {
log.info(“[카카오 API] Successful Completed”);
}
);
}
} 비동기는 Try-Catch 로는 예외를 잡을수 없다. 서로 다른스레드에서 작업중이므로 캐치하지못한다. 따라서 따로 Error 함수에서 처리해야한다. (그 외의 다른 함수에서 처리되는것들을 잡으려면 쓰긴해야함) 6. 카프카로 메시지전송 @Slf4j
@Service
@RequiredArgsConstructor
public class SendService {
private final KafkaTemplate
private final NewTopic campaignTopic;
/**
* 카프카로 메시지전송
* @param obj
*/
public void kafkaSend(Object obj){
CompletableFuture
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info(“[카카오] 카프카 Pub Sent message=[” + obj.toString() + “] with offset=[” + result.getRecordMetadata().offset() + “]”);
} else {
log.error(“[카카오] 카프카 Pub Unable to send message=[” + obj.toString() + “] due to : ” + ex.getMessage(), ex);
}
});
} 실행하면, 토픽이 3개 생성되어있다. (캠페인,광고그룹,광고) ✔ Consumer 1. 컨슈머 Config 설정 @EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(“${spring.kafka.bootstrap-servers}”)
private String bootstrapAddress;
@Value(“${spring.kafka.my.push.topic.name}”)
private String groupId;
@Bean
public ConsumerFactory
Map
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, “todo”);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory
ConcurrentKafkaListenerContainerFactory
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
2. Properties 설정(application.yml) server:
port: 8002
spring:
application:
name: backend-service
kafka:
bootstrap-servers: localhost:9092
my:
push:
topic:
name: campaign
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka 참고로 유레카서버를 이용하여 Gateway 설정을 하였다. 3. Kafka 리스너 설정 @Service
@RequiredArgsConstructor
@Slf4j
public class KafkaMessageListener {
private final KakaoGroupService kakaoGroupService;
@KafkaListener(topics = “${spring.kafka.my.push.topic.name}”, groupId = “todo”)
public void kakaoConsumer(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition){
log.info(“[카카오 컨슈머 message]: {} from partition: {}”, message, partition);
JSONObject param = ConvertUtils.strToJson(message);
if(!ObjectUtils.isEmpty(param)) {
kakaoGroupService.registAdGroup(param);
}
}
} 4. 광고그룹등록 서비스 등록 @Service
@RequiredArgsConstructor
@Slf4j
public class KakaoGroupService {
private final WebClientConfig webClientConfig;
/**
* 광고그룹등록
* @param obj
*/
public void registAdGroup(JSONObject obj) {
log.info(“[카카오 API] 광고그룹등록”);
JSONObject param = new JSONObject();
JSONObject campaignObj = new JSONObject();
JSONObject targetObj = new JSONObject();
JSONObject dateObj = new JSONObject();
String[] place = new String[]{“KAKAO_SERVICE”, “KAKAO_TALK”};
String[] deviceType = new String[]{“ANDROID”, “IOS”};
campaignObj.put(“id”, obj.getInt(“id”));
param.put(“campaign”,campaignObj);
param.put(“name”, “20231127_광고그룹”);
param.put(“placements”, place);
param.put(“allAvailableDeviceType”, false);
param.put(“allAvailablePlacement”, true);
param.put(“deviceTypes”, deviceType);
targetObj.put(“ageType”, “ALL”);
targetObj.put(“genderType”, “ALL”);
targetObj.put(“locationType”, “ALL”);
param.put(“targeting”, targetObj);
param.put(“adult”, false);
param.put(“dailyBudgetAmount”, 50000);
param.put(“bidStrategy”, “AUTOBID”);
param.put(“pricingType”, “CPC”);
param.put(“bidAmount”, 0);
param.put(“pacing”, “NONE”);
param.put(“pricingType”, “CPC”);
dateObj.put(“beginDate”, “2023-12-01”);
dateObj.put(“lateNight”, true);
dateObj.put(“detailTime”, false);
param.put(“schedule”, dateObj);
adGroupSubscribe(webClientConfig.createKakaoAdGroup(param));
}
/**
* WebClient Nonblocking
* @param monoResult
*/
public void adGroupSubscribe(Mono
monoResult.subscribe(
// 비동기 작업이 성공적으로 완료된 경우 실행되는 코드
result -> {
log.info(“[카카오 API] Result => {}”,result);
// 광고 카프카로 메시지전송
},
// 에러가 발생한 경우 실행되는 코드
error -> {
// WebClientResponseException 예외 처리(카카오API)
if (error instanceof WebClientResponseException) {
WebClientResponseException we = (WebClientResponseException) error;
log.error(“[카카오 API] WEBCLIENT EXCEPTION:”+we.getRequest().getURI()+”] : “+we.getStatusCode()+”/”+we.getResponseBodyAsString(Charset.forName(“UTF-8”)),we);
} else
// 다른 예외 처리
log.error(“[카카오 API] EXCEPTION:”+error.getMessage());
},
// 비동기 작업이 완료되었을 때 실행되는 코드
() -> {
log.info(“[카카오 API] Successful Completed”);
}
);
}
}
5. 테스트 Publisher 2023-11-27T16:19:19.812+09:00 INFO 6468 — [nio-8000-exec-6] c.m.k.c.CampaignProducerController : [카카오] 캠페인등록
2023-11-27T16:19:20.971+09:00 INFO 6468 — [tor-http-nio-10] com.msa.kafka.service.KakaoService : [카카오 API] Result => {“id”:,”name”:”20231127_디스플레이방문”,”campaignTypeGoal”:{“campaignType”:”DISPLAY”,”goal”:”VISITING”},”objective”:null,”dailyBudgetAmount”:null,”config”:”ON”,”statusDescription”:”운영중”,”trackId”:null,”adAccountId”:,”status”:[“READY”],”systemConfig”:”ON”,”isDailyBudgetAmountOver”:false}
2023-11-27T16:19:20.976+09:00 INFO 6468 — [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition campaign-0 to 0 since the associated topicId changed from null to kt18OAhrTXqCV5MBN7cixw
2023-11-27T16:19:20.979+09:00 INFO 6468 — [tor-http-nio-10] com.msa.kafka.service.KakaoService : [카카오 API] Successful Completed
2023-11-27T16:19:20.986+09:00 INFO 6468 — [ad | producer-1] com.msa.kafka.service.SendService : [카카오] 카프카 Pub Sent message=[{“id”:,”name”:”20231127_디스플레이방문”,”campaignTypeGoal”:{“campaignType”:”DISPLAY”,”goal”:”VISITING”},”objective”:null,”dailyBudgetAmount”:null,”config”:”ON”,”statusDescription”:”운영중”,”trackId”:null,”adAccountId”:,”status”:[“READY”],”systemConfig”:”ON”,”isDailyBudgetAmountOver”:false}] with offset=[16] 캠페인 ID 를 가지고 카프카로 전송 Consumer [카카오 컨슈머 message]: {“id”:,”name”:”20231127_디스플레이방문”,”campaignTypeGoal”:{“campaignType”:”DISPLAY”,”goal”:”VISITING”},”objective”:null,”dailyBudgetAmount”:null,”config”:”ON”,”statusDescription”:”운영중”,”trackId”:null,”adAccountId”:390257,”status”:[“READY”],”systemConfig”:”ON”,”isDailyBudgetAmountOver”:false} from partition: 0
2023-11-27T16:19:21.005+09:00 INFO 16299 — [ntainer#0-0-C-1] com.msa.todo.service.KakaoGroupService : [카카오 API] 광고그룹등록
2023-11-27T16:19:21.005+09:00 DEBUG 16299 — [ntainer#0-0-C-1] com.msa.todo.common.HttpWebClient : [KAKAO_Request_Param]:{“bidStrategy”:”AUTOBID”,”allAvailableDeviceType”:false,”dailyBudgetAmount”:50000,”allAvailablePlacement”:true,”bidAmount”:0,”placements”:[“KAKAO_SERVICE”,”KAKAO_TALK”],”schedule”:{“beginDate”:”2023-12-01″,”detailTime”:false,”lateNight”:true},”targeting”:{“locationType”:”ALL”,”ageType”:”ALL”,”genderType”:”ALL”},”deviceTypes”:[“ANDROID”,”IOS”],”pacing”:”NONE”,”name”:”20231127_광고그룹”,”campaign”:{“id”:},”adult”:false,”pricingType”:”CPC”}
2023-11-27T16:19:23.045+09:00 INFO 16299 — [ctor-http-nio-2] com.msa.todo.service.KakaoGroupService : [카카오 API] Successful Completed ID 를 받아 광고그룹을 등록한다. 참고 : https://backendcode.tistory.com/209 https://velog.io/@_koiil/Spring-Component%EC%99%80-Repository-Service-Controller https://victorydntmd.tistory.com/343 https://tech.kakaopay.com/post/bff_webflux_coroutine/#%EB%8B%A4%EC%96%91%ED%95%9C-%ED%99%98%EA%B2%BD%EC%97%90%EC%84%9C-%EA%B5%AC%EB%8F%99%EB%90%9C%EB%8B%A4%EB%A9%B4-graphql-%EA%B3%A0%EB%A0%A4%ED%95%98%EA%B8%B0 https://americanopeople.tistory.com/417 https://developer-syubrofo.tistory.com/150 https://erjuer.tistory.com/89 https://velog.io/@18k7102dy/%EC%9C%84%EB%93%9C%EB%A7%88%EC%BC%93-%EA%B0%9C%EB%B0%9C%EA%B8%B0-%EB%A6%AC%EB%B7%B0-%EC%9E%91%EC%84%B1%EC%9D%84-%EC%B2%98%EB%A6%AC%ED%95%98%EB%8A%94%EA%B2%8C-%EC%9D%B4%EB%A0%87%EA%B2%8C-%EC%96%B4%EB%A0%B5%EB%8B%A4%EA%B3%A0