개요
- 지난시간에 설치한 kafka with k8s를 이용하여 python으로 producer, consumer 수행 테스트
Consumer 수행시켜 놓고 Producer 수행
python 준비
1
pip3 install kafka-python
Consumer
- Topic명을 지정.
- Topic : test
1
2
3
4
5
6
7
8
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
bootstrap_servers='localhost:9092',
enable_auto_commit=True, auto_offset_reset='latest')
for message in consumer:
print(message.value.decode('utf-8'))
- 수행
1
python consumer.py
Producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8'))
start = time.time()
for i in range(10):
print("index ", i)
data = {'str': 'result' + str(i)}
producer.send('test', value=data)
producer.flush()
print("elapsed :", time.time() - start)
- 수행
1
python producer.py
결과
1
2
3
4
5
6
7
8
9
10
11
% python consumer.py
{"str": "result0"}
{"str": "result1"}
{"str": "result2"}
{"str": "result3"}
{"str": "result4"}
{"str": "result5"}
{"str": "result6"}
{"str": "result7"}
{"str": "result8"}
{"str": "result9"}
consumer는 offset을 잘 지정해야한다.
다음 목표
- 채팅이니까 우선 웹페이지… (아 싫어라)
마무리
간단하게 kafka-python을 사용해봤는데 offset이나 여러가지 옵션을 이용해야 원활히 사용할 수 있을 것 같다.