币安K线数据获取与存储系统

项目概述

本项目是一个高性能的加密货币K线数据获取和存储系统,专门设计用于从币安期货市场获取永续合约交易对的历史K线数据。系统采用多线程设计,实现了API请求的负载均衡、速率限制、故障转移和自动重试等机制,并使用SQLite数据库进行高效存储和查询。

!系统架构图

核心功能

  • 数据获取:从币安API获取永续合约K线数据

  • 数据存储:使用SQLite数据库存储K线数据

  • 内存缓存:使用内存缓存最近的数据,提高查询效率

  • 负载均衡:支持多个API端点轮询和故障转移

  • 速率限制:遵守币安API的请求频率限制

  • 高性能查询:优化的数据库结构和缓存机制

  • 多线程下载:并行下载多个交易对数据

系统架构

模块组成

  1. BinanceKlineFetcher:负责与币安API交互

  2. KlineStorage:管理数据库存储和缓存

  3. RateLimiter:控制API请求速率

  4. 负载均衡与故障转移:管理多个API端点

数据流程图

技术细节

负载均衡与故障转移

系统使用了多种负载均衡策略:

  1. 轮询(Round Robin):通过get_next_client()方法循环使用多个API端点

  2. 故障转移:当一个端点失败时,通过get_healthy_client()自动切换到健康的端点

  3. 端点健康检查:定期检查失败的端点是否恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
httplib::Client& get_healthy_client() {
std::lock_guard<std::mutex> lock(endpoint_mutex);

// 优先使用正常的端点
for (size_t i = 0; i < client_pool.size(); i++) {
size_t index = (current_client_index.fetch_add(1)) % client_pool.size();
if (endpoint_status[api_endpoints[index]]) {
return client_pool[index];
}
}

// 所有端点都有问题,重置状态并使用第一个
for (auto& pair : endpoint_status) {
pair.second = true;
}

return client_pool[0];
}

速率限制

为遵守币安API的请求限制,系统实现了RateLimiter类:

  • K线API限制:1200次/分钟

  • 信息API限制:100次/10秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void wait_if_needed() {
std::lock_guard<std::mutex> lock(mtx);
auto now = std::chrono::steady_clock::now();

// 清理过期的时间戳
while (!request_timestamps.empty() &&
(now - request_timestamps.front() > timeframe)) {
request_timestamps.pop();
}

// 如果达到限制,等待直到可以发送
if (request_timestamps.size() >= max_requests) {
auto sleep_time = timeframe - (now - request_timestamps.front());
std::this_thread::sleep_for(sleep_time);
}

// 记录本次请求
request_timestamps.push(std::chrono::steady_clock::now());
}

内存缓存系统

为提高数据读取性能,系统使用内存缓存最近的K线数据:

  • 每个交易对可缓存最近1000条K线数据

  • 查询时优先从缓存读取

  • 缓存未命中时从数据库读取

  • 插入新数据时自动更新缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
std::vector<Kline> get_klines(const std::string& symbol, 
long start_time = 0,
long end_time = 0) {
{
std::lock_guard<std::mutex> lock(cache_mutex);
auto it = kline_cache.find(symbol);

if (it != kline_cache.end()) {
// 缓存命中,检查时间范围
if (start_time == 0 && end_time == 0) {
// 请求全部数据,检查是否可以完全从缓存提供
if (!it->second.empty()) {
return it->second;
}
} else {
// 需要过滤时间范围
std::vector<Kline> filtered;
for (const auto& k : it->second) {
if ((start_time == 0 || k.open_time >= start_time) &&
(end_time == 0 || k.open_time <= end_time)) {
filtered.push_back(k);
}
}

// 如果找到符合条件的所有数据,直接返回
if (!filtered.empty()) {
return filtered;
}
}
}
}

// 缓存未命中,从数据库读取
// ...数据库查询代码...
}

自动重试机制

系统实现了自动重试机制,包括指数退避策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template<typename Func>
auto make_balanced_request(Func api_call, int retries = 3) -> decltype(api_call(std::declval<httplib::Client&>())) {
for (int attempt = 0; attempt <= retries; attempt++) {
try {
// 获取健康的客户端
auto& client = get_healthy_client();
std::string current_endpoint = api_endpoints[&client - &client_pool[0]];

// 执行API调用
auto result = api_call(client);

// 成功则返回
return result;
} catch (const std::exception& e) {
if (attempt == retries) throw;

// 记录失败并尝试下一个端点
std::string failed_endpoint = api_endpoints[current_client_index.load() % client_pool.size()];
handle_request_failure(failed_endpoint);

// 指数退避
std::this_thread::sleep_for(std::chrono::milliseconds((1 << attempt) * 500));
}
}

throw std::runtime_error("所有重试尝试均失败");
}

数据库结构

系统使用SQLite数据库存储K线数据,表结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE klines (
symbol TEXT NOT NULL,
open_time INTEGER NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume REAL NOT NULL,
close_time INTEGER NOT NULL,
quote_asset_volume REAL NOT NULL,
num_trades INTEGER NOT NULL,
taker_buy_volume REAL NOT NULL,
taker_buy_quote_volume REAL NOT NULL,
PRIMARY KEY (symbol, open_time)
);

CREATE INDEX idx_klines_symbol_time ON klines(symbol, open_time);

使用方法

初始化和运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int main() {
try {
KlineStorage storage;

// 设置缓存大小并预加载缓存
storage.set_cache_size(1000); // 每个标的最近1000条K线
std::cout << "预加载K线数据缓存..." << std::endl;
storage.preload_all_caches();

std::cout << "开始下载数据..." << std::endl;
download_all_symbols();

std::cout << "\n开始测试读取速度..." << std::endl;
test_read_speed();

} catch (const std::exception& e) {
std::cerr << "错误: " << e.what() << std::endl;
return 1;
}

return 0;
}

性能优化

系统采用了多种性能优化策略:

  1. 多线程下载:并行下载多个交易对数据

  2. 内存缓存:减少数据库IO操作

  3. 数据库索引:优化查询性能

  4. 批量事务:使用事务批量插入数据

  5. 连接复用:使用Keep-Alive复用HTTP连接

常见问题

SSL证书验证

如果遇到SSL证书验证问题,可以修改代码来跳过验证:

1
2
3
4
5
try {
cli.enable_server_certificate_verification(true);
} catch (...) {
std::cerr << "警告: SSL证书验证不受支持" << std::endl;
}

代理配置

如果需要通过代理访问币安API,请修改以下代码:

1
cli.set_proxy("127.0.0.1", 7890);

项目依赖

  • C++17或更高版本

  • SQLite3:数据库

  • nlohmann/json:JSON解析

  • cpp-httplib:HTTP客户端


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!