파티션과 컨슈머

  • 컨슈머는 컨슈머그룹의 일부로서 작동
  • 하나의 파티션에는 하나의 컨슈머가 할당됨
  • 파티션수보다 컨슈머수가 더 많다면 유휴 컨슈머가 생김
  • 파티션수보다 컨슈머수가 적다면 하나의 컨슈머가 여러파티션을 컨슘하여 처리함
  • 애플리케이션에서 모든 토픽의 데이터를 컨슘하기 위해 애플리케이션 별로 컨슈머 그룹을 생성해야 함

파티션 리밸런스

  • 컨슈머가 추가되거나, 크래시되었거나 파티션이 추가되는 등의 변경이 발생하면 컨슈머를 파티션에 재할당하는 작업이 실행되고 이 과정을 리밴런스라 함
  • Eager rebalance(조급한 리밸런스)
    • 모든 컨슈머가 읽기 작업을 멈추고 모든 파티션에 대한 소유권을 포기한 후 컨슈머 모두가 그룹에 다시 참여하여야 새로운 파티션을 할당받도록 하는 방법
    • Stop the world
  • Cooperative rebalance(협력적 리밸런스) / Incremental rebalance(점진적 리밸런스)
    • 컨슈머 리더가 일부 파티션 재할당을 통지하면, 해당 컨슈머들은 작업을 중단하고 소유권을 포기한 후 이 컨슈머들을 새로운 파티션에 재할당 하는 방법
    • 안정적으로 파티션이 할당될때까지 반복될 수 있음
    • Stop the world는 발생하지 않음
  • 2.4 이후 Eager rebalance가 default 였으나 3.1 부터 Cooperative rebalance가 default가 되고 Eager rebalance는 deprecated 될 예정
  • session.timeout.ms에 설정된 값(default - 2.8 : 10초, 3.0:45초)동안 컨슈머가 heartbeat를 보내지 않으면 해당 컨슈머가 죽은것으로 판단하여 리밸런스 발생
  • max.poll.interval.ms(default 5분) 에 지정된 시간동안 컨슈머가 폴링하지 않으면 해당 컨슈머는 죽은것으로 판단
    • heartbeat는 백그라운드 스레드에서 전송하므로 메인스레드가 블록 되더라도 hearbeat는 전송가능 함
    • 이러한 경우를 판단하기 위해 max.poll.interval.ms로 컨슈머가 정상적으로 컨슘하는지 확인

폴링루프

  • while(true)를 이용한 무한 루프
  • 루프내에서 컨슈머의 poll(Duration) 메소드를 호출하여 메시지를 가져오고, 처음으로 호출되는 경우 GroupCoordinator를 찾아서 컨슈머 그룹에 참가하고 파티션을 할당 받음
  • max.poll.interval.ms에 지정된 시간 이상으로 호출되지 않는 경우 컨슈머는 죽은 것으로 판정되므로 블록되지 않도록 주의 해야함

파티션 할당 전략

  • partition.assignment.strategy 설정
  • Range(default)
    • RangeAssignor
    • 컨슈머가 구독하는 각 토픽의 파티션들을 연속된 그룹으로 나눠서 할당
    • 홀수개의 파티션을 갖는 경우 앞의 컨슈머는 뒤의 컨슈머보다 많은 파티션을 하당받게 됨
  • RoundRobin
    • RoundRobinAssignor
    • 모든 구독된 토픽의 모든 파티션을 가져다 순차적으로 하나씩 컨슈머에 할당
    • 모든 컨슈머들이 동일한 수의 파티션 할당(많아야 1개차이)
  • Sticky
    • StickyAssignor
    • 파티션들을 가능한 균등하게 할당(RoundRobin과 유사)
    • 리밸런스가 발생했을때 가능하면 많은 파티션들이 같은 컨슈머에 할당
    • 컨슈머가 변경되는 오버헤드를 최소화
  • Cooperative Sticky
    • CooperativeStickyAssignor
    • Sticky 와 동일
    • 컨슈머가 재할당되지 않은 파티션으로부터 레코드를 계속해서 읽어올 수 있도록 해주는 협력적 리밸런스기능 지원

오프셋 커밋

  • 파티션에서의 현재위치를 업데이트 하는 작업
  • poll이 리턴한 마지막 오프셋 바로 다음 오프셋을 커밋함(수동으로 오프셋을 다룰 경우 유의)
  • 레코드를 개별적으로 커밋하지 않고 파티션에서 성공적으로 처리해 낸 마지막 메시지를 커밋 함
  • 커밋된 오프셋이 클라이언트가 마지막으로 처리한 오프셋보다 작을 경우 중복 처리 됨
  • 커밋된 오프셋이 클라이언트가 마지막으로 처리한 오프셋보다 클 경우 메시지 누락처리됨
  • 자동 커밋
    • enable.auto.commit = true 로 설정
    • 컨슈머가 대신 커밋
    • auto.commit.intervel.ms(default 5초)에 한번 poll을 통해 받은 메시지 중 마지막 메시지 오프셋 커밋
    • 커밋 인터벌 내에서 리밸런싱이 발생한다면 해당 인터벌내의 메시지는 중복 처리됨
  • 현재 오프셋 커밋
    • enable.auto.commit = false 로 설정
  • commitSync
    • 가장 간단하고 신뢰성있는 commit API
    • poll에 의해 리턴된 마지막 오프셋 커밋
    • poll에서 리턴된 모든 메시지를 처리하기 전 commit 할 경우 메시지가 누락될 수 있음
    • 브로커가 커밋 요청에 응답할 때 까지 블록됨
    • 성공하거나 재시도 불가능한 실패가 발생할때까지 재시도
  • commitAsync
    • 재시도 하지 않음
    • 콜백 처리(OffsetCommitCallback)
    • 콜백에서 재시도 처리시 커밋의 순서를 유의 해야 함
      • Apache Kafka에서 컨슈머의 `commitAsync` 메소드는 비동기적으로 오프셋을 커밋합니다. 이 방법은 `commitSync`에 비해 성능상 이점이 있지만, 비동기적 특성으로 인해 오프셋 커밋 순서가 보장되지 않는 문제가 있습니다. 즉, 늦게 시작된 커밋이 먼저 완료될 수 있으며, 이로 인해 컨슈머 재시작 시 메시지 중복 처리나 누락의 가능성이 존재합니다.

        ### 커밋 순서 강제 방법

        비동기 커밋 시 순서를 강제하는 명확한 방법은 Kafka 자체적으로 제공하지 않지만, 다음과 같은 방법으로 순서 문제를 완화할 수 있습니다:

        1. **단일 콜백 사용**: 비동기 커밋을 할 때마다 새로운 콜백을 생성하는 대신, 단일 콜백 인스턴스를 재사용하면서 내부적으로 최신 오프셋을 추적하는 방법입니다. 콜백 내에서는 커밋된 오프셋이 현재 알고 있는 최신 오프셋보다 이전인지 확인하고, 이전이면 무시합니다. 이 방법은 순서를 완벽하게 보장하지는 않지만, 순서 문제로 인한 영향을 최소화할 수 있습니다.

        2. **커밋 직렬화**: 오프셋 커밋 요청을 내부적으로 관리하는 큐를 구현하여, 이전 커밋 요청의 완료를 확인한 후에 다음 커밋을 실행하는 방법입니다. 이 방법은 성능에 영향을 줄 수 있지만, 커밋 순서를 엄격하게 관리할 수 있습니다.

        3. **비동기 커밋 후 동기 커밋으로 마무리**: 정기적으로 또는 특정 조건(예: 애플리케이션 종료 시)에서 `commitSync`를 호출하여 오프셋 커밋을 확실하게 완료하는 방법입니다. 이는 비동기 커밋 과정에서 발생할 수 있는 순서 문제를 마지막에 동기 커밋으로 보정합니다.

        4. **외부 저장소 사용**: 컨슈머가 처리한 오프셋을 외부 시스템(예: 데이터베이스)에 저장하고, 컨슈머 재시작 시 외부 시스템의 오프셋 정보를 기반으로 메시지 처리 위치를 결정하는 방법입니다. 이 방법은 추가적인 외부 시스템 의존성이 필요하지만, 컨슈머의 상태 관리를 더 세밀하게 할 수 있습니다.

        비동기 오프셋 커밋을 사용할 때는 커밋 순서가 보장되지 않는 점을 고려하여 애플리케이션의 로직을 설계해야 하며, 중복 처리나 메시지 누락을 방지하기 위한 추가적인 조치가 필요할 수 있습니다.
  • commitSync, commitAsync 동시 사용
    • 정상적인 상황에서는 commitAsync를 사용함
    • 컨슈머를 닫는 상황에서는 commitSync를 사용하여 재시도하도록 함
  • 특정 오프셋 커밋
    • 배치 처리 중 오프셋을 커밋하고 싶은 경우
    • Map<TopicPartition, OffsetAndMetadata> 을 commitSync/commitAsync에 파라미터로 넘겨 처리
    • 에러 처리
  • 컨슈머가 닫힐때 오프셋 커밋
    • ConsumerRebalanceListener 인터페이스를 구현하여 onPartitionsRevoked 메소드에 커밋 로직 구현
      • Eager rebalance 인경우 컨슈머가 읽기를 멈추고 리밸런스가 시작되기 전 호출
      • Cooperative rebalance 인 경우 리밸런스가 완료될때 할당해제해야할 파티션들에 대해서 호출

Standalone consumer

  • 컨슈머가 스스로 특정 토픽의 모든 메시지를 처리해야 하는 경우
  • 토픽을 구독할 필요가 없음
  • 리밸런스 필요없음
  • List<PartitionInfo> partitions = consumer.partitionsFor("Topic") 을 호출하여 파티션 정보를 조회
  • consumer.assign(partitions)
  • 루프를 돌면서 consumer.poll 호출하여 레코드를 가져와서 처리
  • offset commit
  • 리밸런싱을 사용하지 않으므로 파티션이 추가되는 경우를 자동으로 감지할 수 없으므로 consumer.partitionsFor를 주기적으로 호출하여 감지 하거나 애플리케이션을 재시작 해야할 수 있음

읽기 지연

  • 리더 레플리카는 팔로워 레플리카가 복제한 오프셋을 관리 함
  • 컨슈머는 모든 in sync 레플리카에 복제된 메시지만 읽을 수 있음
  • 복제가 지연되는 경우 컨슈머가 읽는 메시지도 지연될 수 있음
  • replica.lag.time.max.ms 에 정의된 시간 만큼 복제지연이 발 생할 수 있고 이 시간을 지나면 해당 레플리카는 out of sync 레플리카가 됨

 

+ Recent posts