This commit is contained in:
ls
2024-10-23 17:39:31 +08:00
parent 5e0bbbd757
commit e7851a09bc
4 changed files with 445 additions and 380 deletions

92
scripts/common.py Normal file
View File

@@ -0,0 +1,92 @@
import hashlib
import os
from datetime import datetime
import mysql.connector
from minio import Minio
from minio.error import S3Error
minio_public_url = 'http://58.215.212.230:8005/oss/'
# MySQL 连接配置
db_config = {
# 'host': 'physical-mysql',
# 'port': 3306,
'host': '192.168.50.100',
'port': 23306,
'user': 'root',
'password': '123456',
'database': 'physical-boot'
}
# minio 配置
minio_client = Minio(
# "physical-minio:9000", # MinIO服务器地址或IP
"192.168.50.100:29000", # MinIO服务器地址或IP
access_key="root", # 替换为你的Access Key
secret_key="12345678", # 替换为你的Secret Key
secure=False # 如果使用的是http则为False
)
bucket_name = 'physical'
def get_md5(input_string):
# 创建MD5对象
md5_obj = hashlib.md5()
# 更新对象,注意字符串需要编码为字节
md5_obj.update(input_string.encode('utf-8'))
# 返回MD5值的十六进制字符串
return md5_obj.hexdigest()
def save_to_db_import_record(connection,data):
cursor = connection.cursor()
try:
"""保存数据到 MySQL 数据库"""
insert_query = """INSERT INTO `import_record` (`id`, `create_by`, `create_time`, `update_by`, `update_time`, `sys_org_code`, `device_type`, `device_name`, `device_mode`, `device_function`, `device_batch`, `manufacturer`, `experiment_date`, `data_source`, `experiment_user`, `total_count`, `file_list`)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"""
cursor.execute(insert_query, data)
connection.commit()
except Exception as e:
print(e)
finally:
cursor.close()
def save_to_db_oss_file(connection,data):
cursor = connection.cursor()
try:
"""保存数据到 MySQL 数据库"""
insert_query = """INSERT INTO `oss_file` (`id`,`file_name`,`url`,`create_by`,`create_time` )
VALUES (%s, %s, %s, %s, %s);"""
cursor.execute(insert_query, data)
connection.commit()
except Exception as e:
print(e)
finally:
cursor.close()
def upload_to_minio(connection,folder_path,type):
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
folder_name = os.path.basename(folder_path)
# 遍历文件夹中的所有文件,并上传
file_ids = []
for file_name in os.listdir(folder_path):
file_path = os.path.join(folder_path, file_name)
# 检查是否是文件,忽略非文件类型
if os.path.isfile(file_path):
object_name = f"{type}/{folder_name}/{file_name}"
try:
# 上传文件到 MinIO
minio_client.fput_object(bucket_name, object_name, file_path)
print(f"已上传: {file_path} -> {bucket_name}/{object_name}")
file_id = get_md5(object_name)
file_ids.append(file_id)
db_file = [file_id, file_name,
minio_public_url + bucket_name + '/' + object_name, 'admin', datetime.now()]
save_to_db_oss_file(connection,db_file)
except S3Error as err:
print(f"上传 {file_name} 时出错: {err}")
return file_ids

View File

@@ -1,5 +1,4 @@
import csv import csv
import hashlib
import os import os
import re import re
from datetime import datetime from datetime import datetime
@@ -7,102 +6,19 @@ from datetime import datetime
import mysql.connector import mysql.connector
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from minio import Minio
from minio.error import S3Error from common import upload_to_minio, save_to_db_import_record, db_config
# 设置下载目录 # 设置下载目录
download_dir = 'downloaded_files' download_dir = 'downloaded_files'
os.makedirs(download_dir, exist_ok=True) os.makedirs(download_dir, exist_ok=True)
# MySQL 连接配置
db_config = {
'host': '192.168.50.100',
'port': 23306,
'user': 'root',
'password': '123456',
'database': 'physical-boot'
}
connection = mysql.connector.connect(**db_config)
cursor = connection.cursor()
# minio 配置
minio_client = Minio(
"192.168.50.100:29000", # MinIO服务器地址或IP
access_key="root", # 替换为你的Access Key
secret_key="12345678", # 替换为你的Secret Key
secure=False # 如果使用的是http则为False
)
bucket_name = 'physical'
def get_md5(input_string):
# 创建MD5对象
md5_obj = hashlib.md5()
# 更新对象,注意字符串需要编码为字节
md5_obj.update(input_string.encode('utf-8'))
# 返回MD5值的十六进制字符串
return md5_obj.hexdigest()
def save_to_mysql(data):
try:
"""保存数据到 MySQL 数据库"""
insert_query = """INSERT INTO `import_record` (`id`, `create_by`, `create_time`, `update_by`, `update_time`, `sys_org_code`, `device_type`, `device_name`, `device_mode`, `device_function`, `device_batch`, `manufacturer`, `experiment_date`, `data_source`, `experiment_user`, `total_count`, `file_list`)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"""
cursor.execute(insert_query, data)
connection.commit()
except Exception as e:
# 处理 ZeroDivisionError 的代码
print(e)
# 获取网页内容 # 获取网页内容
url = 'https://esarad.esa.int/' url = 'https://esarad.esa.int/'
def save_to_db_file(data):
try:
"""保存数据到 MySQL 数据库"""
insert_query = """INSERT INTO `oss_file` (`id`,`file_name`,`url`,`create_by`,`create_time` )
VALUES (%s, %s, %s, %s, %s);"""
cursor.execute(insert_query, data)
connection.commit()
except Exception as e:
# 处理 ZeroDivisionError 的代码
print(e)
def upload_to_minio(folder_path):
if not minio_client.bucket_exists(bucket_name):
minio_client.make_bucket(bucket_name)
folder_name = os.path.basename(folder_path)
# 遍历文件夹中的所有文件,并上传
file_ids = []
for file_name in os.listdir(folder_path):
file_path = os.path.join(folder_path, file_name)
# 检查是否是文件,忽略非文件类型
if os.path.isfile(file_path):
object_name = f"ESA/{folder_name}/{file_name}"
try:
# 上传文件到 MinIO
minio_client.fput_object(bucket_name, object_name, file_path)
print(f"已上传: {file_path} -> {bucket_name}/{object_name}")
file_id = get_md5(object_name)
file_ids.append(file_id)
db_file = [file_id, file_name,
'http://58.215.212.230:8005/oss/' + bucket_name + '/' + object_name, 'admin', datetime.now()]
save_to_db_file(db_file)
except S3Error as err:
print(f"上传 {file_name} 时出错: {err}")
return file_ids
def scrape(): def scrape():
esa_connection = mysql.connector.connect(**db_config)
try: try:
response = requests.get(url) response = requests.get(url)
@@ -286,7 +202,7 @@ def scrape():
else: else:
print(f'Failed to download: {download_url}') print(f'Failed to download: {download_url}')
upload_ids = upload_to_minio(folder_path) upload_ids = upload_to_minio(esa_connection, folder_path,'ESA')
data_db = ['ESA-' + cells[0].get_text(strip=True), 'Crawler', datetime.now(), None, None, None, data_db = ['ESA-' + cells[0].get_text(strip=True), 'Crawler', datetime.now(), None, None, None,
cells[5].get_text(strip=True), cells[5].get_text(strip=True),
cells[1].get_text(strip=True), cells[1].get_text(strip=True), cells[1].get_text(strip=True), cells[1].get_text(strip=True),
@@ -295,10 +211,9 @@ def scrape():
cells[11].get_text(strip=True), cells[11].get_text(strip=True),
'ESA', None, None, ','.join(upload_ids) 'ESA', None, None, ','.join(upload_ids)
] ]
save_to_mysql(data_db) save_to_db_import_record(esa_connection, data_db)
else: else:
print(f'Error: {response.status_code}') print(f'Error: {response.status_code}')
finally: finally:
# 关闭游标和连接 # 关闭游标和连接
cursor.close() esa_connection.close()
connection.close()

View File

@@ -1,8 +1,13 @@
import requests
import os
import time
import re
import csv import csv
import os
import re
import time
from datetime import datetime
import mysql.connector
import requests
from common import db_config, upload_to_minio, save_to_db_import_record, get_md5
# 定义 API URL # 定义 API URL
api_url = 'https://radhome.gsfc.nasa.gov/radhome/dev/parts.cfc?method=getParts' api_url = 'https://radhome.gsfc.nasa.gov/radhome/dev/parts.cfc?method=getParts'
@@ -28,7 +33,6 @@ data = {
'sord': 'asc', 'sord': 'asc',
} }
# 创建文件夹以保存文件 # 创建文件夹以保存文件
os.makedirs('downloaded_files', exist_ok=True) os.makedirs('downloaded_files', exist_ok=True)
@@ -57,158 +61,183 @@ csv_header = [
"测试参数单位", "测试参数结果", "是否为加速试验后数据", "是否为退火数据", "测试参数单位", "测试参数结果", "是否为加速试验后数据", "是否为退火数据",
"退火温度", "退火时间", "原始数据", "数据处理方法", "其他需要说明的事项" "退火温度", "退火时间", "原始数据", "数据处理方法", "其他需要说明的事项"
] ]
def scrape(): def scrape():
nasa1_connection = mysql.connector.connect(**db_config)
try:
# 发送请求 # 发送请求
response = requests.post(api_url, headers=headers, data=data) response = requests.post(api_url, headers=headers, data=data)
response.raise_for_status() # 检查请求是否成功 response.raise_for_status() # 检查请求是否成功
# 解析 JSON 数据 # 解析 JSON 数据
json_data = response.json() json_data = response.json()
# 遍历数据并下载文件 # 遍历数据并下载文件
for row in json_data['ROWS']: for row in json_data['ROWS']:
part_number = row[0] # 部件编号 part_number = row[0] # 部件编号
file_links_str = row[4] # 文件链接 file_links_str = row[4] # 文件链接
# 使用正则表达式分隔文件名 # 使用正则表达式分隔文件名
file_links = re.split(r';|(?<=\.pdf)', file_links_str) file_links = re.split(r';|(?<=\.pdf)', file_links_str)
# 创建目录 # 创建目录
part_number_dir = os.path.join('downloaded_files', part_number) folder_path = os.path.join('downloaded_files', part_number)
os.makedirs(part_number_dir, exist_ok=True) os.makedirs(folder_path, exist_ok=True)
# 创建 CSV 文件 # 创建 CSV 文件
csv_file_path = os.path.join(part_number_dir, 'data.csv') csv_file_path = os.path.join(folder_path, 'data.csv')
with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file: with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file:
csv_writer = csv.writer(csv_file) csv_writer = csv.writer(csv_file)
csv_writer.writerow(csv_header) # 写入表头 csv_writer.writerow(csv_header) # 写入表头
# 填写 CSV 数据 # 填写 CSV 数据
csv_row = [ csv_row = [
"", # 序号 "", # 序号
"", # 试验对象类型 "", # 试验对象类型
row[3], # 试验开始日期 row[3], # 试验开始日期
"", # 试验结束日期 "", # 试验结束日期
"", # 试验对象名称 "", # 试验对象名称
row[1], # 试验对象型号 row[1], # 试验对象型号
"", # 试验对象数量 "", # 试验对象数量
row[6], # 试验性质 row[6], # 试验性质
"", # 试验目的 "", # 试验目的
"", # 装置名称 "", # 装置名称
"", # 数据提供单位 "", # 数据提供单位
"", # 试验委托单位 "", # 试验委托单位
"", # 失效判据 "", # 失效判据
"", # 失效数量 "", # 失效数量
"", # 试验结果描述 "", # 试验结果描述
"", # 成果 "", # 成果
"", # 来源项目名称 "", # 来源项目名称
"", # 来源项目类型 "", # 来源项目类型
"", # 分类 "", # 分类
"", # 元器件名称 "", # 元器件名称
"", # 元器件型号 "", # 元器件型号
"", # 元器件批号 "", # 元器件批号
row[2], # 生产单位 row[2], # 生产单位
"", # 是否国产 "", # 是否国产
"", # 元器件成熟度 "", # 元器件成熟度
"", # 晶圆材料 "", # 晶圆材料
"", # 晶圆批号 "", # 晶圆批号
"", # 封装材料 "", # 封装材料
"", # 封装技术 "", # 封装技术
"", # 是否倒装 "", # 是否倒装
"", # 制造工艺 "", # 制造工艺
"", # 工艺特征尺寸 "", # 工艺特征尺寸
"", # 工艺平台 "", # 工艺平台
"", # 工艺代号 "", # 工艺代号
"", # 工艺版本 "", # 工艺版本
"", # 质量等级 "", # 质量等级
"", # 加固措施 "", # 加固措施
"", # 工作原理 "", # 工作原理
"", # 供货能力 "", # 供货能力
"", # 应用经历 "", # 应用经历
"", # 规范手册 "", # 规范手册
"", # 器件图片 "", # 器件图片
"", # 电子系统分类 "", # 电子系统分类
"", # 电子系统名称 "", # 电子系统名称
"", # 电子系统型号 "", # 电子系统型号
"", # 生产单位 "", # 生产单位
"", # 电子系统功能 "", # 电子系统功能
"", # 电子系统加固措施 "", # 电子系统加固措施
"", # 电子系统图片 "", # 电子系统图片
"", # 材料名称 "", # 材料名称
"", # 材料型号 "", # 材料型号
"", # 材料组分 "", # 材料组分
"", # 材料用途 "", # 材料用途
"", # 材料生产单位 "", # 材料生产单位
"", # 材料物理结构 "", # 材料物理结构
"", # 材料使用经历 "", # 材料使用经历
"", # 辐照试验大纲 "", # 辐照试验大纲
"", # 大纲审核专家类别 "", # 大纲审核专家类别
"", # 辐照试验所依据的标准规范 "", # 辐照试验所依据的标准规范
"", # 试验步骤(过程)描述 "", # 试验步骤(过程)描述
"", # 辐照过程是否加电 "", # 辐照过程是否加电
"", # 直流偏置条件描述 "", # 直流偏置条件描述
"", # 交流偏置条件描述 "", # 交流偏置条件描述
"", # 时钟频率 "", # 时钟频率
"", # 测试图形 "", # 测试图形
"", # 其他偏置条件 "", # 其他偏置条件
"", # 辐照偏置原理图 "", # 辐照偏置原理图
"", # 测试方式 "", # 测试方式
"", # 测试原理图 "", # 测试原理图
"", # 试验用仪器名称 "", # 试验用仪器名称
"", # 试验用仪器型号 "", # 试验用仪器型号
"", # 试验用仪器生产厂家 "", # 试验用仪器生产厂家
"", # 试验用仪器检定证书 "", # 试验用仪器检定证书
"", # 试验用软件名称 "", # 试验用软件名称
"", # 试验用软件开发单位 "", # 试验用软件开发单位
"", # 试验用软件版本号 "", # 试验用软件版本号
"", # 试验现场照片 "", # 试验现场照片
"", # 测试人员姓名 "", # 测试人员姓名
"", # 测试人员单位 "", # 测试人员单位
"", # 测试人员电话 "", # 测试人员电话
"", # 装置运行人员 "", # 装置运行人员
"", # 第三方人员 "", # 第三方人员
"", # 第三方人员单位 "", # 第三方人员单位
"", # 第三方人员电话 "", # 第三方人员电话
"", # 其他需要说明的事项 "", # 其他需要说明的事项
"", # 是否采用铅铝屏蔽 "", # 是否采用铅铝屏蔽
"", # 剂量率 "", # 剂量率
"", # 总剂量 "", # 总剂量
"", # 剂量等效材料 "", # 剂量等效材料
"", # 试验对象编号 "", # 试验对象编号
"", # 测试参数名称 "", # 测试参数名称
"", # 测试参数单位 "", # 测试参数单位
"", # 测试参数结果 "", # 测试参数结果
"", # 是否为加速试验后数据 "", # 是否为加速试验后数据
"", # 是否为退火数据 "", # 是否为退火数据
"", # 退火温度 "", # 退火温度
"", # 退火时间 "", # 退火时间
"", # 原始数据 "", # 原始数据
"", # 数据处理方法 "", # 数据处理方法
"", # 其他需要说明的事项 "", # 其他需要说明的事项
] ]
# 写入 CSV 行 # 写入 CSV 行
csv_writer.writerow(csv_row) csv_writer.writerow(csv_row)
# 下载文件 # 下载文件
for file_name in file_links: for file_name in file_links:
file_name = file_name.strip() # 去除空格 file_name = file_name.strip() # 去除空格
if file_name: if file_name:
# 拼接文件完整 URL # 拼接文件完整 URL
file_url = file_name if file_name.startswith('http') else file_prefix + file_name file_url = file_name if file_name.startswith('http') else file_prefix + file_name
try: try:
# 下载文件 # 下载文件
file_response = requests.get(file_url) file_response = requests.get(file_url)
file_response.raise_for_status() file_response.raise_for_status()
# 保存文件 # 保存文件
file_path = os.path.join(part_number_dir, os.path.basename(file_url)) file_path = os.path.join(folder_path, os.path.basename(file_url))
with open(file_path, 'wb') as file: with open(file_path, 'wb') as file:
file.write(file_response.content) file.write(file_response.content)
print(f"NASA Download file: {file_path}") print(f"NASA Download file: {file_path}")
except requests.RequestException as e:
print(f"NASA Download file error : {file_url}error: {e}")
except requests.RequestException as e:
print(f"NASA Download file error : {file_url}error: {e}")
upload_ids = upload_to_minio(nasa1_connection, folder_path,'NASA')
device_type=row[6]
device_name=row[0]
device_mode=row[0]
device_function=row[1]
device_batch=None
manufacturer=row[2]
experiment_date=row[3]
data_db = [get_md5(row[0]), 'Crawler', datetime.now(), None, None, None,
device_type,
device_name, device_mode,
device_function,
device_batch, manufacturer,
experiment_date,
'NASA', None, None, ','.join(upload_ids)
]
save_to_db_import_record(nasa1_connection, data_db)
finally:
# 关闭游标和连接
nasa1_connection.close()
# 输出数据 # 输出数据
# for row in json_data['ROWS']: # for row in json_data['ROWS']:
# print(row) # print(row)

View File

@@ -1,7 +1,12 @@
import csv
import os
from datetime import datetime
import mysql.connector
import requests import requests
from lxml import html from lxml import html
import os
import csv from common import db_config, upload_to_minio, save_to_db_import_record
# 定义页面 URL # 定义页面 URL
url = 'https://radhome.gsfc.nasa.gov/radhome/papers/TIDPart.html' url = 'https://radhome.gsfc.nasa.gov/radhome/papers/TIDPart.html'
@@ -30,159 +35,183 @@ csv_header = [
"测试参数单位", "测试参数结果", "是否为加速试验后数据", "是否为退火数据", "测试参数单位", "测试参数结果", "是否为加速试验后数据", "是否为退火数据",
"退火温度", "退火时间", "原始数据", "数据处理方法", "其他需要说明的事项" "退火温度", "退火时间", "原始数据", "数据处理方法", "其他需要说明的事项"
] ]
def scrape(): def scrape():
# 发送请求 nasa2_connection = mysql.connector.connect(**db_config)
response = requests.get(url) try:
response.raise_for_status() # 发送请求
response = requests.get(url)
response.raise_for_status()
# 使用 lxml 解析 HTML # 使用 lxml 解析 HTML
tree = html.fromstring(response.text) tree = html.fromstring(response.text)
# 找到目标表格 # 找到目标表格
table = tree.xpath('/html/body/table[3]')[0] # 获取第一个表格 table = tree.xpath('/html/body/table[3]')[0] # 获取第一个表格
# 遍历表格行,跳过表头 # 遍历表格行,跳过表头
for row in table.xpath('.//tr')[1:]: # 跳过表头 for row in table.xpath('.//tr')[1:]: # 跳过表头
columns = row.xpath('.//td') columns = row.xpath('.//td')
if len(columns) < 8: if len(columns) < 8:
continue # 跳过不完整的行 continue # 跳过不完整的行
part_number = columns[2].text_content().strip() # 第三列 Part Number
file_link_tag = columns[7].xpath('//a') # 第八列的 <a> 标签
part_number = columns[2].text_content().strip() # 第三列 Part Number # 获取文件名和下载链接
file_link_tag = columns[7].xpath('//a') # 第八列的 <a> 标签 file_url = 'https://radhome.gsfc.nasa.gov/' + file_link_tag[0].get('href')
# 获取文件名和下载链接 # 创建目录
file_url = 'https://radhome.gsfc.nasa.gov/' + file_link_tag[0].get('href') folder_path = os.path.join('downloaded_files', part_number)
os.makedirs(folder_path, exist_ok=True)
# 创建目录 # 创建 CSV 文件
part_number_dir = os.path.join('downloaded_files', part_number) csv_file_path = os.path.join(folder_path, 'data.csv')
os.makedirs(part_number_dir, exist_ok=True) with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file:
csv_writer = csv.writer(csv_file)
csv_writer.writerow(csv_header) # 写入表头
# 创建 CSV 文件 # 填写 CSV 数据
csv_file_path = os.path.join(part_number_dir, 'data.csv') csv_row = [
with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file: "", # 序号
csv_writer = csv.writer(csv_file) "", # 试验对象类型
csv_writer.writerow(csv_header) # 写入表头 columns[8].text_content().strip(), # 试验开始日期
"", # 试验结束日期
"", # 试验对象名称
columns[2].text_content().strip(), # 试验对象型号
"", # 试验对象数量
columns[4].text_content().strip(), # 试验性质
"", # 试验目的
"", # 装置名称
"", # 数据提供单位
"", # 试验委托单位
"", # 失效判据
"", # 失效数量
"", # 试验结果描述
"", # 成果
"", # 来源项目名称
"", # 来源项目类型
"", # 分类
"", # 元器件名称
"", # 元器件型号
"", # 元器件批号
columns[5].text_content().strip(), # 生产单位
"", # 是否国产
"", # 元器件成熟度
"", # 晶圆材料
"", # 晶圆批号
"", # 封装材料
"", # 封装技术
"", # 是否倒装
"", # 制造工艺
"", # 工艺特征尺寸
"", # 工艺平台
"", # 工艺代号
"", # 工艺版本
"", # 质量等级
"", # 加固措施
"", # 工作原理
"", # 供货能力
"", # 应用经历
"", # 规范手册
"", # 器件图片
"", # 电子系统分类
"", # 电子系统名称
"", # 电子系统型号
"", # 生产单位
"", # 电子系统功能
"", # 电子系统加固措施
"", # 电子系统图片
"", # 材料名称
"", # 材料型号
"", # 材料组分
"", # 材料用途
"", # 材料生产单位
"", # 材料物理结构
"", # 材料使用经历
"", # 辐照试验大纲
"", # 大纲审核专家类别
"", # 辐照试验所依据的标准规范
"", # 试验步骤(过程)描述
"", # 辐照过程是否加电
"", # 直流偏置条件描述
"", # 交流偏置条件描述
"", # 时钟频率
"", # 测试图形
"", # 其他偏置条件
"", # 辐照偏置原理图
"", # 测试方式
"", # 测试原理图
"", # 试验用仪器名称
"", # 试验用仪器型号
"", # 试验用仪器生产厂家
"", # 试验用仪器检定证书
"", # 试验用软件名称
"", # 试验用软件开发单位
"", # 试验用软件版本号
"", # 试验现场照片
"", # 测试人员姓名
"", # 测试人员单位
"", # 测试人员电话
"", # 装置运行人员
"", # 第三方人员
"", # 第三方人员单位
"", # 第三方人员电话
"", # 其他需要说明的事项
"", # 是否采用铅铝屏蔽
"", # 剂量率
"", # 总剂量
"", # 剂量等效材料
"", # 试验对象编号
"", # 测试参数名称
"", # 测试参数单位
"", # 测试参数结果
"", # 是否为加速试验后数据
"", # 是否为退火数据
"", # 退火温度
"", # 退火时间
"", # 原始数据
"", # 数据处理方法
"", # 其他需要说明的事项
]
# 写 CSV 数据 # 写 CSV
csv_row = [ csv_writer.writerow(csv_row)
"", # 序号
"", # 试验对象类型
columns[8].text_content().strip(), # 试验开始日期
"", # 试验结束日期
"", # 试验对象名称
columns[2].text_content().strip(), # 试验对象型号
"", # 试验对象数量
columns[4].text_content().strip(), # 试验性质
"", # 试验目的
"", # 装置名称
"", # 数据提供单位
"", # 试验委托单位
"", # 失效判据
"", # 失效数量
"", # 试验结果描述
"", # 成果
"", # 来源项目名称
"", # 来源项目类型
"", # 分类
"", # 元器件名称
"", # 元器件型号
"", # 元器件批号
columns[5].text_content().strip(), # 生产单位
"", # 是否国产
"", # 元器件成熟度
"", # 晶圆材料
"", # 晶圆批号
"", # 封装材料
"", # 封装技术
"", # 是否倒装
"", # 制造工艺
"", # 工艺特征尺寸
"", # 工艺平台
"", # 工艺代号
"", # 工艺版本
"", # 质量等级
"", # 加固措施
"", # 工作原理
"", # 供货能力
"", # 应用经历
"", # 规范手册
"", # 器件图片
"", # 电子系统分类
"", # 电子系统名称
"", # 电子系统型号
"", # 生产单位
"", # 电子系统功能
"", # 电子系统加固措施
"", # 电子系统图片
"", # 材料名称
"", # 材料型号
"", # 材料组分
"", # 材料用途
"", # 材料生产单位
"", # 材料物理结构
"", # 材料使用经历
"", # 辐照试验大纲
"", # 大纲审核专家类别
"", # 辐照试验所依据的标准规范
"", # 试验步骤(过程)描述
"", # 辐照过程是否加电
"", # 直流偏置条件描述
"", # 交流偏置条件描述
"", # 时钟频率
"", # 测试图形
"", # 其他偏置条件
"", # 辐照偏置原理图
"", # 测试方式
"", # 测试原理图
"", # 试验用仪器名称
"", # 试验用仪器型号
"", # 试验用仪器生产厂家
"", # 试验用仪器检定证书
"", # 试验用软件名称
"", # 试验用软件开发单位
"", # 试验用软件版本号
"", # 试验现场照片
"", # 测试人员姓名
"", # 测试人员单位
"", # 测试人员电话
"", # 装置运行人员
"", # 第三方人员
"", # 第三方人员单位
"", # 第三方人员电话
"", # 其他需要说明的事项
"", # 是否采用铅铝屏蔽
"", # 剂量率
"", # 总剂量
"", # 剂量等效材料
"", # 试验对象编号
"", # 测试参数名称
"", # 测试参数单位
"", # 测试参数结果
"", # 是否为加速试验后数据
"", # 是否为退火数据
"", # 退火温度
"", # 退火时间
"", # 原始数据
"", # 数据处理方法
"", # 其他需要说明的事项
]
# 写入 CSV 行 try:
csv_writer.writerow(csv_row) # 下载文件
file_response = requests.get(file_url)
try: file_response.raise_for_status()
# 下载文件
file_response = requests.get(file_url)
file_response.raise_for_status()
# 保存文件
file_path = os.path.join(part_number_dir, os.path.basename(file_url))
with open(file_path, 'wb') as file:
file.write(file_response.content)
print(f"NASA2 Download file: {file_path}")
except requests.RequestException as e:
print(f"NASA2 Download file error: {file_url}error: {e}")
# 保存文件
file_path = os.path.join(folder_path, os.path.basename(file_url))
with open(file_path, 'wb') as file:
file.write(file_response.content)
print(f"NASA2 Download file: {file_path}")
except requests.RequestException as e:
print(f"NASA2 Download file error: {file_url}error: {e}")
upload_ids = upload_to_minio(nasa2_connection, folder_path, 'NASA')
device_type = columns[4].text_content().strip()
device_name = columns[2].text_content().strip()
device_mode = columns[2].text_content().strip()
device_function = columns[3].text_content().strip()
device_batch = None
manufacturer = None
experiment_date = columns[8].text_content().strip()
data_db = ['NASA-' + columns[0].text_content().strip(), 'Crawler', datetime.now(), None, None, None,
device_type,
device_name, device_mode,
device_function,
device_batch, manufacturer,
experiment_date,
'NASA', None, None, ','.join(upload_ids)
]
save_to_db_import_record(nasa2_connection, data_db)
except Exception as e:
print(f"error: {e}")
finally:
# 关闭游标和连接
nasa2_connection.close()
# 输出数据 # 输出数据
# for row in table.xpath('.//tr')[1:]: # for row in table.xpath('.//tr')[1:]:
# columns = row.xpath('.//td') # columns = row.xpath('.//td')