Support conversion of AND logic rules

Dorae 2024-05-27 14:27:16 +08:00 committed by GitHub
parent 38cc669559
commit 52e9a14b2c

main.py

@@ -1,4 +1,5 @@
import pandas as pd
import re
import concurrent.futures
import os
import json
@@ -6,20 +7,49 @@ import requests
import yaml
import ipaddress
# Mapping dictionary
MAP_DICT = {'DOMAIN-SUFFIX': 'domain_suffix', 'HOST-SUFFIX': 'domain_suffix', 'DOMAIN': 'domain', 'HOST': 'domain', 'host': 'domain',
            'DOMAIN-KEYWORD': 'domain_keyword', 'HOST-KEYWORD': 'domain_keyword', 'host-keyword': 'domain_keyword', 'IP-CIDR': 'ip_cidr',
            'ip-cidr': 'ip_cidr', 'IP-CIDR6': 'ip_cidr',
            'IP6-CIDR': 'ip_cidr', 'SRC-IP-CIDR': 'source_ip_cidr', 'GEOIP': 'geoip', 'DST-PORT': 'port',
            'SRC-PORT': 'source_port', "URL-REGEX": "domain_regex"}
def read_yaml_from_url(url):
    response = requests.get(url)
    response.raise_for_status()  # Raise an HTTPError for bad responses
    response.raise_for_status()
    yaml_data = yaml.safe_load(response.text)
    return yaml_data
def read_list_from_url(url):
    df = pd.read_csv(url, header=None, names=['pattern', 'address', 'other', 'other2', 'other3'])
    filtered_rows = []
    rules = []
    # Handle logical rules
    if 'AND' in df['pattern'].values:
        and_rows = df[df['pattern'].str.contains('AND', na=False)]
        for _, row in and_rows.iterrows():
            rule = {
                "type": "logical",
                "mode": "and",
                "rules": []
            }
            pattern = ",".join(row.values.astype(str))
            components = re.findall(r'\((.*?)\)', pattern)
            for component in components:
                for keyword in MAP_DICT.keys():
                    if keyword in component:
                        match = re.search(f'{keyword},(.*)', component)
                        if match:
                            value = match.group(1)
                            rule["rules"].append({
                                MAP_DICT[keyword]: value
                            })
            rules.append(rule)
    for index, row in df.iterrows():
        if 'AND' not in row['pattern']:
            filtered_rows.append(row)
    df_filtered = pd.DataFrame(filtered_rows, columns=['pattern', 'address', 'other', 'other2', 'other3'])
    return df_filtered
    return df_filtered, rules
def is_ipv4_or_ipv6(address):
    try:
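For reference, a minimal sketch of what the new extraction step in read_list_from_url produces. It assumes a hypothetical Clash-style logic rule already re-joined into one comma-separated string (the domain and port values are made up, and MAP_DICT is trimmed to the two keys the sketch needs):

import re

MAP_DICT = {'DOMAIN-SUFFIX': 'domain_suffix', 'DST-PORT': 'port'}  # trimmed for the sketch

# Hypothetical logic rule, re-joined the way ",".join(row.values.astype(str)) would produce it
pattern = "AND,((DOMAIN-SUFFIX,bilibili.com),(DST-PORT,443)),DIRECT"

rule = {"type": "logical", "mode": "and", "rules": []}
# The non-greedy regex yields one fragment per "(...)" group; each fragment containing
# a known keyword contributes one sub-rule keyed by its sing-box field name.
for component in re.findall(r'\((.*?)\)', pattern):
    for keyword in MAP_DICT.keys():
        if keyword in component:
            match = re.search(f'{keyword},(.*)', component)
            if match:
                rule["rules"].append({MAP_DICT[keyword]: match.group(1)})

print(rule)
# {'type': 'logical', 'mode': 'and',
#  'rules': [{'domain_suffix': 'bilibili.com'}, {'port': '443'}]}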
@@ -62,9 +92,9 @@ def parse_and_convert_to_dataframe(link):
                rows.append({'pattern': pattern.strip(), 'address': address.strip(), 'other': None})
            df = pd.DataFrame(rows, columns=['pattern', 'address', 'other'])
        except:
            df = read_list_from_url(link)
            df, rules_from_url = read_list_from_url(link)
    else:
        df = read_list_from_url(link)
        df, rules_from_url = read_list_from_url(link)
    return df
# Sort the dict (including lists of dicts)
@@ -80,40 +110,21 @@ def sort_dict(obj):
def parse_list_file(link, output_directory):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Process the links in parallel with executor.map
        results = list(executor.map(parse_and_convert_to_dataframe, [link]))
        # Concatenate into a single DataFrame
        df = pd.concat(results, ignore_index=True)
        # Drop rows whose pattern contains '#'
        df = df[~df['pattern'].str.contains('#')].reset_index(drop=True)
        # Mapping dictionary
        map_dict = {'DOMAIN-SUFFIX': 'domain_suffix', 'HOST-SUFFIX': 'domain_suffix', 'DOMAIN': 'domain', 'HOST': 'domain', 'host': 'domain',
                    'DOMAIN-KEYWORD': 'domain_keyword', 'HOST-KEYWORD': 'domain_keyword', 'host-keyword': 'domain_keyword', 'IP-CIDR': 'ip_cidr',
                    'ip-cidr': 'ip_cidr', 'IP-CIDR6': 'ip_cidr',
                    'IP6-CIDR': 'ip_cidr', 'SRC-IP-CIDR': 'source_ip_cidr', 'GEOIP': 'geoip', 'DST-PORT': 'port',
                    'SRC-PORT': 'source_port', "URL-REGEX": "domain_regex"}
        # Drop patterns that are not in the mapping dictionary
        df = df[df['pattern'].isin(map_dict.keys())].reset_index(drop=True)
        # Drop duplicate rows
        df = df.drop_duplicates().reset_index(drop=True)
        # Replace each pattern with its mapped value
        df['pattern'] = df['pattern'].replace(map_dict)
        # Create the output directory
        os.makedirs(output_directory, exist_ok=True)
        results = list(executor.map(parse_and_convert_to_dataframe, [link]))  # Process the links in parallel with executor.map
        df = pd.concat(results, ignore_index=True)  # Concatenate into a single DataFrame
        df = df[~df['pattern'].str.contains('#')].reset_index(drop=True)  # Drop rows whose pattern contains '#'
        df = df[df['pattern'].isin(MAP_DICT.keys())].reset_index(drop=True)  # Drop patterns that are not in the mapping dictionary
        df = df.drop_duplicates().reset_index(drop=True)  # Drop duplicate rows
        df['pattern'] = df['pattern'].replace(MAP_DICT)  # Replace each pattern with its mapped value
        os.makedirs(output_directory, exist_ok=True)  # Create the output directory
        result_rules = {"version": 1, "rules": []}
        domain_entries = []
        for pattern, addresses in df.groupby('pattern')['address'].apply(list).to_dict().items():
            if pattern == 'domain_suffix':
                rule_entry = {pattern: ['.' + address.strip() for address in addresses]}
                result_rules["rules"].append(rule_entry)
                # domain_entries.extend([address.strip() for address in addresses])
                # domain_entries.extend([address.strip() for address in addresses])  # versions below 1.9 need extra handling for domain_suffix
            elif pattern == 'domain':
                domain_entries.extend([address.strip() for address in addresses])
            else:
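As a worked illustration of the grouping above, here is a minimal sketch with a few hypothetical already-mapped rows; it assumes the else: branch truncated by this hunk simply appends the grouped list for the remaining pattern types (the addresses are made up):

import pandas as pd

# Hypothetical rows after the pattern column has been mapped to sing-box keys
df = pd.DataFrame({'pattern': ['domain_suffix', 'domain', 'ip_cidr'],
                   'address': ['example.com', 'www.example.org', '10.0.0.0/8']})

result_rules = {"version": 1, "rules": []}
domain_entries = []
for pattern, addresses in df.groupby('pattern')['address'].apply(list).to_dict().items():
    if pattern == 'domain_suffix':
        # domain_suffix entries get a leading dot
        result_rules["rules"].append({pattern: ['.' + a.strip() for a in addresses]})
    elif pattern == 'domain':
        # plain domains are collected and inserted as the first rule afterwards
        domain_entries.extend(a.strip() for a in addresses)
    else:
        # assumed behaviour of the truncated else: branch
        result_rules["rules"].append({pattern: [a.strip() for a in addresses]})
if domain_entries:
    result_rules["rules"].insert(0, {'domain': domain_entries})

print(result_rules)
# {'version': 1, 'rules': [{'domain': ['www.example.org']},
#                          {'domain_suffix': ['.example.com']},
#                          {'ip_cidr': ['10.0.0.0/8']}]}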
@@ -124,6 +135,11 @@ def parse_list_file(link, output_directory):
        if domain_entries:
            result_rules["rules"].insert(0, {'domain': domain_entries})
        # Handle logical rules
        _, rules_from_url = read_list_from_url(link)
        if rules_from_url:
            result_rules["rules"].extend(rules_from_url)
        # Join with output_directory to build the full output path
        file_name = os.path.join(output_directory, f"{os.path.basename(link).split('.')[0]}.json")
        with open(file_name, 'w', encoding='utf-8') as output_file:
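Taken together, a minimal usage sketch under the assumptions above; the URL and output directory are made up, and the JSON is presumably written out in the lines that follow this hunk:

# Hypothetical call; the real entry point further down main.py may differ.
parse_list_file("https://example.com/rules/MyRules.list", "./rule_sets")
# -> writes ./rule_sets/MyRules.json containing {"version": 1, "rules": [...]},
#    with plain rules grouped by type first and any logical AND rules from the
#    source list appended at the end of "rules".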