
[ Python multi-threading file processing ]

I have a few files that reside on a server, and I'm trying to process them with multiple threads to improve performance. I read a tutorial but have a few questions about implementing it.

Here are the files:

filelistread = ['h:\\file1.txt', \
                'h:\\file2.txt', \
                'h:\\file3.txt', \
                'h:\\file4.txt']

filelistwrte = ['h:\\file1-out.txt','h:\\file2-out.txt','h:\\file3-out.txt','h:\\file4-out.txt']


def workermethod(inpfile, outfile):
    f1 = open(inpfile,'r')
    f2 = open(outfile,'w')
    x = f1.readlines()
    for each in x:
        f2.write(each)
    f1.close()
    f2.close()

How do I implement this using the Thread class and a queue?

I started with the class below, but I'm not sure how to pass inpfile and outfile to the run method. Any input is appreciated.

class ThreadUrl(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()

Answer 1


You're mixing up two different solutions.

If you want to create a dedicated worker thread for each file, you don't need a queue for anything. If you want to create a threadpool and a queue of files, you don't want to pass inpfile and outfile to the run method; you want to put them in each job on the queue.

How do you choose between the two? Well, the first is obviously simpler, but if you have, say, 1000 files to copy, you'll end up creating 1000 threads, which is more threads than you ever want to create, and far more threads than the number of parallel copies the OS will be able to handle. A thread pool lets you create, say, 8 threads, and put 1000 jobs on a queue, and they'll be distributed to the threads as appropriate, so 8 jobs are running at a time.

Let's start with solution 1, a dedicated worker thread for each file.

First, if you aren't married to subclassing Thread, there's really no reason to do so here. You can pass a target function and an args tuple to the default constructor, and then the run method will just do target(*args), exactly as you want. So:

t = threading.Thread(target=workermethod, args=(inpfile, outfile))

That's all you need. When each thread runs, it will call workermethod(inpfile, outfile) and then exit.
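Note that constructing a Thread does not run it; each thread still has to be started, and usually joined. A minimal sketch of the whole pattern, assuming the workermethod from the question (rewritten here so the block stands alone) and a hypothetical helper name copy_with_threads:

```python
import threading


def workermethod(inpfile, outfile):
    # Copy inpfile to outfile line by line, as in the question.
    with open(inpfile, 'r') as f1, open(outfile, 'w') as f2:
        for line in f1:
            f2.write(line)


def copy_with_threads(pairs):
    # Hypothetical helper: one dedicated thread per (inpfile, outfile) pair.
    threads = [threading.Thread(target=workermethod, args=(inpfile, outfile))
               for inpfile, outfile in pairs]
    for t in threads:
        t.start()   # begins running workermethod(inpfile, outfile)
    for t in threads:
        t.join()    # wait until every copy has finished
```

With the lists from the question, this would be called as copy_with_threads(zip(filelistread, filelistwrte)).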

However, if you do want to subclass Thread for some reason, you can. You can pass the inpfile and outfile in at construction time, and your run method would just be that workermethod modified to use self.inpfile and self.outfile instead of taking parameters. Like this:

class ThreadUrl(threading.Thread):
    def __init__(self, inpfile, outfile):
        threading.Thread.__init__(self)
        self.inpfile, self.outfile = inpfile, outfile

    def run(self):
        f1 = open(self.inpfile,'r')
        f2 = open(self.outfile,'w')
        x = f1.readlines()
        for each in x:
            f2.write(each)
        f1.close()
        f2.close()

Either way, I'd suggest using with statements instead of explicit open and close, and getting rid of the readlines (which unnecessarily reads the entire file into memory), unless you need to deal with really old versions of Python:

    def run(self):
        with open(self.inpfile,'r') as f1, open(self.outfile,'w') as f2:
            for line in f1:
                f2.write(line)

Now, on to solution 2: a threadpool and a queue.

Again, you don't need a subclass here; the differences between the two ways of doing things are the same as in solution 1. But sticking with the subclass design you've started, you want something like this:

class ThreadUrl(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            inpfile, outfile = self.queue.get()
            workermethod(inpfile, outfile)

Then you start your threads by passing a single queue to all of them:

q = queue.Queue()
threadpool = [ThreadUrl(q) for i in range(poolsize)]
for t in threadpool:
    t.start()

And submit jobs like this:

q.put((inpfile, outfile))
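One thing the snippets above leave open is how the workers stop: with a bare while True loop they block on get() forever, so the program never exits. A common way to handle that is to put one sentinel value per worker on the queue once all jobs are submitted. The following is a sketch under that assumption; the None sentinel, the jobs attribute name, and the run_pool helper are illustrative choices, not part of the original code:

```python
import queue
import threading


def workermethod(inpfile, outfile):
    # Copy inpfile to outfile line by line, as in the question.
    with open(inpfile, 'r') as f1, open(outfile, 'w') as f2:
        for line in f1:
            f2.write(line)


class ThreadUrl(threading.Thread):
    def __init__(self, jobs):
        threading.Thread.__init__(self)
        self.jobs = jobs

    def run(self):
        while True:
            job = self.jobs.get()
            if job is None:
                # Sentinel (an assumption of this sketch): no more work.
                break
            inpfile, outfile = job
            workermethod(inpfile, outfile)


def run_pool(pairs, poolsize=4):
    # Hypothetical helper: a fixed pool of workers sharing one queue.
    jobs = queue.Queue()
    pool = [ThreadUrl(jobs) for _ in range(poolsize)]
    for t in pool:
        t.start()
    for pair in pairs:
        jobs.put(pair)
    for _ in pool:
        jobs.put(None)   # one sentinel per worker
    for t in pool:
        t.join()
```

Because the queue is first-in, first-out, every real job is picked up before any worker sees a sentinel.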

If you're going to be doing serious work with threadpools, you may want to look into using a robust, flexible, simple, and optimized implementation instead of coding something up yourself. For example, you might want to be able to cancel jobs, shut down the queue nicely, join the whole pool instead of joining threads one by one, do batching or smart load balancing, etc.

If you're using Python 3, you should look at the standard-library ThreadPoolExecutor (in concurrent.futures). If you're stuck with Python 2, or can't figure out Futures, you might want to look at the ThreadPool class hidden inside the multiprocessing module (multiprocessing.pool.ThreadPool). Both of these have the advantage that switching from multithreading to multiprocessing (if, say, it turns out that you have some CPU-bound work that needs to be parallelized along with your IO) is trivial. You can also search PyPI and you'll find multiple other good implementations.
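For comparison, here is roughly what the same job looks like with ThreadPoolExecutor. This is a minimal sketch, again assuming the question's workermethod; the copy_with_executor name and the default of 8 workers are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor


def workermethod(inpfile, outfile):
    # Copy inpfile to outfile line by line, as in the question.
    with open(inpfile, 'r') as f1, open(outfile, 'w') as f2:
        for line in f1:
            f2.write(line)


def copy_with_executor(pairs, max_workers=8):
    # Hypothetical helper: the executor owns the threads and the queue.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(workermethod, inpfile, outfile)
                   for inpfile, outfile in pairs]
        for future in futures:
            # result() waits for the job and re-raises any exception
            # that workermethod raised in the worker thread.
            future.result()
```

Leaving the with block shuts the pool down and waits for outstanding jobs, so no sentinel or explicit join is needed.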

As a side note, you don't want to name the queue variable queue, because that will shadow the module name. Also, it's a bit confusing to have something called workermethod that's actually a free function rather than a method.

Finally, if all you're doing is copying the files, you probably don't want to read in text mode, or go line by line. In fact, you probably don't want to implement it yourself at all; just use the appropriate copy function from shutil. You can do that with any of the above methods very easily. For example, instead of this:

t = threading.Thread(target=workermethod, args=(inpfile, outfile))

do this:

t = threading.Thread(target=shutil.copyfile, args=(inpfile, outfile))

In fact, it looks like your whole program can be replaced by:

threads = [threading.Thread(target=shutil.copyfile, args=(inpfile, outfile))
           for (inpfile, outfile) in zip(filelistread, filelistwrte)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()