feat(api): add new request/respond json stream for API

This commit is contained in:
Julien Oculi 2024-07-02 13:01:49 +02:00
parent 5593878c66
commit 5365f11ec6

View file

@ -1,5 +1,8 @@
import { JsonValue } from '$std/json/common.ts' import { JsonValue } from '$std/json/common.ts'
import { decodeBase64 } from "@std/encoding/base64" import { decodeBase64 } from '@std/encoding/base64'
import { JsonStringifyStream } from '@std/json'
import { JsonParseStream } from '@std/json/json-parse-stream'
import { TextLineStream } from '@std/streams/text-line-stream'
export type JsonCompatible = JsonValue | { toJSON(): JsonValue } | unknown export type JsonCompatible = JsonValue | { toJSON(): JsonValue } | unknown
@ -71,6 +74,91 @@ export type ApiPayload<ApiResponse extends JsonCompatible = never> = {
error: string error: string
} }
export async function respondApiStream<
Payload extends JsonCompatible,
>(
source:
| ReadableStream<Payload>
| Iterable<Payload>
| AsyncIterable<Payload>,
): Promise<Response> {
const stream = new TransformStream<
ApiPayload<Payload>,
ApiPayload<Payload>
>()
const writer = stream.writable.getWriter()
try {
await writer.ready
for await (const data of source) {
writer.write({ kind: 'success', data })
}
} catch (error) {
writer.write({ kind: 'error', error })
} finally {
writer.close()
}
const body = stream.readable
.pipeThrough(new JsonStringifyStream())
.pipeThrough(new TextEncoderStream())
return new Response(body)
}
export async function* requestApiStream<
Payload extends JsonCompatible | undefined,
ApiResponse extends JsonCompatible,
>(
route: string,
method: 'GET' | 'POST' | 'DELETE' | 'PATCH',
payload?: Payload | null,
): AsyncGenerator<ApiResponse, void, void> {
const csrf = getCookie('_CSRF') ?? ''
const base = new URL('/api/', location.origin)
const endpoint = new URL(
route.startsWith('/') ? `.${route}` : route,
base.href,
)
const response = await fetch(endpoint, {
method,
headers: {
'Content-Type': 'application/json; charset=utf-8',
'X-CSRF-TOKEN': csrf,
},
body: payload ? JSON.stringify(payload) : null,
})
const { body } = response
if (body === null) {
throw new TypeError(`api response stream is null`)
}
const stream = body
.pipeThrough(new TextDecoderStream()) // convert Uint8Array to string
.pipeThrough(new TextLineStream()) // transform into a stream where each chunk is divided by a newline
.pipeThrough(new JsonParseStream()) as unknown as ReadableStream<
ApiPayload<ApiResponse>
> // parse each chunk as JSON
for await (const payload of stream) {
if (payload.kind === 'error') {
throw new Error(
`api stream error while getting "${endpoint.href}"`,
{
cause: payload.error,
},
)
}
yield payload.data
}
}
function getCookie(name: string): string | undefined { function getCookie(name: string): string | undefined {
const cookiesEntries = document.cookie.split(';').map((cookie) => const cookiesEntries = document.cookie.split(';').map((cookie) =>
cookie.trim().split('=') cookie.trim().split('=')
@ -82,4 +170,4 @@ function getCookie(name: string): string | undefined {
export function base64ToString(base64: string): string { export function base64ToString(base64: string): string {
const bytes = decodeBase64(base64) const bytes = decodeBase64(base64)
return new TextDecoder().decode(bytes) return new TextDecoder().decode(bytes)
} }