From 52e9a14b2cc4f28351c58a260f065291df5a7d7e Mon Sep 17 00:00:00 2001
From: Dorae <86833913+Toperlock@users.noreply.github.com>
Date: Mon, 27 May 2024 14:27:16 +0800
Subject: [PATCH] Support conversion of `AND` logic rules

---
 main.py | 78 ++++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 47 insertions(+), 31 deletions(-)

diff --git a/main.py b/main.py
index 22e587e..c614b7f 100644
--- a/main.py
+++ b/main.py
@@ -1,4 +1,5 @@
 import pandas as pd
+import re
 import concurrent.futures
 import os
 import json
@@ -6,20 +7,49 @@ import requests
 import yaml
 import ipaddress
 
+# Mapping from list-rule keywords to sing-box rule fields
+MAP_DICT = {'DOMAIN-SUFFIX': 'domain_suffix', 'HOST-SUFFIX': 'domain_suffix', 'DOMAIN': 'domain', 'HOST': 'domain', 'host': 'domain',
+            'DOMAIN-KEYWORD': 'domain_keyword', 'HOST-KEYWORD': 'domain_keyword', 'host-keyword': 'domain_keyword', 'IP-CIDR': 'ip_cidr',
+            'ip-cidr': 'ip_cidr', 'IP-CIDR6': 'ip_cidr',
+            'IP6-CIDR': 'ip_cidr', 'SRC-IP-CIDR': 'source_ip_cidr', 'GEOIP': 'geoip', 'DST-PORT': 'port',
+            'SRC-PORT': 'source_port', 'URL-REGEX': 'domain_regex'}
+
 def read_yaml_from_url(url):
     response = requests.get(url)
-    response.raise_for_status()  # Raise an HTTPError for bad responses
+    response.raise_for_status()
     yaml_data = yaml.safe_load(response.text)
     return yaml_data
 
 def read_list_from_url(url):
     df = pd.read_csv(url, header=None, names=['pattern', 'address', 'other', 'other2', 'other3'])
     filtered_rows = []
+    rules = []
+    # Handle logical (AND) rules
+    if 'AND' in df['pattern'].values:
+        and_rows = df[df['pattern'].str.contains('AND', na=False)]
+        for _, row in and_rows.iterrows():
+            rule = {
+                "type": "logical",
+                "mode": "and",
+                "rules": []
+            }
+            pattern = ",".join(row.values.astype(str))
+            components = re.findall(r'\((.*?)\)', pattern)
+            for component in components:
+                for keyword in MAP_DICT.keys():
+                    if keyword in component:
+                        match = re.search(f'{keyword},(.*)', component)
+                        if match:
+                            value = match.group(1)
+                            rule["rules"].append({
+                                MAP_DICT[keyword]: value
+                            })
+            rules.append(rule)
     for index, row in df.iterrows():
         if 'AND' not in row['pattern']:
             filtered_rows.append(row)
     df_filtered = pd.DataFrame(filtered_rows, columns=['pattern', 'address', 'other', 'other2', 'other3'])
-    return df_filtered
+    return df_filtered, rules
 
 def is_ipv4_or_ipv6(address):
     try:
@@ -62,9 +92,9 @@ def parse_and_convert_to_dataframe(link):
             rows.append({'pattern': pattern.strip(), 'address': address.strip(), 'other': None})
         df = pd.DataFrame(rows, columns=['pattern', 'address', 'other'])
     except:
-        df = read_list_from_url(link)
+        df, rules_from_url = read_list_from_url(link)
     else:
-        df = read_list_from_url(link)
+        df, rules_from_url = read_list_from_url(link)
     return df
 
 # Sort the dict, including lists of dicts
@@ -80,40 +110,21 @@ def sort_dict(obj):
 
 def parse_list_file(link, output_directory):
     with concurrent.futures.ThreadPoolExecutor() as executor:
-        # Process links in parallel with executor.map
-        results = list(executor.map(parse_and_convert_to_dataframe, [link]))
-        # Concatenate into a single DataFrame
-        df = pd.concat(results, ignore_index=True)
-
-        # Drop rows whose pattern contains '#'
-        df = df[~df['pattern'].str.contains('#')].reset_index(drop=True)
-
-        # Mapping dictionary
-        map_dict = {'DOMAIN-SUFFIX': 'domain_suffix', 'HOST-SUFFIX': 'domain_suffix', 'DOMAIN': 'domain', 'HOST': 'domain', 'host': 'domain',
-                    'DOMAIN-KEYWORD':'domain_keyword', 'HOST-KEYWORD': 'domain_keyword', 'host-keyword': 'domain_keyword', 'IP-CIDR': 'ip_cidr',
-                    'ip-cidr': 'ip_cidr', 'IP-CIDR6': 'ip_cidr',
-                    'IP6-CIDR': 'ip_cidr','SRC-IP-CIDR': 'source_ip_cidr', 'GEOIP': 'geoip', 'DST-PORT': 'port',
-                    'SRC-PORT': 'source_port', "URL-REGEX": "domain_regex"}
-
-        # Drop patterns not in the mapping dictionary
-        df = df[df['pattern'].isin(map_dict.keys())].reset_index(drop=True)
-
-        # Drop duplicate rows
-        df = df.drop_duplicates().reset_index(drop=True)
-        # Map patterns to their dictionary values
-        df['pattern'] = df['pattern'].replace(map_dict)
-
-        # Create the output directory
-        os.makedirs(output_directory, exist_ok=True)
+        results = list(executor.map(parse_and_convert_to_dataframe, [link]))  # Process links in parallel with executor.map
+        df = pd.concat(results, ignore_index=True)  # Concatenate into a single DataFrame
+        df = df[~df['pattern'].str.contains('#')].reset_index(drop=True)  # Drop rows whose pattern contains '#'
+        df = df[df['pattern'].isin(MAP_DICT.keys())].reset_index(drop=True)  # Drop patterns not in the mapping dictionary
+        df = df.drop_duplicates().reset_index(drop=True)  # Drop duplicate rows
+        df['pattern'] = df['pattern'].replace(MAP_DICT)  # Map patterns to their dictionary values
+        os.makedirs(output_directory, exist_ok=True)  # Create the output directory
 
         result_rules = {"version": 1, "rules": []}
         domain_entries = []
-
         for pattern, addresses in df.groupby('pattern')['address'].apply(list).to_dict().items():
             if pattern == 'domain_suffix':
                 rule_entry = {pattern: ['.' + address.strip() for address in addresses]}
                 result_rules["rules"].append(rule_entry)
-                # domain_entries.extend([address.strip() for address in addresses])
+                # domain_entries.extend([address.strip() for address in addresses])  # sing-box versions below 1.9 need extra handling for domain_suffix
             elif pattern == 'domain':
                 domain_entries.extend([address.strip() for address in addresses])
             else:
@@ -124,6 +135,11 @@ def parse_list_file(link, output_directory):
         if domain_entries:
             result_rules["rules"].insert(0, {'domain': domain_entries})
 
+        # Merge in the logical rules collected from the list
+        _, rules_from_url = read_list_from_url(link)
+        if rules_from_url:
+            result_rules["rules"].extend(rules_from_url)
+
         # Build the full output path under output_directory
         file_name = os.path.join(output_directory, f"{os.path.basename(link).split('.')[0]}.json")
         with open(file_name, 'w', encoding='utf-8') as output_file: