feat: Enhance Temporary Email Service with Domain Blocking and Improved Error Handling

- Add domain blocking mechanism to filter out problematic email domains
- Implement dynamic domain list retrieval from external source
- Improve error handling and logging in email creation process
- Add fallback service switching when domains are blocked
- Update localization files with new error and blocking-related messages
- Increment version to 1.4.06
This commit is contained in:
yeongpin
2025-02-25 23:53:55 +08:00
parent 17c2b4b243
commit 78dee025a7
6 changed files with 203 additions and 34 deletions

View File

@@ -23,6 +23,47 @@ class NewTempEmail:
self.token = None
self.email = None
self.password = None
self.blocked_domains = self.get_blocked_domains()
def get_blocked_domains(self):
"""Get blocked domains list"""
try:
block_url = "https://raw.githubusercontent.com/yeongpin/cursor-free-vip/main/block_domain.txt"
response = requests.get(block_url, timeout=5)
if response.status_code == 200:
# Split text and remove empty lines
domains = [line.strip() for line in response.text.split('\n') if line.strip()]
if self.translator:
print(f"{Fore.CYAN} {self.translator.get('email.blocked_domains_loaded', count=len(domains))}{Style.RESET_ALL}")
else:
print(f"{Fore.CYAN} 已加载 {len(domains)} 个被屏蔽的域名{Style.RESET_ALL}")
return domains
return []
except Exception as e:
if self.translator:
print(f"{Fore.YELLOW}⚠️ {self.translator.get('email.blocked_domains_error', error=str(e))}{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}⚠️ 获取被屏蔽域名列表失败: {str(e)}{Style.RESET_ALL}")
return []
def exclude_blocked_domains(self, domains):
"""Exclude blocked domains"""
if not self.blocked_domains:
return domains
filtered_domains = []
for domain in domains:
if domain['domain'] not in self.blocked_domains:
filtered_domains.append(domain)
excluded_count = len(domains) - len(filtered_domains)
if excluded_count > 0:
if self.translator:
print(f"{Fore.YELLOW}⚠️ {self.translator.get('email.domains_excluded', domains=excluded_count)}{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}⚠️ 已排除 {excluded_count} 个被屏蔽的域名{Style.RESET_ALL}")
return filtered_domains
def _generate_credentials(self):
"""生成随机用户名和密码"""
@@ -39,42 +80,112 @@ class NewTempEmail:
print(f"{Fore.CYAN} 正在访问 {self.selected_service['name']}...{Style.RESET_ALL}")
# 获取可用域名列表
domains_response = requests.get(f"{self.api_url}/domains")
if domains_response.status_code != 200:
raise Exception(f"{self.translator.get('email.failed_to_get_available_domains')}")
try:
domains_response = requests.get(f"{self.api_url}/domains", timeout=10)
if domains_response.status_code != 200:
print(f"{Fore.RED}{self.translator.get('email.domains_list_error', error=domains_response.status_code)}{Style.RESET_ALL}")
print(f"{Fore.RED}{self.translator.get('email.domains_list_error', error=domains_response.text)}{Style.RESET_ALL}")
raise Exception(f"{self.translator.get('email.failed_to_get_available_domains') if self.translator else 'Failed to get available domains'}")
domains = domains_response.json()["hydra:member"]
print(f"{Fore.CYAN} {self.translator.get('email.available_domains_loaded', count=len(domains))}{Style.RESET_ALL}")
domains = domains_response.json()["hydra:member"]
if not domains:
raise Exception(f"{self.translator.get('email.no_available_domains')}")
if not domains:
raise Exception(f"{self.translator.get('email.no_available_domains') if self.translator else '没有可用域名'}")
except Exception as e:
print(f"{Fore.RED}❌ 获取域名列表时出错: {str(e)}{Style.RESET_ALL}")
raise
# 排除被屏蔽的域名
try:
filtered_domains = self.exclude_blocked_domains(domains)
if self.translator:
print(f"{Fore.CYAN} {self.translator.get('email.domains_filtered', count=len(filtered_domains))}{Style.RESET_ALL}")
else:
print(f"{Fore.CYAN} 过滤后剩余 {len(filtered_domains)} 个可用域名{Style.RESET_ALL}")
if not filtered_domains:
if self.translator:
print(f"{Fore.RED}{self.translator.get('email.all_domains_blocked')}{Style.RESET_ALL}")
else:
print(f"{Fore.RED}❌ 所有域名都被屏蔽了,尝试切换服务{Style.RESET_ALL}")
# 切换到另一个服务
for service in self.services:
if service["api_url"] != self.api_url:
self.selected_service = service
self.api_url = service["api_url"]
if self.translator:
print(f"{Fore.CYAN} {self.translator.get('email.switching_service', service=service['name'])}{Style.RESET_ALL}")
else:
print(f"{Fore.CYAN} 切换到 {service['name']} 服务{Style.RESET_ALL}")
return self.create_email() # 递归调用
raise Exception(f"{self.translator.get('email.no_available_domains_after_filtering') if self.translator else '过滤后没有可用域名'}")
except Exception as e:
print(f"{Fore.RED}❌ 过滤域名时出错: {str(e)}{Style.RESET_ALL}")
raise
# 生成随机用户名和密码
username, password = self._generate_credentials()
self.password = password
try:
username, password = self._generate_credentials()
self.password = password
# 创建邮箱账户
selected_domain = filtered_domains[0]['domain']
email = f"{username}@{selected_domain}"
print(f"{Fore.CYAN} 尝试创建邮箱: {email}{Style.RESET_ALL}")
account_data = {
"address": email,
"password": password
}
except Exception as e:
print(f"{Fore.RED}❌ 生成凭据时出错: {str(e)}{Style.RESET_ALL}")
raise
# 创建邮箱账户
email = f"{username}@{domains[0]['domain']}"
account_data = {
"address": email,
"password": password
}
create_response = requests.post(f"{self.api_url}/accounts", json=account_data)
if create_response.status_code != 201:
raise Exception(f"{self.translator.get('email.failed_to_create_account')}")
# 创建账户
try:
create_response = requests.post(f"{self.api_url}/accounts", json=account_data, timeout=15)
if create_response.status_code != 201:
print(f"{Fore.RED}❌ 创建账户失败: 状态码 {create_response.status_code}{Style.RESET_ALL}")
print(f"{Fore.RED}❌ 响应内容: {create_response.text}{Style.RESET_ALL}")
# 如果是域名问题,尝试下一个域名
if len(filtered_domains) > 1 and ("domain" in create_response.text.lower() or "address" in create_response.text.lower()):
print(f"{Fore.YELLOW}⚠️ 尝试使用下一个可用域名...{Style.RESET_ALL}")
# 将当前域名添加到屏蔽列表
if selected_domain not in self.blocked_domains:
self.blocked_domains.append(selected_domain)
# 递归调用自己
return self.create_email()
raise Exception(f"{self.translator.get('email.failed_to_create_account') if self.translator else '创建账户失败'}")
except Exception as e:
print(f"{Fore.RED}❌ 创建账户时出错: {str(e)}{Style.RESET_ALL}")
raise
# 获取访问令牌
token_data = {
"address": email,
"password": password
}
token_response = requests.post(f"{self.api_url}/token", json=token_data)
if token_response.status_code != 200:
raise Exception(f"{self.translator.get('email.failed_to_get_access_token')}")
try:
token_data = {
"address": email,
"password": password
}
token_response = requests.post(f"{self.api_url}/token", json=token_data, timeout=10)
if token_response.status_code != 200:
print(f"{Fore.RED}❌ 获取令牌失败: 状态码 {token_response.status_code}{Style.RESET_ALL}")
print(f"{Fore.RED}❌ 响应内容: {token_response.text}{Style.RESET_ALL}")
raise Exception(f"{self.translator.get('email.failed_to_get_access_token') if self.translator else '获取访问令牌失败'}")
self.token = token_response.json()["token"]
self.email = email
except Exception as e:
print(f"{Fore.RED}❌ 获取令牌时出错: {str(e)}{Style.RESET_ALL}")
raise
self.token = token_response.json()["token"]
self.email = email
if self.translator:
print(f"{Fore.GREEN}{self.translator.get('email.create_success')}: {email}{Style.RESET_ALL}")
else: