Topic
- Topic은 카프카에서 메시지를 분류하는 논리적 단위이다
- Producer와 Consumer는 Topic을 기준으로 메시지를 전송하거나 메시지를 읽는다
- 토픽은 여러 파티션에 걸쳐서 나누어 분산 저장 된다
- 파티션들의 집합
Partition
- Kafka에서 실제로 메시지를 저장 하는 공간으로 브로커 내에 존재
- 각 파티션 내에서는 데이터의 순서가 보장된다
- 파티션 수를 늘리면 처리량을 늘릴 수 있다(스케일 아웃)
Producer
Producer는 Kafka 클러스터에 데이터를 보내는 클라이언트로 메시지를 발행하는 역할을 한다
- 메시지 발행
- 파티션 선택(키를 통해 특정 파티션으로 메시지를 발행할 수 있다, 키 없으면 RoundRobin)
- 데이터 직렬화
- 전송 확인 및 재시도 acks
- 0 : 전송 확인 안함(빠름)
- 1 : 리더 브로커 저장 확인 후 응답
- all or -1 : ISR(In-Sync Replicas) 에도 모두 저장이 된 후 응답(상대적으로 느림, 안전성 높음)
Consumer
Producer에 의해 발행된 메시지를 구독하며 메시지를 가져와 처리하는 클라이언트로 메시지를 소비 하는 역할을 한다
- Topic 구독 및 메시지 처리
- 하나 이상의 Topic을 구독하여 Topic에 저장된 메시지를 읽고 처리한다
- 오프셋 관리
- 자신이 읽은 메시지의 위치를 관리하며 이를 통해 재시작 시 해당 위치부터 이어서 읽어 처리할 수 있다
Consumer Group
Consumer Group은 하나 이상의 Consumer 들이 모여 Topic의 메시지를 읽고 처리하는 논리적 집합 이다
- Consumer는 Consumer Group에 속해야 한다(단독도 가능하지만 확장성이 떨어진다)
- Consumer Group을 통해 확장성을 확보할 수 있다(Scale Out)
- Consumer Group 내 특정 파티션이 문제가 생겼을 때 다른 Consumer가 이를 받아 처리함으로 서 내결함성을 가진다
★ 중요
Consumer Group B (파티션의 수가 Consumer의 수 보다 많은 경우)
- 하나의 Topic 내에 리더 파티션이 3개가 있는 Topic을 Consumer를 2개 가지고 있는 Consumer Group이 구독 하고 있는 상태 이다
- Consumer Group 내 Consumer의 수 가 파티션 수 보다 적으면 하나의 Consumer가 다 수의 파티션의 이 할당되게 된다
- 이론적으로 Consumer에 할당될 수 있는 파티션의 수에는 제약이 없으나 Consumer의 리소스적 한계가 있기 때문에 Consumer의 수를 적절하게 조절해야 한다
Consumer Group C (파티션의 수 보다 Consumer의 수가 더 많은 경우)
- 하나의 Topic 내에 리더 파티션이 3개가 있는 Topic을 Consumer를 4개 가지고 있는 Consumer Group이 구독하고 있는 상태 이다
- Consumer Group 내 Consumer의 수 가 파티션의 수 보다 많은 경우 파티션의 개수 만큼만 Consumer에 파티션이 할당되고 남은 Consumer는 대기 상태로 파티션이 할당되지 않고 메시지 소비도 일어나지 않는다
- 할당 될 수 있는 Consumer의 수는 최대 파티션의 수 만큼만 할당 가능하다
Spring 적용 예제
주문, 결제의 경우 비동기 처리를 하는 경우가 있는데,
이런 경우에 카프카를 사용하는 경우가 많다
그러한 상황에 대한 간단한 예제로, 주문 발생 시 Kafka로 결제 event를 발행하고 결제 서비스가 이를 consume함으로써 결제가 이루어지는 경우에 대한 예제에 대한 일부 코드를 구성한 것이다
주문
YML
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Controller
@RestController
@RequestMapping("/orders")
class OrderController(private val orderProducer: OrderProducer) {
@PostMapping
fun createOrder(@RequestBody order: OrderCreatedEvent): ResponseEntity<String> {
orderProducer.sendOrder(order)
return ResponseEntity.ok("${order.orderId} 주문 완료!")
}
}
Producer
@Service
class OrderProducer(private val kafkaTemplate: KafkaTemplate<String, OrderCreatedEvent>) {
fun sendOrder(order: OrderCreatedEvent) {
kafkaTemplate.send("order-created", order.orderId, order)
}
}
결제
Config
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS()
}
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
registry.enableSimpleBroker("/topic")
}
}
YML
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: payment-service
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
auto-offset-reset: latest
Service
@Service
class PaymentProcessor(
private val kafkaTemplate: KafkaTemplate<String, PaymentCompletedEvent>,
private val messagingTemplate: SimpMessagingTemplate
) {
@KafkaListener(topics = ["order-created"], groupId = "payment-service")
fun handleOrderEvent(event: OrderCreatedEvent) {
// business logic
// PG 결제 등등
val completed = PaymentCompletedEvent(
orderId = event.orderId,
success = true,
transactionId = UUID.randomUUID().toString()
)
messagingTemplate.convertAndSend("/topic/payment/\\${completed.orderId}", completed)
}
}