<#if settings.post_mathjax!false>

下载加密货币K线数据的Python代码合集

admin
1
2025-11-02

从OKX下载K线数据

支持断点续传和时区转换。

import ccxt  
import pandas as pd  
import time  
import os  
from datetime import datetime, timezone  
  
import pytz  
  
  
def _to_epoch_ms(text, tz_name=''):
    """Parse a 'YYYY-MM-DD HH:MM:SS' wall-clock string into epoch milliseconds.

    The string is interpreted in ``tz_name`` (a pytz zone name) when one is
    given, otherwise as UTC.
    """
    naive = datetime.strptime(text, '%Y-%m-%d %H:%M:%S')
    if tz_name:
        aware = pytz.timezone(tz_name).localize(naive)
    else:
        aware = naive.replace(tzinfo=timezone.utc)
    return int(aware.timestamp() * 1000)


def _resume_point_ms(output_file, tz_name=''):
    """Return the epoch-ms position just after the last saved candle, or None.

    None means there is nothing to resume from (file missing, zero bytes, or
    header only).
    """
    if not os.path.exists(output_file):
        return None
    try:
        existing = pd.read_csv(output_file)
    except pd.errors.EmptyDataError:
        # A zero-byte file left by an interrupted run: start from scratch.
        return None
    if len(existing) == 0:
        return None

    last_label = existing['datetime'].iloc[-1]
    stamp = pd.to_datetime(last_label)
    if stamp.tzinfo is None:
        # A value without an offset is wall-clock time in the zone the file
        # was written with, so it must be localized there before conversion;
        # reading it as UTC would shift the resume point by the zone offset.
        # ambiguous=True picks the earlier instant in a DST fold, so at worst
        # an hour is downloaded again instead of being skipped.
        stamp = stamp.tz_localize(tz_name if tz_name else 'UTC', ambiguous=True)
    print(f"检测到已有数据,从 {last_label} 继续下载...")
    return int(round(stamp.timestamp() * 1000)) + 1


def fetch_ohlcv_with_proxy_and_save(symbol='ETH/USDT', timeframe='1m', limit=300, from_='2017-01-01 00:00:00', to_='', timezone_='', batch_save_size=10000, output_file='', proxy=''):
    """Download OHLCV candles from OKX through a proxy and append them to a CSV.

    Candles are fetched page by page and flushed to disk via ``save_to_csv``
    every ``batch_save_size`` rows. If ``output_file`` already contains rows,
    the download resumes right after the last saved candle.

    Args:
        symbol: Market symbol. A symbol ending in ':USDT' makes the exchange
            use ``defaultType='swap'``.
        timeframe: Candle interval passed to ``fetch_ohlcv``.
        limit: Maximum candles requested per call.
        from_: Range start, 'YYYY-MM-DD HH:MM:SS'.
        to_: Range end in the same format; '' means "now".
        timezone_: Zone name used to interpret ``from_``/``to_`` and forwarded
            to ``save_to_csv``; '' means UTC.
        batch_save_size: Buffered row count that triggers a flush to disk.
        output_file: Target CSV; derived from symbol and timeframe when empty.
        proxy: Proxy URL used for both http and https.

    Any exception raised inside the loop is printed and the request is retried
    after 5 seconds, with no retry cap.
    """
    if not output_file:
        output_file = f"{symbol.replace('/', '_').replace(':', '_')}_{timeframe}.csv"

    # Build the exchange client (with proxy).
    exchange = ccxt.okx({
        'enableRateLimit': True,
        'options': {'defaultType': 'swap'} if symbol.endswith(':USDT') else {},
        'proxies': {
            'http': proxy,
            'https': proxy,
        }
    })

    # Both ends of the range are interpreted in the same zone.
    since = _to_epoch_ms(from_, timezone_)
    if to_ == '':
        end = int(datetime.now(timezone.utc).timestamp() * 1000)
    else:
        end = _to_epoch_ms(to_, timezone_)

    # Resume after the last saved candle when the output file has data.
    resume_from = _resume_point_ms(output_file, timezone_)
    last_time = since if resume_from is None else resume_from

    all_data = []
    total_count = 0

    print(f"开始下载 {symbol}{datetime.fromtimestamp(last_time/1000, tz=timezone.utc)} 至今 的 {timeframe} 数据...")

    while last_time < end:
        try:
            ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since=last_time, limit=limit)
            if not ohlcv:
                break

            # Advance past the newest candle returned; if it lies beyond
            # `end`, this also terminates the loop.
            last_time = ohlcv[-1][0] + 1

            # Keep only candles inside the requested range: a page may run
            # past `end`.
            in_range = [candle for candle in ohlcv if candle[0] <= end]
            if in_range:
                all_data.extend(in_range)
                total_count += len(in_range)
                ts = datetime.fromtimestamp(in_range[-1][0] / 1000, tz=timezone.utc)
                print(f"获取到: {ts},累计 {total_count} 条")

            # Flush the buffer once it is large enough.
            if len(all_data) >= batch_save_size:
                save_to_csv(all_data, output_file, timezone_)
                print(f"💾 已保存 {len(all_data)} 条到 {output_file}")
                all_data = []  # buffer is on disk; start a new one

            time.sleep(exchange.rateLimit / 1000)

        except Exception as e:
            # Best-effort download: report the error and retry the same position.
            print(f"⚠️ 出错: {e},5秒后重试...")
            time.sleep(5)

    # Write whatever is still buffered.
    if all_data:
        save_to_csv(all_data, output_file, timezone_)
        print(f"💾 最后保存 {len(all_data)} 条到 {output_file}")

    print(f"✅ 下载完成,总计 {total_count} 条数据。文件:{output_file}")
  
  
def save_to_csv(data, filename, timezone_=''):
    """Append OHLCV rows to a CSV file.

    Args:
        data: Rows of ``[timestamp_ms, open, high, low, close, volume]``.
        filename: Target CSV path. The header row is written only when the
            file does not exist yet or is empty.
        timezone_: Zone name for the ``datetime`` column. When set, times are
            converted to that zone and written without a UTC offset; when '',
            they stay timezone-aware UTC.
    """
    df = pd.DataFrame(data, columns=['timestamp','open','high','low','close','volume'])
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
    if timezone_ != '':
        # Convert to the requested zone, then drop the tz info so the CSV
        # shows plain local time without a "+08:00"-style suffix.
        df['datetime'] = df['datetime'].dt.tz_convert(timezone_).dt.tz_localize(None)
    # Put the readable datetime first and drop the raw millisecond timestamp.
    df = df[['datetime','open','high','low','close','volume']]

    # Append to an existing file; a missing or zero-byte file still needs the header.
    header = not os.path.exists(filename) or os.path.getsize(filename) == 0
    df.to_csv(filename, mode='a', header=header, index=False)
  
  
if __name__ == "__main__":
    # Spot markets use a symbol such as 'ETH/USDT';
    # contract markets use the ':USDT' form, e.g. 'ETH/USDT:USDT'.
    fetch_ohlcv_with_proxy_and_save(
        symbol='ETH/USDT:USDT',
        from_="2025-11-01 00:00:00",
        timezone_='Asia/Shanghai',
        output_file=r'1.csv',
        proxy='http://127.0.0.1:7897',
    )

将K线数据CSV文件按年月分割

import pandas as pd  
from pathlib import Path  
  
# Load the downloaded candles; the "datetime" column is parsed into timestamps.
df = pd.read_csv("downloaded/ETH_USDT_1m_1.csv", parse_dates=["datetime"])

# Make sure the destination folder for the per-month files exists.
output_dir = Path("ETH_USDT_1m")
output_dir.mkdir(exist_ok=True)

# Group rows by a "YYYY-MM" label kept as a separate key series, so no helper
# column has to be added to the frame and removed again before writing.
month_labels = df["datetime"].dt.strftime("%Y-%m").rename("year_month")
for label, rows in df.groupby(month_labels):
    # One CSV per month, named after its label.
    target = output_dir / f"{label}.csv"
    rows.to_csv(target, index=False)
    print(f"已保存: {target}")
动物装饰