I have the below code for connecting to all kinds of databases I need in my project:
import sqlalchemy as db
from clickhouse_driver import Client
from clickhouse_connect import get_client
from sqlalchemy.orm import create_session
from src.data.connection import ConectionInfo
import psycopg2
import mariadb
import socket
class DataBase:
def __init__(self, model_data_source):
self._schema = model_data_source['schema']
self._connection_type = model_data_source['data_source']['connection_type']
self.connectionInfo = ConectionInfo.ConectionInfo(
model_data_source['data_source'], model_data_source['db'])
self._engin = db.create_engine(self.connectionInfo.url)
if self._connection_type == 'SQL':
self._engin = db.create_engine(self.connectionInfo.url)
elif self._connection_type == 'POSTGRES':
self._engin = db.create_engine(self.connectionInfo.url)
keepalive_kwargs = {
"keepalives": 1,
"keepalives_idle": 60,
"keepalives_interval": 10,
"keepalives_count": 5
}
self.postgres_con = psycopg2.connect(
database=self.connectionInfo.database,
user=self.connectionInfo.user,
password=self.connectionInfo.password,
host=self.connectionInfo.host,
port=self.connectionInfo.port,**keepalive_kwargs
)
self.post_cursor = self.postgres_con.cursor()
def get_table(self, table_name, primary_key='id'):
metadata = db.MetaData(schema=self._schema)
return db.Table(table_name, metadata, db.Column(primary_key, db.Integer,
primary_key=True),
autoload=True,
autoload_with=self._engin)
def execute(self, query_string,param=None):
# connection = self._engin.connect()
if self._connection_type == 'POSTGRES':
try:
self.post_cursor.execute(query_string)
except Exception as ex:
try:
self.post_cursor.close()
self.post_cursor = self.postgres_con.cursor()
except Exception as ex:
self.postgres_con.close()
self.postgres_con = psycopg2.connect(
database=self.connectionInfo.database,
user=self.connectionInfo.user,
password=self.connectionInfo.password,
host=self.connectionInfo.host,
port=self.connectionInfo.port
)
self.post_cursor = self.postgres_con.cursor()
self.post_cursor.execute(query_string)
result = self.post_cursor.fetchall()
# cols = [desc[0] for desc in self.post_cursor.description]
return [result, self.post_cursor.description]
# def insert_execute(self, query_string):
# # connection = self._engin.connect()
# if (self._connection_type == 'SQL'):
# con = self._engin.connect()
# return con.execute(query_string)
# if self._connection_type == 'clickhouse':
# return self._client .execute(query_string, with_column_types=True)
# if self._connection_type == 'POSTGRES':
# result = self.post_cursor.execute(query_string)
# id_of_new_row = self.post_cursor.fetchone()[0]
# self.postgres_con.commit()
# return id_of_new_row
def delete_execute(self, query_string):
# connection = self._engin.connect()
if (self._connection_type == 'SQL'):
con = self._engin.connect()
return con.execute(query_string)
if self._connection_type == 'clickhouse':
return self._client.execute(query_string, with_column_types=True)
if self._connection_type == 'POSTGRES':
result = self.post_cursor.execute(query_string)
if result is not None:
result = self.post_cursor.fetchone()[0]
# rows_deleted = self.post_cursor.rowcount
self.postgres_con.commit()
return result
def get_session(self):
return create_session(bind=self._engin)
def get_engine(self):
return self._engin
it was ok but recently I got this error constantly during connecting to Postgres:
SSL SYSCALL error: EOF detected
But I can connect to Postgres or run queries in Datagrip. Can anyone help me to resolve this issue? I tested different solutions in this post too.
the complete log for database is :2023-12-21 12:51:17,394 WARNING: Retry got exception: 'connection problems'
/tmp/postgres:5432 - rejecting connections
2023-12-21 12:51:17,595 WARNING: Failed to determine PostgreSQL state from the connection, falling back to cached role
2023-12-21 12:51:17,597 ERROR: Exception when called state_handler.last_operation()
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/patroni/ha.py", line 157, in update_lock
last_lsn = self.state_handler.last_operation()
File "/usr/local/lib/python3.6/site-packages/patroni/postgresql/__init__.py", line 993, in last_operation
return self._wal_position(self.is_leader(), self._cluster_info_state_get('wal_position'),
File "/usr/local/lib/python3.6/site-packages/patroni/postgresql/__init__.py", line 354, in _cluster_info_state_get
raise PostgresConnectionException(self._cluster_info_state['error'])
patroni.exceptions.PostgresConnectionException: "'Too many retry attempts'"
2023-12-21 12:51:17,639 WARNING: Failed to determine PostgreSQL state from the connection, falling back to cached role
2023-12-21 12:51:19,507 INFO: Error communicating with PostgreSQL. Will try again later
/tmp/postgres:5432 - accepting connections
SQLAlchemyconnection and apsycopg2connection? 2) What is the value forself.connectionInfo.url? Add answers as text update to question text.create_enginewill callpsycopg2. See Engine Configurationpsycopg2connection using the same parameters and see if you get an error. If so also look at the Postgres log to see what if anything is actually hitting the database.