Parallel Python Community Forums

Python Forums => Parallel Python Forum => Topic started by: XiaoTao on October 24, 2017, 12:57:12 AM



Title: PP is blocked for long-time read-write
Post by: XiaoTao on October 24, 2017, 12:57:12 AM
I am trying to use PP in my software (https://github.com/XiaoTaoWang/HiC_pipeline) for parallel tasks.
When I tested it with small datasets, it seemed to work well (the pipeline completed and no exception
occurred). However, something odd happens with large datasets.

During the first stage of my pipeline, each input file is split into chunks (smaller files, each containing
20,000,000 lines). Strangely, the maximum number of chunks each process could generate was always 24 in my
tests. The 25th file was created, but the program then blocked and no further lines were written to it.

The job was submitted through PBS (Portable Batch System), and both the "SMP" and "cluster" modes of PP ran
into the same problem.
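For reference, the chunking step amounts to grouping the input into 4-line FASTQ records and starting a new output file every N records. A minimal, self-contained sketch of just that grouping (Python 3 syntax for convenience; the original runs under Python 2.7, and all names here are illustrative, with no subprocesses involved):

```python
from itertools import islice

def iter_fastq_records(stream):
    """Yield 4-line FASTQ records (id, sequence, separator, quality)."""
    while True:
        record = tuple(islice(stream, 4))
        if not record:
            return  # clean end of input
        if len(record) < 4 or not record[0].startswith('@'):
            raise IOError('Invalid fastq file')
        yield record

def chunk_records(records, records_per_chunk):
    """Group records into lists of at most records_per_chunk each."""
    chunk = []
    for rec in records:
        chunk.append(rec)
        if len(chunk) == records_per_chunk:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # final, possibly short, chunk
```

In the real pipeline each yielded chunk would be streamed to a gzip writer instead of held in memory; the sketch only isolates the record/chunk bookkeeping.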

#######################################################################
The function is defined as follows (modified for brevity; gzipWriter is a helper, not shown here, that
spawns a gzip process writing to the given file):

import os
import subprocess

def splitFASTQ(fq1, fq2):
    
    # Build output-name templates from the input file names
    p1 = os.path.split(fq1)[1].split('.')[0].split('_')
    outname1 = '_'.join(p1[:-1]) + '_chunk{0}_{1}.fastq.gz'
    p2 = os.path.split(fq2)[1].split('.')[0].split('_')
    outname2 = '_'.join(p2[:-1]) + '_chunk{0}_{1}.fastq.gz'
    
    if fq1.endswith('.fastq.gz'):
        pread_1 = subprocess.Popen(['gunzip', fq1, '-c'],
                                    stdout = subprocess.PIPE, bufsize = 4096)
    else:
        pread_1 = subprocess.Popen(['cat', fq1],
                                    stdout = subprocess.PIPE, bufsize = 4096)
    if fq2.endswith('.fastq.gz'):
        pread_2 = subprocess.Popen(['gunzip', fq2, '-c'],
                                    stdout = subprocess.PIPE, bufsize = 4096)
    else:
        pread_2 = subprocess.Popen(['cat', fq2],
                                    stdout = subprocess.PIPE, bufsize = 4096)
                                  
    inStream_1 = pread_1.stdout
    inStream_2 = pread_2.stdout
    
    halted = False
    # Each chunk holds 5,000,000 records, i.e. 20,000,000 lines
    for counter in xrange(1000000):
        outf1 = outname1.format(counter,p1[-1])
        outf2 = outname2.format(counter,p2[-1])
        outProc1 = gzipWriter(outf1)
        outProc2 = gzipWriter(outf2)
        outStream1 = outProc1.stdin
        outStream2 = outProc2.stdin

        for j in xrange(5000000):

            line = inStream_1.readline()

            if not line:
                # EOF on the first input stream; flush this chunk and stop
                halted = True
                break
            if line[0] != '@':
                raise IOError('Invalid fastq file')


            fastq_entry_1 = (line, inStream_1.readline(), inStream_1.readline(),
                             inStream_1.readline())
            fastq_entry_2 = (inStream_2.readline(), inStream_2.readline(),
                             inStream_2.readline(), inStream_2.readline())

            outStream1.writelines(fastq_entry_1)
            outStream2.writelines(fastq_entry_2)

        # communicate() closes each gzip writer's stdin and waits for it to exit
        outProc1.communicate()
        outProc2.communicate()
        
        yield outf1, outf2
        
        if halted:
            break
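One thing worth double-checking in long-running pipe chains like this is how each writer process is shut down (note also that the pread_1/pread_2 reader processes above are never waited on, so their handles accumulate across chunks). A minimal, portable sketch of the write / close / wait pattern, using a Python child process as a stand-in for the gzip writer (all names here are illustrative, not the pipeline's actual helpers):

```python
import os
import subprocess
import sys

def line_writer(path):
    """Spawn a child that copies its stdin to `path` (stand-in for a gzip writer)."""
    out = open(path, 'wb')
    child = subprocess.Popen(
        [sys.executable, '-c',
         'import sys, shutil; shutil.copyfileobj(sys.stdin.buffer, sys.stdout.buffer)'],
        stdin=subprocess.PIPE, stdout=out)
    out.close()  # the child holds its own copy of the file handle
    return child

def write_chunk(path, lines):
    """Write one chunk of lines through a writer child, then shut it down cleanly."""
    proc = line_writer(path)
    proc.stdin.writelines(lines)
    proc.stdin.close()  # signal EOF to the child...
    proc.wait()         # ...then wait for it to flush and exit
```

The explicit close-then-wait is equivalent to what communicate() does for a stdin-only pipe; the point of the sketch is that every pipe endpoint the parent holds must eventually be closed or waited on, or handles pile up chunk after chunk.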

The pp.Server class for SMP mode is customized as follows:

class ppLocal(pp.Server):
    
    def __init__(self, per_worker, maximum_worker):
        
        ncpus = self._detect_ncpus()
        n_worker = ncpus // per_worker
        if not n_worker:
            n_worker = 1
        # Cap the computed worker count at maximum_worker
        self.n_worker = min(n_worker, maximum_worker)
        pp.Server.__init__(self, ncpus=self.n_worker)
    
    def _detect_ncpus(self):
        """
        Detects the number of effective CPUs in the system.
        """
        # Linux and Unix
        ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
        if isinstance(ncpus, int) and ncpus > 0:
            return ncpus
        return 1
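The worker-count arithmetic can also be factored into a standalone helper that is easy to unit-test; a sketch, assuming the intent is "one worker per per_worker CPUs, clamped to [1, maximum_worker]" (the function name and multiprocessing.cpu_count() fallback are my own choices, not part of the pipeline):

```python
import multiprocessing

def count_workers(per_worker, maximum_worker, ncpus=None):
    """Number of PP workers: one per `per_worker` CPUs, clamped to [1, maximum_worker]."""
    if ncpus is None:
        # Portable alternative to os.sysconf("SC_NPROCESSORS_ONLN")
        ncpus = multiprocessing.cpu_count()
    return max(1, min(ncpus // per_worker, maximum_worker))
```

With this in place the __init__ above reduces to a single call, e.g. pp.Server.__init__(self, ncpus=count_workers(per_worker, maximum_worker)).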
      
#######################################################################

I don't know what is happening here; do you have any ideas?


Title: Re: PP is blocked for long-time read-write
Post by: XiaoTao on October 24, 2017, 01:02:22 AM
Python 2.7 was used in my tests.