Running async code inside do_exchange #44132

Open
pvardanis opened this issue Sep 16, 2024 · 0 comments

pvardanis commented Sep 16, 2024

Describe the usage question you have. Please include as many useful details as possible.

My do_exchange() method needs to call async code from an external dependency that I have no control over. Unfortunately, my understanding of concurrency and multi-threading isn't the best. I do know that mixing multi-threaded and async code isn't a recommended approach, so if there's an alternative, please let me know.

I'm creating a new event loop inside the do_exchange method. When multiple requests arrive, the server spawns multiple threads as expected, and each thread should (?) have its own event loop. Whenever the external async code is called, I use an async lock to make sure multiple threads don't access it simultaneously, since it has stateful variables that I don't want changed concurrently (and it isn't thread-safe anyway).

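import asyncio
import logging

import pyarrow.flight as flight

logger = logging.getLogger(__name__)


class ArrowFlightAsyncServer(flight.FlightServerBase):
    def do_exchange(
        self,
        context: flight.ServerCallContext,
        descriptor: flight.FlightDescriptor,
        reader: flight.FlightStreamReader,
        writer: flight.FlightStreamWriter,
    ) -> None:
        """This method implements the `do_exchange` method of the FlightServerBase
        class.

        :param context: A ServerCallContext object.
        :param descriptor: A FlightDescriptor object.
        :param reader: A FlightStreamReader object.
        :param writer: A FlightStreamWriter object.
        """
        # A fresh event loop for the gRPC worker thread handling this call.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        is_first_batch = True
        try:
            # As posted there is no break: the call only ends when an exception propagates.
            while True:
                logger.info("Processing data...")
                (writer, reader, is_first_batch) = loop.run_until_complete(
                    self._run_inference_and_write_to_stream(writer, reader, is_first_batch),
                )
                logger.info("Output data ready to be consumed.")
        finally:
            # Release the per-call loop instead of leaking one loop per request.
            loop.close()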

However, when two client requests arrive at the same time, I get:

pyarrow._flight.FlightServerError: Task <Task pending name='Task-2' coro=<ArrowFlightAsyncServer._run_inference_and_write_to_stream() running at /Users/panagiotisvardanis/Documents/wallaroo/projects/platform/conductor/model-auto-conversion/mac/mac/service/arrow_flight/async_server.py:157> cb=[_run_until_complete_cb() at /Users/panagiotisvardanis/.pyenv/versions/3.8.19/lib/python3.8/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop. Detail: Python exception: RuntimeError. gRPC client debug context: UNKNOWN:Error received from peer ipv4:0.0.0.0:8080 {created_time:"2024-09-16T15:44:17.886048+02:00", grpc_status:2, grpc_message:"Task <Task pending name=\'Task-2\' coro=<ArrowFlightAsyncServer._run_inference_and_write_to_stream() running at /Users/panagiotisvardanis/Documents/wallaroo/projects/platform/conductor/model-auto-conversion/mac/mac/service/arrow_flight/async_server.py:157> cb=[_run_until_complete_cb() at /Users/panagiotisvardanis/.pyenv/versions/3.8.19/lib/python3.8/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop. Detail: Python exception: RuntimeError"}. Client context: OK
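The issue doesn't show where the async lock is created. Assuming it is a single asyncio.Lock created once (for example at import time or in the server's constructor) and then awaited from every per-thread loop, the error can be reproduced without Flight. An uncontended acquire returns immediately, but a contended one needs a Future from the loop the lock is tied to while the waiting task runs on a different loop, which would also fit the error appearing only when two requests overlap. The exact message varies by version (Python 3.8, used in the traceback, reports that the task got a Future attached to a different loop; 3.10 and later report that the lock is bound to a different event loop), but both are a RuntimeError. A minimal sketch; holder, waiter, contend and run_on_fresh_loop are hypothetical names:

```python
import asyncio

# The suspected anti-pattern: ONE asyncio.Lock shared by several event loops.
shared_lock = asyncio.Lock()


async def contend():
    async def holder():
        async with shared_lock:
            await asyncio.sleep(0.01)  # keep the lock held briefly

    async def waiter():
        await asyncio.sleep(0)  # let holder take the lock first
        async with shared_lock:  # contended acquire: needs a Future
            pass

    # return_exceptions=True lets both tasks finish before the loop is closed.
    return await asyncio.gather(holder(), waiter(), return_exceptions=True)


def run_on_fresh_loop():
    # Mirrors do_exchange: a brand-new event loop for each call.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(contend())
    finally:
        loop.close()


first = run_on_fresh_loop()
second = run_on_fresh_loop()
# On 3.8/3.9 both runs fail; on 3.10+ the lock binds to the first loop, so the second fails.
print(repr(second[1]))
```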

The external async code looks roughly like this:

import asyncio
import logging
import numpy as np
logger = logging.getLogger(__name__)

async def process_data(input_data):
    logger.info("Sleeping for 1 second...")
    await asyncio.sleep(1)
    logger.info("Done sleeping.")
    return {"output": np.array([4, 5, 6])}

The logs do show that two threads are spawned, but when both reach the external async code, the error above is raised.

-------------------------------- live log call ---------------------------------
INFO     Processing data...
INFO     Processing data...
INFO     Is `PythonStep` awaitable? True
INFO     Is `PythonStep` awaitable? True
INFO     Acquiring lock...
INFO     Acquiring lock...
INFO     Sleeping for 1 second...
INFO     Sleeping for 1 second...
FAILED                                                                   [100%]
------------------------------ live log teardown -------------------------------
INFO     Done sleeping.
INFO     Releasing lock...
INFO     Converting `InferenceData` to `pa.RecordBatch`...

If I remove the async lock from process_data(), it waits forever.
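As for an alternative to one loop per thread: a common pattern is to keep one long-lived event loop in a dedicated thread and hand coroutines to it from the Flight handler threads with asyncio.run_coroutine_threadsafe, which returns a concurrent.futures.Future that the calling thread can block on. Because every call then runs on the same loop, an asyncio.Lock created on that loop can serialize access to the stateful dependency. A minimal sketch under those assumptions; fake_process_data, _guarded and call_from_any_thread are hypothetical names, and the reader/writer handling would stay in the handler thread:

```python
import asyncio
import threading

# One long-lived event loop, run forever by a dedicated daemon thread.
_loop = asyncio.new_event_loop()
_loop_thread = threading.Thread(target=_loop.run_forever, daemon=True)
_loop_thread.start()


async def _make_lock():
    # Create the lock inside the loop so it is tied to that loop.
    return asyncio.Lock()


_lock = asyncio.run_coroutine_threadsafe(_make_lock(), _loop).result()


async def fake_process_data(input_data):
    # Hypothetical stand-in for the stateful external coroutine.
    await asyncio.sleep(0.05)
    return {"output": [v + 3 for v in input_data]}


async def _guarded(input_data):
    # One caller at a time; every caller runs on _loop, so asyncio.Lock is valid here.
    async with _lock:
        return await fake_process_data(input_data)


def call_from_any_thread(input_data, timeout=10.0):
    """Blocking helper that a do_exchange worker thread could call."""
    future = asyncio.run_coroutine_threadsafe(_guarded(input_data), _loop)
    return future.result(timeout)


results = []


def worker():
    results.append(call_from_any_thread([1, 2, 3]))


workers = [threading.Thread(target=worker) for _ in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(results)
```

In do_exchange, the loop.run_until_complete(...) call would then become a call like call_from_any_thread(...) wrapping only the external coroutine. Stopping the loop on server shutdown (for example with _loop.call_soon_threadsafe(_loop.stop)) is left out of this sketch.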

Component(s)

FlightRPC, Python
