goodbye

Apache Kafka Producer 본문

Kafka

Apache Kafka Producer

goodbye 2023. 1. 25. 07:39
프로듀서의 기본 흐름

카프카 프류듀서 어플리케이션은 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송합니다. 프로듀서는 데이터를 전송할 때 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신합니다. 프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너 배치 생성 단계를 거칩니다.

프로듀서 내부 구조는 아래와 같은 구조를 가지고 있습니다. ProducerRecord 는 프로듀서에서 생성하는 레코들 오프셋을 포함하지 않습니다. send() 메서드를 통해 레코드를 전송 요청하게 되면 Partitioner 가 어느 파티션으로 전송할지 지정합니다 (기본값으로 DefaultParitioner 설정) 그리고 Accumulator 가 높은 데이터 처리량을 위해 전송 할 데이터를 배치로 모으는 버퍼 기능을 수행합니다.

프로듀서의 기본 파티셔너

프로듀서 API 를 사용하면 UnifromStickyParitionerRoundRobinPartitioner 2개 파티셔너를 제공합니다.

카프카 클라이언트 라이브러리 2.5.0 버전에서는 파티셔너를 지정하지 않을 경우 UnifromStickyParitioner 파티셔너가 기본으로 설정됩니다.

메시지 키가 있을 경우 동작 순서는 아래와 같습니다.

  1. UnifromStickyParitionerRoundRobinPartitioner 둘다 메세지 키가 있을 때는 메시지 키의 해시값과 파티션을 매칭하여 레코드를 전송합니다
  1. 동일한 메시지 키가 존재하는 레코드는 동일한 파티션 번호에 전달됩니다
  1. 만약 파티션 개수가 변경될 경우 메세지 키와 파티션 번호 매칭은 깨지게 됩니다

    ❗️ 따라서 파티션 개수는 여유있게 생성하는것을 고려해야합니다

메세지 키가 없을 경우에는 파티션에 최대한 동일하게 분배하는 로직이 들어있는데 UniformStickyPartitionerRoundRobinPartitioner 의 단점을 개선하여 향상된 성능을 가지고 있습니다

프로듀서의 커스텀 파티셔너

카프카 클라어언트 라이브러리에서는 사용자 지정 파티셔너를 생성하기 위한 Partitioner 인터페이스를 제공합니다. Partitioner 인터페이스를 상속받는 사용자 정의 클래스에서 message key 또는 message value 에 따라 파티션을 지정할 수도 있습니다.

파티셔너를 통해 파티션이 지정된 데이터는 Accumlator 에 버퍼로 쌓이고 sender thread 는 accumlator 에 쌓인 배치 데이터를 가져가 카프카 브로커로 전송합니다

RoundRobinPartitionerUniformStickyPartitioner
ProducerRocord가 들어오는대로 파티션 순회하면서 전송모든 파티션에 분배 전송(배치로 묶일뿐 결국 파티션을 순회)
전송 성능이 낮음 - Accumlator에 묶이는 정도가 적기 때문Accumlator에 레코드가 배치로 묶일때까지 기다렸다 전송

프로듀서 주요 옵션(필수 옵션)

프로듀서를 실행할때 반드시 지정해야하는 옵션입니다. (defalut 옵션 없음)

bootstrap-servers프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성해야합니다. 2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는데 이슈가 없도록 설정 가능합니다
key.serialzer레코드의 메세지 키를 직렬화하는 클래스를 지정합니다
value.serializer레코드의 메세지 값을 직렬화 하느 클래스를 지정합니다

프로듀서 주요 옵션(선택 옵션)

acks프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는데 사용하는 옵션, 0, 1, -1(all) 중 하나로 설정 가능 deaflut : 1
linger.ms배치를 전송하기전까지 기다리는 최소 시간 defalut : 0
retries브로커로부터 에러를 받고 난뒤 재전송을 시도하는 횟수 default : 2147483647
max.in.flight.requests.per.connection한번에 요청하는 최대 커넥션 개수, 설정된 값만큼 동시에 전달 요청 수행 default : 5
partitioner.class레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정 default : org.apache.kafka.clients.producer.internals.DefaultPartitioner
enable.idempotence멱등성 프로듀서로 동작할지 여부 설정 (네트워크 이슈 발생시 중복데이터 허용 여부) defalut : false (v2.5.0) , true(3.x.x)
transactional.id프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부 설정 defalut : null
In-Sync-Replicas(ISR) 와 acks 옵션

ISR 은 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태를 의미합니다. 복제 개수가 2인 토픽을 가정했을때, 이 토픽에는 리터 파티션 1개와 팔로워 파티션 1개가 존재합니다. 리터파티션에 0부터 10의 오프셋이 있다고 가정한다면, 팔로워 파티션에 동기화가 완료되기 위해서 0부터 10까지 오프셋이 동일하게 존재해야합니다.

동기화가 완료되었다는 의미는 리더 파티션의 모든 데이터가 팔로워 파티션에 복제된 상태를 의미합니다

ISR 이라는 용어가 등장한 이유는 팔로워 파티션이 리더 파티션으로부터 데이터를 복제하는 데 시간이 걸리기 때문입니다. 프로듀셔가 특정 파티션에 데이터를 저장하는 작업은 리더 파티션을 통해 처리합니다. 이때 리터 파티션에 새로운 레코드가 추가되어 오프셋이 증가하면 팔로워 파티션이 위치한 브로커는 리터 파티션의 데이터를 복제하는데 리터 파티션에 데이터가 적재된 이후 팔로워 파티션이 복제하는 시간차이(Recplication lac) 때문에 리더 파티션과 팔로워 파티션 간에 오프셋 차이가 발생합니다

acks

카프카 프로듀서의 acks 옵션은 0, 1, all(또는 -1) 값을 가질 수 있습니다. 이 옵션을 통해 프로듀서가 전송한 데이터가 카프카 클러스터에 얼마나 신뢰성 높게 저장할지 지정 할 수 있습니다. 그리고 acks 옵션에 따라 성능이 달라질수 있으므로 acks 옵션에 따른 카프카의 동작 방식을 상세히 알고 설정해야 합니다.

복제 개수가 1인 경우 acks 옵션에 따른 성능 변화는 크지 않습니다. 그러나 안정적으로 데이터를 운영하기 위해서는 실제 실무 환경에서는 복제개수가 2 이상으로 운영하는 경우가 대부분입니다.

acks=0

acks 를 0으로 설정하는 케이스는 프로듀서가 리더 파티션으로 데이터를 전송했을때 리더 파티션으로 데이터가 저장되었을때 확인하지 않는다는 뜻입니다. 리더 파티션은 데이터가 저장된 이후에 데이터가 몇 번째 오프셋에 저장되었는지 리턴하는데, acks가 0으로 설정되어 있으면 프로듀서는 리더 파티션에 데이터가 저장되었는지 여부에 대한 응답 값을 받지 않습니다.

즉 리더 파티션에 데이터가 정상적으로 전송되었는지 나머지 파티션에 정상적으로 복제되었는지 알수 없으므로 신뢰성이 낮은 정책에 해당합니다. 예컨대 리터파티션의 브로커가 장애가 났을 경우에 데이터 유실 가능성이 존재합니다.

다만 데이터의 전송속도는 acks 를 1또는 all 로 했을때보다 훨씬 빠릅니다. 따라서 데이터의 일부 유실이 발생을 감수하더라도 전송속도가 중요한 경우에는 이 옵션값을 사용해도 좋습니다. 예를 들면 GPS 네비게이션 데이터를 제공하는 경우 일부데이터가 유실되더라도 이슈가 없고 속도가 중요하기때문에 사용을 고려할수 있습니다.

acks=1

acks 를 1으로 설정하는 케이스는 프로듀서가 보낸 데이터를 리터 파티션만 확인하고 나머지 파티션에 정상적으로 복제되었는지 여부는 확인하지 않습니다. 만약 리더 파티션에 정상적으로 적재되지 않았다면 리더 파티션에 적재될때까지 재시도 할 수 있지만 리더 파티션에 적재가 끝나고 팔로워 파티션에 데이터를 복제하기 직전에 리터 파티션이 있는 브로커에 장애가 발생하면 동기화하지 못한 데이터가 유실 될 수 있습니다.

대다수 실무 환경에서는 1으로 설정하여 운영하여도 충분한 신뢰도를 얻을수 있다고 합니다

acks=all

acks를 all 으로 설정하는 케이스는 프로듀서가 보낸 데이터를 리터 파티션 뿐만 아니라 팔로워 파티션까지 모두 정상적으로 적재되었는지 확인합니다. 리더 파티션뿐만 아니라 팔로워 파티션까지 데이터가 적재되었는지 확인하기 때문에 0 또는 1 옵션 보다 속도가 가장 느리지만 일부 브로커에 장애가 발생하더라도 프로듀서는 안전하게 데이터를 전송하고 저장할 수 있음을 보장 받을 수 있습니다.

min.insync.replicas

해당 옵션은 프로듀서가 리더 파티션과 팔로워 파티션에 데이터가 적재되었는지 확인하기 위한 최소 ISR 그룹의 파티션 개수입니다. 예를 들어 min.insync.replicas 옵션 값이 1이라면 ISR 중 최소 1개 이상의 파티션에 데이터가 적재되었음을 확인하는 것입니다. 이 경우 acks 를 1로 했을 때와 동일한 동작을 하는데 ISR 중 가장 처음 적재가 완료되는 파티션은 리터 파티션이기 때문입니다.

그리고 acks 를 all 로 설정한 경우에는 토픽 단위로 설정 가능한 min.insync.replicas 옵션값에 따라 데이터의 안정성이 달라집니다.

Single Message Producer

단일 메세지 레코드를 전송하려는 경우 ProducerRecord 생성 시 파라미터로 추가하면 됩니다. 토픽이름과 함께 메세지를 파라미터로 넣고 생성하면 메세지가 저장됩니다. 간단한 구현 방법은 아래와 같습니다.

@Slf4j
public class SimpleProducer {

    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        String messageValue = "testMessage1";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
        producer.send(record);  // producer  메시지 전송
        log.info("send message : {}", record);
        producer.flush(); // accumulator 쌓인 데이터를 모두 전송
        producer.close(); // producer 종료
    }
}

kafka-console.consumer.sh 명령어를 통해 토픽을 확인 할 수 있습니다

다만 토픽을 확인 하기 전에 zookeeper 와 kafka broker 가 모두 기동되어 있어야 합니다.

Key Value Producer

메세지 키와 메세지 값이 포함된 레코드를 전송하려는 경우 ProducerRecord 생성 시 파라미터로 추가해야 합니다. 토픽 이름, 메세지 키, 메시지 값을 순서대로 파라미터로 넣고 생성하면 메세지 키가 저장 됩니다

org.apache.kafka:kafka-clients library 를 이용한 간단한 구현방법은 아래와 같습니다

@Slf4j
public class KeyValueProducer {

    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Nation", "Korea");
        producer.send(record);
        ProducerRecord<String, String> record2 = new ProducerRecord<>(TOPIC_NAME, "Nation", "Japan");
        producer.send(record2);
        ProducerRecord<String, String> record3 = new ProducerRecord<>(TOPIC_NAME, "City", "Seoul");
        producer.send(record3);
        ProducerRecord<String, String> record4 = new ProducerRecord<>(TOPIC_NAME, "City", "Tokyo");
        producer.send(record4);

        producer.flush(); // accumulator 쌓인 데이터를 모두 전송
        producer.close(); // producer 종료
    }
}

kafka-console.consumer.sh 명령어를 통해 토픽을 확인 할 수 있습니다

파티션 번호를 지정한 Producer

메세지를 생산할때 파티션을 직접 지정하여 전달 할 수도 있습니다

public class ProducerExactPartition {

    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        int partitionNum = 0;
        ProducerRecord<String, String> record = new ProducerRecord<>(
                TOPIC_NAME, partitionNum,"Partition", "Partition");

        producer.send(record);
        producer.flush(); // accumulator 쌓인 데이터를 모두 전송
        producer.close(); // producer 종료
    }
}

Custom Partitioner Producer

프로듀서 사용환경에 따라 특정 데이터를 가지는 레코드를 특정 파티션으로 보내야 할 때가 있습니다. 예를 들어 특정 메세지 값을 가진 메세지 키가 반드시 0번 파티션으로 들어가야 한다고 가정하면, 기본 설정 파티셔너를 사용 할 경우 메세지 키의 해시값을 파티션에 매칭하여 데이터를 전송하기때문에 어느 파티션에 들어갓는지 알 수 없습니다. 이 때 Patitioner Interface 를 사용하여 custom Partitioner 를 생성하면 특정 메세지 값을 가진 메세지 키에 대해서 반드시 파티션 0번으로 지정하도록 설정 가능합니다. 이렇게 지정할 경우 토픽의 파티션 개수가 변경되더라도 특정 메세지 키를 가진 데이터는 파티션 0번에 적재됩니다

아래는 Kakao 라는 키의 모든 토픽을 0번 파티션에 전송하는 예제입니다

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWithCustomPartitioner {

    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {
        Properties config = new Properties();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Custom Partitioner 설정
        config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Kakao", "Kakao");
        producer.send(record);

        producer.flush(); // accumulator 쌓인 데이터를 모두 전송
        producer.close(); // producer 종료
    }
}
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            throw new InvalidRecordException("message key is empty");
        }

        if (key.equals("Kakao")) {  // Kakao 라는 키가 들어오면 0번 파티션에 전송
            return 0;
        }

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

레코드의 전송 결과를 확인하는 Producer

KafkaProducer 의 send() 메서드는 Future 객체를 반환합니다. 이 객체는 RecoredMetadata 의 비동기 결과를 표현하는 것으로 ProducerRecord 가 카프카 브로커에 정상적으로 적재되었는지에 대한 데이터가 포함되어 있습니다. 다음 코드와 같이 get() 메서드를 사용하면 프로듀서로 보낸 데이터의 결과를 동기적으로 가져올 수 있습니다.

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

@Slf4j
public class ProducerWithSyncCallBack {

    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Naver", "Naver");
        try {
            RecordMetadata metadata = producer.send(record).get();
            log.info("Partition: {}, Offset: {}", metadata.partition(), metadata.offset());
        } catch (Exception e) {
            log.error("Error while producing {},", e.getMessage(), e);
        } finally {
            producer.flush(); // accumulator 쌓인 데이터를 모두 전송
            producer.close(); // producer 종료
        }
    }
}

ACKS 값을 따로 설정하지 않고 메세지를 전송하면 defalut 로 1로 설정하여 전송하게 되고 아래와 같이 어떤 파티션과 어떤 offset에 저장되었는지 결과를 리턴받을 수 있습니다.

ACKS 값을 0으로 직접 설정하고 다시 메세지를 전송하게 되면 offset 값을 확인 할수 없게 됩니다


config.put(ProducerConfig.ACKS_CONFIG, "0");


Uploaded by N2T

'Kafka' 카테고리의 다른 글

Apache Kafka CLI  (0) 2023.01.24
Comments