
First of all, I'm completely new to databases other than SQLite, which I've only used a little, and I've only been using Python for about six months, so bear with me if I miss something obvious or have completely misunderstood something.

I'm scraping a lot of historical market data (~15,000 items for three regions). To do this efficiently, I wanted to use one process per region and then multithread within each process to fetch all the items. The response I get for each item is a list of dicts, which I then want to insert using Session.execute(). I haven't gotten that to work yet (if you know a way, please point me in the right direction as well), so for now I've gone with multithreading only, since I've already used that successfully to insert data into the regionid and typeid tables. Still, no data gets inserted into my historical_data table, and I get no errors when I run my script. I tried enabling SQLAlchemy logging with

import logging

logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

and that shows what I expect for my calls fetching data from the regionid and typeid tables during main, but after that, nothing. Does that mean I have no connection to the database once I start multithreading, or is the logger just not good at handling multithreading?

With the regionid and typeid tables, I used Session.merge() and handled the data for each item with a for-loop, so I'm guessing it's my use of Session.execute() that's off?

I'm trying to insert all my historical data into a Postgres database using sqlalchemy.orm. The script I use to insert the data is as follows:

if __name__ == '__main__':
    print("Start database session")
    Base.metadata.create_all(engine)
    Session = scoped_session(session_factory)
    ini_params()
    print("Get typeids and regionids from database")
    typeids = get_typeids() #get all typeids from typeid table
    regionids = get_regionids() #get all regionids from regionid table
    typeids = typeid_test_list #debug: overrides the full list; comment out for a real run
    print(typeids)
    for idx, regionid in enumerate(regionids):
        no_data = data_to_db(regionid, typeids, idx)
        #no_data = multi_processing(regionid, typeids, idx)
    print(no_data)

def data_to_db(regionid, typeids, idx):
    ini_params()
    position = int(idx)
    no_data_typeids = []
    prefix = 'Getting data for region ' + str(regionid)
    typeid_length = len(typeids)
    with tqdm(typeids, total=typeid_length, desc=prefix, position=position) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
            futures = {executor.submit(multithread_func, typeid, regionid): typeid for typeid in typeids}
            for future in concurrent.futures.as_completed(futures):
                pbar.update(1)
    return no_data_typeids
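One thing worth knowing about this pattern: as_completed() yields futures whether or not the worker raised, and an exception stays silently stored on the future until future.result() is called. A minimal self-contained sketch (with a hypothetical worker standing in for multithread_func) shows how to surface those errors:

```python
import concurrent.futures

def worker(n):
    # Hypothetical task standing in for multithread_func; fails for one input
    if n == 2:
        raise ValueError("scrape failed")
    return n * 10

results = []
errors = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(worker, n): n for n in range(4)}
    for future in concurrent.futures.as_completed(futures):
        try:
            # Without this call, the exception would never be seen
            results.append(future.result())
        except Exception as exc:
            errors.append((futures[future], exc))

print(sorted(results))  # [0, 10, 30]
print(errors[0][0])     # 2
```

If the loop only calls pbar.update(1) and never future.result(), a failure inside the thread (e.g. an undefined name) produces exactly the observed symptom: no data and no errors.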

def multithread_func(typeid, regionid):
    today = datetime.date.today()
    history = get_market_history(regionid, typeid) #URL-scraper
    if history != "error":
        import_full_history(history)
    else:
        return typeid
    return 0

def import_full_history(history):
    get_data_session = Session()
    print(type(history))
    get_data_session.execute(historical_data.insert(), item_dict)
    get_data_session.commit()
    Session.remove()
    return 0
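For reference, Core's insert() does accept a list of per-row dicts when that list is actually passed to execute(). A minimal self-contained sketch, using in-memory SQLite and a hypothetical trimmed-down table:

```python
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, Float

engine = create_engine('sqlite://')  # in-memory stand-in for the Postgres engine
metadata = MetaData()
hist = Table(
    'hist', metadata,
    Column('typeid', Integer),
    Column('average', Float),
)
metadata.create_all(engine)

# One dict per row; passing the list triggers an executemany
rows = [{'typeid': 34, 'average': 5.2}, {'typeid': 35, 'average': 7.1}]
with engine.begin() as conn:
    conn.execute(hist.insert(), rows)

with engine.connect() as conn:
    fetched = conn.execute(hist.select()).fetchall()
print(len(fetched))  # 2
```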

My database is built like so:

Base

engine = create_engine('postgresql://user@localhost:5432/historic_market_data', pool_size=12, max_overflow=0)
session_factory = sessionmaker(bind = engine)
Base = declarative_base()
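For context, scoped_session is a registry that hands each thread its own Session, so sessions created inside worker threads are isolated from one another and from the main thread. A minimal sketch of that behaviour, using in-memory SQLite as a stand-in for the Postgres URL:

```python
import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

engine = create_engine('sqlite://')  # stand-in for the Postgres engine
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)

sessions = {}

def grab(name):
    # Session() returns this thread's session, creating it on first use
    sessions[name] = Session()

threads = [threading.Thread(target=grab, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

main_session = Session()
print(sessions[0] is sessions[1])   # False: each thread got its own session
print(Session() is main_session)    # True: same thread, same session
```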

regionids

Session = scoped_session(session_factory)
class Regionid(Base):
    __tablename__ = 'regionids'
    regionid = Column(Integer, primary_key=True)
    query = Session.query_property()
    def __init__(self, regionid):
        self.regionid = regionid

typeids

Session = scoped_session(session_factory)
class Typeid(Base):
    __tablename__ = 'typeids'
    typeid = Column(Integer, primary_key=True)
    query = Session.query_property()
    def __init__(self, typeid):
        self.typeid = typeid

historical_data

class Historical_data(Base):
    __tablename__ = 'historical_data'
    
    id = Column(Integer, primary_key=True)
    typeid = Column('typeid', Integer, ForeignKey('typeids.typeid'))
    regionid = Column('regionid', Integer, ForeignKey('regionids.regionid'))
    date = Column(Date)
    average = Column(Float)
    highest = Column(Float)
    lowest = Column(Float)
    order_count = Column(Integer)
    volume = Column(Integer)
    buy_weighted_avg = Column(Float)
    buy_maxval = Column(Float)
    buy_minval = Column(Float)
    buy_stddev = Column(Float)
    buy_median = Column(Float)
    buy_volume = Column(Float)
    buy_numorders = Column(Integer)
    buy_fivepercent = Column(Float)
    sell_weighted_avg = Column(Float)
    sell_maxval = Column(Float)
    sell_minval = Column(Float)
    sell_stddev = Column(Float)
    sell_median = Column(Float)
    sell_volume = Column(Float)
    sell_numorders = Column(Integer)
    sell_fivepercent = Column(Float)
    
    def __init__(self, typeid, regionid, date, average, highest, lowest,
                 order_count, volume, buy_weighted_avg, buy_maxval,
                 buy_minval, buy_stddev, buy_median, buy_volume,
                 buy_numorders, buy_fivepercent, sell_weighted_avg,
                 sell_maxval, sell_minval, sell_stddev, sell_median,
                 sell_volume, sell_numorders, sell_fivepercent):
        self.typeid = typeid
        self.regionid = regionid
        self.date = date
        self.average = average
        self.highest = highest
        self.lowest = lowest
        self.order_count = order_count
        self.volume = volume
        self.buy_weighted_avg = buy_weighted_avg
        self.buy_maxval = buy_maxval
        self.buy_minval = buy_minval
        self.buy_stddev = buy_stddev
        self.buy_median = buy_median
        self.buy_volume = buy_volume
        self.buy_numorders = buy_numorders
        self.buy_fivepercent = buy_fivepercent
        self.sell_weighted_avg = sell_weighted_avg
        self.sell_maxval = sell_maxval
        self.sell_minval = sell_minval
        self.sell_stddev = sell_stddev
        self.sell_median = sell_median
        self.sell_volume = sell_volume
        self.sell_numorders = sell_numorders
        self.sell_fivepercent = sell_fivepercent

1 Answer


I managed to get it to work by using bulk_insert_mappings(), simply by changing my import_full_history() to

def import_full_history(history):
    get_data_session = Session()
    get_data_session.bulk_insert_mappings(Historical_data, history)
    get_data_session.commit()
    Session.remove()
    return 0

I got it to insert data. It also works with multithreading and multiprocessing combined, as I originally intended. It seems my Session.execute() call wasn't handling my list of dicts the way I expected, since each dict in the list was an entire row.
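For anyone landing here, a minimal self-contained sketch of the bulk_insert_mappings() pattern, using in-memory SQLite and a hypothetical trimmed-down stand-in for Historical_data:

```python
import datetime
from sqlalchemy import create_engine, Column, Integer, Float, Date
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Hist(Base):
    # Trimmed-down stand-in for Historical_data
    __tablename__ = 'hist'
    id = Column(Integer, primary_key=True)
    typeid = Column(Integer)
    average = Column(Float)
    date = Column(Date)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# One dict per row, keys matching the mapped column names
history = [
    {'typeid': 34, 'average': 5.2, 'date': datetime.date(2020, 1, 1)},
    {'typeid': 34, 'average': 5.4, 'date': datetime.date(2020, 1, 2)},
]
session.bulk_insert_mappings(Hist, history)
session.commit()

print(session.query(Hist).count())  # 2
```

bulk_insert_mappings() skips building ORM objects entirely, which also makes it faster than per-object merge() for large scrapes like this.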
