我如何流式传输OpenAI的完成API?

11
我想通过OpenAI的API流式传输完成结果
文档中提到使用服务器推送事件(server-sent events),似乎这个功能在flask中无法实现,因此我尝试在客户端处理(我知道这会暴露API密钥)。但是,由于OpenAI API需要进行POST请求,所以似乎它与eventSource API不兼容。我尝试使用fetch (使用可读流),但是当我尝试按照示例将其转换为JSON时,出现以下错误:Uncaught (in promise) SyntaxError: Unexpected token 'd', "data: {"id"... is not valid JSON(我知道这不是有效的JSON)。似乎它在解析整个结果而不是每个单独的流。
data: {"id": "cmpl-5l11I1kS2n99uzNiNVpTjHi3kyied", "object": "text_completion", "created": 1661887020, "choices": [{"text": " to", "index": 0, "logprobs": null, "finish_reason": null}], "model": "text-davinci-002"}

data: {"id": "cmpl-5l11I1kS2n99uzNiNVpTjHi3kyied", "object": "text_completion", "created": 1661887020, "choices": [{"text": " AL", "index": 0, "logprobs": null, "finish_reason": null}], "model": "text-davinci-002"}

data: {"id": "cmpl-5l11I1kS2n99uzNiNVpTjHi3kyied", "object": "text_completion", "created": 1661887020, "choices": [{"text": "I", "index": 0, "logprobs": null, "finish_reason": null}], "model": "text-davinci-002"}

我希望能得到一些指导或一个简单的代码示例,因为我已经苦思冥想了一段时间。谢谢!


1
嗨,首先删除 "data:" const data = bytes.toString(); const message = data.replace(/^data: /, ""); console.log(JSON.parse(message));就这样。 - Anthony Sychev
7个回答

11

最终获得了这段可运行的代码:

import { Configuration, OpenAIApi } from "openai";
import dotenv from "dotenv";
dotenv.config({ override: true });

const openai = new OpenAIApi(new Configuration({ apiKey: process.env.OPENAI_KEY }));

const getText = async (prompt, callback) => {
    const completion = await openai.createCompletion(
        {
            model: "text-davinci-003",
            prompt: prompt,
            max_tokens: 1000,
            stream: true,
        },
        { responseType: "stream" }
    );
    return new Promise((resolve) => {
        let result = "";
        completion.data.on("data", (data) => {
            const lines = data
                ?.toString()
                ?.split("\n")
                .filter((line) => line.trim() !== "");
            for (const line of lines) {
                const message = line.replace(/^data: /, "");
                if (message == "[DONE]") {
                    resolve(result);
                } else {
                    let token;
                    try {
                        token = JSON.parse(message)?.choices?.[0]?.text;
                    } catch {
                        console.log("ERROR", json);
                    }
                    result += token;
                    if (token) {
                        callback(token);
                    }
                }
            }
        });
    });
};
    
console.log(await getText("Who was the latest president of USA?", (c) => process.stdout.write(c)));

3
我遇到了一个错误,completion.data.on is not a function - Sung Kim
2
请使用 Node v18。 - Extender
我不确定为什么,但我认为这是OpenAI端的问题。使用这段代码甚至原生CURL调用,流根本不起作用。它确实将响应分成块,但仍在整个调用完成后响应,而不是按部就班地响应。 - Aousaf Rashid
它将调用char回调函数并以流模式输入;我只需要在最后返回完整的答案,所以我也会将其返回并使用console.log()打印出来。 - Extender

6

在浏览器中,你可以使用 fetch API,例如:

const response = await fetch('https://api.openai.com/v1/completions', {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${config.apiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: 'text-davinci-003',
        prompt: input,
        stream: true,
      }),
    });
    const reader = response.body?.pipeThrough(new TextDecoderStream()).getReader();
    if (!reader) return;
    // eslint-disable-next-line no-constant-condition
    while (true) {
      // eslint-disable-next-line no-await-in-loop
      const { value, done } = await reader.read();
      if (done) break;
      let dataDone = false;
      const arr = value.split('\n');
      arr.forEach((data) => {
        if (data.length === 0) return; // ignore empty message
        if (data.startsWith(':')) return; // ignore sse comment message
        if (data === 'data: [DONE]') {
          dataDone = true;
          return;
        }
        const json = JSON.parse(data.substring(6));
        console.log(json);
      });
      if (dataDone) break;
    }

我收到了“reader null”错误,请帮忙,我的响应是200状态码。 - famfamfam
非常感谢!这真是为我节省了很多工作量,对于https://chatapi.haerer.dev来说。 - undefined

3

这段代码有效。


    import { Configuration, OpenAIApi } from "openai";
    
    const configuration = new Configuration({
          apiKey: process.env.OPENAI_KEY,
        });
    
    const openai = new OpenAIApi(configuration);
    
    const completion = await openai.createCompletion(
            {
                model: "text-davinci-003",
                prompt: prompt,
                max_tokens: 100,
                stream: true,
            },
            { responseType: "stream" }
        );
       
    completion.data.on("data", (data) => {
          const lines = data
            ?.toString()
            ?.split("\n")
            .filter((line) => line.trim() !== "");
          for (const line of lines) {
            const message = line.replace(/^data: /, "");
            if (message === "[DONE]") {
              break; // Stream finished
            }
            try {
              const parsed = JSON.parse(message);
              console.log(parsed.choices[0].text);
            } catch (error) {
              console.error("Could not JSON parse stream message", message, error);
            }
          }
        });


3

openai包不支持本地流式完成。不过有一个解决方法,请参见https://github.com/openai/openai-node/issues/18#issuecomment-1369996933。为了方便起见,下面是代码的复制粘贴。

try {
    const res = await openai.createCompletion({
        model: "text-davinci-002",
        prompt: "It was the best of times",
        max_tokens: 100,
        temperature: 0,
        stream: true,
    }, { responseType: 'stream' });
    
    res.data.on('data', data => {
        const lines = data.toString().split('\n').filter(line => line.trim() !== '');
        for (const line of lines) {
            const message = line.replace(/^data: /, '');
            if (message === '[DONE]') {
                return; // Stream finished
            }
            try {
                const parsed = JSON.parse(message);
                console.log(parsed.choices[0].text);
            } catch(error) {
                console.error('Could not JSON parse stream message', message, error);
            }
        }
    });
} catch (error) {
    if (error.response?.status) {
        console.error(error.response.status, error.message);
        error.response.data.on('data', data => {
            const message = data.toString();
            try {
                const parsed = JSON.parse(message);
                console.error('An error occurred during OpenAI request: ', parsed);
            } catch(error) {
                console.error('An error occurred during OpenAI request: ', message);
            }
        });
    } else {
        console.error('An error occurred during OpenAI request', error);
    }
}

1
我使用了openai库提供的onDownloadProgress选项。
async generateTextStream(
  req: OpenaiCompletionRequest,
  onDownloadProgressCallback: (pe: ProgressEvent) => void
): Promise<any> {
  return await this.openai.createCompletion(
    {
      model: req.model,
      prompt: req.prompt,
      max_tokens: req.max_tokens,
      temperature: 0.2,
      stream: true
    },
    {
      responseType: 'stream',
      onDownloadProgress: onDownloadProgressCallback,
    },
  );
}

然后数据就在进度事件中。

const result = await this.openAiService
  .generateTextStream(
    { model: 'text-davinci-003', prompt: this.input, max_tokens: 1000 },
    (pe: ProgressEvent) => {
      const res: string = pe.target['response'];
      console.log(pe);
      const newReturnedData = res.split('data:');
      let lastPart = newReturnedData[newReturnedData.length - 1];
      lastPart = lastPart.trim()
      console.log(lastPart);
    },
  );

您还需要将字符串解析为对象。(我已创建了响应类以获得更好的 IntelliSense)
const data: OpenaiEditResponseData = JSON.parse(lastPart);
console.log(data.choices[0].text);

0

我曾经使用https://github.com/SpellcraftAI/openai-streams 取得了好的成果,该工具将流式细节与我的应用程序分离开来。另外,在Langchain存储库中也有一个流式实现,可以在这里找到文档:https://js.langchain.com/docs/getting-started/guide-llm#streaming

他们的实现可以在这里查看:https://github.com/SpellcraftAI/openai-streams/blob/40e1fd03788327ae3d63dfe1ffb2432087fb0921/src/lib/streaming/streams.ts#L43

方便起见,已复制如下:

async start(controller) {
      const parser = createParser(async (event) => {
        if (event.type === "event") {
          const { data } = event;
          /**
           * Break if event stream finished.
           */
          if (data === "[DONE]") {
            const controllerIsClosed = controller.desiredSize === null;
            if (!controllerIsClosed) {
              controller.close();
            }

            await onDone?.();
            return;
          }
          /**
           * Verify we have a valid JSON object and then enqueue it.
           */
          try {
            const parsed = JSON.parse(data);
            controller.enqueue(ENCODER.encode(data));

            /**
             * In `tokens` mode, if the user runs out of tokens and the stream
             * does not complete, we will throw a MAX_TOKENS error. In `raw`
             * mode, we leave it up to the user to handle this.
             *
             * This requires iterating over result.choices[] and throwing an
             * error if any of them have `{ finish_reason: "length" }`.
             */
            if (mode === "tokens" && parsed?.choices) {
              const { choices } = parsed;
              for (const choice of choices) {
                if (choice?.finish_reason === "length") {
                  throw new OpenAIError("MAX_TOKENS");
                }
              }
            }
          } catch (e) {
            controller.error(e);
          }
        }
      });
      /**
       * Feed the parser with decoded chunks from the raw stream.
       */
      for await (const chunk of yieldStream(stream)) {
        const decoded = DECODER.decode(chunk);

        try {
          const parsed = JSON.parse(decoded);

          if (parsed.hasOwnProperty("error"))
            controller.error(new Error(parsed.error.message));
        } catch (e) {}

        parser.feed(decoded);
      }
    },
  });

-6
请使用以下代码:

const { Configuration, OpenAIApi } = require("openai");

const configuration = new Configuration({
    apiKey: process.env.REACT_APP_APIKEY,// your api key
  });
const openai = new OpenAIApi(configuration);
let fetchData = async () => {
        await openai
          .createCompletion({
            model: "text-davinci-002",
            prompt: `hello i am searched text`,
            max_tokens: 500,
            temperature: 0,
          })
          .then(response => {
         
            console.log(response.data.choices[0].text);
          })
          .catch(err => console.log(err));
      };
      fetchData();

您将收到数据,以对象形式呈现 -> data -> choices[0] -> text

2
这段代码片段没有展示"stream"参数的使用。我也在寻找一个有效的例子,但似乎OpenAI NPM模块不是寻找的地方,因为它目前不支持流式传输。 - Keraf
4
这段代码没有回答问题,没有使用流(streams)。 - al1812

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接