Skip to content
Snippets Groups Projects
files.ts 9.01 KiB
Newer Older
  • Learn to ignore specific revisions
  • import Flow from '@flowjs/flow.js'
    import { URLBuilder } from '@rokoli/bnb'
    import {
      APICreate,
      APIListPaginated,
      APIResponseError,
      APIRetrieve,
      APIUpdate,
      createExtendableAPI,
      ExtendableAPI,
    } from '@rokoli/bnb/drf'
    import { useTimeoutPoll } from '@vueuse/core'
    
    import { parseISO } from 'date-fns'
    import { sortBy } from 'lodash'
    
    import { defineStore } from 'pinia'
    import { computed, MaybeRefOrGetter, Ref, ref, shallowReadonly, toValue, watch } from 'vue'
    
    import { createTankURL } from '@/api'
    import { tankAuthInit } from '@/stores/auth'
    import { File as TankFile, FileMetadata, Import, Show } from '@/types'
    import { components } from '@/tank-types'
    import { useShowStore } from '@/stores/shows'
    
    // Payload accepted when creating a tank file: either the generated
    // `v1.FileCreateRequest` schema type or a camelCase show/source pair.
    type TankFileCreateData =
      | components['schemas']['v1.FileCreateRequest']
      | {
          showId: Show['id']
          sourceURI: string
        }
    
    // Progress information for a single import.
    // NOTE(review): the import watcher in this file stores `progress` multiplied
    // by 100, so values read from its map are presumably percentages — confirm.
    type ImportProgress = {
      step: 'fetching' | 'normalizing' | 'unknown'
      progress: number
    }
    
    
    // Raw log line as found in the `results` of the file logs endpoint.
    // `timestamp` is parsed with date-fns `parseISO` further down in this file.
    type ImportLogLine = { stream: string; line: string; timestamp: string }
    type ImportLog = ImportLogLine[]
    // Log line after parsing: stream normalized to stderr/stdout, timestamp as Date.
    type ParsedImportLogLine = { stream: 'stderr' | 'stdout'; time: Date; content: string }
    type ParsedImportLog = ParsedImportLogLine[]
    // Logs grouped by a string key.
    // NOTE(review): the meaning of the key is not visible in this file — confirm against the API.
    type ImportLogMap = Record<string, ImportLog>
    type ParsedImportLogMap = Record<string, ParsedImportLog>
    
    
    /**
     * Error thrown when waiting for an import state exceeds the maximum wait time.
     *
     * The `name` is set explicitly so that logs, stack traces and `toString()`
     * report `TimeoutError` instead of the generic `Error`.
     */
    export class TimeoutError extends Error {
      override name = 'TimeoutError'
    }
    
    /**
     * URL builder for the `files` endpoints of the currently selected show.
     *
     * The show store is looked up on every call, so the slug of whichever show
     * is selected at call time is used.
     * NOTE(review): the slug is cast to `string` even though `selectedShow` may
     * be unset — callers presumably guarantee a selected show; confirm.
     */
    const endpoint: URLBuilder = (...subPaths) => {
      const slug = useShowStore().selectedShow?.slug as string
      const showFilesURL = createTankURL.prefix('shows', slug, 'files')
      return showFilesURL(...subPaths)
    }
    
    /**
     * Builds the upload and URL-import operations on top of the files API.
     *
     * @param api - Extendable API used for creating, updating and retrieving files.
     * @param pendingImportFileIds - Set of file ids with an import in flight; an id
     *   is added while its import is awaited and removed again afterwards.
     * @returns `uploadFile` and `importFileURL`.
     */
    function APIUpload(api: ExtendableAPI<TankFile>, pendingImportFileIds: Ref<Set<TankFile['id']>>) {
      const { create } = APICreate<TankFile, TankFileCreateData>(api)
      const { partialUpdate } = APIUpdate<TankFile, FileMetadata>(api)
      const { retrieve } = APIRetrieve<TankFile>(api)

      type ImportOptions = {
        onCreate?: (file: TankFile) => unknown
        onDone?: (file: TankFile) => unknown
      }

      /** Current wall-clock time in (fractional) seconds. */
      const nowSeconds = () => new Date().getTime() / 1000

      /**
       * Repeatedly queries the import endpoint of `file` (passing `waitFor`) until
       * it reports `requiredState`.
       *
       * @throws TimeoutError once `maxWaitTimeSeconds` (default: 5 minutes) has elapsed.
       * @throws any request error other than the tolerated 504 / 404-for-done cases.
       */
      async function waitForImportState(
        file: TankFile,
        requiredState: 'running' | 'done',
        options: { maxWaitTimeSeconds: number } | undefined = undefined,
      ) {
        const limitSeconds = options?.maxWaitTimeSeconds ?? 5 * 60
        const startedAt = nowSeconds()
        let elapsedSeconds = 0

        while (elapsedSeconds < limitSeconds) {
          try {
            const importURL = api.endpoint(
              file.id,
              'import',
              new URLSearchParams({ waitFor: requiredState }),
            )
            const response = await fetch(api.createRequest(importURL, undefined))
            await api.maybeThrowResponse(response)
            const payload = await response.json()
            if (payload?.state === requiredState) return
          } catch (e) {
            if (!(e instanceof APIResponseError)) throw e
            const { status } = e.response
            // We got a timeout, this is to be expected for long-running requests.
            if (status === 504) continue
            // The import state might vanish if the import has been completed, so 404 for done-state is fine.
            if (status === 404 && requiredState === 'done') return
            throw e
          } finally {
            // Runs on every exit path (including `continue`) so the loop condition sees fresh timing.
            elapsedSeconds = nowSeconds() - startedAt
          }
        }

        throw new TimeoutError('Maximum wait time passed')
      }

      /**
       * Sends the binary `file` to the upload endpoint of `tankFile` via flow.js.
       * Resolves with `tankFile` on `fileSuccess`, rejects with the reported
       * message on `fileError`.
       */
      function _uploadFile(file: File, tankFile: TankFile) {
        return new Promise((resolve, reject) => {
          const uploader = new Flow({
            ...tankAuthInit.getRequestDefaults(),
            target: api.endpoint(tankFile.id, 'upload'),
            chunkSize: 100 * 1024,
            prioritizeFirstAndLastChunk: true,
          })
          uploader.on('fileSuccess', () => {
            resolve(tankFile)
          })
          uploader.on('fileError', (_failedFile: unknown, message: string) => {
            reject(new Error(message))
          })
          uploader.addFile(file)
          uploader.upload()
        })
      }

      /** Keeps `fileId` in the pending set for exactly as long as `work` is running. */
      async function trackPendingImport(fileId: TankFile['id'], work: () => Promise<unknown>) {
        try {
          pendingImportFileIds.value.add(fileId)
          await work()
        } finally {
          pendingImportFileIds.value.delete(fileId)
        }
      }

      /**
       * Creates a tank file for a local `file`, uploads its content and waits for
       * the import to finish. If the imported file has no title in its metadata,
       * the file name is stored as title.
       *
       * @returns the imported file as retrieved (or updated) after the import.
       */
      async function uploadFile(
        file: File,
        show: Show,
        options: ImportOptions | undefined = undefined,
      ) {
        // NOTE(review): the name is URI-encoded twice — presumably the backend
        // decodes the source URI once before use; confirm before changing.
        const sourceURI = encodeURI(encodeURI(`upload://${file.name}`))
        const tankFile = await create({ showId: show.id, sourceURI })
        options?.onCreate?.(tankFile)
        await waitForImportState(tankFile, 'running')

        await trackPendingImport(tankFile.id, async () => {
          await _uploadFile(file, tankFile)
          await waitForImportState(tankFile, 'done')
        })

        const retrieved = (await retrieve(tankFile.id, { useCached: false })) as TankFile
        const importedTankFile = retrieved?.metadata?.title
          ? retrieved
          : await partialUpdate(tankFile.id, { title: file.name })
        options?.onDone?.(importedTankFile)
        return importedTankFile
      }

      /**
       * Creates a tank file whose source is `url` and waits for the import to finish.
       *
       * @returns the imported file as retrieved after the import.
       */
      async function importFileURL(
        url: string,
        show: Show,
        options: ImportOptions | undefined = undefined,
      ) {
        const tankFile = await create({ showId: show.id, sourceURI: url })
        options?.onCreate?.(tankFile)
        await waitForImportState(tankFile, 'running')

        await trackPendingImport(tankFile.id, () => waitForImportState(tankFile, 'done'))

        const importedTankFile = (await retrieve(tankFile.id, { useCached: false })) as TankFile
        options?.onDone?.(importedTankFile)
        return importedTankFile
      }

      return { uploadFile, importFileURL }
    }
    
    /**
     * Tracks the progress of imports of the selected show.
     *
     * While `pendingImportFileIds` is non-empty, the show's `imports` endpoint is
     * polled (timeout poll with an interval argument of 1000) and the results are
     * mirrored into a progress map. Each poll also replaces the pending set with
     * the ids returned by the endpoint, so polling pauses once none are listed.
     *
     * @param pendingImportFileIds - Set of ids with an import in flight.
     * @returns a shallow-readonly ref to the map of import id to progress.
     */
    function useImportWatcher(pendingImportFileIds: Ref<Set<TankFile['id']>>) {
      const importProgressMap = ref(new Map<TankFile['id'], ImportProgress>())

      const endpoint: URLBuilder = (...subPaths) => {
        const slug = useShowStore().selectedShow?.slug as string
        return createTankURL.prefix('shows', slug, 'imports')(...subPaths)
      }
      const { api } = createExtendableAPI<Import>(endpoint, tankAuthInit)
      const { listIsolated } = APIListPaginated(api)

      /** One poll cycle: sync the progress map and the pending set with the server. */
      async function refreshImports() {
        const { items } = await listIsolated(1, { limit: 1_000_000 })
        const activeImportIds = new Set(items.map((o) => o.id))

        // attach current imports to imports map
        for (const activeImport of items) {
          // @ts-expect-error Import.progress type is invalid
          const { step, progress } = activeImport.progress as ImportProgress
          importProgressMap.value.set(activeImport.id, { step, progress: progress * 100 })
        }

        // cleanup finished imports
        pendingImportFileIds.value = activeImportIds
        // Iterate over a copy of the keys because entries are deleted while looping.
        for (const knownId of [...importProgressMap.value.keys()]) {
          if (activeImportIds.has(knownId)) continue
          importProgressMap.value.delete(knownId)
        }
      }

      const { pause, resume, isActive } = useTimeoutPoll(refreshImports, 1000, { immediate: false })

      watch(
        pendingImportFileIds,
        () => {
          const hasPendingImports = pendingImportFileIds.value.size > 0
          // Nothing to do when the poll already is in the desired state.
          if (hasPendingImports === isActive.value) return
          if (hasPendingImports) resume()
          else pause()
        },
        { deep: true },
      )

      return shallowReadonly(importProgressMap)
    }
    
    /**
     * Reactive import progress for a single file.
     *
     * @param fileId - File id, given as plain value, ref or getter.
     * @returns a computed holding the entry of the files store's progress map for
     *   that id, or `null` when the map has no entry for it.
     */
    export function useFileUploadProgress(fileId: MaybeRefOrGetter<TankFile['id']>) {
      const store = useFilesStore()
      const lookupProgress = (): ImportProgress | null =>
        store.importProgressMap.get(toValue(fileId)) ?? null
      return computed<ImportProgress | null>(lookupProgress)
    }
    
    
    /**
     * Converts raw import log lines into parsed entries sorted by time (ascending).
     *
     * A line's stream becomes `stderr` if its stream name contains `stderr`,
     * otherwise `stdout`; the timestamp is parsed with `parseISO`.
     *
     * @param log - Raw log lines.
     * @returns a new, time-sorted array of parsed log lines.
     */
    function parseLog(log: ImportLog): ParsedImportLog {
      type Entry = { stream: 'stderr' | 'stdout'; content: string; time: Date }

      const entries = log.map(({ stream, line, timestamp }): Entry => {
        const time = parseISO(timestamp)
        const isErrorStream = stream.includes('stderr')
        return { stream: isErrorStream ? 'stderr' : 'stdout', content: line, time }
      })

      return sortBy(entries, (entry) => entry.time.getTime())
    }
    
    /**
     * Adds import-log retrieval to the files API.
     *
     * @param api - Extendable API used to build and send the request.
     * @returns `importLogs`, a shallow-readonly ref to the map of file id to the
     *   logs fetched so far, and `retrieveLog` to fetch the logs of one file.
     */
    function APIFetchLog(api: ExtendableAPI<TankFile>) {
      const importLogs = ref(new Map<TankFile['id'], ImportLogMap>())

      /**
       * Fetches the logs of file `id`, stores them in `importLogs` and returns them.
       * Errors raised by `api.maybeThrowResponse` for the response propagate to the caller.
       */
      async function retrieveLog(id: TankFile['id']) {
        // Build the URL through the passed-in API (like the other helpers in this
        // file) rather than the module-level builder, so the request always
        // targets the API instance this function was created for.
        const response = await fetch(api.createRequest(api.endpoint(id, 'logs'), undefined))
        await api.maybeThrowResponse(response)
        const data = (await response.json()).results as ImportLogMap
        importLogs.value.set(id, data)
        return data
      }

      return {
        importLogs: shallowReadonly(importLogs),
        retrieveLog,
      }
    }
    
    /**
     * Reactive access to the parsed import logs of a single file.
     *
     * @param fileId - File id, given as plain value, ref or getter.
     * @returns `logs`, a computed map of parsed logs (empty object while nothing
     *   has been fetched for the file), and `fetchLogs` to trigger retrieval.
     */
    export function useFileImportLogs(fileId: MaybeRefOrGetter<TankFile['id']>) {
      const filesStore = useFilesStore()

      const logs = computed<ParsedImportLogMap>(() => {
        const rawLogs = filesStore.importLogs.get(toValue(fileId))
        return rawLogs
          ? Object.fromEntries(Object.entries(rawLogs).map(([key, log]) => [key, parseLog(log)]))
          : {}
      })

      /**
       * Requests the logs from the store unless some are already present;
       * pass `force = true` to request them regardless.
       * The request is fire-and-forget: its promise is intentionally not awaited.
       */
      function fetchLogs(force = false) {
        if (!force && Object.keys(logs.value).length !== 0) return
        void filesStore.retrieveLog(toValue(fileId))
      }

      return { logs, fetchLogs }
    }
    
    
    export const useFilesStore = defineStore('files', () => {
      const { api, base } = createExtendableAPI<TankFile>(endpoint, tankAuthInit)
      const pendingFileUploadIds = ref(new Set<TankFile['id']>())
      const importProgressMap = useImportWatcher(pendingFileUploadIds)
      const { list } = APIListPaginated(api)
      const { partialUpdate } = APIUpdate<TankFile, FileMetadata>(api)
    
      return {
        ...base,
        list,
        partialUpdate,
        importProgressMap,
        ...APIRetrieve(api),
        ...APIUpload(api, pendingFileUploadIds),