Kafka Python Producer, Consumer
포스트
취소

Kafka Python Producer, Consumer

개요

  • 지난시간에 설치한 kafka with k8s를 이용하여 python으로 producer, consumer 수행 테스트

Consumer 수행시켜 놓고 Producer 수행

python 준비

1
pip3 install kafka-python

Consumer

  • Topic명을 지정.
    • Topic : test
1
2
3
4
5
6
7
8
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         bootstrap_servers='localhost:9092',
                         enable_auto_commit=True, auto_offset_reset='latest')

for message in consumer:
    print(message.value.decode('utf-8'))
  • 수행
1
python consumer.py

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time
from json import dumps

from kafka import KafkaProducer

producer = KafkaProducer(acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))
start = time.time()
for i in range(10):
    print("index ", i)
    data = {'str': 'result' + str(i)}
    producer.send('test', value=data)
    producer.flush()

print("elapsed :", time.time() - start)
  • 수행
1
python producer.py

결과

1
2
3
4
5
6
7
8
9
10
11
% python consumer.py
{"str": "result0"}
{"str": "result1"}
{"str": "result2"}
{"str": "result3"}
{"str": "result4"}
{"str": "result5"}
{"str": "result6"}
{"str": "result7"}
{"str": "result8"}
{"str": "result9"}

consumer는 offset을 잘 지정해야한다.

다음 목표

  • 채팅이니까 우선 웹페이지… (아 싫어라)

마무리

간단하게 kafka-python을 사용해봤는데 offset이나 여러가지 옵션을 이용해야 원활히 사용할 수 있을 것 같다.

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.

Kafka with K8s

npm ERR! getaddrinfo ENOTFOUND artifactory.twitter.biz (kubeflow pipeline frontend)