亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在多線程環境下使用DataFrame.to_sql時MySQL死鎖

在多線程環境下使用DataFrame.to_sql時MySQL死鎖

烙印99 2023-08-15 16:44:32
我在 docker 容器內有一個多線程 ETL 進程,看起來像這樣的簡化代碼:class Query(abc.ABC):? ? def __init__(self):? ? ? ? self.connection = sqlalchemy.create_engine(MYSQL_CONNECTION_STR)? ? def load(self, df: pd.DataFrame) -> None:? ? ? ? df.to_sql(? ? ? ? ? ? name=self.table, con=self.connection, if_exists="replace", index=False,? ? ? ? )? ? @abc.abstractmethod? ? def transform(self, data: object) -> pd.DataFrame:? ? ? ? pass? ? @abc.abstractmethod? ? def extract(self) -> object:? ? ? ? pass? ? #? other methods...class ComplianceInfluxQuery(Query):? ? # Implements abstract methods... load method is the same as Query classALL_QUERIES = [ComplianceInfluxQuery("cc_influx_share_count"),ComplianceInfluxQuery("cc_influx_we_count")....]while True:? ? with ThreadPoolExecutor(max_workers=8) as pool:? ? ? ? for query in ALL_QUERIES:? ? ? ? ? ? pool.submit(execute_etl, query) # execute_etl function calls extract, transform and loadload()許多類繼承自 Query,具有與類中所示相同的實現,Query它只是將 pandas DataFrame 對象加載到 sql 表中,并替換該表(如果存在)。所有類同時運行,并在完成后將結果加載到 MySQLExtract()數據庫Transform()。每個類都會將不同的表加載到數據庫中。load()當調用該方法時,我經常會從隨機線程中遇到死鎖:日志顯示了load()兩個線程幾乎同時調用的方法。無論數據如何,這種情況都可能發生在所有類中。我運行了命令SHOW ENGINE INNODB STATUS,那里沒有列出死鎖。cc_influx_share_count我檢查了 General_log 表以更好地了解死鎖期間發生的情況,但除了死鎖的線程在(我認為)應該具有的情況下沒有向表中插入任何值這一事實之外,沒有注意到任何有用的信息:該錯誤于 09:48:28,241 提出SELECT * FROM mysql.general_log WHERE event_time >= "2020-09-17 09:48:27" AND event_time <= "2020-09-17 09:48:29" ORDER BY event_time ASC;此 ETL 是運行 MySQL 的唯一進程。我已閱讀有關死鎖發生原因的文檔,但我無法理解兩個之間沒有連接的不同表如何導致死鎖。我知道我可以簡單地load()再次運行該方法直到成功,但我想了解為什么會發生死鎖以及如何防止它們。MySQL版本是8.0.21。蟒蛇3.8.4。sqlalchemy 1.3.19。熊貓 1.0.5。PyMySQL 0.10.1。
查看完整描述

2 回答

?
拉丁的傳說

TA貢獻1789條經驗 獲得超8個贊

如果多個連接嘗試同時 INSERT 或 UPDATE 到同一個表,則可能會因表索引的爭用而導致死鎖。

你的問題說你從多個線程執行 INSERT。執行 INSERT 需要檢查主鍵唯一性和外鍵有效性等約束,然后更新表示這些約束的索引。所以多個并發更新

  1. 必須鎖定索引以供讀取,然后

  2. 鎖定它們以進行寫入。

從你的問題來看,MySQL有時會陷入死鎖情況(一個線程按a,b順序鎖定索引,另一個線程按b,a順序鎖定索引)。如果不同的線程可以同時將行插入到不同的表中,并且這些表通過外鍵約束相互關聯,則索引維護相對容易陷入死鎖情況。

您可以通過在執行加載之前更改要填充的表以刪除所有索引(自動增量主鍵除外),然后在之后重新創建它們來解決此問題。

或者,您可以擺脫并發性并僅使用一個線程執行ETLL部分。由于所有索引維護,線程并不能像直觀上那樣幫助提高吞吐量。

避免在多個并發線程上運行數據定義語言(CREATE TABLE、CREATE INDEX 等)。對這些東西進行故障排除比其價值更麻煩。

此外,在事務中包裝大約數百行的每個塊的 INSERT 可以以驚人的方式幫助 ETL 吞吐量。在每個主干之前,說BEGIN TRANSACTION; 在每個塊之后說COMMIT; 為什么這有幫助?因為 COMMIT 操作需要時間,并且不在顯式事務中的每個操作后面都會有一個隱式 COMMIT。


查看完整回答
反對 回復 2023-08-15
?
鴻蒙傳說

TA貢獻1865條經驗 獲得超7個贊

我發現這個問題的一個可能的解決方案是重試機制。如果發生死鎖 - 睡眠并多嘗試幾次直到成功,同時將 DF 保留在內存中:


class Query(abc.ABC):

    def __init__(self):

        self.engine = MysqlEngine.engine()


    ....

    ....


    def load(self, df: pd.DataFrame) -> None:

        for i in range(5):  # If load fails due to a deadlock, try 4 more times

            try:

                df.to_sql(

                    name=self.table,

                    con=self.engine.connect(),

                    if_exists="replace",

                    index=False,

                )

                return

            except sqlalchemy.exc.OperationalError as ex:

                if "1213" in repr(ex):

                    logging.warning(

                        "Failed to acquire lock for %s", self.__class__.__name__

                    )

                sleep(1)

死鎖仍然會發生,并且您會損失一些性能,但它勝過重新執行整個 Extrac - Transform。


查看完整回答
反對 回復 2023-08-15
  • 2 回答
  • 0 關注
  • 230 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號