연재 시리즈

쿠버네티스 오퍼레이터 스터디 3주차 - strimzi operator를 이용한 카프카 토픽 읽기/쓰기 작업

악분 2022. 6. 12. 16:18
반응형

안녕하세요. 이번 글은 strimzi operator을 이용하여 카프카 클러스터에 읽기/쓰기 작업과정을 다룹니다. 이 글을 실습하기 위해서는 strimzi operator와 카프카 클러스터가 설치되어 있어야 합니다.

 

1. 준비

카프카 클라이언트와 엔드포인트를 설정합니다.

 

1.1 카프카 클라이언트 생성

카프카 클라이언트를 pod로 생성합니다. 쓰기작업을 요청하는 producer pod와 읽기 작업을 요청하는 consumer pod를 생성합니다.

# producer
apiVersion: v1
kind: Pod
metadata:
  name: myclient2
  labels:
    app: myclient
spec:
  nodeName: k8s-m
  containers:
  - name: myclient2
    image: bitnami/kafka:3.2
    command: ["tail"]
    args: ["-f", "/dev/null"]
  terminationGracePeriodSeconds: 0
---
# consumer
apiVersion: v1
kind: Pod
metadata:
  name: myclient2
  labels:
    app: myclient
spec:
  nodeName: k8s-m
  containers:
  - name: myclient2
    image: bitnami/kafka:3.2
    command: ["tail"]
    args: ["-f", "/dev/null"]
  terminationGracePeriodSeconds: 0

 

1.2 카프카 엔드포인트 설정

카프카 엔드포인트는 쿠버네티스 서비스를 사용합니다. 엔드포인트를 리눅스 변수로 저장합니다.

이전글 참고: https://malwareanalysis.tistory.com/348
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092

 

1.3 토픽 생성

crd를 이용하여 토픽을 생성합니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: mytopic1 
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 1
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
    min.insync.replicas: 2

 

2. 토픽에 데이터 쓰기

카프카 클라이언트에서 데이터를 쓰려면 producer.sh스크립트를 실행하여 쓰기모드로 진입합니다. 쓰기 모드에 성공적으로 진입하면 화살표 ">" 가 표시됩니다.

kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1

 

토픽에 hello문자열과 world문자열을 저장하겠습니다. 쓰기 모드를 나가고 싶다면 Ctrl+C 또는 Ctrl+D를 누르시면 됩니다.

 

3. 토픽 데이터 읽기

카프카 클라이언트에서는 kafka-console-consumer.sh를 이용하여 토픽 데이터를 읽어올 수 있습니다. 처음부터 모든 데이터를 가져오기 위해   --from-beginning 인자를 사용했습니다.

hello, world가 두번 출력된 이유는, 제가 [챕터2]에서 쓰기 작업을 2번해서 입니다.
kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning

 

4. 토픽 쓰기/읽기 실시간

정말 쓰기/읽기가 실시간으로 되는지 한 화면에 출력해보겠습니다.

 

# 쓰기모드(왼쪽 화면)
kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1

# 읽기모드(오른쪽 화면)
kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1

반응형