NATS Pub/Sub, Request/Reply, and JetStream Examples
2025. 6. 22. 18:42
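Running and configuring NATS with Docker
Running with Docker
# 4222: client connection port (default)
# 6222: server-to-server clustering port (only if needed)
# 8222: HTTP port exposing server status and monitoring data (e.g. for Grafana)
# nats:2.10-alpine: NATS v2.10 image on a lightweight Alpine base
# -js enables JetStream; -m 8222 sets the monitoring port
docker run --rm -it \
  -p 4222:4222 \
  -p 6222:6222 \
  -p 8222:8222 \
  nats:2.10-alpine \
  -js -m 8222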
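Once the container is up, the monitoring port can be checked from Python. The sketch below is a minimal example, assuming the server is reachable at http://localhost:8222 and that the /varz payload contains the keys used here. The helper names fetch_varz and summarize, and the sample payload, are made up for illustration.

```python
import json
from urllib.request import urlopen


def fetch_varz(base_url: str = "http://localhost:8222") -> dict:
    """Fetch general server information from the /varz monitoring endpoint."""
    with urlopen(f"{base_url}/varz", timeout=2) as response:
        return json.load(response)


def summarize(varz: dict) -> str:
    """Build a one-line summary from a /varz payload, tolerating missing keys."""
    return (
        f"{varz.get('server_name', '?')} v{varz.get('version', '?')}: "
        f"{varz.get('connections', 0)} connections, "
        f"{varz.get('in_msgs', 0)} msgs in, {varz.get('out_msgs', 0)} msgs out"
    )


# Hypothetical payload for illustration; a live server returns many more fields.
sample = {"server_name": "nats-dev", "version": "2.10.0", "connections": 2,
          "in_msgs": 7, "out_msgs": 7}
print(summarize(sample))
```

With the container running, print(summarize(fetch_varz())) should print the same kind of line for the live server.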
NATS server configuration
# nats.conf
port: 4222
http_port: 8222
jetstream {
  store_dir: nats
  # 1 GB
  max_memory_store: 1073741824
  # 10 GB
  max_file_store: 10737418240
}
server_name: "nats-dev"
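The two store limits in nats.conf are plain byte counts. A quick check of where the numbers come from, assuming binary units (1 GB taken as 1024^3 bytes), which is what the values in the file correspond to:

```python
# Binary units: 1 GiB = 1024**3 bytes.
GIB = 1024 ** 3

max_memory_store = 1 * GIB   # matches max_memory_store in nats.conf
max_file_store = 10 * GIB    # matches max_file_store in nats.conf

print(max_memory_store)  # 1073741824
print(max_file_store)    # 10737418240
```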
# Run Docker with the configuration file
# -v mounts the local nats.conf into the container
# -c starts the NATS server with that configuration file
docker run --rm -it \
  -p 4222:4222 \
  -p 8222:8222 \
  -v "$(pwd)/nats.conf:/etc/nats/nats.conf" \
  nats:2.10-alpine \
  -c /etc/nats/nats.conf
Using the nats CLI and NATS NUI
nats CLI
A command-line tool for working with the messaging system directly from the terminal
Installation
# Install on macOS
brew tap nats-io/nats-tools
brew install nats-io/nats-tools/nats
# Install on Windows
scoop bucket add extras
scoop install extras/natscli
# Install on Arch Linux (yay is an AUR helper)
yay natscli
Pub/Sub example
# [Subscribe to messages]
# nats sub <subject>
nats sub "order.>"
# [Publish messages]
# nats pub <subject> <message>
nats pub order.status.123 "Hello 123"
nats pub order.status.456 "Hello 456"
nats pub order.demo "message {{.Count}} @ {{.TimeStamp}}" --count=5
output
(base) ➜ ~ nats pub order.status.123 "Hello 123"
(base) ➜ ~ nats pub order.status.456 "Hello 456"
(base) ➜ ~ nats pub order.demo "message {{.Count}} @ {{.TimeStamp}}" --count=5
15:05:16 Published 9 bytes to "order.status.123"
15:05:16 Published 9 bytes to "order.status.456"
15:05:16 Published 37 bytes to "order.demo"
15:05:16 Published 37 bytes to "order.demo"
15:05:16 Published 37 bytes to "order.demo"
15:05:16 Published 37 bytes to "order.demo"
15:05:16 Published 37 bytes to "order.demo"
(base) ➜ projects nats sub "order.>"
15:04:44 Subscribing on order.>
[#1] Received on "order.status.123"
Hello 123
[#2] Received on "order.status.456"
Hello 456
[#3] Received on "order.demo"
message 1 @ 2025-06-18T15:05:16+09:00
[#4] Received on "order.demo"
message 2 @ 2025-06-18T15:05:16+09:00
[#5] Received on "order.demo"
message 3 @ 2025-06-18T15:05:16+09:00
[#6] Received on "order.demo"
message 4 @ 2025-06-18T15:05:16+09:00
[#7] Received on "order.demo"
message 5 @ 2025-06-18T15:05:16+09:00
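The subscription on "order.>" received every message above because ">" matches one or more trailing tokens, while "*" matches exactly one token. The function below is a small sketch of that matching rule written for illustration. The name subject_matches is made up, and this is not the server's implementation.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subscription pattern matches a subject.

    '*' matches exactly one token; '>' matches one or more trailing tokens.
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last token and needs at least one token to consume.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


for subject in ("order.status.123", "order.demo", "order", "work.order.demo"):
    print(subject, subject_matches("order.>", subject))
```

Only the first two subjects match: "order" has no token left for ">" to consume, and "work.order.demo" differs in its first token.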
Request/Reply example
# [Subscribe to messages]
# --match-replies shows each request paired with its reply
nats sub --match-replies "order.demo"
# [Reply to messages]
# nats reply <subject> <message>
nats reply order.demo "Pong"
# [Send a request]
# nats request <subject> <message>
# Sends "Ping" to order.demo; if a responder is subscribed to order.demo, its reply is received and printed
nats request order.demo "Ping"
output
(base) ➜ ~ nats sub --match-replies "order.demo"
00:28:57 Matching replies with inbox prefix _INBOX.>
00:28:57 Subscribing on order.demo
[#1] Received on "order.demo" with reply "_INBOX.W2yppiPfhP0xyA8TLSiT91.oYaxDxWs"
Ping
[#1] Matched reply on "_INBOX.W2yppiPfhP0xyA8TLSiT91.oYaxDxWs"
Pong
(base) ➜ ~ nats reply order.demo "Pong"
00:29:24 Listening on "order.demo" in group "NATS-RPLY-22"
00:29:35 [#0] Received on subject "order.demo":
Ping
(base) ➜ ~ nats request order.demo "Ping"
00:29:35 Sending request on "order.demo"
00:29:35 Received with rtt 5.871542ms
Pong
Queue Group example
# [Subscribe to messages]
# --queue=Q1 joins the queue group named Q1
nats sub "order.>" --queue=Q1 # terminal A
nats sub "order.>" --queue=Q1 # terminal B
# [Publish messages]
# nats pub <subject> <message>
nats pub order.status.123 "Hello 123"
nats pub order.status.456 "Hello 456"
nats pub order.demo "message {{.Count}} @ {{.TimeStamp}}" --count=5
output
(base) ➜ ~ nats sub "order.>" --queue=Q1
00:39:11 Subscribing on order.>
[#1] Received on "order.status.123"
Hello 123
[#2] Received on "order.demo"
message 2 @ 2025-06-18T00:39:21+09:00
[#3] Received on "order.demo"
message 5 @ 2025-06-18T00:39:21+09:00
(base) ➜ ~ nats sub "order.>" --queue=Q1
00:39:16 Subscribing on order.>
[#1] Received on "order.status.456"
Hello 456
[#2] Received on "order.demo"
message 1 @ 2025-06-18T00:39:21+09:00
[#3] Received on "order.demo"
message 3 @ 2025-06-18T00:39:21+09:00
[#4] Received on "order.demo"
message 4 @ 2025-06-18T00:39:21+09:00
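In the output above, the seven messages were split between the two terminals (three and four), with no message delivered twice. The toy in-memory model below sketches that behaviour: each queue group gets one copy of a message on one member, while a subscriber outside any group sees everything. ToyBroker is a made-up class for illustration that ignores subjects entirely; it assumes the member is picked at random and is not how the NATS server is implemented.

```python
import random
from collections import defaultdict


class ToyBroker:
    """In-memory toy model of subscription delivery (not the NATS implementation)."""

    def __init__(self, seed=None):
        self._rng = random.Random(seed)
        self._plain = []                   # callbacks without a queue group
        self._groups = defaultdict(list)   # queue name -> member callbacks

    def subscribe(self, callback, queue=None):
        if queue is None:
            self._plain.append(callback)
        else:
            self._groups[queue].append(callback)

    def publish(self, message):
        # Every plain subscriber gets its own copy.
        for callback in self._plain:
            callback(message)
        # Each queue group gets the message once, on one randomly chosen member.
        for members in self._groups.values():
            self._rng.choice(members)(message)


received = {"A": [], "B": [], "audit": []}
broker = ToyBroker(seed=0)
broker.subscribe(received["A"].append, queue="Q1")
broker.subscribe(received["B"].append, queue="Q1")
broker.subscribe(received["audit"].append)  # no queue group

for i in range(1, 8):
    broker.publish(f"message {i}")

print(len(received["A"]) + len(received["B"]))  # 7: each message went to one Q1 member
print(len(received["audit"]))                   # 7: the plain subscriber saw everything
```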
JetStream
Goes beyond simple message delivery: messages are stored and can be delivered again
# [Create a stream]
# nats stream add <stream name> --subjects <subject pattern>
nats stream add ORDERS --subjects "order.>"
# [Note] A stream cannot be created with a subject pattern that overlaps an existing stream.
# nats: error: could not create Stream: subjects overlap with an existing stream (10065)
# [Stream configuration prompts]
[localhost] ? Storage file
[localhost] ? Replication 1
[localhost] ? Retention Policy Limits
[localhost] ? Discard Policy Old
[localhost] ? Stream Messages Limit -1
[localhost] ? Per Subject Messages Limit -1
[localhost] ? Total Stream Size -1
[localhost] ? Message TTL -1
[localhost] ? Max Message Size -1
[localhost] ? Duplicate tracking time window 2m0s
[localhost] ? Allow message Roll-ups No
[localhost] ? Allow message deletion Yes
[localhost] ? Allow purging subjects or the entire stream Yes
# [Verify the stream was created]
nats stream ls
# [Publish messages]
# nats pub <subject> <message>
nats pub order.status.123 "Hello 123"
nats pub order.status.456 "Hello 456"
nats pub order.demo "message {{.Count}} @ {{.TimeStamp}}" --count=5
# Note: work.order.* does not match the ORDERS subject filter "order.>", so these messages are not stored in ORDERS
nats pub --js work.order.status.123 "Hello 123"
nats pub --js work.order.status.456 "Hello 456"
nats pub --js work.order.demo "message {{.Count}} @ {{.TimeStamp}}" --count=5
# [Create a consumer]
# nats consumer add <stream name> <consumer name>
nats consumer add ORDERS order-puller --pull
# [Consumer configuration prompts]
[localhost] ? Start policy (all, new, last, subject, 1h, msg sequence) all
[localhost] ? Acknowledgment policy explicit
[localhost] ? Replay policy instant
[localhost] ? Filter Stream by subjects (blank for all)
[localhost] ? Maximum Allowed Deliveries -1
[localhost] ? Maximum Acknowledgments Pending 0
[localhost] ? Deliver headers only without bodies No
[localhost] ? Add a Retry Backoff Policy No
# [Request messages from the pull consumer]
# nats consumer next <stream name> <consumer name> --options
nats consumer next ORDERS order-puller --count=2 --ack
output_limits
(base) ➜ ~ nats stream add ORDERS --subjects "order.>"
[localhost] ? Storage file
[localhost] ? Replication 1
[localhost] ? Retention Policy Limits
[localhost] ? Discard Policy Old
[localhost] ? Stream Messages Limit -1
[localhost] ? Per Subject Messages Limit -1
[localhost] ? Total Stream Size -1
[localhost] ? Message TTL -1
[localhost] ? Max Message Size -1
[localhost] ? Duplicate tracking time window 2m0s
[localhost] ? Allow message Roll-ups No
[localhost] ? Allow message deletion Yes
[localhost] ? Allow purging subjects or the entire stream Yes
Stream ORDERS was created
Information for Stream ORDERS created 2025-06-18 01:01:10
Subjects: order.>
Replicas: 1
Storage: File
Options:
Retention: Limits
Acknowledgments: true
Discard Policy: Old
Duplicate Window: 2m0s
Direct Get: true
Allows Msg Delete: true
Allows Purge: true
Allows Per-Message TTL: false
Allows Rollups: false
Limits:
Maximum Messages: unlimited
Maximum Per Subject: unlimited
Maximum Bytes: unlimited
Maximum Age: unlimited
Maximum Message Size: unlimited
Maximum Consumers: unlimited
State:
Messages: 0
Bytes: 0 B
First Sequence: 0
Last Sequence: 0
Active Consumers: 0
(base) ➜ ~ nats stream ls
╭──────────────────────────────────────────────╮
│ Streams │
├────────┬─────────────┬─────────────────────┬── ≈
│ Name │ Description │ Created │ M ≈
├────────┼─────────────┼─────────────────────┼── ≈
│ EVENTS │ │ 2025-06-11 23:14:32 │ 0 ≈
│ ORDERS │ │ 2025-06-18 01:01:10 │ 0 ≈
╰────────┴─────────────┴─────────────────────┴── ≈
(base) ➜ ~ nats consumer next ORDERS order-puller --count=2 --ack
[01:03:10] subj: order.status.123 / tries: 1 / cons seq: 1 / str seq: 1 / pending: 6
Hello 123
Acknowledged message
[01:03:10] subj: order.status.456 / tries: 1 / cons seq: 2 / str seq: 2 / pending: 5
Hello 456
Acknowledged message
(base) ➜ ~ nats consumer next ORDERS order-puller --count=2 --ack
[01:03:15] subj: order.demo / tries: 1 / cons seq: 3 / str seq: 3 / pending: 4
message 1 @ 2025-06-18T01:02:23+09:00
Acknowledged message
[01:03:15] subj: order.demo / tries: 1 / cons seq: 4 / str seq: 4 / pending: 3
message 2 @ 2025-06-18T01:02:23+09:00
Acknowledged message
NATS NUI
A GUI tool for NATS: inspect stream and consumer state, test sending and receiving messages, and monitor in real time
Using nats-py, with examples
A NATS client library for Python
# Install
pip install nats-py
Pub/Sub example
import asyncio

import nats


async def pub_sub(nc):
    async def message_handler(msg):
        subject = msg.subject
        data = msg.data.decode()
        print(f"Received a message on '{subject}': {data}")

    await nc.subscribe("order.status.>", cb=message_handler)
    for i in range(5):
        await nc.publish(f"order.status.{i}", f"Hello {i}".encode())


async def main():
    nc = await nats.connect("nats://localhost:4222")
    await pub_sub(nc)
    # drain() lets pending messages reach the handler before the connection closes
    await nc.drain()


if __name__ == "__main__":
    asyncio.run(main())
output
(base) ➜ nats-examples python -u "/Users/jihyeon/Documents/projects/nats-examples/pub_sub.py"
Received a message on 'order.status.0': Hello 0
Received a message on 'order.status.1': Hello 1
Received a message on 'order.status.2': Hello 2
Received a message on 'order.status.3': Hello 3
Received a message on 'order.status.4': Hello 4
Request/Reply example
import asyncio

import nats
# nc.request() raises nats.errors.TimeoutError; importing it keeps the except
# clause working on Python versions where asyncio.TimeoutError is not the builtin
from nats.errors import TimeoutError


async def request_reply(nc):
    async def reply_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print(
            "Received a message on '{subject} {reply}': {data}".format(
                subject=subject, reply=reply, data=data
            )
        )
        # Send the response to the requester's reply subject
        await nc.publish(msg.reply, b"accepted")

    await nc.subscribe("order.request.>", cb=reply_handler)
    try:
        response = await nc.request("order.request.123", b"request 123", timeout=0.5)
        print("Received response: {message}".format(message=response.data.decode()))
    except TimeoutError:
        print("Request timed out")


async def main():
    nc = await nats.connect("nats://localhost:4222")
    await request_reply(nc)
    await nc.drain()


if __name__ == "__main__":
    asyncio.run(main())
output
(base) ➜ nats-examples python -u "/Users/jihyeon/Documents/projects/nats-examples/request_reply.py"
Received a message on 'order.request.123 _INBOX.ny3po7QhSd0PjvG0vrFWsu.ny3po7QhSd0PjvG0vrFWv0403a': request 123
Received response: accepted
JetStream example
import asyncio

import nats


async def jetstream(nc):
    js = nc.jetstream()
    await js.add_stream(name="ORDERS_STREAM_DEMO", subjects=["order.status.>"])

    async def qsub_a(msg):
        await msg.ack()
        print("QSUB A :", msg)

    async def qsub_b(msg):
        await msg.ack()
        print("QSUB B :", msg)

    # Push subscriptions sharing the "workers" queue group
    await js.subscribe("order.status.>", "workers", cb=qsub_a)
    await js.subscribe("order.status.>", "workers", cb=qsub_b)

    # Publish to the stream; each publish returns a PubAck
    for i in range(0, 10):
        ack = await js.publish(f"order.status.{i}", f"hello world: {i}".encode())
        print(ack)

    # Pull subscription bound to the durable consumer "orders-consumer"
    orders_consumer = await js.pull_subscribe("order.status.>", "orders-consumer")
    for i in range(0, 10):
        msgs = await orders_consumer.fetch(1)
        for msg in msgs:
            await msg.ack()
            print(msg)


async def main():
    nc = await nats.connect("nats://localhost:4222")
    await jetstream(nc)
    await nc.drain()


if __name__ == "__main__":
    asyncio.run(main())
output
(base) ➜ nats-py git:(master) ✗ python -u "/Users/jihyeon/Documents/hits/nats-py/jetstream.py"
PubAck(stream='ORDERS_STREAM_DEMO', seq=11, domain=None, duplicate=None)
QSUB B : Msg(_client=<nats client v2.10.0>, subject='order.status.0', reply='$JS.ACK.ORDERS_STREAM_DEMO.workers.1.11.11.1750263130419579180.0', data=b'hello world: 0', headers=None, _metadata=None, _ackd=True, _sid=3)
PubAck(stream='ORDERS_STREAM_DEMO', seq=12, domain=None, duplicate=None)
QSUB B : Msg(_client=<nats client v2.10.0>, subject='order.status.1', reply='$JS.ACK.ORDERS_STREAM_DEMO.workers.1.12.12.1750263130420237513.0', data=b'hello world: 1', headers=None, _metadata=None, _ackd=True, _sid=3)
PubAck(stream='ORDERS_STREAM_DEMO', seq=13, domain=None, duplicate=None)
QSUB B : Msg(_client=<nats client v2.10.0>, subject='order.status.2', reply='$JS.ACK.ORDERS_STREAM_DEMO.workers.1.13.13.1750263130420870596.0', data=b'hello world: 2', headers=None, _metadata=None, _ackd=True, _sid=3)
PubAck(stream='ORDERS_STREAM_DEMO', seq=14, domain=None, duplicate=None)
QSUB B : Msg(_client=<nats client v2.10.0>, subject='order.status.3', reply='$JS.ACK.ORDERS_STREAM_DEMO.workers.1.14.14.1750263130421416596.0', data=b'hello world: 3', headers=None, _metadata=None, _ackd=True, _sid=3)
PubAck(stream='ORDERS_STREAM_DEMO', seq=15, domain=None, duplicate=None)
QSUB A : Msg(_client=<nats client v2.10.0>, subject='order.status.4', reply='$JS.ACK.ORDERS_STREAM_DEMO.workers.1.15.15.1750263130421912138.0', data=b'hello world: 4', headers=None, _metadata=None, _ackd=True, _sid=2)
PubAck(stream='ORDERS_STREAM_DEMO', seq=16, domain=None, duplicate=None)
QSUB B : Msg(_client=<nats client v2.10.0>, subject='order.status.5', reply='$JS.ACK.ORDERS_STREAM_DEMO.workers.1.16.16.1750263130422363471.0', data=b'hello world: 5', headers=None, _metadata=None, _ackd=True, _sid=3)
PubAck(stream='ORDERS_STREAM_DEMO', seq=17, domain=None, duplicate=None)
QSUB B : Msg(_client=<nats client v2.10.0>, subject='order.status.6', reply='$JS.ACK.ORDERS_STREAM_DEMO.workers.1.17.17.1750263130422791971.0', data=b'hello world: 6', headers=None, _metadata=None, _ackd=True, _sid=3)
PubAck(stream='ORDERS_STREAM_DEMO', seq=18, domain=None, duplicate=None)
QSUB A : Msg(_client=<nats client v2.10.0>, subject='order.status.7', reply='$JS.ACK.ORDERS_STREAM_DEMO.workers.1.18.18.1750263130423239888.0', data=b'hello world: 7', headers=None, _metadata=None, _ackd=True, _sid=2)
QSUB A : Msg(_client=<nats client v2.10.0>, subject='order.status.8', reply='$JS.ACK.ORDERS_STREAM_DEMO.workers.1.19.19.1750263130423697305.0', data=b'hello world: 8', headers=None, _metadata=None, _ackd=True, _sid=2)
PubAck(stream='ORDERS_STREAM_DEMO', seq=19, domain=None, duplicate=None)
QSUB A : Msg(_client=<nats client v2.10.0>, subject='order.status.9', reply='$JS.ACK.ORDERS_STREAM_DEMO.workers.1.20.20.1750263130424153346.0', data=b'hello world: 9', headers=None, _metadata=None, _ackd=True, _sid=2)
PubAck(stream='ORDERS_STREAM_DEMO', seq=20, domain=None, duplicate=None)
Msg(_client=<nats client v2.10.0>, subject='order.status.0', reply='$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.11.11.1750263130419579180.9', data=b'hello world: 0', headers=None, _metadata=None, _ackd=True, _sid=4)
Msg(_client=<nats client v2.10.0>, subject='order.status.1', reply='$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.12.12.1750263130420237513.8', data=b'hello world: 1', headers=None, _metadata=None, _ackd=True, _sid=4)
Msg(_client=<nats client v2.10.0>, subject='order.status.2', reply='$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.13.13.1750263130420870596.7', data=b'hello world: 2', headers=None, _metadata=None, _ackd=True, _sid=4)
Msg(_client=<nats client v2.10.0>, subject='order.status.3', reply='$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.14.14.1750263130421416596.6', data=b'hello world: 3', headers=None, _metadata=None, _ackd=True, _sid=4)
Msg(_client=<nats client v2.10.0>, subject='order.status.4', reply='$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.15.15.1750263130421912138.5', data=b'hello world: 4', headers=None, _metadata=None, _ackd=True, _sid=4)
Msg(_client=<nats client v2.10.0>, subject='order.status.5', reply='$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.16.16.1750263130422363471.4', data=b'hello world: 5', headers=None, _metadata=None, _ackd=True, _sid=4)
Msg(_client=<nats client v2.10.0>, subject='order.status.6', reply='$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.17.17.1750263130422791971.3', data=b'hello world: 6', headers=None, _metadata=None, _ackd=True, _sid=4)
Msg(_client=<nats client v2.10.0>, subject='order.status.7', reply='$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.18.18.1750263130423239888.2', data=b'hello world: 7', headers=None, _metadata=None, _ackd=True, _sid=4)
Msg(_client=<nats client v2.10.0>, subject='order.status.8', reply='$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.19.19.1750263130423697305.1', data=b'hello world: 8', headers=None, _metadata=None, _ackd=True, _sid=4)
Msg(_client=<nats client v2.10.0>, subject='order.status.9', reply='$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.20.20.1750263130424153346.0', data=b'hello world: 9', headers=None, _metadata=None, _ackd=True, _sid=4)
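The reply field in each Msg above is the JetStream ack subject, and its dot-separated tokens carry the delivery metadata. The parser below is a sketch for reading those tokens, assuming the 9-token layout seen in this output ($JS.ACK, stream, consumer, delivery count, stream sequence, consumer sequence, timestamp in nanoseconds, pending count). AckInfo and parse_ack_subject are made-up names, and other token layouts are rejected rather than guessed at.

```python
from typing import NamedTuple


class AckInfo(NamedTuple):
    stream: str
    consumer: str
    num_delivered: int
    stream_seq: int
    consumer_seq: int
    timestamp_ns: int
    num_pending: int


def parse_ack_subject(reply: str) -> AckInfo:
    """Parse the 9-token '$JS.ACK...' reply subject into named fields."""
    tokens = reply.split(".")
    if len(tokens) != 9 or tokens[:2] != ["$JS", "ACK"]:
        raise ValueError(f"unsupported ack subject: {reply!r}")
    # tokens[2:4] are names; the remaining five tokens are integers.
    return AckInfo(tokens[2], tokens[3], *(int(t) for t in tokens[4:]))


info = parse_ack_subject(
    "$JS.ACK.ORDERS_STREAM_DEMO.orders-consumer.1.11.11.1750263130419579180.9"
)
print(info.consumer, info.stream_seq, info.num_pending)  # orders-consumer 11 9
```

The last token explains the countdown from 9 to 0 in the pull subscription output: it is the number of messages still pending for that consumer. In application code, nats-py's msg.metadata is likely the more convenient way to read the same information.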
Using FastStream, with examples
FastStream abstracts message brokers such as NATS, Kafka, RabbitMQ, and Redis, so messaging applications can be written as simply and intuitively as with FastAPI.
# Install
pip install 'faststream[nats]'
Pub/Sub example
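# Subscriber
import asyncio

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
# NatsBroker has no run() method; the FastStream app runs the broker
app = FastStream(broker)


@broker.subscriber("my.topic")
async def handle_msg(msg: str):
    print("Received:", msg)


if __name__ == "__main__":
    asyncio.run(app.run())
# Publisher
import asyncio

from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")


async def main():
    await broker.start()
    # The NATS broker takes subject=, not topic=
    await broker.publish("Hello", subject="my.topic")
    await broker.close()


if __name__ == "__main__":
    asyncio.run(main())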
JetStream example
import asyncio

from faststream.nats import JStream, NatsBroker
from faststream.nats.annotations import NatsMessage

broker = NatsBroker("nats://localhost:4222")

# JetStream stream configuration
stream = JStream(
    "ORDERS_STREAM",
    num_replicas=1,
    subjects=["faststream.orders.*"],  # exactly one token after faststream.orders.
    retention="workqueue",  # delete messages once they are processed
    max_msgs=1000,  # maximum number of messages
    max_age=3600,  # maximum message retention time (seconds)
)


@broker.subscriber("faststream.orders.created", stream=stream)
async def handle_order_created(msg: str, raw_msg: NatsMessage):
    print(f"[ORDER CREATED] {msg}")
    await raw_msg.ack()


@broker.subscriber("faststream.orders.processed", stream=stream)
async def handle_order_processed(msg: str, raw_msg: NatsMessage):
    print(f"[ORDER PROCESSED] {msg}")
    await raw_msg.ack()


@broker.subscriber("faststream.orders.cancelled", stream=stream)
async def handle_order_cancelled(msg: str, raw_msg: NatsMessage):
    print(f"[ORDER CANCELLED] {msg}")
    await raw_msg.ack()


async def publish_orders():
    """Publish the order messages."""
    await broker.start()
    try:
        # Order-created messages
        for i in range(5):
            order_data = f"Order #{i + 1} - Product: Laptop, Price: ${999}"
            await broker.publish(order_data, subject="faststream.orders.created")
            print(f"Published order created: {order_data}")
            await asyncio.sleep(0.5)
        # Order-processed messages
        for i in range(3):
            order_data = f"Order #{i + 1} - Status: Processing Complete"
            await broker.publish(order_data, subject="faststream.orders.processed")
            print(f"Published order processed: {order_data}")
            await asyncio.sleep(0.5)
        # Order-cancelled message
        order_data = "Order #4 - Status: Cancelled by customer"
        await broker.publish(order_data, subject="faststream.orders.cancelled")
        print(f"Published order cancelled: {order_data}")
        # Give the subscribers time to process the messages
        print("Waiting for messages to be processed...")
        await asyncio.sleep(3)
    except Exception as e:
        print(f"Error publishing messages: {e}")
    finally:
        await broker.close()


if __name__ == "__main__":
    print("Starting JetStream Example...")
    asyncio.run(publish_orders())
    print("JetStream Example completed!")
output
(base) ➜ nats-py git:(master) ✗ python -u "/Users/jihyeon/Documents/hits/nats-py/faststream_main.py"
Starting JetStream Example...
2025-06-18 23:52:27,161 INFO - ORDERS_STREAM | faststream.orders.created | - `HandleOrderCreated` waiting for messages
2025-06-18 23:52:27,165 INFO - ORDERS_STREAM | faststream.orders.processed | - `HandleOrderProcessed` waiting for messages
2025-06-18 23:52:27,167 INFO - ORDERS_STREAM | faststream.orders.cancelled | - `HandleOrderCancelled` waiting for messages
Published order created: Order #1 - Product: Laptop, Price: $999
2025-06-18 23:52:27,169 INFO - ORDERS_STREAM | faststream.orders.created | 783aeafe-2 - Received
[ORDER CREATED] Order #1 - Product: Laptop, Price: $999
2025-06-18 23:52:27,172 INFO - ORDERS_STREAM | faststream.orders.created | 783aeafe-2 - Processed
Published order created: Order #2 - Product: Laptop, Price: $999
2025-06-18 23:52:27,674 INFO - ORDERS_STREAM | faststream.orders.created | 51d18270-b - Received
[ORDER CREATED] Order #2 - Product: Laptop, Price: $999
2025-06-18 23:52:27,675 INFO - ORDERS_STREAM | faststream.orders.created | 51d18270-b - Processed
Published order created: Order #3 - Product: Laptop, Price: $999
2025-06-18 23:52:28,174 INFO - ORDERS_STREAM | faststream.orders.created | 8b1b1385-1 - Received
[ORDER CREATED] Order #3 - Product: Laptop, Price: $999
2025-06-18 23:52:28,175 INFO - ORDERS_STREAM | faststream.orders.created | 8b1b1385-1 - Processed
Published order created: Order #4 - Product: Laptop, Price: $999
2025-06-18 23:52:28,675 INFO - ORDERS_STREAM | faststream.orders.created | 582a46d1-7 - Received
[ORDER CREATED] Order #4 - Product: Laptop, Price: $999
2025-06-18 23:52:28,676 INFO - ORDERS_STREAM | faststream.orders.created | 582a46d1-7 - Processed
Published order created: Order #5 - Product: Laptop, Price: $999
2025-06-18 23:52:29,178 INFO - ORDERS_STREAM | faststream.orders.created | 5866c8fb-3 - Received
[ORDER CREATED] Order #5 - Product: Laptop, Price: $999
2025-06-18 23:52:29,179 INFO - ORDERS_STREAM | faststream.orders.created | 5866c8fb-3 - Processed
Published order processed: Order #1 - Status: Processing Complete
2025-06-18 23:52:29,680 INFO - ORDERS_STREAM | faststream.orders.processed | 81690ca2-0 - Received
[ORDER PROCESSED] Order #1 - Status: Processing Complete
2025-06-18 23:52:29,682 INFO - ORDERS_STREAM | faststream.orders.processed | 81690ca2-0 - Processed
Published order processed: Order #2 - Status: Processing Complete
2025-06-18 23:52:30,181 INFO - ORDERS_STREAM | faststream.orders.processed | db979481-8 - Received
[ORDER PROCESSED] Order #2 - Status: Processing Complete
2025-06-18 23:52:30,183 INFO - ORDERS_STREAM | faststream.orders.processed | db979481-8 - Processed
Published order processed: Order #3 - Status: Processing Complete
2025-06-18 23:52:30,683 INFO - ORDERS_STREAM | faststream.orders.processed | c6bbd139-e - Received
[ORDER PROCESSED] Order #3 - Status: Processing Complete
2025-06-18 23:52:30,684 INFO - ORDERS_STREAM | faststream.orders.processed | c6bbd139-e - Processed
Published order cancelled: Order #4 - Status: Cancelled by customer
Waiting for messages to be processed...
2025-06-18 23:52:31,184 INFO - ORDERS_STREAM | faststream.orders.cancelled | 4a7c37a3-6 - Received
[ORDER CANCELLED] Order #4 - Status: Cancelled by customer
2025-06-18 23:52:31,186 INFO - ORDERS_STREAM | faststream.orders.cancelled | 4a7c37a3-6 - Processed
JetStream Example completed!