티스토리 뷰

OpenSource/Kafka

5장 카프카 컨슈머

Jinhyy 2019. 12. 16. 13:09

컨슈머 주요 옵션

  • bootstrap.servers

    • 카프카 클러스터의 주소
  • fetch.min.bytes (df: 1 bytes)
    • 한번에 가져올 수 있는 최소 사이즈로, 만약 가져오는 데이터가 지정한 사이즈보다 작으면 요청에 응답하지 않고, 데이터가 누적될 때 까지 기다린다.
  • fetch.max.wait.ms(df: 500sec = 0.5sec) : fetch.min.bytes에 설정된 데이터보다 데이터 양이 적은 경우 요청에 응답을 기다리는 최대시간을 설정
  • group.id: 컨슈머가 속한 컨슈머 그룹의 id
  • enable.auto.commit (df: true)
    • 백그라운드로 주기적으로 오프셋을 커밋할지 안할지 설정하는 옵션
  • auto.commit.interval.ms (df: 5000ms = 5sec) : 주기적으로 오프셋을 커밋하는 시간
  • auto.offset.reset (df: latest) 
    • 카프카에서 초기 오프셋이 없거나, 현재 오프셋이 더 이상 존재하지 않는경우 다음 옵션으로 리셋한다.
      • earlist: 가장 초기의 오프셋 값으로 설정
      • latest: 가장 최근의 오프셋 값으로 설정
      • none: 이전 오프셋값을 찾지 못하면 에러를 발생시킵니다.
  • fetch.max.bytes (df : 52428800 bytes = 50mb) : 한번에 가져올 수 있는 최대 데이터 사이즈
  • request.timeout.ms(df: 30000ms = 30sec) : 요청에 대한 응답을 기다리는 최대 시간으로, 지정한 시간만큼 요청에 대한 응답이 안오면 retry 한다.
  • session.timeout.ms(df: 10000ms = 10sec)
    • 컨슈머와 브로커 사이의 세션 타임 아웃 시간으로, 브로커가 컨슈머가 살아있는 것으로 판단하는 시간.
    • 컨슈머가 그룹 코디네이터에게 하트비트를 해당시간만큼 보내지 않으면, 해당 컨슈머는 장애가 생겼다고 판단하여 컨슈머 그룹은 리밸런스(rebalanace)를 시도한다.
  • heartbeat.interval.ms(df: 3000ms = 3sec)
    • 그룹 코디네이터에게 얼마나 KafkaConsumer.poll() 메소드를 통하여 하트비트를 보낼 것인지 조정한다.
    • 일반적으로, session.timeout.ms의 1/3 정도로 설정한다.
  • max.poll.records(df: 500) : 폴링루프에서 이뤄지는 한건의 KafkaConsumer.poll() 메소드에 대한 최대 레코드수를 조정한다. 
  • max.poll.interval.ms(df: 300000ms = 30sec) : 컨슈머가 하트비트를 보냄에도 불구하고, poll을 하지 않으면 장애이기 때문에 poll 주기를 설정하여 장애를 판단하는데 사용한다. 해당 옵션보다 poll 주기가 길었을 경우 컨슈머 그룹에서 제외한 후, 다른 컨슈머가 해당 파티션을 처리할 수 있도록 한다.

 

파티션과 메시지 순서

  • 카프카는 파티션이 여러개인 경우 라운드 로빈 형식으로 파티션에 데이터를 분산 저장하고, 컨슈머에서 해당 데이터를 불러올 때, 파티션1의 데이터 / 2의 데이터 , ... 이런 순서대로 불러오기 때문에 메시지의 순서가 보장되지 않는다.

  • 메시지의 순서를 보장하려면 파티션을 1개만 사용해야 하는데, 이런 경우 분산저장이 힘들고 컨슈머그룹 안의 하나의 컨슈머만 파티션과 연결 할 수 있기 때문에 처리량이 낮다.

컨슈머 그룹

  • 카프카는 컨슈머 그룹이라는 개념을 통해, 하나의 토픽을 여러개의 컨슈머 그룹이 다른 방식으로 처리할 수 있다.
  • 카프카는 컨슈머 그룹에 리밸런스라는 개념을 통하여, 컨슈머에 장애가 발생했을 시 다른컨슈머로 해당 작업을 자동으로 위임한다. 이 때, 장애가 발생함을 알기 위해서 주기적으로 컨슈머에서 브로커로 보내는 하트비트와 poll() 메소드 등의 주기를 감시한다.
  • 하나의 컨슈머가 너무 많은 파티션을 담당하고 있는지 확인하는게 필요하다.

커밋과 오프셋

  • 컨슈머는 각각의 파티션에 대하여 자신이 가져간 메시지의 오프셋을 저장한다.
  • 커밋 : 컨슈머가 처리하고 있는 각 파티션 안에다가 자신의 오프셋을 업데이트 하는 동작
  • 자동커밋 - enable.auto.commit 옵션등을 사용하여 설정 할 수 있으며, 정한 주기에 따라서 자동으로 commit 요청을 보내는데, 주의할 점으로는 작업이 처리된 상태이고, 자동 커밋이 되기전 상황에 리밸런스가 되버리면, commit 이 안된 상태이기 때문에 처리했던 작업을 다시 중복으로 처리하게 될 수 있다.
  • 수동커밋 - DB에 저장하여 중복이 최대한 없도록 할 때 사용하는데, Consumer 프로젝트 내에서, consumer.commitSync() 라는 메소드를 통하여, 원하는 작업이 끝난 후 commit을 하도록 호출 할 수도 있다. 하지만 ,이 경우에도 DB에 저장은 되고, commit이 되기전 해당 서버가 죽게 된다면 해당 작업을 중복으로 처리 될 가능성은 있다.
  • 특정 파티션 할당
    • 특정 파티션에 할당해야 하는 상황은 아래와 같은 경우가 있을 수 있다.
      • 키-값의 형태로 파티션에 저장되어 있고, 특정 파티션에 대한 메시지만 가져와야 할 때
      • 컨슈머 프로세스가 가용성과 장애복구 능력이 높아서 브로커가 컨슈머의 실패를 감지, 리밸런스할 필요 없고 자동으로 컨슈머 프로세스가 다른 시스템에서 시작되는 경우(YARN, Mescos)
      • (컨슈머에서 해당 토픽의 특정 파티션만 처리하도록 설정하는 예제)
TopicPartition p0 = new TopicPartition(topic, 0);
TopicPartition p1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(p0, p1));
  • 파티션의 특정 오프셋으로부터 메시지 가져오기
consumer.assign(Arrays.asList(p0, p1));
consumer.seek(p0, offset1);
consumer.seek(p1, offset2);

'OpenSource > Kafka' 카테고리의 다른 글

4장 카프카 프로듀서  (0) 2019.12.16
3장 카프카 디자인  (0) 2019.12.09
2장 카프카 설치(zookeeper 중심)  (0) 2019.12.04
1. 카프카란 무엇인가  (0) 2019.12.02
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/04   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함