Исходный код tabun_api.utils
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import re
import sys
import time
import random
import logging
import platform
import mimetypes
from hashlib import md5
import lxml
import lxml.html
import lxml.etree
# import html5lib
import iso8601
from .compat import text, text_types, binary, urequest, PY2, BaseCookie
#: Логгер tabun_api.
logger = logging.getLogger('tabun_api')
#: Месяцы, для парсинга даты.
mons = ('января', 'февраля', 'марта', 'апреля', 'мая', 'июня', 'июля', 'августа', 'сентября', 'октября', 'ноября', 'декабря')
#: Блочные элементы, для красивого вывода в htmlToString (устарело, используйте HTMLFormatter.block_elems)
block_elems = ("div", "p", "blockquote", "section", "ul", "li", "h1", "h2", "h3", "h4", "h5", "h6")
#: Регулярка для парсинга ютуба для выдирания превьюшки.
youtube_regex = re.compile(r'youtube.com\/embed\/(.{10,15})((\?)|($))')
#: Регулярка для парсинга ссылки на аватарку — из неё можно узнать много полезного!
ava_regex = re.compile(r"\/((images)|(storage))\/([0-9]+)\/([0-9]+)\/([0-9]+)\/([0-9]+)\/([0-9]+)\/([0-9]+)\/avatar_([0-9]+)x([0-9]+)\.(...)(\?([0-9]+))?")
#: Регулярка для расшифровки почты, которую шифрует CloudFlare.
cf_email = re.compile(r'<[A-Za-z]+( href="/cdn-cgi/l/email-protection")? class="__cf_email__".*? data-cfemail="([0-9a-f]+)".+?</[A-Za-z]+>', re.DOTALL)
cf_email_b = re.compile(r'<[A-Za-z]+( href="/cdn-cgi/l/email-protection")? class="__cf_email__".*? data-cfemail="([0-9a-f]+)".+?</[A-Za-z]+>'.encode('utf-8'), re.DOTALL)
#: Тоже регулярка для расшифровки почты, которую шифрует CloudFlare, но для ссылок.
cf_email_a = re.compile(r'<[Aa]\s([^>]*)href="/cdn-cgi/l/email-protection#([0-9a-f]+)"(\s[^>]*)?>', re.DOTALL)
cf_email_a_b = re.compile(r'<[Aa]\s([^>]*)href="/cdn-cgi/l/email-protection#([0-9a-f]+)"(\s[^>]*)?>'.encode('utf-8'), re.DOTALL)
#: Регулярка, убирающая скрипты расшифровки почты от CloudFlare.
cf_email_s = re.compile(r'<script.{1,2048}getAttribute\(.data-cfemail.\).{1,2048}</script>', re.DOTALL)
cf_email_s_b = re.compile(r'<script.{1,2048}getAttribute\(.data-cfemail.\).{1,2048}</script>'.encode('utf-8'), re.DOTALL)
[документация]def parse_html(data, encoding='utf-8'):
"""Парсит HTML-код и возвращает lxml.etree-элемент."""
# if isinstance(data, text): encoding = None
# doc = html5lib.parse(data, treebuilder="lxml", namespaceHTMLElements=False, encoding=encoding)
if isinstance(data, binary):
data = data.decode(encoding, "replace")
doc = lxml.html.fromstring(data)
return doc
[документация]def parse_html_fragment(data, encoding='utf-8'):
"""Парсит кусок HTML-кода и возвращает список lxml.etree-элементов и строк."""
# if isinstance(data, text): encoding = None
# doc = html5lib.parseFragment(data, treebuilder="lxml", namespaceHTMLElements=False, encoding=encoding)
if isinstance(data, binary):
data = data.decode(encoding, "replace")
doc = lxml.html.fragments_fromstring(data)
return doc
[документация]class HTMLFormatter(object):
"""Гибкий и расширяемый конвертер lxml-элементов в красивые строки
подобно браузеру links и родственным ему.
В словаре ``params`` можно указать следующие параметры:
* ``fancy`` (True/False, по умолчанию True) — грамотно форматирует спойлеры
и убирает кнопку ката при наличии
* ``strike_mode`` (``unicode`` (по умолчанию) или ``html``) — как форматировать
зачёркивания
* ``vk_links`` (True/False, по умолчанию False) — преобразует ссылки вида
``https://vk.com/foo`` в ``@foo (текст ссылки)`` для отправки во ВКонтакте
* ``disable_links`` (True/False, по умолчанию False) — удаляет ссылки, если
текст ссылки совпадает с самой ссылкой (примитивный антиспам)
Пример использования::
node = parse_html_fragment('<div>a <s>bc</s> <hr/> d ef<br/>g<a></a>cut</div>')[0]
HTMLFormatter({'strike_mode': 'html'}).format(node, with_cutted=False)
Результат::
a <s>bc</s>
=====
d ef
g
"""
NEWLINE = -1
block_elems = ("div", "p", "blockquote", "section", "ul", "li", "h1", "h2", "h3", "h4", "h5", "h6")
fancy = True
strike_mode = 'unicode'
vk_links = False
disable_links = False
def __init__(self, params=None):
for k in ('fancy', 'strike_mode', 'vk_links', 'disable_links'):
if params and k in params:
setattr(self, k, params[k])
[документация] def format(self, node, with_cutted=True):
"""Форматирует lxml-элемент.
:param node: сам lxml-элемент
:param bool with_cutted: если False, обработка будет прекращена после
натыкания на тег-кат (когда :func:`~tabun_api.utils.is_cut` вернёт True)
:rtype: строка
"""
return self.format_part(node, with_cutted)[1]
def format_part(self, node, with_cutted=True):
if isinstance(node, text_types):
return False, text(node)
if node.text:
data = [self.format_text(self.escape(node.text), [])]
else:
data = ['']
depth = [node] # for <div><ul><li>: [<div>, <ul>]
full_queue = [list(node.getchildren())] # for <div><ul><li>: [[div children after current ul], [ul children after current li]]
full_queue[0].reverse()
# Самые часто используемые функции вытащим заранее (говорят, так слегка быстрее)
process_br = getattr(self, 'process_br', None)
process_a = getattr(self, 'process_a', None)
process_span = getattr(self, 'process_span', None)
default_process = self.default_process
format_item_tail = self.format_item_tail
cut_stop = False
while True:
# Проверяем, что очередь не пуста
while full_queue and not full_queue[-1]:
full_queue.pop()
depth.pop()
if not full_queue or not depth:
assert not depth
assert not full_queue
break
# Берём следующий элемент (может быть любой вложенности — избегаем рекурсии зазря)
item = full_queue[-1].pop()
# С помощью специального значения избегаем лишних переносов на границах блоков
if item == self.NEWLINE:
sdata = data[-1].rstrip(' ')
if not sdata.endswith('\n'):
data[-1] = sdata
data.append('\n')
continue
# Это может быть просто текст между элементами (добавляется ниже в format_item_tail)
if isinstance(item, text_types):
ntext = self.format_text(item, data)
if ntext:
data.append(ntext)
continue
# Это может быть кат в полном посте — тогда сразу закругляемся
if not with_cutted and is_cut(item):
cut_stop = True
break
format_item_tail(item, data, depth, full_queue)
children = []
# Обрабатываем элемент
tag = item.tag.lower().replace('-', '_')
if tag == 'br':
process_func = process_br
elif tag == 'a':
process_func = process_a
elif tag == 'span':
process_func = process_span
else:
process_func = getattr(self, 'process_' + tag, None)
if process_func is None:
process_func = default_process
children = process_func(item, data, depth, with_cutted=with_cutted)
if children is None:
# None означает, что надо закругляться
break
elif children:
# Обрабатываем потомков в следующей итерации
children.reverse()
full_queue.append(children)
depth.append(item)
return cut_stop, ''.join(data).strip()
def element_children(self, item, noblock=False):
children = []
is_block = not noblock and item.tag in self.block_elems
# Начало элемента-блока
if is_block:
children.append(self.NEWLINE)
# Содержимое элемента
if item.text:
children.append(self.escape(item.text))
ch = list(item.getchildren())
if ch:
children.extend(ch)
# Конец элемента-блока (есть смысл только при непустом блоке)
if is_block and (item.text or ch):
children.append(self.NEWLINE)
return children
def process_br(self, item, data, depth, with_cutted):
# Не более одной пустой строчки при нескольких <br/> подряд
if data[-1].rstrip(' ').endswith('\n\n'):
pass
elif len(data) > 1 and data[-1].rstrip(' ') == '\n' and data[-2].rstrip(' ').endswith('\n'):
pass
else:
data[-1] = data[-1].rstrip(' ')
data.append('\n')
return []
def default_process(self, item, data, depth, with_cutted):
return self.element_children(item)
def format_item_tail(self, item, data, depth, full_queue):
# Текст после тега не относится к самому тегу и добавится после обработки потомков
if item.tail:
full_queue[-1].append(self.escape(item.tail))
def format_text(self, ntext, data=None):
# Форматирует текст подобно HTML — не более одного пробела подряд
if '\n' in ntext:
ntext = ntext.replace('\n', ' ')
if '\r' in ntext:
ntext = ntext.replace('\r', ' ')
if '\t' in ntext:
ntext = ntext.replace('\t', ' ')
if data and (data[-1].endswith('\n') or data[-1].endswith(' ')):
ntext = ntext.lstrip(' ')
elif ntext.startswith(' '):
ntext = ' ' + ntext.lstrip(' ')
if ntext.endswith(' '):
ntext = ntext.rstrip(' ') + ' '
if ' ' in ntext:
ntext = re.sub(r' +', ' ', ntext)
return ntext
def escape(self, ntext):
if not self.vk_links:
return ntext
return ntext.replace('@', '@').replace('(', '(').replace(')', ')').replace('*', '*').replace('[', '[').replace(']', ']')
def process_a(self, item, data, depth, with_cutted):
if self.fancy and item.get('title') == "Читать дальше":
# Кнопку ката пропускаем (её можно сделать фейковой, поэтому can_next = True)
return []
href = item.get('href')
if self.vk_links and href and 'vk.com/' in href:
# Заменяем ссылки на vk.com на ссылки в специальном формате ВК
g = re.match(r'^(https?:)?//([\.A-z0-9_-]+\.)?vk\.com/([\.A-z0-9_-]+)$', href)
path = g.groups()[2] if g else None
for stopword in ("wall", "photo", "page", "board", "video", "videos", "audio", "audios", "topic", "app", "album", "note"):
# ссылки вида «чтотоНОМЕР» специального формата о ВК не имеют, поэтому пропускаем их
if not path or (
len(path) > len(stopword) and
path.startswith(stopword) and
(path[len(stopword)].isdigit() or path[len(stopword)] == '-')
):
return self.element_children(item, noblock=True)
stop_cut, tmp = self.format_part(item, with_cutted)
if not tmp.strip():
return self.element_children(item, noblock=True)
data.append(' @' + path + ' (')
data.append(self.escape(tmp))
data.append(') ')
return None if stop_cut else []
stop_cut, tmp = self.format_part(item, with_cutted)
if self.disable_links and (
tmp == href or
href.find('://') < 7 and tmp == href[href.find('://') + 3:] or
href.startswith('//') and tmp == href[2:]
):
return None if stop_cut else []
return self.element_children(item, noblock=True)
def process_span(self, item, data, depth, with_cutted):
if not self.fancy:
return self.element_children(item, noblock=True)
if item.get('class') == 'spoiler-title':
# Заголовок спойлера опускаем всегда
return []
if item.get('class') == 'spoiler-body':
# Тело спойлера опускаем, только если оно фейковое
if depth[-1].tag != 'span' or depth[-1].get('class') != 'spoiler':
return []
ch = self.element_children(item, noblock=True)
if ch:
# Имитируем display: block
ch.insert(0, self.NEWLINE)
ch.append(self.NEWLINE)
else:
ch.append(self.NEWLINE)
return ch
return self.element_children(item, noblock=True)
def process_li(self, item, data, depth, with_cutted):
ch = [self.NEWLINE, '• ']
ch.extend(self.element_children(item, noblock=True))
ch.append(self.NEWLINE)
return ch
def process_blockquote(self, item, data, depth, with_cutted):
ch = [self.NEWLINE, '«']
ch.extend(self.element_children(item, noblock=True))
ch.append('»')
ch.append(self.NEWLINE)
return ch
def process_hr(self, item, data, depth, with_cutted):
return [self.NEWLINE, '=====', self.NEWLINE]
def process_s(self, item, data, depth, with_cutted):
if self.strike_mode == 'unicode':
# Без рекурсии никак :(
stop_cut, tmp = self.format_part(item, with_cutted)
tmp2 = []
for x in tmp:
tmp2.append(x)
tmp2.append('\u0336')
result = ''.join(tmp2)
data.append(result)
return None if stop_cut else []
elif self.strike_mode == 'html':
result = ['<s>']
result.extend(self.element_children(item, noblock=True))
result.append('</s>')
return result
else:
return self.element_children(item, noblock=True)
def htmlToString(node, with_cutted=True, fancy=True, vk_links=False, hr_lines=True, disable_links=False):
import warnings
warnings.warn('utils.htmlToString is deprecated; use utils.HTMLFormatter instead of it', FutureWarning, stacklevel=2)
if isinstance(node, text_types):
return text(node)
data = ""
newlines = 0
if node.text:
ndata = node.text.replace("\n", " ")
if newlines:
ndata = ndata.lstrip()
data += ndata
if ndata:
newlines = 0
prev_text = None
prev_after = None
for item in node.iterchildren():
if prev_text:
ndata = prev_text.replace("\n", " ")
if newlines:
ndata = ndata.lstrip()
data += ndata
if ndata:
newlines = 0
if prev_after:
ndata = prev_after.replace("\n", " ")
if newlines:
ndata = ndata.lstrip()
data += ndata
if ndata:
newlines = 0
if item.tail:
prev_after = item.tail
else:
prev_after = None
prev_text = item.text
if item.tag == "br":
if newlines < 2:
data += "\n"
newlines += 1
elif item.tag == "hr":
if hr_lines:
data += "\n=====\n"
else:
data += "\n"
newlines = 1
elif fancy and item.get('class') == 'spoiler-title':
prev_text = None
continue
elif fancy and item.tag == 'a' and item.get('title') == "Читать дальше":
prev_text = None
continue
elif not with_cutted and is_cut(item):
return data.strip()
elif item.tag in ("img",):
continue
elif vk_links and item.tag == "a" and item.get('href', '').find("://vk.com/") > 0 and item.text_content().strip():
href = item.get('href')
addr = href[href.find("com/") + 4:]
if addr and addr[-1] in (".", ")"):
addr = addr[:-1]
stop = False
for c in ("/", "?", "&", "(", ",", ")", "|"):
if c in addr:
stop = True
break
if stop:
data += item.text_content()
prev_text = None
continue
for typ in ("wall", "photo", "page", "video", "topic", "app", "album", "note"):
if addr.find(typ) == 0:
stop = True
break
if stop:
data += item.text_content()
prev_text = None
continue
ndata = item.text_content().replace("[", " ").replace("|", " ").replace("]", " ")
data += " [" + addr + "|" + ndata + "] "
prev_text = None
elif disable_links and item.tag == "a" and item.get('href', '').endswith(item.text_content().strip()) and abs(len(item.get('href', '')) - len(item.text_content().strip())) < 10:
prev_text = None
continue
else:
if item.tag in ("li", ):
data += "• "
elif data and item.tag in block_elems and not newlines:
data += "\n"
newlines = 1
if prev_text:
prev_text = None
tmp = htmlToString(item, with_cutted=with_cutted, fancy=fancy, vk_links=vk_links, hr_lines=hr_lines)
newlines = 0
if item.tag == "s": # зачёркивание
tmp1 = ""
for x in tmp:
tmp1 += x + '\u0336'
# tmp1 = "<s>" + tmp1 + "</s>"
elif item.tag == "blockquote": # цитата
tmp1 = " «" + tmp + "»\n"
newlines = 1
else:
tmp1 = tmp
data += tmp1
if not with_cutted:
for item2 in item.iterchildren():
if is_cut(item2):
return data.strip()
if item.tag in block_elems and not newlines:
data += "\n"
newlines = 1
if prev_text:
ndata = prev_text.replace("\n", " ")
if newlines:
ndata = ndata.lstrip()
data += ndata
if ndata:
newlines = 0
if prev_after:
ndata = prev_after.replace("\n", " ")
if newlines:
ndata = ndata.lstrip()
data += ndata
if ndata:
newlines = 0
return data.strip()
[документация]def node2string(node, encoding="utf-8"):
"""Переводит html-элемент в байтовую строку."""
return lxml.etree.tostring(node, method="html", encoding=encoding) # pylint: disable=no-member
[документация]def mon2num(s):
"""Переводит названия месяцев в числа, чтобы строку можно было скормить в strftime."""
for i in range(len(mons)):
s = s.replace(mons[i], text(i + 1))
return s
[документация]def is_cut(item):
"""Возвращает True, если тег похож на кат (на момент написания это
``<a id="cut"></a>`` (новый) или ``<a></a>`` (старый)).
"""
return item.tag == "a" and not item.get("href") and not item.text_content() and not item.getchildren()
[документация]def find_images(body, spoiler_title=True, no_other=False):
"""Ищет картинки в lxml-элементе и возвращает их список в виде
[[ссылки до ката], [ссылки после ката]].
:param bool spoiler_title: включать ли картинки с заголовков спойлеров
:param bool no_other: исключать ли всякий мусор. Фильтрация простейшая:
по наличию "smile" или "gif" в ссылке, также убираются табунские аватарки
и навигация АльтерБРЕДаций.
:rtype: [list, list]
"""
imgs = [[], []]
links = [[], []]
start = False
for item in body.iterchildren():
# FIXME: не работает, если кат внутри другого тега
if not start and is_cut(item):
start = True
continue
if item.tag == "img":
imgs[1 if start else 0].append(item)
else:
limgs = item.xpath('.//img')
if not limgs:
limgs = item.xpath('.//a')
imgs[1 if start else 0].extend(limgs)
for i in (0, 1):
tags = imgs[i]
if not tags:
continue
for img in tags:
src = img.get("src") or img.get("data-src")
if not src:
src = img.get("href")
if not src:
continue
if src[-4:].lower() not in ('jpeg', '.jpg', '.png'):
continue
if "<" in src:
continue
if no_other and (
".gif" in src.lower() or
"smile" in src.lower() or
("/avatar_" in src and '/images/' in src) or
src.endswith('1_Prev.png') or src.endswith('2_Clear.png') or
src.endswith('3_VK.png') or src.endswith('4_New.png') or
src.endswith('5_Next.png') # АБД
):
continue
if not spoiler_title and img.getparent() is not None and img.getparent().get("class") == "spoiler-title":
# Hint: если вы пишете пост и хотите, чтобы картика бралась даже из заголовка спойлера,
# достаточно лишь положить её внутрь какого-нибудь ещё тега, например <strong>.
continue
links[i].append(src)
if not links[0] and not links[1]:
videos = body.xpath('.//iframe')
for video in videos:
match = youtube_regex.search(video.get('src', ''))
if not match:
continue
if match.groups()[0]:
links[1].append('http://i4.ytimg.com/vi/%s/sddefault.jpg' % match.groups()[0])
return links
# copypasted from:
# https://github.com/ActiveState/code/tree/5cf284892339de263d760babf6a15a9675843d94/recipes/Python/146306_Http_client_POST_using
# and modified by andreymal
[документация]def encode_multipart_formdata(fields, files, boundary=None):
"""Возвращает кортеж (content_type, body), готовый для отправки HTTP POST--запроса.
Значения полей и файлов могут быть строками (закодируются в utf-8),
bytes или числами (будут преобразованы в строку).
:param fields: простые поля запроса
:type fields: коллекция кортежей (название, значение) или словарь
:param fields: файлы запроса (MIME-тип будет выбран по расширению)
:type fields: коллекция кортежей (название, имя файла, значение)
:param boundary: boundary (по умолчанию генерируется случайные)
:type boundary: строка или bytes
:rtype: (строка, bytes)
"""
if isinstance(fields, dict):
fields = fields.items()
if boundary is None:
boundary = b'----------' + md5((text(int(time.time())) + text(random.randrange(1000))).encode('utf-8')).hexdigest().encode('utf-8')
elif isinstance(boundary, text):
boundary = boundary.encode('utf-8')
L = []
for (key, value) in fields:
key = text(key).encode('utf-8')
if isinstance(value, text):
value = value.encode('utf-8')
elif isinstance(value, (int, float, complex)):
value = text(value).encode('utf-8')
elif not isinstance(value, binary):
raise ValueError('Value should be bytes, not %s' % type(value))
L.append(b'--' + boundary)
L.append(('Content-Disposition: form-data; name="%s"' % key.decode('utf-8')).encode('utf-8'))
L.append(b'')
L.append(value)
for (key, filename, value) in files:
key = text(key).encode('utf-8')
filename = text(filename).encode('utf-8')
if isinstance(value, text):
value = value.encode('utf-8')
elif not isinstance(value, binary):
raise ValueError('Value should be bytes, not %s' % type(value))
L.append(b'--' + boundary)
L.append((
'Content-Disposition: form-data; name="%s"; filename="%s"' % (key.decode('utf-8'), filename.decode('utf-8'))
).encode('utf-8'))
L.append(('Content-Type: %s' % get_content_type(filename.decode('utf-8'))).encode('utf-8'))
L.append(b'')
L.append(value)
L.append(b'--' + boundary + b'--')
L.append(b'')
body = b'\r\n'.join(L)
content_type = 'multipart/form-data; boundary=%s' % boundary.decode('utf-8')
return content_type, body
[документация]def get_content_type(filename):
"""return mimetypes.guess_type(filename)[0] or 'application/octet-stream'"""
return text(mimetypes.guess_type(filename)[0] or 'application/octet-stream')
[документация]def send_form(url, fields, files, timeout=None, headers=None):
"""Отправляет форму, пользуясь функцией :func:`~tabun_api.utils.encode_multipart_formdata`.
Значения полей и файлов могут быть строками (закодируются в utf-8),
bytes или числами (будут преобразованы в строку).
:param fields: простые поля запроса
:type fields: коллекция кортежей (название, значение)
:param fields: файлы запроса (MIME-тип будет выбран по расширению)
:type fields: коллекция кортежей (название, имя файла, значение)
:param float timeout: сколько ожидать ответа, не дождётся - кидается исключением urllib
:param headers: дополнительные HTTP-заголовки (повторяться не могут)
:type headers: кортежи из двух строк/bytes или словарь
:rtype: ``urllib.addinfourl`` / ``urllib.response.addinfourl``
"""
content_type, data = encode_multipart_formdata(fields, files)
if not isinstance(url, urequest.Request):
if PY2 and isinstance(url, text):
url = url.encode('utf-8')
url = urequest.Request(url)
if isinstance(headers, dict):
headers = headers.items()
if headers:
for header, value in headers:
if not isinstance(header, str): # py2 and py3
header = str(header)
if isinstance(value, text):
value = value.encode('utf-8')
url.add_header(header, value)
url.add_unredirected_header(str('Content-type'), content_type.encode('utf-8'))
url.data = data
if timeout is None:
return urequest.urlopen(url)
else:
return urequest.urlopen(url, timeout=timeout)
[документация]def find_substring(s, start, end, extend=False, with_start=True, with_end=True):
"""Возвращает подстроку, находящуюся между кусками строки ``start`` и ``end``,
или ``None``, если не нашлось.
При ``extend=True`` кусок строки end ищется с конца (``rfind``).
"""
f1 = s.find(start)
if f1 < 0:
return
f2 = (s.rfind if extend else s.find)(end, f1 + len(start))
if f2 < 0:
return
return s[f1 + (0 if with_start else len(start)):f2 + (len(end) if with_end else 0)]
[документация]def download(url, maxmem=20 * 1024 * 1024, timeout=5, waitout=15, headers=None):
"""Скачивает данные по ссылке. Имеет защиту от переполнения памяти
и слишком долгого ожидания, чтобы всякие боты тут не висли.
В случае чего кидает ``IOError``.
:param url: ссылка, которую скачать
:type url: строка
:param int maxmem: допустимый максимальный размер скачиваемых данных
:param float timeout: как долго можно ждать ответа
:param float waitout: как долго можно скачивать данные
(простенькая защита от Slow TCP DoS Attack — timeout тут не поможет)
:rtype: bytes
"""
# TODO: non-ASCII URL
from tabun_api import http_headers
url = text(url)
if url.startswith('//'):
url = 'http:' + url
req = urequest.Request(url.encode("utf-8") if PY2 else url)
request_headers = http_headers.copy()
if headers:
request_headers.update({k.title(): v for k, v in headers.items()})
for header, value in request_headers.items():
if not isinstance(header, str): # py2 and py3
header = str(header)
if isinstance(value, text):
value = value.encode('utf-8')
req.add_header(header, value)
resp = urequest.urlopen(req, timeout=timeout)
size = resp.headers.get('content-length')
if size and size.isdigit() and int(size) > maxmem:
raise IOError("Too big")
data = b''
start_dwnl = time.time()
while 1:
if len(data) > maxmem:
raise IOError("Too big")
tmp = resp.read(64 * 1024)
if not tmp:
break
data += tmp
if time.time() - start_dwnl >= waitout:
raise IOError("Too long")
resp.close()
return data
[документация]def find_good_image(urls, maxmem=20 * 1024 * 1024):
"""Ищет годную картинку из предложенного списка ссылок и возвращает ссылку
и скачанные данные картинки (файл, bytes).
Такой простенький фильтр смайликов и элементов оформления поста по размеру.
Требует PIL или Pillow.
Не грузит картинки размером больше maxmem байт, дабы не вылететь от
нехватки памяти.
"""
try:
from PIL import Image
except ImportError:
import Image
from io import BytesIO
good_image = None, None
for url in urls:
url = text(url)
if url.find('//dl.dropboxusercontent.com/') in (5, 6):
waitout = 60
elif url.find('//dl.dropbox.com/') in (5, 6):
waitout = 60
else:
waitout = 15
try:
data = download(url, maxmem, waitout=waitout)
except IOError:
continue
try:
img = Image.open(BytesIO(data))
except:
continue
if img.size[0] < 100 or img.size[1] < 100:
continue
good_image = url, data
break
return good_image
[документация]def generate_comments_tree(comms):
"""Строит дерево комментариев из словаря, возвращаемого функциями get_comments[_from].
Формат элемента: [(комментарий, элемент), (комментарий, элемент), ...]
Возвращает само такое дерево и список номеров комментариев-сирот
(по идее должен быть пустой, но мало ли).
:param comms: словарь комментариев
:type comms: {id: :func:`~tabun_api.Comment`}
:rtype: (list, list)
"""
tree_dict = {}
tree = []
orphans = []
for comment in sorted(comms.values(), key=lambda x: x.comment_id):
item = (comment, [])
tree_dict[comment.comment_id] = item
if not comment.parent_id:
tree.append(item)
continue
parent = tree_dict.get(comment.parent_id)
if not parent:
tree.append(item)
orphans.append(comment.comment_id)
else:
parent[1].append(item)
return tree, orphans
[документация]def parse_avatar_url(url):
"""Парсит ссылку на аватарку и возвращает id пользователя,
дату отправки, размер, расширение и какой-то номер с конца ссылки.
Если не удалось распарсить, то всё ``None``.
:param url: ссылка
:type url: строка
:rtype: (int, "YYYY-MM-DD", (int, int), строка, int или None)
"""
match = ava_regex.search(url)
if not match:
return None, None, None, None, None
g = match.groups()
user_id = int(g[3] + g[4] + g[5])
date = g[6] + "-" + g[7] + "-" + g[8]
size = (int(g[9]), int(g[10]))
ext = g[11]
num = int(g[13]) if g[13] is not None else None
return user_id, date, size, ext, num
def decode_cf_email(data):
key = int(data[0:2], 16)
use_bytes = not isinstance(data, text)
result = b''
for i in range(1, len(data) // 2):
b = int(data[i * 2:i * 2 + 2], 16) ^ key
if PY2:
result += chr(b)
else:
result += bytes([b])
return result if use_bytes else result.decode('utf-8')
def decode_cf_email_for_link(m):
left, data, right = m.groups()
mail = decode_cf_email(data)
if isinstance(data, text):
result = '<a {}href="mailto:{}"{}>'.format(left or '', mail, right or '')
else:
result = b''.join((
b'<a ', left or b'',
b'href="mailto:', mail,
b'"', right or b'',
b'>',
))
return result
[документация]def replace_cloudflare_emails(data):
"""Декодирует почты, которые зашифровал CloudFlare, в html-странице."""
is_text = isinstance(data, text)
# В текстах
r = cf_email if is_text else cf_email_b
result = r.sub(lambda x: decode_cf_email(x.groups()[1]), data)
# В ссылках
ra = cf_email_a if is_text else cf_email_a_b
result = ra.sub(decode_cf_email_for_link, result)
# Убираем <script>
rs = cf_email_s if is_text else cf_email_s_b
result = rs.sub('' if is_text else b'', result)
return result
[документация]def normalize_body(body=None, raw_body=None, cls='text'):
"""Кодирует lxml-элемент в исходник html или наоборот декодирует исходник в lxml-элемент."""
if body is not None and raw_body is None:
raw_body = lxml.etree.tostring(body, method="xml", encoding="utf-8") # pylint: disable=no-member
raw_body = raw_body.replace(b' ', b'\r').decode('utf-8')
raw_body = raw_body[raw_body.find(">") + 1:raw_body.rfind("</")] # <div class="text">body</div>
# Занимаемся подгонкой под оригинальный исходник
# Табун принудительно сводит несколько br подряд в <br/>\r\n<br/>, чем и пользуемся, обходя баг lxml
while '<br/><' in raw_body:
raw_body = raw_body.replace('<br/><', '<br/>\r\n<')
raw_body = raw_body.replace(' allowfullscreen=""/>', ' allowfullscreen></iframe>')
# это типа тег <cut>
raw_body = raw_body.replace('<a rel="nofollow"/>', '<a rel="nofollow"></a>', 1)
# однако полученный исходник всё равно не совпадает в точности с исходником на Табуне,
# например, из-за разного порядка атрибутов, " и битой вёрстки, так что осторожно
elif raw_body is not None and body is None:
body = parse_html_fragment(('<div class="%s">' % cls) + raw_body + '</div>')[0]
return body, text(raw_body) if raw_body is not None else None
[документация]def html_escape(s, single_quote=False):
"""Заменяет на html-сущности следующие символы:
``&``, ``<``, ``>``, ``"``.
При ``single_quote=True`` ещё и ``'`` на ``'``.
"""
table = [('&', '&'), ('<', '<'), ('>', '>'), ('"', '"')]
if single_quote:
table.append(("'", '''))
if isinstance(s, binary):
table = [(a.encode('ascii'), b.encode('ascii')) for a, b in table]
for a, b in table:
s = s.replace(a, b)
return s
[документация]def escape_topic_contents(data, may_be_short=False):
"""
Экранирует содержимое постов и личных сообщений для защиты от поехавшей
вёрстки и багов lxml.
"""
if not isinstance(data, binary):
# '\xa0'.strip() => ''
# b'\xa0'.strip() => b'\xa0' — придерживаюсь этого варианта
raise ValueError('data should be bytes')
f1 = 0
f2 = 0
last_end = 0
buf = []
while True:
# определяем границы тела очередного поста/сообщения
f1 = data.find(b'<div class="topic-content text">', last_end)
if f1 < 0:
break
f2 = data.find(b'<footer', f1)
if f2 < 0:
break
# В личном сообщении перед footer ещё идёт список участников
f3 = data.find(b'<div class="talk-search', f1, f2)
if f3 > -1:
f2 = f3
f2 = data.rfind(b'</div>', f1, f2)
if f2 < 0:
break
# старые топики-ссылки
if data.rfind(b'<div class="topic-url"', f1, f2) > 0:
f2 = data.rfind(b'</div>', f1, data.rfind(b'<div class="topic-url"', f1, f2))
if f2 < 0:
break
# топики-файлы
if data.rfind(b'<div class="download"', f1, f2) > 0:
f2 = data.rfind(b'</div>', f1, data.rfind(b'<div class="download"', f1, f2))
if f2 < 0:
break
# выясняем, есть кат или нет
body = data[data.find(b'>', f1) + 1:f2].strip()
short = None
if may_be_short:
fa = body.rfind('title="Читать дальше">'.encode('utf-8'))
if fa > 0:
fa2 = body.find(b'</a>', fa)
if fa2 > 0 and fa2 == len(body) - 4:
short = body[body.find(b">", fa) + 1:fa2].strip()
body = body[:body.rfind(b'<', 0, fa)].rstrip()
# выпиливаем header при его наличии
# TODO: перепроверить, встречается ли такое на новом Табуне
if body.startswith(b'<header'):
body = body[body.find(b'</header>') + 9:].lstrip()
# экранируем тело
body = html_escape(body)
# собираем страницу обратно
buf.extend((
data[last_end:f1],
('<div class="topic-content text" data-escaped="1" data-short="%s" data-short-text="%s">' % (
1 if short is not None else 0, short.decode('utf-8') if short is not None else ''
)).encode('utf-8'),
body,
b'</div>'
))
last_end = f2 + 6
buf.append(data[last_end:])
return b''.join(buf)
[документация]def escape_comment_contents(data):
"""Экранирует содержимое комментов."""
if not isinstance(data, binary):
raise ValueError('data should be bytes')
f1 = 0
f2 = 0
last_end = 0
buf = []
while True:
# определяем границы очередного коммента
sect_start = data.find(b'<section', last_end)
if sect_start < 0:
break
sect_end = data.find(b'</section>', sect_start)
if sect_end < 0:
break
prev_last_end = last_end
last_end = sect_end
if data.find(b'class="comment ', sect_start, sect_end) < 0 and data.find(b'class="comment"', sect_start, sect_end) < 0 and data.find(b'class="comment\n', sect_start, sect_end) < 0:
# не коммент
buf.append(data[prev_last_end:last_end])
continue
# Выделяем текст коммента
f1 = data.find(b'class="comment-content"', sect_start, sect_end)
if f1 >= 0:
f = data.find(b'<div class="text current">', f1, f1 + 1000)
if f < 0:
f1 = data.find(b'<div class="text">', f1, f1 + 1000)
else:
f1 = f
del f
if f1 < 0:
# Коммент без текста? (На новом Табуне таких быть вроде не должно)
buf.append(data[prev_last_end:last_end])
continue
# После текста коммента всегда идёт блок с информацией о комменте
# (даже для удалённых и скрытых комментариев, хотя тогда блок пустой)
f2 = data.rfind(b'<div class="comment-info', f1, sect_end)
# Но если его почему-то вдруг нет, то можно ориентироваться на окончание section
if f2 < 0:
f2 = sect_end
# Ищем </div>, который закрывает <div class="comment-content">
f2 = data.rfind(b'</div>', f1, f2)
if f2 >= 0:
# И от него ищем </div>, который закрывает <div class="text">
f2 = data.rfind(b'</div>', f1, f2)
if f2 < 0:
logger.warning('Cannot find </div></div> in escape_comment_contents! Please report to andreymal.')
buf.append(data[prev_last_end:last_end])
continue
# экранируем тело
body = data[data.find(b'>', f1, f2) + 1:f2].strip()
body = html_escape(body)
# собираем страницу обратно
buf.extend((
data[prev_last_end:f1],
b'<div class="text" data-escaped="1">',
body,
data[f2:last_end]
))
buf.append(data[last_end:])
return b''.join(buf)
[документация]def escape_blog_content(data):
"""Экранирует описание блога."""
if not isinstance(data, binary):
raise ValueError('data should be bytes')
f1 = 0
f2 = 0
# Ищем начало блока
div_begin = b'<div class="blog-description">'
f1 = data.find(b'<div class="blog-content text">')
if f1 >= 0:
f1 = data.find(div_begin, f1, f1 + 200)
# Ищем конец
if f1 >= 0:
f2 = data.find(b'<ul class="blog-info">', f1 + 1)
if f2 >= 0:
f2 = data.rfind(b'</div>', f1 + 1, f2)
if f1 < 0 or f2 < 0:
# Не нашли
return data
body = data[f1 + len(div_begin):f2].strip()
body = html_escape(body)
result = (
data[:f1],
b'<div class="blog-content text" data-escaped="1">',
body,
data[f2:]
)
return b''.join(result)
[документация]def escape_profile_content(data):
"""Экранирует содержимое блока «О себе» профиля."""
if not isinstance(data, binary):
raise ValueError('data should be bytes')
f1 = 0
f2 = 0
# Ищем начало блока «О себе»
div_begin = b'<div class="text">'
f1 = data.find(b'<div class="profile-info-about">')
if f1 >= 0:
f1 = data.find('<h3>О себе</h3>'.encode('utf-8'), f1, f1 + 1500)
if f1 >= 0:
f1 = data.find(div_begin, f1, f1 + 200)
# Ищем конец
if f1 >= 0:
f2 = data.find(b'</div>', f1 + 1)
if f1 < 0 or f2 < 0:
# Не нашли
return data
body = data[f1 + len(div_begin):f2].strip()
body = html_escape(body)
result = (
data[:f1],
b'<div class="text" data-escaped="1">',
body,
data[f2:]
)
return b''.join(result)
[документация]def parse_datetime(s, utc=True):
"""Парсит дату-время в формате ISO 8601 и возвращает объект datetime с часовым поясом.
При utc=True возвращает время в UTC (без привязки к часовому поясу для совместимости с Python 2),
иначе — что распарсилось.
"""
tm = iso8601.parse_date(s)
if not utc:
return tm
return (tm - tm.utcoffset()).replace(tzinfo=None)
[документация]def gen_user_agent():
"""Генерирует кусочек юзерагента с информацией о системе."""
# pylint: disable=E1101
context = {
'system': platform.system() or 'NA',
'machine': platform.machine() or 'NA',
'release': platform.release() or 'NA',
'pyi': platform.python_implementation() or 'Python',
'pyv': platform.python_version(),
'pyiv': platform.python_version(),
'urv': urequest.__version__,
}
if context['pyi'] == 'PyPy':
context['pyiv'] = '{}.{}.{}'.format(
sys.pypy_version_info.major,
sys.pypy_version_info.minor,
sys.pypy_version_info.micro,
)
if sys.pypy_version_info.releaselevel != 'final':
context['pyiv'] = context['pyiv'] + sys.pypy_version_info.releaselevel
return '({system} {machine} {release}) Python/{pyv} {pyi}/{pyiv} urllib/{urv}'.format(**context)
[документация]def get_cookies_dict(headers):
"""Возвращает словарь с печеньками, взятыми из HTTP-заголовков ``Set-Cookie``.
Ключи и значения — Unicode-строки (``unicode`` в Python 2, ``str`` в Python 3).
:param http.client.HTTPMessage headers: объект с HTTP-заголовками
:rtype: dict
"""
cook = BaseCookie()
if PY2:
cook.load(headers.get("set-cookie") or b'')
else:
for x in headers.get_all("set-cookie") or ():
cook.load(x)
result = {}
for k, v in cook.items():
v = v.value
if PY2:
k = k.decode('utf-8')
v = v.decode('utf-8')
result[k] = v
return result
[документация]def is_module_available(name):
"""Проверяет доступность модуля для импорта, при этом не импортируя его.
:param str name: имя модуля
:rtype: bool
"""
if PY2 and isinstance(name, text):
name = name.encode('utf-8')
elif not isinstance(name, text):
name = name.decode('utf-8')
if PY2:
import imp
path = None
try:
for x in name.split('.'):
if path is not None:
path = [path]
path = imp.find_module(x, path)[1]
return True
except ImportError:
return False
elif sys.version_info < (3, 4):
import importlib
return importlib.find_loader(name) is not None
else:
# >= 3.4
import importlib.util
return importlib.util.find_spec(name) is not None
[документация]def build_proxy_params(proxy=None):
"""Парсит URL с параметрами прокси-сервера и возвращает словарь
с аргументами для конструктора SocksiPyHandler. Если прокси пустой,
возвращает просто None.
:param str proxy: URL с параметрами прокси
:rtype: dict или None
"""
if not proxy:
return None
import socks
if PY2:
from urlparse import urlparse
else:
from urllib.parse import urlparse
proxy_info = urlparse(proxy)
proxy_args = {
'proxyaddr': proxy_info.hostname,
'proxyport': proxy_info.port,
# 'rdns': default,
'username': proxy_info.username,
'password': proxy_info.password,
}
if proxy_info.scheme == 'socks4':
proxy_args['proxytype'] = socks.PROXY_TYPE_SOCKS4
elif proxy_info.scheme == 'socks5':
proxy_args['proxytype'] = socks.PROXY_TYPE_SOCKS5
elif proxy_info.scheme == 'http':
proxy_args['proxytype'] = socks.PROXY_TYPE_HTTP
else:
raise ValueError('Unknown proxy protocol: {!r}'.format(proxy_info.scheme))
return proxy_args