1

I have the below code for connecting to all kinds of databases I need in my project:

import sqlalchemy as db
from clickhouse_driver import Client
from clickhouse_connect import get_client

from sqlalchemy.orm import create_session
from src.data.connection import ConectionInfo
import psycopg2
import mariadb
import socket


class DataBase:
    def __init__(self, model_data_source):
        self._schema = model_data_source['schema']
        self._connection_type = model_data_source['data_source']['connection_type']
        self.connectionInfo = ConectionInfo.ConectionInfo(
            model_data_source['data_source'], model_data_source['db'])
        self._engin = db.create_engine(self.connectionInfo.url)
 
        if self._connection_type == 'SQL':
            self._engin = db.create_engine(self.connectionInfo.url)

        
        elif self._connection_type == 'POSTGRES':
            self._engin = db.create_engine(self.connectionInfo.url)
            keepalive_kwargs = {
                "keepalives": 1,
                "keepalives_idle": 60,
                "keepalives_interval": 10,
                "keepalives_count": 5
                }

            self.postgres_con = psycopg2.connect(
                database=self.connectionInfo.database,
                user=self.connectionInfo.user,
                password=self.connectionInfo.password,
                host=self.connectionInfo.host,
                port=self.connectionInfo.port,**keepalive_kwargs
            )
            self.post_cursor = self.postgres_con.cursor()
         


    def get_table(self, table_name, primary_key='id'):
        metadata = db.MetaData(schema=self._schema)
        return db.Table(table_name, metadata, db.Column(primary_key, db.Integer,
                                                        primary_key=True),
                        autoload=True,
                        autoload_with=self._engin)

    def execute(self, query_string,param=None):
        # connection = self._engin.connect()
        
         
        if self._connection_type == 'POSTGRES':
            try:
                self.post_cursor.execute(query_string)
            except Exception as ex:
                try:
                    self.post_cursor.close()
                    self.post_cursor = self.postgres_con.cursor()
                except Exception as ex:
                    self.postgres_con.close()
                    self.postgres_con = psycopg2.connect(
                        database=self.connectionInfo.database,
                        user=self.connectionInfo.user,
                        password=self.connectionInfo.password,
                        host=self.connectionInfo.host,
                        port=self.connectionInfo.port
                    )
                self.post_cursor = self.postgres_con.cursor()
                self.post_cursor.execute(query_string)
     
            result = self.post_cursor.fetchall()
            # cols = [desc[0] for desc in self.post_cursor.description]
            return [result, self.post_cursor.description]
        



    # def insert_execute(self, query_string):
    #     # connection = self._engin.connect()
    #     if (self._connection_type == 'SQL'):
    #         con = self._engin.connect()
    #         return con.execute(query_string)
    #     if self._connection_type == 'clickhouse':
    #         return self._client .execute(query_string, with_column_types=True)

    #     if self._connection_type == 'POSTGRES':
    #         result = self.post_cursor.execute(query_string)
    #         id_of_new_row = self.post_cursor.fetchone()[0]
    #         self.postgres_con.commit()

    #         return id_of_new_row

    def delete_execute(self, query_string):
        # connection = self._engin.connect()
        if (self._connection_type == 'SQL'):
            con = self._engin.connect()
            return con.execute(query_string)
        if self._connection_type == 'clickhouse':
            return self._client.execute(query_string, with_column_types=True)
        if self._connection_type == 'POSTGRES':
            result = self.post_cursor.execute(query_string)
            if result is not None:
                result = self.post_cursor.fetchone()[0]
            # rows_deleted = self.post_cursor.rowcount
            self.postgres_con.commit()
            return result

    def get_session(self):
        return create_session(bind=self._engin)

    def get_engine(self):
        return self._engin

it was ok but recently I got this error constantly during connecting to Postgres:

SSL SYSCALL error: EOF detected

But I can connect to Postgres or run queries in Datagrip. Can anyone help me to resolve this issue? I tested different solutions in this post too.

the complete log for database is :2023-12-21 12:51:17,394 WARNING: Retry got exception: 'connection problems'
/tmp/postgres:5432 - rejecting connections
2023-12-21 12:51:17,595 WARNING: Failed to determine PostgreSQL state from the connection, falling back to cached role
2023-12-21 12:51:17,597 ERROR: Exception when called state_handler.last_operation()
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/patroni/ha.py", line 157, in update_lock
last_lsn = self.state_handler.last_operation()
File "/usr/local/lib/python3.6/site-packages/patroni/postgresql/__init__.py", line 993, in last_operation
return self._wal_position(self.is_leader(), self._cluster_info_state_get('wal_position'),
File "/usr/local/lib/python3.6/site-packages/patroni/postgresql/__init__.py", line 354, in _cluster_info_state_get
raise PostgresConnectionException(self._cluster_info_state['error'])
patroni.exceptions.PostgresConnectionException: "'Too many retry attempts'"
2023-12-21 12:51:17,639 WARNING: Failed to determine PostgreSQL state from the connection, falling back to cached role
2023-12-21 12:51:19,507 INFO: Error communicating with PostgreSQL. Will try again later
/tmp/postgres:5432 - accepting connections
12
  • 1) Why are you creating a SQLAlchemy connection and a psycopg2 connection? 2) What is the value for self.connectionInfo.url? Add answers as text update to question text. Commented Dec 20, 2023 at 21:15
  • I need both SQLAlchemy and and psycopg2. the url is: 'postgresql://my_user:[email protected]:5432/my_db' Commented Dec 20, 2023 at 22:53
  • No you don't create_engine will call psycopg2. See Engine Configuration Commented Dec 20, 2023 at 22:57
  • @Adrian Klaver I didnt use self._engin to execute. I used self.post_cursor.execute(query_string) and ihave rror in this line Commented Dec 21, 2023 at 1:27
  • The only thing I know to do is write a test script that just makes a psycopg2 connection using the same parameters and see if you get an error. If so also look at the Postgres log to see what if anything is actually hitting the database. Commented Dec 21, 2023 at 5:23

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.