분석과 주문의 역할을 분리하기 위해 메시지 큐(Message Queue) 기반의 아키텍처를 사용합니다.
1. 메시지 큐(Message Queue)란?
- 메시지 큐는 주문서 레일과 같습니다.
- 분석 프로그램 : 시장 분석 중 매수 신호를 발견하면 이 신호를 주문서(메시지)에 적어 주문서 레일 (Active MQ)에 올린 후 분석을 지속합니다.
- 주문 프로그램 : 주문서 레일에서 대기하다가 주문서가 올라오면 증권사 API에게 전달합니다.
- 장점
- 비동기 처리 (Asynchronous)
- 예시: 온라인 쇼핑몰에서 '주문하기' 버튼을 누르면, "주문 접수 완료!" 메시지가 바로 뜹니다. 실제 재고 처리, 카드 승인, 배송 준비 등 시간이 걸리는 작업들은 메시지 큐에 차곡차곡 쌓여 백그라운드에서 순차적으로 처리됩니다.
- 결합도 감소 (Decoupling)
- 예시: 주문을 처리하는 '소비자' 프로그램을 더 빠른 신기술로 업그레이드해도, 주문을 생성하는 '생산자' 프로그램 코드는 단 한 줄도 수정할 필요가 없습니다.
- 확장성 (Scalability)
- 예시: 블랙 프라이데이 세일 기간에 주문이 폭주하면, '홀 매니저(소비자)'를 1명에서 10명으로 늘려 주문서를 동시에 처리하게 할 수 있습니다.
- 안정성 및 데이터 보존 (Reliability)
- 소비자 프로그램에 갑자기 문제가 생겨 멈추더라도, 처리되지 않은 메시지들은 큐 안에 안전하게 보관됩니다. 프로그램이 복구된 후, 큐에 남아있던 메시지부터 다시 처리를 시작할 수 있어 데이터가 유실될 위험이 적습니다.
- 만약 주문이 갑자기 폭주한다면, '소비자' 프로그램의 개수만 늘려서 큐에 쌓인 메시지를 더 빠르게 함께 처리하면 됩니다. 반대로 일이 없으면 소비자 수를 줄여 자원을 효율적으로 사용할 수 있습니다.
- 생산자와 소비자는 서로의 존재를 전혀 알 필요가 없습니다. 오직 '메시지 큐'의 주소만 알면 됩니다. 이 덕분에 한쪽 시스템을 수정하거나 교체해도 다른 쪽에 전혀 영향을 주지 않습니다.
- 생산자는 소비자가 메시지를 처리할 때까지 기다릴 필요가 없습니다. 메시지를 큐에 던져놓고 바로 자신의 다음 일을 할 수 있습니다. 이 덕분에 전체 시스템의 응답 속도가 빨라집니다.
- 비동기 처리 (Asynchronous)
2. ActiveMQ와 stomp.py: 서버와 통역사
- ActiveMQ 위에서 설명한 '주문서 레일'의 역할을 하는 실제 서버 프로그램 입니다.
- stomp.py: Python이 ActiveMQ 서버와 대화할 수 있도록 도와주는 라이브러리 입니다.
ActiveMQ는 'STOMP'라는 프로토콜로 대화하는데, stomp.py는 Python 코드를 이 STOMP 언어로 번역해 ActiveMQ에 전달하고, 반대 역할도 수행합니다.
3. 간단 실습
mq_sender.py (메시지 생산자):
import stomp
import time
# 1. ActiveMQ 서버에 연결 ('localhost'의 61613번 포트)
conn = stomp.Connection([('localhost', 61613)])
conn.connect('admin', 'admin', wait=True)
# 2. 'trading_signals'라는 이름의 레일에 메시지를 보냄
conn.send(body='AAPL 매수 신호 발생!', destination='/queue/trading_signals')
print("메시지를 성공적으로 보냈습니다.")
# 3. 연결 종료
conn.disconnect()
mq_receiver.py (메시지 소비자)
import stomp
import time
# '리스너'는 새로운 메시지가 도착했을 때 실행할 행동을 정의합니다.
class MyListener(stomp.ConnectionListener):
def on_message(self, frame):
print(f'메시지 수신: {frame.body}')
# 1. ActiveMQ 서버에 연결
conn = stomp.Connection([('localhost', 61613)])
conn.set_listener('', MyListener())
conn.connect('admin', 'admin', wait=True)
# 2. 'trading_signals'라는 이름의 레일을 구독(지켜보기 시작)
conn.subscribe(destination='/queue/trading_signals', id=1, ack='auto')
print("메시지를 기다리는 중입니다...")
# 3. 10초 동안 메시지를 기다리다가 종료
time.sleep(10)
conn.disconnect()
4. 방향성
- 분석 로직 고도화: mq_sender.py는 앞으로 RSI, MACD 등 다양한 보조지표를 계산하여, 복잡한 조건에 따라 '매수/매도' 신호를 생성하고, 그 신호를 메시지로 보내는 역할을 맡게 됩니다.
- 주문 실행기 개발: mq_receiver.py는 실제 증권사 API와 연동되어, 메시지를 받는 즉시 지정된 수량만큼 자동으로 주문을 넣는 역할을 수행할 것입니다.
- 분산 환경 구축: 다음 단계로, 이 두 프로그램을 서로 다른 컴퓨터(또-는 가상머신)에 배포하여, 실제 네트워크 환경에서도 안정적으로 메시지를 주고받는지 테스트하며 시스템을 확장해나갈 계획입니다.
반응형