카프카는 메시지 브로커의 일종으로 Producer, Topic, Consumer, Broker 로 구성된다. 

Producer를 이용하여 이벤트를 브로커의 토픽으로 발행하고 토픽을 구독하고 있는 Consumer 가 이벤트를 처리하는 구조를 갖는다. 

 

Producer

특정 토픽으로 이벤트를 발행하는 역할

여러개의 프로듀서가 동시에 한토픽으로 이벤트를 발행할 수 있다. 

Serializer를 이용하여 JSON, Avro, Protobuf 등 다양한 이벤트를 발행할 수 있다. 

필수 설정

  • bootstrap.servers : 브로커 설정
  • key.serializer 
  • value.serializer

메커니즘

1. ProducerRecord 생성

  • Key 지정 : 키를 해싱하여 파티션을 정하는데 사용됨
  • Partition 지정 : 특정 파티션을 지정하여 전송
  • Vlaue

2. RecordBatch

  • 전송할 레코드는 파티션별로 RecordBatch 에 모이고 별도의 스레드가 전송

3. ProducerRecord 전송

  • 파티션이 지정되지 않았다면 파티셔너로 전송되고 파티셔너가 파티션을 지정함

4. 브로커 응답

  • 성공시 토픽, 파티션, 오프셋을 담은 RecordMetadata 객체를 리턴

전송방법

  • Fire and forget : 전송 후 성공/실패 여부를 신경쓰지 않음, 재시도 할 수 없는 경우 유실가능, 
  • Synchronous : Future 를 이용하여 처리
  • Asynchronous : 콜백 함수 이용

acks

  • 얼마나 많은 파티션 레플리카가 해당  레코드를 받아야 하는지 결정
  • acks=0 
  • acks=1 : 리더 레플리카가 메시지를 받는 순간 응답
  • acks=all : 모든 in-sync 레플리카에 전달된 후 응답

순서보장

  • 파티션내에서 순서는 보장됨
  • 설정에 따라 순서가 변경될 수 있음 

Broker

이벤트, Topic, Partition 등을 저장하고 관리하기 위한 서버

여러개의 브로커에 파티션을 복제하여 고가용성을 확보할 수 있다

 

Topic

이벤트를 관리하기 위한 단위 

여러개의 파티션으로 구성될 수 있다

 

Partition

이벤트를 저장하는 물리적인 단위이며, 파티션당 한개의 컨슈머를 갖을 수 있다. 

여러개의 파티션을 이용하여 멀티 컨슈머 처리가 가능하다. 

각각의 이벤트는 offset으로 관리된다. 

이벤트는 저장소에 리텐션주기에 따라 저장되어 보관된다. 

Offset을 이용하여 이전 데이터의 재처리가 가능해진다.

 

Offset

프로듀서가 발행한 이벤트는 한개의 오프셋을 갖는다. 

이벤트가 증가함에 따라 오프셋도 증가한다. 

컨슈머는 오프셋 단위로 이벤트를 처리한다. 

 

Consumer

오프셋별로 토픽에서 이벤트를 가져와 처리하고 오프셋을 커밋한다. 

이벤트의 형식에 따라 Deserialize 한다. 

일반적으로 컨슈머는 컨슈머 그룹에 속함

 

컨슈머그룹

  • 파티션수 > 컨슈머수 : 여러 파티션을 하나의 컨슈머에서 처리하게 됨
  • 파티션수 = 컨슈머수 : 각각의 파티션당 하나의 컨슈머가 처리
  • 파티션수 < 컨슈머수 : 유휴 컨슈머가 있음

 

리밸런스

  • 컨슈머에 파티션을 재할당 하는 작업
  • eager rebalance : 모든 컨슈머가 작업을 멈추고 파티션을 재 할당 받음
  • cooperative rebalance : 점진적인 재할당

메커니즘

1. subscribe topic

2. 폴링 루프

3. poll

4. deserialize

5. offset commit

 

파티션 할당 전략(PartitionAssigner)

  • Range
  • RoundRobin
  • Sticky
  • Cooperative sticky

오프셋 커밋

  • 중복처리 : 1 ~ 10까지 오프셋을 처리하는 중 10번까지 처리 되었으나 5번 오프셋까지만 커밋된 상태에서 리밸런싱이 일어나는 경우 6 ~ 10은 중복 처리됨
  • 유실 :  1 ~ 10까지 오프셋을 처리하는 중 10번 오프셋이 커밋된 상태에서 리밸런싱이 일어나는 경우 
  • 자동커밋
    • enable.auto.commit=true
    • auto.commit.interval.ms=5000(default)
  • 동기 커밋 : commitSync 사용, 브로커로부터 응답이 올때까지 블록됨, 실패시 재시도.
  • 비동기 커밋 : commitAsync 사용, 응답을 기다리지 않고 다음 커밋을 바로 처리함, 재시도 하지 않음. 커밋 순서가 꼬일 수 있음
  • 현재 오프셋 커밋 : 현재 오프셋 + 1 의 오프셋이 커밋됨

스탠드얼론 컨슈머

컨슈머 그룹에 속하지 않고 특정 토픽을 컨슘처리

1. partitionsFor(topic) : 토픽의 파티션 정보 획득

2. assign(partitions) : 파티션 할당

3. poll

4. commit

 

콘트롤러

KRaft 

복제

리더레플리카

팔로워레플리카

선호리더

in sync replica

out of sync replica

 

계층화된저장소

  • 각 계층별 리텐션 주기설정
  • 로컬계층과 원격계층 독립적인 읽기 가능. 원격계층의 메시지를 캐싱하거나 카피하지 않고 바로 네트워크 계층 전송가능
  • 로컬계층 : 브로커 로컬 디스크
  • 원격계층 : HDFS / S3 등 원격저장소

파티션할당

  • 브로커 간에 고르게 분산
  • 서로다른 브로커에 배치
  • 랙 구분
  • 라운드 로빈

파일관리

  • 세그먼트 단위로 관리. 파티션을 여러개의 세그먼트로 분리
  • 액티브 세그먼트 : 현재 쓰여지고 있는 세그먼트, 삭제 안됨

파일형식

  • 세그먼트는 하나의 파일로 저장
  • 프로듀서 - 브로커 - 컨슈머 까지 전달되는 동일한 형태로 저장(제로카피 최적화)
  • 버전별 형식으로 변환하여 컨슈머 전달 (FetchMessageConversionsPerSec, MessageConversionsTimeMs 지표 확인, 클라이언트 업데이트)

인덱스

  • 오프셋 세그먼트 인덱스 : 오프셋 - 세그먼트 파일 - 파일내 위치
  • 타임스탬프 오프셋 인덱스 : 카프카 스트림즈에서 자주 사용

보존정책

  • delete : 리텐션기간이 지난 데이터 삭제
  • compact : 가장 최근 상태만 보관
    • clean
    • dirty
  • delete,compact : 보존기간이 지난 compact 메시지도 삭제

대상

  • spring boot application
  • mysql (docker)

모니터링 구성

  • prometheus
  • grafana
  • mysql_exporter

prometheus docker 설치

  • prometheus.yml boot application 설정
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: boot
    metrics_path: '/actuator/prometheus'
    static_configs:
      - targets: ['ip:port']

ip : docker 내부에서 로컬 application 으로 접속 해야 하므로 localhost로는 접속이 안됨. ip 로 설정

port : application port

 

  • docker run
docker run -d -p 9090:9090 -v /파일경로/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus

 

  • prometheus 콘솔 접속
    • http://localhost:9090/

grafana docker 설치

docker run --name grafana -d -p 80:3000 grafana/grafana
  • 접속 : http://localhost/
  • prometheus 데이터 소스 설정
    • http://localhost/connections/datasources 접속 하여 add new data source 버튼 클릭
    • prometheus 선택 
    • Name 입력
    • Prometheus Server URL 입력 : http://host.docker.internal:9090
      • localhost 로 설정 시 아래와 같은 에러로 접속 테스트 실패 
      • Post "http://localhost:9090/api/v1/query": dial tcp 127.0.0.1:9090: connect: connection refused - There was an error returned querying the Prometheus API.
    • 화면 끝에 Save & Test 버튼 클릭
  • 대시 보드 구성

mysql_exporter docker 설치

  • mysql_exporter 용 mysql 사용자(test_exp) 생성
CREATE USER 'test_exp'@'localhost' IDENTIFIED BY 'test_exp' WITH MAX_USER_CONNECTIONS 3;
GRANT PROCESS, REPLICATION CLIENT, SELECT ON *.* TO 'test_exp'@'localhost';
flush privileges;
  • /etc/mysql/my.cnf 파일 생성 (원하는 위치에 생성)
[client]
host=ip
port=3306
socket=/home/mysql.sock
user=test_exp
password=test_exp

 

  • docker run
    • mysql 이 docker로 떠 있는 상태에서 mysql.sock을 이용해 mysql_exporter가 mysql에 접속 해야 함
    • 따라서 mysql docker의 mysql.sock 을 mysql_exporter가 참조 하도록 해야 함
    • mysql docker의 mysql.sock은 /var/lib/mysql/mysql.sock 에 있지만, 심볼릭 링크로 걸려 있음 
    • 링크 : /var/lib/mysql/mysql.sock -> /var/run/mysqld/mysqld.sock
    • 링크 : /var/run -> /run
    • 결국 이 위치 : /run/mysqld/mysqld.sock
    • 이 파일을 mysql_exporter와 공유해야 함 
    • mysql docker 실행 시 -v /mornitoring/mysql_exporter:/run/mysqld 옵션을 주어 로컬의 /mornitoring/mysql_exporter 디렉토리로 공유하고
    • 이 디렉토리의 mysqld.sock 을 mysql_exporter와 공유하도록 함
docker run -itd -p 9104:9104 -v /etc/mysql/my.cnf:/home/.my.cnf -v /mornitoring/mysql_exporter/mysqld.sock:/home/mysql.sock --name mysql_exporter prom/mysqld-exporter --config.my-cnf=/home/.my.cnf
  • prometheus.yml mysql_exporter 설정 추가 후 prometheus 재시작
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: mysql_exporter
    static_configs:
      - targets: ['ip:9104']
  • grafana 대시보드 구성

Prometheus 확인

  • http://localhost:9090/targets?search= 접속

 

 

 

 

+ Recent posts