import Flow from '@flowjs/flow.js'
import { URLBuilder } from '@rokoli/bnb'
import {
  APICreate,
  APIListPaginated,
  APIResponseError,
  APIRetrieve,
  APIUpdate,
  createExtendableAPI,
  ExtendableAPI,
} from '@rokoli/bnb/drf'
import { useTimeoutPoll } from '@vueuse/core'
import { parseISO } from 'date-fns'
import { sortBy } from 'lodash'
import { defineStore } from 'pinia'
import { computed, MaybeRefOrGetter, Ref, ref, shallowReadonly, toValue, watch } from 'vue'

import { createTankURL } from '@/api'
import { tankAuthInit } from '@/stores/auth'
import { File as TankFile, FileMetadata, Import, Show } from '@/types'
import { components } from '@/tank-types'
import { useShowStore } from '@/stores/shows'

/** Payload accepted when creating a file: the generated request schema or a show id plus source URI. */
type TankFileCreateData =
  | components['schemas']['v1.FileCreateRequest']
  | {
      showId: Show['id']
      sourceURI: string
    }

/** Progress entry stored per import; `progress` is stored multiplied by 100 (see `useImportWatcher`). */
type ImportProgress = {
  step: 'fetching' | 'normalizing' | 'unknown'
  progress: number
}

/** Raw log line as delivered by the logs endpoint. */
type ImportLogLine = { stream: string; line: string; timestamp: string }
type ImportLog = ImportLogLine[]

/** Log line after `parseLog`: normalized stream name and a parsed `Date`. */
type ParsedImportLogLine = { stream: 'stderr' | 'stdout'; time: Date; content: string }
type ParsedImportLog = ParsedImportLogLine[]

type ImportLogMap = Record<string, ImportLog>
type ParsedImportLogMap = Record<string, ParsedImportLog>

/** Thrown by the import-state wait loop when the maximum wait time has elapsed. */
export class TimeoutError extends Error {
  // Give instances a distinguishable name instead of the inherited 'Error'.
  override name = 'TimeoutError'
}

/**
 * Builds a URL builder for a resource collection below the currently selected show.
 *
 * The show store is looked up on every invocation of the returned builder (not once at
 * creation time), so the URL always reflects the show selected at call time.
 *
 * NOTE(review): `selectedShow?.slug` is asserted to be a string; nothing here guards against
 * no show being selected.
 */
function createShowEndpoint(resource: 'files' | 'imports'): URLBuilder {
  return (...subPaths) => {
    const showStore = useShowStore()
    return createTankURL.prefix(
      'shows',
      showStore.selectedShow?.slug as string,
      resource,
    )(...subPaths)
  }
}

/** URL builder for the `files` collection of the selected show. */
const endpoint: URLBuilder = createShowEndpoint('files')

/**
 * Creates the upload/import operations for the files API.
 *
 * @param api - Extendable API instance for tank files.
 * @param pendingImportFileIds - Shared set of file ids with an import in flight. Ids are added
 *   while an import is awaited and removed afterwards.
 * @returns The `uploadFile` and `importFileURL` operations.
 */
function APIUpload(api: ExtendableAPI<TankFile>, pendingImportFileIds: Ref<Set<TankFile['id']>>) {
  const { create } = APICreate<TankFile, TankFileCreateData>(api)
  const { partialUpdate } = APIUpdate<TankFile, FileMetadata>(api)
  const { retrieve } = APIRetrieve<TankFile>(api)

  type ImportOptions = {
    onCreate?: (file: TankFile) => unknown
    onDone?: (file: TankFile) => unknown
  }

  /**
   * Repeatedly requests the import sub-resource of `file` until it reports `requiredState`.
   *
   * @param file - File whose import is awaited.
   * @param requiredState - State passed as `waitFor` query parameter and compared to the response.
   * @param options - `maxWaitTimeSeconds` bounds the total wait; defaults to 5 * 60 seconds.
   * @throws TimeoutError when the maximum wait time passes without reaching the state.
   * @throws Any other request error, unchanged.
   */
  async function waitForImportState(
    file: TankFile,
    requiredState: 'running' | 'done',
    options: { maxWaitTimeSeconds: number } | undefined = undefined,
  ): Promise<void> {
    const maxWaitTimeSeconds = options?.maxWaitTimeSeconds ?? 5 * 60
    const startTime = Date.now() / 1000
    let waitTime = 0

    while (waitTime < maxWaitTimeSeconds) {
      let data: { state?: string } | undefined
      try {
        const response = await fetch(
          api.createRequest(
            api.endpoint(file.id, 'import', new URLSearchParams({ waitFor: requiredState })),
            undefined,
          ),
        )
        await api.maybeThrowResponse(response)
        data = await response.json()
      } catch (e) {
        if (e instanceof APIResponseError) {
          // We got a timeout, this is to be expected for long-running requests.
          if (e.response.status === 504) continue
          // The import state might vanish if the import has been completed,
          // so 404 for done-state is fine.
          if (e.response.status === 404 && requiredState === 'done') return
        }
        throw e
      } finally {
        // Runs on every path (including `continue`), so the loop condition always sees
        // the up-to-date elapsed time.
        waitTime = Date.now() / 1000 - startTime
      }

      if (data?.state === requiredState) return
    }

    throw new TimeoutError('Maximum wait time passed')
  }

  /**
   * Uploads `file` in chunks to the upload sub-resource of `tankFile` using Flow.
   *
   * @returns A promise resolving to `tankFile` on Flow's `fileSuccess` event and rejecting with
   *   an `Error` carrying Flow's message on `fileError`.
   */
  function _uploadFile(file: File, tankFile: TankFile) {
    return new Promise<TankFile>((resolve, reject) => {
      const flow = new Flow({
        ...tankAuthInit.getRequestDefaults(),
        target: api.endpoint(tankFile.id, 'upload'),
        chunkSize: 100 * 1024,
        prioritizeFirstAndLastChunk: true,
      })
      flow.on('fileSuccess', () => {
        resolve(tankFile)
      })
      flow.on('fileError', (_flowFile: unknown, message: string) => {
        reject(new Error(message))
      })
      flow.addFile(file)
      flow.upload()
    })
  }

  /**
   * Shared import workflow: create the file, wait until its import runs, optionally transfer
   * data, wait until the import is done, then re-fetch the file bypassing the cache.
   *
   * The file id is tracked in `pendingImportFileIds` for the duration of the transfer/wait and
   * is always removed afterwards, even on failure.
   *
   * @param transfer - Optional step executed between the `running` and `done` waits.
   * @returns The initially created file and the freshly retrieved one.
   */
  async function runImport(
    sourceURI: string,
    show: Show,
    options: ImportOptions | undefined,
    transfer?: (tankFile: TankFile) => Promise<unknown>,
  ) {
    const tankFile = await create({ showId: show.id, sourceURI })
    options?.onCreate?.(tankFile)
    await waitForImportState(tankFile, 'running')

    try {
      pendingImportFileIds.value.add(tankFile.id)
      if (transfer) await transfer(tankFile)
      await waitForImportState(tankFile, 'done')
    } finally {
      pendingImportFileIds.value.delete(tankFile.id)
    }

    const importedTankFile = (await retrieve(tankFile.id, { useCached: false })) as TankFile
    return { tankFile, importedTankFile }
  }

  /**
   * Uploads a local file for `show` and waits for its import to finish.
   *
   * If the imported file has no metadata title, the title is set to the local file name.
   *
   * @param options - `onCreate` is called once the file record exists, `onDone` with the final file.
   * @returns The imported file.
   */
  async function uploadFile(
    file: File,
    show: Show,
    options: ImportOptions | undefined = undefined,
  ) {
    // NOTE(review): the URI is deliberately left encoded twice as in the original code;
    // presumably the server expects this — confirm before changing.
    const url = encodeURI(encodeURI(`upload://${file.name}`))
    const { tankFile, importedTankFile: retrievedTankFile } = await runImport(
      url,
      show,
      options,
      (createdFile) => _uploadFile(file, createdFile),
    )

    let importedTankFile = retrievedTankFile
    if (!importedTankFile?.metadata?.title) {
      importedTankFile = await partialUpdate(tankFile.id, { title: file.name })
    }

    options?.onDone?.(importedTankFile)
    return importedTankFile
  }

  /**
   * Creates a file for `show` from a source URL and waits for its import to finish.
   *
   * @param options - `onCreate` is called once the file record exists, `onDone` with the final file.
   * @returns The imported file.
   */
  async function importFileURL(
    url: string,
    show: Show,
    options: ImportOptions | undefined = undefined,
  ) {
    const { importedTankFile } = await runImport(url, show, options)
    options?.onDone?.(importedTankFile)
    return importedTankFile
  }

  return { uploadFile, importFileURL }
}

/**
 * Polls the selected show's `imports` collection while imports are pending and exposes their
 * progress.
 *
 * Polling is resumed when `pendingImportFileIds` becomes non-empty and paused when it becomes
 * empty. Each poll replaces `pendingImportFileIds` with the ids returned by the list request
 * and drops progress entries whose id is no longer listed.
 *
 * @returns A shallow-readonly ref of the id → progress map.
 */
function useImportWatcher(pendingImportFileIds: Ref<Set<TankFile['id']>>) {
  const importProgressMap = ref(new Map<TankFile['id'], ImportProgress>())
  const importsEndpoint: URLBuilder = createShowEndpoint('imports')
  const { api } = createExtendableAPI<Import>(importsEndpoint, tankAuthInit)
  const { listIsolated } = APIListPaginated(api)

  const { pause, resume, isActive } = useTimeoutPoll(
    async () => {
      const results = await listIsolated(1, { limit: 1_000_000 })
      const importIds = new Set(results.items.map((o) => o.id))

      // attach current imports to imports map
      for (const importObj of results.items) {
        // @ts-expect-error Import.progress type is invalid
        const { step, progress } = importObj.progress as ImportProgress
        importProgressMap.value.set(importObj.id, { step, progress: progress * 100 })
      }

      // cleanup finished imports
      pendingImportFileIds.value = importIds
      for (const importId of Array.from(importProgressMap.value.keys())) {
        if (!importIds.has(importId)) importProgressMap.value.delete(importId)
      }
    },
    1000,
    { immediate: false },
  )

  watch(
    pendingImportFileIds,
    () => {
      const shouldBeActive = pendingImportFileIds.value.size > 0
      if (shouldBeActive && !isActive.value) resume()
      else if (!shouldBeActive && isActive.value) pause()
    },
    // deep, so in-place `add`/`delete` calls on the set trigger the watcher as well
    { deep: true },
  )

  return shallowReadonly(importProgressMap)
}

/**
 * Reactive import progress for a single file.
 *
 * @returns A computed holding the progress entry for `fileId`, or `null` if none is recorded.
 */
export function useFileUploadProgress(fileId: MaybeRefOrGetter<TankFile['id']>) {
  const filesStore = useFilesStore()
  return computed<ImportProgress | null>(() => {
    return filesStore.importProgressMap.get(toValue(fileId)) ?? null
  })
}

/**
 * Converts raw log lines into parsed lines sorted by ascending timestamp.
 *
 * A line is classified as `stderr` if its stream name contains `'stderr'`, otherwise `stdout`.
 */
function parseLog(log: ImportLog): ParsedImportLog {
  const parsedLines = log.map<ParsedImportLogLine>(({ stream, line, timestamp }) => ({
    stream: stream.includes('stderr') ? 'stderr' : 'stdout',
    content: line,
    time: parseISO(timestamp),
  }))
  return sortBy(parsedLines, (item) => item.time.getTime())
}

/**
 * Creates the log-fetching operation for the files API together with its per-file cache.
 *
 * @returns `importLogs` (shallow-readonly map of file id → raw logs) and `retrieveLog`.
 */
function APIFetchLog(api: ExtendableAPI<TankFile>) {
  const importLogs = ref(new Map<TankFile['id'], ImportLogMap>())

  /** Fetches the logs of file `id`, stores the response's `results` in `importLogs` and returns them. */
  async function retrieveLog(id: TankFile['id']) {
    const response = await fetch(api.createRequest(endpoint(id, 'logs'), undefined))
    await api.maybeThrowResponse(response)
    const data = (await response.json()).results as ImportLogMap
    importLogs.value.set(id, data)
    return data
  }

  return {
    importLogs: shallowReadonly(importLogs),
    retrieveLog,
  }
}

/**
 * Parsed import logs for a single file plus a trigger to load them.
 *
 * @returns `logs`, a computed map of parsed logs (empty object while nothing is cached), and
 *   `fetchLogs(force)`, which requests the logs if none are cached or `force` is true.
 */
export function useFileImportLogs(fileId: MaybeRefOrGetter<TankFile['id']>) {
  const filesStore = useFilesStore()

  const logs = computed<ParsedImportLogMap>(() => {
    const importLogMap = filesStore.importLogs.get(toValue(fileId))
    if (!importLogMap) return {}
    return Object.fromEntries(
      Object.entries(importLogMap).map(([key, log]) => [key, parseLog(log)]),
    )
  })

  function fetchLogs(force = false) {
    if (force || Object.keys(logs.value).length === 0) {
      // Fire-and-forget: the result arrives through the store's `importLogs` map.
      void filesStore.retrieveLog(toValue(fileId))
    }
  }

  return { logs, fetchLogs }
}

/**
 * Pinia store for the files of the selected show: base API state, listing, retrieval,
 * partial updates, uploads/imports with progress tracking, and import logs.
 */
export const useFilesStore = defineStore('files', () => {
  const { api, base } = createExtendableAPI<TankFile>(endpoint, tankAuthInit)
  const pendingFileUploadIds = ref(new Set<TankFile['id']>())
  const importProgressMap = useImportWatcher(pendingFileUploadIds)
  const { list } = APIListPaginated(api)
  const { partialUpdate } = APIUpdate<TankFile, FileMetadata>(api)

  return {
    ...base,
    list,
    partialUpdate,
    importProgressMap,
    ...APIRetrieve(api),
    ...APIUpload(api, pendingFileUploadIds),
    ...APIFetchLog(api),
  }
})