I'm trying to parallelize text processing on a large DataFrame stored in an HDF5 file (via HDFStore), but when I try to map each chunk from the select() iterator, I get the error below.
def parallelize_hdfs(self):
    cores = cpu_count()
    pool = Pool(cores)
    self.text_object = pd.DataFrame()
    chunk_size = self.hdf_store.get_storer(self.hdf_table).nrows // 1000
    hdfs_iterator = self.hdf_store.select(self.hdf_table,
                                          start=0,
                                          stop=50,
                                          columns=['text'],
                                          chunksize=chunk_size)
    for chunk in hdfs_iterator:
        df_split = np.array_split(chunk, cores)
        self.text_object.append(
            pd.concat(
                pool.map(self.processor_hdfs, df_split)
            )
        )
C:\project\base_processor.py in process_hdfs(self, hdfs_file, table_name, text_col_name, nrows)
42 self.hdf_nrows = nrows
43 start = time.time()
---> 44 self.parallelize_hdfs()
45 end = time.time()
46 logger.info("Elapsed time: %2.2f sec" % (end - start))
C:\project\base_processor.py in parallelize_hdfs(self)
212 self.text_object.append(
213 pd.concat(
--> 214 pool.map(self.processor_hdfs, df_split)
215 )
216 )
C:\project\Anaconda3\lib\multiprocessing\pool.py in map(self, func, iterable, chunksize)
258 in a list that is returned.
259 '''
--> 260 return self._map_async(func, iterable, mapstar, chunksize).get()
261
262 def starmap(self, func, iterable, chunksize=None):
C:\project\Anaconda3\lib\multiprocessing\pool.py in get(self, timeout)
606 return self._value
607 else:
--> 608 raise self._value
609
610 def _set(self, i, obj):
C:\project\Anaconda3\lib\multiprocessing\pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
383 break
384 try:
--> 385 put(task)
386 except Exception as e:
387 job, ind = task[:2]
C:\project\Anaconda3\lib\multiprocessing\connection.py in send(self, obj)
204 self._check_closed()
205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))
207
208 def recv_bytes(self, maxlength=None):
C:\project\lib\multiprocessing\reduction.py in dumps(cls, obj, protocol)
49 def dumps(cls, obj, protocol=None):
50 buf = io.BytesIO()
---> 51 cls(buf, protocol).dump(obj)
52 return buf.getbuffer()
53
TypeError: can't pickle File objects
I have absolutely no idea why it's trying to pickle a file object, since when I print the types there's never a file object to be found. The only args I'm passing are self.processor_hdfs, which is just an apply(), and df_split, which is a list of DataFrames.
Any help or tips on how to get this working would be greatly appreciated. Even if I'm approaching this whole thing wrong, I'd love to know.
Comment From: jreback
you can't pass an open file object to child processes this way. You need to do the actual query in the subprocesses, opening & closing the file there. dask has an impl of distributed read_hdf that will allow this, FYI.
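A minimal sketch of that pattern, assuming the workers can simply re-open the same HDF5 file by path (the names _process_rows, parallel_process, and the lower-casing step are placeholders for the issue's own processing, not code from it):

import pandas as pd
from multiprocessing import Pool, cpu_count

def _process_rows(args):
    path, table, start, stop = args
    # Each worker opens and closes its own store, so no open handle is ever pickled.
    with pd.HDFStore(path, mode='r') as store:
        chunk = store.select(table, start=start, stop=stop, columns=['text'])
    # Stand-in for the real per-chunk text processing (processor_hdfs in the issue).
    return chunk['text'].str.lower()

def parallel_process(path, table):
    # Only the row count is read in the parent; the data itself is read in the workers.
    with pd.HDFStore(path, mode='r') as store:
        nrows = store.get_storer(table).nrows
    cores = cpu_count()
    step = max(nrows // cores, 1)
    ranges = [(path, table, i, min(i + step, nrows)) for i in range(0, nrows, step)]
    with Pool(cores) as pool:
        return pd.concat(pool.map(_process_rows, ranges))

On Windows (as in the traceback paths) the call into parallel_process has to sit under an if __name__ == '__main__': guard. The dask route mentioned above would look roughly like dask.dataframe.read_hdf(path, table, columns=['text']) followed by .map_partitions(...), which handles the per-worker file access for you.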
Comment From: BrendanMartin
Thank you @jreback. But why is it an open file object? I thought each chunk was the actual dataframe?
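The chunks themselves are plain DataFrames; the extra thing pool.map has to pickle is the function it was given. self.processor_hdfs is a bound method, so pickling it also pickles self, and self.hdf_store on that instance is the open HDFStore (a PyTables file handle underneath), which is what the pickler rejects. A stripped-down sketch of the same failure, with hypothetical file and table names and assuming data.h5 already exists:

import pickle
import pandas as pd

class Processor:
    def __init__(self, path, table):
        # Open handle kept on the instance, as in parallelize_hdfs above.
        self.hdf_store = pd.HDFStore(path, mode='r')
        self.hdf_table = table

    def processor_hdfs(self, df):
        return df

proc = Processor('data.h5', 'my_table')   # hypothetical names

# Pickling the bound method recursively pickles `proc`, and with it the open
# HDFStore handle -- this fails with the same kind of TypeError as the traceback above.
pickle.dumps(proc.processor_hdfs)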