
Streaming support #195

Merged: 1 commit merged into TheoKanning:main on Mar 28, 2023

Conversation

@n3bul4 (Contributor) commented Mar 17, 2023

Utilize retrofit2.http.Streaming and retrofit2.Call in additional OpenAIApi methods to enable a streamable ResponseBody.

Utilize retrofit2.Callback to get the streamable ResponseBody, parse Server Sent Events (SSE) and emit them using io.reactivex.FlowableEmitter.
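
As a rough sketch of that mechanism (the helper below is illustrative; the exact method names, error handling, and emit logic are not necessarily the code in this PR):

import java.io.BufferedReader;
import java.io.IOException;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import okhttp3.ResponseBody;
import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.Response;

// Illustrative: turn a streaming retrofit Call<ResponseBody> into a Flowable of SSE data lines.
static Flowable<String> streamSseLines(Call<ResponseBody> call) {
    return Flowable.create((FlowableEmitter<String> emitter) ->
            call.enqueue(new Callback<ResponseBody>() {
                @Override
                public void onResponse(Call<ResponseBody> c, Response<ResponseBody> response) {
                    try (BufferedReader reader = new BufferedReader(response.body().charStream())) {
                        String line;
                        while ((line = reader.readLine()) != null && !emitter.isCancelled()) {
                            if (line.startsWith("data: ")) {
                                String data = line.substring("data: ".length());
                                if (data.equals("[DONE]")) break; // OpenAI's end-of-stream marker
                                emitter.onNext(data);
                            }
                        }
                        emitter.onComplete();
                    } catch (IOException e) {
                        emitter.onError(e);
                    }
                }

                @Override
                public void onFailure(Call<ResponseBody> c, Throwable t) {
                    emitter.onError(t);
                }
            }), BackpressureStrategy.BUFFER);
}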

Enable:

  • Streaming of raw bytes
  • Streaming of Java objects
  • Shutdown of OkHttp ExecutorService

Fixes: #51, #83, #182, #184
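
For illustration, a minimal end-to-end consumer of the new streaming API could look like the sketch below. The method names (streamChatCompletion, streamChatCompletionBytes, shutdownExecutor) follow this PR's description and the discussion in this thread, while the exact packages and accessors are assumptions:

import java.time.Duration;
import java.util.List;

import com.theokanning.openai.completion.chat.ChatCompletionRequest;
import com.theokanning.openai.completion.chat.ChatMessage;
import com.theokanning.openai.service.OpenAiService;

public class StreamingSketch {
    public static void main(String[] args) {
        // Timeout raised for streaming, as suggested later in this thread.
        OpenAiService service = new OpenAiService(System.getenv("OPENAI_TOKEN"), Duration.ofSeconds(35));

        ChatCompletionRequest request = ChatCompletionRequest.builder()
                .model("gpt-3.5-turbo")
                .messages(List.of(new ChatMessage("user", "Say hello")))
                .stream(true)
                .build();

        // Streaming of Java objects: each emitted chunk carries a partial completion delta.
        service.streamChatCompletion(request)
                .doOnError(Throwable::printStackTrace)
                .blockingForEach(chunk -> {
                    String delta = chunk.getChoices().get(0).getMessage().getContent();
                    if (delta != null) System.out.print(delta); // role-only deltas have no content
                });

        // Streaming of raw bytes is available via service.streamChatCompletionBytes(request).

        service.shutdownExecutor(); // shuts down OkHttp's ExecutorService so the JVM can exit
    }
}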

@kubecutle

Looking forward to the release.

@Ruanandxian

Is there a way to guarantee the stability of streaming to the front end? The stream pushed to the front end always feels laggy. (Original comment in Chinese.)

@n3bul4 (Contributor, Author) commented Mar 21, 2023

@Ruanandxian
Hello, unfortunately I don't understand Chinese, but Google Translate gives me the following:

"Is there a way to ensure the stability of pushing the stream to the front end, the process of pushing the stream to the front end always has a sense of lag ."

I'm not sure if I understand you correctly, but it may be that you don't flush the OutputStream. Note that ServletOutputStream and the like buffer the bytes written to them and don't write them out immediately. To write the received data out immediately, you must flush the output stream after each write call.

Something like this could help:

service
    .streamChatCompletionBytes(request)
    .doOnError(e -> {
        e.printStackTrace();
    })
    .blockingForEach(bytes -> {
        response.write(bytes);
        response.flush(); // flush to write out immediately
    });

Otherwise please provide some more information. Some code examples could help to understand what exactly you would like to achieve.

@DOOB-B commented Mar 22, 2023

Thank you for providing the code that allows me to output results in stream mode in my project.

@n3bul4 (Contributor, Author) commented Mar 22, 2023

> Thank you for providing the code that allows me to output results in stream mode in my project.

You're welcome! I'm glad to hear that the code I provided was useful for you in your project.

@TheoKanning merged commit 7dc5b5b into TheoKanning:main on Mar 28, 2023
@TheoKanning (Owner)
Thank you! I'm going to add some tests to this and clean up a few things, but it'll be in the next release.

@Mrblw (Contributor) commented Mar 28, 2023

I use spring-boot-starter-webflux to receive the stream, then extract the response text and return it to the interface caller as a Flux, but the response does not feel async. Is something wrong?
Here is my code:

webClient.post()
    .uri("https://api.openai.com/v1/chat/completions")
    .contentType(MediaType.APPLICATION_JSON)
    .accept(MediaType.TEXT_EVENT_STREAM)
    .header(HttpHeaders.CONNECTION, "keep-alive")
    .header("Authorization", String.format("Bearer %s", properties.getToken()))
    .bodyValue(body)
    .retrieve()
    .bodyToFlux(String.class)
    .map(response -> buildResult(response, composeResult, request))

@cryptoapebot

If you look at @n3bul4's comment above, could this be an issue with not flushing the stream?

@n3bul4 (Contributor, Author) commented Mar 28, 2023

Hey @Mrblw,

I haven't worked with webflux yet but I am pretty sure

.bodyValue(body)
.retrieve() 
.bodyToFlux(String.class)

will read the whole response body at once into a single String instance. If my assumption is wrong and you do get multiple chunks of the response body as Strings, it could be that you are not flushing each chunk after retrieval, as @cryptoapebot already stated.
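
If chunk-by-chunk delivery is the goal, a commonly documented WebFlux pattern is to decode the body as Server-Sent Events instead of aggregating it. A sketch, untested here; webClient, properties, and body are the names from the snippet above:

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Flux;

// Decode each SSE event separately instead of collecting the whole body.
Flux<String> deltas = webClient.post()
        .uri("https://api.openai.com/v1/chat/completions")
        .contentType(MediaType.APPLICATION_JSON)
        .accept(MediaType.TEXT_EVENT_STREAM)
        .header("Authorization", String.format("Bearer %s", properties.getToken()))
        .bodyValue(body)
        .retrieve()
        .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})
        .takeWhile(event -> !"[DONE]".equals(event.data())) // OpenAI's terminator event
        .filter(event -> event.data() != null)               // skip comment/heartbeat events
        .map(ServerSentEvent::data);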

@phazei commented Mar 28, 2023

Could someone please provide an example of how to utilize this?

@cryptoapebot

> Could someone please provide an example of how to utilize this?

https://github.com/TheoKanning/openai-java/blob/main/example/src/main/java/example/OpenAiApiStreamExample.java

You might have to add:

import java.time.Duration;

And when you create the service:

OpenAiService service = new OpenAiService(token, Duration.ofSeconds(35));

If you are using Gradle, then in the top-level directory just run:

./gradlew example:run

Also make sure the OPENAI_TOKEN environment variable is set to your OpenAI API key.

@TrautmannP

I haven't tried it yet, but here you can find some examples of how streaming responses are normally handled (going to test once the version is released): https://www.baeldung.com/spring-mvc-sse-streams

Hope this helps :-)

And thanks for the PR! 👍 🥇

@n3bul4 (Contributor, Author) commented Mar 31, 2023

@phazei
Another way to use this in Spring looks like this:

@GetMapping("/")
@Streaming
public ResponseEntity<StreamingResponseBody> stream() {
  StreamingResponseBody responseBody = response -> {
	ChatCompletionRequest request = ChatCompletionRequest.builder()
		.messages(messages)
		.stream(true)
		.model("gpt-3.5-turbo")
		.maxTokens(MAX_TOKENS)
		.temperature(1.0)
		.frequencyPenalty(0.3)
		.presencePenalty(0.3)
		.build();

	service
		.streamChatCompletionBytes(request)
		.doOnError(e -> {
			e.printStackTrace();
		})
		.doOnComplete(new Action() {
			@Override
			public void run() throws Exception {
				//do something on completion
			}
		})
		.blockingForEach(bytes -> {
			response.write(bytes);
			response.flush(); //immediately write out buffered bytes
		});
  };

  return ResponseEntity.ok()
           .contentType(MediaType.TEXT_EVENT_STREAM)
           .body(responseBody);
}

@an9xyz commented Apr 25, 2023

> @phazei Another way to use this in Spring looks like this: […]

@n3bul4 hi, I used this code, but there is a difference compared to directly calling OpenAI. The first data response time from OpenAI is about 3 seconds, while using the above code, the first response time is about 17 seconds. I am confused about why there is such a difference.

@n3bul4 (Contributor, Author) commented Apr 25, 2023

> @n3bul4 hi, I used this code, but there is a difference compared to directly calling OpenAI. The first data response time from OpenAI is about 3 seconds, while using the above code, the first response time is about 17 seconds. […]

@an9xyz hi, what do you mean by "directly calling OpenAI"? The code is actually calling the streaming part of the OpenAI API directly. I am using roughly the same code in a project and have not encountered any abnormal delays. Note that the OpenAI API is sometimes overloaded (especially when using a trial account) and response times can vary strongly at peak times, but that should not be related to the code above.

@an9xyz commented Apr 25, 2023

My point is based on the OpenAI example "3. How much time is saved by streaming a chat completion" (link). An example using the Python requests library is as follows:

Message received 2.10 seconds after request: {
  "role": "assistant"
}
Message received 2.10 seconds after request: {
  "content": "\n\n"
}
Message received 2.10 seconds after request: {
  "content": "1"
}
Message received 2.11 seconds after request: {
  "content": ","
}
....

Then I modify openai.api_base=http://127.0.0.1:10008/stream to point to the Java server endpoint. Result:

Message received 10.10 seconds after request: {
  "role": "assistant"
}
Message received 10.10 seconds after request: {
  "content": "\n\n"
}
Message received 10.10 seconds after request: {
  "content": "1"
}
Message received 10.11 seconds after request: {
  "content": ","
}
....

@n3bul4 I noticed that your code uses @GetMapping("/"), should I be using Post instead?
Thank you for your reply.

@cryptoapebot

Just a note: I don't think streaming is meant to be a time-saving feature. It can start delivering partial results to the user sooner, so there is less wait for the first response, but any request will take longer overall. It's meant as a usability tradeoff.

@n3bul4 (Contributor, Author) commented Apr 25, 2023

> My point is based on the OpenAI example "3. How much time is saved by streaming a chat completion" […]
>
> @n3bul4 I noticed that your code uses @GetMapping("/"), should I be using Post instead?

@an9xyz The example code I provided uses the GetMapping annotation because the browser EventSource API only supports GET requests, and I am using the servlet endpoint with JavaScript's EventSource.

You actually cannot use POST against the example servlet, because the endpoint is annotated with GetMapping. What I find a bit strange is that you should actually see a Spring error when POSTing to a GetMapping, as long as there is no PostMapping annotation for the same path (i.e. "/").

What happens if you simply enter the URL (http://127.0.0.1:10008/stream) into the browser? Do you experience any delays? That test would at least perform an HTTP GET request.

I think the problem here is that you are POSTing to a GET mapping (openai-python uses POST), although I wonder why Spring is not complaining about it.

I am not sure what exactly you would like to achieve. If you want to use EventSource with JavaScript to read the response, then the code I provided is one way to go. In that case you should not test the servlet with the openai-python library, because it uses POST.

If you don't have to use EventSource with JavaScript, you can change the GetMapping annotation to PostMapping. I would try it out and look at the results.

I hope this helps.

@an9xyz commented Apr 25, 2023

> @an9xyz The example code I provided uses the GetMapping annotation because the browser EventSource API only supports GET requests […]

Sorry, I didn't explain clearly; your response was really helpful 👍 @n3bul4.
What I actually meant was: how can I achieve the streaming effect of the OpenAI API without using EventSource?
Thanks again.

@n3bul4 (Contributor, Author) commented Apr 25, 2023

@an9xyz you can use the example code to achieve streaming. EventSource is the JavaScript way of handling the content type text/event-stream, and it requires GET to work. It is basically up to you which HTTP method and content type you use; I would say it depends on the use case.

If your goal is something like ChatGPT, use the code I provided, as ChatGPT uses EventSource.

Otherwise, please describe what you would like to achieve. Where should the data of the stream go? If you just want to put it somewhere into a database or file, I wouldn't use streaming at all.
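
For completeness, a sketch of the PostMapping variant mentioned above; binding ChatCompletionRequest directly via @RequestBody is illustrative here, not code from this PR:

import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

// Sketch: a POST endpoint for non-EventSource clients (curl, openai-python, etc.);
// `service` is the OpenAiService instance from the earlier example.
@PostMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<StreamingResponseBody> streamPost(@RequestBody ChatCompletionRequest request) {
    StreamingResponseBody responseBody = response -> service
            .streamChatCompletionBytes(request)
            .doOnError(Throwable::printStackTrace)
            .blockingForEach(bytes -> {
                response.write(bytes);
                response.flush(); // push each SSE chunk out immediately
            });

    return ResponseEntity.ok()
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(responseBody);
}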

@an9xyz commented Apr 26, 2023

@n3bul4 What I want to achieve is to wrap the OpenAI interface and provide it as a shared service internally. Your example code works well for local debugging with a streaming response, but when I deploy it to the test environment the request blocks and there is no streaming effect; the result arrives all at once, as with a regular API request.
I don't know where the blocking occurs. We are not considering the ChatGPT web UI here, only the server side.

UPDATE: It is likely a configuration problem with Nginx, and I am still trying to solve it.

@h2cone commented Jul 14, 2023

> […] UPDATE: It is likely a configuration problem with Nginx, and I am still trying to solve it.

I encountered the same issue and suspect it was caused by Nginx buffering. I changed the configuration according to "For Server-Sent Events (SSE) what Nginx proxy configuration is appropriate?", but the problem still persists. May I ask how you resolved it?
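
For anyone else hitting this: a commonly used mitigation (an assumption here, not something confirmed in this thread) is to disable Nginx proxy buffering for the streaming response, either with proxy_buffering off; in the Nginx location block or per response via the X-Accel-Buffering header, e.g. applied to the StreamingResponseBody example above:

// Sketch: ask Nginx not to buffer this response.
return ResponseEntity.ok()
        .contentType(MediaType.TEXT_EVENT_STREAM)
        .header("X-Accel-Buffering", "no") // Nginx disables proxy buffering for this response
        .body(responseBody);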

Successfully merging this pull request may close these issues:

Problem with stream attribute for CompletionRequest.