diff --git a/scripts/common.py b/scripts/common.py new file mode 100644 index 0000000..8668b09 --- /dev/null +++ b/scripts/common.py @@ -0,0 +1,92 @@ +import hashlib +import os +from datetime import datetime + +import mysql.connector +from minio import Minio +from minio.error import S3Error + +minio_public_url = 'http://58.215.212.230:8005/oss/' +# MySQL 连接配置 +db_config = { + # 'host': 'physical-mysql', + # 'port': 3306, + 'host': '192.168.50.100', + 'port': 23306, + 'user': 'root', + 'password': '123456', + 'database': 'physical-boot' +} + +# minio 配置 +minio_client = Minio( + # "physical-minio:9000", # MinIO服务器地址或IP + "192.168.50.100:29000", # MinIO服务器地址或IP + access_key="root", # 替换为你的Access Key + secret_key="12345678", # 替换为你的Secret Key + secure=False # 如果使用的是http则为False +) +bucket_name = 'physical' + + +def get_md5(input_string): + # 创建MD5对象 + md5_obj = hashlib.md5() + # 更新对象,注意字符串需要编码为字节 + md5_obj.update(input_string.encode('utf-8')) + # 返回MD5值的十六进制字符串 + return md5_obj.hexdigest() + + +def save_to_db_import_record(connection,data): + cursor = connection.cursor() + try: + """保存数据到 MySQL 数据库""" + insert_query = """INSERT INTO `import_record` (`id`, `create_by`, `create_time`, `update_by`, `update_time`, `sys_org_code`, `device_type`, `device_name`, `device_mode`, `device_function`, `device_batch`, `manufacturer`, `experiment_date`, `data_source`, `experiment_user`, `total_count`, `file_list`) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""" + cursor.execute(insert_query, data) + connection.commit() + except Exception as e: + print(e) + finally: + cursor.close() + + +def save_to_db_oss_file(connection,data): + cursor = connection.cursor() + try: + """保存数据到 MySQL 数据库""" + insert_query = """INSERT INTO `oss_file` (`id`,`file_name`,`url`,`create_by`,`create_time` ) + VALUES (%s, %s, %s, %s, %s);""" + cursor.execute(insert_query, data) + connection.commit() + except Exception as e: + print(e) + finally: + cursor.close() + + +def upload_to_minio(connection,folder_path,type): + if not minio_client.bucket_exists(bucket_name): + minio_client.make_bucket(bucket_name) + + folder_name = os.path.basename(folder_path) + # 遍历文件夹中的所有文件,并上传 + file_ids = [] + for file_name in os.listdir(folder_path): + file_path = os.path.join(folder_path, file_name) + # 检查是否是文件,忽略非文件类型 + if os.path.isfile(file_path): + object_name = f"{type}/{folder_name}/{file_name}" + try: + # 上传文件到 MinIO + minio_client.fput_object(bucket_name, object_name, file_path) + print(f"已上传: {file_path} -> {bucket_name}/{object_name}") + file_id = get_md5(object_name) + file_ids.append(file_id) + db_file = [file_id, file_name, + minio_public_url + bucket_name + '/' + object_name, 'admin', datetime.now()] + save_to_db_oss_file(connection,db_file) + except S3Error as err: + print(f"上传 {file_name} 时出错: {err}") + return file_ids diff --git a/scripts/esa.py b/scripts/esa.py index d51271c..dbfed67 100644 --- a/scripts/esa.py +++ b/scripts/esa.py @@ -1,5 +1,4 @@ import csv -import hashlib import os import re from datetime import datetime @@ -7,102 +6,19 @@ from datetime import datetime import mysql.connector import requests from bs4 import BeautifulSoup -from minio import Minio -from minio.error import S3Error + +from common import upload_to_minio, save_to_db_import_record, db_config # 设置下载目录 download_dir = 'downloaded_files' os.makedirs(download_dir, exist_ok=True) -# MySQL 连接配置 -db_config = { - 'host': '192.168.50.100', - 'port': 23306, - 'user': 'root', - 'password': '123456', - 'database': 'physical-boot' -} -connection = mysql.connector.connect(**db_config) -cursor = connection.cursor() - -# minio 配置 -minio_client = Minio( - "192.168.50.100:29000", # MinIO服务器地址或IP - access_key="root", # 替换为你的Access Key - secret_key="12345678", # 替换为你的Secret Key - secure=False # 如果使用的是http则为False -) -bucket_name = 'physical' - - -def get_md5(input_string): - # 创建MD5对象 - md5_obj = hashlib.md5() - - # 更新对象,注意字符串需要编码为字节 - md5_obj.update(input_string.encode('utf-8')) - - # 返回MD5值的十六进制字符串 - return md5_obj.hexdigest() - - -def save_to_mysql(data): - try: - """保存数据到 MySQL 数据库""" - - insert_query = """INSERT INTO `import_record` (`id`, `create_by`, `create_time`, `update_by`, `update_time`, `sys_org_code`, `device_type`, `device_name`, `device_mode`, `device_function`, `device_batch`, `manufacturer`, `experiment_date`, `data_source`, `experiment_user`, `total_count`, `file_list`) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""" - - cursor.execute(insert_query, data) - connection.commit() - except Exception as e: - # 处理 ZeroDivisionError 的代码 - print(e) - - # 获取网页内容 url = 'https://esarad.esa.int/' -def save_to_db_file(data): - try: - """保存数据到 MySQL 数据库""" - insert_query = """INSERT INTO `oss_file` (`id`,`file_name`,`url`,`create_by`,`create_time` ) - VALUES (%s, %s, %s, %s, %s);""" - cursor.execute(insert_query, data) - connection.commit() - except Exception as e: - # 处理 ZeroDivisionError 的代码 - print(e) - - -def upload_to_minio(folder_path): - if not minio_client.bucket_exists(bucket_name): - minio_client.make_bucket(bucket_name) - - folder_name = os.path.basename(folder_path) - # 遍历文件夹中的所有文件,并上传 - file_ids = [] - for file_name in os.listdir(folder_path): - file_path = os.path.join(folder_path, file_name) - # 检查是否是文件,忽略非文件类型 - if os.path.isfile(file_path): - object_name = f"ESA/{folder_name}/{file_name}" - try: - # 上传文件到 MinIO - minio_client.fput_object(bucket_name, object_name, file_path) - print(f"已上传: {file_path} -> {bucket_name}/{object_name}") - file_id = get_md5(object_name) - file_ids.append(file_id) - db_file = [file_id, file_name, - 'http://58.215.212.230:8005/oss/' + bucket_name + '/' + object_name, 'admin', datetime.now()] - save_to_db_file(db_file) - except S3Error as err: - print(f"上传 {file_name} 时出错: {err}") - return file_ids - - def scrape(): + esa_connection = mysql.connector.connect(**db_config) try: response = requests.get(url) @@ -286,7 +202,7 @@ def scrape(): else: print(f'Failed to download: {download_url}') - upload_ids = upload_to_minio(folder_path) + upload_ids = upload_to_minio(esa_connection, folder_path,'ESA') data_db = ['ESA-' + cells[0].get_text(strip=True), 'Crawler', datetime.now(), None, None, None, cells[5].get_text(strip=True), cells[1].get_text(strip=True), cells[1].get_text(strip=True), @@ -295,10 +211,9 @@ def scrape(): cells[11].get_text(strip=True), 'ESA', None, None, ','.join(upload_ids) ] - save_to_mysql(data_db) + save_to_db_import_record(esa_connection, data_db) else: print(f'Error: {response.status_code}') finally: # 关闭游标和连接 - cursor.close() - connection.close() + esa_connection.close() diff --git a/scripts/nasa1.py b/scripts/nasa1.py index 4e44f6a..bf326a6 100644 --- a/scripts/nasa1.py +++ b/scripts/nasa1.py @@ -1,8 +1,13 @@ -import requests -import os -import time -import re import csv +import os +import re +import time +from datetime import datetime + +import mysql.connector +import requests + +from common import db_config, upload_to_minio, save_to_db_import_record, get_md5 # 定义 API URL api_url = 'https://radhome.gsfc.nasa.gov/radhome/dev/parts.cfc?method=getParts' @@ -28,7 +33,6 @@ data = { 'sord': 'asc', } - # 创建文件夹以保存文件 os.makedirs('downloaded_files', exist_ok=True) @@ -57,158 +61,183 @@ csv_header = [ "测试参数单位", "测试参数结果", "是否为加速试验后数据", "是否为退火数据", "退火温度", "退火时间", "原始数据", "数据处理方法", "其他需要说明的事项" ] + + def scrape(): + nasa1_connection = mysql.connector.connect(**db_config) + try: # 发送请求 - response = requests.post(api_url, headers=headers, data=data) - response.raise_for_status() # 检查请求是否成功 + response = requests.post(api_url, headers=headers, data=data) + response.raise_for_status() # 检查请求是否成功 - # 解析 JSON 数据 - json_data = response.json() - # 遍历数据并下载文件 - for row in json_data['ROWS']: - part_number = row[0] # 部件编号 - file_links_str = row[4] # 文件链接 + # 解析 JSON 数据 + json_data = response.json() + # 遍历数据并下载文件 + for row in json_data['ROWS']: + part_number = row[0] # 部件编号 + file_links_str = row[4] # 文件链接 - # 使用正则表达式分隔文件名 - file_links = re.split(r';|(?<=\.pdf)', file_links_str) + # 使用正则表达式分隔文件名 + file_links = re.split(r';|(?<=\.pdf)', file_links_str) - # 创建目录 - part_number_dir = os.path.join('downloaded_files', part_number) - os.makedirs(part_number_dir, exist_ok=True) + # 创建目录 + folder_path = os.path.join('downloaded_files', part_number) + os.makedirs(folder_path, exist_ok=True) - # 创建 CSV 文件 - csv_file_path = os.path.join(part_number_dir, 'data.csv') - with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file: - csv_writer = csv.writer(csv_file) - csv_writer.writerow(csv_header) # 写入表头 + # 创建 CSV 文件 + csv_file_path = os.path.join(folder_path, 'data.csv') + with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file: + csv_writer = csv.writer(csv_file) + csv_writer.writerow(csv_header) # 写入表头 - # 填写 CSV 数据 - csv_row = [ - "", # 序号 - "", # 试验对象类型 - row[3], # 试验开始日期 - "", # 试验结束日期 - "", # 试验对象名称 - row[1], # 试验对象型号 - "", # 试验对象数量 - row[6], # 试验性质 - "", # 试验目的 - "", # 装置名称 - "", # 数据提供单位 - "", # 试验委托单位 - "", # 失效判据 - "", # 失效数量 - "", # 试验结果描述 - "", # 成果 - "", # 来源项目名称 - "", # 来源项目类型 - "", # 分类 - "", # 元器件名称 - "", # 元器件型号 - "", # 元器件批号 - row[2], # 生产单位 - "", # 是否国产 - "", # 元器件成熟度 - "", # 晶圆材料 - "", # 晶圆批号 - "", # 封装材料 - "", # 封装技术 - "", # 是否倒装 - "", # 制造工艺 - "", # 工艺特征尺寸 - "", # 工艺平台 - "", # 工艺代号 - "", # 工艺版本 - "", # 质量等级 - "", # 加固措施 - "", # 工作原理 - "", # 供货能力 - "", # 应用经历 - "", # 规范手册 - "", # 器件图片 - "", # 电子系统分类 - "", # 电子系统名称 - "", # 电子系统型号 - "", # 生产单位 - "", # 电子系统功能 - "", # 电子系统加固措施 - "", # 电子系统图片 - "", # 材料名称 - "", # 材料型号 - "", # 材料组分 - "", # 材料用途 - "", # 材料生产单位 - "", # 材料物理结构 - "", # 材料使用经历 - "", # 辐照试验大纲 - "", # 大纲审核专家类别 - "", # 辐照试验所依据的标准规范 - "", # 试验步骤(过程)描述 - "", # 辐照过程是否加电 - "", # 直流偏置条件描述 - "", # 交流偏置条件描述 - "", # 时钟频率 - "", # 测试图形 - "", # 其他偏置条件 - "", # 辐照偏置原理图 - "", # 测试方式 - "", # 测试原理图 - "", # 试验用仪器名称 - "", # 试验用仪器型号 - "", # 试验用仪器生产厂家 - "", # 试验用仪器检定证书 - "", # 试验用软件名称 - "", # 试验用软件开发单位 - "", # 试验用软件版本号 - "", # 试验现场照片 - "", # 测试人员姓名 - "", # 测试人员单位 - "", # 测试人员电话 - "", # 装置运行人员 - "", # 第三方人员 - "", # 第三方人员单位 - "", # 第三方人员电话 - "", # 其他需要说明的事项 - "", # 是否采用铅铝屏蔽 - "", # 剂量率 - "", # 总剂量 - "", # 剂量等效材料 - "", # 试验对象编号 - "", # 测试参数名称 - "", # 测试参数单位 - "", # 测试参数结果 - "", # 是否为加速试验后数据 - "", # 是否为退火数据 - "", # 退火温度 - "", # 退火时间 - "", # 原始数据 - "", # 数据处理方法 - "", # 其他需要说明的事项 - ] + # 填写 CSV 数据 + csv_row = [ + "", # 序号 + "", # 试验对象类型 + row[3], # 试验开始日期 + "", # 试验结束日期 + "", # 试验对象名称 + row[1], # 试验对象型号 + "", # 试验对象数量 + row[6], # 试验性质 + "", # 试验目的 + "", # 装置名称 + "", # 数据提供单位 + "", # 试验委托单位 + "", # 失效判据 + "", # 失效数量 + "", # 试验结果描述 + "", # 成果 + "", # 来源项目名称 + "", # 来源项目类型 + "", # 分类 + "", # 元器件名称 + "", # 元器件型号 + "", # 元器件批号 + row[2], # 生产单位 + "", # 是否国产 + "", # 元器件成熟度 + "", # 晶圆材料 + "", # 晶圆批号 + "", # 封装材料 + "", # 封装技术 + "", # 是否倒装 + "", # 制造工艺 + "", # 工艺特征尺寸 + "", # 工艺平台 + "", # 工艺代号 + "", # 工艺版本 + "", # 质量等级 + "", # 加固措施 + "", # 工作原理 + "", # 供货能力 + "", # 应用经历 + "", # 规范手册 + "", # 器件图片 + "", # 电子系统分类 + "", # 电子系统名称 + "", # 电子系统型号 + "", # 生产单位 + "", # 电子系统功能 + "", # 电子系统加固措施 + "", # 电子系统图片 + "", # 材料名称 + "", # 材料型号 + "", # 材料组分 + "", # 材料用途 + "", # 材料生产单位 + "", # 材料物理结构 + "", # 材料使用经历 + "", # 辐照试验大纲 + "", # 大纲审核专家类别 + "", # 辐照试验所依据的标准规范 + "", # 试验步骤(过程)描述 + "", # 辐照过程是否加电 + "", # 直流偏置条件描述 + "", # 交流偏置条件描述 + "", # 时钟频率 + "", # 测试图形 + "", # 其他偏置条件 + "", # 辐照偏置原理图 + "", # 测试方式 + "", # 测试原理图 + "", # 试验用仪器名称 + "", # 试验用仪器型号 + "", # 试验用仪器生产厂家 + "", # 试验用仪器检定证书 + "", # 试验用软件名称 + "", # 试验用软件开发单位 + "", # 试验用软件版本号 + "", # 试验现场照片 + "", # 测试人员姓名 + "", # 测试人员单位 + "", # 测试人员电话 + "", # 装置运行人员 + "", # 第三方人员 + "", # 第三方人员单位 + "", # 第三方人员电话 + "", # 其他需要说明的事项 + "", # 是否采用铅铝屏蔽 + "", # 剂量率 + "", # 总剂量 + "", # 剂量等效材料 + "", # 试验对象编号 + "", # 测试参数名称 + "", # 测试参数单位 + "", # 测试参数结果 + "", # 是否为加速试验后数据 + "", # 是否为退火数据 + "", # 退火温度 + "", # 退火时间 + "", # 原始数据 + "", # 数据处理方法 + "", # 其他需要说明的事项 + ] - # 写入 CSV 行 - csv_writer.writerow(csv_row) + # 写入 CSV 行 + csv_writer.writerow(csv_row) - # 下载文件 - for file_name in file_links: - file_name = file_name.strip() # 去除空格 - if file_name: - # 拼接文件完整 URL - file_url = file_name if file_name.startswith('http') else file_prefix + file_name + # 下载文件 + for file_name in file_links: + file_name = file_name.strip() # 去除空格 + if file_name: + # 拼接文件完整 URL + file_url = file_name if file_name.startswith('http') else file_prefix + file_name - try: - # 下载文件 - file_response = requests.get(file_url) - file_response.raise_for_status() + try: + # 下载文件 + file_response = requests.get(file_url) + file_response.raise_for_status() - # 保存文件 - file_path = os.path.join(part_number_dir, os.path.basename(file_url)) - with open(file_path, 'wb') as file: - file.write(file_response.content) - print(f"NASA Download file: {file_path}") - except requests.RequestException as e: - print(f"NASA Download file error : {file_url},error: {e}") + # 保存文件 + file_path = os.path.join(folder_path, os.path.basename(file_url)) + with open(file_path, 'wb') as file: + file.write(file_response.content) + print(f"NASA Download file: {file_path}") + except requests.RequestException as e: + print(f"NASA Download file error : {file_url},error: {e}") + + upload_ids = upload_to_minio(nasa1_connection, folder_path,'NASA') + device_type=row[6] + device_name=row[0] + device_mode=row[0] + device_function=row[1] + device_batch=None + manufacturer=row[2] + experiment_date=row[3] + data_db = [get_md5(row[0]), 'Crawler', datetime.now(), None, None, None, + device_type, + device_name, device_mode, + device_function, + device_batch, manufacturer, + experiment_date, + 'NASA', None, None, ','.join(upload_ids) + ] + save_to_db_import_record(nasa1_connection, data_db) + finally: + # 关闭游标和连接 + nasa1_connection.close() # 输出数据 # for row in json_data['ROWS']: # print(row) diff --git a/scripts/nasa2.py b/scripts/nasa2.py index eed25ea..932b4be 100644 --- a/scripts/nasa2.py +++ b/scripts/nasa2.py @@ -1,7 +1,12 @@ +import csv +import os +from datetime import datetime + +import mysql.connector import requests from lxml import html -import os -import csv + +from common import db_config, upload_to_minio, save_to_db_import_record # 定义页面 URL url = 'https://radhome.gsfc.nasa.gov/radhome/papers/TIDPart.html' @@ -30,159 +35,183 @@ csv_header = [ "测试参数单位", "测试参数结果", "是否为加速试验后数据", "是否为退火数据", "退火温度", "退火时间", "原始数据", "数据处理方法", "其他需要说明的事项" ] + + def scrape(): - # 发送请求 - response = requests.get(url) - response.raise_for_status() + nasa2_connection = mysql.connector.connect(**db_config) + try: + # 发送请求 + response = requests.get(url) + response.raise_for_status() - # 使用 lxml 解析 HTML - tree = html.fromstring(response.text) + # 使用 lxml 解析 HTML + tree = html.fromstring(response.text) - # 找到目标表格 - table = tree.xpath('/html/body/table[3]')[0] # 获取第一个表格 + # 找到目标表格 + table = tree.xpath('/html/body/table[3]')[0] # 获取第一个表格 - # 遍历表格行,跳过表头 - for row in table.xpath('.//tr')[1:]: # 跳过表头 - columns = row.xpath('.//td') - if len(columns) < 8: - continue # 跳过不完整的行 + # 遍历表格行,跳过表头 + for row in table.xpath('.//tr')[1:]: # 跳过表头 + columns = row.xpath('.//td') + if len(columns) < 8: + continue # 跳过不完整的行 + part_number = columns[2].text_content().strip() # 第三列 Part Number + file_link_tag = columns[7].xpath('//a') # 第八列的 标签 - part_number = columns[2].text_content().strip() # 第三列 Part Number - file_link_tag = columns[7].xpath('//a') # 第八列的 标签 + # 获取文件名和下载链接 + file_url = 'https://radhome.gsfc.nasa.gov/' + file_link_tag[0].get('href') - # 获取文件名和下载链接 - file_url = 'https://radhome.gsfc.nasa.gov/' + file_link_tag[0].get('href') + # 创建目录 + folder_path = os.path.join('downloaded_files', part_number) + os.makedirs(folder_path, exist_ok=True) - # 创建目录 - part_number_dir = os.path.join('downloaded_files', part_number) - os.makedirs(part_number_dir, exist_ok=True) + # 创建 CSV 文件 + csv_file_path = os.path.join(folder_path, 'data.csv') + with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file: + csv_writer = csv.writer(csv_file) + csv_writer.writerow(csv_header) # 写入表头 - # 创建 CSV 文件 - csv_file_path = os.path.join(part_number_dir, 'data.csv') - with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file: - csv_writer = csv.writer(csv_file) - csv_writer.writerow(csv_header) # 写入表头 + # 填写 CSV 数据 + csv_row = [ + "", # 序号 + "", # 试验对象类型 + columns[8].text_content().strip(), # 试验开始日期 + "", # 试验结束日期 + "", # 试验对象名称 + columns[2].text_content().strip(), # 试验对象型号 + "", # 试验对象数量 + columns[4].text_content().strip(), # 试验性质 + "", # 试验目的 + "", # 装置名称 + "", # 数据提供单位 + "", # 试验委托单位 + "", # 失效判据 + "", # 失效数量 + "", # 试验结果描述 + "", # 成果 + "", # 来源项目名称 + "", # 来源项目类型 + "", # 分类 + "", # 元器件名称 + "", # 元器件型号 + "", # 元器件批号 + columns[5].text_content().strip(), # 生产单位 + "", # 是否国产 + "", # 元器件成熟度 + "", # 晶圆材料 + "", # 晶圆批号 + "", # 封装材料 + "", # 封装技术 + "", # 是否倒装 + "", # 制造工艺 + "", # 工艺特征尺寸 + "", # 工艺平台 + "", # 工艺代号 + "", # 工艺版本 + "", # 质量等级 + "", # 加固措施 + "", # 工作原理 + "", # 供货能力 + "", # 应用经历 + "", # 规范手册 + "", # 器件图片 + "", # 电子系统分类 + "", # 电子系统名称 + "", # 电子系统型号 + "", # 生产单位 + "", # 电子系统功能 + "", # 电子系统加固措施 + "", # 电子系统图片 + "", # 材料名称 + "", # 材料型号 + "", # 材料组分 + "", # 材料用途 + "", # 材料生产单位 + "", # 材料物理结构 + "", # 材料使用经历 + "", # 辐照试验大纲 + "", # 大纲审核专家类别 + "", # 辐照试验所依据的标准规范 + "", # 试验步骤(过程)描述 + "", # 辐照过程是否加电 + "", # 直流偏置条件描述 + "", # 交流偏置条件描述 + "", # 时钟频率 + "", # 测试图形 + "", # 其他偏置条件 + "", # 辐照偏置原理图 + "", # 测试方式 + "", # 测试原理图 + "", # 试验用仪器名称 + "", # 试验用仪器型号 + "", # 试验用仪器生产厂家 + "", # 试验用仪器检定证书 + "", # 试验用软件名称 + "", # 试验用软件开发单位 + "", # 试验用软件版本号 + "", # 试验现场照片 + "", # 测试人员姓名 + "", # 测试人员单位 + "", # 测试人员电话 + "", # 装置运行人员 + "", # 第三方人员 + "", # 第三方人员单位 + "", # 第三方人员电话 + "", # 其他需要说明的事项 + "", # 是否采用铅铝屏蔽 + "", # 剂量率 + "", # 总剂量 + "", # 剂量等效材料 + "", # 试验对象编号 + "", # 测试参数名称 + "", # 测试参数单位 + "", # 测试参数结果 + "", # 是否为加速试验后数据 + "", # 是否为退火数据 + "", # 退火温度 + "", # 退火时间 + "", # 原始数据 + "", # 数据处理方法 + "", # 其他需要说明的事项 + ] - # 填写 CSV 数据 - csv_row = [ - "", # 序号 - "", # 试验对象类型 - columns[8].text_content().strip(), # 试验开始日期 - "", # 试验结束日期 - "", # 试验对象名称 - columns[2].text_content().strip(), # 试验对象型号 - "", # 试验对象数量 - columns[4].text_content().strip(), # 试验性质 - "", # 试验目的 - "", # 装置名称 - "", # 数据提供单位 - "", # 试验委托单位 - "", # 失效判据 - "", # 失效数量 - "", # 试验结果描述 - "", # 成果 - "", # 来源项目名称 - "", # 来源项目类型 - "", # 分类 - "", # 元器件名称 - "", # 元器件型号 - "", # 元器件批号 - columns[5].text_content().strip(), # 生产单位 - "", # 是否国产 - "", # 元器件成熟度 - "", # 晶圆材料 - "", # 晶圆批号 - "", # 封装材料 - "", # 封装技术 - "", # 是否倒装 - "", # 制造工艺 - "", # 工艺特征尺寸 - "", # 工艺平台 - "", # 工艺代号 - "", # 工艺版本 - "", # 质量等级 - "", # 加固措施 - "", # 工作原理 - "", # 供货能力 - "", # 应用经历 - "", # 规范手册 - "", # 器件图片 - "", # 电子系统分类 - "", # 电子系统名称 - "", # 电子系统型号 - "", # 生产单位 - "", # 电子系统功能 - "", # 电子系统加固措施 - "", # 电子系统图片 - "", # 材料名称 - "", # 材料型号 - "", # 材料组分 - "", # 材料用途 - "", # 材料生产单位 - "", # 材料物理结构 - "", # 材料使用经历 - "", # 辐照试验大纲 - "", # 大纲审核专家类别 - "", # 辐照试验所依据的标准规范 - "", # 试验步骤(过程)描述 - "", # 辐照过程是否加电 - "", # 直流偏置条件描述 - "", # 交流偏置条件描述 - "", # 时钟频率 - "", # 测试图形 - "", # 其他偏置条件 - "", # 辐照偏置原理图 - "", # 测试方式 - "", # 测试原理图 - "", # 试验用仪器名称 - "", # 试验用仪器型号 - "", # 试验用仪器生产厂家 - "", # 试验用仪器检定证书 - "", # 试验用软件名称 - "", # 试验用软件开发单位 - "", # 试验用软件版本号 - "", # 试验现场照片 - "", # 测试人员姓名 - "", # 测试人员单位 - "", # 测试人员电话 - "", # 装置运行人员 - "", # 第三方人员 - "", # 第三方人员单位 - "", # 第三方人员电话 - "", # 其他需要说明的事项 - "", # 是否采用铅铝屏蔽 - "", # 剂量率 - "", # 总剂量 - "", # 剂量等效材料 - "", # 试验对象编号 - "", # 测试参数名称 - "", # 测试参数单位 - "", # 测试参数结果 - "", # 是否为加速试验后数据 - "", # 是否为退火数据 - "", # 退火温度 - "", # 退火时间 - "", # 原始数据 - "", # 数据处理方法 - "", # 其他需要说明的事项 - ] + # 写入 CSV 行 + csv_writer.writerow(csv_row) - # 写入 CSV 行 - csv_writer.writerow(csv_row) - - try: - # 下载文件 - file_response = requests.get(file_url) - file_response.raise_for_status() - - # 保存文件 - file_path = os.path.join(part_number_dir, os.path.basename(file_url)) - with open(file_path, 'wb') as file: - file.write(file_response.content) - print(f"NASA2 Download file: {file_path}") - except requests.RequestException as e: - print(f"NASA2 Download file error: {file_url},error: {e}") + try: + # 下载文件 + file_response = requests.get(file_url) + file_response.raise_for_status() + # 保存文件 + file_path = os.path.join(folder_path, os.path.basename(file_url)) + with open(file_path, 'wb') as file: + file.write(file_response.content) + print(f"NASA2 Download file: {file_path}") + except requests.RequestException as e: + print(f"NASA2 Download file error: {file_url},error: {e}") + upload_ids = upload_to_minio(nasa2_connection, folder_path, 'NASA') + device_type = columns[4].text_content().strip() + device_name = columns[2].text_content().strip() + device_mode = columns[2].text_content().strip() + device_function = columns[3].text_content().strip() + device_batch = None + manufacturer = None + experiment_date = columns[8].text_content().strip() + data_db = ['NASA-' + columns[0].text_content().strip(), 'Crawler', datetime.now(), None, None, None, + device_type, + device_name, device_mode, + device_function, + device_batch, manufacturer, + experiment_date, + 'NASA', None, None, ','.join(upload_ids) + ] + save_to_db_import_record(nasa2_connection, data_db) + except Exception as e: + print(f"error: {e}") + finally: + # 关闭游标和连接 + nasa2_connection.close() # 输出数据 # for row in table.xpath('.//tr')[1:]: # columns = row.xpath('.//td')