亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使 Boto3 上傳調用阻塞(單線程)

使 Boto3 上傳調用阻塞(單線程)

慕桂英546537 2022-01-18 17:47:48
編輯:我最初的假設被證明部分錯誤。我在這里添加了一個冗長的答案,我邀請其他人對其進行壓力測試和糾正。我正在尋找一種以單線程方式利用 Boto3 S3 API 來模擬線程安全鍵值存儲的方法。簡而言之,我想使用調用線程而不是新線程來進行上傳。.upload_fileobj()據我所知, Boto3(或)中方法的默認行為.upload_file()是將任務啟動到新線程并None立即返回。從文檔:這是一種托管傳輸,如有必要,它將在多個線程中執行分段上傳。(如果我對此的理解首先是錯誤的,那么對此進行更正也會有所幫助。這是在 Boto3 1.9.134 中。)>>> import io>>> import boto3>>> bucket = boto3.resource('s3').Bucket('my-bucket-name')>>> buf = io.BytesIO(b"test")>>> res = bucket.upload_fileobj(buf, 'testobj')>>> res is NoneTrue現在,假設這buf不是一個短的 4 字節字符串,而是一個巨大的文本 blob,它將花費不可忽略的時間來完全上傳。我還使用此函數來檢查具有給定鍵的對象是否存在:def key_exists_in_bucket(bucket_obj, key: str) -> bool:    try:        bucket_obj.Object(key).load()    except botocore.exceptions.ClientError:        return False    else:        return True如果對象按名稱存在,我的意圖是不重寫該對象。這里的競爭條件相當明顯:異步啟動上傳,然后快速檢查key_exists_in_bucket(),False如果對象仍在寫入,則返回,然后不必要地再次寫入它。有沒有辦法確保bucket.upload_fileobj()由當前線程而不是在該方法范圍內創建的新線程調用?我意識到這會減慢速度。在這種情況下,我愿意犧牲速度。
查看完整描述

3 回答

?
滄海一幻覺

TA貢獻1824條經驗 獲得超5個贊

我認為,由于這個問題的答案和另一個類似問題的答案似乎直接沖突,所以最好直接使用pdb.


概括

boto3 默認情況下使用多個線程 (10)

但是,它不是異步的,因為它在返回之前等待(加入)這些線程,而不是使用“即發即棄”技術

因此,以這種方式,如果您嘗試與來自多個客戶端的 s3 存儲桶通信,則讀/寫線程安全性就位。

細節

我在這里努力解決的一個方面是多個(子線程)并不意味著頂級方法本身是非阻塞的:如果調用線程開始上傳到多個子線程,然后等待這些線程完成并返回,我敢說這仍然是一個阻塞電話。反過來asyncio說,如果方法調用是一個“即發即棄”的調用。使用threading,這實際上歸結為是否x.join()曾經被調用過。


這是取自 Victor Val 的初始代碼,用于啟動調試器:


import io

import pdb


import boto3


# From dd if=/dev/zero of=100mb.txt  bs=50M  count=1

buf = io.BytesIO(open('100mb.txt', 'rb').read())

bucket = boto3.resource('s3').Bucket('test-threads')

pdb.run("bucket.upload_fileobj(buf, '100mb')")

此堆棧幀來自 Boto 1.9.134。


現在跳入pdb:


.upload_fileobj() 首先調用一個嵌套方法——還沒有太多可看的。


(Pdb) s

--Call--

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(542)bucket_upload_fileobj()

-> def bucket_upload_fileobj(self, Fileobj, Key, ExtraArgs=None,

(Pdb) s


(Pdb) l

574     

575         :type Config: boto3.s3.transfer.TransferConfig

576         :param Config: The transfer configuration to be used when performing the

577             upload.

578         """

579  ->     return self.meta.client.upload_fileobj(

580             Fileobj=Fileobj, Bucket=self.name, Key=Key, ExtraArgs=ExtraArgs,

581             Callback=Callback, Config=Config)

582     

583     

584  

所以頂級方法確實返回了一些東西,但目前還不清楚那個東西最終會變成什么None。


所以我們進入了那個。


現在,.upload_fileobj()確實有一個config參數,默認情況下是 None :


(Pdb) l 531

526     

527         subscribers = None

528         if Callback is not None:

529             subscribers = [ProgressCallbackInvoker(Callback)]

530     

531         config = Config

532         if config is None:

533             config = TransferConfig()

534     

535         with create_transfer_manager(self, config) as manager:

536             future = manager.upload(

這意味著config成為默認值TransferConfig():


use_threads-- 如果為 True,則執行 S3 傳輸時將使用線程。如果為 False,則不會使用線程來執行傳輸:所有邏輯都將在主線程中運行。

max_concurrency-- 請求執行傳輸的最大線程數。如果 use_threads 設置為 False,則忽略提供的值,因為傳輸只會使用主線程。

哇啦,他們在這里:


(Pdb) unt 534

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(535)upload_fileobj()

-> with create_transfer_manager(self, config) as manager:

(Pdb) config

<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>

(Pdb) config.use_threads

True

(Pdb) config.max_concurrency

10

現在我們在調用堆棧中下降一個級別以使用TransferManager(上下文管理器)。此時,max_concurrency已被用作類似名稱的參數max_request_concurrency:


# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/manager.py#L223


    # The executor responsible for making S3 API transfer requests

    self._request_executor = BoundedExecutor(

        max_size=self._config.max_request_queue_size,

        max_num_threads=self._config.max_request_concurrency,

        tag_semaphores={

            IN_MEMORY_UPLOAD_TAG: TaskSemaphore(

                self._config.max_in_memory_upload_chunks),

            IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(

                self._config.max_in_memory_download_chunks)

        },

        executor_cls=executor_cls

    )

至少在這個 boto3 版本中,該類來自單獨的庫s3transfer。


(Pdb) n

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(536)upload_fileobj()

-> future = manager.upload(

(Pdb) manager

<s3transfer.manager.TransferManager object at 0x7f178db437f0>

(Pdb) manager._config

<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>

(Pdb) manager._config.use_threads

True

(Pdb) manager._config.max_concurrency

10

接下來,讓我們進入manager.upload(). 這是該方法的全文:


(Pdb) l 290, 303

290  ->         if extra_args is None:

291                 extra_args = {}

292             if subscribers is None:

293                 subscribers = []

294             self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)

295             call_args = CallArgs(

296                 fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,

297                 subscribers=subscribers

298             )

299             extra_main_kwargs = {}

300             if self._bandwidth_limiter:

301                 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter

302             return self._submit_transfer(

303                 call_args, UploadSubmissionTask, extra_main_kwargs)


(Pdb) unt 301

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(302)upload()

-> return self._submit_transfer(

(Pdb) extra_main_kwargs

{}


(Pdb) UploadSubmissionTask

<class 's3transfer.upload.UploadSubmissionTask'>

(Pdb) call_args

<s3transfer.utils.CallArgs object at 0x7f178db5a5f8>


(Pdb) l 300, 5

300             if self._bandwidth_limiter:

301                 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter

302  ->         return self._submit_transfer(

303                 call_args, UploadSubmissionTask, extra_main_kwargs)

304     

305         def download(self, bucket, key, fileobj, extra_args=None,

啊,太可愛了——所以我們至少需要再往下一層才能看到實際的底層上傳。


(Pdb) s

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(303)upload()

-> call_args, UploadSubmissionTask, extra_main_kwargs)

(Pdb) s

--Call--

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(438)_submit_transfer()

-> def _submit_transfer(self, call_args, submission_task_cls,

(Pdb) s

> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(440)_submit_transfer()

-> if not extra_main_kwargs:


(Pdb) l 440, 10

440  ->         if not extra_main_kwargs:

441                 extra_main_kwargs = {}

442     

443             # Create a TransferFuture to return back to the user

444             transfer_future, components = self._get_future_with_components(

445                 call_args)

446     

447             # Add any provided done callbacks to the created transfer future

448             # to be invoked on the transfer future being complete.

449             for callback in get_callbacks(transfer_future, 'done'):

450                 components['coordinator'].add_done_callback(callback)

好的,所以現在我們有一個TransferFuture, 定義在沒有明確的證據表明線程已經被啟動了,但是當涉及到期貨s3transfer/futures.py 時,它肯定聽起來像這樣。


(Pdb) l

444             transfer_future, components = self._get_future_with_components(

445                 call_args)

446     

447             # Add any provided done callbacks to the created transfer future

448             # to be invoked on the transfer future being complete.

449  ->         for callback in get_callbacks(transfer_future, 'done'):

450                 components['coordinator'].add_done_callback(callback)

451     

452             # Get the main kwargs needed to instantiate the submission task

453             main_kwargs = self._get_submission_task_main_kwargs(

454                 transfer_future, extra_main_kwargs)

(Pdb) transfer_future

<s3transfer.futures.TransferFuture object at 0x7f178db5a780>

下面的最后一行來自TransferCoordinator課堂,乍一看似乎很重要:


class TransferCoordinator(object):

    """A helper class for managing TransferFuture"""

    def __init__(self, transfer_id=None):

        self.transfer_id = transfer_id

        self._status = 'not-started'

        self._result = None

        self._exception = None

        self._associated_futures = set()

        self._failure_cleanups = []

        self._done_callbacks = []

        self._done_event = threading.Event()  # < ------ !!!!!!

您通常會看到threading.Event 一個線程用于發出事件狀態的信號,而其他線程可以等待該事件發生。


TransferCoordinator是由 .使用的TransferFuture.result()。


好的,從上面循環回來,我們現在在s3transfer.futures.BoundedExecutor它的max_num_threads屬性:


class BoundedExecutor(object):

    EXECUTOR_CLS = futures.ThreadPoolExecutor

    # ...

    def __init__(self, max_size, max_num_threads, tag_semaphores=None,

                 executor_cls=None):

    self._max_num_threads = max_num_threads

    if executor_cls is None:

        executor_cls = self.EXECUTOR_CLS

    self._executor = executor_cls(max_workers=self._max_num_threads)

這基本上相當于:


from concurrent import futures


_executor = futures.ThreadPoolExecutor(max_workers=10)

但是仍然存在一個問題:這是一種“即發即棄”,還是調用實際上是在等待線程完成并返回?


似乎是后者。 .result()來電self._done_event.wait(MAXINT)。


# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/futures.py#L249


def result(self):

    self._done_event.wait(MAXINT)


    # Once done waiting, raise an exception if present or return the

    # final result.

    if self._exception:

        raise self._exception

    return self._result

最后,重新運行 Victor Val 的測試,這似乎證實了上述內容:


>>> import boto3

>>> import time

>>> import io

>>> 

>>> buf = io.BytesIO(open('100mb.txt', 'rb').read())

>>> 

>>> bucket = boto3.resource('s3').Bucket('test-threads')

>>> start = time.time()

>>> print("starting to upload...")

starting to upload...

>>> bucket.upload_fileobj(buf, '100mb')

>>> print("finished uploading")

finished uploading

>>> end = time.time()

>>> print("time: {}".format(end-start))

time: 2.6030001640319824

(此示例在網絡優化實例上運行時,此執行時間可能更短。但 2.5 秒仍然是一個明顯的大塊時間,并且根本不表示線程被啟動并且沒有等待。)


最后,這是一個Callbackfor的示例.upload_fileobj()。它遵循文檔中的示例。


首先,一個小幫手可以有效地獲取緩沖區的大?。?/p>


def get_bufsize(buf, chunk=1024) -> int:

    start = buf.tell()

    try:

        size = 0 

        while True: 

            out = buf.read(chunk) 

            if out: 

                size += chunk 

            else: 

                break

        return size

    finally:

        buf.seek(start)

類本身:


import os

import sys

import threading

import time


class ProgressPercentage(object):

    def __init__(self, filename, buf):

        self._filename = filename

        self._size = float(get_bufsize(buf))

        self._seen_so_far = 0

        self._lock = threading.Lock()

        self.start = None


    def __call__(self, bytes_amount):

        with self._lock:

            if not self.start:

                self.start = time.monotonic()

            self._seen_so_far += bytes_amount

            percentage = (self._seen_so_far / self._size) * 100

            sys.stdout.write(

                "\r%s  %s of %s  (%.2f%% done, %.2fs elapsed\n" % (

                    self._filename, self._seen_so_far, self._size,

                    percentage, time.monotonic() - self.start))

            # Use sys.stdout.flush() to update on one line

            # sys.stdout.flush()

例子:


In [19]: import io 

    ...:  

    ...: from boto3.session import Session 

    ...:  

    ...: s3 = Session().resource("s3") 

    ...: bucket = s3.Bucket("test-threads") 

    ...: buf = io.BytesIO(open('100mb.txt', 'rb').read()) 

    ...:  

    ...: bucket.upload_fileobj(buf, 'mykey', Callback=ProgressPercentage("mykey", buf))                                                                                                                                                                      

mykey  262144 of 104857600.0  (0.25% done, 0.00s elapsed

mykey  524288 of 104857600.0  (0.50% done, 0.00s elapsed

mykey  786432 of 104857600.0  (0.75% done, 0.01s elapsed

mykey  1048576 of 104857600.0  (1.00% done, 0.01s elapsed

mykey  1310720 of 104857600.0  (1.25% done, 0.01s elapsed

mykey  1572864 of 104857600.0  (1.50% done, 0.02s elapsed


查看完整回答
反對 回復 2022-01-18
?
吃雞游戲

TA貢獻1829條經驗 獲得超7個贊

upload_fileobj接受一個 Config 參數。這是一個boto3.s3.transfer.TransferConfig對象,它又具有一個名為use_threads(默認為 true)的參數 - 如果為 True,則執行 S3 傳輸時將使用線程。如果為 False,則不會使用線程來執行傳輸:所有邏輯都將在主線程中運行。

希望這對你有用。


查看完整回答
反對 回復 2022-01-18
?
慕尼黑8549860

TA貢獻1818條經驗 獲得超11個贊

測試該方法是否阻塞:

我自己根據經驗測試了這種行為。首先,我生成了一個 100MB 的文件:


dd if=/dev/zero of=100mb.txt  bs=100M  count=1

然后我嘗試以與您相同的方式上傳文件并測量所花費的時間:


import boto3

import time

import io

file = open('100mb.txt', 'rb')

buf = io.BytesIO(file.read())

bucket = boto3.resource('s3').Bucket('testbucket')

start = time.time()

print("starting to upload...")

bucket.upload_fileobj(buf, '100mb')

print("finished uploading")

end = time.time()

print("time: {}".format(end-start))

upload_fileobj() 方法完成并讀取下一個 python 行(1gb 文件需要 50 秒)需要 8 秒以上,所以我假設這個方法是阻塞的。


使用線程測試:


使用多個線程時,即使使用選項 use_threads=False ,我也可以驗證該方法是否同時支持多個傳輸。我開始上傳一個 200mb 的文件,然后是一個 100mb 的文件,然后 100mb 的文件首先完成。這證實了TransferConfig中的并發與多部分傳輸有關。


代碼:


import boto3

import time

import io

from boto3.s3.transfer import TransferConfig

import threading


config = TransferConfig(use_threads=False)


bucket = boto3.resource('s3').Bucket('testbucket')

def upload(filename):

     file = open(filename, 'rb')

     buf = io.BytesIO(file.read())

     start = time.time()

     print("starting to upload file {}".format(filename))

     bucket.upload_fileobj(buf,filename,Config=config)

     end = time.time()

     print("finished uploading file {}. time: {}".format(filename,end-start))

x1 = threading.Thread(target=upload, args=('200mb.txt',))

x2 = threading.Thread(target=upload, args=('100mb.txt',))

x1.start()

time.sleep(2)

x2.start()

輸出:


開始上傳文件 200mb.txt

開始上傳文件 100mb.txt

完成上傳文件 100mb.txt。時間:46.35254502296448

完成上傳文件200mb.txt。時間:61.70564889907837


使用會話進行測試:

如果您希望上傳方法按照調用的順序完成,這就是您所需要的。


代碼:


import boto3

import time

import io

from boto3.s3.transfer import TransferConfig

import threading


config = TransferConfig(use_threads=False)


session = boto3.session.Session()

s3 = session.resource('s3')

bucket = s3.Bucket('testbucket')

def upload(filename):

     file = open(filename, 'rb')

     buf = io.BytesIO(file.read())

     start = time.time()

     print("starting to upload file {}".format(filename))

     bucket.upload_fileobj(buf,filename)

     end = time.time()

     print("finished uploading file {}. time: {}".format(filename,end-start))

x1 = threading.Thread(target=upload, args=('200mb.txt',))

x2 = threading.Thread(target=upload, args=('100mb.txt',))

x1.start()

time.sleep(2)

x2.start()

輸出:


開始上傳文件 200mb.txt

開始上傳文件 100mb.txt

完成上傳文件 200mb.txt。時間:46.62478971481323

完成上傳文件100mb.txt。時間:50.515950202941895


我發現的一些資源:

-這是在 SO 中提出的關于阻塞或非阻塞方法的問題。這不是決定性的,但那里可能有相關信息。

- GitHub 上存在一個開放問題,允許在 boto3 中進行異步傳輸。

- 還有像aioboto和aiobotocore這樣的工具,專門用于允許從/到 s3 和其他 aws 服務的異步下載和上傳。


關于我之前的回答:

您可以在此處閱讀有關 boto3 中的文件傳輸配置的信息。特別是:


傳輸操作使用線程來實現并發??梢酝ㄟ^將 use_threads 屬性設置為 False 來禁用線程使用。


最初我認為這與同時執行的多個傳輸有關。但是,閱讀源代碼時,使用TransferConfig時參數max_concurrency中的注釋解釋說并發不是指多次傳輸,而是指 “將發出請求以執行傳輸的線程數”。所以這是用來加速傳輸的東西。use_threads屬性僅用于允許多部分傳輸中的并發性。


查看完整回答
反對 回復 2022-01-18
  • 3 回答
  • 0 關注
  • 362 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號