Python Multiprocessing geoprocessing sys.stdout flush error

3418
5
12-04-2013 03:03 PM
NathanHeick
Occasional Contributor
I am trying to run a geoprocessing script tool in a multiprocessing.Pool.  Initially, I had problems with pickling my tool, which was imported from a custom toolbox.  Then, I defined a top-level function that executed the call.  My tool runs and I am even seeing a message from the tool, but then I get the following error:

Process PoolWorker-1:
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\ArcGIS10.1\lib\multiprocessing\forking.py", line 379, in main
    exitcode = self._bootstrap()
  File "C:\Python27\ArcGIS10.1\lib\multiprocessing\process.py", line 275, in _bootstrap
    sys.stderr.flush()
AttributeError: 'geoprocessing sys.stdout object' object has no attribute 'flush'

After that, the program hangs.  Looking at the multiprocessing package, it looks a SystemExit error is raised while or after the tool is run.  Any ideas?
Tags (2)
0 Kudos
5 Replies
NathanHeick
Occasional Contributor
As I'm troubleshooting, I'll keep track of my findings here, in case it helps anyone later on.  I believe that sys.stderr is being redirected to a custom geoprocessing stderr object somewhere in arcpy which I couldn't locate.  Also, line 275 above does not refer to a SystemExit exception, but any other exception.  I suspect I am not giving the arguments for the apply_async call properly.  I think the issue is avoidable, but I don't think the custom stderr object from arcpy has a flush method like the normal stderr.
0 Kudos
NathanHeick
Occasional Contributor
After a lot of review of the multiprocessing module code, I believe the worker process in the process pool is raising an exception when it tries to put the arcpy.Result object on outqueue.  Trying to run the geoprocessing tool manually and pickling the result raised the following PicklingError.

PicklingError: Can't pickle 'geoprocessing server result object' object: <geoprocessing server result object object at 0x13DC4698>

I guess the moral of the story is that I really need to understand pickling as it relates to arcpy.
0 Kudos
NathanHeick
Occasional Contributor
After reading more about pickling, it appears that the function result from Pool.apply_async needs to be defined in the top-level of the main module.  Adding the following solved my pickling problem for the results of the geoprocessing tool:

from arcpy import Result
0 Kudos
StacyRendall1
Occasional Contributor III
Glad you found the answer.

Your initial post would suggest to me that the message from the tool was the problem at that point in time. Multiprocessed workers can't really write output in the normal way. What I typically do is pass a string along with the results that contains any useful information.
0 Kudos
NathanHeick
Occasional Contributor
Actually, after further research, I believe the return value from Pool.apply_async needs to be an object defined at the top-level of any module.  In the end, I decided to write my own Result class that stores the information available through the arcpy.Result methods and lets you retrieve them with an identical interface.  It is picklable if all the inputs and outputs are picklable.  For my case, I'm mostly using datasets represented as path strings.

class Result(object):

    def __init__(self, status, inputs, outputs, messages):
        """Initializes the result.

        ARGUMENT            DESCRIPTION
        --------            -----------
        status              The job status.
        inputs              The inputs to the job.
        outputs             The outputs of the job.
        messages            The messages of the job represented as a list of
                            tuples whose values are (severity, message).
        
        STATUS              DESCRIPTION
        ------              -----------
        0                   New
        1                   Submitted
        2                   Waiting
        3                   Executing
        4                   Succeeded
        5                   Failed
        6                   Timed Out
        7                   Cancelling
        8                   Cancelled
        9                   Deleting
        10                  Deleted
        
        SEVERITY            DESCRIPTION
        --------            -----------
        0                   Informational Message
        1                   Warning Message
        2                   Error Message
        """

        self._status = status
        """The job status.

        STATUS              DESCRIPTION
        ------              -----------
        0                   New
        1                   Submitted
        2                   Waiting
        3                   Executing
        4                   Succeeded
        5                   Failed
        6                   Timed Out
        7                   Cancelling
        8                   Cancelled
        9                   Deleting
        10                  Deleted
        """

        self._inputs = inputs
        """The inputs to the job."""

        self._outputs = outputs
        """The outputs of the job."""

        self._messages = messages
        """The messages of the job represented as a list of tuples whose values
        are (severity, message).
        
        SEVERITY            DESCRIPTION
        --------            -----------
        0                   Informational Message
        1                   Warning Message
        2                   Error Message
        """

    @classmethod
    def from_arcpy_result(cls, result):
        """Creates a Result from an arcpy.Result instance.

        ARGUMENT            DESCRIPTION
        --------            -----------
        result              An instance of arcpy.Result.
        """

        assert isinstance(result, arcpy.Result), (
            "Result %r is not an instance of arcpy.Result." % result)

        status = result.status
        inputs = [result.getInput(i) for i in range(result.inputCount)]
        outputs = [result.getOutput(i) for i in range(result.outputCount)]
        messages = []
        for i in range(result.messageCount):
            messages.append((result.getSeverity(i), result.getMessage(i)))

        return cls(status, inputs, outputs, messages)

    @property
    def inputCount(self):
        """The number of inputs of the job."""

        return len(self._inputs)

    @property
    def maxSeverity(self):
        """The maximum severity of the messages.
        
        SEVERITY            DESCRIPTION
        --------            -----------
        0                   Informational Message
        1                   Warning Message
        2                   Error Message
        """

        return max([severity for severity, message in self._messages])

    @property
    def messageCount(self):
        """The number of messages."""

        return len(self._messages)

    @property
    def outputCount(self):
        """The number of outputs."""

        return len(self._outputs)

    @property
    def resultID(self):
        """The job ID if the tool is a geoprocessing service, else ""."""

        return self._resultID

    @property
    def status(self):
        """The job status.

        STATUS              DESCRIPTION
        ------              -----------
        0                   New
        1                   Submitted
        2                   Waiting
        3                   Executing
        4                   Succeeded
        5                   Failed
        6                   Timed Out
        7                   Cancelling
        8                   Cancelled
        9                   Deleting
        10                  Deleted
        """

        return self._status

    def cancel(self):
        """Cancels the job."""

        pass

    def getInput(self, index):
        """Returns the input at the given index, either as a record set or a
        string.

        ARGUMENT            DESCRIPTION
        --------            -----------
        index               Index of the input to return.
        """

        return self._inputs[index]

    def getMapImageURL(self, *args, **kwargs):
        """Returns a map service image for a given output, if one exists."""

        raise NotImplementedError

    def getMessage(self, index):
        """Returns the message at the given index.

        ARGUMENT            DESCRIPTION
        --------            -----------
        index               Index of the message to return.
        """

        return self._messages[index]

    def getMessages(self, severity=0):
        """Returns messages of the given severity or greater.

        SEVERITY            DESCRIPTION
        --------            -----------
        0                   Informational Message
        1                   Warning Message
        2                   Error Message
        """

        messages = []
        for s, m in self._messages:
            if s >= severity:
                messages.append(m)

        return "\n".join(messages)

    def getOutput(self, index):
        """Returns the output at the given index, either as a record set or a
        string.

        ARGUMENT            DESCRIPTION
        --------            -----------
        index               Index of the output to return.
        """

        return self._outputs[index]

    def getSeverity(self, index):
        """"Returns the severity of the message at the given index.

        SEVERITY            DESCRIPTION
        --------            -----------
        0                   Informational Message
        1                   Warning Message
        2                   Error Message
        """

        return self._messages[index][0]