Turning JSON Objects from a Fetch Response into an Async Generator with the Streams API


When interacting with large language models, we often want to present the user with content before the full response is available. We can use the Streams API to display partial responses to the user.

Motivating Example

Text-to-JSON offers a streaming API that returns partial JSON objects as they are generated by the language model. Consider parsing a customer object with the JSON schema:

{
  "customer": {
    "company_name": "<string: The name of the company>",
    "vat_id": "<string: The VAT (Value Added Tax) identification number of the company>",
    "contact": {
      "first_name": "<string: The first name of the contact person>",
      "last_name": "<string: The last name of the contact person>",
      "email": "<string: The email address of the contact person>"
    },
    "address": {
      "street": "<string: The street address of the company>",
      "town": "<string: The town or city where the company is located>",
      "zip_code": "<string: The postal code of the company's location>"
    }
  }
}

We built a Chrome extension that helps users of our CRM system create customer profiles. The extension opens a page of our CRM and passes all text selected by the user as a query parameter. The CRM then calls the Text-to-JSON API with the selected text and the schema above. We want to fill in as much of the form as quickly as possible without getting in the user's way while they fill in the form themselves.

Text-to-JSON offers a streaming endpoint, /api/v1/inferStreaming?apiToken=..., that returns an HTTP response containing a stream of JSON objects. Each object is a more complete version of the previous one:

{"customer":{}}
{"customer":{"company_name":"Acme Corp"}}
{"customer":{"company_name":"Acme Corp","vat_id":"ATU12345678"}}
{"customer":{"company_name":"Acme Corp","vat_id":"ATU12345678","contact":{}}}
{"customer":{"company_name":"Acme Corp","vat_id":"ATU12345678","contact":{},"address":{}}}
{"customer":{"company_name":"Acme Corp","vat_id":"ATU12345678","contact":{},"address":{"street":"A Street 123"}}}
{"customer":{"company_name":"Acme Corp","vat_id":"ATU12345678","contact":{},"address":{"street":"A Street 123","town":"A Town"}}}
{"customer":{"company_name":"Acme Corp","vat_id":"ATU12345678","contact":{},"address":{"street":"A Street 123","town":"A Town","zip_code":"4020"}}}

Our ttj-client library needs to implement a way to retrieve JSON objects as they come in from the language model. Ideally, we would have an API that always returns the current state of the parsed object as it arrives from the API.

Streams API

As Jake Archibald explained in "2016 - the year of web streams", we can read a fetch response as a stream instead of waiting for the whole body.

Fortunately, since 2016, browser vendors have implemented transform streams allowing us to use

response.body.pipeThrough(new TextDecoderStream()).getReader()

to get a string stream instead of manually decoding the UTF-8 response.
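One reason to prefer TextDecoderStream over decoding each chunk separately is that network chunks can end in the middle of a multi-byte UTF-8 character. The following is a minimal sketch of that situation; readAllText and makeSplitStream are hypothetical helper names, and the byte stream is hand-built instead of coming from fetch.

```javascript
// Sketch: read a byte stream to the end as text.
// TextDecoderStream keeps incomplete byte sequences until the next chunk arrives.
async function readAllText(byteStream) {
    const reader = byteStream.pipeThrough(new TextDecoderStream()).getReader();
    let text = '';
    while (true) {
        const { done, value } = await reader.read();
        if (done) {
            return text;
        }
        text += value;
    }
}

// Hypothetical input: the bytes of {"a":"é"}, where "é" (0xC3 0xA9)
// is split across two chunks.
function makeSplitStream() {
    return new ReadableStream({
        start(controller) {
            controller.enqueue(new Uint8Array([0x7b, 0x22, 0x61, 0x22, 0x3a, 0x22, 0xc3]));
            controller.enqueue(new Uint8Array([0xa9, 0x22, 0x7d]));
            controller.close();
        }
    });
}
```

Calling readAllText(makeSplitStream()) should resolve to the intact string, even though neither chunk contains the complete character on its own.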

Now we can read a fetch response in chunks like this:

async function streamingFetch(url) {
    const response = await fetch(url);
    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();

    while (true) {
        const chunk = await reader.read();
        if (chunk.done) {
            break;
        }

        // Do something with chunk.value which is a string
        console.log(chunk.value);
    }
}

We can detect the beginning and end of a JSON object by counting opening and closing brackets while keeping track of whether we are inside a string and whether the current character is escaped.

async function* streamingFetch(url) {
    let jsonLevel = 0;      // nesting depth of objects and arrays
    let isString = false;   // true while we are inside a string literal
    let isEscaped = false;  // true if the previous character was an escaping backslash
    let jsonString = '';    // characters collected for the current object

    const response = await fetch(url);
    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();

    while (true) {
        const chunk = await reader.read();
        if (chunk.done) {
            return;
        }

        for (let i = 0; i < chunk.value.length; i++) {
            const char = chunk.value[i];
            if ((char === '{' || char === '[') && !isString && !isEscaped) {
                jsonLevel++;
            } else if ((char === '}' || char === ']') && !isString && !isEscaped) {
                jsonLevel--;
            } else if (char === '"' && !isEscaped) {
                isString = !isString;
            } else if ((char === '\\' && isEscaped) || (char !== '\\')) {
                isEscaped = false;
            } else if (char === '\\') {
                isEscaped = true;
            }

            jsonString += char;
            // Back at the top level outside a string: a complete object has been collected
            if (jsonLevel === 0 && !isString && jsonString.trim() !== '') {
                try {
                    yield JSON.parse(jsonString);
                    jsonString = '';
                } catch (e) {
                    console.error(e);
                }
            }
        }
    }
}

We just turned this function into an async generator that yields objects as they are parsed from the stream. All we needed was the * in the function signature and the yield keyword. The generator now "yields" a value for every complete JSON object it finds, and callers consume it with for await.
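To try the bracket-counting idea without a network request, the parsing can be separated from fetch. The sketch below is a variant under stated assumptions, not the ttj-client implementation: parseJsonObjects is a hypothetical name, and it accepts any iterable or async iterable of string chunks. It also only handles top-level objects and arrays, and lets JSON.parse errors propagate to the caller.

```javascript
// Sketch: yield each complete top-level JSON object or array from string chunks.
async function* parseJsonObjects(chunks) {
    let depth = 0;        // nesting depth of objects and arrays
    let inString = false; // true while inside a string literal
    let escaped = false;  // true if the previous character was an escaping backslash
    let buffer = '';      // text collected since the last yielded value

    for await (const chunk of chunks) {
        for (const char of chunk) {
            buffer += char;
            if (escaped) {
                escaped = false; // this character is consumed by the backslash
            } else if (inString) {
                if (char === '\\') escaped = true;
                else if (char === '"') inString = false;
            } else if (char === '"') {
                inString = true;
            } else if (char === '{' || char === '[') {
                depth++;
            } else if (char === '}' || char === ']') {
                depth--;
                if (depth === 0) {
                    yield JSON.parse(buffer);
                    buffer = '';
                }
            }
        }
    }
}

// Usage: chunk boundaries fall in the middle of objects and strings.
async function collect(chunks) {
    const results = [];
    for await (const value of parseJsonObjects(chunks)) {
        results.push(value);
    }
    return results;
}
```

Because the state variables live outside the chunk loop, an object that spans several chunks is still yielded exactly once, when its closing bracket arrives.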

Use Cases

In the case of the ttj-client library, we can use this for any HTTP request that returns a stream of JSON objects.

const { TTJUtils } = require('ttj-utils');
const utils = new TTJUtils();

// Inside an async function: an async generator must be consumed with for await
for await (const data of utils.streamFetchJson(await fetch('your-url-here'))) {
    //...
}

But also specifically for the text-to-json streaming API:

async function infer() {
    //...
    for await (const response of ttjClient.inferStreamingBySchema(
        'company name: Acme Corp\na street 123\n1234 Town', 
        {
            customer: {
                company_name: 'string',
                address: {
                    street: 'string',
                    zip_code: 'number',
                    city: 'string'
                }
            }
        }, 
        'openai/gpt-3.5-turbo')) {
            
        console.log(JSON.stringify(response));
    }
}

infer();
/* logs
{"customer":{}}
{"customer":{"company_name":"Acme Corp"}}
{"customer":{"company_name":"Acme Corp","address":{}}}
{"customer":{"company_name":"Acme Corp","address":{"street":"A Street 123"}}}
{"customer":{"company_name":"Acme Corp","address":{"street":"A Street 123","zip_code":1234}}}
{"customer":{"company_name":"Acme Corp","address":{"street":"A Street 123","zip_code":1234,"city":"Town"}}}
*/

We can then progressively update the form fields. Keep in mind that while Text-to-JSON is processing our request, our users might already be typing in the form fields, so we must not overwrite their input with the streaming response. We can avoid this by only updating form fields that are still empty.

for await (const data of utils.streamFetchJson(response)) {

    if (!this.customer.name && data.customer?.company_name) {
        this.set('customer.name', data.customer?.company_name);
    }
    if (!this.customer.vatId && data.customer?.vat_id) {
        this.set('customer.vatId', data.customer?.vat_id);
    }
    //...
}
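When the form has many fields, the repeated if blocks can be replaced by a small mapping. This is only a sketch: FIELD_PATHS, getPath, and fillEmptyFields are hypothetical names, the form is assumed to be a plain object, and a real framework would likely need its own setter (such as this.set above) instead of direct assignment.

```javascript
// Hypothetical mapping from form field names to paths in the streamed object.
const FIELD_PATHS = {
    name: ['customer', 'company_name'],
    vatId: ['customer', 'vat_id'],
    street: ['customer', 'address', 'street'],
};

// Safely read a nested value; returns undefined if any part of the path is missing.
function getPath(obj, path) {
    return path.reduce((value, key) => (value == null ? undefined : value[key]), obj);
}

// Copy streamed values into the form, but only into fields that are still empty.
function fillEmptyFields(form, data, fieldPaths = FIELD_PATHS) {
    for (const [field, path] of Object.entries(fieldPaths)) {
        const incoming = getPath(data, path);
        if (!form[field] && incoming) {
            form[field] = incoming;
        }
    }
    return form;
}
```

Calling fillEmptyFields(form, data) for every object yielded by the stream leaves anything the user has already typed untouched and fills the remaining fields as their values arrive.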

Published on 4/19/2024 by Stefan Gussner
