Pickling issues with dask delayed

Hello, all. This is my first time posting to a coding forum of any kind, so I apologize in advance for any issues or inefficiencies. I’ve tried to reduce my code to a minimal reproducible example, but I’m sure there is room for improvement.

My objective is to use dask.delayed to multithread the following process: use scapy’s sendrecv.sniff to read pcap files and interpret their custom format, then take the pertinent information from the packets to populate a dask dataframe for further calculations.
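For context, the sniff function in the reduced example below is a stub standing in for that scapy call. The real read step would look roughly like this sketch, using parameters from scapy’s public API (offline= replays a capture file, prn= is the per-packet callback); handle, rows, and capture.pcap are placeholder names of mine:

from scapy.sendrecv import sniff

def handle(rows, packet):
    # extract only the fields of interest as plain Python values
    rows.append(packet.summary())

rows = []
# store=False avoids keeping every packet in memory
sniff(offline="capture.pcap", prn=lambda pkt: handle(rows, pkt), store=False)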

I’m having issues with pickling within my CustomPacketHolder class. The first section of code below produces the expected output without issue:

import pandas as pd
from functools import partial
import dask.dataframe as dd
from dask.delayed import delayed
import dask
from dask.distributed import Client
from scapy.fields import PacketListField

class CustomPacket():
    a = 1
    b = 2

class CustomPacketHolder():
    header = 0
    others = CustomPacket()

class PicklablePacket:
    def __init__(self, pkt):
        self.contents = pkt.others
    
    def get_sub_packets(self):
        return self.contents.b + self.contents.a
    

def process(lists, packet):
    sub_packets = PicklablePacket(CustomPacketHolder()).get_sub_packets()
    lists.append(sub_packets)
    
# stub standing in for scapy's sendrecv.sniff in this reduced example
def sniff(pcap_file, func):
    result = func(pcap_file)
    return result
    
@dask.delayed
def sniff_packets(pcap_file):
    lists = []
    sniff(pcap_file, partial(process, lists))
    df_pandas = pd.DataFrame(lists, columns=["num"])
    return df_pandas

pcap_paths = [[1, 2, 3], [4, 5, 6]]
client = Client()

dfs = [delayed(sniff_packets)(f) for f in pcap_paths]
df = dd.from_delayed(dfs, meta={"num": int}).compute()
print(df)

Output:

/home/sof1/.local/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 41337 instead
  warnings.warn(
/usr/lib/python3/dist-packages/scipy/__init__.py:146: UserWarning: A NumPy version >=1.17.3 and <1.25.0 is required for this version of SciPy (detected version 1.26.4
  warnings.warn(f"A NumPy version >={np_minversion} and <{np_maxversion}"
/usr/lib/python3/dist-packages/scipy/__init__.py:146: UserWarning: A NumPy version >=1.17.3 and <1.25.0 is required for this version of SciPy (detected version 1.26.4
  warnings.warn(f"A NumPy version >={np_minversion} and <{np_maxversion}"
   num
0    3
0    3

Revising one line in CustomPacketHolder, however, causes dask to report that the function is not picklable:

class CustomPacketHolder():
    header = 0
    #REVISED LINE:
    others = [PacketListField("data", [], CustomPacket)]

Output:

/home/sof1/.local/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 39743 instead
  warnings.warn(
2024-07-12 14:29:08,720 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x70320c730e20>
 0. 123360790803648
>.
Traceback (most recent call last):
  File "/home/sof1/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function sniff_packets at 0x7032442f7760>: it's not the same object as __main__.sniff_packets

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/sof1/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function sniff_packets at 0x7032442f7760>: it's not the same object as __main__.sniff_packets

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/sof1/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/home/sof1/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/sof1/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
TypeError: cannot pickle '_struct.Struct' object
---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
~/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     62         try:
---> 63             result = pickle.dumps(x, **dump_kwargs)
     64         except Exception:

PicklingError: Can't pickle <function sniff_packets at 0x7032442f7760>: it's not the same object as __main__.sniff_packets

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
~/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     67             buffers.clear()
---> 68             pickler.dump(x)
     69             result = f.getvalue()

PicklingError: Can't pickle <function sniff_packets at 0x7032442f7760>: it's not the same object as __main__.sniff_packets

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
~/.local/lib/python3.10/site-packages/distributed/protocol/serialize.py in serialize(x, serializers, on_error, context, iterate_collection)
    362         try:
--> 363             header, frames = dumps(x, context=context) if wants_context else dumps(x)
    364             header["serializer"] = name

~/.local/lib/python3.10/site-packages/distributed/protocol/serialize.py in pickle_dumps(x, context)
     77 
---> 78     frames[0] = pickle.dumps(
     79         x,

~/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     80             buffers.clear()
---> 81             result = cloudpickle.dumps(x, **dump_kwargs)
     82         except Exception:

~/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol, buffer_callback)
   1478         cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479         cp.dump(obj)
   1480         return file.getvalue()

~/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
   1244         try:
-> 1245             return super().dump(obj)
   1246         except RuntimeError as e:

TypeError: cannot pickle '_struct.Struct' object

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_19074/592240782.py in <module>
      3 
      4 dfs = [delayed(sniff_packets)(f) for f in pcap_paths]
----> 5 df = dd.from_delayed(dfs,meta={"num":int}).compute()
      6 print(df)

~/.local/lib/python3.10/site-packages/dask_expr/_collection.py in compute(self, fuse, **kwargs)
    474             out = out.repartition(npartitions=1)
    475         out = out.optimize(fuse=fuse)
--> 476         return DaskMethodsMixin.compute(out, **kwargs)
    477 
    478     def analyze(self, filename: str | None = None, format: str | None = None) -> None:

~/.local/lib/python3.10/site-packages/dask/base.py in compute(self, **kwargs)
    374         dask.compute
    375         """
--> 376         (result,) = compute(self, traverse=False, **kwargs)
    377         return result
    378 

~/.local/lib/python3.10/site-packages/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    660 
    661     with shorten_traceback():
--> 662         results = schedule(dsk, keys, **kwargs)
    663 
    664     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

~/.local/lib/python3.10/site-packages/distributed/protocol/serialize.py in serialize(x, serializers, on_error, context, iterate_collection)
    387         except Exception:
    388             raise TypeError(msg) from exc
--> 389         raise TypeError(msg, str_x) from exc
    390     else:  # pragma: nocover
    391         raise ValueError(f"{on_error=}; expected 'message' or 'raise'")

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x70320c730e20>\n 0. 123360790803648\n>')

I have attempted to confirm that my class is picklable, and based on the output of the code below, I believe it is:

import pickle
    
#pickle:
with open("CustomPacketHolder.pickle", "wb") as file:
    pickle.dump(CustomPacketHolder, file)
    
#unpickle:
with open('CustomPacketHolder.pickle', 'rb') as f:
    data = pickle.load(f)

print(data)

Output:

<class '__main__.CustomPacketHolder'>

I don’t know where to go from here: if everything points to my class being picklable, why am I getting the pickling error? Any assistance on this issue would be greatly appreciated!

(Probably irrelevant) background: the CustomPacketHolder and CustomPacket classes do not use self by design, to comply with scapy’s requirements; hence the PicklablePacket wrapper class.

Hi @dasking_for_a_friend, welcome to Dask community!

Thanks for the detailed post, it’s perfect to start the conversation.

There is probably some pickling problem with the scapy package, as you were guessing.

Could you try the following code:

#pickle:
with open("CustomPacketHolder.pickle", "wb") as file:
    pickle.dump(CustomPacketHolder(), file)

to be sure an instantiated object is picklable?

Thank you for your reply! The instance of the class appears to be picklable as well. Running this:

with open("CustomPacketHolder.pickle", "wb") as file:
    pickle.dump(CustomPacketHolder(), file)
    
#unpickle:
with open('CustomPacketHolder.pickle', 'rb') as f:
    data = pickle.load(f)

print(data)

Produced this:

<__main__.CustomPacketHolder object at 0x7aa291ae6f80>

EDIT: I have also run the below tests:

import pandas as pd
from functools import partial
import dask.dataframe as dd
from dask.delayed import delayed
import dask
from dask.distributed import Client
from scapy.fields import PacketListField

class CustomPacket():
    a = 1
    b = 2

class CustomPacketHolder():
    header = 0
    others = [PacketListField("data", [], CustomPacket)]

class PicklablePacket:
    def __init__(self, pkt):
        self.contents = pkt.others
    
    def get_sub_packets(self):
        return self.contents[0].name
    

def process(lists, packet):
    sub_packets = PicklablePacket(CustomPacketHolder()).get_sub_packets()
    lists.append(sub_packets)
    
def sniff(pcap_file, func):
    result = func(pcap_file)
    return result
    
def sniff_packets(pcap_file):
    lists = []
    sniff(pcap_file, partial(process, lists))
    df_pandas = pd.DataFrame(lists, columns=["num"])
    return df_pandas

pcap_paths = [[1, 2, 3], [4, 5, 6]]
sniff_packets(pcap_paths[0])

Produced the expected output:

    num
0  data

I further checked that the output of sniff_packets pickles, using this code:

with open("sniff_packets.pickle", "wb") as file:
    pickle.dump(sniff_packets(pcap_paths[0]), file)
    
#unpickle:
with open('sniff_packets.pickle', 'rb') as f:
    data = pickle.load(f)

print(data)

Output:

    num
0  data

Adding the @dask.delayed decorator back and trying the same pickle test again:

@dask.delayed
def sniff_packets(pcap_file):
    lists = []
    sniff(pcap_file, partial(process, lists))
    df_pandas = pd.DataFrame(lists, columns=["num"])
    return df_pandas

with open("sniff_packets.pickle", "wb") as file:
    pickle.dump(sniff_packets(pcap_paths[0]), file)
    
#unpickle:
with open('sniff_packets.pickle', 'rb') as f:
    data = pickle.load(f)

print(data)

Leads to the original error:

---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
/tmp/ipykernel_4299/1700632070.py in <module>
      1 with open("sniff_packets.pickle", "wb") as file:
----> 2     pickle.dump(sniff_packets(pcap_paths[0]), file)
      3 
      4 #unpickle:
      5 with open('sniff_packets.pickle', 'rb') as f:

PicklingError: Can't pickle <function sniff_packets at 0x7aa110747010>: it's not the same object as __main__.sniff_packets

EDIT 2: My guess is that the decorator rebinds the module-level name sniff_packets to a Delayed wrapper, so pickle can no longer find the original function at __main__.sniff_packets. Based on this, I removed the decorator:

import pandas as pd
from functools import partial
import dask.dataframe as dd
from dask.delayed import delayed
import dask
from dask.distributed import Client
from scapy.fields import PacketListField

class CustomPacket():
    a = 1
    b = 2

class CustomPacketHolder():
    header = 0
    others = [PacketListField("data", [], CustomPacket)]

class PicklablePacket:
    def __init__(self, pkt):
        self.contents = pkt.others
    
    def get_sub_packets(self):
        return self.contents[0].name
    
def process(lists, packet):
    sub_packets = PicklablePacket(CustomPacketHolder()).get_sub_packets()
    lists.append(sub_packets)
    
def sniff(pcap_file, func):
    result = func(pcap_file)
    return result
    
def sniff_packets(pcap_file):
    lists = []
    sniff(pcap_file, partial(process, lists))
    df_pandas = pd.DataFrame(lists, columns=["num"])
    return df_pandas

pcap_paths = [[1, 2, 3], [4, 5, 6]]
client = Client()

dfs = [delayed(sniff_packets)(f) for f in pcap_paths]
df = dd.from_delayed(dfs, meta={"num": str}).compute()
print(df)

Which changes the error to:

/home/sof1/.local/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 33939 instead
  warnings.warn(
2024-07-15 10:12:54,909 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7a9ea6de1db0>
 0. 134821823055488
>.
Traceback (most recent call last):
  File "/home/sof1/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/home/sof1/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/sof1/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
TypeError: cannot pickle '_struct.Struct' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/sof1/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/home/sof1/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/sof1/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
TypeError: cannot pickle '_struct.Struct' object
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
~/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     76                 buffers.clear()
---> 77                 result = cloudpickle.dumps(x, **dump_kwargs)
     78     except Exception:

~/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol, buffer_callback)
   1478         cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479         cp.dump(obj)
   1480         return file.getvalue()

~/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
   1244         try:
-> 1245             return super().dump(obj)
   1246         except RuntimeError as e:

TypeError: cannot pickle '_struct.Struct' object

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
~/.local/lib/python3.10/site-packages/distributed/protocol/serialize.py in serialize(x, serializers, on_error, context, iterate_collection)
    362         try:
--> 363             header, frames = dumps(x, context=context) if wants_context else dumps(x)
    364             header["serializer"] = name

~/.local/lib/python3.10/site-packages/distributed/protocol/serialize.py in pickle_dumps(x, context)
     77 
---> 78     frames[0] = pickle.dumps(
     79         x,

~/.local/lib/python3.10/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     80             buffers.clear()
---> 81             result = cloudpickle.dumps(x, **dump_kwargs)
     82         except Exception:

~/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol, buffer_callback)
   1478         cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479         cp.dump(obj)
   1480         return file.getvalue()

~/.local/lib/python3.10/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
   1244         try:
-> 1245             return super().dump(obj)
   1246         except RuntimeError as e:

TypeError: cannot pickle '_struct.Struct' object

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_4299/112649421.py in <module>
      3 
      4 dfs = [delayed(sniff_packets)(f) for f in pcap_paths]
----> 5 df = dd.from_delayed(dfs,meta={"num":str}).compute()
      6 print(df)

~/.local/lib/python3.10/site-packages/dask_expr/_collection.py in compute(self, fuse, **kwargs)
    474             out = out.repartition(npartitions=1)
    475         out = out.optimize(fuse=fuse)
--> 476         return DaskMethodsMixin.compute(out, **kwargs)
    477 
    478     def analyze(self, filename: str | None = None, format: str | None = None) -> None:

~/.local/lib/python3.10/site-packages/dask/base.py in compute(self, **kwargs)
    374         dask.compute
    375         """
--> 376         (result,) = compute(self, traverse=False, **kwargs)
    377         return result
    378 

~/.local/lib/python3.10/site-packages/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    660 
    661     with shorten_traceback():
--> 662         results = schedule(dsk, keys, **kwargs)
    663 
    664     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

~/.local/lib/python3.10/site-packages/distributed/protocol/serialize.py in serialize(x, serializers, on_error, context, iterate_collection)
    387         except Exception:
    388             raise TypeError(msg) from exc
--> 389         raise TypeError(msg, str_x) from exc
    390     else:  # pragma: nocover
    391         raise ValueError(f"{on_error=}; expected 'message' or 'raise'")

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7a9ea6de1db0>\n 0. 134821823055488\n>')

EDIT 3: Assuming it isn’t a problem to remove the decorator (and multithreading will still work), I now have to figure out the source of the "_struct.Struct" error. The PacketListField itself turns out not to be picklable:

class CustomPacket():
    a = 1
    b = 2

experiment = PacketListField("data", [], CustomPacket)

#pickle:
with open("experiment.pickle", "wb") as file:
    pickle.dump(experiment, file)
    
#unpickle:
with open('experiment.pickle', 'rb') as f:
    data = pickle.load(f)

print(data)

Output:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_4299/490038758.py in <module>
      1 #pickle:
      2 with open("experiment.pickle", "wb") as file:
----> 3     pickle.dump(experiment, file)
      4 
      5 #unpickle:

TypeError: cannot pickle '_struct.Struct' object

Creating a picklable wrapper does not help:

class CustomPacket():
    a = 1
    b = 2
    
class PicklableCustomPacket():
    def __init__(self,packet):
        self.packet = packet

experiment = PacketListField("data", [], PicklableCustomPacket(CustomPacket))

#pickle:
with open("experiment.pickle", "wb") as file:
    pickle.dump(experiment, file)
    
#unpickle:
with open('experiment.pickle', 'rb') as f:
    data = pickle.load(f)

print(data)

Output:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_4299/490038758.py in <module>
      1 #pickle:
      2 with open("experiment.pickle", "wb") as file:
----> 3     pickle.dump(experiment, file)
      4 
      5 #unpickle:

TypeError: cannot pickle '_struct.Struct' object

So, my new question is: do I have to use the @dask.delayed decorator? If not, my only remaining problem is to find an alternative to my current class structure so that I can pickle my custom packet, which may involve using the scapy library differently or not using it at all.

Thanks for all these updates!

You don’t have to use the @dask.delayed decorator; you can call delayed as a function instead.
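For example, either of these shapes should work (a rough sketch reusing your names; sniff_packets_decorated and the placeholder paths are mine). Stacking both, as in your snippets, is what produced the "it's not the same object as __main__.sniff_packets" message: the decorator rebinds the module-level name to a Delayed wrapper, so the stdlib pickler no longer finds the original function there. Dask then falls back to cloudpickle (which is why your very first example still ran), and it is cloudpickle that chokes on the _struct.Struct.

import dask

pcap_paths = ["a.pcap", "b.pcap"]  # placeholder paths

# Option 1: keep sniff_packets a plain function and wrap it at call time
def sniff_packets(pcap_file):
    ...

dfs = [dask.delayed(sniff_packets)(f) for f in pcap_paths]

# Option 2: decorate once and call the decorated name directly,
# without wrapping it in delayed() a second time
@dask.delayed
def sniff_packets_decorated(pcap_file):
    ...

dfs = [sniff_packets_decorated(f) for f in pcap_paths]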

So yes, you need to resolve this pickling issue.
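One more note on the pickle tests: the stdlib pickle serializes a class defined in __main__ (and the class of any instance) by reference, essentially just recording the qualified name __main__.CustomPacketHolder, which is why those tests passed even though a class attribute holds an unpicklable object. cloudpickle, by contrast, serializes __main__ classes by value, attributes included. A stricter local check would be this sketch:

import cloudpickle

# Serializes the __main__-defined class by value, class attributes included,
# so this should reproduce the failure: `others` holds a PacketListField,
# which contains a compiled _struct.Struct.
cloudpickle.dumps(CustomPacketHolder())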
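As for resolving it, one option (a sketch, untested against your real pipeline) is to move the scapy-dependent classes out of __main__ into an importable module, say custom_packets.py; cloudpickle serializes importable classes by reference, so the workers would import the module themselves instead of receiving a pickled struct.Struct. The file just has to be importable on the workers, e.g. via client.upload_file("custom_packets.py").

# custom_packets.py  (hypothetical module; must be importable on the workers)
from scapy.fields import PacketListField

class CustomPacket:
    a = 1
    b = 2

class CustomPacketHolder:
    header = 0
    others = [PacketListField("data", [], CustomPacket)]

Then from custom_packets import CustomPacket, CustomPacketHolder in the notebook, and the task graph only references the classes by name. Alternatively, keep everything scapy-specific inside sniff_packets and return plain built-ins, so nothing scapy-related ends up in the graph at all.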