I'm trying to parallelize text processing on a large DataFrame stored in an HDF5 file (a pandas HDFStore), but when I try to map over the chunks from the select() iterator, I get the error below.

    def parallelize_hdfs(self):
        cores = cpu_count()
        pool = Pool(cores)
        self.text_object = pd.DataFrame()

        chunk_size = self.hdf_store.get_storer(self.hdf_table).nrows // 1000

        hdfs_iterator = self.hdf_store.select(self.hdf_table,
                                              start=0,
                                              stop=50,
                                              columns=['text'],
                                              chunksize=chunk_size)

        for chunk in hdfs_iterator:
            # split each chunk across the cores and process the pieces in parallel
            df_split = np.array_split(chunk, cores)

            self.text_object.append(
                    pd.concat(
                            pool.map(self.processor_hdfs, df_split)
                    )
            )
    C:\project\base_processor.py in process_hdfs(self, hdfs_file, table_name, text_col_name, nrows)
         42         self.hdf_nrows = nrows
         43         start = time.time()
    ---> 44         self.parallelize_hdfs()
         45         end = time.time()
         46         logger.info("Elapsed time: %2.2f sec" % (end - start))

    C:\project\base_processor.py in parallelize_hdfs(self)
        212             self.text_object.append(
        213                     pd.concat(
    --> 214                             pool.map(self.processor_hdfs, df_split)
        215                     )
        216             )

    C:\project\Anaconda3\lib\multiprocessing\pool.py in map(self, func, iterable, chunksize)
        258         in a list that is returned.
        259         '''
    --> 260         return self._map_async(func, iterable, mapstar, chunksize).get()
        261 
        262     def starmap(self, func, iterable, chunksize=None):

    C:\project\Anaconda3\lib\multiprocessing\pool.py in get(self, timeout)
        606             return self._value
        607         else:
    --> 608             raise self._value
        609 
        610     def _set(self, i, obj):

    C:\project\Anaconda3\lib\multiprocessing\pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
        383                         break
        384                     try:
    --> 385                         put(task)
        386                     except Exception as e:
        387                         job, ind = task[:2]

    C:\project\Anaconda3\lib\multiprocessing\connection.py in send(self, obj)
        204         self._check_closed()
        205         self._check_writable()
    --> 206         self._send_bytes(_ForkingPickler.dumps(obj))
        207 
        208     def recv_bytes(self, maxlength=None):

    C:\project\lib\multiprocessing\reduction.py in dumps(cls, obj, protocol)
         49     def dumps(cls, obj, protocol=None):
         50         buf = io.BytesIO()
    ---> 51         cls(buf, protocol).dump(obj)
         52         return buf.getbuffer()
         53 

    TypeError: can't pickle File objects

I have absolutely no idea why it's trying to pickle a file object, since when I print the types there's never a file object to be found. The only arguments I'm passing are self.processor_hdfs, which just does an apply(), and df_split, which is a list of DataFrames.
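
For reference, a simplified sketch of what processor_hdfs does (clean_text here is a stand-in for the real per-row function, not the actual code):

    def processor_hdfs(self, df):
        # apply the per-row text processing to one split of the chunk;
        # clean_text is a placeholder for the real cleaning function
        df['text'] = df['text'].apply(self.clean_text)
        return df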

Any help or tips on how to get this working are greatly appreciated. Even if I'm approaching this whole thing wrong, I'd love to know.

Comment From: jreback

You can't pass an open file object to child processes this way. You need to do the actual query in the subprocesses, opening and closing the file there. FYI, dask has an implementation of a distributed read_hdf that will allow this.
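
A minimal sketch of that approach, assuming a table-format HDF5 file on disk; the names here (_process_range, parallel_hdf, the str.strip() placeholder) are illustrative, not the issue's actual code or a pandas API. Each worker receives only the file path and a row range, opens its own HDFStore, selects its slice, and closes the file before returning, so no open handle ever has to be pickled:

    from multiprocessing import Pool, cpu_count

    import pandas as pd

    def _process_range(args):
        # runs in a child process: open a private handle, read one slice, close it
        path, table, start, stop = args
        with pd.HDFStore(path, mode='r') as store:
            chunk = store.select(table, start=start, stop=stop, columns=['text'])
        chunk['text'] = chunk['text'].str.strip()  # placeholder for the real text processing
        return chunk

    def parallel_hdf(path, table, chunk_size):
        # the parent only reads the row count; the data itself is read by the workers
        with pd.HDFStore(path, mode='r') as store:
            nrows = store.get_storer(table).nrows
        ranges = [(path, table, start, min(start + chunk_size, nrows))
                  for start in range(0, nrows, chunk_size)]
        with Pool(cpu_count()) as pool:
            return pd.concat(pool.map(_process_range, ranges), ignore_index=True)

On Windows the Pool call needs to run under an if __name__ == '__main__': guard, since child processes are spawned rather than forked. If you'd rather not manage the pool yourself, dask.dataframe.read_hdf gives you a similar partitioned read as a dask DataFrame.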

Comment From: BrendanMartin

Thank you @jreback. But why is there an open file object involved at all? I thought each chunk was the actual DataFrame?