亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Psycopg2 在類中自動重新連接

Psycopg2 在類中自動重新連接

函數式編程 2022-12-14 20:45:00
我有課程可以連接到我的數據庫。import psycopg2, psycopg2.extensionsfrom parseini import configimport pandas as pd, pandas.io.sql as sqlioclass MyDatabase:    def __init__(self, name='mydb.ini'):        self.params = config(filename=name)        self.my_connection = psycopg2.connect(**self.params)        self.my_cursor = self.my_connection.cursor()    def fetch_all_as_df(self, sql_statement):        return sqlio.read_sql_query(sql_statement, self.my_connection)    def df_to_sql(self, df):        table = 'sometable'        return sqlio.to_sql(df, table, self.my_connection)    def __del__(self):        self.my_cursor.close()        self.my_connection.close()在我的案例中,如何重新連接到數據庫并處理 psycopg2.OperationalError?
查看完整描述

1 回答

?
飲歌長嘯

TA貢獻1951條經驗 獲得超3個贊

psycopg2.InterfaceError您可以制作一個裝飾器,在或psycopg2.OperationalError被提升時嘗試重新連接。

這只是它如何工作的一個例子,可能需要調整:


import time

from functools import wraps

import psycopg2, psycopg2.extensions



def retry(fn):

    @wraps(fn)

    def wrapper(*args, **kw):

        cls = args[0]

        for x in range(cls._reconnectTries):

            print(x, cls._reconnectTries)

            try:

                return fn(*args, **kw)

            except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:

                print ("\nDatabase Connection [InterfaceError or OperationalError]")

                print ("Idle for %s seconds" % (cls._reconnectIdle))

                time.sleep(cls._reconnectIdle)

                cls._connect()

    return wrapper



class MyDatabase:

    _reconnectTries = 5

    _reconnectIdle = 2  # wait seconds before retying


    def __init__(self, name='mydb.ini'):

        self.my_connection = None

        self.my_cursor = None

        self.params = config(filename=name)

        self._connect()


    def _connect(self):

        self.my_connection = psycopg2.connect(**self.params)

        self.my_cursor = self.my_connection.cursor()


    @retry

    def fetch_all_as_df(self, sql_statement):

        return sqlio.read_sql_query(sql_statement, self.my_connection)


    @retry

    def dummy(self):

        self.my_cursor.execute('select 1+2 as result')

        return self.my_cursor.fetchone()


    @retry

    def df_to_sql(self, df):

        table = 'sometable'

        return sqlio.to_sql(df, table, self.my_connection)


    def __del__(self):

        # Maybe there is a connection but no cursor, whatever close silently!

        for c in (self.my_cursor, self.my_connection):

            try:

                c.close()

            except:

                pass



db = MyDatabase()

time.sleep(30)  # some time to shutdown the database

print(db.dummy())

輸出:


Database Connection [InterfaceError or OperationalError]

Idle for 2 seconds


Database Connection [InterfaceError or OperationalError]

Idle for 2 seconds


Database Connection [InterfaceError or OperationalError]

Idle for 2 seconds


Database Connection [InterfaceError or OperationalError]

Idle for 2 seconds

(3,)

注意:_connect它本身沒有修飾,所以這段代碼假定初始連接總是有效!


查看完整回答
反對 回復 2022-12-14
  • 1 回答
  • 0 關注
  • 321 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號