Threading and subprocessing for speed and resilience

Blog Post created by ctognelaesriaustralia-com-au-esridist Employee on May 20, 2019

I thought I would provide a write-up of some code I've written that makes use of subprocesses and threading. Using these two in tandem for my particular process provided enormous parallel speed gains, and excellent resilience: one process crashing doesn't trash the whole run.

 

I've provided the code below for ease of access. Rest easy: it is in a repository.

 

The task I was given was to identify and extract information from layer files and MXDs. For this case, there were about 40,000 files. In late 2017, when I first performed this task (on 160,000 files), I broke it into steps and glued it all together with batch and text files. It was... difficult. For the revisit, I wanted to smooth it out.

This code discovers, analyses and reports in one go.

 

Sharing the load

To split the load between many threads, you can make use of Python's queues, Queue.Queue():

import Queue

# the work queue: holds the paths of the datafiles to be analysed
DATAFILE_QUEUE = Queue.Queue()

for datafile in datafiles:
    # validate the path here if need be, then hand it to the queue
    DATAFILE_QUEUE.put(datafile)

 

A queue is a first-in, first-out sequence. The queue can keep track of which tasks it has handed out and mark them done when a worker reports them complete. You can get a rough idea of the remaining tasks by running DATAFILE_QUEUE.qsize() in your console, but it's better to watch your logs, I think.
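
To see how that bookkeeping fits together, here's a minimal sketch (Python 2, like the rest of this post; the file names are just placeholders): every get() is eventually matched by a task_done(), which is what lets the main thread block on join() until everything has been processed.

import Queue
import threading

DATAFILE_QUEUE = Queue.Queue()

def worker():
    while True:
        datafile = DATAFILE_QUEUE.get()
        try:
            pass  # analyse the datafile here
        finally:
            # report the task as finished, even if the analysis failed,
            # so the join() below doesn't wait forever
            DATAFILE_QUEUE.task_done()

for _i in xrange(4):
    t = threading.Thread(target=worker)
    t.daemon = True   # don't keep the interpreter alive after the main thread finishes
    t.start()

for datafile in ['example.lyr', 'example.mxd']:   # placeholder paths
    DATAFILE_QUEUE.put(datafile)

DATAFILE_QUEUE.join()   # blocks until task_done() has been called for every item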

 

To spin up threads that can consume the items in the queue, you write a worker function that performs the task you want done, then create a sequence of n threads with the right parameters (see the full code below for a proper example):

 

 

import threading

def interrogate_datafile_thread():
    while True:
        datafile = DATAFILE_QUEUE.get()
        do_work(datafile)
        # tell the queue this task is finished so its bookkeeping stays accurate
        DATAFILE_QUEUE.task_done()

# 100 threads to do the work
process_threads = [threading.Thread(target=interrogate_datafile_thread) for _i in xrange(100)]

for thread in process_threads:
    thread.start()


Collecting data

To write logs and result data, you will need separate threads. If your worker function tries to handle logging and result writing itself, the result will be a misshapen mess: at best, unreadable data; at worst, file lock city. So use as many queues as you need, and write dedicated worker functions to do the writing.

 

import datetime

def write_log_thread():
    """
    Take a string from the queue, format it, and write to a text file.
    """

    # LOG_QUEUE is another Queue.Queue(); LOG_FILE is the path of the log (see the full code)
    while True:
        message = LOG_QUEUE.get()
        now = datetime.datetime.now().ctime()
        with open(LOG_FILE, 'ab') as log_f:
            log_f.write('[{}]: {}\n'.format(now, message))
        LOG_QUEUE.task_done()

write_log = threading.Thread(target=write_log_thread)
write_log.start()
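
The same pattern works for the results themselves. Here's a rough sketch of a second writer thread draining a results queue into a CSV; RESULT_QUEUE, RESULTS_FILE and the field names are placeholders I've made up for illustration, not lifted from the full code.

import csv

RESULT_QUEUE = Queue.Queue()

def write_results_thread():
    """
    Take a result row (a dictionary) from the queue and append it to a CSV file.
    """
    while True:
        row = RESULT_QUEUE.get()   # e.g. {'datafile': ..., 'layer': ..., 'source': ...}
        with open(RESULTS_FILE, 'ab') as results_f:
            writer = csv.DictWriter(results_f, fieldnames=['datafile', 'layer', 'source'])
            writer.writerow(row)
        RESULT_QUEUE.task_done()

write_results = threading.Thread(target=write_results_thread)
write_results.start()

Because this is the only thread that touches RESULTS_FILE, there's no contention over the file.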

 

Subprocessing

 

This is the part that does the heavy lifting, the actual analysis of the datafile, or whatever your prone-to-crash process is. I haven't included the analyse_datafile script here (it's getting stuff out of MXD and layer files), but you get the idea. It took me a while to get this to work, but it is fairly straightforward:

 

import ast
import subprocess

def subprocess_analyse_datafile(f, subprocess_flags):
    """
    Start a subprocess to analyse a datafile. If the subprocess
    crashes, it won't derail the whole program.
    """

    try:
        # SUBPROCESS_PYTHON_EXE is the path to the python.exe used to run the child script
        cmd = [
            SUBPROCESS_PYTHON_EXE,
            'analyse_datafile.py',
            f,
        ]
        # capture stdout and stderr so this thread can parse the results and spot failures
        shell_output = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            creationflags=subprocess_flags,
        )
        out, err = shell_output.communicate()
        shell_output.terminate()

        if not err:
            if len(out) >= 2:
                # the child prints a Python literal (a list of dictionaries);
                # trim anything after the closing bracket and parse it safely
                parsed_list = ast.literal_eval(
                    out[0 : out.rfind(']') + 1]
                )
                return parsed_list
            else:
                raise Exception('An unknown error occurred')
        else:
            raise Exception(err)
    except Exception:
        # re-raise so the calling worker thread can decide what to do with the failure
        raise

 

Something to point out is that the subprocess effectively prints its output, which is captured by the thread and parsed into a data structure (in this case a list of dictionaries) using ast. A drawback here is that if there's no error but the output doesn't come out as a list, we don't know why.

 

if not err:
    if len(out) >= 2:
        parsed_list = ast.literal_eval(
            out[0 : out.rfind(']') + 1]
        )
        return parsed_list
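
For context, here's a minimal sketch of what the child script could look like on its side; the real analyse_datafile.py isn't included in this post, so the analysis and field names below are made up. The important part is that it prints the repr of a list of dictionaries to stdout for the parent to pick up.

# analyse_datafile.py (illustrative sketch only; the real script digs through MXDs and layer files)
import sys

def analyse(path):
    # placeholder for the real analysis; the field names here are made up
    return [{'datafile': path, 'layer': 'example_layer', 'source': 'example.gdb'}]

if __name__ == '__main__':
    results = analyse(sys.argv[1])
    # print the Python literal to stdout; the parent thread captures this
    # via subprocess.PIPE and parses it with ast.literal_eval
    print repr(results)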

 

Data and errors are written to stdout and stderr, which is made available to the thread via subprocess.PIPE and picked up by:

out, err = shell_output.communicate()

 

As you can see above, the function can catch errors from stderr and raise exceptions for the worker thread to handle. The worker thread doesn't die; it lives on to handle more data!
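
To make that concrete, here's a rough sketch of what the do_work() step in the earlier worker could look like once the subprocess call and the queues are wired in. RESULT_QUEUE, SUBPROCESS_FLAGS and the log message formats are placeholders of mine; the full code in the repository is the authoritative version.

def do_work(datafile):
    """Called by each worker thread for one datafile."""
    try:
        results = subprocess_analyse_datafile(datafile, SUBPROCESS_FLAGS)
        for row in results:
            RESULT_QUEUE.put(row)
        LOG_QUEUE.put('OK: {}'.format(datafile))
    except Exception as e:
        # the subprocess (or the parsing) failed; log it and move on.
        # the worker thread keeps running and picks up the next datafile
        LOG_QUEUE.put('FAILED: {} ({})'.format(datafile, e))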

 

Two quick thoughts

 

First: don't use ordinary values as exit criteria unless you know your input data is squeaky clean. In my case, using plain old None was causing threads to shut down early, because the source data had some empty strings in it, something I still have to track down. Make something up:

 

THREAD_EXIT_SIGNAL = (None, None)
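
Here's a rough sketch of how that sentinel might be used (the shutdown sequence is illustrative, not lifted from the full code): each worker checks for it before doing any work, and the main thread puts one on the queue per worker when it's time to stop.

def interrogate_datafile_thread():
    while True:
        datafile = DATAFILE_QUEUE.get()
        if datafile == THREAD_EXIT_SIGNAL:
            # a deliberately odd value, so real (even empty) data can't trigger it
            DATAFILE_QUEUE.task_done()
            break
        do_work(datafile)
        DATAFILE_QUEUE.task_done()

# one exit signal per worker thread tells them all to finish up
for _i in xrange(len(process_threads)):
    DATAFILE_QUEUE.put(THREAD_EXIT_SIGNAL)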

 

Second: don't code in sleeps and waits that will block your input. You can still (using IDLE, at least) query the state and get data back; remember, the main thread isn't doing anything other than supporting the child threads, so it's not blocked.
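
For example, a quick check from the interactive prompt might look something like this (the numbers are purely illustrative):

>>> DATAFILE_QUEUE.qsize()     # roughly how many datafiles are still waiting
1238
>>> LOG_QUEUE.qsize()          # log messages not yet written out
3
>>> threading.active_count()   # all live threads, including the main thread
102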

 

I hope that's useful. 

Outcomes