"""Refresh chemical supply prices by scraping supplier product pages.

Reads ``./sources.csv`` (rows of ``source,chemical,url``), loads each product
page through pyppeteer, and appends one
``chemical,url,grams,milliliters,price`` line per product variant to
``./supply.csv``. Prices are written as integer cents; the milliliters column
is always left empty by the scrapers in this file.
"""

from argparse import ArgumentParser, ArgumentTypeError
import asyncio
import csv
from decimal import Decimal, InvalidOperation
import json
import math
from os import linesep as EOL

from pyppeteer import launch
from pyquery import PyQuery as pq

ua = 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0'
# NOTE: 'bandh' is accepted on the command line, but no scraper for it is
# defined in this file, so rows with that source are skipped by main().
sources = ['artcraft', 'photographers_formulary', 'bandh']
csvFile = './sources.csv'
csvOutput = './supply.csv'

# Conversion factor used for weights quoted in pounds.
_GRAMS_PER_POUND = 453.592


def is_source(source):
    """argparse ``type=`` validator: return *source* if it is a known source.

    Raises:
        ArgumentTypeError: if *source* is not listed in ``sources``.
    """
    if source in sources:
        return source
    raise ArgumentTypeError('Not in list')


def _dollars_to_cents(dollars):
    """Convert a decimal dollar string such as ``'1,234.50'`` to integer cents.

    The amount is parsed with ``Decimal`` rather than ``float`` so that exact
    prices are not pushed up a cent by binary rounding error (for example
    ``float('1.10') * 100`` is ``110.00000000000001``, whose ceiling is 111).
    Genuine sub-cent amounts are still rounded up.

    Returns:
        The price in cents, or ``None`` if *dollars* is not a finite number.
    """
    try:
        amount = Decimal(dollars.replace(',', '').strip())
    except InvalidOperation:
        return None
    if not amount.is_finite():
        return None
    return math.ceil(amount * 100)


def _append_supply_line(chemical, v):
    """Print and append one ``chemical,url,grams,milliliters,price`` line.

    *v* must provide the ``url``, ``weight`` and ``price`` keys. The
    milliliters column is left empty.
    """
    line = f'{chemical},{v["url"]},{v["weight"]},,{v["price"]}{EOL}'
    print(line)
    # newline='' stops text mode from translating the '\n' inside EOL a second
    # time, which would otherwise produce '\r\r\n' on Windows.
    with open(csvOutput, 'a', newline='') as file:
        file.write(line)


async def artcraft(url, chemical):
    """Scrape every variant of an Artcraft product page and record its price.

    Loads *url*, reads the variant list from the page, then visits
    ``{url}?variant={id}`` for each variant to read its price. One output
    line per variant is appended after the browser has been closed.
    """
    browser = await launch()
    try:
        page = await browser.newPage()
        await page.setUserAgent(ua)
        print(url)
        await page.goto(url)
        await asyncio.sleep(1)
        html = await page.content()
        data = parse_artcraft_product(html)
        if not data:
            print(f'No variants found for {url}')
        await asyncio.sleep(5)
        for variant in data:
            variant_url = f'{url}?variant={variant["id"]}'
            print(variant_url)
            await page.goto(variant_url)
            html = await page.content()
            variant['url'] = variant_url
            variant['price'] = parse_artcraft_price(html)
    finally:
        # Always close the browser, even if navigation or parsing failed.
        await browser.close()
    for v in data:
        artcraft_line(chemical, v)


async def artcraft_test(url, chemical):
    """Debug helper: parse a saved ``./test.html`` as an Artcraft product."""
    print(url)
    with open('./test.html', 'r') as file:
        html = file.read()
    data = parse_artcraft_product(html)
    print(data)


def artcraft_line(chemical, v):
    """Append the output line for one Artcraft variant *v*."""
    _append_supply_line(chemical, v)


def parse_artcraft_id(text):
    """Return the part of the third ``-``-separated field before any ``_``."""
    return text.split('-')[2].split('_')[0]


def parse_artcraft_weight(text):
    """Convert an Artcraft variant title to a weight in grams.

    The title is lower-cased and ``-1/2`` is rewritten to ``.5`` (so
    ``1-1/2 Pounds`` reads as 1.5). The number is taken from the first
    space-separated token; titles containing ``pound`` are converted to
    grams, titles containing ``gram`` are used as-is.

    Returns:
        Grams as a float, or ``None`` when the unit is not recognised or the
        leading token is not a number.
    """
    text = text.lower().replace('-1/2', '.5')
    if 'pound' in text:
        factor = _GRAMS_PER_POUND
    elif 'gram' in text:
        factor = None
    else:
        return None
    try:
        amount = float(text.split(' ')[0])
    except ValueError:
        return None
    return amount if factor is None else amount * factor


def parse_artcraft_price(html):
    """Return the first ``span.price-item`` price in *html* as integer cents.

    Returns:
        The price in cents, or ``None`` if no price element is present or its
        text is not a number once ``$``, spaces and ``USD`` are removed.
    """
    d = pq(html)
    price = None
    for p in d('span.price-item').items():
        price = p.text()
        break
    if price is None:
        return None
    dollars = price.replace('$', '').replace(' ', '').replace('USD', '').strip()
    return _dollars_to_cents(dollars)


def parse_artcraft_product(html):
    """Extract the variant list from an Artcraft product page.

    The variants are read from the JSON text of the ``variant-radios script``
    element; each entry must have ``id`` and ``title`` keys.

    Returns:
        A list of ``{'id': ..., 'weight': grams-or-None}`` dicts, or an empty
        list when the page has no variant script.
    """
    d = pq(html)
    scr = d('variant-radios script').text()
    if not scr or not scr.strip():
        # No variant selector on the page: nothing to parse.
        return []
    objs = json.loads(scr)
    return [
        {'id': o['id'], 'weight': parse_artcraft_weight(o['title'])}
        for o in objs
    ]


async def photographers_formulary(url, chemical):
    """Scrape every size option of a Photographers' Formulary product page.

    Loads *url*, finds the size options (radio list or drop-down), selects
    each in turn and reads the price the page then shows. One output line per
    option is appended after the browser has been closed.
    """
    browser = await launch()
    try:
        page = await browser.newPage()
        await page.setUserAgent(ua)
        print(url)
        await page.goto(url)
        await asyncio.sleep(1)
        html = await page.content()
        data = parse_photographers_formulary_product(html)
        for b in data:
            await asyncio.sleep(2)
            print(b['weight'])
            b['url'] = url
            if b['type'] == 'li':
                # CSS :nth-of-type is 1-based; the stored index is 0-based.
                index = b['index'] + 1
                await page.click(
                    f'.productOptionViewRadio ul li:nth-of-type({index}) label input'
                )
            elif b['type'] == 'select':
                await page.select('.productOptionViewSelect select', b['value'])
            else:
                continue
            # Pause before re-reading the page so the new price is in place.
            await asyncio.sleep(2)
            html = await page.content()
            b['price'] = parse_photographers_formulary_price(html)
    finally:
        # Always close the browser, even if navigation or parsing failed.
        await browser.close()
    for d in data:
        photographers_formulary_line(chemical, d)


async def photographers_formulary_test(url, chemical):
    """Debug helper: parse a saved ``./test.html`` as a Formulary product."""
    print(url)
    with open('./test.html', 'r') as file:
        html = file.read()
    data = parse_photographers_formulary_product(html)
    print(data)


def parse_photographers_formulary_product(html):
    """List the size options offered on a Photographers' Formulary page.

    If the page has no ``.productOptionViewSelect select`` element the options
    are read from the ``.productOptionViewRadio ul li`` items (type ``'li'``).
    If it has exactly one, they are read from its ``<option>`` elements that
    carry a non-blank ``value`` (type ``'select'``). With more than one select
    element an empty list is returned.

    Returns:
        A list of dicts with ``index``, ``type`` and ``weight`` keys, plus
        ``value`` for ``'select'`` entries.
    """
    d = pq(html)
    data = []
    select = d('.productOptionViewSelect select')
    if len(select) == 0:
        for i, b in enumerate(d('.productOptionViewRadio ul li').items()):
            data.append({
                'index': i,
                'type': 'li',
                'weight': parse_photographers_formulary_weight(b.find('span').text()),
            })
    elif len(select) == 1:
        i = 0
        for o in d('.productOptionViewSelect select option').items():
            # attr() yields None when the option has no value attribute.
            value = o.attr('value')
            if value is None or value.strip() == '':
                continue
            data.append({
                'index': i,
                'type': 'select',
                'value': value,
                'weight': parse_photographers_formulary_weight(o.text()),
            })
            i += 1
    return data


def parse_photographers_formulary_weight(text):
    """Convert a Formulary option label to a weight in grams.

    The label is split on single spaces; the second field is read as the
    amount and the third as the unit (``g`` or ``lb``).

    Returns:
        Grams as a float, or ``None`` when the label has fewer than three
        fields, the unit is not recognised, or the amount is not a number.
    """
    parts = text.split(' ')
    if len(parts) < 3:
        return None
    unit = parts[2]
    if unit not in ('g', 'lb'):
        return None
    try:
        amount = float(parts[1])
    except ValueError:
        return None
    return amount if unit == 'g' else amount * _GRAMS_PER_POUND


def parse_photographers_formulary_price(html):
    """Return the first ``em.ProductPrice`` price in *html* as integer cents.

    Returns:
        The price in cents, or ``None`` if no price element is present or its
        text is not a number once ``$`` is removed.
    """
    d = pq(html)
    price = None
    for p in d('em.ProductPrice').items():
        price = p.text()
        break
    if price is None:
        return None
    dollars = price.replace('$', '').strip()
    return _dollars_to_cents(dollars)


def photographers_formulary_line(chemical, v):
    """Append the output line for one Photographers' Formulary option *v*."""
    _append_supply_line(chemical, v)


async def main():
    """Parse the command line and scrape every matching row of the sources CSV.

    Rows whose first column is the literal header ``source`` are skipped, as
    are rows for other sources when ``--source`` is given and rows with fewer
    than three columns.
    """
    parser = ArgumentParser(description='Refresh prices from sources')
    parser.add_argument('-s', '--source', type=is_source, required=False,
                        default=None, help='Only run on single source')
    args = parser.parse_args()
    # The output file is appended to, never truncated; its expected header is
    # 'chemical,url,grams,milliliters,price'.
    with open(csvFile, newline='') as csvfile:
        reader = csv.reader(csvfile, delimiter=',', quotechar='|')
        for row in reader:
            if len(row) < 3:
                # Blank or truncated line: nothing to scrape.
                continue
            source, chemical, url = row[0], row[1], row[2]
            if source == 'source':
                continue
            if args.source is not None and source != args.source:
                continue
            if source == 'artcraft':
                await artcraft(url, chemical)
            elif source == 'photographers_formulary':
                await photographers_formulary(url, chemical)


if __name__ == '__main__':
    asyncio.run(main())