Book
[카프카, 데이터 플랫폼의 최강자] 5장. 카프카 컨슈머
Jordy-torvalds
2021. 9. 29. 23:36
반응형
컨슈머란?
- 토픽의 메시지를 가져와서 소비(consume)하는 애플리케이션과 서버.
- 주요기능은 특정 파티션을 관리하고 있는 파티션 리더로부터 메시지 가져오기 요청을 하는 것.
- 각 요청은 로그의 오프셋을 명시하고 그 위치로부터 로그 메시지를 수신합니다.
- 그래서 컨슈머는 가져올 메시지의 위치를 조정할 수 있고 필요하다면 이미 가져온 데이터도 다시 들고 올 수 있습니다.(다른 메시지 큐에서는 제공되지 않는 기능, 메시지가 디스크에 저장되기에 가능.)
5.1. 컨슈머의 주요 옵션
- 컨슈머의 종류는 크게 두 가지로 old consumer와 new consumer가 있음.
- 주키퍼의 지노드에 오프셋을 저장하면 old consumer,
카프카의 토픽에 저장하면 new consumer - 주키퍼의 지노드에 저장하는 방식은 향후 deprecated될 예정
# 카프카 브로커 서버의 아이피 및 포트 정보
bootstrap.server:<kafka-broker-server-ip:port,kafka-broker-server-ip:port, ...>
# 한 번에 가져올 수 있는 최소 데이터 사이즈. 데이터의 사이즈가 설정 값보다 작으면 대기
fetch.min.bytes
# 컨슈머 그룹의 식별자. 중요.
group.id
# 백그라운드로 주기적으로 오프셋을 커밋
enable.auto.commit
# 카프카에 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않는 경우 다음 옵션으로 리셋
auto.offset.reset=earliest # 가장 초기의 오프셋 값 설정
auto.offset.reset=latest # 가장 마지막의 오프셋 값설정
auto.offset.reset=none # 이전 오프셋 값을 못 찾으면 에러 출력
# 한 번에 가져올 수 있는 최대 데이터 사이즈
fetch.max.bytes
# 요청에 대해 응답을 기다리는 시간
request.timeout.ms
# 컨슈머와 브로커 사이의 세션 타임 아웃 시간. 브로커가 컨슈머가 살아있다고 판단하는 시간
# 기본 값 10초
# 설정된 시간이 지나면 컨슈머가 종료되었거나 장애가 발생한 것으로 판단하고 리밸런스 함
# heartbeat.timeout.ms 와 밀접하게 관련됨. 함께 수정됨.
# 너무 짧게 잡으면 GC나 POLL LOOP를 완료하는 시간이 길어지게 되며 의도하지 않은
# 리밸런스가 발생할 수 있음.
session.timeout.ms
# 그룹 코디네이터(주키퍼)에 얼마나 자주 consumer poll 메소드로 하트비트를 보낼 것인지 조정하는 옵션.
# session.timeout.ms 보다 낮아야 하며 대체로 1/3 정도로 설정.
# 기본 값 3초
heartbeat.timeout.ms
# 단일 호출 poll에 대해 최대 레코드 수를 조정
max.poll.records
# 컨슈머가 살아있는지를 체크하기 위해 하트비트를 주기적으로 보냄
# 그런데 컨슈머가 하트비트만 보내고 실제로 메시지를 가져가지 않는 경우가 있을 수 있음
# 이럴 땐 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고
# 판단하고 컨슈머 그룹에서 제외한 후 다른 컨슈머가 해당 파티션의 메시지를 가져갈 수 있게 함.
max.poll.interval.ms
# 주기적으로 오프셋을 커밋하는 시간
auto.commit.interval.ms
# fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간
fetch.max.wait.ms
# 그 외에 다른 프로듀서 옵션은 https://kafka.apache.org/documentation/#consumerconfigs 참조
파티션 내에서는 순서가 보장되지만 토픽 내에 여러 파티션 간에는 순서가 보장되지 않음.
그렇기 때문에 순서가 엄격히 보장되어야 하는 토픽은 파티션을 하나만 설정해야 함.
그에 따른 트레이드 오프로 처리량이 떨어짐.
컨슈머 그룹
- 하나의 토픽에 대해 여러 컨슈머가 하나의 그룹을 이뤄서 메시지를 읽어올 수 있게하는 기능
- 이 기능의 장점은 토픽에 메시지가 쌓이는 속도를 컨슈머가 따라가지 못해 처리 지연이 발생하게 됨.
- 컨슈머만 확장할 경우 오프셋이 뒤섞여 순서가 엉망이 될 수 있음
- 컨슈머 그룹은 이 모든 문제를 해결해 줌.
- group.id 가 동일한 컨슈머가 같은 그룹에 속하게 됨.
- 특정 파티션에 대한 컨슈머의 소유 권한을 이동 시키는 것을 리밸런스라고 하는데, 이 리밸런스를 통해 여러 컨슈머에게 부하를 분산할 수 있음.
- 커슈머 그룹과 리밸런스 덕분에 가용성과 확장성이 확보됨.
- 리밸런스를 하는 동안에는 파티션으로부터 메시지를 가져올 수 없기 때문에 빈번히 일어나서는 안됨.
- 파티션 내 순서 보장 때문에 하나의 파티션에 여러 컨슈머가 붙을 수가 없음.
- 그래서, 토픽에 처리되지 못한 메시지가 쌓이는 것을 방지하기 위해서는 토픽의 파티션과 컨슈머 그룹 내에 컨슈머의 수를 함께 늘리는 것이 필요함
- 컨슈머 그룹의 일부 컨슈머가 죽더라도, 여러 파티션이 하나의 컨슈머에 연결되는 것은 부하가 몰린다는 문제가 있긴 하지만 에러는 아님. 그래도 모니터링을 통해 재빨리 컨슈머를 복구하는 것이 필요함.
- 하나의 토픽(큐)에 대해 여러 용도로 쓸 수 있고 컨슈머가 메시지를 가져가도 사라지지 않는 다는 특성 때문에 용도에 따라 컨슈머 그룹을 추가할 수 있음.
- 컨슈머 그룹 같에는 오프셋이 공유 되지 않으므로 순서에 있어서도 문제가 되지 않음.
커밋과 오프셋
- 컨슈머 그룹의 각각의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치정보(오프셋)을 기록하고 있음.
- 각 파티션에 대한 현재 위치를 업데이트하는 동작을 커밋한다고 함.
- 오프셋의 저장 위치가 주키퍼면 올드 컨슈머, 성능 등의 문제로 카프카 내에 별도로 내부에서 사용하는 토픽(consumer_offsets)를 만들고 그 토픽에 오프셋 정보를 정하고 있음.
- 갑작스럽게 컨슈머가 다운되서 리밸런스가 일어나게되서, 파티션의 소유권이 바뀌면 소유권을 전달받은 컨슈머는 오프셋 정보를 읽어 가장 최근 커밋된 오프셋을 읽고 그 이후부터 메시지들을 가져옴
- 만약 커밋된 오프셋이 실제 마지막으로 처리한 오프셋보다 작으면 마지막 처리된 오프셋과 커밋된 오프셋 사이의 메시지는 중복 처리 됨.
- 만약 커밋된 오프셋이 실제 마지막으로 처리한 오프셋보다 크면 마지막 처리된 오프셋과 커밋된 오프셋 사이의 메시지는 누락 됨.
- 그렇기 때문에 커밋은 매우 중요.
자동 커밋
- 파티션에 대한 오프셋 정보 관리, 파티션 변경에 대한 관리는 매우 번거로움
- 그래서 오프셋 관리를 사용자가 직접하는 것이 아닌 시스템에 맡기는 것이 가장 쉬운 방법
- 그래서 가장 많이 사용되는 것이 자동 커밋
- 자동 커밋을 하고 싶을 때는 컨슈머 옵션에
enable.auto.commit=true
로 헐정하면 5초마다 컨슈머는 poll()을 호출할 떄 가장 마지막 오프셋을 커밋. - 이 커밋 주기를 기본 값인 5초에서 바꾸고 싶으면
[auto.commit.interval.ms](http://auto.commit.interval.ms)
옵션으로 조정 가능. - 오프셋 관리가 편리한 만큼 중복이 발생할 확률도 높아짐
수동 커밋
- 경우에 따라 자동 커밋이 아닌 수동 커밋이 적절할 수 있음
- 이러한 경우에 메시지 처리가 완료될 떄까지 메시지를 가져온 것으로 간주되어서는 안되는 경우에 사용.
- 컨슈머 옵션에
enable.auto.commit=false
로 추가하면 설정 가능하며 commitSync 메소드를 호출해 수동으로 커밋할 수 있음. - 물론 수동 커밋도 데이터베이스 트랜잭션 실패 등으로 인해 중복 처리가 발생할 수 있지만, 적어도 손실은 없음
특정 파티션 할당
- 특정 컨슈머 인스턴스에게 특정 파티션을 할당해서 처리하는 것이 가능.
- 사용시
[group.id](http://group.id)
가 다른 컨슈머 인스턴스과 다르게 설정해서 오프셋이 공유 되지 않도록 해야함.
특정 오프셋부터 가져오기
- 메시지 중복 처리 등의 이유로 경우에 따라 오프셋 관리를 수동으로 하는 경우도 있음
- 이러한 경우에는 수동으로 어디부터 메시지를 읽어올지를 지정해야 하는데 이때는 seek()메소드를 사용하면 됨
seek(/* 파티션 번호 /, /*오프셋 번호 */);
ex) seek(partition0, 3); // partition0 에서 3번 오프셋 부터 메시지를 읽어옴.
반응형