130 lines
3.5 KiB
Python
130 lines
3.5 KiB
Python
import os
|
||
from datetime import timedelta
|
||
import mimetypes
|
||
from urllib.parse import urlparse
|
||
|
||
from minio import Minio
|
||
from minio.error import S3Error
|
||
|
||
|
||
def _env_bool(name: str, default: bool = False) -> bool:
    """Read environment variable *name* as a boolean flag.

    Args:
        name: Name of the environment variable to look up.
        default: Value returned when the variable is unset or blank.

    Returns:
        ``True`` when the stripped, lower-cased value is one of ``1``,
        ``true``, ``yes``, ``y`` or ``on``; ``default`` when the variable is
        missing or contains only whitespace; ``False`` for any other value.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default

    text = raw.strip().lower()
    if not text:
        # A blank value (e.g. ``MINIO_SECURE=`` in an env file) means "not
        # set", the same way _get_env treats blank variables. Returning False
        # here would silently override a default derived from an https URL.
        return default
    return text in {'1', 'true', 'yes', 'y', 'on'}
|
||
|
||
|
||
def _normalize_endpoint(minio_url: str):
|
||
if not minio_url:
|
||
return None, None
|
||
|
||
u = str(minio_url).strip()
|
||
parsed = urlparse(u)
|
||
if parsed.scheme in {'http', 'https'}:
|
||
endpoint = parsed.netloc
|
||
secure = parsed.scheme == 'https'
|
||
else:
|
||
endpoint = u
|
||
secure = None
|
||
|
||
endpoint = endpoint.strip().rstrip('/')
|
||
return endpoint, secure
|
||
|
||
|
||
def _get_env(*names: str, default: str | None = None) -> str | None:
|
||
for n in names:
|
||
v = os.environ.get(n)
|
||
if v is not None and str(v).strip() != '':
|
||
return str(v).strip()
|
||
return default
|
||
|
||
|
||
def get_minio_client() -> Minio | None:
    """Build a MinIO client from environment variables.

    Reads ``MINIO_URL`` (falling back to ``MINIO_ENDPOINT``),
    ``MINIO_ACCESS_KEY``, ``MINIO_SECRET_KEY``, and the optional
    ``MINIO_SECURE`` and ``MINIO_REGION``.

    Returns:
        A new ``Minio`` instance, or ``None`` when the URL or either
        credential is missing, or the URL yields an empty endpoint.
    """
    url = _get_env('MINIO_URL', 'MINIO_ENDPOINT')
    key_id = _get_env('MINIO_ACCESS_KEY')
    key_secret = _get_env('MINIO_SECRET_KEY')
    if not (url and key_id and key_secret):
        return None

    host, url_is_https = _normalize_endpoint(url)
    if not host:
        return None

    # The URL scheme only supplies the default; MINIO_SECURE overrides it.
    # Without a scheme the fallback is plain HTTP.
    fallback_secure = False if url_is_https is None else url_is_https
    use_tls = _env_bool('MINIO_SECURE', default=fallback_secure)
    region_name = _get_env('MINIO_REGION', default=None)

    settings = {
        'endpoint': host,
        'access_key': key_id,
        'secret_key': key_secret,
        'secure': use_tls,
        'region': region_name,
    }
    return Minio(**settings)
|
||
|
||
|
||
def get_bucket_name() -> str:
    """Return the bucket name from ``MINIO_BUCKET``, defaulting to ``'achievement'``."""
    configured = _get_env('MINIO_BUCKET', default='achievement')
    if configured:
        return configured
    return 'achievement'
|
||
|
||
|
||
def ensure_bucket_exists() -> bool:
    """Make sure the configured bucket exists, creating it when missing.

    Status and failures are reported with ``print``; ``S3Error`` raised by the
    existence check or the creation call is caught, not propagated.

    Returns:
        ``True`` when the bucket already exists or was created (including the
        case where another process created it concurrently); ``False`` when
        MinIO is not configured, the bucket name is empty, or an S3 call
        failed.
    """
    client = get_minio_client()
    bucket = get_bucket_name()
    if client is None:
        print('ℹ️ MinIO 环境变量未配置,跳过桶检查')
        return False

    if not bucket:
        print('ℹ️ MINIO_BUCKET 为空,跳过桶检查')
        return False

    try:
        exists = client.bucket_exists(bucket)
    except S3Error as e:
        print(f'❌ MinIO 连接失败: {e}')
        return False

    if exists:
        print(f'ℹ️ MinIO 桶已存在: {bucket}')
        return True

    region = _get_env('MINIO_REGION', default=None)
    try:
        if region:
            client.make_bucket(bucket, location=region)
        else:
            client.make_bucket(bucket)
    except S3Error as e:
        # Another worker may have created the bucket between the existence
        # check and make_bucket; the bucket is there, so that is a success.
        if getattr(e, 'code', None) == 'BucketAlreadyOwnedByYou':
            print(f'ℹ️ MinIO 桶已存在: {bucket}')
            return True
        print(f'❌ MinIO 创建桶失败: {e}')
        return False

    print(f'✅ MinIO 桶已创建: {bucket}')
    return True
|
||
|
||
|
||
def upload_file(file_path: str, object_name: str, content_type: str | None = None) -> str:
    """Upload a local file to the configured MinIO bucket.

    Args:
        file_path: Path of the local file to upload.
        object_name: Key under which the object is stored in the bucket.
        content_type: MIME type to store with the object. When omitted it is
            guessed from ``object_name``, falling back to
            ``application/octet-stream``.

    Returns:
        ``object_name``, unchanged.

    Raises:
        RuntimeError: If MinIO is not configured.
    """
    client = get_minio_client()
    if client is None:
        raise RuntimeError('MinIO 未配置')

    bucket = get_bucket_name()
    # Best effort: the result is ignored, so a real failure surfaces from
    # fput_object below.
    ensure_bucket_exists()

    # Short-circuit chain: explicit type, then extension-based guess, then
    # the generic binary type.
    mime = (
        content_type
        or mimetypes.guess_type(object_name)[0]
        or 'application/octet-stream'
    )
    client.fput_object(bucket, object_name, file_path, content_type=mime)
    return object_name
|
||
|
||
|
||
def presigned_get_url(object_name: str, expires_seconds: int = 8 * 60 * 60) -> str:
    """Create a presigned download URL for an object in the configured bucket.

    Args:
        object_name: Key of the object inside the bucket.
        expires_seconds: Requested URL lifetime in seconds (default 8 hours).
            Falsy or sub-second values are raised to 1 second; values above
            7 days are lowered to 7 days.

    Returns:
        The presigned GET URL produced by the MinIO client.

    Raises:
        RuntimeError: If MinIO is not configured.
    """
    client = get_minio_client()
    if client is None:
        raise RuntimeError('MinIO 未配置')

    bucket = get_bucket_name()
    ensure_bucket_exists()

    # The MinIO client rejects lifetimes outside 1 second .. 7 days with a
    # ValueError, so clamp at both ends instead of only the lower one.
    max_lifetime = 7 * 24 * 60 * 60
    lifetime = min(max_lifetime, max(1, int(expires_seconds or 0)))
    return client.presigned_get_object(
        bucket,
        object_name,
        expires=timedelta(seconds=lifetime),
    )
|