2 回答

TA貢獻1789條經驗 獲得超8個贊
如果多個連接嘗試同時 INSERT 或 UPDATE 到同一個表,則可能會因表索引的爭用而導致死鎖。
你的問題說你從多個線程執行 INSERT。執行 INSERT 需要檢查主鍵唯一性和外鍵有效性等約束,然后更新表示這些約束的索引。所以多個并發更新
必須鎖定索引以供讀取,然后
鎖定它們以進行寫入。
從你的問題來看,MySQL有時會陷入死鎖情況(一個線程按a,b順序鎖定索引,另一個線程按b,a順序鎖定索引)。如果不同的線程可以同時將行插入到不同的表中,并且這些表通過外鍵約束相互關聯,則索引維護相對容易陷入死鎖情況。
您可以通過在執行加載之前更改要填充的表以刪除所有索引(自動增量主鍵除外),然后在之后重新創建它們來解決此問題。
或者,您可以擺脫并發性并僅使用一個線程執行ETL的L部分。由于所有索引維護,線程并不能像直觀上那樣幫助提高吞吐量。
避免在多個并發線程上運行數據定義語言(CREATE TABLE、CREATE INDEX 等)。對這些東西進行故障排除比其價值更麻煩。
此外,在事務中包裝大約數百行的每個塊的 INSERT 可以以驚人的方式幫助 ETL 吞吐量。在每個主干之前,說BEGIN TRANSACTION;
在每個塊之后說COMMIT;
為什么這有幫助?因為 COMMIT 操作需要時間,并且不在顯式事務中的每個操作后面都會有一個隱式 COMMIT。

TA貢獻1865條經驗 獲得超7個贊
我發現這個問題的一個可能的解決方案是重試機制。如果發生死鎖 - 睡眠并多嘗試幾次直到成功,同時將 DF 保留在內存中:
class Query(abc.ABC):
def __init__(self):
self.engine = MysqlEngine.engine()
....
....
def load(self, df: pd.DataFrame) -> None:
for i in range(5): # If load fails due to a deadlock, try 4 more times
try:
df.to_sql(
name=self.table,
con=self.engine.connect(),
if_exists="replace",
index=False,
)
return
except sqlalchemy.exc.OperationalError as ex:
if "1213" in repr(ex):
logging.warning(
"Failed to acquire lock for %s", self.__class__.__name__
)
sleep(1)
死鎖仍然會發生,并且您會損失一些性能,但它勝過重新執行整個 Extrac - Transform。
添加回答
舉報