파티션과 컨슈머

  • 컨슈머는 컨슈머그룹의 일부로서 작동
  • 하나의 파티션에는 하나의 컨슈머가 할당됨
  • 파티션수보다 컨슈머수가 더 많다면 유휴 컨슈머가 생김
  • 파티션수보다 컨슈머수가 적다면 하나의 컨슈머가 여러파티션을 컨슘하여 처리함
  • 애플리케이션에서 모든 토픽의 데이터를 컨슘하기 위해 애플리케이션 별로 컨슈머 그룹을 생성해야 함

파티션 리밸런스

  • 컨슈머가 추가되거나, 크래시되었거나 파티션이 추가되는 등의 변경이 발생하면 컨슈머를 파티션에 재할당하는 작업이 실행되고 이 과정을 리밴런스라 함
  • Eager rebalance(조급한 리밸런스)
    • 모든 컨슈머가 읽기 작업을 멈추고 모든 파티션에 대한 소유권을 포기한 후 컨슈머 모두가 그룹에 다시 참여하여야 새로운 파티션을 할당받도록 하는 방법
    • Stop the world
  • Cooperative rebalance(협력적 리밸런스) / Incremental rebalance(점진적 리밸런스)
    • 컨슈머 리더가 일부 파티션 재할당을 통지하면, 해당 컨슈머들은 작업을 중단하고 소유권을 포기한 후 이 컨슈머들을 새로운 파티션에 재할당 하는 방법
    • 안정적으로 파티션이 할당될때까지 반복될 수 있음
    • Stop the world는 발생하지 않음
  • 2.4 이후 Eager rebalance가 default 였으나 3.1 부터 Cooperative rebalance가 default가 되고 Eager rebalance는 deprecated 될 예정
  • session.timeout.ms에 설정된 값(default - 2.8 : 10초, 3.0:45초)동안 컨슈머가 heartbeat를 보내지 않으면 해당 컨슈머가 죽은것으로 판단하여 리밸런스 발생
  • max.poll.interval.ms(default 5분) 에 지정된 시간동안 컨슈머가 폴링하지 않으면 해당 컨슈머는 죽은것으로 판단
    • heartbeat는 백그라운드 스레드에서 전송하므로 메인스레드가 블록 되더라도 hearbeat는 전송가능 함
    • 이러한 경우를 판단하기 위해 max.poll.interval.ms로 컨슈머가 정상적으로 컨슘하는지 확인

폴링루프

  • while(true)를 이용한 무한 루프
  • 루프내에서 컨슈머의 poll(Duration) 메소드를 호출하여 메시지를 가져오고, 처음으로 호출되는 경우 GroupCoordinator를 찾아서 컨슈머 그룹에 참가하고 파티션을 할당 받음
  • max.poll.interval.ms에 지정된 시간 이상으로 호출되지 않는 경우 컨슈머는 죽은 것으로 판정되므로 블록되지 않도록 주의 해야함

파티션 할당 전략

  • partition.assignment.strategy 설정
  • Range(default)
    • RangeAssignor
    • 컨슈머가 구독하는 각 토픽의 파티션들을 연속된 그룹으로 나눠서 할당
    • 홀수개의 파티션을 갖는 경우 앞의 컨슈머는 뒤의 컨슈머보다 많은 파티션을 하당받게 됨
  • RoundRobin
    • RoundRobinAssignor
    • 모든 구독된 토픽의 모든 파티션을 가져다 순차적으로 하나씩 컨슈머에 할당
    • 모든 컨슈머들이 동일한 수의 파티션 할당(많아야 1개차이)
  • Sticky
    • StickyAssignor
    • 파티션들을 가능한 균등하게 할당(RoundRobin과 유사)
    • 리밸런스가 발생했을때 가능하면 많은 파티션들이 같은 컨슈머에 할당
    • 컨슈머가 변경되는 오버헤드를 최소화
  • Cooperative Sticky
    • CooperativeStickyAssignor
    • Sticky 와 동일
    • 컨슈머가 재할당되지 않은 파티션으로부터 레코드를 계속해서 읽어올 수 있도록 해주는 협력적 리밸런스기능 지원

오프셋 커밋

  • 파티션에서의 현재위치를 업데이트 하는 작업
  • poll이 리턴한 마지막 오프셋 바로 다음 오프셋을 커밋함(수동으로 오프셋을 다룰 경우 유의)
  • 레코드를 개별적으로 커밋하지 않고 파티션에서 성공적으로 처리해 낸 마지막 메시지를 커밋 함
  • 커밋된 오프셋이 클라이언트가 마지막으로 처리한 오프셋보다 작을 경우 중복 처리 됨
  • 커밋된 오프셋이 클라이언트가 마지막으로 처리한 오프셋보다 클 경우 메시지 누락처리됨
  • 자동 커밋
    • enable.auto.commit = true 로 설정
    • 컨슈머가 대신 커밋
    • auto.commit.intervel.ms(default 5초)에 한번 poll을 통해 받은 메시지 중 마지막 메시지 오프셋 커밋
    • 커밋 인터벌 내에서 리밸런싱이 발생한다면 해당 인터벌내의 메시지는 중복 처리됨
  • 현재 오프셋 커밋
    • enable.auto.commit = false 로 설정
  • commitSync
    • 가장 간단하고 신뢰성있는 commit API
    • poll에 의해 리턴된 마지막 오프셋 커밋
    • poll에서 리턴된 모든 메시지를 처리하기 전 commit 할 경우 메시지가 누락될 수 있음
    • 브로커가 커밋 요청에 응답할 때 까지 블록됨
    • 성공하거나 재시도 불가능한 실패가 발생할때까지 재시도
  • commitAsync
    • 재시도 하지 않음
    • 콜백 처리(OffsetCommitCallback)
    • 콜백에서 재시도 처리시 커밋의 순서를 유의 해야 함
      • Apache Kafka에서 컨슈머의 `commitAsync` 메소드는 비동기적으로 오프셋을 커밋합니다. 이 방법은 `commitSync`에 비해 성능상 이점이 있지만, 비동기적 특성으로 인해 오프셋 커밋 순서가 보장되지 않는 문제가 있습니다. 즉, 늦게 시작된 커밋이 먼저 완료될 수 있으며, 이로 인해 컨슈머 재시작 시 메시지 중복 처리나 누락의 가능성이 존재합니다.

        ### 커밋 순서 강제 방법

        비동기 커밋 시 순서를 강제하는 명확한 방법은 Kafka 자체적으로 제공하지 않지만, 다음과 같은 방법으로 순서 문제를 완화할 수 있습니다:

        1. **단일 콜백 사용**: 비동기 커밋을 할 때마다 새로운 콜백을 생성하는 대신, 단일 콜백 인스턴스를 재사용하면서 내부적으로 최신 오프셋을 추적하는 방법입니다. 콜백 내에서는 커밋된 오프셋이 현재 알고 있는 최신 오프셋보다 이전인지 확인하고, 이전이면 무시합니다. 이 방법은 순서를 완벽하게 보장하지는 않지만, 순서 문제로 인한 영향을 최소화할 수 있습니다.

        2. **커밋 직렬화**: 오프셋 커밋 요청을 내부적으로 관리하는 큐를 구현하여, 이전 커밋 요청의 완료를 확인한 후에 다음 커밋을 실행하는 방법입니다. 이 방법은 성능에 영향을 줄 수 있지만, 커밋 순서를 엄격하게 관리할 수 있습니다.

        3. **비동기 커밋 후 동기 커밋으로 마무리**: 정기적으로 또는 특정 조건(예: 애플리케이션 종료 시)에서 `commitSync`를 호출하여 오프셋 커밋을 확실하게 완료하는 방법입니다. 이는 비동기 커밋 과정에서 발생할 수 있는 순서 문제를 마지막에 동기 커밋으로 보정합니다.

        4. **외부 저장소 사용**: 컨슈머가 처리한 오프셋을 외부 시스템(예: 데이터베이스)에 저장하고, 컨슈머 재시작 시 외부 시스템의 오프셋 정보를 기반으로 메시지 처리 위치를 결정하는 방법입니다. 이 방법은 추가적인 외부 시스템 의존성이 필요하지만, 컨슈머의 상태 관리를 더 세밀하게 할 수 있습니다.

        비동기 오프셋 커밋을 사용할 때는 커밋 순서가 보장되지 않는 점을 고려하여 애플리케이션의 로직을 설계해야 하며, 중복 처리나 메시지 누락을 방지하기 위한 추가적인 조치가 필요할 수 있습니다.
  • commitSync, commitAsync 동시 사용
    • 정상적인 상황에서는 commitAsync를 사용함
    • 컨슈머를 닫는 상황에서는 commitSync를 사용하여 재시도하도록 함
  • 특정 오프셋 커밋
    • 배치 처리 중 오프셋을 커밋하고 싶은 경우
    • Map<TopicPartition, OffsetAndMetadata> 을 commitSync/commitAsync에 파라미터로 넘겨 처리
    • 에러 처리
  • 컨슈머가 닫힐때 오프셋 커밋
    • ConsumerRebalanceListener 인터페이스를 구현하여 onPartitionsRevoked 메소드에 커밋 로직 구현
      • Eager rebalance 인경우 컨슈머가 읽기를 멈추고 리밸런스가 시작되기 전 호출
      • Cooperative rebalance 인 경우 리밸런스가 완료될때 할당해제해야할 파티션들에 대해서 호출

Standalone consumer

  • 컨슈머가 스스로 특정 토픽의 모든 메시지를 처리해야 하는 경우
  • 토픽을 구독할 필요가 없음
  • 리밸런스 필요없음
  • List<PartitionInfo> partitions = consumer.partitionsFor("Topic") 을 호출하여 파티션 정보를 조회
  • consumer.assign(partitions)
  • 루프를 돌면서 consumer.poll 호출하여 레코드를 가져와서 처리
  • offset commit
  • 리밸런싱을 사용하지 않으므로 파티션이 추가되는 경우를 자동으로 감지할 수 없으므로 consumer.partitionsFor를 주기적으로 호출하여 감지 하거나 애플리케이션을 재시작 해야할 수 있음

읽기 지연

  • 리더 레플리카는 팔로워 레플리카가 복제한 오프셋을 관리 함
  • 컨슈머는 모든 in sync 레플리카에 복제된 메시지만 읽을 수 있음
  • 복제가 지연되는 경우 컨슈머가 읽는 메시지도 지연될 수 있음
  • replica.lag.time.max.ms 에 정의된 시간 만큼 복제지연이 발 생할 수 있고 이 시간을 지나면 해당 레플리카는 out of sync 레플리카가 됨

 

전송전략

  • fire and forget
    • 메시지를 서버에 전송만하고 성공/실패 여부는 신경 쓰지 않음
    • 메시지 유실 가능성
      • 메시지 직렬화 실패
      • 버퍼가 가득찰 경우 타임아웃 발생
      • 전송 스레드에 인터럽트가 걸리는 경우
      • 애플리케이션은 예외에 대한 아무런 정보를 받지 않음. 
    • 거의 사용하지 않음
  • Synchronous send
    • 카프카는 기본적으로 비동기적으로 동작함
    • 다음 메시지를 전송하기 전에 send가 반환하는 Future 객체의 get 메소드를 이용하여 성공여부 확인
    • 전송지연으로 인한 스레드 대기시 성능 문제 발생가능
    • 브로커가 에러응답 반환
    • 재전송 횟수 소진
    • 거의 사용하지 않음. 예제는 많음
  • Asynchronous send
    • 콜백함수 이용(Callback 인터페이스 구현)
    • onCompletion(RecordMetadata rm, Exception e)
      • e가 null이 아닌경우 에러발생
    • 프로듀서의 메인 스레드에서 콜백이 실행됨
    • 동일한 스레드로 보낸 메시지의 콜백은 보낸 순서대로 실행됨
    • 콜백메소드에서는 블로킹 작업 수행은 권장하지 않음

응답 처리 전략 (acks)

임의의 쓰기 작업에 대한 성공 여부를 판단하기 위해 얼마나 많은 파티션 레플리카카 해당 레코드를 받아야 하는지 설정

  • acks = 0
    • 성공적으로 전달 되었다 간주하고 브로커의 응답을 기다리지 않음
    • 메시지 유실될 수 있음
    • 매우 높은 처리량이 필요할 때 사용
  • acks = 1
    • 리더 레플리카가 메시지를 받는 순간 브로커로부터 성공 응답을 받는다
    • 리더에 크래시가 나고 아직 새로운 리더가 선출되지 않은 경우 에러응답을 받고 재시도
    • 리더에 크래시가 난 상태에서 복제가 안된상태로 새 리더가 선출 될경우 메시지 유실
  • acks = all
    • 모든 in sync 레플리카에 전달되에 브로커가 성공응답
    • 가장 안전
    • 모든 레플리카에 전달되어야 하기 때문에 지연시간은 더 길어질 수 있음

에러처리

  • 재시도 가능한 에러 처리
    • retries : Integer.MAX 
    • delivery.timeout.ms 
    • delivery.timeout.ms 시간내에서 무한히 재시도
    • Network Errors: 네트워크 문제로 인해 발생하는 에러입니다. 예를 들어, Kafka 브로커와의 연결이 일시적으로 끊어진 경우 등이 있습니다.
    • Leader Election: Kafka는 파티션의 리더(replica)에게만 데이터를 쓸 수 있습니다. 리더가 다운되어 새로운 리더가 선출되는 동안 발생하는 에러도 재시도가 가능합니다.
    • Not Enough Replicas: 아직 메시지를 받을 준비가 되지 않은 복제본(replicas) 때문에 발생하는 에러입니다. 이는 복제본이 충분한 데이터를 동기화하지 못한 경우에 발생할 수 있습니다.
    • Not Leader for Partition: 메시지를 쓰려는 파티션의 리더가 변경되었을 때 발생하는 에러입니다. 새 리더로의 재할당 후에 재시도가 가능합니다.
    • Record Too Large: 프로듀서가 보내려는 메시지의 크기가 너무 커서 브로커가 처리할 수 없는 경우 발생합니다. 이는 설정을 조정하여 해결할 수 있지만, 재시도 전에 메시지 크기를 줄여야 할 수도 있습니다.
    • Timeout Errors: 요청에 대한 응답을 Kafka 브로커로부터 지정된 시간 내에 받지 못할 때 발생합니다.
  • 재시도 불가능한 에러 처리
  • Callback 이용

At least once 보장

  • 프로듀서 Retry + Ack 전략
    • acks = all 이고 delivery.timeout.ms 가 충분히 크게 잡혀 있는 경우 메시지를 모든 레플리카에 복제된 상황에서 리더 레플리카가 크래시 나는 경우 프로듀서는 request.timeout.ms 만큼 기다리고 재시도를 하게 되며, 새로 선출된 리더 레플리카에는 이미 메시지가 복제된 상태이므로 중복 저장되지만 at least once는 보장됨
    • retries, retry.backoff.ms 설정
  • 컨슈머 offset 관리
    • 컨슈머는 메시지를 처리하면 offset을 커밋하게 되고 재시도시에는 커밋한 offset 이후 부터 처리 

순서보장

  • 파티션 내에서 메시지의 순서를 보존
  • retries > 0 , max.in.flight.requests.per.connection >= 1 인 경우 순서가 뒤집어질 수 있다.
  • 성능및 신뢰성을 보장하기 위해 retries > 0, in.flight >= 2 이어야 하므로 enable.idempotence=true할 경우 in.flight 최대 5까지 요청을 허용하며, 순서도 보장되고 재전송시에도 중복이 발생하지 않도록 해줌

메시지 사이즈 고려

  • 프로듀서가 전송하는 메시지의 최대 크기를 설정하는 max.request.size와 브로커가 받아 들일수 있는 메시지의 사이즈를 결정하는 message.max.bytes 설정을 동일하게 맞춰야 함

시리얼라이저

  • 커스텀 시리얼라이저를 구현할 수 있으나, 하위호환성 유지, 직렬화/비직렬화 로직 디버깅, 여러팀에서 같이 사용하는 경우 동시 코드 변경 등의 문제가 발생할 수 있음
  • JSON, 에이브로, 스리프트, 프로토버프와 같은 범용 라이브러리 사용 권장

파티션 할당

  • 키값에 따라 항상 동일한 파티션에 할당되어야 하는 경우 토픽 생성시 파티션을 충분히 크게하고 파티션을 추가하지 않는다. 
  • 파티션 수 변경 시 할당되는 파티션도 달라 질 수 있음

인터셉터

  • 애플리케이션 공통로직을 처리해야 하는 경우 사용
  • 모니터링, 정보추적, 표준 헤더 삽입등
  • ProducerInterceptor
    • ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
      • 레코드를 브로커로 보내기전, 직렬화되기 직전 호출
      • ProducerRecord 조회 및 수정 가능
      • 유효한 ProducerRecord를 반환하는 것에 주의
    • void onAcknowledgement(RecordMetadata metadata, Exception e)
      • 브로커가 보낸응답을 클라이언트가 받았을때 호출
      • 응답을 변경할 수는 없음. 조회는 가능

 

+ Recent posts