ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 아직 나만 못해본 Kafka cluster 구성하기
    Kafka 2020. 4. 2. 15:38

    30분마다 넘어오는 크롤링 데이터를 가공하여 웹서비스를 제공하는 프로젝트에 들어가게 되었습니다.

     

    저는 다음과 같은 고민을 하였습니다.

     

    1. 하루에 수십 번씩 쏟아지는 많은 양의 데이터를 어떻게 전달할 것인가?

    2. 병목 현상으로 처리 실패 및 데이터 유실이 발생하지 않을까?

     

    그 고민 중 kafka를 알게 되었고 아래와 같은 이유로 kafka를 선택하게 되었습니다.

     

    벤치마크를 통한 성능의 우위.

    메시지의 순서가 보장이 필요 없음.

    다른 MQ들과는 달리 읽어도 메시지가 사라지지 않아 데이터 유실이 걱정 없음. 또한 사라지지 않기 때문에 추후 다른 서비스에서 분석을 위해 메시지를 같이 수신할 수 있음.

     

    때문에 kafka의 도입을 계획하며 kafka를 맛보면 정리한 글입니다.

     

     

     

    이 글은 아래와 같은 순서로 진행이 됩니다. 

     

    1. 카프카 설치

    2. JDK 설치

    3. 주키퍼 서버 설정 및 구동

    4. 카프카 서버 설정 및 구동

    5. 확인

    Option. 기본 개념 정리

     

     

    0. 인스턴스 구성

    클러스터를 구성하기 위하여 인스턴스 3개를 준비하였습니다.

    하지만 연습이기 때문에 꼭 3개를 준비하실 필요 없습니다.  한 개의 환경에서도 충분히 구성하실 수 있습니다. 글 중간중간 설명을 하겠습니다.

     

    3개의 인스턴스

    1. 카프카 설치

    https://downloads.apache.org/kafka/에서 원하시는 버전을 다운로드하실 수 있습니다.

    다운로드 후 압축을 해제하도록 하겠습니다.

    wget https://downloads.apache.org/kafka/2.4.1/kafka_2.13-2.4.1.tgz
    tar xvf kafka_2.13-2.4.1.tgz

     

    2. JDK 설치

     

    카프카는 스칼라와 자바로 구성되었기 때문에 자바를 설치해주셔야 합니다.

    저는 Amazon linux2 AMI를 사용하여 인스턴스를 생성하였기때문에 아래와 같은 명령어를 통해 설치하였습니다.
    각자의 환경에 맞춰 설치해주세요.

    sudo amazon-linux-extras install java-openjdk11

     

    3. 주키퍼 서버 설정 및 구동

     

    압축을 해제하신 폴더의 config 폴더 내부에 주키퍼 서버 설정을 담당하는 zookeeper.properties라는 파일이 존재합니다.

    저는 어떤 환경이든 default로 제공한 원본 설정을 따로 저장해두는것이 좋은 습관이라고 여겨 복사를 해둡니다.

    cp kafka_2.13-2.4.1/config/zookeeper.properties kafka_2.13-2.4.1/config/zookeeper_origin.properties

     

    3-1. 주키퍼 서버 설정

     

    주키퍼 앙상블을 구성하기 위해 zookeeper.properties 을 설정하여줍니다.

     

    (1번 서버 예시)

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # the directory where the snapshot is stored.
    dataDir=/tmp/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    # Disable the adminserver by default to avoid port conflicts.
    # Set the port to something non-conflicting if choosing to enable this
    admin.enableServer=false
    # admin.serverPort=8080
    
    #the basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will be twice the tickTime.
    tickTime=2000
    
    #initLimit is timeouts ZooKeeper uses to limit the length of time the ZooKeeper servers in quorum have to connect to a leader.
    initLimit=5
    
    #syncLimit limits how far out of date a server can be from a leader.
    syncLimit=2
    
    server.1=0.0.0.0:2888:3888
    server.2=x.x.x.x:2888:3888
    server.3=x.x.x.x:2888:3888

     

    dataDir: 주키퍼의 트랜잭션 로그와 스냅샷이 저장되는 데이터 경로. (한 인스턴스에서 클러스터를 구축하시는 분들은 각 서버별로 디렉터리를 다르게 해주어야 합니다)

    자세히 설명하면 지노드에 변경사항이 발생하면, 트랜잭션 로그에 추가가 됩니다. 그리고 로그가 커지게 되면 현재 모든 지노드의 상태 스냅샷이 파일시스템에 저장이됩니다. 그러므로 이 디렉토리 안에 들어가있는 스냅샷 파일은 이전 로그들이므로 중요한 디렉터리입니다.

     

     

    tickTime: 주키퍼가 사용하는 시간(하트비트, 타임아웃)에 대한 기본 측정 단위 (ms).

     

    initTime: 팔로워가 리더와 초기에 연결하는 시간에 대한 타임아웃. (tickTime이 2초니까 initTime은 결국 10초)

     

    syncLimit: 팔로워가 리더와 동기화하는 시간에 대한 타임아웃. (4초)

     

    clientPort: 주키퍼가 사용하는 TCP 포트. (한 인스턴스에서 클러스터를 구축하시는 분들은 각 서버별로 포트를 다르게 해주어야 합니다)

     

    server.x: 앙상블을 구축하기 위한 서버 설정, server.myid 형식으로 사용.

     

    여기서 2888포트는 앙상블내 서버끼리 동기화하는데 사용되는 포트이며 3888포트는 리더를 선출하는데 사용되는 포트입니다

     

     

    server.x에 대한 부분은 각 서버에 맞게,  1번 서버에서 1번 서버를 선언할 때 0.0.0.0처럼 아래와 같이 각자 다르게 설정하시면 됩니다.

    여기서 1번 2번 3번 은 어떻게 구분을 하지? 궁금하실 수 있습니다. 3-2 섹션에서 설명드리겠습니다.

    각 서버의 server.myid 설정

    더 자세한 설정은 http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_configuration 에서 확인하실 수 있습니다.

     

    3-2. 주키퍼 서버 ID 설정

     

    위에서 server.myid의 형식으로 서버를 선언하여 앙상블을 구축한다고 하였습니다.

    이제 앙상블 내 주키퍼 노드를 구분하기 위한 id를 설정하겠습니다.

    dataDir에서 선언하신 디렉터리 하위에 zookeeper라는 폴더를 만드신 후 myid라는 파일명에 myid에 해당하는 정수를 입력한 파일을 만드시면 됩니다.

     

    (1번 서버 예시)

    mkdir /tmp/zookeeper
    
    echo 1 > /tmp/zookeeper/myid
    

     

    3-3. 주키퍼 서버 구동

     

    서버 실행 파일과 서버 설정 파일을 이용하여 아래의 2개 중 1개를 택하여 명령어를 실행시키시면 주키버 서버가 구동이 되는 것을 확인하실 수 있습니다.

    bin/zookeeper-server-start.sh ./config/zookeeper.properties
    또는
    bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
    
    
    # 한개의 인스턴스에서 클러스터를 구축하시는 분들
    bin/zookeeper-server-start.sh -daemon ./config/zookeeper_1.properties ./config/zookeeper_2.properties ./config/zookeeper_3.properties

     

    3-4. 주키퍼 서버 확인

     

    인스턴스에 맞는 ip를 사용하여 아래의 명령어를 실행시킵니다.

    bin/zookeeper-shell.sh x.x.x.x:2181

     

    정상적으로 실행된 것을 확인하실 수 있습니다.

     

     

    4. 카프카 서버 설정 및 구동

    압축을 해제하신 폴더의 config 폴더 내부에 카프카 서버 설정을 담당하는 server.properties라는 파일이 존재합니다.

    저는 어떤 환경이든 default로 제공한 원본 설정을 따로 저장해두는것이 좋은 습관이라고 여겨 복사를 해둡니다.

    cp kafka_2.13-2.4.1/config/server.properties kafka_2.13-2.4.1/config/server_origin.properties

     

    4-1. 카프카 서버 설정

     

    저는 기본제공된 설정에서 아래의 설정만 바꾸었습니다.

     

    1. broker.id - 필수 - 브로커 (카프카 서버)를 구분하기 위한 ID로 유니크해야합니다.

    2. listeners - PLAINTEXT://:9092 (한 인스턴스에서 클러스터를 구성하시는 분들은 각 설정의 포트번호가 달라야합니다)

    3. advertiesed.listeners - PLAINTEXT://x.x.x.x:9092 (PLAINTEXT://각 인스턴스 ip: lisetners에서 설정한 포트)

    4. zookeeper.connect - 필수 - x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092(각 서버 별 IP와 설정한 listeners의 포트)

     

    여기서 PLAINTEXT라는것이 대체 뭐지?? 저런 프로토콜이 있나? 라고 생각하시는분이 계실겁니다. 제가 그랬습니다.

    아래 두 링크를 읽어보시면 도움이 될것입니다.

    https://stackoverflow.com/questions/50737950/what-does-plaintext-keyword-means-in-kafka-configuration

    http://kafka.apache.org/11/javadoc/org/apache/kafka/common/security/auth/SecurityProtocol.html

     

     

    Tip. zookeeper.session.timeout.ms 을 3초이상으로 설정해두는것을 권장합니다.

    간혹 메모리를 너무 많이 사용하게되면 FGC(Full Garbage Collection)이 발생하면서 서버가 일시적으로 멈춤현상이 일어나게됩니다.

    해당 설정을 너무 짧게 설정하시면 의도치않게 서버가 다운된것으로 판단해 오작동을 일으킬수 있습니다.

    그러므로 GC타임을 주기적으로 체크하는것을 권장합니다

     

    더 자세한 설정은 http://kafka.apache.org/documentation/#brokerconfigs 에서 확인하실 수 있습니다.

     

     

    (3번 서버 예시)

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    # see kafka.server.KafkaConfig for additional details and defaults
    
    ############################# Server Basics #############################
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=2
    
    ############################# Socket Server Settings #############################
    
    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    listeners=PLAINTEXT://:9092
    
    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    advertised.listeners=PLAINTEXT://13.125.xxx.xx7:9092
    
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    #
    auto.create.topics.enable=false
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    ############################# Log Flush Policy #############################
    
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=24
    
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=13.209.1xx.xx:2181, 13.125.2xx.xxx:2181, 13.125.1xx.xxx:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    
    ############################# Group Coordinator Settings #############################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0

     

    4-2. 카프카 서버 실행

    bin/kafka-server-start.sh ./config/server.properties
    
    또는
    
    bin/kafka-server-start.sh -daemon ./config/server.properties
    
    # 한개의 인스턴스에서 클러스터를 구축하시는 분들
    bin/kafka-server-start.sh ./config/server_1.properties ./config/server_2.properties ./config/server_3.properties

     

    혹시 아래와같은 에러가 발생하실 수 도 있습니다.

    저는 메모리 1GB인  t2.micro로 인스턴스를 생성하였는데 카프카의 기본 메모리 설정이 1GB여서 에러가 발생하였습니다.

    그럴 경우 bin/kafka-server-start.sh 파일을 아래와 같이 적당한 메모리 용량으로 설정해주시면 됩니다.

     

    5. 확인

    아래의 명령어를 통해 토픽을 생성하고 확인하고 상세정보를 통해 파티션이 제대로 잘 생성됬는지 등을 확인하며 클러스터가 제대로 구축된것을 확인하실 수 있습니다

    # 토픽 리스트 확인
    bin/kafka-topics.sh --list --zookeeper 13.209.1xx.xxx:2181, 13.125.2xx.xxx:2181, 13.125.1xx.xxx:2181
    
    # 토픽 생성
    bin/kafka-topics.sh --create --zookeeper 13.209.1xx.xxx:2181, 13.125.2xx.xxx:2181, 13.125.1xx.xxx:2181 --replication-factor 3 --partitions 2 --topic testTopic
    
    # 토픽 상세정보 확인
    bin/kafka-topics.sh --describe --zookeeper 13.209.1xx.xxx:2181, 13.125.2xx.xxx:2181, 13.125.1xx.xxx:2181

     

     

    수정 및 피드백은 언제나 환영입니다 :)

Designed by Tistory.