尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

印度尼西亚股票数据API对接实现

印度尼西亚股票数据API对接实现
📅 发布时间:2026/6/22 5:49:45

环境准备

首先安装必要的依赖包:

pip install requests websocket-client pandas numpy

基础配置

import requests
import json
import websocket
import threading
import time
from datetime import datetime# API配置
API_KEY = "YOUR_API_KEY"  # 替换为您的实际API密钥
BASE_URL = "https://api.stocktv.top"
WS_URL = "wss://ws-api.stocktv.top/connect"# 印尼股票代码映射(示例)
IDX_SYMBOLS = {"BBCA": "Bank Central Asia","BBRI": "Bank Rakyat Indonesia","TLKM": "Telkom Indonesia","ASII": "Astra International","UNVR": "Unilever Indonesia"
}

REST API实现

1. 获取印尼股票列表

def get_indonesia_stocks(page=1, page_size=100):"""获取印尼交易所股票列表"""url = f"{BASE_URL}/stock/stocks"params = {"countryId": 42,      # 印尼国家ID"exchangeId": 62,     # IDX交易所ID"pageSize": page_size,"page": page,"key": API_KEY}try:response = requests.get(url, params=params)if response.status_code == 200:data = response.json()if data.get("code") == 200:return data["data"]["records"]else:print(f"API Error: {data.get('message')}")else:print(f"Request failed with status: {response.status_code}")except Exception as e:print(f"Error fetching stock list: {str(e)}")return []# 示例:获取第一页股票列表
stocks = get_indonesia_stocks()
for stock in stocks:print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")

2. 查询特定股票详情

def get_stock_detail(symbol_or_id):"""获取股票详细信息"""url = f"{BASE_URL}/stock/queryStocks"# 判断是symbol还是idif isinstance(symbol_or_id, str) and symbol_or_id.isdigit():params = {"id": symbol_or_id, "key": API_KEY}else:params = {"symbol": symbol_or_id, "key": API_KEY}try:response = requests.get(url, params=params)if response.status_code == 200:data = response.json()if data.get("code") == 200 and data["data"]:return data["data"][0]else:print(f"API Error: {data.get('message')}")else:print(f"Request failed with status: {response.status_code}")except Exception as e:print(f"Error fetching stock detail: {str(e)}")return None# 示例:获取BBCA股票详情
bbca_detail = get_stock_detail("BBCA")
if bbca_detail:print(f"BBCA当前价格: {bbca_detail['last']}")print(f"涨跌幅: {bbca_detail['chgPct']}%")

3. 获取指数数据

def get_indonesia_indices():"""获取印尼主要指数"""url = f"{BASE_URL}/stock/indices"params = {"countryId": 42,  # 印尼国家ID"key": API_KEY}try:response = requests.get(url, params=params)if response.status_code == 200:data = response.json()if data.get("code") == 200:return data["data"]else:print(f"API Error: {data.get('message')}")else:print(f"Request failed with status: {response.status_code}")except Exception as e:print(f"Error fetching indices: {str(e)}")return []# 示例:获取印尼指数
indices = get_indonesia_indices()
for index in indices:print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")

4. 获取K线数据

def get_kline_data(pid, interval="PT15M"):"""获取股票K线数据"""url = f"{BASE_URL}/stock/kline"params = {"pid": pid,"interval": interval,"key": API_KEY}try:response = requests.get(url, params=params)if response.status_code == 200:data = response.json()if data.get("code") == 200:return data["data"]else:print(f"API Error: {data.get('message')}")else:print(f"Request failed with status: {response.status_code}")except Exception as e:print(f"Error fetching kline data: {str(e)}")return []# 示例:获取BBCA的15分钟K线数据
bbca_kline = get_kline_data(41602, "PT15M")
for kline in bbca_kline[:5]:  # 显示前5条dt = datetime.fromtimestamp(kline["time"] / 1000)print(f"{dt}: O:{kline['open']} H:{kline['high']} L:{kline['low']} C:{kline['close']}")

5. 获取涨跌排行榜

def get_top_gainers():"""获取涨幅榜"""url = f"{BASE_URL}/stock/updownList"params = {"countryId": 42,  # 印尼国家ID"type": 1,         # 1=涨幅榜, 2=跌幅榜"key": API_KEY}try:response = requests.get(url, params=params)if response.status_code == 200:data = response.json()if data.get("code") == 200:return data["data"]else:print(f"API Error: {data.get('message')}")else:print(f"Request failed with status: {response.status_code}")except Exception as e:print(f"Error fetching top gainers: {str(e)}")return []# 示例:获取印尼涨幅榜
gainers = get_top_gainers()
for stock in gainers[:10]:  # 显示前10名print(f"{stock['symbol']}: {stock['last']} (+{stock['chgPct']}%)")

WebSocket实时数据

WebSocket客户端实现

class IDXWebSocketClient:def __init__(self, api_key):self.api_key = api_keyself.ws = Noneself.connected = Falseself.subscriptions = set()# 启动连接self.connect()# 启动心跳线程threading.Thread(target=self.heartbeat, daemon=True).start()def connect(self):"""连接WebSocket服务器"""ws_url = f"{WS_URL}?key={self.api_key}"self.ws = websocket.WebSocketApp(ws_url,on_open=self.on_open,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)# 启动WebSocket线程threading.Thread(target=self.ws.run_forever).start()time.sleep(1)  # 等待连接建立def on_open(self, ws):"""连接建立回调"""print("已连接到印尼股票实时数据服务")self.connected = True# 重新订阅之前订阅的股票if self.subscriptions:self.subscribe(list(self.subscriptions))def on_message(self, ws, message):"""接收消息回调"""try:data = json.loads(message)# 处理实时行情数据if "pid" in data:symbol = data.get("symbol", "Unknown")price = data.get("last_numeric", 0)change = data.get("pc", 0)change_pct = data.get("pcp", 0)print(f"实时行情 [{symbol}]: {price} ({change} / {change_pct}%)")# 处理心跳响应elif data.get("action") == "pong":passexcept Exception as e:print(f"处理实时数据时出错: {str(e)}")def on_error(self, ws, error):"""错误处理回调"""print(f"WebSocket错误: {str(error)}")def on_close(self, ws, close_status_code, close_msg):"""连接关闭回调"""print("WebSocket连接已关闭")self.connected = False# 尝试重新连接print("尝试重新连接...")time.sleep(3)self.connect()def subscribe(self, pids):"""订阅股票"""if not self.connected:print("未连接,无法订阅")return False# 添加到订阅列表self.subscriptions.update(pids)# 构造订阅消息message = json.dumps({"action": "subscribe","pids": list(pids)})# 发送订阅请求self.ws.send(message)print(f"已订阅: {', '.join(map(str, pids))}")return Truedef unsubscribe(self, pids):"""取消订阅股票"""if not self.connected:print("未连接,无法取消订阅")return False# 从订阅列表中移除for pid in pids:self.subscriptions.discard(pid)# 构造取消订阅消息message = json.dumps({"action": "unsubscribe","pids": list(pids)})# 发送取消订阅请求self.ws.send(message)print(f"已取消订阅: {', '.join(map(str, pids))}")return Truedef heartbeat(self):"""心跳维护"""while True:if self.connected:try:# 每30秒发送一次心跳self.ws.send(json.dumps({"action": "ping"}))except Exception as e:print(f"发送心跳失败: {str(e)}")time.sleep(30)# 使用示例
if __name__ == "__main__":# 创建WebSocket客户端ws_client = IDXWebSocketClient(API_KEY)# 订阅股票(需要先获取股票ID)time.sleep(2)  # 等待连接建立ws_client.subscribe([41602, 41605])  # 订阅BBCA和BRIS# 保持主线程运行try:while True:time.sleep(1)except KeyboardInterrupt:print("程序已终止")

高级功能实现

1. 数据缓存策略

from cachetools import TTLCacheclass IDXDataCache:def __init__(self, maxsize=100, ttl=60):"""初始化数据缓存"""self.cache = TTLCache(maxsize=maxsize, ttl=ttl)def get_stock_data(self, symbol_or_id):"""获取股票数据(带缓存)"""# 检查缓存if symbol_or_id in self.cache:return self.cache[symbol_or_id]# 从API获取data = get_stock_detail(symbol_or_id)# 更新缓存if data:self.cache[symbol_or_id] = datareturn data# 使用示例
cache = IDXDataCache()
bbca_data = cache.get_stock_data("BBCA")

2. 实时数据处理器

class RealTimeDataProcessor:def __init__(self):self.data_buffer = {}self.batch_size = 10self.last_process_time = time.time()def add_data(self, symbol, data):"""添加实时数据到缓冲区"""if symbol not in self.data_buffer:self.data_buffer[symbol] = []self.data_buffer[symbol].append(data)# 检查是否达到批处理条件current_time = time.time()if (len(self.data_buffer[symbol]) >= self.batch_size or current_time - self.last_process_time >= 1.0):self.process_data(symbol)self.last_process_time = current_timedef process_data(self, symbol):"""处理缓冲区的数据"""if symbol not in self.data_buffer or not self.data_buffer[symbol]:returndata_points = self.data_buffer[symbol]# 计算统计指标prices = [d["last_numeric"] for d in data_points]volumes = [d.get("turnover_numeric", 0) for d in data_points]avg_price = sum(prices) / len(prices)max_price = max(prices)min_price = min(prices)total_volume = sum(volumes)print(f"\n{symbol} 实时数据统计 (最近 {len(data_points)} 个更新):")print(f"平均价格: {avg_price:.2f}, 最高: {max_price:.2f}, 最低: {min_price:.2f}")print(f"总成交量: {total_volume}")# 清空缓冲区self.data_buffer[symbol] = []# 在WebSocket客户端的on_message方法中使用
processor = RealTimeDataProcessor()def on_message(self, ws, message):try:data = json.loads(message)if "pid" in data:symbol = data.get("symbol", "Unknown")processor.add_data(symbol, data)except Exception as e:print(f"处理实时数据时出错: {str(e)}")

3. 错误处理与重试机制

def api_request_with_retry(url, params, max_retries=3):"""带重试机制的API请求"""for attempt in range(max_retries):try:response = requests.get(url, params=params, timeout=10)if response.status_code == 200:data = response.json()if data.get("code") == 200:return dataelif data.get("code") == 429:  # 请求过多retry_after = int(data.get("retryAfter", 30))print(f"请求过于频繁,等待 {retry_after} 秒后重试...")time.sleep(retry_after)else:print(f"API返回错误: {data.get('message')}")else:print(f"请求失败,状态码: {response.status_code}")except Exception as e:print(f"请求异常: {str(e)}")# 指数退避等待wait_time = 2 ** attemptprint(f"等待 {wait_time} 秒后重试 (尝试 {attempt+1}/{max_retries})")time.sleep(wait_time)print(f"请求失败,已达最大重试次数 {max_retries}")return None

完整示例应用

class IDXStockMonitor:def __init__(self, api_key):self.api_key = api_keyself.ws_client = Noneself.data_cache = IDXDataCache()self.monitored_stocks = set()def start_monitoring(self, symbols):"""开始监控指定股票"""print("开始监控印尼股票...")# 获取股票IDstock_ids = []for symbol in symbols:stock_data = self.data_cache.get_stock_data(symbol)if stock_data:stock_ids.append(stock_data["id"])self.monitored_stocks.add(symbol)print(f"已添加监控: {symbol} (ID: {stock_data['id']})")else:print(f"无法获取股票信息: {symbol}")# 启动WebSocket连接if stock_ids:self.ws_client = IDXWebSocketClient(self.api_key)time.sleep(2)  # 等待连接建立self.ws_client.subscribe(stock_ids)# 启动定期数据更新self.start_periodic_updates()def start_periodic_updates(self):"""启动定期数据更新"""def update_loop():while True:# 每5分钟更新一次指数数据self.update_indices()# 每10分钟更新一次股票列表if len(self.monitored_stocks) < 10:  # 只更新少量股票self.update_stock_list()time.sleep(300)  # 5分钟threading.Thread(target=update_loop, daemon=True).start()def update_indices(self):"""更新指数数据"""print("\n更新印尼指数数据...")indices = get_indonesia_indices()for index in indices:print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")def update_stock_list(self):"""更新股票列表"""print("\n更新印尼股票列表...")stocks = get_indonesia_stocks(page_size=50)for stock in stocks[:10]:  # 只显示前10只print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")def run(self):"""运行监控"""try:# 监控主要印尼股票symbols_to_monitor = ["BBCA", "BBRI", "TLKM", "ASII", "UNVR"]self.start_monitoring(symbols_to_monitor)# 保持主线程运行while True:time.sleep(1)except KeyboardInterrupt:print("\n监控已停止")except Exception as e:print(f"监控出错: {str(e)}")# 启动监控
if __name__ == "__main__":monitor = IDXStockMonitor(API_KEY)monitor.run()

部署建议

  1. API密钥管理:不要将API密钥硬编码在代码中,使用环境变量或配置文件
  2. 错误处理:增加更完善的错误处理和日志记录
  3. 速率限制:遵守API的速率限制,避免频繁请求
  4. 数据存储:考虑将重要数据存储到数据库中以供后续分析
  5. 监控告警:设置价格波动告警机制
# 从环境变量获取API密钥
import os
API_KEY = os.getenv("STOCKTV_API_KEY", "YOUR_API_KEY")

以上是一个完整的印尼股票数据API对接实现方案。您可以根据实际需求进行调整和扩展。如果您需要更多特定功能或有任何问题,请随时告诉我。

相关新闻

  • 一天一款实用的AI工具,第1期,AI标题生成工具
  • 详细介绍:智慧校园统一身份认证中心:一个账号畅行校园内外
  • LlamaIndex 项目深度技术分析 - 详解

最新新闻

  • 最新深圳法律业务律师推荐指南2026:深圳离婚律师离婚财产分割股权分割抚养权纠纷起诉离婚流程 - 逻辑孤岛
  • WeChatMsg终极指南:数字记忆重构与对话资产化完整方案
  • 2026/4/2课程博客 软件测试复习:选择题考点(测试工具+等价类划分)
  • 零基础学AI人工智能:9.4 聚类算法
  • Let‘s Encrypt介绍(免费、自动化、开放的SSL/TLS证书颁发机构CA,Certificate Authority)cert-manager
  • Video2X深度解析:如何通过C++架构重构实现高性能视频AI处理

日新闻

  • 2026速览惠州叛逆青少年学校前十大排名名单出炉 - 武汉中职最新信息发布
  • 2026上饶白蚁消杀哪家好?15年本土2大权威白蚁防治公司推荐(金盾虫控/青蚁卫士) - 我叫一
  • 天龙八部单机版终极数据管理工具:5个技巧快速掌握游戏数据编辑

周新闻

  • Visual C++运行库修复终极指南:5分钟快速解决Windows软件启动错误
  • 手把手教你构建统计局地区经济数据爬虫:从环境搭建到数据持久化全指南
  • 2026多Agent深度解析:用AI团队替代单一模型,四种架构实战落地

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号