Files
physical-boot/scripts/common.py
2025-11-29 13:04:09 +08:00

110 lines
3.9 KiB
Python

import hashlib
import os
from datetime import datetime
from datetime import timedelta
import mysql.connector
import pandas as pd
from minio import Minio
from minio.error import S3Error
# URL prefix that is joined with the bucket and object name to form file URLs.
minio_public_url = '/oss/'

# MySQL connection settings.
# NOTE(review): the credentials below are hard-coded in source; consider
# loading them from the environment or a config file kept out of the repo.
db_config = {
    'host': '127.0.0.1',
    'port': 3306,
    # Alternative endpoint, kept for reference:
    # 'host': '192.168.50.100',
    # 'port': 23306,
    'user': 'root',
    'password': '123456',
    'database': 'physical-boot',
}

# MinIO client settings.
minio_client = Minio(
    "47.102.126.67:29000",  # MinIO server address (host:port)
    # "192.168.50.100:29000",  # alternative MinIO server address
    access_key="root",  # replace with your access key
    secret_key="FeF4qA6uQBHeRd",  # replace with your secret key
    secure=False,  # False because the server is reached over plain HTTP
)

# Bucket checked/created by upload_to_minio and used in the generated URLs.
bucket_name = 'physical2'
def get_md5(input_string):
# 创建MD5对象
md5_obj = hashlib.md5()
# 更新对象,注意字符串需要编码为字节
md5_obj.update(input_string.encode('utf-8'))
# 返回MD5值的十六进制字符串
return md5_obj.hexdigest()
def fetch_db_import_record(connection,data):
    """Count the rows of ``nasa_data_record`` whose ``id`` matches.

    Args:
        connection: Open database connection exposing ``cursor()``.
        data: Parameters for the single ``%s`` placeholder, normally a
            one-element list or tuple holding the id. A bare scalar (for
            example a plain string id) is wrapped into a one-element tuple,
            because the cursor expects a parameter container.

    Returns:
        The ``count(1)`` value for that id, or ``None`` when the query
        fails (the error is printed rather than raised).
    """
    if not isinstance(data, (list, tuple, dict)):
        data = (data,)
    select_query = """select count(1) from `nasa_data_record` where `id`= %s;"""
    cursor = connection.cursor()
    try:
        cursor.execute(select_query, data)
        row = cursor.fetchone()
        return row[0]
    except Exception as e:
        # Best-effort lookup: report the problem and signal it with None.
        print(e)
        return None
    finally:
        cursor.close()
def save_to_db_import_record(connection,data):
    """Insert one row into ``nasa_data_record`` and commit it.

    Args:
        connection: Open database connection exposing ``cursor()``,
            ``commit()`` and ``rollback()``.
        data: The 18 column values, in the column order of the INSERT
            statement below.

    Returns:
        None. A failure is printed rather than raised, and the failed
        statement is rolled back.
    """
    # Adjacent literals concatenate to exactly the original two-line statement.
    insert_query = (
        "INSERT INTO `nasa_data_record` ("
        "`id`, `create_by`, `create_time`, `update_by`, `update_time`, "
        "`sys_org_code`, `device_type`, `device_name`, `device_mode`, "
        "`device_function`, `device_batch`, `manufacturer`, "
        "`experiment_date`, `data_source`, `experiment_user`, "
        "`total_count_nasa`, `file_list`,`origin_data`)\n"
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, "
        "%s, %s, %s, %s, %s, %s, %s, %s, %s);"
    )
    cursor = connection.cursor()
    try:
        cursor.execute(insert_query, data)
        connection.commit()
    except Exception as e:
        print(e)
        # Undo the failed statement so the shared connection is not left
        # inside a half-finished transaction for the next caller.
        try:
            connection.rollback()
        except Exception as rollback_error:
            print(rollback_error)
    finally:
        cursor.close()
def save_to_db_oss_file(connection,data):
    """Insert one row into ``oss_file`` and commit it.

    Args:
        connection: Open database connection exposing ``cursor()``,
            ``commit()`` and ``rollback()``.
        data: The five column values in the order
            ``id, file_name, url, create_by, create_time``.

    Returns:
        None. A failure is printed rather than raised, and the failed
        statement is rolled back.
    """
    # Adjacent literals concatenate to exactly the original two-line statement.
    insert_query = (
        "INSERT INTO `oss_file` (`id`,`file_name`,`url`,`create_by`,`create_time` )\n"
        "VALUES (%s, %s, %s, %s, %s);"
    )
    cursor = connection.cursor()
    try:
        cursor.execute(insert_query, data)
        connection.commit()
    except Exception as e:
        print(e)
        # Undo the failed statement so the shared connection is not left
        # inside a half-finished transaction for the next caller.
        try:
            connection.rollback()
        except Exception as rollback_error:
            print(rollback_error)
    finally:
        cursor.close()
def upload_to_minio(connection,folder_path,type):
    """List the files directly inside a folder and build their public URLs.

    The target bucket is created when it does not exist yet. Each regular
    file in ``folder_path`` (sub-directories are skipped, no recursion) is
    mapped to the object key ``{type}/{folder name}/{file name}``.

    NOTE(review): the ``fput_object`` call is commented out, so nothing is
    transferred at the moment even though the progress message says
    "uploaded". Confirm whether the upload is meant to stay disabled.

    Args:
        connection: Database connection. Unused while the ``oss_file``
            insert below stays commented out.
        folder_path: Local directory whose files are processed.
        type: First segment of the object key. The name shadows the builtin
            but is kept because it is part of the public signature.

    Returns:
        A list of single-entry dicts ``{file_name: url}``, ordered by file
        name, where ``url`` is ``minio_public_url``, the bucket name and the
        object key joined with ``/``.
    """
    import posixpath  # URLs always use '/', whatever the host OS separator is

    if not minio_client.bucket_exists(bucket_name):
        minio_client.make_bucket(bucket_name)
    # normpath drops a trailing separator, which would otherwise make
    # basename() return '' and collapse every folder onto the same key prefix.
    folder_name = os.path.basename(os.path.normpath(folder_path))
    file_ids = []
    # Sorted so the returned list is reproducible between runs.
    for file_name in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, file_name)
        # Only regular files are handled; anything else is ignored.
        if not os.path.isfile(file_path):
            continue
        object_name = f"{type}/{folder_name}/{file_name}"
        try:
            # Upload the file to MinIO (currently disabled, see NOTE above).
            # url=minio_client.fput_object(bucket_name, object_name, file_path)
            print(f"已上传: {file_path} -> {bucket_name}/{object_name}")
            # file_id = get_md5(object_name)
            file_ids.append({file_name: posixpath.join(minio_public_url, bucket_name, object_name)})
            # Registration in `oss_file` and presigned-URL generation are
            # disabled as well:
            # db_file = [file_id, file_name,
            #            minio_public_url + bucket_name + '/' + object_name, 'admin', datetime.now()]
            # save_to_db_oss_file(connection,db_file)
            # expires = timedelta(days=1)
            # presigned_url = minio_client.presigned_get_object(bucket_name, object_name, expires=expires)
            # print(f"Presigned URL for {object_name}: {presigned_url}")
        except S3Error as err:
            print(f"上传 {file_name} 时出错: {err}")
    return file_ids