| @ -0,0 +1,559 @@ | |||||
| # vim: fileencoding=utf-8 | |||||
| # vim: foldmethod=marker foldenable: | |||||
| """ | |||||
| [X] emoji | |||||
| [ ] wego icon | |||||
| [ ] v2.wttr.in | |||||
| [X] astronomical (sunset) | |||||
| [X] time | |||||
| [X] frames | |||||
| [X] colorize rain data | |||||
| [ ] date + locales | |||||
| [X] wind color | |||||
| [ ] highlight current date | |||||
| [ ] bind to real site | |||||
| [ ] max values: temperature | |||||
| [X] max value: rain | |||||
| [ ] comment github | |||||
| [ ] commit | |||||
| """ | |||||
| import sys | |||||
| import re | |||||
| import math | |||||
| import json | |||||
| import datetime | |||||
| import StringIO | |||||
| import requests | |||||
| import diagram | |||||
| import pyjq | |||||
| import pytz | |||||
| import numpy as np | |||||
| from astral import Astral, Location | |||||
| from scipy.interpolate import interp1d | |||||
| from babel.dates import format_datetime | |||||
| from globals import WWO_KEY | |||||
| import constants | |||||
| import translations | |||||
| import wttr_line | |||||
| reload(sys) | |||||
| sys.setdefaultencoding("utf-8") | |||||
| # data processing {{{ | |||||
| def get_data(config): | |||||
| """ | |||||
| Fetch data for `query_string` | |||||
| """ | |||||
| url = ( | |||||
| 'http://' | |||||
| 'localhost:5001/premium/v1/weather.ashx' | |||||
| '?key=%s' | |||||
| '&q=%s&format=json&num_of_days=3&tp=3&lang=None' | |||||
| ) % (WWO_KEY, config["location"]) | |||||
| text = requests.get(url).text | |||||
| parsed_data = json.loads(text) | |||||
| return parsed_data | |||||
| def interpolate_data(input_data, max_width): | |||||
| """ | |||||
| Resample `input_data` to number of `max_width` counts | |||||
| """ | |||||
| x = list(range(len(input_data))) | |||||
| y = input_data | |||||
| xvals = np.linspace(0, len(input_data)-1, max_width) | |||||
| yinterp = interp1d(x, y, kind='cubic') | |||||
| return yinterp(xvals) | |||||
| def jq_query(query, data_parsed): | |||||
| """ | |||||
| Apply `query` to structued data `data_parsed` | |||||
| """ | |||||
| pyjq_data = pyjq.all(query, data_parsed) | |||||
| data = map(float, pyjq_data) | |||||
| return data | |||||
| # }}} | |||||
| # utils {{{ | |||||
| def colorize(string, color_code): | |||||
| return "\033[%sm%s\033[0m" % (color_code, string) | |||||
| # }}} | |||||
| # draw_spark {{{ | |||||
| def draw_spark(data, height, width, color_data): | |||||
| """ | |||||
| Spark-style visualize `data` in a region `height` x `width` | |||||
| """ | |||||
| _BARS = u' _▁▂▃▄▅▇█' | |||||
| def _box(height, row, value, max_value): | |||||
| row_height = 1.0 * max_value / height | |||||
| if row_height * row >= value: | |||||
| return _BARS[0] | |||||
| if row_height * (row+1) <= value: | |||||
| return _BARS[-1] | |||||
| return _BARS[int(1.0*(value - row_height*row)/(row_height*1.0)*len(_BARS))] | |||||
| max_value = max(data) | |||||
| output = "" | |||||
| color_code = 20 | |||||
| for i in range(height): | |||||
| for j in range(width): | |||||
| character = _box(height, height-i-1, data[j], max_value) | |||||
| if data[j] != 0: | |||||
| chance_of_rain = color_data[j]/100.0 * 2 | |||||
| if chance_of_rain > 1: | |||||
| chance_of_rain = 1 | |||||
| color_index = int(5*chance_of_rain) | |||||
| color_code = 16 + color_index # int(math.floor((20-16) * 1.0 * (height-1-i)/height*(max_value/data[j]))) | |||||
| output += "\033[38;5;%sm%s\033[0m" % (color_code, character) | |||||
| output += "\n" | |||||
| # labeling max value | |||||
| if max_value == 0: | |||||
| max_line = " "*width | |||||
| else: | |||||
| max_line = "" | |||||
| for j in range(width): | |||||
| if data[j] == max_value: | |||||
| max_line = "%3.2fmm|%s%%" % (max_value, int(color_data[j])) | |||||
| orig_max_line = max_line | |||||
| # aligning it | |||||
| if len(max_line)/2 < j and len(max_line)/2 + j < width: | |||||
| spaces = " "*(j - len(max_line)/2) | |||||
| max_line = spaces + max_line # + spaces | |||||
| max_line = max_line + " "*(width - len(max_line)) | |||||
| elif len(max_line)/2 + j >= width: | |||||
| max_line = " "*(width - len(max_line)) + max_line | |||||
| max_line = max_line.replace(orig_max_line, colorize(orig_max_line, "38;5;33")) | |||||
| break | |||||
| if max_line: | |||||
| output = "\n" + max_line + "\n" + output + "\n" | |||||
| return output | |||||
| # }}} | |||||
| # draw_diagram {{{ | |||||
| def draw_diagram(data, height, width): | |||||
| option = diagram.DOption() | |||||
| option.size = diagram.Point([width, height]) | |||||
| option.mode = 'g' | |||||
| stream = StringIO.StringIO() | |||||
| gram = diagram.DGWrapper( | |||||
| data=[list(data), range(len(data))], | |||||
| dg_option=option, | |||||
| ostream=stream) | |||||
| gram.show() | |||||
| return stream.getvalue() | |||||
| # }}} | |||||
| # draw_date {{{ | |||||
| def draw_date(config, geo_data): | |||||
| """ | |||||
| """ | |||||
| tzinfo = pytz.timezone(geo_data["timezone"]) | |||||
| locale = config.get("locale", "en_US") | |||||
| datetime_day_start = datetime.datetime.utcnow() | |||||
| answer = "" | |||||
| for day in range(3): | |||||
| datetime_ = datetime_day_start + datetime.timedelta(hours=24*day) | |||||
| date = format_datetime(datetime_, "EEE dd MMM", locale=locale, tzinfo=tzinfo) | |||||
| spaces = ((24-len(date))/2)*" " | |||||
| date = spaces + date + spaces | |||||
| date = " "*(24-len(date)) + date | |||||
| answer += date | |||||
| answer += "\n" | |||||
| for _ in range(3): | |||||
| answer += " "*23 + u"╷" | |||||
| return answer[:-1] + " " | |||||
| # }}} | |||||
| # draw_time {{{ | |||||
| def draw_time(geo_data): | |||||
| """ | |||||
| """ | |||||
| tzinfo = pytz.timezone(geo_data["timezone"]) | |||||
| line = ["", ""] | |||||
| for _ in range(3): | |||||
| part = u"─"*5 + u"┴" + u"─"*5 | |||||
| line[0] += part + u"┼" + part + u"╂" | |||||
| line[0] += "\n" | |||||
| for _ in range(3): | |||||
| line[1] += " 6 12 18 " | |||||
| line[1] += "\n" | |||||
| # highlight current time | |||||
| hour_number = \ | |||||
| (datetime.datetime.now(tzinfo) | |||||
| - datetime.datetime.now(tzinfo).replace(hour=0, minute=0, second=0, microsecond=0) | |||||
| ).seconds//3600 | |||||
| for line_number, _ in enumerate(line): | |||||
| line[line_number] = \ | |||||
| line[line_number][:hour_number] \ | |||||
| + colorize(line[line_number][hour_number], "46") \ | |||||
| + line[line_number][hour_number+1:] | |||||
| return "".join(line) | |||||
| # }}} | |||||
| # draw_astronomical {{{ | |||||
| def draw_astronomical(city_name, geo_data): | |||||
| datetime_day_start = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) | |||||
| a = Astral() | |||||
| a.solar_depression = 'civil' | |||||
| city = Location() | |||||
| city.latitude = geo_data["latitude"] | |||||
| city.longitude = geo_data["longitude"] | |||||
| city.timezone = geo_data["timezone"] | |||||
| answer = "" | |||||
| moon_line = "" | |||||
| for time_interval in range(72): | |||||
| current_date = ( | |||||
| datetime_day_start | |||||
| + datetime.timedelta(hours=1*time_interval)).replace(tzinfo=pytz.timezone(geo_data["timezone"])) | |||||
| sun = city.sun(date=current_date, local=False) | |||||
| dawn = sun['dawn'] # .replace(tzinfo=None) | |||||
| dusk = sun['dusk'] # .replace(tzinfo=None) | |||||
| sunrise = sun['sunrise'] # .replace(tzinfo=None) | |||||
| sunset = sun['sunset'] # .replace(tzinfo=None) | |||||
| if current_date < dawn: | |||||
| char = " " | |||||
| elif current_date > dusk: | |||||
| char = " " | |||||
| elif dawn < current_date and current_date < sunrise: | |||||
| char = u"─" | |||||
| elif sunset < current_date and current_date < dusk: | |||||
| char = u"─" | |||||
| elif sunrise < current_date and current_date < sunset: | |||||
| char = u"━" | |||||
| answer += char | |||||
| # moon | |||||
| if time_interval % 3 == 0: | |||||
| moon_phase = city.moon_phase( | |||||
| date=datetime_day_start + datetime.timedelta(hours=time_interval)) | |||||
| moon_phase_emoji = constants.MOON_PHASES[int(math.floor(moon_phase*1.0/28.0*8))] | |||||
| if time_interval in [0, 24, 48, 69]: | |||||
| moon_line += moon_phase_emoji + " " | |||||
| else: | |||||
| moon_line += " " | |||||
| answer = moon_line + "\n" + answer + "\n" | |||||
| answer += "\n" | |||||
| return answer | |||||
| # }}} | |||||
| # draw_emoji {{{ | |||||
| def draw_emoji(data): | |||||
| answer = "" | |||||
| for i in data: | |||||
| emoji = constants.WEATHER_SYMBOL.get( | |||||
| constants.WWO_CODE.get( | |||||
| str(int(i)), "Unknown")) | |||||
| space = " "*(3-constants.WEATHER_SYMBOL_WIDTH_VTE.get(emoji)) | |||||
| answer += emoji + space | |||||
| answer += "\n" | |||||
| return answer | |||||
| # }}} | |||||
| # draw_wind {{{ | |||||
| def draw_wind(data, color_data): | |||||
| def _color_code_for_wind_speed(wind_speed): | |||||
| color_codes = [ | |||||
| (3, 82), # 82 | |||||
| (6, 118), # 118 | |||||
| (9, 154), # 154 | |||||
| (12, 250), # 190 | |||||
| (15, 246), # 226 | |||||
| (19, 253), # 220 | |||||
| (23, 214), | |||||
| (27, 208), | |||||
| (31, 202), | |||||
| (-1, 196) | |||||
| ] | |||||
| color_codes = [ | |||||
| (3, 241), # 82 | |||||
| (6, 242), # 118 | |||||
| (9, 243), # 154 | |||||
| (12, 246), # 190 | |||||
| (15, 250), # 226 | |||||
| (19, 253), # 220 | |||||
| (23, 214), | |||||
| (27, 208), | |||||
| (31, 202), | |||||
| (-1, 196) | |||||
| ] | |||||
| for this_wind_speed, this_color_code in color_codes: | |||||
| if wind_speed <= this_wind_speed: | |||||
| return this_color_code | |||||
| return color_codes[-1][1] | |||||
| answer = "" | |||||
| answer_line2 = "" | |||||
| for j, degree in enumerate(data): | |||||
| degree = int(degree) | |||||
| if degree: | |||||
| wind_direction = constants.WIND_DIRECTION[((degree+22)%360)/45] | |||||
| else: | |||||
| wind_direction = "" | |||||
| color_code = "38;5;%s" % _color_code_for_wind_speed(int(color_data[j])) | |||||
| answer += " %s " % colorize(wind_direction, color_code) | |||||
| # wind_speed | |||||
| wind_speed = int(color_data[j]) | |||||
| wind_speed_str = colorize(str(wind_speed), color_code) | |||||
| if wind_speed < 10: | |||||
| wind_speed_str = " " + wind_speed_str + " " | |||||
| elif wind_speed < 100: | |||||
| wind_speed_str = " " + wind_speed_str | |||||
| answer_line2 += wind_speed_str | |||||
| answer += "\n" | |||||
| answer += answer_line2 + "\n" | |||||
| return answer | |||||
| # }}} | |||||
| # panel implementation {{{ | |||||
| def add_frame(output, width, config): | |||||
| """ | |||||
| Add frame arond `output` that has width `width` | |||||
| """ | |||||
| empty_line = " "*width | |||||
| output = "\n".join(u"│"+(x or empty_line)+u"│" for x in output.splitlines()) + "\n" | |||||
| weather_report = \ | |||||
| translations.CAPTION[config["lang"]] \ | |||||
| + " " \ | |||||
| + (config["override_location"] or config["location"]) | |||||
| caption = u"┤ " + " " + weather_report + " " + u" ├" | |||||
| output = u"┌" + caption + u"─"*(width-len(caption)) + u"┐\n" \ | |||||
| + output + \ | |||||
| u"└" + u"─"*width + u"┘\n" | |||||
| return output | |||||
| def generate_panel(data_parsed, geo_data, config): | |||||
| """ | |||||
| """ | |||||
| max_width = 72 | |||||
| precip_mm_query = "[.data.weather[] | .hourly[]] | .[].precipMM" | |||||
| precip_chance_query = "[.data.weather[] | .hourly[]] | .[].chanceofrain" | |||||
| feels_like_query = "[.data.weather[] | .hourly[]] | .[].FeelsLikeC" | |||||
| weather_code_query = "[.data.weather[] | .hourly[]] | .[].weatherCode" | |||||
| wind_direction_query = "[.data.weather[] | .hourly[]] | .[].winddirDegree" | |||||
| wind_speed_query = "[.data.weather[] | .hourly[]] | .[].windspeedKmph" | |||||
| output = "" | |||||
| output += "\n\n" | |||||
| output += draw_date(config, geo_data) | |||||
| output += "\n" | |||||
| output += "\n" | |||||
| output += "\n" | |||||
| data = jq_query(feels_like_query, data_parsed) | |||||
| data_interpolated = interpolate_data(data, max_width) | |||||
| output += draw_diagram(data_interpolated, 10, max_width) | |||||
| output += "\n" | |||||
| output += draw_time(geo_data) | |||||
| data = jq_query(precip_mm_query, data_parsed) | |||||
| color_data = jq_query(precip_chance_query, data_parsed) | |||||
| data_interpolated = interpolate_data(data, max_width) | |||||
| color_data_interpolated = interpolate_data(color_data, max_width) | |||||
| output += draw_spark(data_interpolated, 5, max_width, color_data_interpolated) | |||||
| output += "\n" | |||||
| data = jq_query(weather_code_query, data_parsed) | |||||
| output += draw_emoji(data) | |||||
| data = jq_query(wind_direction_query, data_parsed) | |||||
| color_data = jq_query(wind_speed_query, data_parsed) | |||||
| output += draw_wind(data, color_data) | |||||
| output += "\n" | |||||
| output += draw_astronomical(config["location"], geo_data) | |||||
| output += "\n" | |||||
| output = add_frame(output, max_width, config) | |||||
| return output | |||||
| # }}} | |||||
| # textual information {{{ | |||||
| def textual_information(data_parsed, geo_data, config): | |||||
| """ | |||||
| Add textual information about current weather and | |||||
| astronomical conditions | |||||
| """ | |||||
| def _shorten_full_location(full_location, city_only=False): | |||||
| def _count_runes(string): | |||||
| return len(string.encode('utf-16-le')) // 2 | |||||
| words = full_location.split(",") | |||||
| output = words[0] | |||||
| if city_only: | |||||
| return output | |||||
| for word in words[1:]: | |||||
| if _count_runes(output + "," + word) > 50: | |||||
| return output | |||||
| output += "," + word | |||||
| return output | |||||
| city = Location() | |||||
| city.latitude = geo_data["latitude"] | |||||
| city.longitude = geo_data["longitude"] | |||||
| city.timezone = geo_data["timezone"] | |||||
| output = [] | |||||
| timezone = city.timezone | |||||
| datetime_day_start = datetime.datetime.now()\ | |||||
| .replace(hour=0, minute=0, second=0, microsecond=0) | |||||
| sun = city.sun(date=datetime_day_start, local=True) | |||||
| format_line = "%c %C, %t, %h, %w, %P" | |||||
| current_condition = data_parsed['data']['current_condition'][0] | |||||
| query = {} | |||||
| weather_line = wttr_line.render_line(format_line, current_condition, query) | |||||
| output.append('Weather: %s' % weather_line) | |||||
| output.append('Timezone: %s' % timezone) | |||||
| tmp_output = [] | |||||
| tmp_output.append(' Now: %s' | |||||
| % datetime.datetime.now(pytz.timezone(timezone)).strftime("%H:%M:%S%z")) | |||||
| tmp_output.append('Dawn: %s' | |||||
| % str(sun['dawn'].strftime("%H:%M:%S"))) | |||||
| tmp_output.append('Sunrise: %s' | |||||
| % str(sun['sunrise'].strftime("%H:%M:%S"))) | |||||
| tmp_output.append(' Noon: %s' | |||||
| % str(sun['noon'].strftime("%H:%M:%S "))) | |||||
| tmp_output.append('Sunset: %s' | |||||
| % str(sun['sunset'].strftime("%H:%M:%S"))) | |||||
| tmp_output.append('Dusk: %s' | |||||
| % str(sun['dusk'].strftime("%H:%M:%S"))) | |||||
| tmp_output = [ | |||||
| re.sub("^([A-Za-z]*:)", lambda m: colorize(m.group(1), "2"), x) | |||||
| for x in tmp_output] | |||||
| output.append( | |||||
| "%20s" % tmp_output[0] \ | |||||
| + " | %20s " % tmp_output[1] \ | |||||
| + " | %20s" % tmp_output[2]) | |||||
| output.append( | |||||
| "%20s" % tmp_output[3] \ | |||||
| + " | %20s " % tmp_output[4] \ | |||||
| + " | %20s" % tmp_output[5]) | |||||
| city_only = False | |||||
| suffix = "" | |||||
| if "Simferopol" in timezone: | |||||
| city_only = True | |||||
| suffix = ", Крым" | |||||
| if config["full_address"]: | |||||
| output.append('Location: %s%s [%5.4f,%5.4f]' \ | |||||
| % ( | |||||
| _shorten_full_location(config["full_address"], city_only=city_only), | |||||
| suffix, | |||||
| geo_data["latitude"], | |||||
| geo_data["longitude"], | |||||
| )) | |||||
| output = [ | |||||
| re.sub("^( *[A-Za-z]*:)", lambda m: colorize(m.group(1), "2"), | |||||
| re.sub("^( +[A-Za-z]*:)", lambda m: colorize(m.group(1), "2"), | |||||
| re.sub(r"(\|)", lambda m: colorize(m.group(1), "2"), x))) | |||||
| for x in output] | |||||
| return "".join("%s\n" % x for x in output) | |||||
| # }}} | |||||
| # get_geodata {{{ | |||||
| def get_geodata(location): | |||||
| text = requests.get("http://localhost:8004/%s" % location).text | |||||
| return json.loads(text) | |||||
| # }}} | |||||
| def main(location, override_location=None, data=None, full_address=None): | |||||
| config = { | |||||
| "lang": "en", | |||||
| "locale": "en_US", | |||||
| "location": location, | |||||
| "override_location": override_location, | |||||
| "full_address": full_address, | |||||
| } | |||||
| geo_data = get_geodata(location) | |||||
| if data is None: | |||||
| data_parsed = get_data(config) | |||||
| else: | |||||
| data_parsed = data | |||||
| output = generate_panel(data_parsed, geo_data, config) | |||||
| output += textual_information(data_parsed, geo_data, config) | |||||
| return output | |||||
| if __name__ == '__main__': | |||||
| sys.stdout.write(main(sys.argv[1])) | |||||