프로듀서

  • 재시도로 인해 발생하는 중복을 방지
  • 멱등적 프로듀서 + 트랜잭션
  • 멱등적 프로듀서
    • 멱등적 프로듀서 기능 on : enable.idempotence=true, acks = all
    • 식별자 생성
      • producer Id, sequence id 생성
      • 대상 토픽, 파티션
      • max.in.flights.requests.per.connection <= 5
    • 중복시 에러가 발생하지는 않지만 메트릭에는 수집됨
      • client : record-error-rate
      • broker : RequestMetrics - ErrorsPerSec
    • 예상보다 높은 시퀀스 값을 받게 될경우 out of order sequence number 에러 발생
      • 트랜잭션 기능을 사용하지 않는 다면 무시해도 됨
      • 프로듀서와 브로커 사이에 메시지 유실
      • 브로커 설정 재점검 및 언클린 리더 선출 발생 여부 확인
    • 프로듀서 재시작
      • 초기화 과정에서 브로커로부터 프로듀서id를 생성 받음
      • 프로듀서 초기화마다 새로운 id 생성
      • 새 프로듀서가 이미 기존 프로듀서가 전송한 메시지를 다시 전송할 경우 중복될 수 있음
    • 브로커 장애
      • 컨트롤러는 장애가 난 브로커가 리더를 맡고 있었던 파티션들에 대해 새 리더를 선출
      • 새 리더는 중복메시지를 체크하기 시작함
        • 리더는 인메모리 프로듀서 상태에 저장된 최근 5개의 시퀀스 넘버를 업데이트 하고 팔로워는 새로운 메시지를 복제할 때마다 자체적인 인메모리 버퍼를 업데이트함으로써 상태를 알 수 있다. 
      • 장애난 브로커의 복구 
        • 브로커는 종료되거나 새 세그먼트가 생성될 때마다 프로듀서 상태에 대한 스냅샷을 파일 형대로 저장
        • 브로커가 시작되면 파일에서 최신 상태를 읽어옴
        • 현재 리더로부터 복제한 레코드를 사용해서 프로듀서 상태를 업데이트 함
    • 한계
      • 내부로직으로 인한 재시도가 발생할 겨우 생기는 중복만 방지
        • 동일한 메시지를 producer.send를 두번 호출하면 중복 발생

트랜잭션

  • 카프카 스트림즈를 사용해서 개발된 애플리케이션에 정확성을 보장하기 위해 도입 (읽기-처리-쓰기 패턴)
  • 다수의 파티션에 대해 원자적 쓰기(atomic multipartition write) 기능도입
  • 문제상황
    • 애플리케이션 크래시에 의한 재시도
    • 좀비 애플리케이션에 의해 발생하는 재처리
  • Transactional producer
    • transactional.id 설정 
      • 재시작시에도 값이 유지됨
    • initTransactions() 호출로 초기화
      • transactional.id와 producer.id 의 대응관계를 유지함
      • 이미 있는 transactional.id 프로듀서가 initTransactions를 호출하는 경우 이전에 쓰던 producer.id값을 할당
    • Zombie fencing
      • 좀비 인스턴스가 출력 스트림에 결과를 쓰는 것을 방지
      • epoch 사용
      • initTransactions 호출 시 transactional.id에 해당하는 epoch 값을 증가 시킴
      • epoch값이 낮은 프로듀서가 전송 후 트랜잭션 커밋/중단 요청을 보낼경우 FencedProducer 에러 발생
      • close()를 호출하여 좀비 애플리케이션 종료
  • 컨슈머 격리 수준(isolation.level)
    • read_uncommitted(default) : 진행중이거나 중단된 트랜잭션에 속한 모든 레코드 리턴
    • read_committed
      • 커밋된 트랜잭션에 속한 메시지 / 트랜잭션에 속하지 않는 메시지만 리턴
      • 트랜잭션에 속한 일부 토픽만 구독하므로 트랜잭션에 속한 모든 메시지가 리턴된다고 보장되지 않음
      • 트랜잭션이 처음으로 시작된 시점 (Last Stable Offset, LSO) 이후에 쓰여진 메시지는 리턴되지 않음
        • 트랜잭션이 커밋/중단 되는 시점 또는 transaction.timeout.ms 만큼 시간이 지날 때까지 보류
    • transaction.timeout.ms
      • default : 15분
      • 종단 지연이 길어질 수 있음
  • 한계
    • 스트림 처리내에서 외부 효과를 일으키는 작업
    • 카프카 토픽에서 읽어서 데이터 베이스에 쓰는 경우
    • DB에서 읽어서 카프카에 쓰고 다른 DB에 쓰는 경우
    • 한클러스터에서 다른 클러스터로 데이터 복제
    • 발행/구독 패턴
  • 사용법
    • 카프카 스트림즈
      • processing.guarantee= exactly_once / exactly_once_beta 로 설정
    • 트랜잭션 API 사용
      • transactional.id 설정
      • enable.auto.commit = false
      • isolation.leve = read_committed
      • producer.initTransactions()
        • transactional.id 등록 또는 epoch 값 증가
      • producer.beginTransaction()
        • 프로듀서에 현재 진행중이 트랜잭션이 있음을 알림
        • 레코드 전송 시 프로듀셔가 브로커에 AddPartitionsToTxn 요청
        • 트랜잭션 로그에 기록
      • producer.sendOffsetsToTransaction
        • 트랜잭션이 커밋 되기전 호출
        • 트랜잭션 코디네이터로 오프셋과 컨슈머 그룹 아이디가 포함된 요청 전송
        • 트랜잭션 코디네이터는 컨슈머 그룹 아이디를 이용하여 컨슈머 그룹 코디네이터를 찾고 오프셋 커밋
      • producer.commitTransaction / producer.abortTransaction
        • 트랜잭션 코디네이터로 EndTxn 요청 전송
        • 트랜잭션 로그에 기록
        • 트랜잭션에 포함된 모든 파티션에 마커 쓰기
        • 트랜잭션 로그에 기록
          • 로그에기록 되었지만 코디네이터가 종료되거나 크래쉬 되면 새로 선출된 코디네이터가 마무리 짐
      • transaction.timeout.ms 
        • 해당 시간내에 커밋/중단 되지 않으면 트랜잭션 코디네이터가 중단
  • 원리
    • 찬디-램포트 스냅샷 알고리즘 : 통신채널을 통해 카머라 불리는 컨트롤 메시지를 보내고 도착을 기준으로 일관적인 상태를 결정
    • two phase commit, 트랜잭션 로그(transaction_state 내부 토픽) 사용
      • 현재 진행중인 트랜잭션과 파티션을 함께 로그에 기록
      • 커밋/중단 시도 기록
      • 모든 파티션에 트랜잭션 마커를 씀
      • 트랜잭션 종료 로그 기록
  • 모니터링
    • LastStableOffsetLag 지표
      • 파티션의 LSO가 최신 오프셋 값에서 얼마나 떨어졌는지를 나타냄
      • 이값이 무한히 증가하는 상황 : 트랜잭션 멈춤 현상

+ Recent posts