kwang23
2024. 2. 7. 15:44
2024. 2. 7. 15:44
프로듀서
- 재시도로 인해 발생하는 중복을 방지
- 멱등적 프로듀서 + 트랜잭션
- 멱등적 프로듀서
- 멱등적 프로듀서 기능 on : enable.idempotence=true, acks = all
- 식별자 생성
- producer Id, sequence id 생성
- 대상 토픽, 파티션
- max.in.flights.requests.per.connection <= 5
- 중복시 에러가 발생하지는 않지만 메트릭에는 수집됨
- client : record-error-rate
- broker : RequestMetrics - ErrorsPerSec
- 예상보다 높은 시퀀스 값을 받게 될경우 out of order sequence number 에러 발생
- 트랜잭션 기능을 사용하지 않는 다면 무시해도 됨
- 프로듀서와 브로커 사이에 메시지 유실
- 브로커 설정 재점검 및 언클린 리더 선출 발생 여부 확인
- 프로듀서 재시작
- 초기화 과정에서 브로커로부터 프로듀서id를 생성 받음
- 프로듀서 초기화마다 새로운 id 생성
- 새 프로듀서가 이미 기존 프로듀서가 전송한 메시지를 다시 전송할 경우 중복될 수 있음
- 브로커 장애
- 컨트롤러는 장애가 난 브로커가 리더를 맡고 있었던 파티션들에 대해 새 리더를 선출
- 새 리더는 중복메시지를 체크하기 시작함
- 리더는 인메모리 프로듀서 상태에 저장된 최근 5개의 시퀀스 넘버를 업데이트 하고 팔로워는 새로운 메시지를 복제할 때마다 자체적인 인메모리 버퍼를 업데이트함으로써 상태를 알 수 있다.
- 장애난 브로커의 복구
- 브로커는 종료되거나 새 세그먼트가 생성될 때마다 프로듀서 상태에 대한 스냅샷을 파일 형대로 저장
- 브로커가 시작되면 파일에서 최신 상태를 읽어옴
- 현재 리더로부터 복제한 레코드를 사용해서 프로듀서 상태를 업데이트 함
- 한계
- 내부로직으로 인한 재시도가 발생할 겨우 생기는 중복만 방지
- 동일한 메시지를 producer.send를 두번 호출하면 중복 발생
트랜잭션
- 카프카 스트림즈를 사용해서 개발된 애플리케이션에 정확성을 보장하기 위해 도입 (읽기-처리-쓰기 패턴)
- 다수의 파티션에 대해 원자적 쓰기(atomic multipartition write) 기능도입
- 문제상황
- 애플리케이션 크래시에 의한 재시도
- 좀비 애플리케이션에 의해 발생하는 재처리
- Transactional producer
- transactional.id 설정
- initTransactions() 호출로 초기화
- transactional.id와 producer.id 의 대응관계를 유지함
- 이미 있는 transactional.id 프로듀서가 initTransactions를 호출하는 경우 이전에 쓰던 producer.id값을 할당
- Zombie fencing
- 좀비 인스턴스가 출력 스트림에 결과를 쓰는 것을 방지
- epoch 사용
- initTransactions 호출 시 transactional.id에 해당하는 epoch 값을 증가 시킴
- epoch값이 낮은 프로듀서가 전송 후 트랜잭션 커밋/중단 요청을 보낼경우 FencedProducer 에러 발생
- close()를 호출하여 좀비 애플리케이션 종료
- 컨슈머 격리 수준(isolation.level)
- read_uncommitted(default) : 진행중이거나 중단된 트랜잭션에 속한 모든 레코드 리턴
- read_committed
- 커밋된 트랜잭션에 속한 메시지 / 트랜잭션에 속하지 않는 메시지만 리턴
- 트랜잭션에 속한 일부 토픽만 구독하므로 트랜잭션에 속한 모든 메시지가 리턴된다고 보장되지 않음
- 트랜잭션이 처음으로 시작된 시점 (Last Stable Offset, LSO) 이후에 쓰여진 메시지는 리턴되지 않음
- 트랜잭션이 커밋/중단 되는 시점 또는 transaction.timeout.ms 만큼 시간이 지날 때까지 보류
- transaction.timeout.ms
- default : 15분
- 종단 지연이 길어질 수 있음
- 한계
- 스트림 처리내에서 외부 효과를 일으키는 작업
- 카프카 토픽에서 읽어서 데이터 베이스에 쓰는 경우
- DB에서 읽어서 카프카에 쓰고 다른 DB에 쓰는 경우
- 한클러스터에서 다른 클러스터로 데이터 복제
- 발행/구독 패턴
- 사용법
- 카프카 스트림즈
- processing.guarantee= exactly_once / exactly_once_beta 로 설정
- 트랜잭션 API 사용
- transactional.id 설정
- enable.auto.commit = false
- isolation.leve = read_committed
- producer.initTransactions()
- transactional.id 등록 또는 epoch 값 증가
- producer.beginTransaction()
- 프로듀서에 현재 진행중이 트랜잭션이 있음을 알림
- 레코드 전송 시 프로듀셔가 브로커에 AddPartitionsToTxn 요청
- 트랜잭션 로그에 기록
- producer.sendOffsetsToTransaction
- 트랜잭션이 커밋 되기전 호출
- 트랜잭션 코디네이터로 오프셋과 컨슈머 그룹 아이디가 포함된 요청 전송
- 트랜잭션 코디네이터는 컨슈머 그룹 아이디를 이용하여 컨슈머 그룹 코디네이터를 찾고 오프셋 커밋
- producer.commitTransaction / producer.abortTransaction
- 트랜잭션 코디네이터로 EndTxn 요청 전송
- 트랜잭션 로그에 기록
- 트랜잭션에 포함된 모든 파티션에 마커 쓰기
- 트랜잭션 로그에 기록
- 로그에기록 되었지만 코디네이터가 종료되거나 크래쉬 되면 새로 선출된 코디네이터가 마무리 짐
- transaction.timeout.ms
- 해당 시간내에 커밋/중단 되지 않으면 트랜잭션 코디네이터가 중단
- 원리
- 찬디-램포트 스냅샷 알고리즘 : 통신채널을 통해 카머라 불리는 컨트롤 메시지를 보내고 도착을 기준으로 일관적인 상태를 결정
- two phase commit, 트랜잭션 로그(transaction_state 내부 토픽) 사용
- 현재 진행중인 트랜잭션과 파티션을 함께 로그에 기록
- 커밋/중단 시도 기록
- 모든 파티션에 트랜잭션 마커를 씀
- 트랜잭션 종료 로그 기록
- 모니터링
- LastStableOffsetLag 지표
- 파티션의 LSO가 최신 오프셋 값에서 얼마나 떨어졌는지를 나타냄
- 이값이 무한히 증가하는 상황 : 트랜잭션 멈춤 현상