Python works in progress: can currently locate the QR code through a very slow process. Audio is going to prove to be a challenge with the BMPCC onboard mic
This commit is contained in:
parent
cd22ac0f38
commit
6fba96e253
|
@ -1 +1,4 @@
|
||||||
*.DS_Store
|
*.DS_Store
|
||||||
|
py/env
|
||||||
|
*.wav
|
||||||
|
*.png
|
||||||
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
import numpy as np
|
||||||
|
from scipy.io.wavfile import read, write
|
||||||
|
from scipy.signal import butter, filtfilt
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
from time import strftime, localtime
|
||||||
|
|
||||||
|
parser = ArgumentParser(prog='python3 audio_detect.py', description='Example script that uses ffmpeg and bandpass filter to locate audio signature')
|
||||||
|
parser.add_argument('video', type=str, help='Video file to analyze')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if not os.path.exists(args.video) :
|
||||||
|
print(f'Video {args.video} does not exist')
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
print(f'Detecting audio signature in {args.video}')
|
||||||
|
|
||||||
|
def rm (file) :
|
||||||
|
if os.path.exists(file):
|
||||||
|
os.remove(file)
|
||||||
|
|
||||||
|
def bandpass_filter(data, lowcut, highcut, fs, order=5):
|
||||||
|
"""Apply a bandpass filter to the audio data."""
|
||||||
|
nyq = 0.5 * fs
|
||||||
|
low = lowcut / nyq
|
||||||
|
high = highcut / nyq
|
||||||
|
b, a = butter(order, [low, high], btype='band')
|
||||||
|
y = filtfilt(b, a, data)
|
||||||
|
return y
|
||||||
|
|
||||||
|
def apply_bandpass_to_wav(input_file, output_file, lowcut, highcut, order=5):
|
||||||
|
"""Read a WAV file, apply a bandpass filter, and write the result to a new WAV file."""
|
||||||
|
fs, data = read(input_file)
|
||||||
|
|
||||||
|
print(f'Applying {lowcut} -> {highcut} bandpass on {input_file}')
|
||||||
|
if data.ndim == 2:
|
||||||
|
filtered_data = np.zeros_like(data)
|
||||||
|
for channel in range(data.shape[1]):
|
||||||
|
filtered_data[:, channel] = bandpass_filter(data[:, channel], lowcut, highcut, fs, order)
|
||||||
|
else:
|
||||||
|
filtered_data = bandpass_filter(data, lowcut, highcut, fs, order)
|
||||||
|
|
||||||
|
write(output_file, fs, filtered_data.astype(np.int16))
|
||||||
|
|
||||||
|
def export_audio (video) :
|
||||||
|
print(f'Exporting audio from {video}...')
|
||||||
|
|
||||||
|
tmp_file, filename = tempfile.mkstemp(suffix='.wav')
|
||||||
|
|
||||||
|
cmd = [
|
||||||
|
'ffmpeg',
|
||||||
|
'-hwaccel', 'auto',
|
||||||
|
'-y',
|
||||||
|
'-i', f'"{video}"',
|
||||||
|
'-vn',
|
||||||
|
'-acodec', 'pcm_s16le',
|
||||||
|
f'"{filename}"'
|
||||||
|
]
|
||||||
|
|
||||||
|
proc = subprocess.Popen(' '.join(cmd), stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell = True)
|
||||||
|
|
||||||
|
for line in proc.stdout :
|
||||||
|
#print(line.decode('ascii').strip())
|
||||||
|
a=1
|
||||||
|
|
||||||
|
return filename
|
||||||
|
|
||||||
|
audio_file = export_audio(args.video)
|
||||||
|
print(audio_file)
|
||||||
|
apply_bandpass_to_wav(audio_file, 'extracted.wav', 290, 310)
|
||||||
|
|
||||||
|
#rm(audio_file)
|
|
@ -0,0 +1,3 @@
|
||||||
|
opencv-python
|
||||||
|
numpy
|
||||||
|
scipy
|
|
@ -0,0 +1,116 @@
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
import cv2
|
||||||
|
from time import strftime, localtime
|
||||||
|
import json
|
||||||
|
|
||||||
|
qcd = cv2.QRCodeDetector()
|
||||||
|
|
||||||
|
parser = ArgumentParser(prog='python3 video_detect.py', description='Example script that uses ffmpeg, ffprobe and qreader to detect start of slate')
|
||||||
|
parser.add_argument('video', type=str, help='Video file to analyze')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if not os.path.exists(args.video) :
|
||||||
|
print(f'Video {args.video} does not exist')
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
print(f'Detecting QR in {args.video}')
|
||||||
|
|
||||||
|
def rm (file) :
|
||||||
|
if os.path.exists(file):
|
||||||
|
os.remove(file)
|
||||||
|
|
||||||
|
def getFrameCount (video) :
|
||||||
|
count = -1
|
||||||
|
cmd = [
|
||||||
|
'ffprobe',
|
||||||
|
'-v', 'error',
|
||||||
|
'-count_frames',
|
||||||
|
'-select_streams', 'v:0',
|
||||||
|
'-show_entries', 'stream=nb_read_frames',
|
||||||
|
'-of', 'default=nokey=1:noprint_wrappers=1',
|
||||||
|
f'"{video}"'
|
||||||
|
]
|
||||||
|
|
||||||
|
proc = subprocess.Popen(' '.join(cmd), stdout=subprocess.PIPE, shell=True)
|
||||||
|
|
||||||
|
for line in proc.stdout :
|
||||||
|
count = int(line.decode('ascii').strip())
|
||||||
|
break
|
||||||
|
|
||||||
|
#print(' '.join(cmd))
|
||||||
|
|
||||||
|
return count
|
||||||
|
|
||||||
|
def exportFrame (video, frame) :
|
||||||
|
epoch = None
|
||||||
|
success = False
|
||||||
|
tmp_file, filename = tempfile.mkstemp(suffix='.png')
|
||||||
|
cmd = [
|
||||||
|
'ffmpeg',
|
||||||
|
'-hwaccel', 'auto',
|
||||||
|
'-y',
|
||||||
|
'-i', f'"{video}"',
|
||||||
|
'-vf', f"select='gte(n\\,{frame})'",
|
||||||
|
'-vframes', '1',
|
||||||
|
'-compression_algo', 'raw',
|
||||||
|
'-pix_fmt', 'rgb24',
|
||||||
|
'-crf', '0',
|
||||||
|
f'"{filename}"'
|
||||||
|
]
|
||||||
|
|
||||||
|
proc = subprocess.Popen(' '.join(cmd), stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell = True)
|
||||||
|
|
||||||
|
for line in proc.stdout :
|
||||||
|
#print(line.decode('ascii').strip())
|
||||||
|
a=1
|
||||||
|
|
||||||
|
#print(filename)
|
||||||
|
|
||||||
|
img = cv2.cvtColor(cv2.imread(filename), cv2.COLOR_BGR2RGB)
|
||||||
|
|
||||||
|
retval, decoded_info, points, straight_qrcode = qcd.detectAndDecodeMulti(img)
|
||||||
|
|
||||||
|
if not retval or decoded_info == '()' :
|
||||||
|
print(f'[{frame}] No QR detected')
|
||||||
|
else :
|
||||||
|
for qr in decoded_info :
|
||||||
|
if len(qr) == len('9999999999') :
|
||||||
|
print(points)
|
||||||
|
epoch = int(qr)
|
||||||
|
success = True
|
||||||
|
|
||||||
|
rm(filename)
|
||||||
|
|
||||||
|
return success, epoch
|
||||||
|
|
||||||
|
count = getFrameCount(args.video)
|
||||||
|
|
||||||
|
print(f'Video has {count} frames')
|
||||||
|
|
||||||
|
for i in range(1, count):
|
||||||
|
success, epoch = exportFrame(args.video, i)
|
||||||
|
if success :
|
||||||
|
obj = {
|
||||||
|
'frame' : i,
|
||||||
|
'frames' : count,
|
||||||
|
'start_time' : epoch,
|
||||||
|
'time_str' : strftime('%Y-%m-%d %H:%M:%S', localtime(epoch))
|
||||||
|
}
|
||||||
|
|
||||||
|
with open(f'{args.video}.meta.json', 'w') as f :
|
||||||
|
f.write(json.dumps(obj))
|
||||||
|
|
||||||
|
print(json.dumps(obj))
|
||||||
|
break
|
||||||
|
|
||||||
|
#${this.bin} -hwaccel auto -y -i "${video}" -vf "select='gte(n\\,${frameNum})'${scale}" -vframes 1 -compression_algo raw -pix_fmt rgb24 -crf 0 "${tmpoutput}"
|
||||||
|
|
||||||
|
#ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames -of default=nokey=1:noprint_wrappers=1 "${video}"
|
||||||
|
|
||||||
|
#proc = subprocess.run(cmd, stdout=subprocess.PIPE)
|
||||||
|
#for line in proc.stdout :
|
||||||
|
# print(f'[BUILD] {line}')
|
||||||
|
|
Loading…
Reference in New Issue