bhyg-decomp/api.py
2024-07-01 00:27:51 +08:00

624 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import random
import time
import urllib.parse
import hashlib
import hmac
import qrcode
import requests
from loguru import logger
from i18n import *
from utils import save, load
from globals import *
class BilibiliHyg:
global sdk
def __init__(self, config, sdk,client,session):
self.waited = False
self.sdk = sdk
self.config = config
self.config["gaia_vtoken"] = None
self.session = requests.Session()
if "user-agent" in self.config:
self.headers = {
"User-Agent": self.config["user-agent"],
}
else:
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)",
}
self.headers["Cookie"] = self.config["cookie"]
if self.config["proxy"]:
if self.config["proxy_channel"] != "0":
self.headers["kdl-tps-channel"] = config["proxy_channel"]
self.client = client
self.session = session
if self.client != None:
self.ip = self.client.tps_current_ip(sign_type="hmacsha1")
if self.config["mode"] == 'time':
logger.info(i18n_gt()["now_mode_time_on"])
logger.info(i18n_gt()["wait_get_token"])
while self.get_time() < self.config["time"]-15:
time.sleep(10)
logger.info(i18n_gt()["now_waiting_info"].format((self.config["time"]-self.get_time())))
while self.get_time() < self.config["time"]:
pass
logger.info(i18n_gt()["get_token_finish"])
self.token = self.get_token()
logger.info(i18n_gt()["will_pay_bill"])
def get_time(self):
return float(time.time() + self.config["time_offset"])
def get_ticket_status(self):
url = (
"https://show.bilibili.com/api/ticket/project/getV2?version=134&id="
+ self.config["project_id"]
)
try:
response = self.session.get(url, headers=self.headers, timeout=1)
except (
requests.exceptions.Timeout,
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError,
):
logger.error(i18n_gt()["network_timeout"])
if self.config["proxy"]:
if self.ip == self.client.tps_current_ip(sign_type="hmacsha1"):
logger.info(
i18n_gt()["manual_change_ip"].format(
self.client.change_tps_ip(sign_type="hmacsha1")
)
)
self.session.close()
return self.get_ticket_status()
return -1, 0
try:
if response.status_code == 412:
logger.error(i18n_gt()["wind_control"])
if self.config["proxy"]:
if self.ip == self.client.tps_current_ip(sign_type="hmacsha1"):
logger.info(
i18n_gt()["manual_change_ip"].format(
self.client.change_tps_ip(sign_type="hmacsha1")
)
)
self.session.close()
return self.get_ticket_status()
else:
self.risk = True
logger.error(i18n_gt()["net_method"])
input(i18n_gt()["res_3_returns"])
input(i18n_gt()["res_2_returns"])
input(i18n_gt()["res_1_return"])
return -1, 0
screens = response.json()["data"]["screen_list"]
# 找到 字段id为screen_id的screen
screen = {}
for i in range(len(screens)):
if screens[i]["id"] == int(self.config["screen_id"]):
screen = screens[i]
break
if screen == {}:
logger.error(i18n_gt()["no_found_screen"])
return -1, 0
# 找到 字段id为sku_id的sku
skus = screen["ticket_list"]
sku = {}
for i in range(len(skus)):
if skus[i]["id"] == int(self.config["sku_id"]):
sku = skus[i]
break
if sku == {}:
logger.error(i18n_gt()["no_found_sku"])
return -1, 0
return int(sku["sale_flag_number"]), sku["clickable"]
except:
logger.error(i18n_gt()["may_wind_control"])
return -1, 0
def get_prepare(self):
url = (
"https://show.bilibili.com/api/ticket/order/prepare?project_id="
+ self.config["project_id"]
)
if self.config["gaia_vtoken"]:
url += "&gaia_vtoken=" + self.config["gaia_vtoken"]
data = {
"project_id": self.config["project_id"],
"screen_id": self.config["screen_id"],
"order_type": self.config["order_type"],
"count": self.config["count"],
"sku_id": self.config["sku_id"],
"token": "",
"newRisk": "true",
"requestSource": "neul-next",
}
if "act_id" in self.config:
data["act_id"] = self.config["act_id"]
response = self.session.post(url, headers=self.headers, data=data)
if response.status_code == 412:
logger.error(i18n_gt()["not_handled_412"])
if self.config["proxy"]:
if self.ip == self.client.tps_current_ip(sign_type="hmacsha1"):
logger.info(
i18n_gt()["manual_change_ip"].format(
self.client.change_tps_ip(sign_type="hmacsha1")
)
)
self.session.close()
return self.get_prepare()
if response.json()["errno"] != 0 and response.json()["errno"] != -401:
logger.error(response.json()["msg"])
return response.json()["data"]
def gee_verify(self, gt, challenge, token):
from geetest import run
time_start = time.time()
self.captcha_data = run(gt, challenge, token, mode = self.config["captcha"], key = self.config["rrocr"])
delta = time.time() - time_start
self.sdk.metrics.distribution(
key="gt_solve_time",
value=delta*1000,
unit="millisecond"
)
self.captcha_data["csrf"] = self.headers["Cookie"][
self.headers["Cookie"].index("bili_jct")
+ 9 : self.headers["Cookie"].index("bili_jct")
+ 41
]
self.captcha_data["token"] = token
success = self.session.post(
"https://api.bilibili.com/x/gaia-vgate/v1/validate",
headers=self.headers,
data=self.captcha_data,
).json()["data"]["is_valid"]
self.config["gaia_vtoken"] = token
self.captcha_data = None
if self.headers["Cookie"].find("x-bili-gaia-vtoken") != -1:
self.headers["Cookie"] = self.headers["Cookie"].split(
"; x-bili-gaia-vtoken"
)[0]
self.headers["Cookie"] += "; x-bili-gaia-vtoken=" + token
save(self.config)
return success
def phone_verify(self, token):
if "phone" in self.config:
phone = self.config["phone"]
else:
phone = input(i18n_gt()["input_phone_num"]+": ")
self.captcha_data = {
"code": phone,
}
self.captcha_data["csrf"] = self.headers["Cookie"][
self.headers["Cookie"].index("bili_jct")
+ 9 : self.headers["Cookie"].index("bili_jct")
+ 41
]
self.captcha_data["token"] = token
success = self.session.post(
"https://api.bilibili.com/x/gaia-vgate/v1/validate",
headers=self.headers,
data=self.captcha_data,
).json()["data"]["is_valid"]
if not success:
logger.error(i18n_gt()["input_verify_fail"])
if "phone" in self.config:
self.config.pop("phone")
return False
self.config["gaia_vtoken"] = token
self.captcha_data = None
if self.headers["Cookie"].find("x-bili-gaia-vtoken") != -1:
self.headers["Cookie"] = self.headers["Cookie"].split(
"; x-bili-gaia-vtoken"
)[0]
self.headers["Cookie"] += "; x-bili-gaia-vtoken=" + token
save(self.config)
return success
def confirm_info(self, token):
url = (
"https://show.bilibili.com/api/ticket/order/confirmInfo?token="
+ token
+ "&timestamp="
+ str(int(time.time() * 1000))
+ "&project_id="
+ self.config["project_id"]
+ "&requestSource=neul-next"
)
response = self.session.get(url, headers=self.headers)
if response.status_code == 412:
logger.error(i18n_gt()["not_handled_412"])
if self.config["proxy"]:
if self.ip == self.client.tps_current_ip(sign_type="hmacsha1"):
logger.info(
i18n_gt()["manual_change_ip"].format(
self.client.change_tps_ip(sign_type="hmacsha1")
)
)
self.session.close()
return self.confirm_info(token)
response = response.json()
logger.info(i18n_gt()["info_confirmed"])
logger.debug(response)
self.config["order_type"] = response["data"]["order_type"]
if response["data"]["act"] is not None:
logger.info(i18n_gt()["info_discount"])
self.config["act_id"] = response["data"]["act"]["act_id"]
return
def get_token(self):
info = self.get_prepare()
if info == {}:
logger.warning(i18n_gt()["info_no_ticket"])
time.sleep(1)
return self.get_token()
if info["token"]:
logger.success(
i18n_gt()["info_bill_ok"]
+ "https://show.bilibili.com/platform/confirmOrder.html?token="
+ info["token"]
)
self.sdk.add_breadcrumb(
category="prepare",
message=f'Order prepared as token:{info["token"]}',
level="info",
)
try:
self.confirm_info(info["token"])
except:
logger.error(i18n_gt()["info_bill_fail"])
return self.get_token()
return info["token"]
else:
logger.warning(i18n_gt()["info_wind_control"])
self.sdk.add_breadcrumb(
category="gaia",
message="Gaia found",
level="info",
)
riskParam = info["ga_data"]["riskParams"]
# https://api.bilibili.com/x/gaia-vgate/v1/register
risk = self.session.post(
"https://api.bilibili.com/x/gaia-vgate/v1/register",
headers=self.headers,
data=riskParam,
).json()
while risk["code"] != 0:
risk = self.session.post(
"https://api.bilibili.com/x/gaia-vgate/v1/register",
headers=self.headers,
data=riskParam,
).json()
if risk["data"]["type"] == "geetest":
logger.warning(i18n_gt()["type_captcha"])
gt, challenge, token = (
risk["data"]["geetest"]["gt"],
risk["data"]["geetest"]["challenge"],
risk["data"]["token"],
)
cap_data = self.gee_verify(gt, challenge, token)
while cap_data == False:
logger.error(i18n_gt()["input_verify_fail"])
return self.get_token()
logger.info(i18n_gt()["input_verify_success"])
elif risk["data"]["type"] == "phone":
logger.warning(i18n_gt()["type_mobile"])
token = risk["data"]["token"]
cap_data = self.phone_verify(token)
while cap_data == False:
logger.error(i18n_gt()["input_verify_fail"])
return self.get_token()
elif risk["data"]["type"] == "sms":
logger.warning(i18n_gt()["type_sms"])
logger.warning(i18n_gt()["unsupport_sms"])
elif risk["data"]["type"] == "biliword":
logger.warning(i18n_gt()["type_sms"])
logger.warning(i18n_gt()["unsupport_text"])
else:
logger.error(i18n_gt()["unknown_wind"])
logger.warning(i18n_gt()["unsupport_captcha"])
self.sdk.add_breadcrumb(
category="gaia",
message="Gaia passed",
level="info",
)
return self.get_token()
def generate_clickPosition(self) -> dict:
"""
生成虚假的点击事件
Returns:
dict: 点击坐标和时间
"""
# 生成随机的 x 和 y 坐标以下范围大概是1920x1080屏幕下可能的坐标
x = random.randint(1320, 1330)
y = random.randint(880, 890)
# 生成随机的起始时间和结束时间(或当前时间)
now_timestamp = int(time.time() * 1000)
# 添加一些随机时间差 (5s ~ 10s)
origin_timestamp = now_timestamp - random.randint(5000, 10000)
return {"x": x, "y": y, "origin": origin_timestamp, "now": now_timestamp}
def create_order(self):
url = "https://show.bilibili.com/api/ticket/order/createV2"
data = {
"project_id": self.config["project_id"],
"screen_id": self.config["screen_id"],
"sku_id": self.config["sku_id"],
"token": self.token,
"deviceId": "",
"project_id": self.config["project_id"],
"pay_money": self.config["all_price"],
"count": self.config["count"],
"timestamp": int(time.time() + 5),
"order_type": self.config["order_type"],
"newRisk": "true",
"requestSource": "neul-next",
"clickPosition": self.generate_clickPosition(),
}
if self.config["id_bind"] == 0:
data["buyer"] = self.config["buyer"]
data["tel"] = self.config["tel"]
else:
data["buyer_info"] = self.config["buyer_info"]
if self.config["is_paper_ticket"]:
data["deliver_info"] = self.config["deliver_info"]
if "act_id" in self.config:
data["act_id"] = self.config["act_id"]
data["again"] = 1
try:
response = self.session.post(url, headers=self.headers, data=data)
except (
requests.exceptions.Timeout,
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError,
):
logger.error(i18n_gt()["network_timeout"])
if self.config["proxy"]:
if self.ip == self.client.tps_current_ip(sign_type="hmacsha1"):
logger.info(
i18n_gt()["manual_change_ip"].format(
self.client.change_tps_ip(sign_type="hmacsha1")
)
)
self.session.close()
return self.create_order()
if response.status_code == 412:
logger.error(i18n_gt()["wind_control"])
logger.info(response.text)
if self.config["proxy"]:
if self.ip == self.client.tps_current_ip(sign_type="hmacsha1"):
logger.info(
i18n_gt()["manual_change_ip"].format(
self.client.change_tps_ip(sign_type="hmacsha1")
)
)
self.session.close()
return self.create_order()
else:
self.risk = True
logger.error(i18n_gt()["pause_60s"])
time.sleep(60)
return {}
return response.json()
def fake_ticket(self, pay_token, order_id = None):
url = (
"https://show.bilibili.com/api/ticket/order/createstatus?project_id="
+ self.config["project_id"]
+ "&token="
+ pay_token
+ "&timestamp="
+ str(int(time.time() * 1000))
)
if order_id:
url += "&orderId=" + str(order_id)
logger.debug(url)
response = self.session.get(url, headers=self.headers)
if response.status_code == 412:
logger.error(i18n_gt()["not_handled_412"])
if self.config["proxy"]:
if self.ip == self.client.tps_current_ip(sign_type="hmacsha1"):
logger.info(
i18n_gt()["manual_change_ip"].format(
self.client.change_tps_ip(sign_type="hmacsha1")
)
)
self.session.close()
response = response.json()
logger.debug(response)
if response["errno"] == 0:
self.sdk.add_breadcrumb(
category="success",
message=f'Success, orderid:{response["data"]["order_id"]}, payurl:https://pay.bilibili.com/payplatform-h5/pccashier.html?params="{urllib.parse.quote(json.dumps(response["data"]["payParam"], ensure_ascii=False))}',
level="info",
)
logger.success(i18n_gt()["pay_success"])
order_id = response["data"]["order_id"]
pay_url = response["data"]["payParam"]["code_url"]
response["data"]["payParam"].pop("code_url")
response["data"]["payParam"].pop("expire_time")
response["data"]["payParam"].pop("pay_type")
response["data"]["payParam"].pop("use_huabei")
logger.info(i18n_gt()["bill_serial"] + order_id)
self.order_id = order_id
logger.info(i18n_gt()["bill_pay_hint"])
logger.info(i18n_gt()["bill_qr"] + pay_url)
qr = qrcode.QRCode()
qr.add_data(pay_url)
qr.print_ascii(invert=True)
img = qr.make_image()
img.show()
logger.info(
i18n_gt()["bill_open"] + " https://pay.bilibili.com/payplatform-h5/pccashier.html?params="
+ urllib.parse.quote(
json.dumps(response["data"]["payParam"], ensure_ascii=False)
)
+ " " + i18n_gt()["bill_pay_ok"]
)
logger.info(i18n_gt()["bill_manual"])
return True
else:
logger.error(i18n_gt()["bill_fail"])
return False
def order_status(self, order_id):
url = "https://show.bilibili.com/api/ticket/order/info?order_id=" + str(order_id)
response = self.session.get(url, headers=self.headers)
if response.status_code == 412:
logger.error(i18n_gt()["not_handled_412"])
if self.config["proxy"]:
if self.ip == self.client.tps_current_ip(sign_type="hmacsha1"):
logger.info(
i18n_gt()["manual_change_ip"].format(
self.client.change_tps_ip(sign_type="hmacsha1")
)
)
self.session.close()
response = response.json()
if response["data"]["status"] == 1:
return True
elif response["data"]["status"] == 2:
logger.success(i18n_gt()["pay_ok"])
return False
elif response["data"]["status"] == 4:
logger.warning(i18n_gt()["bill_cancel"])
return False
else:
logger.warning(
i18n_gt()["status_unknown"] + ": "
+ response["data"]["status_name"]
+ response["data"]["sub_status_name"]
)
return False
def logout(self):
#https://passport.bilibili.com/login/exit/v2
url = "https://passport.bilibili.com/login/exit/v2"
# biliCSRF str CSRF Token (位于 cookie 中的 bili_jct)
response = self.session.post(url, headers=self.headers, data={
"biliCSRF": self.headers["Cookie"][self.headers["Cookie"].index("bili_jct") + 9 : self.headers["Cookie"].index("bili_jct") + 41]
}).json()
if response["status"] == True:
logger.success(i18n_gt()["quit_login"])
else:
logger.error(i18n_gt()["logout_fail"])
def try_create_order(self):
if not self.waited:
logger.info(i18n_gt()["wait_4_96s"])
time.sleep(4.96)
self.waited = True
result = self.create_order()
if result == {}:
return False
if result["errno"] == 100009:
logger.warning(i18n_gt()["ticketless"])
self.waited = False
elif result["errno"] == 100017:
logger.warning(i18n_gt()["ticket_unbuyable"])
self.waited = False
elif result["errno"] == 3:
logger.warning(i18n_gt()["slowdown_5s"])
elif result["errno"] == 100001:
logger.warning(i18n_gt()["bili_speed_limit"])
elif result["errno"] == 100041:
logger.warning(i18n_gt()["tokenless"])
elif result["errno"] == 100016:
logger.error(i18n_gt()["not_salable"])
elif result["errno"] == 0:
logger.success(i18n_gt()["bill_push_ok"])
pay_token = result["data"]["token"]
orderid = None
if "orderId" in result["data"]:
orderid = result["data"]["orderId"]
if self.fake_ticket(pay_token, order_id = orderid):
# self.logout()
if "pushplus" in self.config:
# https://www.pushplus.plus/send/
url = "https://www.pushplus.plus/send"
response = requests.post(url, json={
"token": self.config["pushplus"],
"title": i18n_gt()["BHYG_notify"],
"content": i18n_gt()["rob_ok_paying"]+self.order_id,
}).json()
if response["code"] == 200:
logger.success(i18n_gt()["notify_ok"]+" "+response['data'])
else:
logger.error(i18n_gt()["notify_fail"]+" "+response)
if "webhook" in self.config:
url = self.config["webhook"]
response = requests.post(url, json={
"msg_type": "text",
"text": {
"content": i18n_gt()["rob_ok_paying"]+self.order_id,
}
}).json()
if response["code"] == 200:
logger.success(i18n_gt()["notify_ok"]+" "+response['data'])
else:
logger.error(i18n_gt()["notify_fail"]+" "+response)
if "hunter" in self.config:
return True
logger.info(i18n_gt()["unpaid_bill"])
while self.order_status(self.order_id):
time.sleep(1)
self.sdk.capture_message("Exit by in-app exit")
return True
else:
logger.error(i18n_gt()["fake_ticket"])
elif result["errno"] == 100051:
self.token = self.get_token()
elif result["errno"] == 100079 or result["errno"] == 100048:
logger.info(result["msg"])
logger.success(i18n_gt()["rob_already_ok"])
self.sdk.capture_message("Exit by in-app exit")
return True
elif result["errno"] == 219:
logger.info(i18n_gt()["ticket_sto_less"])
else:
logger.error(i18n_gt()["unknown_error"] + str(result))
return False
@staticmethod
def gen_bili_ticket():
def hmac_sha256(key, message):
"""
使用HMAC-SHA256算法对给定的消息进行加密
:param key: 密钥
:param message: 要加密的消息
:return: 加密后的哈希值
"""
key = key.encode("utf-8")
message = message.encode("utf-8")
hmac_obj = hmac.new(key, message, hashlib.sha256)
hash_value = hmac_obj.digest()
hash_hex = hash_value.hex()
return hash_hex
o = hmac_sha256("XgwSnGZ1p", f"ts{int(time.time())}")
url = "https://api.bilibili.com/bapis/bilibili.api.ticket.v1.Ticket/GenWebTicket"
params = {
"key_id": "ec02",
"hexsign": o,
"context[ts]": f"{int(time.time())}",
"csrf": "",
}
import random
headers = {
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 17_5 like Mac OS X) AppleWebKit/618.1.15.10.15 (KHTML, like Gecko) Mobile/21F90 BiliApp/77900100 os/ios model/iPhone 15 mobi_app/iphone build/77900100 osVer/17.5.1 network/2 channel/AppStore c_locale/zh-Hans_CN s_locale/zh-Hans_CH disable_rcmd/0 "+str(random.randint(0, 9999)),
}
resp = requests.post(url, params=params, headers=headers).json()
return resp["data"]["ticket"]