Using Alibaba Cloud OSS with Label Studio: A Tutorial

This article describes an optimized way to connect Label Studio to Alibaba Cloud OSS. The usual approach mounts OSS through FUSE, which processes files serially and is both slow and unstable. The new approach patches the Label Studio source to support Alibaba Cloud OSS natively via its S3-compatible protocol: the unnecessary session token is removed and presigned OSS URLs are generated directly, letting the browser fetch objects in parallel, which markedly improves access speed and relieves load on the server; the author reports a speedup of more than 100x. The article also provides a detailed walkthrough covering the source change, the cross-origin (CORS) configuration, and the OSS settings inside Label Studio.

🚀 **Background**: Label Studio does not officially support mounting Alibaba Cloud OSS through its AWS S3 storage type, and the community's usual OSS-mount workaround relies on FUSE, which processes files serially and is slow and unstable.

🧠 **Solution**: Patch the Label Studio source file `s3/utils.py` so that Alibaba Cloud OSS (S3-protocol compatible) is supported natively. The boto3 session-related configuration is removed, keeping only access_key, secret_key, and endpoint_url, so no unnecessary session token is used.

🔑 **Key changes**: Modify the `get_client_and_resource` and `resolve_s3_url` functions to add `endpoint_url` support, trim the credential fields, and drop `aws_session_token`, while staying compatible with standard AWS S3 behaviour. The modified `utils.py` is then copied into the container with `docker cp`, replacing the original file (a minimal sketch of the change follows this overview).

✨ **Results**: Pages load large numbers of files within seconds, with no waiting on serial processing by the server. Label Studio can mount an OSS bucket directly, making previewing and processing annotation tasks efficient.

⚙️ **Steps**: First locate the `utils.py` file and save it to the host running the Docker service, then replace the file inside the container with `docker cp` and restart the container. Finally, configure CORS rules for the OSS bucket as described in the official Alibaba Cloud documentation, so the browser can fetch the presigned URLs without cross-origin errors.
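Conceptually, the patched client setup boils down to the minimal sketch below (the endpoint, bucket, key, and credentials are placeholders, not values from the article): the boto3 client gets an explicit `endpoint_url`, virtual-hosted addressing, and SigV4 signing, and `aws_session_token` is no longer passed, so presigned URLs point straight at OSS.

```python
# Minimal sketch (not verbatim from the article) of the patched client setup:
# explicit OSS endpoint_url, virtual-hosted addressing, SigV4, no session token.
# The endpoint, bucket, key, and credentials below are placeholders.
import boto3
from botocore.config import Config

s3 = boto3.client(
    's3',
    aws_access_key_id='<your-access-key-id>',
    aws_secret_access_key='<your-access-key-secret>',
    endpoint_url='https://oss-cn-hangzhou.aliyuncs.com',  # your OSS region endpoint
    config=Config(s3={'addressing_style': 'virtual'}, signature_version='v4'),
)

# The server only generates the link; the browser downloads the object from OSS directly.
url = s3.generate_presigned_url(
    ClientMethod='get_object',
    Params={'Bucket': 'my-label-bucket', 'Key': 'images/0001.jpg'},
    ExpiresIn=3600,
)
print(url)
```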

🚀 An Optimized Way for Label Studio to Access Alibaba Cloud OSS

✅ Background

🧠 Solution Overview

Dataset access path, before and after:

    OSS disk-mount (FUSE) approach (serial, and prone to crashing):
      pull the Alibaba Cloud OSS file list -> web server -> user's browser
    This approach (parallel, with OSS's very low latency):
      web server -> user's browser (presigned URLs); OSS server -> user's browser (object data)
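As a rough illustration of the parallel path (not code from the article; the URLs below are hypothetical), once the browser holds presigned links it fetches every object from OSS concurrently, with the Label Studio server out of the data path. The same behaviour can be mimicked from Python:

```python
# Illustration only: concurrently fetching presigned OSS URLs, the way a browser
# does once Label Studio hands it direct links (hypothetical placeholder URLs).
from concurrent.futures import ThreadPoolExecutor

import requests

presigned_urls = [
    'https://my-label-bucket.oss-cn-hangzhou.aliyuncs.com/images/0001.jpg?Expires=...',
    'https://my-label-bucket.oss-cn-hangzhou.aliyuncs.com/images/0002.jpg?Expires=...',
]

def fetch(url: str) -> int:
    # Each request goes to OSS directly; the Label Studio server is not involved.
    return requests.get(url, timeout=10).status_code

with ThreadPoolExecutor(max_workers=8) as pool:
    print(list(pool.map(fetch, presigned_urls)))
```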

🔧 Key Changes

✨ Results

🔒 Hands-On Tutorial

If you deployed Label Studio some other way, look up the source path your service uses (online tutorials cover this) and replace the file at that location instead.

Find the utils.py file at the bottom of this article and save it to the host running the Docker service. Then run the following commands on that host, substituting your own container name:

    docker cp ./utils.py <your-label-studio-container>:/label-studio/label_studio/io_storages/s3/utils.py
    docker restart <your-label-studio-container>

CORS Configuration

Refer to the Alibaba Cloud documentation.

[Screenshots in the original article show the CORS rule details and the completed settings.]
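If you would rather set the rule from code than in the console, a minimal sketch with the official `oss2` SDK might look like this (the origin, endpoint, bucket name, and credentials are placeholders; the exact allowed origins and headers should follow the Alibaba Cloud documentation and your own deployment):

```python
# Hypothetical sketch: apply a CORS rule to the OSS bucket with the oss2 SDK
# so the browser can GET presigned URLs from the Label Studio origin.
import oss2

auth = oss2.Auth('<your-access-key-id>', '<your-access-key-secret>')
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'my-label-bucket')

rule = oss2.models.CorsRule(
    allowed_origins=['http://your-label-studio-host:8080'],  # or '*' while testing
    allowed_methods=['GET', 'HEAD'],
    allowed_headers=['*'],
    max_age_seconds=600,
)
bucket.put_bucket_cors(oss2.models.BucketCors([rule]))
```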

Configuring OSS in Label Studio

utils.py Code

Only the client/resource creation logic in the `get_client_and_resource` function was modified.

"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license."""import base64import fnmatchimport loggingimport refrom urllib.parse import urlparseimport boto3from botocore.config import Configfrom botocore.exceptions import ClientErrorfrom django.conf import settingsfrom tldextract import TLDExtractlogger = logging.getLogger(__name__)def get_client_and_resource(    aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, region_name=None, s3_endpoint=None):    aws_access_key_id = aws_access_key_id    aws_secret_access_key = aws_secret_access_key    aws_session_token = aws_session_token    logger.debug(        f'Create boto3 session with '        f'access key id={aws_access_key_id}, '        f'secret key={aws_secret_access_key[:4] + "..." if aws_secret_access_key else None}, '        f'session token={aws_session_token}'    )    s3 = boto3.client(        's3',        aws_access_key_id=aws_access_key_id,        aws_secret_access_key=aws_secret_access_key,        endpoint_url=s3_endpoint,        config=Config(s3={"addressing_style": "virtual"},                      signature_version='v4'))    res = boto3.resource(        's3',        aws_access_key_id=aws_access_key_id,        aws_secret_access_key=aws_secret_access_key,        endpoint_url=s3_endpoint,        config=Config(s3={"addressing_style": "virtual"},                      signature_version='v4'))    return s3, resdef resolve_s3_url(url, client, presign=True, expires_in=3600):    r = urlparse(url, allow_fragments=False)    bucket_name = r.netloc    key = r.path.lstrip('/')    # Return blob as base64 encoded string if presigned urls are disabled    if not presign:        object = client.get_object(Bucket=bucket_name, Key=key)        content_type = object['ResponseMetadata']['HTTPHeaders']['content-type']        object_b64 = 'data:' + content_type + ';base64,' + base64.b64encode(object['Body'].read()).decode('utf-8')        return object_b64    # Otherwise try to generate presigned url    try:        presigned_url = client.generate_presigned_url(            ClientMethod='get_object', Params={'Bucket': bucket_name, 'Key': key}, ExpiresIn=expires_in        )    except ClientError as exc:        logger.warning(f"Can't generate presigned URL. 
Reason: {exc}")        return url    else:        logger.debug('Presigned URL {presigned_url} generated for {url}'.format(presigned_url=presigned_url, url=url))        return presigned_urlclass AWS(object):    @classmethod    def get_blob_metadata(        cls,        url: str,        bucket_name: str,        client=None,        aws_access_key_id=None,        aws_secret_access_key=None,        aws_session_token=None,        region_name=None,        s3_endpoint=None,    ):        """        Get blob metadata by url        :param url: Object key        :param bucket_name: AWS bucket name        :param client: AWS client for batch processing        :param account_key: Azure account key        :return: Object metadata dict("name": "value")        """        if client is None:            client, _ = get_client_and_resource(                aws_access_key_id=aws_access_key_id,                aws_secret_access_key=aws_secret_access_key,                aws_session_token=aws_session_token,                region_name=region_name,                s3_endpoint=s3_endpoint,            )        object = client.get_object(Bucket=bucket_name, Key=url)        metadata = dict(object)        # remove unused fields        metadata.pop('Body', None)        metadata.pop('ResponseMetadata', None)        return metadata    @classmethod    def validate_pattern(cls, storage, pattern, glob_pattern=True):        """        Validate pattern against S3 Storage        :param storage: S3 Storage instance        :param pattern: Pattern to validate        :param glob_pattern: If True, pattern is a glob pattern, otherwise it is a regex pattern        :return: Message if pattern is not valid, empty string otherwise        """        client, bucket = storage.get_client_and_bucket()        if glob_pattern:            pattern = fnmatch.translate(pattern)        regex = re.compile(pattern)        if storage.prefix:            list_kwargs = {'Prefix': storage.prefix.rstrip('/') + '/'}            if not storage.recursive_scan:                list_kwargs['Delimiter'] = '/'            bucket_iter = bucket.objects.filter(**list_kwargs)        else:            bucket_iter = bucket.objects        bucket_iter = bucket_iter.page_size(settings.CLOUD_STORAGE_CHECK_FOR_RECORDS_PAGE_SIZE).all()        for index, obj in enumerate(bucket_iter):            key = obj.key            # skip directories            if key.endswith('/'):                logger.debug(key + ' is skipped because it is a folder')                continue            if regex and regex.match(key):                logger.debug(key + ' matches file pattern')                return ''        return 'No objects found matching the provided glob pattern'class S3StorageError(Exception):    pass# see https://github.com/john-kurkowski/tldextract?tab=readme-ov-file#note-about-caching# prevents network call on first useextractor = TLDExtract(suffix_list_urls=())def catch_and_reraise_from_none(func):    """    For S3 storages - if s3_endpoint is not on a known domain, catch exception and    raise a new one with the previous context suppressed. 
See also: https://peps.python.org/pep-0409/    """    def wrapper(self, *args, **kwargs):        try:            return func(self, *args, **kwargs)        except Exception as e:            if self.s3_endpoint and (                domain := extractor.extract_urllib(urlparse(self.s3_endpoint)).registered_domain.lower()            ) not in [trusted_domain.lower() for trusted_domain in settings.S3_TRUSTED_STORAGE_DOMAINS]:                logger.error(f'Exception from unrecognized S3 domain: {e}', exc_info=True)                raise S3StorageError(                    f'Debugging info is not available for s3 endpoints on domain: {domain}. '                    'Please contact your Label Studio devops team if you require detailed error reporting for this domain.'                ) from None            else:                raise e    return wrapper
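A quick, hypothetical smoke test for the patched module, run from a Python shell inside the container (the import path assumes the default container layout; bucket, key, endpoint, and credentials are placeholders, and `requests` is used only for the check, not by the patch itself):

```python
# Hypothetical smoke test: build an OSS-backed client with the patched helpers,
# resolve an s3:// URL to a presigned link, and confirm the object is reachable.
import requests

from label_studio.io_storages.s3.utils import get_client_and_resource, resolve_s3_url

client, _ = get_client_and_resource(
    aws_access_key_id='<your-access-key-id>',
    aws_secret_access_key='<your-access-key-secret>',
    s3_endpoint='https://oss-cn-hangzhou.aliyuncs.com',
)

presigned = resolve_s3_url('s3://my-label-bucket/images/0001.jpg', client, presign=True)
print(presigned)
print(requests.get(presigned, timeout=10).status_code)  # expect 200 if keys/endpoint are correct
```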
