import hashlib import os from datetime import datetime from datetime import timedelta import mysql.connector import pandas as pd from minio import Minio from minio.error import S3Error minio_public_url = '/oss/' # MySQL 连接配置 db_config = { 'host': '127.0.0.1', 'port': 3306, # 'host': '192.168.50.100', # 'port': 23306, 'user': 'root', 'password': '123456', 'database': 'physical-boot' } # minio 配置 minio_client = Minio( "47.102.126.67:29000", # MinIO服务器地址或IP # "192.168.50.100:29000", # MinIO服务器地址或IP access_key="root", # 替换为你的Access Key secret_key="FeF4qA6uQBHeRd", # 替换为你的Secret Key secure=False # 如果使用的是http则为False ) bucket_name = 'physical2' def get_md5(input_string): # 创建MD5对象 md5_obj = hashlib.md5() # 更新对象,注意字符串需要编码为字节 md5_obj.update(input_string.encode('utf-8')) # 返回MD5值的十六进制字符串 return md5_obj.hexdigest() def fetch_db_import_record(connection,data): cursor = connection.cursor() try: """保存数据到 MySQL 数据库""" insert_query = """select count(1) from `nasa_data_record` where `id`= %s;""" cursor.execute(insert_query, data) result = cursor.fetchone() return result[0] except Exception as e: print(e) finally: cursor.close() def save_to_db_import_record(connection,data): cursor = connection.cursor() try: """保存数据到 MySQL 数据库""" insert_query = """INSERT INTO `nasa_data_record` (`id`, `create_by`, `create_time`, `update_by`, `update_time`, `sys_org_code`, `device_type`, `device_name`, `device_mode`, `device_function`, `device_batch`, `manufacturer`, `experiment_date`, `data_source`, `experiment_user`, `total_count_nasa`, `file_list`,`origin_data`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);""" cursor.execute(insert_query, data) connection.commit() except Exception as e: print(e) finally: cursor.close() def save_to_db_oss_file(connection,data): cursor = connection.cursor() try: """保存数据到 MySQL 数据库""" insert_query = """INSERT INTO `oss_file` (`id`,`file_name`,`url`,`create_by`,`create_time` ) VALUES (%s, %s, %s, %s, %s);""" cursor.execute(insert_query, data) connection.commit() except Exception as e: print(e) finally: cursor.close() def upload_to_minio(connection,folder_path,type): if not minio_client.bucket_exists(bucket_name): minio_client.make_bucket(bucket_name) folder_name = os.path.basename(folder_path) # 遍历文件夹中的所有文件,并上传 file_ids = [] for file_name in os.listdir(folder_path): file_path = os.path.join(folder_path, file_name) # 检查是否是文件,忽略非文件类型 if os.path.isfile(file_path): object_name = f"{type}/{folder_name}/{file_name}" try: # 上传文件到 MinIO # url=minio_client.fput_object(bucket_name, object_name, file_path) print(f"已上传: {file_path} -> {bucket_name}/{object_name}") # file_id = get_md5(object_name) file_ids.append({file_name:os.path.join(minio_public_url,bucket_name,object_name)}) # db_file = [file_id, file_name, # minio_public_url + bucket_name + '/' + object_name, 'admin', datetime.now()] # save_to_db_oss_file(connection,db_file) # expires = timedelta(days=1) # presigned_url = minio_client.presigned_get_object(bucket_name, object_name, expires=expires) # print(f"Presigned URL for {object_name}: {presigned_url}") except S3Error as err: print(f"上传 {file_name} 时出错: {err}") return file_ids