I have a process running in Python 3.7 that loads JSON files, gathers rows from those files into chunks on async queues, and incrementally posts the chunks to Elasticsearch for indexing.
The chunking is meant to avoid overloading the Elasticsearch connection.
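Roughly, the pieces are wired together like this (a simplified sketch rather than my exact code: the worker counts, the run_pipeline wrapper, and the exact process/thread split are placeholders, but the queues are multiprocessing queues shared between loader workers and bulk-indexing threads). The two worker functions follow below.

import multiprocessing
import threading

NUM_LOADER_PROCS = 2   # placeholder
NUM_BULK_THREADS = 4   # placeholder


def run_pipeline(files, index):
    manager = multiprocessing.Manager()
    file_queue = multiprocessing.Queue()
    doc_queue = multiprocessing.Queue()
    incomplete_files = manager.list(files)

    for path in files:
        file_queue.put(path)

    loaders = [
        multiprocessing.Process(
            target=load_files_to_queue,
            args=(file_queue, incomplete_files, doc_queue, index),
        )
        for _ in range(NUM_LOADER_PROCS)
    ]
    writers = [
        threading.Thread(
            target=bulk_load_from_queue,
            args=(file_queue, incomplete_files, doc_queue),
        )
        for _ in range(NUM_BULK_THREADS)
    ]
    for worker in loaders + writers:
        worker.start()
    for worker in loaders + writers:
        worker.join()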
def load_files_to_queue(file_queue, incomplete_files, doc_queue, index):
    logger.info("Initializing load files to queue")
    while True:
        try:
            current_file = file_queue.get(False)
            logger.info("Loading {} into queue.".format(current_file))
            iteration_counter = 0
            with open(current_file) as loaded_file:
                iterator = json_iterator(loaded_file)
                current_type = "doctype"
                chunk = []
                for row in iterator:
                    # Every so often check the queue size
                    iteration_counter += 1
                    if iteration_counter > 5000:
                        # If it gets too big, pause until it has gone
                        # down a bunch.
                        if doc_queue.qsize() > 30:
                            logger.info(
                                "Doc queue at {}, pausing until smaller.".format(
                                    doc_queue.qsize()
                                )
                            )
                            while doc_queue.qsize() > 10:
                                time.sleep(0.5)
                        iteration_counter = 0
                    for transformed in transform_single_doc(current_type, row, index):
                        if transformed:
                            chunk.append(transformed)
                        # NOTE: Send messages in chunks instead of single rows so
                        # that the queue has less frequent locking
                        if len(chunk) >= DOC_QUEUE_CHUNK_SIZE:
                            doc_queue.put(chunk)
                            chunk = []
                # Flush whatever is left over for this file
                if chunk:
                    doc_queue.put(chunk)
            incomplete_files.remove(current_file)
            logger.info("Finished loading {} into queue.".format(current_file))
            logger.info("There are {} files left to load.".format(file_queue.qsize()))
        except Empty:
            # file_queue is drained, this worker is done
            break
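json_iterator, transform_single_doc, and DOC_QUEUE_CHUNK_SIZE are defined elsewhere; for the purposes of this question they behave roughly like this (an illustrative sketch only, not the real implementations):

import json

DOC_QUEUE_CHUNK_SIZE = 100  # placeholder value


def json_iterator(loaded_file):
    # Stream the file row by row (newline-delimited JSON) instead of loading
    # it all into memory at once.
    for line in loaded_file:
        yield json.loads(line)


def transform_single_doc(doc_type, row, index):
    # Map a raw row to zero or more bulk index actions.
    yield {
        "_op_type": "index",
        "_index": index,
        "_type": doc_type,
        "_source": row,
    }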
def bulk_load_from_queue(file_queue, incomplete_files, doc_queue, chunk_size=500):
    """
    Represents a single worker thread loading docs into ES
    """
    logger.info("Initialize bulk doc loader {}".format(threading.current_thread()))
    conn = Elasticsearch(settings.ELASTICSEARCH, timeout=180)
    dequeue_results(
        streaming_bulk(
            conn,
            load_docs_from_queue(file_queue, incomplete_files, doc_queue),
            max_retries=2,
            initial_backoff=10,
            chunk_size=chunk_size,
            yield_ok=False,
            raise_on_exception=True,
            raise_on_error=True,
        )
    )
    logger.info("Shutting down doc loader {}".format(threading.current_thread()))
Occasionally an error like this occurs in bulk_load_from_queue, which I interpret to mean the chunk being sent was too large:
TransportError(429, 'circuit_breaking_exception', '[parent] Data too large, data for [<http_request>] would be [1024404322/976.9mb], which is larger than the limit of [1011774259/964.9mb], real usage: [1013836880/966.8mb], new bytes reserved: [10567442/10mb], usages [request=32880/32.1kb, fielddata=7440/7.2kb, in_flight_requests=164031664/156.4mb, accounting=46679308/44.5mb]')
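For reference, streaming_bulk also accepts a max_chunk_bytes argument (100 MB by default) that caps each request body on the client side; a minimal sketch of passing it, where the 10 MB value is just an arbitrary example and not what I actually run:

streaming_bulk(
    conn,
    load_docs_from_queue(file_queue, incomplete_files, doc_queue),
    chunk_size=chunk_size,
    max_chunk_bytes=10 * 1024 * 1024,  # example cap per bulk request
    max_retries=2,
    initial_backoff=10,
    yield_ok=False,
    raise_on_exception=True,
    raise_on_error=True,
)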
Re-running usually resolved the 429, but it has become too frequent, so I tried to enforce a byte-based chunk limit in load_files_to_queue like so:
    for transformed in transform_single_doc(current_type, row, index):
        if transformed:
            # sys.getsizeof is a shallow measure of the Python object, used
            # here as a rough proxy for the serialized document size
            chunk_size = chunk_size + sys.getsizeof(transformed)
            chunk.append(transformed)
        # NOTE: Send messages in chunks instead of single rows so that the
        # queue has less frequent locking
        if (
            chunk_size >= DOC_QUEUE_CHUNK_SIZE
            or len(chunk) >= DOC_QUEUE_CHUNK_LEN
        ):
            doc_queue.put(chunk)
            chunk = []
            chunk_size = 0

# after the row loop, flush any remaining docs for this file
if len(chunk) > 0:
    doc_queue.put(chunk)
This results in a handful of these errors towards the end of processing:

ConnectionResetError: [Errno 104] Connection reset by peer

and then:

EOFError (raised from multiprocessing.connection, in _recv)