from sklearn.cluster import MiniBatchKMeans
import numpy as np
import argparse
import cv2
from common import convert_color, closest_color_weighted_euclidean, closest_color_euclidean, create_colored_image, remove_from_list, list_match, to_luma, convertScale
import os
import subprocess
from multiprocessing.pool import ThreadPool
from functools import cmp_to_key


def sort_pallete(a, b):
    """Three-way comparator ordering two BGR colors by their luma.

    Intended for use with ``functools.cmp_to_key``. Returns 1 when ``a`` has
    the larger luma, -1 when it has the smaller luma and 0 when they tie.
    """
    luma_a = to_luma(a, 'BGR')
    luma_b = to_luma(b, 'BGR')
    if luma_a > luma_b:
        return 1
    # The original tested ``B < A`` here, which duplicates the branch above
    # and made the -1 result unreachable.
    if luma_a < luma_b:
        return -1
    return 0


class Posterize:
    """Posterize an image and then find nearest colors to use.

    Constructing an instance runs the whole pipeline: the image is quantized
    with k-means, every quantized color is matched to a palette color and
    written out as a mask layer, each non-white layer is passed to the
    external ``stipple_gen.sh`` script, a preview composite is written, and
    the generated SVGs are passed to ``svgopt``.
    """

    # Class-level defaults. The mutable containers are re-created per
    # instance in __init__ so that instances never share state.
    colors = []
    colors_dict = {}
    original_colors = []
    layers = []
    previews = []
    svgs = []

    headless = False
    jobs = 1

    pallete = None
    pallete_space = 'BGR'
    comparison_space = 'BGR'
    image = None
    h = 0
    w = 0
    n_colors = 3
    # Particle budget that is scaled by each layer's black-pixel ratio.
    max_particles = 17000
    conf = os.path.abspath('./conf/base.conf')
    # Working directory used for the external stipple_gen / svgopt processes.
    stipple_gen = os.path.abspath('../../stipple_gen')

    white = [255, 255, 255]

    output = None

    def __init__(self, image, pallete, n_colors, output, headless, jobs):
        """Load ``image`` and run the full posterize/stipple pipeline.

        Args:
            image: Path of the image to read with ``cv2.imread``.
            pallete: Object whose ``colors`` attribute is an iterable of
                dicts with ``'color'``, ``'name'`` and ``'space'`` keys.
            n_colors: Number of colors wanted; one extra cluster is added
                (the layer closest to white is written as ``WHITE.png``
                and is not stippled).
            output: Directory for generated files; created when missing.
            headless: When true, no OpenCV preview windows are shown.
            jobs: Number of external processes to run at the same time.

        Raises:
            ValueError: If ``image`` cannot be read by OpenCV.
        """
        # Fresh containers per instance: the class-level lists/dicts would
        # otherwise accumulate entries across every Posterize ever created.
        self.colors = []
        self.colors_dict = {}
        self.original_colors = []
        self.layers = []
        self.previews = []
        self.svgs = []

        self.image = cv2.imread(image)
        if self.image is None:
            # cv2.imread signals failure by returning None, not by raising.
            raise ValueError(f'Could not read image {image}')
        (self.h, self.w) = self.image.shape[:2]
        self.pallete = pallete
        self.n_colors = n_colors + 1
        self.output = output
        self.headless = headless
        self.jobs = jobs

        if not os.path.exists(self.output):
            print(f'Output directory {self.output} does not exist, creating...')
            os.makedirs(self.output, exist_ok=True)

        self.flatten_pallete()
        self.posterize()
        self.determine_colors()
        self.stipple()
        self.preview()
        self.optimize()

    def posterize(self):
        """Quantize ``self.image`` to ``self.n_colors`` colors with k-means.

        Clustering is done on LAB pixel values; the quantized result is
        converted back to BGR, stored in ``self.image`` and shown.
        """
        lab = cv2.cvtColor(self.image, cv2.COLOR_BGR2LAB)
        feature = lab.reshape((self.h * self.w, 3))
        clusters = MiniBatchKMeans(n_clusters=self.n_colors, n_init='auto')
        labels = clusters.fit_predict(feature)
        # Replace every pixel by the (uint8) center of its cluster.
        quant = clusters.cluster_centers_.astype('uint8')[labels]
        rquant = quant.reshape((self.h, self.w, 3))
        bgrquant = cv2.cvtColor(rquant, cv2.COLOR_LAB2BGR)
        self.image = bgrquant
        self.show(bgrquant)

    def determine_colors(self):
        """Write one mask layer per posterized color and a palette composite.

        The posterized color closest to white is written as ``WHITE.png``.
        Every other color is matched to the nearest remaining palette color
        (each palette color is used at most once), written as
        ``<palette name>.png`` and painted into ``posterized.png``.
        """
        reshaped = self.image.reshape(-1, self.image.shape[2])
        self.original_colors = np.unique(reshaped, axis=0)
        white, _ = closest_color_weighted_euclidean(self.original_colors, [255, 255, 255], 'BGR')
        composite = create_colored_image(self.w, self.h, [255, 255, 255])

        mask = self.extract_color_mask(self.image, white)
        output_layer = os.path.join(self.output, 'WHITE.png')
        cv2.imwrite(output_layer, mask)
        self.layers.append({
            'layer': output_layer,
            'color': white,
            'space': self.pallete_space
        })

        # Iterate over the colors that actually exist. Indexing with
        # range(self.n_colors) raised IndexError whenever the quantized image
        # held fewer unique BGR colors than clusters were requested.
        for original in self.original_colors:  # BGR
            if list_match(original, white):
                continue
            mask = self.extract_color_mask(self.image, original)
            original_normalized = convert_color(original, 'BGR', self.pallete_space)
            if self.pallete_space == 'RGB' or self.pallete_space == 'BGR':
                closest, dist = closest_color_weighted_euclidean(self.colors, original_normalized, self.pallete_space)
            else:
                closest, dist = closest_color_euclidean(self.colors, original_normalized)
            # Each palette color may only be assigned to one layer.
            self.colors = remove_from_list(self.colors, closest)
            name = self.match_color_name(closest)
            output_layer = os.path.join(self.output, f'{name}.png')
            cv2.imwrite(output_layer, mask)
            self.layers.append({
                'layer': output_layer,
                'color': closest,
                'space': self.pallete_space
            })
            # The layer mask is black where the color occurs; invert it to
            # select those pixels in the composite.
            mask = cv2.bitwise_not(mask)
            # The composite is written as a BGR image, so convert the palette
            # color to BGR first (same conversion preview() applies).
            composite[mask > 0] = np.array(convert_color(closest, self.pallete_space, 'BGR'))

        composite_path = os.path.join(self.output, 'posterized.png')
        cv2.imwrite(composite_path, composite)
        self.show(composite)

    def extract_color_mask(self, image, color):
        """Return a mask that is 0 where ``image`` equals ``color``, else 255."""
        mask = cv2.inRange(image, color, color)
        return cv2.bitwise_not(mask)

    def flatten_pallete(self):
        """Copy the palette into ``self.colors`` and ``self.colors_dict``.

        ``self.colors_dict`` maps ``"c0,c1,c2"`` strings to color names.
        ``self.pallete_space`` ends up as the ``'space'`` of the last entry.
        """
        for color in self.pallete.colors:
            value = color['color']
            self.colors.append(value)
            self.colors_dict[f"{value[0]},{value[1]},{value[2]}"] = color['name']
            self.pallete_space = color['space']

    def match_color_name(self, key):
        """Return the palette name registered for the 3-component color ``key``.

        Raises:
            KeyError: If ``key`` was not registered by ``flatten_pallete``.
        """
        return self.colors_dict[f'{key[0]},{key[1]},{key[2]}']

    def stipple(self):
        """Run ``stipple_gen.sh`` for every layer except ``WHITE.png``.

        The particle budget of a layer is ``self.max_particles`` scaled by
        the fraction of black pixels in that layer's mask.
        """
        cmds = []
        for layer in self.layers:
            if 'WHITE.png' in layer['layer']:
                continue
            layer_mask = cv2.imread(layer['layer'], 0)
            (h, w) = layer_mask.shape[:2]
            total = h * w
            black = total - cv2.countNonZero(layer_mask)
            ratio = black / total
            max_particles = round(ratio * self.max_particles)
            input_image = os.path.abspath(layer['layer'])
            dir_name, file_name = os.path.split(input_image)
            file_part, ext = os.path.splitext(file_name)
            output_image = os.path.join(dir_name, f'{file_part}_preview.png')
            output_svg = os.path.join(dir_name, f'{file_part}.svg')
            cmd = [
                'bash', 'stipple_gen.sh',
                '--inputImage', input_image,
                '--outputImage', output_image,
                '--outputSVG', output_svg,
                '--config', self.conf,
                '--maxParticles', str(max_particles)
            ]
            cmds.append({
                'cmd': cmd,
                'color': layer['color'],
                'output_image': output_image,
                'output_svg': output_svg
            })
        self._run_jobs(self.render, cmds, 'stipple_gen')

    def render(self, job):
        """Run one stipple job and record its SVG and preview image.

        Args:
            job: Dict with ``'cmd'``, ``'color'``, ``'output_image'`` and
                ``'output_svg'`` keys, as built by ``stipple``.
        """
        name = os.path.basename(job['output_svg'])
        print(f'Starting stipple_gen {name}...')
        returncode = self._stream_process(job['cmd'], name)
        if returncode != 0:
            print(f'[{name}] stipple_gen exited with code {returncode}')
        self.svgs.append(job['output_svg'])
        self.previews.append({
            'layer': job['output_image'],
            'color': job['color']
        })

    def preview(self):
        """Compose the stippled preview images into ``preview.png``."""
        composite = create_colored_image(self.w, self.h, [255, 255, 255])
        for layer in self.previews:
            rendered = cv2.imread(layer['layer'], 0)
            # Non-white pixels of the rendered preview receive the layer color.
            mask = cv2.bitwise_not(rendered)
            composite[mask > 0] = np.array(convert_color(layer['color'], self.pallete_space, 'BGR'))
        composite_path = os.path.join(self.output, 'preview.png')
        cv2.imwrite(composite_path, composite)
        self.show(composite)

    def optimize(self):
        """Run ``svgopt`` on every SVG recorded by ``render``."""
        self._run_jobs(self.svgopt, self.svgs, 'svgopt')

    def svgopt(self, svg):
        """Optimize ``svg`` in place by running ``svgopt <svg> <svg>``."""
        name = os.path.basename(svg)
        cmd = ['svgopt', svg, svg]
        print(f'Optimizing {name}...')
        returncode = self._stream_process(cmd, name)
        if returncode != 0:
            print(f'[{name}] svgopt exited with code {returncode}')

    def show(self, mat):
        """Display ``mat`` in a window until a key is pressed (unless headless)."""
        if not self.headless:
            cv2.imshow('image', mat)
            cv2.waitKey(0)
            cv2.destroyAllWindows()

    def _run_jobs(self, worker, items, label):
        """Call ``worker(item)`` for each item, in a thread pool when useful.

        A pool of ``self.jobs`` threads is used when both ``self.jobs`` and
        the number of items exceed one; otherwise items run sequentially.
        """
        if self.jobs > 1 and len(items) > 1:
            print(f'Running {self.jobs} {label} processes simultaneously for {len(items)} jobs')
            # map() blocks until every job has finished and re-raises worker
            # errors, so failures surface exactly as on the sequential path;
            # the context manager then tears the pool down.
            with ThreadPool(self.jobs) as pool:
                pool.map(worker, items, chunksize=1)
        else:
            for item in items:
                worker(item)

    def _stream_process(self, cmd, name):
        """Run ``cmd`` in ``self.stipple_gen``, echoing its stdout line by line.

        Returns the exit code of the process.
        """
        # The context manager closes the pipe and waits for the process, so
        # no zombie process or open file descriptor is left behind.
        with subprocess.Popen(cmd, cwd=self.stipple_gen, stdout=subprocess.PIPE) as proc:
            for line in iter(proc.stdout.readline, b''):
                # errors='replace' keeps non-ASCII tool output from aborting the job.
                text = line.decode('ascii', errors='replace').strip()
                print(f'[{name}] {text}')
        return proc.returncode