- Kafka 52023년 08월 27일
- starryeye
- 작성자
- 2023.08.27.오후10:21
공부한 개념을 한 문장씩 쭉 세워본다.
컨슈머는 카프카에 적재된 데이터를 가져가서 필요한 처리를 한다.
컨슈머는 토픽 기준으로 데이터를 소비한다.
카프카 공식 오픈 라이브러리 (Java) 기준으로..
Fetcher 객체는 리더 파티션으로부터 레코드들을 가져오는 책임을 가진다. (배치로 묶어서 가져온다.)
KafkaConsumer 의 poll 메서드는 Fetcher 에 있는 레코드들을 리턴해준다.
ConsumerRecords 객체는 poll 메서드의 리턴 타입으로 처리가 필요한 레코드들(ConsumerRecord)이 담겨 있다.
ConsumerRecord 객체에는 오프셋이 포함되어있다.
컨슈머 그룹은 컨슈머를 각 컨슈머 그룹으로 부터
격리된 환경에서 안전하게 운영할 수 있도록 도와준다.
컨슈머 그룹으로 묶인 컨슈먼가 토픽을 구독해서 데이터를 가져갈 때,
1 개의 파티션은 최대 1 개의 컨슈머에 할당 된다. (1:1 로 할당될 때 최대의 성능)
1 개의 컨슈머는 여러개의 파티션에 할당될 수 있다.
컨슈머 그룹의 컨슈머 개수는 토픽의 파티션 개수보다 같거나 적어야한다.
만약, 컨슈머 그룹의 컨슈머가 파티션 개수보다 많으면..
파티션을 할당 받지 못한 컨슈머는 실질적인 데이터를 처리하지 못한다. (idle, 불필요한 자원)
위 그림은 두개의 컨슈머 그룹의 컨슈머들이 각 파티션에 할당된 모습을 보여준다.
위와 같은 구조를 사용함으로써 카프카 컨슈머 그룹은 서로 격리되고 각각의 오프셋을 가질수 있다.
이것이 컨슈머 그룹의 존재 이유이다.
컨슈머 그룹 0 에서 장애가 발생하더라도 컨슈머 그룹 1 에는 영향이 전혀 없으며
컨슈머 그룹 0 의 장애가 복구되면 컨슈머 그룹 0 의 오프셋을 이용하여 작업이 재개 될 것이다.
컨슈머 그룹으로 이루어진 컨슈머들 중 일부 컨슈머에 장애가 발생하면,
장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 재할당 된다.
이러한 과정을 리밸런싱(Rebalancing) 이라 한다.
이것이 카프카의 fail over 방식이다.
지속적으로 데이터를 처리할 수 있게 해주어 가용성을 높여주는 것이다.
리밸런싱은 장애상황에만 이루어지는 것이 아닌..
단순히 컨슈머가 추가되거나 삭제되는 상황에 발생된다.
실무에서 파티션이 100 개 이상이라면, 리밸런싱 과정이 몇십초에서 몇분까지 걸릴 수 있다.
컨슈머는 카프카 브로커로부터 데이터를 가져가고
해당 데이터를 처리하면 커밋이라는 행위를 통해
토픽에 데이터를 어디까지 가져갔고 처리했는지 기록한다.
커밋은 카프카 브로커 내부 토픽인 __consumer_offsets 이라는 토픽에 기록된다.
커밋이 제대로 동작하지 못했다면 데이터 처리의 중복이 발생할 수 있다.
컨슈머와 파티션 할당 정책은 컨슈머의 Assignor 에 의해 결정된다.
(정책은 컨슈머가 정하지만, 할당 자체는 브로커(코디네이터)에 의해 수행되는 듯..)
정책 구현체로는..
RangeAssignor 는 파티션을 숫자로 정렬하고 컨슈머를 사전 순서로 정렬하여 서로 할당한다. (기본값이다.)
RoundRobinAssignor 는 모든 파티션을 컨슈머에서 번갈아가며 할당
StickyAssignor 는 최대한 파티션을 균등하게 배분하면서 할당..
컨슈머의 필수 옵션..
- bootstrap.servers 는 카프카 클러스터의 브로커 호스트 이름과 포트를 1개 이상 적용해준다.
- key.deserializer 는 레코드의 메시지 키를 역직렬화하는 클래스 지정
- value.deserializer 는 레코드의 메시지 값을 역직렬화하는 클래스 지정
컨슈머의 선택 옵션..
- group.id 는 컨슈머 그룹 아이디를 지정한다.
subscribe 메서드를 사용할 때는 이 옵션이 필수이다. (기본 값 null)
- auto.offset.reset 는 한번도 커밋하지 않았을 경우(컨슈머 오프셋이 없는 경우) 에
어느 오프셋 부터 읽을지를 설정하는 옵션이다. (기본 값 latest)
- enable.auto.commit 은 자동 커밋 모드로 할지 수동 커밋 모드로 할지 선택한다. (기본 값 true)
false 로 수동 커밋모드로 할 경우, poll 메서드를 호출했다면 데이터 처리 후 commit 메서드를 호출해줘야한다.
- auto.commit.interval.ms 는 자동 커밋 모드일 경우 오프셋 커밋 간격을 지정 (기본 값 5000, 5초)
- max.poll.records 는 poll 메서드를 호출 하면 반환되는 레코드 최대 갯수를 지정 (기본 값 500)
아래는 리밸런싱의 근거가 된다.
- session.timeout.ms 는 컨슈머가 브로커와 연결 끊김 판단 시간이다. (기본 값 10000, 10초)
- hearbeat.interval.ms 는 컨슈머가 자신이 살아 있음을 알리는..
하트비트 전송 시간 간격이다. (기본 값 3000, 3초)
- max.poll.interval.ms 는 poll 메서드를 호출하는 시간 간격이다. (기본 값 300000, 5분)
컨슈머는 살아있지만, 데이터 처리에 문제가 생길 경우 문제가 생겼다고 판단하는 시간
- isolation.level 은 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우...
이를 컨슈머 입장에서 트랜잭션 단위로 읽을 때 사용하는 옵션이다.
프로듀서에서 send 메서드의 결과를 비동기(콜백도 가능) 처리할 수 있었다.
수동 커밋 모드의 컨슈머에서는 poll 이후 데이터 처리를 하고
commitAsync 메서드를 통해 비동기(콜백도 가능)로 커밋을 수행하여 데이터 처리 속도를 높일 수 있다.
컨슈머는 ConsumerRebalanceListener 인터페이스를 구현하여
리밸런스 리스너를 구현할 수 있다.
onPartitionAssigned 메서드는 파티션이 할당 완료되면 호출된다.
onPartitionRevoked 메서드는 리밸런스가 시작되기 직전에 호출된다.
subscribe 메서드를 사용하면 토픽을 구독하는 컨슈머 그룹으로 운영하는 방식이다.
하지만..
어떤 토픽에 대하여 특정 파티션에만 접근하는 어떤 컨슈머를 직접 할당 하고 싶을 수 도 있다.
이때는 subscribe 메서드 대신 assign 메서드를 사용한다.
(파티션 번호가 필요하고 따라서 프로듀서 입장에서는 메시지 키를 레코드에 적재시켜 줘야한다.)
어떤 토픽에 대해서 파티션이 3개 일 때...
1. 1개의 프로세스에 3개의 스레드(컨슈머)로 구성할 것인가...
2. 1개의 스레드(컨슈머)를 가진 3개의 프로세스로 구성할 것인가...
1번 방식은 컨슈머의 장애 격리가 안된다.
2번 방식은 컨슈머의 장애 격리가 되고 scale out이 쉽다.
AWS 와 같이 클라우드 환경에서 자주 쓰인다. 하지만, 배포 부담이 좀 있다.
컨슈머 랙(LAG) 은 파티션의 최신 오프셋과 컨슈머 오프셋 간의 차이이다.
{파티션 오프셋 - 컨슈머 오프셋} 이다.
파티션 개수 * 컨슈머 그룹 만큼 컨슈머 랙을 모니터링 할 수 있다.
프로듀서 1개, 파티션 1개, 컨슈머 1개인 상태에서...
프로듀서가 보내는 데이터 속도(records/second)가
컨슈머의 데이터 처리 속도(records/second) 보다 크면...
시간이 증가됨에 따라 컨슈머랙이 커진다.
해결책으로는 파티션개수와 컨슈머 개수를 선형으로 증가시키면
병렬처리량이 늘어나서 해결된다.
다음글이전글이전 글이 없습니다.댓글