All features in yolo_web server for managing jobs. Add first scripts for getting dataset on worker machines

This commit is contained in:
Matt McWilliams 2023-06-23 20:22:05 -04:00
parent 05e7be2635
commit 82a7d598fe
9 changed files with 395 additions and 54 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
node_modules node_modules
data/* data/*
.env

2
default.env Normal file
View File

@ -0,0 +1,2 @@
YOLO_WEB_URL=http://localhost:3333
YOLOv5=../yolo_train/

175
dist/index.js vendored
View File

@ -38,7 +38,8 @@ function fileFilter(req, file, cb) {
cb(new Error("Dataset is not of type zip"), false); cb(new Error("Dataset is not of type zip"), false);
} }
} }
const upload = (0, multer_1.default)({ storage, fileFilter }); const uploadZip = (0, multer_1.default)({ storage, fileFilter });
const uploadOnnx = (0, multer_1.default)({ storage });
app.use(body_parser_1.default.json()); app.use(body_parser_1.default.json());
app.use(body_parser_1.default.urlencoded({ extended: true })); app.use(body_parser_1.default.urlencoded({ extended: true }));
function hash(path) { function hash(path) {
@ -80,6 +81,10 @@ async function status(id) {
return db.all(query, [id], (err, rows) => { return db.all(query, [id], (err, rows) => {
if (err) if (err)
return reject(err); return reject(err);
if (rows.length !== 1) {
return resolve(jobStatus);
}
const obj = rows[0];
if (rows[0].started === null) { if (rows[0].started === null) {
jobStatus = `Has not started`; jobStatus = `Has not started`;
} }
@ -89,6 +94,9 @@ async function status(id) {
else if (rows[0].completed !== null) { else if (rows[0].completed !== null) {
jobStatus = `Completed ${rows[0].completed} <a href="/model/${rows[0].model}/${id}">Download</a>`; jobStatus = `Completed ${rows[0].completed} <a href="/model/${rows[0].model}/${id}">Download</a>`;
} }
else if (rows[0].started !== null) {
jobStatus = `Started ${rows[0].started}`;
}
console.log(`Got status for job ${id}: ${jobStatus}`); console.log(`Got status for job ${id}: ${jobStatus}`);
return resolve(jobStatus); return resolve(jobStatus);
}); });
@ -132,9 +140,64 @@ async function job() {
if (err) if (err)
return reject(err); return reject(err);
if (rows.length < 1) { if (rows.length < 1) {
return resolve(null); return resolve([]);
} }
return resolve(rows[0].id); return resolve([rows[0].id]);
});
});
}
/**
 * List the ids of every queued job that has not been started, completed
 * or failed, ordered by creation time (oldest first).
 *
 * @returns Promise resolving to an array of job ids (empty when none match).
 */
async function jobs() {
    const pendingSql = `SELECT id FROM queue WHERE
started IS NULL
AND completed IS NULL
AND failed IS NULL
ORDER BY created ASC;`;
    return new Promise((resolve, reject) => {
        db.all(pendingSql, [], (error, rows) => {
            if (error) {
                reject(error);
                return;
            }
            // Only the id column is selected; flatten the rows to plain ids.
            const ids = rows.map((row) => row.id);
            resolve(ids);
        });
    });
}
/**
 * Mark a queued job as started and hand back its queue row.
 *
 * The row is looked up first so that "does not exist" and "already claimed"
 * can be reported separately. The UPDATE itself is additionally guarded with
 * `started IS NULL` so that two workers racing between the SELECT and the
 * UPDATE cannot both claim the same job.
 *
 * @param id Queue row id of the job to claim.
 * @returns Promise resolving to the row as it was read before the claim.
 *          Rejects when the row is missing, already claimed, or on a
 *          database error.
 */
async function claim(id) {
    const query = `SELECT * FROM queue WHERE id = ? LIMIT 1;`;
    const claimQuery = `UPDATE queue SET started = CURRENT_TIMESTAMP WHERE id = ? AND started IS NULL;`;
    return new Promise((resolve, reject) => {
        return db.all(query, [id], (err, rows) => {
            if (err)
                return reject(err);
            if (rows.length < 1) {
                return reject(new Error(`Dataset ${id} does not exist`));
            }
            if (rows[0].started !== null) {
                return reject(new Error(`Job ${id} is already claimed`));
            }
            // A regular function (not an arrow) is required here: the sqlite3
            // driver exposes the affected-row count as `this.changes`.
            return db.run(claimQuery, [id], function (err) {
                if (err)
                    return reject(err);
                // Zero affected rows means another worker set `started`
                // after our SELECT; treat it as already claimed.
                if (this && this.changes === 0) {
                    return reject(new Error(`Job ${id} is already claimed`));
                }
                return resolve(rows[0]);
            });
        });
    });
}
/**
 * Stamp a job as failed and store the accompanying metadata.
 *
 * @param id   Queue row id of the job.
 * @param meta Free-form failure details written to the `meta` column (may be null).
 * @returns Promise resolving to `true` once the UPDATE has run; rejects on a database error.
 */
async function fail(id, meta) {
    const failSql = `UPDATE queue SET failed = CURRENT_TIMESTAMP, meta = ? WHERE id = ?;`;
    const params = [meta, id];
    return new Promise((resolve, reject) => {
        db.run(failSql, params, (error) => {
            if (error) {
                reject(error);
                return;
            }
            resolve(true);
        });
    });
}
/**
 * Stamp a job as completed and store the accompanying metadata.
 *
 * @param id   Queue row id of the job.
 * @param meta Free-form details written to the `meta` column (may be null).
 * @returns Promise resolving to `true` once the UPDATE has run; rejects on a database error.
 */
async function complete(id, meta) {
    const completeSql = `UPDATE queue SET completed = CURRENT_TIMESTAMP, meta = ? WHERE id = ?;`;
    const params = [meta, id];
    return new Promise((resolve, reject) => {
        db.run(completeSql, params, (error) => {
            if (error) {
                reject(error);
                return;
            }
            resolve(true);
        });
    });
}
@ -149,7 +212,7 @@ app.get('/', async (req, res, next) => {
} }
res.send(html); res.send(html);
}); });
app.post('/', upload.single('dataset'), async (req, res, next) => { app.post('/', uploadZip.single('dataset'), async (req, res, next) => {
let fileHash; let fileHash;
let filePath; let filePath;
let fileExists; let fileExists;
@ -175,7 +238,7 @@ app.post('/', upload.single('dataset'), async (req, res, next) => {
} }
if (!fileExists) { if (!fileExists) {
try { try {
await promises_1.default.rename(req.file.path, filePath); await promises_1.default.copyFile(req.file.path, filePath);
console.log(`Saved dataset with hash ${fileHash}`); console.log(`Saved dataset with hash ${fileHash}`);
} }
catch (err) { catch (err) {
@ -185,13 +248,13 @@ app.post('/', upload.single('dataset'), async (req, res, next) => {
} }
else { else {
console.warn(`Dataset with hash ${fileHash} already exists...`); console.warn(`Dataset with hash ${fileHash} already exists...`);
}
try { try {
await promises_1.default.unlink(req.file.path); await promises_1.default.unlink(req.file.path);
} }
catch (err) { catch (err) {
console.error(err); console.error(err);
} }
}
try { try {
id = await add(req.body.email, req.body.name, fileHash, req.body.model); id = await add(req.body.email, req.body.name, fileHash, req.body.model);
} }
@ -201,6 +264,42 @@ app.post('/', upload.single('dataset'), async (req, res, next) => {
} }
res.send(`<html><body>Dataset for job ${req.body.name} has been uploaded successfully. You will be emailed when your job has started and when it has completed training. <br /> Monitor job status here: <a href="/job/${id}">${id}</a></body></html>`); res.send(`<html><body>Dataset for job ${req.body.name} has been uploaded successfully. You will be emailed when your job has started and when it has completed training. <br /> Monitor job status here: <a href="/job/${id}">${id}</a></body></html>`);
}); });
/**
 * POST /job/:id — receive the trained model for a job (multipart field
 * `model`), store it as `<id>.onnx` in the data directory and mark the job
 * as completed. An optional `meta` body field is saved with the job.
 *
 * Responds with `{ id }` on success; errors are forwarded to `next`.
 */
app.post('/job/:id', uploadOnnx.single('model'), async (req, res, next) => {
    let meta = null;
    // Timeout of 0 on the request — presumably so that large model uploads
    // are not cut off mid-transfer.
    req.setTimeout(0);
    // The job id comes from the route; without it the model would be saved
    // as "undefined.onnx" and no job row would be updated.
    if (typeof req.params.id === 'undefined' || req.params.id === null) {
        console.error(`No job id provided`);
        return next('Invalid request');
    }
    const id = req.params.id;
    // Either condition alone means there is no upload to process.
    if (typeof req.file === 'undefined' || req.file === null) {
        console.error('No file in upload');
        return next('ERROR: Please upload model as ONNX file');
    }
    // NOTE(review): `id` is caller-supplied and ends up in a file path —
    // consider validating its format before use.
    const filePath = (0, path_1.join)(data, `${id}.onnx`);
    if (typeof req.body.meta !== 'undefined') {
        meta = req.body.meta;
    }
    try {
        await promises_1.default.copyFile(req.file.path, filePath);
        console.log(`Saved model for job ${id}`);
    }
    catch (err) {
        console.error(err);
        return next(err);
    }
    // Removing the temporary upload is best-effort: the model is already
    // stored, so a failed unlink is only logged.
    try {
        await promises_1.default.unlink(req.file.path);
    }
    catch (err) {
        console.error(err);
    }
    try {
        await complete(id, meta);
    }
    catch (err) {
        console.error(err);
        return next(`Error completing training job ${id}`);
    }
    res.json({ id });
});
app.get('/job/:id', async (req, res, next) => { app.get('/job/:id', async (req, res, next) => {
let jobStatus; let jobStatus;
if (typeof req.params.id === 'undefined' || req.params.id === null) { if (typeof req.params.id === 'undefined' || req.params.id === null) {
@ -297,18 +396,72 @@ app.get('/dataset/:id', async (req, res, next) => {
stream.pipe(res); stream.pipe(res);
}); });
app.get('/job', async (req, res, next) => { app.get('/job', async (req, res, next) => {
let jobId; let jobArr;
try { try {
jobId = await job(); jobArr = await job();
} }
catch (err) { catch (err) {
console.error(err); console.error(err);
return next('Error getting job'); return next('Error getting job');
} }
res.json([jobId]); res.json(jobArr);
});
/**
 * GET /jobs — respond with a JSON array holding the ids of all jobs
 * returned by `jobs()`.
 */
app.get('/jobs', async (req, res, next) => {
    let pendingIds;
    try {
        pendingIds = await jobs();
    }
    catch (error) {
        // Log the real cause, hand a generic message to the error handler.
        console.error(error);
        return next('Error getting job');
    }
    res.json(pendingIds);
});
/**
 * POST /job/claim/:id — claim a job via `claim()` and respond with the
 * details a worker needs: the job id, the dataset download path, and the
 * `dataset`, `model` and `name` fields of the claimed row.
 */
app.post('/job/claim/:id', async (req, res, next) => {
    const id = req.params.id;
    if (typeof id === 'undefined' || id === null) {
        console.error(`No dataset id provided`);
        return next('Invalid request');
    }
    let claimed;
    try {
        claimed = await claim(id);
        console.log(`Job ${id} was claimed`);
    }
    catch (error) {
        console.error(error);
        return next('Error claiming job');
    }
    // Expose only the fields the worker scripts read, not the whole row.
    res.json({
        id,
        path: `/dataset/${id}`,
        dataset: claimed.dataset,
        model: claimed.model,
        name: claimed.name,
    });
});
/**
 * POST /job/fail/:id — mark a job as failed via `fail()`, storing the
 * optional `meta` body field with it. Responds with JSON `true`.
 */
app.post('/job/fail/:id', async (req, res, next) => {
    const id = req.params.id;
    if (typeof id === 'undefined' || id === null) {
        console.error(`No dataset id provided`);
        return next('Invalid request');
    }
    // `meta` is optional; null is stored when the caller sends none.
    const meta = typeof req.body.meta !== 'undefined' ? req.body.meta : null;
    try {
        await fail(id, meta);
        console.log(`Job ${id} failed`);
    }
    catch (error) {
        console.error(error);
        return next('Error failing job');
    }
    res.json(true);
});
//app.get('/jobs');
//app.post('/job/started/:id', )
app.listen(port, () => { app.listen(port, () => {
console.log(`yolo_web running on port ${port}`); console.log(`yolo_web running on port ${port}`);
}); });

2
dist/index.js.map vendored

File diff suppressed because one or more lines are too long

9
docker-compose.yaml Normal file
View File

@ -0,0 +1,9 @@
# Compose definition for the yolo_web server: mounts the views and data
# directories from the host and publishes port 3333.
services:
  yolo_web:
    image: yolo_web
    container_name: yolo_web
    volumes:
      - './views:/code/views'
      - './data/:/code/data'
    ports:
      - '3333:3333'

24
scripts/claim.sh Normal file
View File

@ -0,0 +1,24 @@
#!/bin/bash
# Claim a job on the yolo_web server and hand the returned job description
# to scripts/train.sh.
#
# Usage: claim.sh <job id>

set -e

if [ -f .env ]; then
    source .env
fi

# Same fallback as scripts/job.sh; that script does not export the variable,
# so it has to be resolved again here.
if [ -z "${YOLO_WEB_URL}" ]; then
    YOLO_WEB_URL=http://localhost:3333
fi

if [ -z "${1}" ]; then
    echo "Please provide a job"
    exit 1
fi

JOB="${1}"
JSON=$(curl -s -X POST "${YOLO_WEB_URL}/job/claim/${JOB}")

# A successful claim returns a JSON object; a response without "{" is
# treated as an error and printed as-is.
if [[ "${JSON}" != *"{"* ]]; then
    echo ERROR
    echo "${JSON}"
    exit 1
fi

bash ./scripts/train.sh "${JSON}"

20
scripts/job.sh Normal file
View File

@ -0,0 +1,20 @@
#!/bin/bash
# Ask the yolo_web server for the next queued job and, if there is one,
# claim it via scripts/claim.sh.

set -e

if [ -f .env ]; then
    source .env
fi

if [ -z "${YOLO_WEB_URL}" ]; then
    YOLO_WEB_URL=http://localhost:3333
fi

JOB_JSON=$(curl -s "${YOLO_WEB_URL}/job")
# /job answers with an array; jq prints "null" for element 0 of an empty one.
# Quoted so the response is passed to jq without word splitting or globbing.
JOB=$(echo "${JOB_JSON}" | jq -r '.[0]')

if [[ "${JOB}" != "null" ]]; then
    bash ./scripts/claim.sh "${JOB}"
else
    echo "No jobs"
fi

47
scripts/train.sh Normal file
View File

@ -0,0 +1,47 @@
#!/bin/bash
# Fetch and unpack the dataset for a claimed job.
#
# Usage: train.sh '<JSON object returned by POST /job/claim/:id>'
#
# The dataset is downloaded (unless the zip is already present), unzipped
# next to it and checked for a data.yaml. Any failure along the way is
# reported to the server through POST /job/fail/:id.

set -e

if [ -f .env ]; then
    source .env
fi

# Same fallback as scripts/job.sh.
if [ -z "${YOLO_WEB_URL}" ]; then
    YOLO_WEB_URL=http://localhost:3333
fi

if [ -z "${1}" ]; then
    echo "Please provide a JSON object"
    exit 1
fi

# Report a failed job to the server.
#   $1 - job id
#   $2 - message stored as the job's meta
function fail () {
    local id="${1}"
    local meta="${2}"
    local url="${YOLO_WEB_URL}/job/fail/${id}"
    local payload

    # Let jq build the body so quotes/backslashes in the message are escaped.
    payload=$(jq -c -n --arg meta "${meta}" '{meta: $meta}')

    echo "${meta}"
    # Do not let a failed report abort the caller's cleanup under `set -e`.
    curl -s -X POST \
        -H 'Content-Type: application/json' \
        -d "${payload}" "${url}" || echo "Could not report failure to ${url}"
}

JSON="${1}"
ID=$(echo "${JSON}" | jq -r '.id')
DATASET=$(echo "${JSON}" | jq -r '.dataset')
MODEL=$(echo "${JSON}" | jq -r '.model')
NAME=$(echo "${JSON}" | jq -r '.name')
FILEPATH=$(echo "${JSON}" | jq -r '.path')

DOWNLOAD="${YOLO_WEB_URL}${FILEPATH}"
DEST="${YOLOv5}${DATASET}.zip"
UNZIPPED="${YOLOv5}${DATASET}"

echo "Downloading ${DOWNLOAD}"

if [ ! -f "${DEST}" ]; then
    if ! wget -q -O "${DEST}" "${DOWNLOAD}"; then
        # wget -O leaves a file behind even when the download fails; remove
        # it so the next run does not mistake it for a cached dataset.
        rm -f "${DEST}"
        fail "${ID}" "Error downloading dataset"
        exit 1
    fi
fi

# -o: overwrite without prompting when the dataset was unpacked before.
if ! unzip -o "${DEST}" -d "${UNZIPPED}"; then
    fail "${ID}" "Error unzipping dataset"
    rm -rf "${UNZIPPED}"
    rm -f "${DEST}"
    exit 1
fi

if [ ! -f "${UNZIPPED}/data.yaml" ]; then
    fail "${ID}" "Invalid dataset"
    rm -rf "${UNZIPPED}"
    rm "${DEST}"
    exit 1
fi

View File

@ -38,7 +38,8 @@ function fileFilter (req: any, file: any, cb: any) {
} }
} }
const upload : any = multer({ storage, fileFilter }); const uploadZip : any = multer({ storage, fileFilter });
const uploadOnnx : any = multer({ storage });
app.use(bodyParser.json()); app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true })); app.use(bodyParser.urlencoded({ extended: true }));
@ -82,12 +83,18 @@ async function status (id : string) : Promise<string> {
return new Promise((resolve : Function, reject : Function) => { return new Promise((resolve : Function, reject : Function) => {
return db.all(query, [id], (err : Error, rows : any) => { return db.all(query, [id], (err : Error, rows : any) => {
if (err) return reject(err); if (err) return reject(err);
if (rows.length !== 1) {
return resolve(jobStatus);
}
const obj : any = rows[0];
if (rows[0].started === null) { if (rows[0].started === null) {
jobStatus = `Has not started` jobStatus = `Has not started`
} else if (rows[0].failed !== null) { } else if (rows[0].failed !== null) {
jobStatus = `Failed <br /> <pre>${rows[0].meta}</pre>`; jobStatus = `Failed <br /> <pre>${rows[0].meta}</pre>`;
} else if (rows[0].completed !== null) { } else if (rows[0].completed !== null) {
jobStatus = `Completed ${rows[0].completed} <a href="/model/${rows[0].model}/${id}">Download</a>`; jobStatus = `Completed ${rows[0].completed} <a href="/model/${rows[0].model}/${id}">Download</a>`;
} else if (rows[0].started !== null) {
jobStatus = `Started ${rows[0].started}`;
} }
console.log(`Got status for job ${id}: ${jobStatus}`); console.log(`Got status for job ${id}: ${jobStatus}`);
return resolve(jobStatus); return resolve(jobStatus);
@ -121,7 +128,7 @@ async function dataset (id : string) : Promise<string> {
}); });
} }
async function job () : Promise<string|null> { async function job () : Promise<string[]> {
const query : string = `SELECT id FROM queue WHERE const query : string = `SELECT id FROM queue WHERE
started IS NULL started IS NULL
AND completed IS NULL AND completed IS NULL
@ -132,9 +139,23 @@ async function job () : Promise<string|null> {
return db.all(query, [], (err : Error, rows : any) => { return db.all(query, [], (err : Error, rows : any) => {
if (err) return reject(err); if (err) return reject(err);
if (rows.length < 1) { if (rows.length < 1) {
return resolve(null); return resolve([]);
} }
return resolve(rows[0].id); return resolve([rows[0].id]);
});
});
}
/**
 * List the ids of every queued job that has not been started, completed
 * or failed, ordered by creation time (oldest first).
 *
 * @returns Promise resolving to an array of job ids (empty when none match).
 */
async function jobs () : Promise<string[]> {
    const pendingSql : string = `SELECT id FROM queue WHERE
started IS NULL
AND completed IS NULL
AND failed IS NULL
ORDER BY created ASC;`;
    return new Promise((resolve : Function, reject : Function) => {
        db.all(pendingSql, [], (error : Error, rows : any) => {
            if (error) {
                reject(error);
                return;
            }
            // Only the id column is selected; flatten the rows to plain ids.
            const ids : string[] = rows.map((row : any) => row.id);
            resolve(ids);
        });
    });
}
@ -150,7 +171,7 @@ async function claim (id : string) : Promise<string> {
if (rows[0].started !== null) { if (rows[0].started !== null) {
return reject(new Error(`Job ${id} is already claimed`)); return reject(new Error(`Job ${id} is already claimed`));
} }
const claimQuery : string = `UPDATE queue SET started = CURRENT_TIMESTAMP WHERE id = ? LIMIT 1;`; const claimQuery : string = `UPDATE queue SET started = CURRENT_TIMESTAMP WHERE id = ?;`;
return db.run(claimQuery, [id], (err : Error, row : any) => { return db.run(claimQuery, [id], (err : Error, row : any) => {
if (err) return reject(err); if (err) return reject(err);
return resolve(rows[0]); return resolve(rows[0]);
@ -159,10 +180,20 @@ async function claim (id : string) : Promise<string> {
}); });
} }
async function fail (id : string, meta : string) : Promise<boolean> { async function fail (id : string, meta : string | null) : Promise<boolean> {
const query : string = `UPDATE queue SET failed = CURRENT_TIMESTAMP WHERE id = ? LIMIT 1;`; const query : string = `UPDATE queue SET failed = CURRENT_TIMESTAMP, meta = ? WHERE id = ?;`;
return new Promise((resolve : Function, reject : Function) => { return new Promise((resolve : Function, reject : Function) => {
return db.run(query, [ id ], (err : Error, row : any) => { return db.run(query, [ meta, id], (err : Error, row : any) => {
if (err) return reject(err);
return resolve(true);
});
});
}
async function complete (id : string, meta : string | null) : Promise<boolean> {
const query : string = `UPDATE queue SET completed = CURRENT_TIMESTAMP, meta = ? WHERE id = ?;`;
return new Promise((resolve : Function, reject : Function) => {
return db.run(query, [ meta, id ], (err : Error, row : any) => {
if (err) return reject(err); if (err) return reject(err);
return resolve(true); return resolve(true);
}); });
@ -180,7 +211,7 @@ app.get('/', async (req : Request, res : Response, next : NextFunction) => {
res.send(html); res.send(html);
}); });
app.post('/', upload.single('dataset'), async (req : Request, res : Response, next : NextFunction) => { app.post('/', uploadZip.single('dataset'), async (req : Request, res : Response, next : NextFunction) => {
let fileHash : string; let fileHash : string;
let filePath : string; let filePath : string;
let fileExists : boolean; let fileExists : boolean;
@ -209,21 +240,21 @@ app.post('/', upload.single('dataset'), async (req : Request, res : Response, ne
if (!fileExists) { if (!fileExists) {
try { try {
await fs.rename(req.file.path, filePath); await fs.copyFile(req.file.path, filePath);
console.log(`Saved dataset with hash ${fileHash}`); console.log(`Saved dataset with hash ${fileHash}`);
} catch (err) { } catch (err) {
console.error(err); console.error(err);
return next(err); return next(err);
} }
} else { } else {
console.warn(`Dataset with hash ${fileHash} already exists...`); console.warn(`Dataset with hash ${fileHash} already exists...`);
}
try { try {
await fs.unlink(req.file.path); await fs.unlink(req.file.path);
} catch (err) { } catch (err) {
console.error(err); console.error(err);
} }
}
try { try {
id = await add(req.body.email, req.body.name, fileHash, req.body.model); id = await add(req.body.email, req.body.name, fileHash, req.body.model);
@ -235,6 +266,47 @@ app.post('/', upload.single('dataset'), async (req : Request, res : Response, ne
res.send(`<html><body>Dataset for job ${req.body.name} has been uploaded successfully. You will be emailed when your job has started and when it has completed training. <br /> Monitor job status here: <a href="/job/${id}">${id}</a></body></html>`); res.send(`<html><body>Dataset for job ${req.body.name} has been uploaded successfully. You will be emailed when your job has started and when it has completed training. <br /> Monitor job status here: <a href="/job/${id}">${id}</a></body></html>`);
}); });
/**
 * POST /job/:id — receive the trained model for a job (multipart field
 * `model`), store it as `<id>.onnx` in the data directory and mark the job
 * as completed. An optional `meta` body field is saved with the job.
 *
 * Responds with `{ id }` on success; errors are forwarded to `next`.
 */
app.post('/job/:id', uploadOnnx.single('model'), async (req : Request, res : Response, next : NextFunction) => {
    let meta : string | null = null;

    // Timeout of 0 on the request — presumably so that large model uploads
    // are not cut off mid-transfer.
    req.setTimeout(0);

    // The job id comes from the route; without it the model would be saved
    // as "undefined.onnx" and no job row would be updated.
    if (typeof req.params.id === 'undefined' || req.params.id === null) {
        console.error(`No job id provided`);
        return next('Invalid request');
    }
    const id : string = req.params.id;

    // Either condition alone means there is no upload to process.
    if (typeof req.file === 'undefined' || req.file === null) {
        console.error('No file in upload');
        return next('ERROR: Please upload model as ONNX file');
    }

    // NOTE(review): `id` is caller-supplied and ends up in a file path —
    // consider validating its format before use.
    const filePath : string = join(data, `${id}.onnx`);

    if (typeof req.body.meta !== 'undefined') {
        meta = req.body.meta;
    }

    try {
        await fs.copyFile(req.file.path, filePath);
        console.log(`Saved model for job ${id}`);
    } catch (err) {
        console.error(err);
        return next(err);
    }

    // Removing the temporary upload is best-effort: the model is already
    // stored, so a failed unlink is only logged.
    try {
        await fs.unlink(req.file.path);
    } catch (err) {
        console.error(err);
    }

    try {
        await complete(id, meta);
    } catch (err) {
        console.error(err);
        return next(`Error completing training job ${id}`);
    }

    res.json({ id });
});
app.get('/job/:id', async (req : Request, res : Response, next : NextFunction) => { app.get('/job/:id', async (req : Request, res : Response, next : NextFunction) => {
let jobStatus : string; let jobStatus : string;
@ -344,16 +416,29 @@ app.get('/dataset/:id', async (req : Request, res: Response, next : NextFunction
}); });
app.get('/job', async (req : Request, res: Response, next : NextFunction) => { app.get('/job', async (req : Request, res: Response, next : NextFunction) => {
let jobId : string; let jobArr : string[];
try { try {
jobId = await job(); jobArr = await job();
} catch (err) { } catch (err) {
console.error(err); console.error(err);
return next('Error getting job'); return next('Error getting job');
} }
res.json([jobId]); res.json(jobArr);
});
/**
 * GET /jobs — respond with a JSON array holding the ids of all jobs
 * returned by `jobs()`.
 */
app.get('/jobs', async (req : Request, res: Response, next : NextFunction) => {
    let pendingIds : string[];

    try {
        pendingIds = await jobs();
    } catch (error) {
        // Log the real cause, hand a generic message to the error handler.
        console.error(error);
        return next('Error getting job');
    }

    res.json(pendingIds);
});
app.post('/job/claim/:id', async (req : Request, res: Response, next : NextFunction) => { app.post('/job/claim/:id', async (req : Request, res: Response, next : NextFunction) => {
@ -370,22 +455,24 @@ app.post('/job/claim/:id', async (req : Request, res: Response, next : NextFunct
try { try {
jobObj = await claim(id); jobObj = await claim(id);
console.log(`Job ${id} was claimed`);
} catch (err) { } catch (err) {
console.error(err); console.error(err);
return next('Error claiming job'); return next('Error claiming job');
} }
resJob.id = id; resObj.id = id;
resJob.datasetPath = `/dataset/${id}`; resObj.path = `/dataset/${id}`;
resJob.model = jobObj.model; resObj.dataset = jobObj.dataset;
resObj.model = jobObj.model;
resObj.name = jobObj.name;
res.json(resJob); res.json(resObj);
}); });
app.post('/job/fail/:id', async (req : Request, res: Response, next : NextFunction) => { app.post('/job/fail/:id', async (req : Request, res: Response, next : NextFunction) => {
let id : string; let id : string;
let jobObj : any; let meta : string = null;
let resObj : any = {};
if (typeof req.params.id === 'undefined' || req.params.id === null) { if (typeof req.params.id === 'undefined' || req.params.id === null) {
console.error(`No dataset id provided`); console.error(`No dataset id provided`);
@ -393,22 +480,20 @@ app.post('/job/fail/:id', async (req : Request, res: Response, next : NextFuncti
} }
id = req.params.id; id = req.params.id;
if (typeof req.body.meta !== 'undefined') {
try { meta = req.body.meta;
jobObj = await claim(id);
} catch (err) {
console.error(err);
return next('Error claiming job');
} }
resJob.id = id; try {
resJob.datasetPath = `/dataset/${id}`; await fail(id, meta);
resJob.model = jobObj.model; console.log(`Job ${id} failed`);
} catch (err) {
console.error(err);
return next('Error failing job');
}
res.json(resJob); res.json(true);
}); });
//app.get('/jobs');
//app.post('/job/started/:id', )
app.listen(port, () => { app.listen(port, () => {
console.log(`yolo_web running on port ${port}`); console.log(`yolo_web running on port ${port}`);