Hello, all. This is my first time posting to a coding forum of any kind, so I apologize in advance for any issues or inefficiencies. I’ve tried to reduce my code to a minimal reproducible example, but I’m sure there is room for improvement.
My objective is to use dask.delayed to multithread the following process: use scapy “sendrecv.sniff” to read pcap files and interpret their custom format, then take the pertinent information from the packets to populate a dask dataframe for further calculations.
I’m having issues with pickling within my CustomPacketHolder class. The first section of code below produces the expected output without issue:
import pandas as pd
from functools import partial
import dask.dataframe as dd
from dask.delayed import delayed
import dask
from dask.distributed import Client
from scapy.fields import PacketListField
class CustomPacket:
    """Stand-in for a custom packet type in this reduced example.

    The values live on the class itself rather than on instances; the
    surrounding post states this is deliberate, to comply with scapy's
    requirements.
    """

    a = 1
    b = 2
class CustomPacketHolder:
    """Holder packet for the working variant of the example.

    Both fields are class attributes, so every instance sees the same
    values.
    """

    header = 0
    # Built once, when the class body executes; all holders share this
    # single CustomPacket instance.
    others = CustomPacket()
class PicklablePacket:
    """Wrapper that keeps only the ``others`` payload of a packet holder."""

    def __init__(self, pkt):
        """Store ``pkt.others`` as ``self.contents``.

        Args:
            pkt: Any object exposing an ``others`` attribute.
        """
        self.contents = pkt.others

    def get_sub_packets(self):
        """Return ``contents.b + contents.a``.

        Raises:
            AttributeError: If ``contents`` lacks an ``a`` or ``b`` attribute.
        """
        return self.contents.b + self.contents.a
def process(lists, packet):
    """Append the example holder's sub-packet sum to ``lists``.

    Args:
        lists: Mutable list used as an accumulator; one value is appended
            per call.
        packet: Accepted so the function fits a per-packet callback
            signature, but not read here; a fresh ``CustomPacketHolder``
            is built instead.

    Returns:
        None. The result is delivered by mutating ``lists``.
    """
    sub_packets = PicklablePacket(CustomPacketHolder()).get_sub_packets()
    # The value used to be computed and then dropped, leaving the
    # accumulator empty; storing it is what lets the resulting frame carry
    # a "num" row.
    lists.append(sub_packets)
def sniff(pcap_file, func):
    """Call ``func`` once with ``pcap_file`` and return whatever it returns.

    Args:
        pcap_file: Value handed unchanged to ``func``.
        func: Callable taking a single positional argument.

    Returns:
        The return value of ``func(pcap_file)``.
    """
    return func(pcap_file)
def sniff_packets(pcap_file):
    """Build a one-column (``"num"``) pandas DataFrame for one input.

    Args:
        pcap_file: Input handed to ``sniff``.

    Returns:
        A ``pandas.DataFrame`` with column ``"num"`` holding whatever
        ``process`` placed in the accumulator list.
    """
    lists = []
    # Previously nothing was ever run against the input, so the frame was
    # always empty. Bind the accumulator as the first argument so that
    # sniff's single-argument call becomes process(lists, pcap_file).
    sniff(pcap_file, partial(process, lists))
    return pd.DataFrame(lists, columns=["num"])
# Placeholder inputs for the reduced example; presumably each inner list
# stands in for one pcap file path.
pcap_paths = [[1,2,3],[4,5,6]]
# Create a dask.distributed client with default arguments. The handle is
# not referenced again below.
client = Client()
# One lazy sniff_packets call per input; nothing runs yet.
dfs = [delayed(sniff_packets)(f) for f in pcap_paths]
# Combine the delayed frames into a dask DataFrame, declaring an integer
# "num" column as the expected schema, then evaluate it.
df = dd.from_delayed(dfs,meta={"num":int}).compute()
/home/sof1/.local/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 41337 instead
/usr/lib/python3/dist-packages/scipy/__init__.py:146: UserWarning: A NumPy version >=1.17.3 and <1.25.0 is required for this version of SciPy (detected version 1.26.4
warnings.warn(f"A NumPy version >={np_minversion} and <{np_maxversion}"
/usr/lib/python3/dist-packages/scipy/__init__.py:146: UserWarning: A NumPy version >=1.17.3 and <1.25.0 is required for this version of SciPy (detected version 1.26.4
warnings.warn(f"A NumPy version >={np_minversion} and <{np_maxversion}"
0 3
0 3
Revising one line in CustomPacketHolder causes dask to tell me that the function is not picklable:
class CustomPacketHolder:
    """Holder packet for the failing variant of the example.

    It differs from the working variant only in ``others``, which is now a
    list containing a scapy ``PacketListField`` instead of a
    ``CustomPacket`` instance.
    """

    header = 0
    # NOTE(review): the traceback for this variant ends in
    # "cannot pickle '_struct.Struct' object". Presumably the field object
    # stored on the class is what carries that Struct -- confirm.
    others = [PacketListField("data", [], CustomPacket)]
/home/sof1/.local/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 39743 instead
2024-07-12 14:29:08,720 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x70320c730e20>
0. 123360790803648
Traceback (most recent call last):
File "/home/sof1/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function sniff_packets at 0x7032442f7760>: it's not the same object as __main__.sniff_packets
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/sof1/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
_pickle.PicklingError: Can't pickle <function sniff_packets at 0x7032442f7760>: it's not the same object as __main__.sniff_packets
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/sof1/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/home/sof1/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
File "/home/sof1/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
return super().dump(obj)
TypeError: cannot pickle '_struct.Struct' object
PicklingError Traceback (most recent call last)
~/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
62 try:
---> 63 result = pickle.dumps(x, **dump_kwargs)
64 except Exception:
PicklingError: Can't pickle <function sniff_packets at 0x7032442f7760>: it's not the same object as __main__.sniff_packets
During handling of the above exception, another exception occurred:
PicklingError Traceback (most recent call last)
~/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
67 buffers.clear()
---> 68 pickler.dump(x)
69 result = f.getvalue()
PicklingError: Can't pickle <function sniff_packets at 0x7032442f7760>: it's not the same object as __main__.sniff_packets
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
~/.local/lib/python3.10/site-packages/distributed/protocol/serialize.py in serialize(x, serializers, on_error, context, iterate_collection)
362 try:
--> 363 header, frames = dumps(x, context=context) if wants_context else dumps(x)
364 header["serializer"] = name
~/.local/lib/python3.10/site-packages/distributed/protocol/serialize.py in pickle_dumps(x, context)
---> 78 frames[0] = pickle.dumps(
79 x,
~/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
80 buffers.clear()
---> 81 result = cloudpickle.dumps(x, **dump_kwargs)
82 except Exception:
~/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol, buffer_callback)
1478 cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479 cp.dump(obj)
1480 return file.getvalue()
~/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
1244 try:
-> 1245 return super().dump(obj)
1246 except RuntimeError as e:
TypeError: cannot pickle '_struct.Struct' object
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
/tmp/ipykernel_19074/592240782.py in <module>
4 dfs = [delayed(sniff_packets)(f) for f in pcap_paths]
----> 5 df = dd.from_delayed(dfs,meta={"num":int}).compute()
6 print(df)
~/.local/lib/python3.10/site-packages/dask_expr/_collection.py in compute(self, fuse, **kwargs)
474 out = out.repartition(npartitions=1)
475 out = out.optimize(fuse=fuse)
--> 476 return DaskMethodsMixin.compute(out, **kwargs)
478 def analyze(self, filename: str | None = None, format: str | None = None) -> None:
~/.local/lib/python3.10/site-packages/dask/base.py in compute(self, **kwargs)
374 dask.compute
375 """
--> 376 (result,) = compute(self, traverse=False, **kwargs)
377 return result
~/.local/lib/python3.10/site-packages/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
661 with shorten_traceback():
--> 662 results = schedule(dsk, keys, **kwargs)
664 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
~/.local/lib/python3.10/site-packages/distributed/protocol/serialize.py in serialize(x, serializers, on_error, context, iterate_collection)
387 except Exception:
388 raise TypeError(msg) from exc
--> 389 raise TypeError(msg, str_x) from exc
390 else: # pragma: nocover
391 raise ValueError(f"{on_error=}; expected 'message' or 'raise'")
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x70320c730e20>\n 0. 123360790803648\n>')
I have attempted to confirm that my class is picklable, and I believe it is, based on the output of the code below:
import pickle

# Round-trip the CustomPacketHolder class object through a pickle file.
# NOTE: the standard pickle module stores a class as a reference to its
# qualified name, not as a copy of its attributes, so this check does not
# exercise the values held on the class.
with open("CustomPacketHolder.pickle", "wb") as file:
    pickle.dump(CustomPacketHolder, file)
with open('CustomPacketHolder.pickle', 'rb') as f:
    data = pickle.load(f)
<class '__main__.CustomPacketHolder'>
I don’t know where to go from here - if everything points to my class being picklable, why am I getting the pickling error? Any assistance on this issue would be so appreciated!
(Probably irrelevant) background: the CustomPacketHolder and CustomPacket classes do not use “self” by design, to comply with scapy’s requirements, hence the “PicklablePacket” class.