ETF监控策略(写在2025除夕夜)

全面拥抱AI,这个除夕夜有点不一样!

近期Deepseek受到国内外广泛关注,本文的代码贡献来源于Deepseek,2025,全面拥抱AI。

策略

代码本身并不重要,你需要的是把自己的想法告诉ai,并让他来为你实现。
为什么会选择做etf而不是个股?
1.个股风险太大,而行业etf则不会死。
2.对于资金比较少的玩家,交易费用是一笔钱,交易一次最低5块钱,而且还有印花税等其他,而etf则是买多少,给多少费用,部分头部券商的费用可以很低,适用于频繁交易。
下面开始讲解我的策略:
结合RSI和布林线进行做多(A股不会做空,暂时不用),对于rsi在20以下(默认是30,但是我的选择是更加保守,调整为20),可以视为超卖,这时候再结合布林线进行观察,如果触碰到布林线下轨,那么我们可以进行买入,我主要是用RSI6这个指标,是一个短期指标,所以上升后需要尽快卖出,不能太贪。需要注意的风险是,如果遇到单边一直下跌,这个策略将会失效,同样,如果进行单边上涨,那么我们一直会找不到买入机会。
最后再利用树莓派进行每天跑程序,触发条件自动发送邮件。

代码

创建文件etf.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import time
import pandas as pd
import akshare as ak
from datetime import datetime, timedelta
import smtplib
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import formataddr
from concurrent.futures import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from collections import defaultdict

# ===================== 配置区域 =====================
RSI_PERIOD = 6 # RSI计算周期
RSI_OVERBUY = 80 # 超买阈值
RSI_OVERSOLD = 20 # 超卖阈值
CHECK_INTERVAL = 3 # 检查间隔(分钟)
MAX_WORKERS = 6 # 最大并发线程数
BOLLINGER_WINDOW = 20 # 布林带计算窗口
BOLLINGER_STD = 2 # 布林带标准差倍数

ETF_LIST = [
# 格式: (ETF名称, 代码)
# 消费类
("主要消费ETF", "159928"), # 深交所
("酒ETF", "512690"), # 上交所
("细分食品饮料ETF", "515710"), # 上交所

# 医药类
("医疗ETF", "512170"), # 上交所
("生物医药ETF", "512290"), # 上交所

# 科技类
("科技龙头ETF", "515000"), # 上交所
("半导体芯片ETF", "159995"), # 深交所
("人工智能ETF", "515980"), # 上交所
#("5G通信主题ETF", "515050"), # 上交所
("计算机主题ETF", "512720"), # 上交所

# 新能源类
("新能源汽车ETF", "515030"), # 上交所
("光伏产业ETF", "515790"), # 上交所
("碳中和ETF", "159790"), # 深交所

# 周期类
("有色金属ETF", "512400"), # 上交所
("煤炭ETF", "515220"), # 上交所
#("钢铁ETF", "515210"), # 上交所
#("化工ETF", "159870"), # 深交所

# 金融地产类
("银行ETF", "512800"), # 上交所
("证券ETF", "512880"), # 上交所
("房地产ETF", "512200"), # 上交所

# 宽基指数类
("沪深300ETF", "510300"), # 上交所
("创业板50ETF", "159949"), # 深交所

# 其他主题类
("军工ETF", "512660"), # 上交所
("黄金ETF", "518880"), # 上交所
("农业主题ETF", "159825"), # 深交所
#("家电ETF", "159996"), # 深交所
#("传媒ETF", "512980"), # 上交所
("畜牧养殖ETF", "159865"), # 深交所
("基建ETF", "516950"), # 上交所
#("现代物流ETF", "516910"), # 上交所
("稀土产业ETF", "516780"), # 上交所
("旅游主题ETF", "159766"), # 深交所
("电力ETF", "159611"), # 深交所
("红利ETF", "510880"), # 上交所
("纳斯达克ETF", "159632"),
("标普500ETF", "513650")

]

# 邮件配置
my_sender = 'xxx' # 发件人QQ邮箱账号
my_pass = 'xxx' # 发件SMTP授权码
my_user = 'xxx' # 收件人QQ邮箱账号,自己发自己就行
# ===================================================

def mail(content):
ret = True
try:
msg = MIMEMultipart()
msg['From'] = formataddr(["A股RSI监控", my_sender])
msg['To'] = formataddr(["A股RSI监控", my_user])
msg['Subject'] = "A股RSI监控"

msg.attach(MIMEText(content, 'plain', 'utf-8'))

server = smtplib.SMTP_SSL("smtp.qq.com", 465)
server.login(my_sender, my_pass)
server.sendmail(my_sender, [my_user, ], msg.as_string())
server.quit()
except Exception:
ret = False
return ret

def get_full_symbol(code):
"""智能补全交易所代码"""
if code.startswith(('5', '6', '9')):
return f"sh{code}"
elif code.startswith(('0', '1', '2', '3')):
return f"sz{code}"
return code

def fetch_historical_data(etf_code):
"""获取历史数据"""
try:
df = ak.fund_etf_hist_em(symbol=etf_code, adjust="qfq")
if df.empty:
print(f"获取 {etf_code} 历史数据失败: 数据为空")
return None
df['日期'] = pd.to_datetime(df['日期'])
df = df.sort_values(by='日期', ascending=True)
return df
except Exception as e:
print(f"获取 {etf_code} 历史数据失败: {e}")
return None

def fetch_realtime_data(etf_code):
"""获取ETF实时数据"""
try:
df = ak.fund_etf_spot_em()
if not df.empty:
filtered_df = df[df['代码'] == etf_code]
if not filtered_df.empty:
return filtered_df.iloc[0]
except Exception as e:
print(f"获取 {etf_code} 实时数据失败: {e}")
return None

def calculate_rsi(data, period=14):
"""计算RSI"""
close_prices = data['收盘']
if len(close_prices) < period + 1:
return None
# 计算价格变化
deltas = close_prices.diff()
# 分离涨跌幅
gains = deltas.where(deltas > 0, 0)
losses = -deltas.where(deltas < 0, 0)
# 计算平均涨幅和跌幅(EMA方式)
avg_gain = gains.ewm(alpha=1 / period, adjust=False).mean().iloc[-1]
avg_loss = losses.ewm(alpha=1 / period, adjust=False).mean().iloc[-1]
# 计算RSI
if avg_loss == 0:
return 100.0 # 防止除零错误
rs = avg_gain / avg_loss
return 100.0 - (100.0 / (1 + rs))

def calculate_bollinger_bands(data, window=20, std=2):
"""计算布林带"""
data['Middle Band'] = data['收盘'].rolling(window=window).mean()
data['Std Dev'] = data['收盘'].rolling(window=window).std(ddof=0)
data['Upper Band'] = data['Middle Band'] + (data['Std Dev'] * std)
data['Lower Band'] = data['Middle Band'] - (data['Std Dev'] * std)
return data

class AlertManager:
def __init__(self):
self.alert_records = defaultdict(dict)

def should_alert(self, etf_code, alert_type):
record = self.alert_records.get(etf_code, {})
current_date = datetime.now().date()

if record.get("last_date") == current_date:
if alert_type == "overbuy" and record.get("overbuy"):
return False
if alert_type == "oversold" and record.get("oversold"):
return False
return True

def update_record(self, etf_code, alert_type):
current_date = datetime.now().date()
self.alert_records[etf_code] = {
"last_date": current_date,
"overbuy": alert_type == "overbuy",
"oversold": alert_type == "oversold"
}



alert_manager = AlertManager()

def monitor_etf(etf_name, etf_code):
"""执行单个ETF监控"""
# 获取历史数据
historical_data = fetch_historical_data(etf_code)
if historical_data is None or len(historical_data) < BOLLINGER_WINDOW:
return None

#测试的时候发现,如果在交易时间内,其实获取历史数据也会把最新的数据记录的,放在收盘那列,所以无需再把最新价加进去
latest_close = historical_data['收盘'].tail(1).iloc[0] # 最后一行的收盘价就是交易时间的最新价格

# 计算RSI6
current_rsi = calculate_rsi(historical_data, RSI_PERIOD)
if current_rsi is None:
return None

historical_data = calculate_bollinger_bands(historical_data, BOLLINGER_WINDOW, BOLLINGER_STD)
upper_band = historical_data['Upper Band'].iloc[-1]
lower_band = historical_data['Lower Band'].iloc[-1]
mid_band = historical_data['Middle Band'].iloc[-1]

status = f"{etf_name}({etf_code}) | 现价: {latest_close:.3f} | RSI{RSI_PERIOD}: {current_rsi:.3f} | 布林带上轨: {upper_band:.3f} | 布林带中轨: {mid_band:.3f} | 布林带下轨: {lower_band:.3f}"

alert_message = None
if current_rsi >= RSI_OVERBUY and latest_close >= upper_band:
if alert_manager.should_alert(etf_code, "overbuy"):
alert_message = f"🚨 建议卖出 {status}"
alert_manager.update_record(etf_code, "overbuy")
elif current_rsi <= RSI_OVERSOLD and latest_close <= lower_band:
if alert_manager.should_alert(etf_code, "oversold"):
alert_message = f"💡 建议买入 {status}"
alert_manager.update_record(etf_code, "oversold")

print(f"[{time.strftime('%H:%M')}] {status}")
return alert_message

def is_trading_time():
now = pd.Timestamp.now(tz='Asia/Shanghai')
if now.weekday() >= 5:
return False
return ((now.hour == 9 and now.minute >= 30) or
(10 <= now.hour <= 11) or
(13 <= now.hour <= 14) or
(now.hour == 15 and now.minute == 0))

def batch_monitor():
now = pd.Timestamp.now(tz='Asia/Shanghai')
if now.hour >= 15 and now.minute > 0:
print("交易时间已结束")
mail("hi,交易时间已结束,程序即将退出,今天你赚到钱了吗?")
os._exit(0) # 强制退出程序
if not is_trading_time():
return

print(f"\n=== 开始轮次监控 {now.strftime('%Y-%m-%d %H:%M')} ===")
alert_messages = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = [executor.submit(monitor_etf, name, code) for name, code in ETF_LIST]
for future in futures:
try:
result = future.result()
if result:
alert_messages.append(result)
except Exception as e:
print(f"任务执行失败: {e}")
# 如果有警报信息,统一发送邮件
if alert_messages:
mail_content = "\n".join(alert_messages)
mail(mail_content)



if __name__ == "__main__":
print(f"启动ETF监控系统(共监控{len(ETF_LIST)}个品种)")
print("=" * 50)

mail("hi,ETF监控已启动,敬请关注今日推送")
scheduler = BlockingScheduler(timezone='Asia/Shanghai')

scheduler.add_job(
batch_monitor,
'interval',
minutes=CHECK_INTERVAL,
start_date="2024-01-01 09:15:00"
)


try:
scheduler.start()
except KeyboardInterrupt:
print("监控已手动停止")

下面是用企业微信提醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import time
import pandas as pd
import akshare as ak
from datetime import datetime, timedelta
import smtplib
import requests
import os
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
from apscheduler.schedulers.blocking import BlockingScheduler

# ===================== 配置区域 =====================
RSI_PERIOD = 6 # RSI计算周期
RSI_OVERBUY = 80 # 超买阈值
RSI_OVERSOLD = 20 # 超卖阈值
CHECK_INTERVAL = 1 # 检查间隔(分钟)
MAX_WORKERS = 9 # 最大并发线程数
BOLLINGER_WINDOW = 20 # 布林带计算窗口
BOLLINGER_STD = 2 # 布林带标准差倍数

ETF_LIST = [
# 消费类
("主要消费ETF", "159928"), # 深交所
("酒ETF", "512690"), # 上交所
("食品饮料ETF", "515170") # 上交所

]
# ===================================================

#企业微信机器人配置
webhook_url = "urlxxxxx"

def mail(msg):
# 构造企业微信消息
data = {
"msgtype": "text",
"text": {
"content": msg
}
}

try:
# 发送请求到企业微信机器人
response = requests.post(webhook_url, json=data, timeout=5)
if response.status_code == 200:
print("消息发送成功!")
else:
print(f"消息发送失败,状态码:{response.status_code}")
except Exception as e:
print("发送消息时出错:", e)


def get_full_symbol(code):
"""智能补全交易所代码"""
if code.startswith(('5', '6', '9')):
return f"sh{code}"
elif code.startswith(('0', '1', '2', '3')):
return f"sz{code}"
return code

def fetch_historical_data(etf_code):
"""获取历史数据"""
try:
df = ak.fund_etf_hist_em(symbol=etf_code, adjust="qfq")
if df.empty:
print(f"获取 {etf_code} 历史数据失败: 数据为空")
return None
df['日期'] = pd.to_datetime(df['日期'])
df = df.sort_values(by='日期', ascending=True)
return df
except Exception as e:
print(f"获取 {etf_code} 历史数据失败: {e}")
return None

def fetch_realtime_data(etf_code):
"""获取ETF实时数据"""
try:
df = ak.fund_etf_spot_em()
if not df.empty:
filtered_df = df[df['代码'] == etf_code]
if not filtered_df.empty:
return filtered_df.iloc[0]
except Exception as e:
print(f"获取 {etf_code} 实时数据失败: {e}")
return None

def calculate_rsi(data, period=14):
"""计算RSI"""
close_prices = data['收盘']
if len(close_prices) < period + 1:
return None
# 计算价格变化
deltas = close_prices.diff()
# 分离涨跌幅
gains = deltas.where(deltas > 0, 0)
losses = -deltas.where(deltas < 0, 0)
# 计算平均涨幅和跌幅(EMA方式)
avg_gain = gains.ewm(alpha=1 / period, adjust=False).mean().iloc[-1]
avg_loss = losses.ewm(alpha=1 / period, adjust=False).mean().iloc[-1]
# 计算RSI
if avg_loss == 0:
return 100.0 # 防止除零错误
rs = avg_gain / avg_loss
return 100.0 - (100.0 / (1 + rs))

def calculate_bollinger_bands(data, window=20, std=2):
"""计算布林带"""
data['Middle Band'] = data['收盘'].rolling(window=window).mean()
data['Std Dev'] = data['收盘'].rolling(window=window).std(ddof=0)
data['Upper Band'] = data['Middle Band'] + (data['Std Dev'] * std)
data['Lower Band'] = data['Middle Band'] - (data['Std Dev'] * std)
return data

class AlertManager:
def __init__(self):
self.alert_records = defaultdict(dict)

def should_alert(self, etf_code, alert_type):
record = self.alert_records.get(etf_code, {})
current_date = datetime.now().date()

if record.get("last_date") == current_date:
if alert_type == "overbuy" and record.get("overbuy"):
return False
if alert_type == "oversold" and record.get("oversold"):
return False
return True

def update_record(self, etf_code, alert_type):
current_date = datetime.now().date()
self.alert_records[etf_code] = {
"last_date": current_date,
"overbuy": alert_type == "overbuy",
"oversold": alert_type == "oversold"
}



alert_manager = AlertManager()

def monitor_etf(etf_name, etf_code):
"""执行单个ETF监控"""
# 获取历史数据
historical_data = fetch_historical_data(etf_code)
if historical_data is None or len(historical_data) < BOLLINGER_WINDOW:
return None
# 获取实时数据
#realtime_data = fetch_realtime_data(etf_code)
#if realtime_data is None:
# return None
# 将实时数据添加到历史数据中
#latest_close = realtime_data['最新价'] #测试的时候发现,如果在交易时间内,其实获取历史数据也会把最新的数据记录的,放在收盘那列,所以无需再把最新价加进去
latest_close = historical_data['收盘'].tail(1).iloc[0]#最后一行的收盘价就是交易时间的最新价格
#latest_date = pd.Timestamp.now(tz='Asia/Shanghai').strftime('%Y-%m-%d')
#new_row = pd.DataFrame({'日期': [latest_date], '收盘': [latest_close]})#其实这个也不需要用到了
#historical_data = pd.concat([historical_data, new_row], ignore_index=True)

# 计算RSI6
current_rsi = calculate_rsi(historical_data, RSI_PERIOD)
if current_rsi is None:
return None

historical_data = calculate_bollinger_bands(historical_data, BOLLINGER_WINDOW, BOLLINGER_STD)
upper_band = historical_data['Upper Band'].iloc[-1]
lower_band = historical_data['Lower Band'].iloc[-1]
mid_band = historical_data['Middle Band'].iloc[-1]

status = f"{etf_name}({etf_code}) | 现价: {latest_close:.3f} | RSI{RSI_PERIOD}: {current_rsi:.3f} | 布林带上轨: {upper_band:.3f} | 布林带中轨: {mid_band:.3f} | 布林带下轨: {lower_band:.3f}"

alert_message = None
if current_rsi >= RSI_OVERBUY and latest_close >= upper_band:
if alert_manager.should_alert(etf_code, "overbuy"):
alert_message = f"🚨 建议卖出 {status}"
alert_manager.update_record(etf_code, "overbuy")
elif current_rsi <= RSI_OVERSOLD and latest_close <= lower_band:
if alert_manager.should_alert(etf_code, "oversold"):
alert_message = f"💡 建议买入 {status}"
alert_manager.update_record(etf_code, "oversold")

print(f"[{time.strftime('%H:%M')}] {status}")
return alert_message

def is_trading_time():
now = pd.Timestamp.now(tz='Asia/Shanghai')
if now.weekday() >= 5:
return False
return ((now.hour == 9 and now.minute >= 30) or
(10 <= now.hour <= 11) or
(13 <= now.hour <= 14) or
(now.hour == 15 and now.minute == 0))

def batch_monitor():
now = pd.Timestamp.now(tz='Asia/Shanghai')
if now.hour >= 15 and now.minute > 0:
print("交易时间已结束")
mail("hi,交易时间已结束,程序即将退出,今天你赚到钱了吗?")
os._exit(0) # 强制退出程序
if not is_trading_time():
return

print(f"\n=== 开始轮次监控 {now.strftime('%Y-%m-%d %H:%M')} ===")
alert_messages = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = [executor.submit(monitor_etf, name, code) for name, code in ETF_LIST]
for future in futures:
try:
result = future.result()
if result:
alert_messages.append(result)
except Exception as e:
print(f"任务执行失败: {e}")
# 如果有警报信息,统一发送邮件
if alert_messages:
mail_content = "\n".join(alert_messages)
mail(mail_content)



if __name__ == "__main__":
print(f"启动ETF监控系统(共监控{len(ETF_LIST)}个品种)")
print("=" * 50)

mail("hi,ETF监控已启动,敬请关注今日推送")
scheduler = BlockingScheduler(timezone='Asia/Shanghai')

scheduler.add_job(
batch_monitor,
'interval',
minutes=CHECK_INTERVAL,
start_date="2024-01-01 09:15:00"
)


try:
scheduler.start()
except KeyboardInterrupt:
print("监控已手动停止")

自动化运行

1.由于是在树莓派上运行,arm架构会导致一些python包运行出问题,在网上找到了一个包可以进行安装
python3 -m venv myenv
2.创建一个虚拟环境安装完所需要的包,需要注意有个包需要手动下载安装,不然用akshare会有问题
pip install py_mini_racer-0.6.0-py3-none-linux_aarch64.whl
3.创建bash文件添加一下内容,log是每次覆盖
nano /home/pi/run_etf.sh
ctrl+x是保存

1
2
3
4
5
6
7
8
9
10
#!/bin/bash
# 激活虚拟环境
source /home/pi/myenv/bin/activate

# 运行 etf.py 脚本,并将日志输出到 /home/pi/logs/etf.log
#python /home/pi/etf.py >> /home/pi/etf.log 2>&1 这个是追加内容模式,如果只有一个>,则是覆盖模式
python /home/pi/etf.py > /home/pi/etf.log 2>&1

# 退出虚拟环境
deactivate

顺便把文件权限设置下为可运行
chmod +x /home/pi/run_etf.sh
4.设置定时任务
crontab -e
添加一下内容
30 9 * * 1-5 /home/pi/run_etf.sh
可以用crontab -l查看定时任务是否添加成功

预测回归代码

这个买入和卖出只考虑rsi和boll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import pandas as pd
import akshare as ak
import numpy as np

# ===================== 配置区域 =====================
RSI_PERIOD = 6 # RSI计算周期
RSI_OVERSOLD = 20 # 超卖阈值
RSI_OVERBOUGHT = 80 # 超买阈值
INITIAL_CAPITAL = 10000 # 初始本金
START_DATE = "2024-06-01" # 开始回测的时间
BOLLINGER_WINDOW = 20 # 布林带计算窗口
BOLLINGER_STD = 2 # 布林带标准差倍数
ETF_LIST = [
# 格式: (ETF名称, 代码)
# 消费类
("主要消费ETF", "159928"), # 深交所
("酒ETF", "512690"), # 上交所
("细分食品饮料ETF", "515710"), # 上交所

# 医药类
("医疗ETF", "512170"), # 上交所
("生物医药ETF", "512290"), # 上交所

# 科技类
("科技龙头ETF", "515000"), # 上交所
("半导体芯片ETF", "159995"), # 深交所
("人工智能ETF", "515980"), # 上交所
("5G通信主题ETF", "515050"), # 上交所
("计算机主题ETF", "512720"), # 上交所

# 新能源类
("新能源汽车ETF", "515030"), # 上交所
("光伏产业ETF", "515790"), # 上交所
("碳中和ETF", "159790"), # 深交所

# 周期类
("有色金属ETF", "512400"), # 上交所
("煤炭ETF", "515220"), # 上交所
("钢铁ETF", "515210"), # 上交所
("化工ETF", "159870"), # 深交所

# 金融地产类
("银行ETF", "512800"), # 上交所
("证券ETF", "512880"), # 上交所
("房地产ETF", "512200"), # 上交所

# 宽基指数类
("沪深300ETF", "510300"), # 上交所
("创业板50ETF", "159949"), # 深交所

# 其他主题类
("军工ETF", "512660"), # 上交所
("黄金ETF", "518880"), # 上交所
("农业主题ETF", "159825"), # 深交所
("家电ETF", "159996"), # 深交所
("传媒ETF", "512980"), # 上交所
("畜牧养殖ETF", "159865"), # 深交所
("基建ETF", "516950"), # 上交所
("现代物流ETF", "516910"), # 上交所
("稀土产业ETF", "516780"), # 上交所
("旅游主题ETF", "159766"), # 深交所
("电力ETF", "159611"), # 深交所
("红利ETF", "510880"), # 上交所
("纳斯达克ETF", "159632"),
("标普500ETF", "513650")

] # 基金名称和代码列表
# ===================================================

def fetch_historical_data(etf_code):
"""获取历史数据"""
try:
df = ak.fund_etf_hist_em(symbol=etf_code, adjust="qfq")
if df.empty:
print(f"获取 {etf_code} 历史数据失败: 数据为空")
return None
df['日期'] = pd.to_datetime(df['日期'])
df = df.sort_values(by='日期', ascending=True)
return df
except Exception as e:
print(f"获取 {etf_code} 历史数据失败: {e}")
return None

def calculate_rsi(data, period=14):
"""计算RSI"""
close_prices = data['收盘']
if len(close_prices) < period + 1:
return None

deltas = close_prices.diff()
gains = deltas.where(deltas > 0, 0)
losses = -deltas.where(deltas < 0, 0)

avg_gain = gains.ewm(alpha=1 / period, adjust=False).mean()
avg_loss = losses.ewm(alpha=1 / period, adjust=False).mean()

rs = avg_gain / avg_loss
rsi = 100.0 - (100.0 / (1 + rs))
return rsi

def calculate_bollinger_bands(data, window=20, std=2):
"""计算布林带"""
data['Middle Band'] = data['收盘'].rolling(window=window).mean()
data['Std Dev'] = data['收盘'].rolling(window=window).std()
data['Upper Band'] = data['Middle Band'] + (data['Std Dev'] * std)
data['Lower Band'] = data['Middle Band'] - (data['Std Dev'] * std)
return data

def backtest_strategy(data, initial_capital):
"""回测策略"""
data['RSI'] = calculate_rsi(data, RSI_PERIOD)
data = calculate_bollinger_bands(data, BOLLINGER_WINDOW, BOLLINGER_STD)

data['Signal'] = 0
data['Position'] = 0
data['Profit'] = 0.0

position = 0
buy_price = 0.0
buy_count = 0
sell_count = 0
total_profit = 0.0
capital = initial_capital

# 用于记录交易的列表
trades = []

for i in range(1, len(data)):
current_price = data['收盘'].iloc[i]
if position == 0 and data['RSI'].iloc[i] < RSI_OVERSOLD and current_price <= data['Lower Band'].iloc[i]:
# 买入信号
position = 1
buy_price = current_price
buy_count += 1
trades.append({
'日期': data['日期'].iloc[i],
'价格': buy_price,
'RSI': data['RSI'].iloc[i],
'布林带': data['Lower Band'].iloc[i],
'操作': '买入'
})

elif position == 1 and (current_price >= data['Upper Band'].iloc[i] or data['RSI'].iloc[i] >= RSI_OVERBOUGHT):
# 卖出信号
position = 0
profit = (current_price - buy_price) / buy_price
total_profit += profit
capital *= (1 + profit)
sell_count += 1
trades.append({
'日期': data['日期'].iloc[i],
'价格': current_price,
'RSI': data['RSI'].iloc[i],
'布林带': data['Upper Band'].iloc[i],
'操作': '卖出'
})

return buy_count, sell_count, total_profit, capital, trades

def main():
results = []
for etf_name, etf_code in ETF_LIST:
print(f"\n开始回测基金: {etf_name} ({etf_code})")
data = fetch_historical_data(etf_code)
if data is None:
print(f"无法获取 {etf_code} 的历史数据,跳过该基金。")
continue

data = data[data['日期'] >= START_DATE]
if len(data) == 0:
print(f"没有找到 {START_DATE} 之后的数据,请调整开始时间。")
continue

buy_count, sell_count, total_profit, final_capital, trades = backtest_strategy(data, INITIAL_CAPITAL)

results.append({
'基金名称': etf_name,
'基金代码': etf_code,
'买入次数': buy_count,
'卖出次数': sell_count,
'总盈利': total_profit,
'初始本金': INITIAL_CAPITAL,
'最终本金': final_capital,
'最终盈利': final_capital - INITIAL_CAPITAL,
'年化收益': (total_profit / len(data) * 252) * 100,
'交易记录': trades
})

print("\n所有基金的回测结果:")
for result in results:
print(f"\n基金名称: {result['基金名称']} ({result['基金代码']})")
print(f"买入次数: {result['买入次数']}")
print(f"卖出次数: {result['卖出次数']}")
print(f"总盈利: {result['总盈利'] * 100:.2f}%")
print(f"初始本金: {result['初始本金']:.2f} 元")
print(f"最终本金: {result['最终本金']:.2f} 元")
print(f"最终盈利: {result['最终盈利']:.2f} 元")
print(f"年化收益: {result['年化收益']:.2f}%")
print("\n交易记录:")
for trade in result['交易记录']:
print(f"{trade['操作']} - 时间: {trade['日期']}, 价格: {trade['价格']:.2f} 元, RSI: {trade['RSI']:.2f}, 布林带: {trade['布林带']:.2f}")

if __name__ == "__main__":
main()

这个买入考虑rsi和boll,卖出只考虑盈利百分之几就卖,胜率更加高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import pandas as pd
import akshare as ak
import numpy as np

# ===================== 配置区域 =====================
RSI_PERIOD = 6 # RSI计算周期
RSI_OVERSOLD = 20 # 超卖阈值
INITIAL_CAPITAL = 10000 # 初始本金
START_DATE = "2015-01-01" # 开始回测的时间
BOLLINGER_WINDOW = 20 # 布林带计算窗口
BOLLINGER_STD = 2 # 布林带标准差倍数
PROFIT_TARGET = 0.02 # 盈利目标,2%
ETF_LIST = [
# 核心资产(防御性行业)
("主要消费ETF", "159928"), # 深交所:跟踪主要消费行业,现金流稳定,适合长期投资
("酒ETF", "512690"), # 上交所:白酒行业有高毛利率和品牌壁垒,长期潜力大,但需注意行业竞争和政策风险
("细分食品饮料ETF", "515710"), # 上交所:覆盖食品饮料细分领域,成长性较高

# 医药类
("医疗ETF", "512170"), # 上交所:跟踪医疗行业,流动性好,市值大,但需关注政策变化
("生物医药ETF", "512290"), # 上交所:生物医药行业有高技术壁垒,适合长期投资,但波动较大
("创新药ETF", "517100"), # 上交所:创新药行业研发成本高但市场潜力大,适合长期投资者

# 科技类
("科技龙头ETF", "515000"), # 上交所:跟踪科技行业龙头股,流动性好,市值大,适合长期持有
("半导体芯片ETF", "159995"), # 深交所:半导体行业是科技核心,技术壁垒高,波动大但成长性强
("人工智能ETF", "515980"), # 上交所:人工智能行业处于快速成长期,市场潜力巨大,但需关注技术发展

# 新能源类
("新能源汽车ETF", "515030"), # 上交所:新能源汽车行业受政策支持,市场前景广阔,但竞争激烈
("光伏产业ETF", "515790"), # 上交所:光伏行业受益于全球能源转型,成长性高,但受政策和原材料价格影响
("碳中和ETF", "159790"), # 深交所:碳中和是长期主题,政策支持力度大,适合长期投资

# 周期类
("有色金属ETF", "512400"), # 上交所:有色金属行业周期性波动大,适合波段操作,流动性较好
("煤炭ETF", "515220"), # 上交所:煤炭行业受政策和供需影响,波动较大,适合波段操作

# 金融地产类
("银行ETF", "512800"), # 上交所:银行行业稳定性高,分红率高,适合长期投资
("证券ETF", "512880"), # 上交所:证券行业波动较大,受市场环境影响,适合波段操作
("房地产ETF", "512200"), # 上交所:房地产行业受政策影响大,需关注政策变化和市场环境

# 宽基指数类
("沪深300ETF", "510300"), # 上交所:中国股市核心宽基指数,流动性好,市值大,适合长期投资
("创业板50ETF", "159949"), # 深交所:创业板核心成分股,成长性高,波动较大,适合风险承受能力强的投资者
("上证50ETF", "510050"), # 上交所:上证50成分股,稳定性高,分红率高,适合长期投资

# 红利类
("红利ETF", "510880"), # 上交所:跟踪红利指数,分红率高,适合长期投资者

# 其他主题类
("军工ETF", "512660"), # 上交所:军工行业受政策支持,市场潜力大,但波动较大,适合长期投资
("黄金ETF", "518880"), # 上交所:黄金具有避险属性,适合资产配置,流动性好
("稀土产业ETF", "516780"), # 上交所:稀土行业资源稀缺,市场潜力大,但受政策和市场环境影响,波动较大
("旅游主题ETF", "159766"), # 深交所:旅游行业受政策和市场环境影响,波动较大,适合波段操作

# 纳斯达克相关
("纳斯达克ETF", "159632"), # 深交所:跟踪纳斯达克指数,流动性好,市值大,适合长期投资

# 标普500相关
("标普500ETF", "513500") # 上交所:博时标普500ETF,流动性好,市值大,适合长期投资
]
# ===================================================

def fetch_historical_data(etf_code):
"""获取历史数据"""
try:
df = ak.fund_etf_hist_em(symbol=etf_code, adjust="qfq")
if df.empty:
print(f"获取 {etf_code} 历史数据失败: 数据为空")
return None
df['日期'] = pd.to_datetime(df['日期'])
df = df.sort_values(by='日期', ascending=True)
return df
except Exception as e:
print(f"获取 {etf_code} 历史数据失败: {e}")
return None

def calculate_rsi(data, period=14):
"""计算RSI"""
close_prices = data['收盘']
if len(close_prices) < period + 1:
return None

deltas = close_prices.diff()
gains = deltas.where(deltas > 0, 0)
losses = -deltas.where(deltas < 0, 0)

avg_gain = gains.ewm(alpha=1 / period, adjust=False).mean()
avg_loss = losses.ewm(alpha=1 / period, adjust=False).mean()

rs = avg_gain / avg_loss
rsi = 100.0 - (100.0 / (1 + rs))
return rsi

def calculate_bollinger_bands(data, window=20, std=2):
"""计算布林带"""
data['Middle Band'] = data['收盘'].rolling(window=window).mean()
data['Std Dev'] = data['收盘'].rolling(window=window).std(ddof=0)
data['Upper Band'] = data['Middle Band'] + (data['Std Dev'] * std)
data['Lower Band'] = data['Middle Band'] - (data['Std Dev'] * std)
return data

def backtest_strategy(data, initial_capital):
"""回测策略"""
data['RSI'] = calculate_rsi(data, RSI_PERIOD)
data = calculate_bollinger_bands(data, BOLLINGER_WINDOW, BOLLINGER_STD)

data['Signal'] = 0
data['Position'] = 0
data['Profit'] = 0.0

position = 0
buy_price = 0.0
buy_count = 0
sell_count = 0
total_profit = 0.0
capital = initial_capital

# 用于记录交易的列表
trades = []

for i in range(1, len(data)):
current_price = data['收盘'].iloc[i]
if position == 0 and data['RSI'].iloc[i] < RSI_OVERSOLD and current_price <= data['Lower Band'].iloc[i]:
# 买入信号
position = 1
buy_price = current_price
buy_count += 1
trades.append({
'日期': data['日期'].iloc[i],
'价格': buy_price,
'RSI': data['RSI'].iloc[i],
'布林带': data['Lower Band'].iloc[i],
'操作': '买入'
})

elif position == 1 and (current_price - buy_price) / buy_price >= PROFIT_TARGET:
# 卖出信号:仅考虑盈利目标
position = 0
profit = (current_price - buy_price) / buy_price
total_profit += profit
capital *= (1 + profit)
sell_count += 1
trades.append({
'日期': data['日期'].iloc[i],
'价格': current_price,
'RSI': data['RSI'].iloc[i],
'布林带': data['Upper Band'].iloc[i],
'操作': '卖出'
})

return buy_count, sell_count, total_profit, capital, trades

def main():
results = []
for etf_name, etf_code in ETF_LIST:
print(f"\n开始回测基金: {etf_name} ({etf_code})")
data = fetch_historical_data(etf_code)
if data is None:
print(f"无法获取 {etf_code} 的历史数据,跳过该基金。")
continue

data = data[data['日期'] >= START_DATE]
if len(data) == 0:
print(f"没有找到 {START_DATE} 之后的数据,请调整开始时间。")
continue

buy_count, sell_count, total_profit, final_capital, trades = backtest_strategy(data, INITIAL_CAPITAL)

results.append({
'基金名称': etf_name,
'基金代码': etf_code,
'买入次数': buy_count,
'卖出次数': sell_count,
'总盈利': total_profit,
'初始本金': INITIAL_CAPITAL,
'最终本金': final_capital,
'最终盈利': final_capital - INITIAL_CAPITAL,
'年化收益': (total_profit / len(data) * 252) * 100,
'交易记录': trades
})

print("\n所有基金的回测结果:")
for result in results:
print(f"\n基金名称: {result['基金名称']} ({result['基金代码']})")
print(f"买入次数: {result['买入次数']}")
print(f"卖出次数: {result['卖出次数']}")
print(f"总盈利: {result['总盈利'] * 100:.2f}%")
print(f"初始本金: {result['初始本金']:.2f} 元")
print(f"最终本金: {result['最终本金']:.2f} 元")
print(f"最终盈利: {result['最终盈利']:.2f} 元")
print(f"年化收益: {result['年化收益']:.2f}%")
print("\n交易记录:")
#for trade in result['交易记录']:
# print(f"{trade['操作']} - 时间: {trade['日期']}, 价格: {trade['价格']:.2f} 元, RSI: {trade['RSI']:.2f}, 布林带: {trade['布林带']:.2f}")

if __name__ == "__main__":
main()

拥抱AI,一起去感受这个星球最顶级的人做出来的东西!!!