First of all, I'm completely new to databases other than SQLite (which I've only used a little), and I've only been using Python for about six months, so bear with me if I'm missing something obvious or have misunderstood something completely.
I'm scraping a lot of historical market data (~15,000 items for three regions). To do that efficiently I wanted to use one process per region and then multithread inside each process to fetch all the items. The response I get from the scraper for each item is a list of dicts, which I then want to insert with Session.execute(). I haven't gotten the multiprocessing part to work yet (if you know how, please point me in the right direction as well), so for now I'm only multithreading, which I have already used successfully to insert data into the regionid and typeid tables.
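To be clear about what I mean by "a list of dicts": each item's history comes back shaped roughly like this (the values and field names here are made up purely to illustrate, one key per column of my historical_data table):

history = [
    {
        'typeid': 34,          # illustrative values only
        'regionid': 10000002,
        'date': '2017-01-01',
        'average': 4.9,
        'highest': 5.1,
        'lowest': 4.7,
        'order_count': 1200,
        'volume': 30000000,
        # ... plus the buy_* and sell_* aggregate fields
    },
    # one dict like this per day of history for the item
]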
Still, no data gets inserted into my historical_data table, and I get no errors when I run the script. I tried enabling SQLAlchemy logging with
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
and that shows what I expect for the calls that read the regionid and typeid tables in main, but after that there is nothing. Does that mean I have no connection to the database once I start multithreading, or is the logger just not good at handling multiple threads?
With the regionid and typeid tables I used Session.merge() and handled each item's data in a for-loop, so I'm guessing it's my use of Session.execute() that's off?
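A simplified version of the part that does work (shown here without the threading around it; scraped_typeids is just a placeholder name) looks like this:

session = Session()
for typeid in scraped_typeids:
    # merge() inserts or updates by primary key, so re-running doesn't duplicate rows
    session.merge(Typeid(typeid))
session.commit()
Session.remove()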
I'm trying to insert all the historical data into a PostgreSQL database using sqlalchemy.orm. The actual script I use for the inserts is as follows:
if __name__ == '__main__':
    print("Start database session")
    Base.metadata.create_all(engine)
    Session = scoped_session(session_factory)
    ini_params()
    print("Get typeids and regionids from database")
    typeids = get_typeids()      # get all typeids from typeid table
    regionids = get_regionids()  # get all regionids from regionid table
    typeids = typeid_test_list   # uncomment for debug
    print(typeids)
    for idx, regionid in enumerate(regionids):
        no_data = data_to_db(regionid, typeids, idx)
        #no_data = multi_processing(regionid, typeids, idx)
        print(no_data)

def data_to_db(regionid, typeids, idx):
    ini_params()
    position = int(idx)
    no_data_typeids = []
    prefix = 'Getting data for region ' + str(regionid)
    typeid_length = len(typeids)
    with tqdm(typeids, total=typeid_length, desc=prefix, position=position) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
            futures = {executor.submit(multithread_func, typeid, regionid): typeid for typeid in typeids}
            for future in concurrent.futures.as_completed(futures):
                pbar.update(1)
    return no_data_typeids

def multithread_func(typeid, regionid):
    today = datetime.date.today()
    history = get_market_history(regionid, typeid)  # URL scraper
    if history != "error":
        import_full_history(history)
    else:
        return typeid
    return 0

def import_full_history(history):
    get_data_session = Session()
    print(type(history))
    get_data_session.execute(historical_data.insert(), item_dict)
    get_data_session.commit()
    Session.remove()
    return 0
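For completeness, the bulk-insert pattern I'm trying to end up with is the Core-style "insert with a list of parameter dicts", something along these lines (just a sketch of the goal, assuming history is a list of dicts whose keys match the historical_data columns; Historical_data.__table__ is the Table object behind the mapped class):

def import_full_history_sketch(history):
    session = Session()
    # passing a list of dicts makes execute() run an executemany-style INSERT
    session.execute(Historical_data.__table__.insert(), history)
    session.commit()
    Session.remove()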
and my database is set up like so:
Base
engine = create_engine('postgresql://user@localhost:5432/historic_market_data', pool_size=12, max_overflow=0)
session_factory = sessionmaker(bind = engine)
Base = declarative_base()
regionids
Session = scoped_session(session_factory)
class Regionid(Base):
    __tablename__ = 'regionids'
    regionid = Column(Integer, primary_key=True)
    query = Session.query_property()

    def __init__(self, regionid):
        self.regionid = regionid
typeids
Session = scoped_session(session_factory)
class Typeid(Base):
    __tablename__ = 'typeids'
    typeid = Column(Integer, primary_key=True)
    query = Session.query_property()

    def __init__(self, typeid):
        self.typeid = typeid
historical_data
class Historical_data(Base):
    __tablename__ = 'historical_data'
    id = Column(Integer, primary_key=True)
    typeid = Column('typeid', Integer, ForeignKey('typeids.typeid'))
    regionid = Column('regionid', Integer, ForeignKey('regionids.regionid'))
    date = Column(Date)
    average = Column(Float)
    highest = Column(Float)
    lowest = Column(Float)
    order_count = Column(Integer)
    volume = Column(Integer)
    buy_weighted_avg = Column(Float)
    buy_maxval = Column(Float)
    buy_minval = Column(Float)
    buy_stddev = Column(Float)
    buy_median = Column(Float)
    buy_volume = Column(Float)
    buy_numorders = Column(Integer)
    buy_fivepercent = Column(Float)
    sell_weighted_avg = Column(Float)
    sell_maxval = Column(Float)
    sell_minval = Column(Float)
    sell_stddev = Column(Float)
    sell_median = Column(Float)
    sell_volume = Column(Float)
    sell_numorders = Column(Integer)
    sell_fivepercent = Column(Float)
    def __init__(self, typeid, regionid, date, average, highest, lowest,
                 order_count, volume, buy_weighted_avg, buy_maxval, buy_minval,
                 buy_stddev, buy_median, buy_volume, buy_numorders, buy_fivepercent,
                 sell_weighted_avg, sell_maxval, sell_minval, sell_stddev,
                 sell_median, sell_volume, sell_numorders, sell_fivepercent):
        self.typeid = typeid
        self.regionid = regionid
        self.date = date
        self.average = average
        self.highest = highest
        self.lowest = lowest
        self.order_count = order_count
        self.volume = volume
        self.buy_weighted_avg = buy_weighted_avg
        self.buy_maxval = buy_maxval
        self.buy_minval = buy_minval
        self.buy_stddev = buy_stddev
        self.buy_median = buy_median
        self.buy_volume = buy_volume
        self.buy_numorders = buy_numorders
        self.buy_fivepercent = buy_fivepercent
        self.sell_weighted_avg = sell_weighted_avg
        self.sell_maxval = sell_maxval
        self.sell_minval = sell_minval
        self.sell_stddev = sell_stddev
        self.sell_median = sell_median
        self.sell_volume = sell_volume
        self.sell_numorders = sell_numorders
        self.sell_fivepercent = sell_fivepercent