import 'dotenv/config';
import express from 'express';
import { Express, Request, Response, NextFunction } from 'express';
import fs from 'fs/promises';
import { createReadStream } from 'fs';
import { tmpdir } from 'os';
import { join } from 'path';
import { createHash, Hash } from 'crypto';
import { Database, RunResult } from 'sqlite3';
import bodyParser from 'body-parser';
import multer from 'multer';
import { v4 as uuid } from 'uuid';
import { getType } from 'mime';
import { createLog } from './log';
import { sendMail } from './mail';
import type { Logger } from 'winston';
import * as Handlebars from 'handlebars';

/** HTTP port, read from $PORT (default 3333). */
const portEnv : string | undefined = process.env['PORT'];
const port : number = typeof portEnv !== 'undefined' ? parseInt(portEnv, 10) : 3333;
/** Directory holding queue.sqlite, uploaded datasets and uploaded models ($DATADIR, default ./data). */
const data : string = process.env['DATADIR'] ?? './data';
const dbPath : string = join(data, 'queue.sqlite');
const app : Express = express();
/** Multer writes uploads here first; they are copied into `data` and then removed. */
const tmp : string = tmpdir();
const db : Database = new Database(dbPath);
const log : Logger = createLog('server');

/** MIME types accepted for uploads; anything else is rejected by fileFilter. */
const accepted : string[] = ['application/zip', 'application/x-zip-compressed'];

/**
 * Job ids are generated by uuid v4 in add(). Routes that turn an id into a file
 * path or echo it into an HTML response accept only this canonical form: Express
 * URL-decodes route params, so an id like "..%2F..%2Fx" would otherwise escape
 * the data directory, and arbitrary text would be reflected into the page.
 */
const JOB_ID_PATTERN : RegExp = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

/** True when `id` is a string in uuid form (see JOB_ID_PATTERN). */
function isJobId (id : unknown) : id is string {
  return typeof id === 'string' && JOB_ID_PATTERN.test(id);
}

const storage = multer.diskStorage({
  destination: function (req : any, file : any, cb : any) {
    cb(null, tmp);
  },
  filename: function (req : any, file : any, cb : any) {
    cb(null, `${+new Date()}_${file.originalname}`);
  }
});

/** Multer filter: accept only files whose reported MIME type is in `accepted`. */
function fileFilter (req : any, file : any, cb : any) {
  if (accepted.includes(file.mimetype)) {
    cb(null, true);
  } else {
    log.warn(`Filetype ${file.mimetype} is not of type zip`);
    cb(new Error("Dataset is not of type zip"), false);
  }
}

const uploadZip : any = multer({ storage, fileFilter });
const uploadOnnx : any = multer({ storage, fileFilter });

app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

/**
 * Read and compile a Handlebars template.
 * @returns the compiled template, or null (after logging) when the file cannot be read.
 */
async function createTemplate (filePath : string) : Promise<HandlebarsTemplateDelegate | null> {
  let tmpl : string;
  try {
    tmpl = await fs.readFile(filePath, 'utf8');
  } catch (err) {
    log.error(err);
    return null;
  }
  return Handlebars.compile(tmpl);
}

/** Compiled ./views/index.hbs; null until start() loads it, or if loading failed. */
let index : HandlebarsTemplateDelegate | null = null;

/** SHA-256 of a file's contents as a hex string; rejects on read errors. */
function hash (path : string) : Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const hashSum : Hash = createHash('sha256');
    const stream = createReadStream(path);
    stream.on('error', (err : Error) => reject(err));
    stream.on('data', (chunk : Buffer | string) => hashSum.update(chunk));
    stream.on('end', () => resolve(hashSum.digest('hex')));
  });
}

/** True if `path` is accessible; never throws. */
async function exists (path : string) : Promise<boolean> {
  try {
    await fs.access(path);
    return true;
  } catch {
    return false;
  }
}

/** Best-effort removal of a multer temp file; failures are logged, not thrown. */
async function discardUpload (path : string) : Promise<void> {
  try {
    await fs.unlink(path);
  } catch (err) {
    log.error(err);
  }
}

/**
 * Insert a new job into the queue.
 * @returns the generated job id (uuid v4).
 */
async function add (email : string, name : string, dataset : string, model : string) : Promise<string> {
  const query : string = `INSERT INTO queue (id, email, name, dataset, model) VALUES ( ?, ?, ?, ?, ?);`;
  const id : string = uuid();
  return new Promise<string>((resolve, reject) => {
    db.run(query, [id, email, name, dataset, model], (err : Error | null) => {
      if (err) return reject(err);
      log.info(`Added job ${id} to queue`);
      return resolve(id);
    });
  });
}

/**
 * Human-readable status text for a job, sent to the browser as HTML by GET /job/:id.
 * Resolves 'Unknown' when the id matches no row.
 */
async function status (id : string) : Promise<string> {
  const query : string = `SELECT name, model, started, completed, failed, meta FROM queue WHERE id = ? LIMIT 1;`;
  return new Promise<string>((resolve, reject) => {
    db.all(query, [id], (err : Error | null, rows : any[]) => {
      if (err) return reject(err);
      let jobStatus : string = 'Unknown';
      if (rows.length !== 1) {
        return resolve(jobStatus);
      }
      const row : any = rows[0];
      if (row.started === null) {
        jobStatus = `Has not started`;
      } else if (row.failed !== null) {
        // meta is free text posted to POST /job/fail/:id; escape it before it reaches HTML.
        jobStatus = `Failed\n${Handlebars.escapeExpression(row.meta)}\n`;
      } else if (row.completed !== null) {
        jobStatus = `Completed ${row.completed} Download`;
      } else {
        jobStatus = `Started ${row.started}`;
      }
      log.info(`Got status for job ${id}: ${jobStatus}`);
      return resolve(jobStatus);
    });
  });
}

/** The job's user-supplied name; rejects if the job does not exist. */
async function name (id : string) : Promise<string> {
  const query : string = `SELECT name FROM queue WHERE id = ? LIMIT 1;`;
  return new Promise<string>((resolve, reject) => {
    db.all(query, [id], (err : Error | null, rows : any[]) => {
      if (err) return reject(err);
      if (rows.length < 1) {
        return reject(new Error(`Job ${id} does not exist`));
      }
      return resolve(rows[0].name);
    });
  });
}

/** The dataset hash stored for a job; rejects if the job does not exist. */
async function dataset (id : string) : Promise<string> {
  const query : string = `SELECT dataset FROM queue WHERE id = ? LIMIT 1;`;
  return new Promise<string>((resolve, reject) => {
    db.all(query, [id], (err : Error | null, rows : any[]) => {
      if (err) return reject(err);
      if (rows.length < 1) {
        return reject(new Error(`Dataset ${id} does not exist`));
      }
      return resolve(rows[0].dataset);
    });
  });
}

/** The oldest unstarted, unfinished job id as a 0- or 1-element array. */
async function job () : Promise<string[]> {
  const query : string = `SELECT id FROM queue WHERE started IS NULL AND completed IS NULL AND failed IS NULL ORDER BY created ASC LIMIT 1;`;
  return new Promise<string[]>((resolve, reject) => {
    db.all(query, [], (err : Error | null, rows : any[]) => {
      if (err) return reject(err);
      return resolve(rows.length < 1 ? [] : [rows[0].id]);
    });
  });
}

/** All unstarted, unfinished job ids, oldest first. */
async function jobs () : Promise<string[]> {
  const query : string = `SELECT id FROM queue WHERE started IS NULL AND completed IS NULL AND failed IS NULL ORDER BY created ASC;`;
  return new Promise<string[]>((resolve, reject) => {
    db.all(query, [], (err : Error | null, rows : any[]) => {
      if (err) return reject(err);
      return resolve(rows.map((el : any) => el.id));
    });
  });
}

/**
 * Mark a job as started and return its row (as read before the update).
 * Rejects if the job does not exist or has already been claimed.
 */
async function claim (id : string) : Promise<any> {
  const query : string = `SELECT * FROM queue WHERE id = ? LIMIT 1;`;
  // The `started IS NULL` guard makes the claim atomic: if two workers race past
  // the SELECT, only one UPDATE changes a row and the other is rejected below.
  const claimQuery : string = `UPDATE queue SET started = CURRENT_TIMESTAMP WHERE id = ? AND started IS NULL;`;
  return new Promise<any>((resolve, reject) => {
    db.all(query, [id], (err : Error | null, rows : any[]) => {
      if (err) return reject(err);
      if (rows.length < 1) {
        return reject(new Error(`Dataset ${id} does not exist`));
      }
      if (rows[0].started !== null) {
        return reject(new Error(`Job ${id} is already claimed`));
      }
      db.run(claimQuery, [id], function (this : RunResult, updateErr : Error | null) {
        if (updateErr) return reject(updateErr);
        if (this.changes === 0) {
          return reject(new Error(`Job ${id} is already claimed`));
        }
        return resolve(rows[0]);
      });
    });
  });
}

/** Stamp a job as failed, storing optional worker-supplied meta text. */
async function fail (id : string, meta : string | null) : Promise<boolean> {
  const query : string = `UPDATE queue SET failed = CURRENT_TIMESTAMP, meta = ? WHERE id = ?;`;
  return new Promise<boolean>((resolve, reject) => {
    db.run(query, [meta, id], (err : Error | null) => {
      if (err) return reject(err);
      return resolve(true);
    });
  });
}

/** Stamp a job as completed, storing optional worker-supplied meta text. */
async function complete (id : string, meta : string | null) : Promise<boolean> {
  const query : string = `UPDATE queue SET completed = CURRENT_TIMESTAMP, meta = ? WHERE id = ?;`;
  return new Promise<boolean>((resolve, reject) => {
    db.run(query, [meta, id], (err : Error | null) => {
      if (err) return reject(err);
      return resolve(true);
    });
  });
}

/** Every queue row, newest first. */
async function all () : Promise<any[]> {
  const query : string = `SELECT * FROM queue ORDER BY created DESC;`;
  return new Promise<any[]>((resolve, reject) => {
    db.all(query, [], (err : Error | null, rows : any[]) => {
      if (err) return reject(err);
      return resolve(rows);
    });
  });
}

/** Add display fields (`hasModel`, `status`) to a queue row in place and return it. */
function annotate (row : any) {
  row.hasModel = row.completed != null;
  if (row.completed != null) {
    row.status = 'Completed';
  } else if (row.failed != null) {
    row.status = 'Failed';
  } else if (row.started != null) {
    row.status = 'Training';
  } else {
    row.status = 'Waiting';
  }
  return row;
}

/**
 * Stream a zip file from disk as a download called `downloadName`.
 * res.attachment() quotes/encodes the filename (job names are user input and may
 * contain spaces, quotes or non-ASCII that a hand-built header would reject), and
 * the 'error' listener keeps a failed read from crashing the process.
 */
function sendZip (res : Response, next : NextFunction, filePath : string, downloadName : string) : void {
  const mimeType : string = getType(filePath) ?? 'application/octet-stream';
  const stream = createReadStream(filePath);
  res.attachment(downloadName);
  res.setHeader('Content-type', mimeType);
  stream.on('error', (err : Error) => {
    log.error(err);
    if (res.headersSent) {
      // Part of the body is already out; the only option left is to abort the response.
      res.destroy(err);
    } else {
      res.removeHeader('Content-Disposition');
      next(`Error reading ${downloadName}`);
    }
  });
  stream.pipe(res);
}

app.get('/', async (req : Request, res : Response, next : NextFunction) => {
  let rows : any[];
  let html : string;
  const template = index;

  if (template === null) {
    log.error('Index template is not loaded');
    return next('ERROR: Could not render job list');
  }

  try {
    rows = await all();
  } catch (err) {
    log.error(err);
    return next('ERROR: Could not retrieve jobs from queue');
  }

  rows = rows.map(annotate);

  try {
    html = template({ rows, hasTable: rows.length > 0 });
  } catch (err) {
    log.error(err);
    return next('ERROR: Could not render job list');
  }

  res.send(html);
});

app.post('/', uploadZip.single('dataset'), async (req : Request, res : Response, next : NextFunction) => {
  let fileHash : string;
  let id : string;

  req.setTimeout(0);

  const upload = req.file;
  if (typeof upload === 'undefined' || upload === null) {
    log.error('No file in upload');
    return next('ERROR: Please upload dataset as zip file');
  }

  try {
    fileHash = await hash(upload.path);
  } catch (err) {
    log.error(err);
    await discardUpload(upload.path);
    return next(`Error hashing file ${upload.originalname}`);
  }

  // Datasets are content-addressed, so an identical upload reuses the stored copy.
  const filePath : string = join(data, `${fileHash}.zip`);
  if (await exists(filePath)) {
    log.warn(`Dataset with hash ${fileHash} already exists...`);
  } else {
    try {
      await fs.copyFile(upload.path, filePath);
      log.info(`Saved dataset with hash ${fileHash}`);
    } catch (err) {
      log.error(err);
      await discardUpload(upload.path);
      return next(err);
    }
  }

  await discardUpload(upload.path);

  try {
    id = await add(req.body.email, req.body.name, fileHash, req.body.model);
  } catch (err) {
    log.error(err);
    return next(`Error adding training job ${req.body.name}`);
  }

  // req.body.name is user input reflected into an HTML response: escape it.
  res.send(`Dataset for job ${Handlebars.escapeExpression(req.body.name)} has been uploaded successfully. You will be emailed when your job has started and when it has completed training.\nMonitor job status here: ${id}`);
});

app.post('/job/:id', uploadOnnx.single('model'), async (req : Request, res : Response, next : NextFunction) => {
  let meta : string | null = null;

  req.setTimeout(0);

  const upload = req.file;
  if (typeof upload === 'undefined' || upload === null) {
    log.error('No file in upload');
    return next('ERROR: Please upload model as zip file');
  }

  const id = req.params.id;
  // The id becomes a file name under `data`; reject anything that is not a job uuid.
  if (!isJobId(id)) {
    log.error(`Job id ${id} is invalid`);
    await discardUpload(upload.path);
    return next('Invalid job id');
  }

  const filePath : string = join(data, `${id}.zip`);

  if (typeof req.body.meta !== 'undefined') {
    meta = req.body.meta;
  }

  try {
    await fs.copyFile(upload.path, filePath);
    log.info(`Saved model for job ${id}`);
  } catch (err) {
    log.error(err);
    await discardUpload(upload.path);
    return next(err);
  }

  await discardUpload(upload.path);

  try {
    await complete(id, meta);
  } catch (err) {
    log.error(err);
    return next(`Error completing training job ${id}`);
  }

  res.json({ id });
});

app.get('/job/:id', async (req : Request, res : Response, next : NextFunction) => {
  let jobStatus : string;
  const id = req.params.id;

  if (typeof id === 'undefined' || id === null) {
    log.error(`No job id provided`);
    return next('Invalid request');
  }

  // The id is echoed into the HTML response, so only accept the uuid form.
  if (!isJobId(id)) {
    log.error(`Job id ${id} is invalid`);
    return next('Invalid job id');
  }

  try {
    jobStatus = await status(id);
  } catch (err) {
    log.error(err);
    return next('Error getting job status');
  }

  return res.send(`Job: ${id}\nStatus: ${jobStatus}`);
});

app.get('/model/:id', async (req : Request, res : Response, next : NextFunction) => {
  let fileName : string;
  const id = req.params.id;

  if (typeof id === 'undefined' || id === null) {
    log.error(`No job id provided`);
    return next('Invalid request');
  }

  // The id becomes a file path under `data`; reject anything that is not a job uuid.
  if (!isJobId(id)) {
    log.error(`Job id ${id} is invalid`);
    return next('Invalid job id');
  }

  const filePath : string = join(data, `${id}.zip`);

  if (!(await exists(filePath))) {
    log.warn(`Model for job ${id} does not exist`);
    return next(`Model for job ${id} does not exist`);
  }

  try {
    fileName = await name(id);
  } catch (err) {
    log.error(err);
    return next(`Error getting job ${id}`);
  }

  sendZip(res, next, filePath, `${fileName}.zip`);
});

app.get('/dataset/:id', async (req : Request, res : Response, next : NextFunction) => {
  let datasetHash : string;
  const id = req.params.id;

  if (typeof id === 'undefined' || id === null) {
    log.error(`No dataset id provided`);
    return next('Invalid request');
  }

  try {
    datasetHash = await dataset(id);
  } catch (err) {
    log.error(err);
    return next(`Error getting dataset for job ${id}`);
  }

  const filePath : string = join(data, `${datasetHash}.zip`);

  if (!(await exists(filePath))) {
    log.warn(`Dataset for job ${id} does not exist`);
    return next(`Dataset for job ${id} does not exist`);
  }

  sendZip(res, next, filePath, `${datasetHash}.zip`);
});

app.get('/job', async (req : Request, res : Response, next : NextFunction) => {
  let jobArr : string[];
  try {
    jobArr = await job();
  } catch (err) {
    log.error(err);
    return next('Error getting job');
  }
  res.json(jobArr);
});

app.get('/jobs', async (req : Request, res : Response, next : NextFunction) => {
  let jobArr : string[];
  try {
    jobArr = await jobs();
  } catch (err) {
    log.error(err);
    return next('Error getting jobs');
  }
  res.json(jobArr);
});

app.post('/job/claim/:id', async (req : Request, res : Response, next : NextFunction) => {
  let jobObj : any;
  const id = req.params.id;

  if (typeof id === 'undefined' || id === null) {
    log.error(`No job id provided`);
    return next('Invalid request');
  }

  try {
    jobObj = await claim(id);
    log.info(`Job ${id} was claimed`);
  } catch (err) {
    log.error(err);
    return next('Error claiming job');
  }

  res.json({
    id,
    path: `/dataset/${id}`,
    dataset: jobObj.dataset,
    model: jobObj.model,
    name: jobObj.name,
    email: jobObj.email
  });
});

app.post('/job/fail/:id', async (req : Request, res : Response, next : NextFunction) => {
  let meta : string | null = null;
  const id = req.params.id;

  if (typeof id === 'undefined' || id === null) {
    log.error(`No job id provided`);
    return next('Invalid request');
  }

  if (typeof req.body.meta !== 'undefined') {
    meta = req.body.meta;
  }

  try {
    await fail(id, meta);
    log.info(`Job ${id} failed`);
  } catch (err) {
    log.error(err);
    return next('Error failing job');
  }

  res.json(true);
});

/**
 * Load the index template before accepting connections, so GET / is never
 * served while the template load is still in flight.
 */
async function start () : Promise<void> {
  index = await createTemplate('./views/index.hbs');
  app.listen(port, () => {
    log.info(`yolo_web running on port ${port}`);
  });
}

start().catch((err : unknown) => {
  log.error(err);
});