PK y;QHtW W zhihu/author.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import json from .common import * from .base import BaseZhihu class Author(BaseZhihu): """用户类,请使用``ZhihuClient.answer``方法构造对象.""" @class_common_init(re_author_url, True) def __init__(self, url, name=None, motto=None, follower_num=None, question_num=None, answer_num=None, upvote_num=None, thank_num=None, photo_url=None, session=None): """创建用户类实例. :param str url: 用户主页url,形如 http://www.zhihu.com/people/7sdream :param str name: 用户名字,可选 :param str motto: 用户简介,可选 :param int follower_num: 用户粉丝数,可选 :param int question_num: 用户提问数,可选 :param int answer_num: 用户答案数,可选 :param int upvote_num: 用户获得赞同数,可选 :param int thank_num: 用户获得感谢数,可选 :param str photo_url: 用户头像地址,可选 :param Session session: 使用的网络会话,为空则使用新会话。 :return: 用户对象 :rtype: Author """ self.url = url self._session = session self.card = None self._nav_list = None self._name = name self._motto = motto self._follower_num = follower_num self._question_num = question_num self._answer_num = answer_num self._upvote_num = upvote_num self._thank_num = thank_num self._photo_url = photo_url def _gen_soup(self, content): self.soup = BeautifulSoup(content) self._nav_list = self.soup.find( 'div', class_='profile-navbar').find_all('a') def _make_card(self): if self.card is None and self.url is not None: params = {'url_token': self.id} real_params = {'params': json.dumps(params)} r = self._session.get(Get_Profile_Card_URL, params=real_params) self.card = BeautifulSoup(r.content) @property def id(self): """获取用户id,就是网址最后那一部分. :return: 用户id :rtype: str """ return re.match(r'^.*/([^/]+)/$', self.url).group(1) \ if self.url is not None else '' @property @check_soup('_xsrf') def xsrf(self): """获取知乎的反xsrf参数(用不到就忽视吧~) :return: xsrf参数 :rtype: str """ return self.soup.find('input', attrs={'name': '_xsrf'})['value'] @property @check_soup('_hash_id') def hash_id(self): """获取作者的内部hash id(用不到就忽视吧~) :return: 用户hash id :rtype: str """ div = self.soup.find('div', class_='zm-profile-header-op-btns') if div is not None: return div.button['data-id'] else: ga = self.soup.find('script', attrs={'data-name': 'ga_vars'}) return json.loads(ga.text)['user_hash'] @property @check_soup('_name', '_make_card') def name(self): """获取用户名字. :return: 用户名字 :rtype: str """ if self.url is None: return '匿名用户' if self.soup is not None: return self.soup.find('div', class_='title-section').span.text else: assert self.card is not None return self.card.find('span', class_='name').text @property @check_soup('_motto', '_make_card') def motto(self): """获取用户自我介绍,由于历史原因,我还是把这个属性叫做motto吧. :return: 用户自我介绍 :rtype: str """ if self.url is None: return '' else: if self.soup is not None: bar = self.soup.find( 'div', class_='title-section') if len(bar.contents) < 4: return '' else: return bar.contents[3].text else: assert self.card is not None motto = self.card.find('div', class_='tagline') return motto.text if motto is not None else '' @property @check_soup('_photo_url', '_make_card') def photo_url(self): """获取用户头像图片地址. :return: 用户头像url :rtype: str """ if self.url is not None: if self.soup is not None: img = self.soup.find('img', class_='Avatar Avatar--l')['src'] return img.replace('_l', '_r') else: assert(self.card is not None) return PROTOCOL + self.card.img['src'].replace('_xs', '_r') else: return 'http://pic1.zhimg.com/da8e974dc_r.jpg' @property @check_soup('_followee_num') def followee_num(self): """获取关注了多少人. :return: 关注的人数 :rtype: int """ if self.url is None: return 0 else: number = int(self.soup.find( 'div', class_='zm-profile-side-following').a.strong.text) return number @property @check_soup('_follower_num') def follower_num(self): """获取追随者数量,就是关注此人的人数. :return: 追随者数量 :rtype: int """ if self.url is None: return 0 else: number = int(self.soup.find( 'div', class_='zm-profile-side-following zg-clear').find_all( 'a')[1].strong.text) return number @property @check_soup('_upvote_num') def upvote_num(self): """获取收到的的赞同数量. :return: 收到的的赞同数量 :rtype: int """ if self.url is None: return 0 else: number = int(self.soup.find( 'span', class_='zm-profile-header-user-agree').strong.text) return number @property @check_soup('_thank_num') def thank_num(self): """获取收到的感谢数量. :return: 收到的感谢数量 :rtype: int """ if self.url is None: return 0 else: number = int(self.soup.find( 'span', class_='zm-profile-header-user-thanks').strong.text) return number @property @check_soup('_weibo_url') def weibo_url(self): """获取用户微博链接. :return: 微博链接地址,如没有则返回 ‘unknown’ :rtype: str """ if self.url is None: return None else: tmp = self.soup.find( 'a', class_='zm-profile-header-user-weibo') return tmp['href'] if tmp is not None else 'unknown' @property def business(self): """用户的行业. :return: 用户的行业,如没有则返回 ‘unknown’ :rtype: str """ return self._find_user_profile('business') @property def location(self): """用户的所在地. :return: 用户的所在地,如没有则返回 ‘unknown’ :rtype: str """ return self._find_user_profile('location') @property def education(self): """用户的教育状况. :return: 用户的教育状况,如没有则返回 ‘unknown’ :rtype: str """ return self._find_user_profile('education') def _find_user_profile(self, t): self._make_soup() if self.url is None: return 'unknown' else: res = self.soup.find( 'span', class_=t) if res and res.has_attr('title'): return res['title'] else: return 'unknown' @property @check_soup('_gender') def gender(self): """用户的性别. :return: 用户的性别(male/female/unknown) :rtype: str """ if self.url is None: return 'unknown' else: return 'female' \ if self.soup.find('i', class_='icon-profile-female') \ else 'male' @property @check_soup('_question_num') def question_num(self): """获取提问数量. :return: 提问数量 :rtype: int """ if self.url is None: return 0 else: return int(self._nav_list[1].span.text) @property @check_soup('_answer_num') def answer_num(self): """获取答案数量. :return: 答案数量 :rtype: int """ if self.url is None: return 0 else: return int(self._nav_list[2].span.text) @property @check_soup('_post_num') def post_num(self): """获取专栏文章数量. :return: 专栏文章数量 :rtype: int """ if self.url is None: return 0 else: return int(self._nav_list[3].span.text) @property @check_soup('_collection_num') def collection_num(self): """获取收藏夹数量. :return: 收藏夹数量 :rtype: int """ if self.url is None: return 0 else: return int(self._nav_list[4].span.text) @property @check_soup('_followed_column_num') def followed_column_num(self): """获取用户关注的专栏数 :return: 关注的专栏数 :rtype: int """ if self.url is not None: tag = self.soup.find('div', class_='zm-profile-side-columns') if tag is not None: return int(re_get_number.match( tag.parent.strong.text).group(1)) return 0 @property @check_soup('_followed_topic_num') def followed_topic_num(self): """获取用户关注的话题数 :return: 关注的话题数 :rtype: int """ if self.url is not None: tag = self.soup.find('div', class_='zm-profile-side-topics') if tag is not None: return int(re_get_number.match( tag.parent.strong.text).group(1)) return 0 @property def questions(self): """获取用户的所有问题. :return: 用户的所有问题,返回生成器. :rtype: Question.Iterable """ from .question import Question if self.url is None or self.question_num == 0: return for page_index in range(1, (self.question_num - 1) // 20 + 2): html = self._session.get( self.url + 'asks?page=' + str(page_index)).text soup = BeautifulSoup(html) question_links = soup.find_all('a', class_='question_link') question_datas = soup.find_all( 'div', class_='zm-profile-section-main') for link, data in zip(question_links, question_datas): url = Zhihu_URL + link['href'] title = link.text answer_num = int( re_get_number.match(data.div.contents[4]).group(1)) follower_num = int( re_get_number.match(data.div.contents[6]).group(1)) q = Question(url, title, follower_num, answer_num, session=self._session) yield q @property def answers(self): """获取用户的所有答案. :return: 用户所有答案,返回生成器. :rtype: Answer.Iterable """ from .question import Question from .answer import Answer if self.url is None or self.answer_num == 0: return for page_index in range(1, (self.answer_num - 1) // 20 + 2): html = self._session.get( self.url + 'answers?page=' + str(page_index)).text soup = BeautifulSoup(html) questions = soup.find_all('a', class_='question_link') upvotes = soup.find_all('a', class_='zm-item-vote-count') for q, upvote in zip(questions, upvotes): answer_url = Zhihu_URL + q['href'] question_url = Zhihu_URL + re_a2q.match(q['href']).group(1) question_title = q.text upvote = int(upvote['data-votecount']) question = Question(question_url, question_title, session=self._session) yield Answer(answer_url, question, self, upvote, session=self._session) @property def followers(self): """获取关注此用户的人. :return: 关注此用户的人,返回生成器 :rtype: Author.Iterable """ for x in self._follow_ee_ers('er'): yield x @property def followees(self): """获取用户关注的人. :return: 用户关注的人的,返回生成器 :rtype: Author.Iterable """ for x in self._follow_ee_ers('ee'): yield x def _follow_ee_ers(self, t): if self.url is None: return if t == 'er': request_url = Author_Get_More_Followers_URL else: request_url = Author_Get_More_Followees_URL self._make_card() if self.hash_id is None: self._make_soup() headers = dict(Default_Header) headers['Referer'] = self.url + 'follow' + t + 's' params = {"order_by": "created", "offset": 0, "hash_id": self.hash_id} data = {'_xsrf': self.xsrf, 'method': 'next', 'params': ''} gotten_date_num = 20 offset = 0 while gotten_date_num == 20: params['offset'] = offset data['params'] = json.dumps(params) res = self._session.post(request_url, data=data, headers=headers) json_data = res.json() gotten_date_num = len(json_data['msg']) offset += gotten_date_num for html in json_data['msg']: soup = BeautifulSoup(html) h2 = soup.find('h2') author_name = h2.a.text author_url = h2.a['href'] author_motto = soup.find('div', class_='zg-big-gray').text author_photo = PROTOCOL + soup.a.img['src'].replace('_m', '_r') numbers = [int(re_get_number.match(x.text).group(1)) for x in soup.find_all('a', target='_blank')] try: yield Author(author_url, author_name, author_motto, *numbers, photo_url=author_photo, session=self._session) except ValueError: # invalid url yield ANONYMOUS @property def collections(self): """获取用户收藏夹. :return: 用户收藏夹,返回生成器 :rtype: Collection.Iterable """ from .collection import Collection if self.url is None or self.collection_num == 0: return else: collection_num = self.collection_num for page_index in range(1, (collection_num - 1) // 20 + 2): html = self._session.get( self.url + 'collections?page=' + str(page_index)).text soup = BeautifulSoup(html) collections_names = soup.find_all( 'a', class_='zm-profile-fav-item-title') collection_follower_nums = soup.find_all( 'div', class_='zm-profile-fav-bio') for c, f in zip(collections_names, collection_follower_nums): c_url = Zhihu_URL + c['href'] c_name = c.text c_fn = int(re_get_number.match(f.contents[2]).group(1)) yield Collection(c_url, self, c_name, c_fn, session=self._session) @property def columns(self): """获取用户专栏. :return: 用户专栏,返回生成器 :rtype: Column.Iterable """ from .column import Column if self.url is None or self.post_num == 0: return soup = BeautifulSoup(self._session.get(self.url + 'posts').text) column_tags = soup.find_all('div', class_='column') for column_tag in column_tags: name = column_tag.div.a.span.text url = column_tag.div.a['href'] follower_num = int(re_get_number.match( column_tag.div.div.a.text).group(1)) footer = column_tag.find('div', class_='footer') if footer is None: post_num = 0 else: post_num = int( re_get_number.match(footer.a.text).group(1)) yield Column(url, name, follower_num, post_num, session=self._session) @property def followed_columns(self): """获取用户关注的专栏. :return: 用户关注的专栏,返回生成器 :rtype: Column.Iterable """ from .column import Column if self.url is None: return if self.followed_column_num > 0: tag = self.soup.find('div', class_='zm-profile-side-columns') if tag is not None: for a in tag.find_all('a'): yield Column(a['href'], a.img['alt'], session=self._session) if self.followed_column_num > 7: offset = 7 gotten_data_num = 20 while gotten_data_num == 20: params = { 'hash_id': self.hash_id, 'limit': 20, 'offset': offset } data = { 'method': 'next', '_xsrf': self.xsrf, 'params': json.dumps(params) } j = self._session.post(Author_Get_More_Follow_Column_URL, data=data).json() gotten_data_num = len(j['msg']) offset += gotten_data_num for msg in map(BeautifulSoup, j['msg']): name = msg.strong.text url = msg.a['href'] post_num = int(re_get_number.match( msg.span.text).group(1)) yield Column(url, name, post_num=post_num, session=self._session) @property def followed_topics(self): """获取用户关注的话题. :return: 用户关注的话题,返回生成器 :rtype: Topic.Iterable """ from .topic import Topic if self.url is None: return if self.followed_topic_num > 0: tag = self.soup.find('div', class_='zm-profile-side-topics') if tag is not None: for a in tag.find_all('a'): yield Topic(Zhihu_URL + a['href'], a.img['alt'], session=self._session) if self.followed_topic_num > 7: offset = 7 gotten_data_num = 20 while gotten_data_num == 20: data = {'start': 0, 'offset': offset, '_xsrf': self.xsrf} j = self._session.post( Author_Get_More_Follow_Topic_URL.format(self.id), data=data).json() gotten_data_num = j['msg'][0] offset += gotten_data_num topic_item = BeautifulSoup(j['msg'][1]).find_all( 'div', class_='zm-profile-section-item') for div in topic_item: name = div.strong.text url = Zhihu_URL + div.a['href'] yield Topic(url, name, session=self._session) @property def activities(self): """获取用户的最近动态. :return: 最近动态,返回生成器,具体说明见 :class:`.Activity` :rtype: Activity.Iterable """ from .activity import Activity if self.url is None: return gotten_feed_num = 20 start = '0' api_url = self.url + 'activities' while gotten_feed_num == 20: data = {'_xsrf': self.xsrf, 'start': start} res = self._session.post(api_url, data=data) gotten_feed_num = res.json()['msg'][0] soup = BeautifulSoup(res.json()['msg'][1]) acts = soup.find_all( 'div', class_='zm-profile-section-item zm-item clearfix') start = acts[-1]['data-time'] if len(acts) > 0 else 0 for act in acts: # --- ignore Round Table temporarily --- if act.attrs['data-type-detail'] == "member_follow_roundtable": continue # --- --- --- --- -- --- --- --- --- --- yield Activity(act, self._session, self) @property def last_activity_time(self): """获取用户最后一次活动的时间 :return: 用户最后一次活动的时间,返回值为 unix 时间戳 :rtype: int """ self._make_soup() act = self.soup.find( 'div', class_='zm-profile-section-item zm-item clearfix') return int(act['data-time']) if act is not None else -1 def is_zero_user(self): """返回当前用户是否为三零用户,其实是四零: 赞同0,感谢0,提问0,回答0. :return: 是否是三零用户 :rtype: bool """ return self.upvote_num + self.thank_num + \ self.question_num + self.answer_num == 0 class _Anonymous: def __init__(self): self.name = "匿名用户" self.url = '' ANONYMOUS = _Anonymous() """匿名用户常量,通过 ``zhihu.ANONYMOUS`` 访问。 提问者、回答者、点赞者、问题关注者、评论者都可能是 ``ANONYMOUS`` """ PK ,\H zhihu/__init__.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- __author__ = '7sDream' __version__ = '0.3.12' from .client import ZhihuClient from .question import Question from .author import Author, ANONYMOUS from .activity import Activity from .acttype import ActType from .answer import Answer from .collection import Collection from .column import Column from .post import Post from .topic import Topic __all__ = ['ZhihuClient', 'Question', 'Author', 'ActType', 'Activity', 'Answer', 'Collection', 'Column', 'Post', 'Topic', 'ANONYMOUS'] PK y;QHrBz z zhihu/activity.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- from datetime import datetime from .common import * from .acttype import ActType from .question import Question from .answer import Answer from .column import Column from .post import Post from .topic import Topic from .author import Author, ANONYMOUS from .collection import Collection class Activity: """用户动态类,请使用Author.activities获取.""" def __init__(self, act, session, author): """创建用户动态类实例. :param bs4.element.Tag act: 表示用户动态的页面元素 :param Session session: 使用的网络会话 :param Author author: Activity 所属的用户对象 :return: 用户动态对象 :rtype: Activity :说明: 根据Activity.type不同可以获取不同属性,具体请看 :class:`.ActType` """ self._session = session self._author = author self._type = ActType.from_str(act.attrs['data-type-detail']) useless_tag = act.div.find('a', class_='zg-link') if useless_tag is not None: useless_tag.extract() attribute = self._get_assemble_method(self.type)(act) self._attr = attribute.__class__.__name__.lower() setattr(self, self._attr, attribute) self._time = datetime.fromtimestamp(int(act['data-time'])) @property def type(self): """ :return: 用户动态类型, 具体参见 :class:`.ActType` :rtype: class:`.ActType` """ return self._type @property def content(self): """获取此对象中能提供的那个属性,对应表请查看 :class:`.ActType` 类. :return: 对象提供的对象 :rtype: Author or Question or Answer or Topic or Column or Post """ return getattr(self, self._attr) @property def time(self): """ :return: 返回用户执行 Activity 操作的时间 :rtype: datetime.datetime """ return self._time def __find_post(self, act): column_url = act.find('a', class_='column_link')['href'] column_name = act.find('a', class_='column_link').text column = Column(column_url, column_name, session=self._session) try: author_tag = act.find('div', class_='author-info') author_url = Zhihu_URL + author_tag.a['href'] author_info = list(author_tag.stripped_strings) author_name = author_info[0] author_motto = author_info[1] \ if len(author_info) > 1 else '' author = Author(author_url, author_name, author_motto, session=self._session) except TypeError: author = ANONYMOUS post_url = act.find('a', class_='post-link')['href'] post_title = act.find('a', class_='post-link').text post_comment_num, post_upvote_num = self._parse_un_cn(act) return Post(post_url, column, author, post_title, post_upvote_num, post_comment_num, session=self._session) def _assemble_create_post(self, act): return self.__find_post(act) def _assemble_voteup_post(self, act): return self.__find_post(act) def _assemble_follow_column(self, act): return Column(act.div.a['href'], act.div.a.text, session=self._session) def _assemble_follow_topic(self, act): topic_url = Zhihu_URL + act.div.a['href'] topic_name = act.div.a['title'] return Topic(topic_url, topic_name, session=self._session) def _assemble_answer_question(self, act): question_url = Zhihu_URL + re_a2q.match(act.div.find_all('a')[-1]['href']).group(1) question_title = act.div.find_all('a')[-1].text question = Question(question_url, question_title, session=self._session) answer_url = Zhihu_URL + act.div.find_all('a')[-1]['href'] answer_comment_num, answer_upvote_num = self._parse_un_cn(act) return Answer(answer_url, question, self._author, answer_upvote_num, session=self._session) def _assemble_voteup_answer(self, act): question_url = Zhihu_URL + re_a2q.match(act.div.a['href']).group(1) question_title = act.div.a.text question = Question(question_url, question_title, session=self._session) try_find_author = act.find_all('a', class_='author-link', href=re.compile('^/people/[^/]*$')) if len(try_find_author) == 0: author_url = None author_name = '匿名用户' author_motto = '' photo_url = None else: try_find_author = try_find_author[-1] author_url = Zhihu_URL + try_find_author['href'] author_name = try_find_author.text try_find_motto = try_find_author.parent.span if try_find_motto is None: author_motto = '' else: author_motto = try_find_motto['title'] photo_url = PROTOCOL + try_find_author.parent.a.img[ 'src'].replace('_s', '_r') author = Author(author_url, author_name, author_motto, photo_url=photo_url, session=self._session) answer_url = Zhihu_URL + act.div.a['href'] answer_comment_num, answer_upvote_num = self._parse_un_cn(act) return Answer(answer_url, question, author, answer_upvote_num, session=self._session) def _assemble_ask_question(self, act): return Question(Zhihu_URL + act.div.contents[3]['href'], list(act.div.children)[3].text, session=self._session) def _assemble_follow_question(self, act): return Question(Zhihu_URL + act.div.a['href'], act.div.a.text, session=self._session) def _assemble_follow_collection(self, act): url = act.div.a['href'] if not url.startswith('http'): url = Zhihu_URL + url return Collection(url, session=self._session) def _get_assemble_method(self, act_type): assemble_methods = { ActType.UPVOTE_POST: self._assemble_voteup_post, ActType.FOLLOW_COLUMN: self._assemble_follow_column, ActType.UPVOTE_ANSWER: self._assemble_voteup_answer, ActType.ANSWER_QUESTION: self._assemble_answer_question, ActType.ASK_QUESTION: self._assemble_ask_question, ActType.FOLLOW_QUESTION: self._assemble_follow_question, ActType.FOLLOW_TOPIC: self._assemble_follow_topic, ActType.PUBLISH_POST: self._assemble_create_post, ActType.FOLLOW_COLLECTION: self._assemble_follow_collection } if act_type in assemble_methods: return assemble_methods[act_type] else: raise ValueError('invalid activity type') @staticmethod def _parse_un_cn(act): upvote_num = int(act.find('a', class_='zm-item-vote-count')['data-votecount']) comment = act.find('a', class_='toggle-comment') comment_text = next(comment.stripped_strings) comment_num_match = re_get_number.match(comment_text) comment_num = int(comment_num_match.group(1)) if comment_num_match is not None else 0 return comment_num, upvote_num PK y;QH*a zhihu/acttype.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import enum match = { 'ANSWER_QUESTION': 'member_answer_question', 'UPVOTE_ANSWER': 'member_voteup_answer', 'ASK_QUESTION': 'member_ask_question', 'FOLLOW_QUESTION': 'member_follow_question', 'UPVOTE_POST': 'member_voteup_article', 'FOLLOW_COLUMN': 'member_follow_column', 'FOLLOW_TOPIC': 'member_follow_topic', 'PUBLISH_POST': 'member_create_article', 'FOLLOW_COLLECTION': 'member_follow_favlist' } reverse_match = {v: k for k, v in match.items()} class ActType(enum.Enum): """用于表示用户动态的类型. :常量说明: ================= ================ ============ ===================== 常量名 说明 提供属性 属性类型 ================= ================ ============ ===================== ANSWER_QUESTION 回答了一个问题 answer :class:`.Answer` UPVOTE_ANSWER 赞同了一个回答 answer :class:`.Answer` ASK_QUESTION 提出了一个问题 question :class:`.Question` FOLLOW_QUESTION 关注了一个问题 question :class:`.Question` UPVOTE_POST 赞同了一篇文章 post :class:`.Post` FOLLOW_COLUMN 关注了一个专栏 column :class:`.Column` FOLLOW_TOPIC 关注了一个话题 topic :class:`.Topic` PUBLISH_POST 发表了一篇文章 post :class:`.Post` FOLLOW_COLLECTION 关注了一个收藏夹 collection :class:`.Collection` ================= ================ ============ ===================== """ ANSWER_QUESTION = 1 UPVOTE_ANSWER = 2 ASK_QUESTION = 4 FOLLOW_QUESTION = 8 UPVOTE_POST = 16 FOLLOW_COLUMN = 32 FOLLOW_TOPIC = 64 PUBLISH_POST = 128 FOLLOW_COLLECTION = 256 @classmethod def from_str(cls, div_class): return cls.__getattr__(reverse_match[div_class]) def __str__(self): return match[self.name] class CollectActType(enum.Enum): """用于表示收藏夹操作的类型. :常量说明: ================= ============== 常量名 说明 ================= ============== INSERT_ANSWER 在收藏夹中增加一个回答 DELETE_ANSWER 在收藏夹中删除一个回答 CREATE_COLLECTION 创建收藏夹 ================= ============== """ INSERT_ANSWER = 1 DELETE_ANSWER = 2 CREATE_COLLECTION = 3 PK y;QH%* * zhihu/collection.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- from .common import * from .base import BaseZhihu class Collection(BaseZhihu): """收藏夹,请使用``ZhihuClient.collection``方法构造对象.""" @class_common_init(re_collection_url) def __init__(self, url, owner=None, name=None, follower_num=None, session=None): """创建收藏夹类实例. :param str url: 收藏夹主页url,必须 :param Author owner: 收藏夹拥有者,可选 :param str name: 收藏夹标题,可选 :param int follower_num: 收藏夹关注人数,可选 :param Session session: 使用的网络会话,为空则使用新会话。 :return: 收藏夹对象 :rtype: Collection """ self.url = url self._session = session self.soup = None self._name = name self._owner = owner self._follower_num = follower_num self._id = int(re.match(r'.*/(\d+)', self.url).group(1)) @property def id(self): """获取收藏夹id(网址最后的部分). :return: 收藏夹id :rtype: int """ return self._id @property @check_soup('_cid') def cid(self): """获取收藏夹内部Id(用不到忽视就好) :return: 内部Id :rtype: int """ return int(re_get_number.match( self.soup.find('a', attrs={'name': 'focus'})['id']).group(1)) @property @check_soup('_xsrf') def xsrf(self): """获取知乎的反xsrf参数(用不到就忽视吧~) :return: xsrf参数 :rtype: str """ return self.soup.find( 'input', attrs={'name': '_xsrf'})['value'] @property @check_soup('_name') def name(self): """获取收藏夹名字. :return: 收藏夹名字 :rtype: str """ return re_del_empty_line.match( self.soup.find('h2', id='zh-fav-head-title').text).group(1) @property @check_soup('_owner') def owner(self): """获取收藏夹拥有者,返回Author对象. :return: 收藏夹拥有者 :rtype: Author """ from .author import Author a = self.soup.find('h2', class_='zm-list-content-title').a name = a.text url = Zhihu_URL + a['href'] motto = self.soup.find( 'div', id='zh-single-answer-author-info').div.text photo_url = PROTOCOL + self.soup.find( 'img', class_='zm-list-avatar-medium')['src'].replace('_m', '_r') return Author(url, name, motto, photo_url=photo_url, session=self._session) @property @check_soup('_follower_num') def follower_num(self): """获取关注此收藏夹的人数. :return: 关注此收藏夹的人数 :rtype: int """ href = re_collection_url_split.match(self.url).group(1) return int(self.soup.find('a', href=href + 'followers').text) @property def followers(self): """获取关注此收藏夹的用户 :return: 关注此收藏夹的用户 :rtype: Author.Iterable """ self._make_soup() followers_url = self.url + 'followers' for x in common_follower(followers_url, self.xsrf, self._session): yield x @property def questions(self): """获取收藏夹内所有问题对象. :return: 收藏夹内所有问题,返回生成器 :rtype: Question.Iterable """ self._make_soup() # noinspection PyTypeChecker for question in self._page_get_questions(self.soup): yield question i = 2 while True: soup = BeautifulSoup(self._session.get( self.url[:-1] + '?page=' + str(i)).text) for question in self._page_get_questions(soup): if question == 0: return yield question i += 1 @property def answers(self): """获取收藏夹内所有答案对象. :return: 收藏夹内所有答案,返回生成器 :rtype: Answer.Iterable """ self._make_soup() # noinspection PyTypeChecker for answer in self._page_get_answers(self.soup): yield answer i = 2 while True: soup = BeautifulSoup(self._session.get( self.url[:-1] + '?page=' + str(i)).text) for answer in self._page_get_answers(soup): if answer == 0: return yield answer i += 1 @property def logs(self): """获取收藏夹日志 :return: 收藏夹日志中的操作,返回生成器 :rtype: CollectActivity.Iterable """ import time from datetime import datetime from .answer import Answer from .acttype import CollectActType self._make_soup() gotten_feed_num = 20 offset = 0 data = { 'start': 0, '_xsrf': self.xsrf } api_url = self.url + 'log' while gotten_feed_num == 20: data['offset'] = offset res = self._session.post(url=api_url, data=data) gotten_feed_num = res.json()['msg'][0] soup = BeautifulSoup(res.json()['msg'][1]) offset += gotten_feed_num zm_items = soup.find_all('div', class_='zm-item') for zm_item in zm_items: act_time = datetime.strptime(zm_item.find('time').text, "%Y-%m-%d %H:%M:%S") if zm_item.find('ins'): try: answer = Answer(Zhihu_URL + zm_item.find('ins').a['href'], session=self._session) type = CollectActType.INSERT_ANSWER yield CollectActivity(type, act_time, self.owner, self, answer) except ValueError: type = CollectActType.CREATE_COLLECTION yield CollectActivity(type, act_time, self.owner, self) elif zm_item.find('del'): type = CollectActType.DELETE_ANSWER answer = Answer(Zhihu_URL + zm_item.find('del').a['href'], session=self._session) yield CollectActivity(type, act_time, self.owner, self, answer) else: continue data['start'] = zm_items[-1]['id'][8:] time.sleep(0.5) def _page_get_questions(self, soup): from .question import Question question_tags = soup.find_all("div", class_="zm-item") if len(question_tags) == 0: yield 0 return else: for question_tag in question_tags: if question_tag.h2 is not None: question_title = question_tag.h2.a.text question_url = Zhihu_URL + question_tag.h2.a['href'] yield Question(question_url, question_title, session=self._session) def _page_get_answers(self, soup): from .question import Question from .author import Author, ANONYMOUS from .answer import Answer answer_tags = soup.find_all("div", class_="zm-item") if len(answer_tags) == 0: yield 0 return else: question = None for tag in answer_tags: # 判断是否是'建议修改的回答'等情况 url_tag = tag.find('a', class_='answer-date-link') if url_tag is None: reason = tag.find('div', id='answer-status').p.text print("pass a answer, reason %s ." % reason) continue if tag.h2 is not None: question_title = tag.h2.a.text question_url = Zhihu_URL + tag.h2.a['href'] question = Question(question_url, question_title, session=self._session) answer_url = Zhihu_URL + url_tag['href'] div = tag.find('div', class_='zm-item-answer-author-info') author_link = div.find('a', class_='author-link') if author_link is not None: author_url = Zhihu_URL + author_link['href'] author_name = author_link.text motto_span = div.find('span', class_='bio') author_motto = motto_span['title'] if motto_span else '' author = Author(author_url, author_name, author_motto, session=self._session) else: author = ANONYMOUS upvote = int(tag.find( 'a', class_='zm-item-vote-count')['data-votecount']) answer = Answer(answer_url, question, author, upvote, session=self._session) yield answer class CollectActivity: """收藏夹操作, 请使用``Collection.logs``构造对象.""" def __init__(self, type, time, owner, collection, answer=None): """创建收藏夹操作类实例 :param acttype.CollectActType type: 操作类型 :param datetime.datetime time: 进行操作的时间 :param Author owner: 收藏夹的拥有者 :param Collection collection: 所属收藏夹 :param Answer answer: 收藏的答案,可选 :return: CollectActivity """ self._type = type self._time = time self._owner = owner self._collection = collection self._answer = answer @property def type(self): """ :return: 收藏夹操作类型, 具体参见 :class:`.CollectActType` :rtype: :class:`.CollectActType` """ return self._type @property def answer(self): """ :return: 添加或删除收藏的答案, 若是创建收藏夹操作返回 None :rtype: Answer or None """ return self._answer @property def time(self): """ :return: 进行操作的时间 :rtype: datetime.datetime """ return self._time @property def owner(self): """ :return: 收藏夹的拥有者 :rtype: Author """ return self._owner @property def collection(self): """ :return: 所属收藏夹 :rtype: Collection """ return self._collection PK y;QH99@ @ zhihu/column.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- from .common import * from .base import BaseZhihu, JsonAsSoupMixin class Column(JsonAsSoupMixin, BaseZhihu): """专栏类,请使用``ZhihuClient.column``方法构造对象.""" @class_common_init(re_column_url) def __init__(self, url, name=None, follower_num=None, post_num=None, session=None): """创建专栏类实例. :param str url: 专栏url :param str name: 专栏名,可选 :param int follower_num: 关注者数量,可选 :param int post_num: 文章数量,可选 :param Session session: 使用的网络会话,为空则使用新会话。 :return: 专栏对象 :rtype: Column """ self._in_name = re_column_url.match(url).group(1) self.url = url self._session = session self._name = name self._follower_num = follower_num self._post_num = post_num def _make_soup(self): if self.soup is None: json = self._get_content() self._gen_soup(json) def _get_content(self): origin_host = self._session.headers.get('Host') self._session.headers.update(Host='zhuanlan.zhihu.com') res = self._session.get(Column_Data.format(self._in_name)) self._session.headers.update(Host=origin_host) return res.json() @property @check_soup('_name') def name(self): """获取专栏名称. :return: 专栏名称 :rtype: str """ return self.soup['name'] @property @check_soup('_follower_num') def follower_num(self): """获取关注人数. :return: 关注人数 :rtype: int """ return int(self.soup['followersCount']) @property @check_soup('_post_num') def post_num(self): """获取专栏文章数. :return: 专栏文章数 :rtype: int """ return int(self.soup['postsCount']) @property def posts(self): """获取专栏的所有文章. :return: 专栏所有文章,返回生成器 :rtype: Post.Iterable """ origin_host = self._session.headers.get('Host') for offset in range(0, (self.post_num - 1) // 10 + 1): self._session.headers.update(Host='zhuanlan.zhihu.com') res = self._session.get( Column_Posts_Data.format(self._in_name, offset * 10)) soup = res.json() self._session.headers.update(Host=origin_host) for post in soup: yield self._parse_post_data(post) def _parse_post_data(self, post): from .author import Author from .post import Post url = Column_Url + post['url'] template = post['author']['avatar']['template'] photo_id = post['author']['avatar']['id'] photo_url = template.format(id=photo_id, size='r') author = Author(post['author']['profileUrl'], post['author']['name'], post['author']['bio'], photo_url=photo_url, session=self._session) title = post['title'] upvote_num = post['likesCount'] comment_num = post['commentsCount'] print(url) return Post(url, self, author, title, upvote_num, comment_num, session=self._session) PK aSXH; ; zhihu/question.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import json import time from datetime import datetime from .common import * from .base import BaseZhihu class Question(BaseZhihu): """问题类,请使用``ZhihuClient.question``方法构造对象.""" @class_common_init(re_question_url, trailing_slash=False) def __init__(self, url, title=None, followers_num=None, answer_num=None, creation_time=None, author=None, session=None): """创建问题类实例. :param str url: 问题url. 现在支持两种 url 1. https://www.zhihu.com/question/qid 2. https://www.zhihu.com/question/qid?sort=created 区别在于,使用第一种,调用 ``question.answers`` 的时候会按投票排序返回答案; 使用第二种, 会按时间排序返回答案, 后提交的答案先返回 :param str title: 问题标题,可选, :param int followers_num: 问题关注人数,可选 :param int answer_num: 问题答案数,可选 :param datetime.datetime creation_time: 问题创建时间,可选 :param Author author: 提问者,可选 :return: 问题对象 :rtype: Question """ self._session = session self._url = url self._title = title self._answer_num = answer_num self._followers_num = followers_num self._id = int(re.match(r'.*/(\d+)', self.url).group(1)) self._author = author self._creation_time = creation_time self._logs = None self._deleted = None @property def url(self): # always return url like https://www.zhihu.com/question/1234/ url = re.match(re_question_url_std, self._url).group() return url if url.endswith('/') else url + '/' @property def id(self): """获取问题id(网址最后的部分). :return: 问题id :rtype: int """ return self._id @property @check_soup('_qid') def qid(self): """获取问题内部id(用不到就忽视吧) :return: 问题内部id :rtype: int """ return int(self.soup.find( 'div', id='zh-question-detail')['data-resourceid']) @property @check_soup('_xsrf') def xsrf(self): """获取知乎的反xsrf参数(用不到就忽视吧~) :return: xsrf参数 :rtype: str """ return self.soup.find('input', attrs={'name': '_xsrf'})['value'] @property @check_soup('_html') def html(self): """获取页面源码. :return: 页面源码 :rtype: str """ return self.soup.prettify() @property @check_soup('_title') def title(self): """获取问题标题. :return: 问题标题 :rtype: str """ return self.soup.find('h2', class_='zm-item-title') \ .text.replace('\n', '') @property @check_soup('_details') def details(self): """获取问题详细描述,目前实现方法只是直接获取文本,效果不满意……等更新. :return: 问题详细描述 :rtype: str """ return self.soup.find("div", id="zh-question-detail").div.text @property @check_soup('_answer_num') def answer_num(self): """获取问题答案数量. :return: 问题答案数量 :rtype: int """ answer_num_block = self.soup.find('h3', id='zh-question-answer-num') # 当0人回答或1回答时,都会找不到 answer_num_block, # 通过找答案的赞同数block来判断到底有没有答案。 # (感谢知乎用户 段晓晨 提出此问题) if answer_num_block is None: if self.soup.find('span', class_='count') is not None: return 1 else: return 0 return int(answer_num_block['data-num']) @property @check_soup('_follower_num') def follower_num(self): """获取问题关注人数. :return: 问题关注人数 :rtype: int """ follower_num_block = self.soup.find('div', class_='zg-gray-normal') # 无人关注时 找不到对应block,直接返回0 (感谢知乎用户 段晓晨 提出此问题) if follower_num_block.strong is None: return 0 return int(follower_num_block.strong.text) @property @check_soup('_topics') def topics(self): """获取问题所属话题. :return: 问题所属话题 :rtype: list(str) """ topics_list = [] for topic in self.soup.find_all('a', class_='zm-item-tag'): topics_list.append(topic.text.replace('\n', '')) return topics_list @property def followers(self): """获取关注此问题的用户 :return: 关注此问题的用户 :rtype: Author.Iterable :问题: 要注意若执行过程中另外有人关注,可能造成重复获取到某些用户 """ self._make_soup() followers_url = self.url + 'followers' for x in common_follower(followers_url, self.xsrf, self._session): yield x @property def answers(self): """获取问题的所有答案. :return: 问题的所有答案,返回生成器 :rtype: Answer.Iterable """ from .author import Author from .answer import Answer self._make_soup() # TODO: 统一逻辑. 完全可以都用 _parse_answer_html 的逻辑替换 if self._url.endswith('sort=created'): pager = self.soup.find('div', class_='zm-invite-pager') if pager is None: max_page = 1 else: max_page = int(pager.find_all('span')[-2].a.text) for page in range(1, max_page + 1): if page == 1: soup = self.soup else: url = self._url + '&page=%d' % page soup = BeautifulSoup(self._session.get(url).content) error_answers = soup.find_all('div', id='answer-status') for each in error_answers: each['class'] = 'zm-editable-content' answers_wrap = soup.find('div', id='zh-question-answer-wrap') # 正式处理 authors = answers_wrap.find_all( 'div', class_='zm-item-answer-author-info') urls = answers_wrap.find_all('a', class_='answer-date-link') upvote_nums = answers_wrap.find_all('div', class_='zm-item-vote-info') contents = answers_wrap.find_all( 'div', class_='zm-editable-content') assert len(authors) == len(urls) == len(upvote_nums) == len(contents) for author, url, upvote_num, content in \ zip(authors, urls, upvote_nums, contents): a_url, name, motto, photo = parser_author_from_tag(author) author_obj = Author(a_url, name, motto, photo_url=photo, session=self._session) url = Zhihu_URL + url['href'] upvote_num = int(upvote_num['data-votecount']) content = answer_content_process(content) yield Answer(url, self, author_obj, upvote_num, content, session=self._session) else: new_header = dict(Default_Header) new_header['Referer'] = self.url params = {"url_token": self.id, 'pagesize': '50', 'offset': 0} data = {'_xsrf': self.xsrf, 'method': 'next', 'params': ''} for i in range(0, (self.answer_num - 1) // 50 + 1): if i == 0: # 修正各种建议修改的回答…… error_answers = self.soup.find_all('div', id='answer-status') for each in error_answers: each['class'] = 'zm-editable-content' answers_wrap = self.soup.find('div', id='zh-question-answer-wrap') # 正式处理 authors = answers_wrap.find_all( 'div', class_='zm-item-answer-author-info') urls = answers_wrap.find_all('a', class_='answer-date-link') upvote_nums = answers_wrap.find_all('div', class_='zm-item-vote-info') contents = answers_wrap.find_all( 'div', class_='zm-editable-content') assert len(authors) == len(urls) == len(upvote_nums) == len(contents) for author, url, upvote_num, content in \ zip(authors, urls, upvote_nums, contents): a_url, name, motto, photo = parser_author_from_tag(author) author_obj = Author(a_url, name, motto, photo_url=photo, session=self._session) url = Zhihu_URL + url['href'] upvote_num = int(upvote_num['data-votecount']) content = answer_content_process(content) yield Answer(url, self, author_obj, upvote_num, content, session=self._session) else: params['offset'] = i * 50 data['params'] = json.dumps(params) r = self._session.post(Question_Get_More_Answer_URL, data=data, headers=new_header) answer_list = r.json()['msg'] for answer_html in answer_list: yield self._parse_answer_html(answer_html, Author, Answer) @property def top_answer(self): """获取排名第一的答案. :return: 排名第一的答案 :rtype: Answer """ for a in self.answers: return a def top_i_answer(self, i): """获取排名某一位的答案. :param int i: 要获取的答案的排名 :return: 答案对象,能直接获取的属性参见answers方法 :rtype: Answer """ for j, a in enumerate(self.answers): if j == i - 1: return a def top_i_answers(self, i): """获取排名在前几位的答案. :param int i: 获取前几个 :return: 答案对象,返回生成器 :rtype: Answer.Iterable """ for j, a in enumerate(self.answers): if j <= i - 1: yield a else: return @property @check_soup('_author') def author(self): """获取问题的提问者. :return: 提问者 :rtype: Author or zhihu.ANONYMOUS """ from .author import Author, ANONYMOUS logs = self._query_logs() author_a = logs[-1].find_all('div')[0].a if author_a.text == '匿名用户': return ANONYMOUS else: url = Zhihu_URL + author_a['href'] return Author(url, name=author_a.text, session=self._session) @property @check_soup('_creation_time') def creation_time(self): """ :return: 问题创建时间 :rtype: datetime.datetime """ logs = self._query_logs() time_string = logs[-1].find('div', class_='zm-item-meta').time['datetime'] return datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S") @property @check_soup('_last_edit_time') def last_edit_time(self): """ :return: 问题最后编辑时间 :rtype: datetime.datetime """ data = {'_xsrf': self.xsrf, 'offset': '1'} res = self._session.post(self.url + 'log', data=data) _, content = res.json()['msg'] soup = BeautifulSoup(content) time_string = soup.find_all('time')[0]['datetime'] return datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S") def _query_logs(self): if self._logs is None: gotten_feed_num = 20 start = '0' offset = 0 api_url = self.url + 'log' while gotten_feed_num == 20: data = {'_xsrf': self.xsrf, 'offset': offset, 'start': start} res = self._session.post(api_url, data=data) gotten_feed_num, content = res.json()['msg'] offset += gotten_feed_num soup = BeautifulSoup(content) logs = soup.find_all('div', class_='zm-item') start = logs[-1]['id'][8:] if len(logs) > 0 else '0' time.sleep(0.2) # prevent from posting too quickly self._logs = logs return self._logs def refresh(self): """刷新 Question object 的属性. 例如回答数增加了, 先调用 ``refresh()`` 再访问 answer_num 属性, 可获得更新后的答案数量. :return: None """ super().refresh() self._html = None self._title = None self._details = None self._answer_num = None self._follower_num = None self._topics = None self._last_edit_time = None self._logs = None @property @check_soup('_deleted') def deleted(self): """问题是否被删除, 被删除了返回 True, 未被删除返回 False :return: True or False """ return self._deleted def _parse_answer_html(self, answer_html, Author, Answer): soup = BeautifulSoup(answer_html) # 修正各种建议修改的回答…… error_answers = soup.find_all('div', id='answer-status') for each in error_answers: each['class'] = 'zm-editable-content' answer_url = self.url + 'answer/' + soup.div['data-atoken'] author = soup.find('div', class_='zm-item-answer-author-info') upvote_num = int(soup.find( 'div', class_='zm-item-vote-info')['data-votecount']) content = soup.find('div', class_='zm-editable-content') content = answer_content_process(content) a_url, name, motto, photo = parser_author_from_tag(author) author = Author(a_url, name, motto, photo_url=photo, session=self._session) return Answer(answer_url, self, author, upvote_num, content, session=self._session) def _get_content(self): # override base class's method cause we need self._url not self.url if self._url.endswith('/'): resp = self._session.get(self._url[:-1]) else: resp = self._session.get(self._url) if resp.status_code == 404: self._deleted = True else: self._deleted = False return resp.content PK y;QH zhihu/comment.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- class Comment: """评论类,一般不直接使用,而是作为``Answer.comments``迭代器的返回类型.""" def __init__(self, cid, answer, author, upvote_num, content, time, group_id=None): """创建评论类实例. :param int cid: 评论ID :param int group_id: 评论所在的组ID :param Answer answer: 评论所在的答案对象 :param Author author: 评论的作者对象 :param int upvote_num: 评论赞同数量 :param str content: 评论内容 :param datetime.datetime creation_time: 评论发表时间 :return: 评论对象 :rtype: Comment """ self.cid = cid self.answer = answer self.author = author self.upvote_num = upvote_num self.content = content self.creation_time = time self._group_id = group_id PK aSXHx zhihu/post.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- from .common import * from .base import BaseZhihu, JsonAsSoupMixin class Post(JsonAsSoupMixin, BaseZhihu): """专栏文章类,请使用``ZhihuClient.post``方法构造对象.""" @class_common_init(re_post_url) def __init__(self, url, column=None, author=None, title=None, upvote_num=None, comment_num=None, session=None): """创建专栏文章类实例. :param str url: 文章url :param Column column: 文章所属专栏,可选 :param Author author: 文章作者,可选 :param str title: 文章标题,可选 :param int upvote_num: 文章赞同数,可选 :param int comment_num: 文章评论数,可选 :param Session session: 使用的网络会话,为空则使用新会话 :return: 专栏文章对象 :rtype: Post """ match = re_post_url.match(url) self.url = url self._session = session self._column = column self._author = author self._title = title self._upvote_num = upvote_num self._comment_num = comment_num self._column_in_name = match.group(1) # 专栏内部名称 self._slug = int(match.group(2)) # 文章编号 def _make_soup(self): if self.soup is None: json = self._get_content() self._gen_soup(json) def _get_content(self): origin_host = self._session.headers.get('Host') self._session.headers.update(Host='zhuanlan.zhihu.com') json = self._session.get( Column_Post_Data.format(self.column_in_name, self.slug)).json() self._session.headers.update(Host=origin_host) return json @property def column_in_name(self): """获取文章所在专栏的内部名称(用不到就忽视吧~) :return: 专栏的内部名称 :rtype: str """ return self._column_in_name @property def slug(self): """获取文章的编号(用不到就忽视吧~) :return: 文章编号 :rtype: int """ return self._slug @property @check_soup('_column') def column(self): """获取文章所在专栏. :return: 文章所在专栏 :rtype: Column """ from .column import Column url = Column_Url + '/' + self.soup['column']['slug'] name = self.soup['column']['name'] return Column(url, name, session=self._session) @property @check_soup('_author') def author(self): """获取文章作者. :return: 文章作者 :rtype: Author """ from .author import Author url = self.soup['author']['profileUrl'] name = self.soup['author']['name'] motto = self.soup['author']['bio'] template = self.soup['author']['avatar']['template'] photo_id = self.soup['author']['avatar']['id'] photo_url = template.format(id=photo_id, size='r') return Author(url, name, motto, photo_url=photo_url, session=self._session) @property @check_soup('_title') def title(self): """获取文章标题. :return: 文章标题 :rtype: str """ return self.soup['title'] @property @check_soup('_upvote_num') def upvote_num(self): """获取文章赞同数量. :return: 文章赞同数 :rtype: int """ return int(self.soup['likesCount']) @property @check_soup('_comment_num') def comment_num(self): """获取评论数量. :return: 评论数量 :rtype: int """ return self.soup['commentsCount'] def save(self, filepath=None, filename=None, mode="md"): """保存答案为 Html 文档或 markdown 文档. :param str filepath: 要保存的文件所在的目录, 不填为当前目录下以专栏标题命名的目录, 设为"."则为当前目录。 :param str filename: 要保存的文件名, 不填则默认为 所在文章标题 - 作者名.html/md。 如果文件已存在,自动在后面加上数字区分。 **自定义文件名时请不要输入后缀 .html 或 .md。** :param str mode: 保存类型,可选 `html` 、 `markdown` 、 `md` 。 :return: 无 :rtype: None """ if mode not in ["html", "md", "markdown"]: raise ValueError("`mode` must be 'html', 'markdown' or 'md'," " got {0}".format(mode)) file = get_path(filepath, filename, mode, self.column.name, self.title + '-' + self.author.name) with open(file, 'wb') as f: if mode == "html": f.write(self.soup['content'].encode('utf-8')) else: import html2text h2t = html2text.HTML2Text() h2t.body_width = 0 f.write(h2t.handle(self.soup['content']).encode('utf-8')) @property def upvoters(self): """获取文章的点赞用户 :return: 文章的点赞用户,返回生成器。 """ from .author import Author, ANONYMOUS self._make_soup() headers = dict(Default_Header) headers['Host'] = 'zhuanlan.zhihu.com' json = self._session.get( Post_Get_Upvoter.format( self.column_in_name, self.slug ), headers=headers ).json() for au in json: try: yield Author( au['profileUrl'], au['name'], au['bio'], photo_url=au['avatar']['template'].format( id=au['avatar']['id'], size='r'), session=self._session ) except ValueError: # invalid url yield ANONYMOUS PK y;QHu995 5 zhihu/answer.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import json from datetime import datetime from .common import * from .base import BaseZhihu from .collection import Collection from .author import Author, ANONYMOUS class Answer(BaseZhihu): """答案类,请使用``ZhihuClient.answer``方法构造对象.""" @class_common_init(re_ans_url) def __init__(self, url, question=None, author=None, upvote_num=None, content=None, session=None): """创建答案类实例. :param str url: 答案url :param Question question: 答案所在的问题对象,可选 :param Author author: 答案回答者对象,可选 :param int upvote_num: 答案赞同数量,可选 :param str content: 答案内容,可选 :param Session session: 使用的网络会话,为空则使用新会话 :return: 答案对象 :rtype: Answer """ self.url = url self._session = session self._question = question self._author = author self._upvote_num = upvote_num self._content = content self._deleted = None @property def id(self): """答案的id :return: 答案id :rtype: int """ return int(re.match(r'.*/(\d+)/$', self.url).group(1)) @property @check_soup('_xsrf') def xsrf(self): """获取知乎的反xsrf参数(用不到就忽视吧~) :return: xsrf参数 :rtype: str """ return self.soup.find('input', attrs={'name': '_xsrf'})['value'] @property @check_soup('_aid') def aid(self): """获取答案的内部id,某些POST操作需要此参数 :return: 答案内部id :rtype: str """ return int(self.soup.find('div', class_='zm-item-answer')['data-aid']) @property @check_soup('_html') def html(self): """获取网页源码 :return: 网页源码 :rtype: str """ return self.soup.prettify() @property @check_soup('_author') def author(self): """获取答案作者. :return: 答案作者 :rtype: Author """ from .author import Author author = self.soup.find('div', class_='zm-item-answer-author-info') url, name, motto, photo = parser_author_from_tag(author) if name == '匿名用户': return ANONYMOUS else: return Author(url, name, motto, photo_url=photo, session=self._session) @property @check_soup('_question') def question(self): """获取答案所在问题. :return: 答案所在问题 :rtype: Question """ from .question import Question question_link = self.soup.find( "h2", class_="zm-item-title zm-editable-content").a url = Zhihu_URL + question_link["href"] title = question_link.text followers_num = int(self.soup.find( 'div', class_='zh-question-followers-sidebar').div.a.strong.text) answers_num = int(re_get_number.match(self.soup.find( 'div', class_='zh-answers-title').h3.a.text).group(1)) return Question(url, title, followers_num, answers_num, session=self._session) @property @check_soup('_upvote_num') def upvote_num(self): """获取答案赞同数量. :return: 答案赞同数量 :rtype: int """ return int(self.soup.find( 'div', class_='zm-item-vote-info')['data-votecount']) @property def upvoters(self): """获取答案点赞用户,返回生成器. :return: 点赞用户 :rtype: Author.Iterable """ self._make_soup() next_req = '/answer/' + str(self.aid) + '/voters_profile' while next_req != '': data = self._session.get(Zhihu_URL + next_req).json() next_req = data['paging']['next'] for html in data['payload']: soup = BeautifulSoup(html) yield self._parse_author_soup(soup) @property @check_soup('_content') def content(self): """以处理过的Html代码形式返回答案内容. :return: 答案内容 :rtype: str """ answer_wrap = self.soup.find('div', id='zh-question-answer-wrap') content = answer_wrap.find('div', class_='zm-editable-content') content = answer_content_process(content) return content @property @check_soup('_creation_time') def creation_time(self): """获取答案创建时间 :return: 答案创建时间 :rtype: datetime.datetime """ return datetime.fromtimestamp(int(self.soup.find( 'div', class_='zm-item-answer')['data-created'])) @property @check_soup('_collect_num') def collect_num(self): """获取答案收藏数 :return: 答案收藏数量 :rtype: int """ element = self.soup.find("a", { "data-za-a": "click_answer_collected_count" }) if element is None: return 0 else: return int(element.get_text()) @property def collections(self): """获取包含该答案的收藏夹 :return: 包含该答案的收藏夹 :rtype: Collection.Iterable collect_num 未必等于 len(collections),比如: https://www.zhihu.com/question/20064699/answer/13855720 显示被收藏 38 次,但只有 30 个收藏夹 """ import time gotten_feed_num = 20 offset = 0 data = { 'method':'next', '_xsrf': self.xsrf } while gotten_feed_num >= 10: data['params'] = "{\"answer_url\": %d,\"offset\": %d}" % (self.id, offset) res = self._session.post(url=Get_Collection_Url, data=data) gotten_feed_num = len(res.json()['msg']) offset += gotten_feed_num soup = BeautifulSoup(''.join(res.json()['msg'])) for zm_item in soup.find_all('div', class_='zm-item'): url = Zhihu_URL + zm_item.h2.a['href'] name = zm_item.h2.a.text links = zm_item.div.find_all('a') owner = Author(links[0]['href'], session=self._session) follower_num = int(links[1].text.split()[0]) yield Collection(url, owner=owner, name=name, follower_num=follower_num, session=self._session) time.sleep(0.2) # prevent from posting too quickly def save(self, filepath=None, filename=None, mode="html"): """保存答案为Html文档或markdown文档. :param str filepath: 要保存的文件所在的目录, 不填为当前目录下以问题标题命名的目录, 设为"."则为当前目录。 :param str filename: 要保存的文件名, 不填则默认为 所在问题标题 - 答主名.html/md。 如果文件已存在,自动在后面加上数字区分。 **自定义文件名时请不要输入后缀 .html 或 .md。** :param str mode: 保存类型,可选 `html` 、 `markdown` 、 `md` 。 :return: 无 :rtype: None """ if mode not in ["html", "md", "markdown"]: raise ValueError("`mode` must be 'html', 'markdown' or 'md'," " got {0}".format(mode)) file = get_path(filepath, filename, mode, self.question.title, self.question.title + '-' + self.author.name) with open(file, 'wb') as f: if mode == "html": f.write(self.content.encode('utf-8')) else: import html2text h2t = html2text.HTML2Text() h2t.body_width = 0 f.write(h2t.handle(self.content).encode('utf-8')) def _parse_author_soup(self, soup): from .author import Author, ANONYMOUS author_tag = soup.find('div', class_='body') if author_tag.string is None: author_name = author_tag.div.a['title'] author_url = author_tag.div.a['href'] author_motto = author_tag.div.span.text photo_url = PROTOCOL + soup.a.img['src'].replace('_m', '_r') numbers_tag = soup.find_all('li') numbers = [int(re_get_number.match(x.get_text()).group(1)) for x in numbers_tag] # noinspection PyTypeChecker return Author(author_url, author_name, author_motto, None, numbers[2], numbers[3], numbers[0], numbers[1], photo_url, session=self._session) else: return ANONYMOUS @property @check_soup('_comment_num') def comment_num(self): """ :return: 答案下评论的数量 :rtype: int """ comment_num_string = self.soup.find('a', class_=' meta-item toggle-comment').text number = comment_num_string.split()[0] return int(number) if number.isdigit() else 0 @property def comments(self): """获取答案下的所有评论. :return: 答案下的所有评论,返回生成器 :rtype: Comments.Iterable """ import math from .author import Author, ANONYMOUS from .comment import Comment api_url = Get_Answer_Comment_URL.format(self.aid) page = pages = 1 while page <= pages: res = self._session.get(api_url + '?page=' + str(page)) if page == 1: total = int(res.json()['paging']['totalCount']) if total == 0: return pages = math.ceil(total / 30) page += 1 comment_items = res.json()['data'] for comment_item in comment_items: comment_id = comment_item['id'] content = comment_item['content'] upvote_num = comment_item['likesCount'] time_string = comment_item['createdTime'][:19] time = datetime.strptime(time_string, "%Y-%m-%dT%H:%M:%S") if comment_item['author'].get('url') != None: a_url = comment_item['author']['url'] a_name = comment_item['author']['name'] photo_url_tmp = comment_item['author']['avatar']['template'] photo_url_id = comment_item['author']['avatar']['id'] a_photo_url = photo_url_tmp.replace( '{id}', photo_url_id).replace('_{size}', '') author_obj = Author(a_url, a_name, photo_url=a_photo_url, session=self._session) else: author_obj = ANONYMOUS yield Comment(comment_id, self, author_obj, upvote_num, content, time) @property def latest_comments(self): """获取答案下的所有评论。较新的评论先返回。 使用该方法比 ``reversed(list(answer.comments))`` 效率高 因为现在靠后的热门评论会被挪到前面,所以返回的评论未必严格满足时间先后关系 :return: 答案下的所有评论,返回生成器 :rtype: Comments.Iterable """ import math from .author import Author, ANONYMOUS from .comment import Comment if self.comment_num == 0: return pages = math.ceil(self.comment_num / 30) api_url = Get_Answer_Comment_URL.format(self.aid) for page in range(pages, 0, -1): res = self._session.get(api_url + '?page=' + str(page)) comment_items = res.json()['data'] for comment_item in reversed(comment_items): comment_id = comment_item['id'] content = comment_item['content'] upvote_num = comment_item['likesCount'] time_string = comment_item['createdTime'][:19] time = datetime.strptime(time_string, "%Y-%m-%dT%H:%M:%S") if comment_item['author'].get('url') != None: a_url = comment_item['author']['url'] a_name = comment_item['author']['name'] photo_url_tmp = comment_item['author']['avatar']['template'] photo_url_id = comment_item['author']['avatar']['id'] a_photo_url = photo_url_tmp.replace( '{id}', photo_url_id).replace('_{size}', '') author_obj = Author(a_url, a_name, photo_url=a_photo_url, session=self._session) else: author_obj = ANONYMOUS yield Comment(comment_id, self, author_obj, upvote_num, content, time) def refresh(self): """刷新 Answer object 的属性. 例如赞同数增加了, 先调用 ``refresh()`` 再访问 upvote_num属性, 可获得更新后的赞同数. :return: None """ super().refresh() self._html = None self._upvote_num = None self._content = None self._collect_num = None self._comment_num = None @property @check_soup('_deleted') def deleted(self): """答案是否被删除, 被删除了返回 True, 为被删除返回 False :return: True or False """ return self._deleted PK +\HĈajD jD zhihu/topic.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import time from datetime import datetime from .common import * from .base import BaseZhihu class Topic(BaseZhihu): """答案类,请使用``ZhihuClient.topic``方法构造对象.""" @class_common_init(re_topic_url) def __init__(self, url, name=None, session=None): """创建话题类实例. :param url: 话题url :param name: 话题名称,可选 :return: Topic """ self.url = url self._session = session self._name = name self._id = int(re_topic_url.match(self.url).group(1)) @property def id(self): """获取话题Id(网址最后那串数字) :return: 话题Id :rtype: int """ return self._id @property @check_soup('_xsrf') def xsrf(self): """获取知乎的反xsrf参数(用不到就忽视吧~) :return: xsrf参数 :rtype: str """ return self.soup.find('input', attrs={'name': '_xsrf'})['value'] @property @check_soup('_tid') def tid(self): """话题内部Id,有时候要用到 :return: 话题内部Id :rtype: int """ return int(self.soup.find( 'div', id='zh-topic-desc')['data-resourceid']) @property @check_soup('_name') def name(self): """获取话题名称. :return: 话题名称 :rtype: str """ return self.soup.find('h1').text @property def parents(self): """获取此话题的父话题。 注意:由于没找到有很多父话题的话题来测试, 所以本方法可能再某些时候出现问题,请不吝反馈。 :return: 此话题的父话题,返回生成器 :rtype: Topic.Iterable """ self._make_soup() parent_topic_tag = self.soup.find('div', class_='parent-topic') if parent_topic_tag is None: yield [] else: for topic_tag in parent_topic_tag.find_all('a'): yield Topic(Zhihu_URL + topic_tag['href'], topic_tag.text.strip(), session=self._session) @property def children(self): """获取此话题的子话题 :return: 此话题的子话题, 返回生成器 :rtype: Topic.Iterable """ self._make_soup() child_topic_tag = self.soup.find('div', class_='child-topic') if child_topic_tag is None: return [] elif '共有' not in child_topic_tag.contents[-2].text: for topic_tag in child_topic_tag.div.find_all('a'): yield Topic(Zhihu_URL + topic_tag['href'], topic_tag.text.strip(), session=self._session) else: flag = 'load' child = '' data = {'_xsrf': self.xsrf} params = { 'parent': self.id } while flag == 'load': params['child'] = child res = self._session.post(Topic_Get_Children_Url, params=params, data=data) j = map(lambda x: x[0], res.json()['msg'][1]) *topics, last = j for topic in topics: yield Topic(Zhihu_URL + '/topic/' + topic[2], topic[1], session=self._session) flag = last[0] child = last[2] if flag == 'topic': yield Topic(Zhihu_URL + '/topic/' + last[2], last[1], session=self._session) @property @check_soup('_follower_num') def follower_num(self): """获取话题关注人数. :return: 关注人数 :rtype: int """ follower_num_block = self.soup.find( 'div', class_='zm-topic-side-followers-info') # 无人关注时 找不到对应block,直接返回0 (感谢知乎用户 段晓晨 提出此问题) if follower_num_block.strong is None: return 0 return int(follower_num_block.strong.text) @property def followers(self): """获取话题关注者 :return: 话题关注者,返回生成器 :rtype: Author.Iterable """ from .author import Author, ANONYMOUS self._make_soup() gotten_data_num = 20 data = { '_xsrf': self.xsrf, 'start': '', 'offset': 0 } while gotten_data_num == 20: res = self._session.post( Topic_Get_More_Follower_Url.format(self.id), data=data) j = res.json()['msg'] gotten_data_num = j[0] data['offset'] += gotten_data_num soup = BeautifulSoup(j[1]) divs = soup.find_all('div', class_='zm-person-item') for div in divs: h2 = div.h2 url = Zhihu_URL + h2.a['href'] name = h2.a.text motto = h2.next_element.text try: yield Author(url, name, motto, session=self._session) except ValueError: # invalid url yield ANONYMOUS data['start'] = int(re_get_number.match(divs[-1]['id']).group(1)) @property @check_soup('_photo_url') def photo_url(self): """获取话题头像图片地址. :return: 话题头像url :rtype: str """ img = self.soup.find('a', id='zh-avartar-edit-form').img['src'] return img.replace('_m', '_r') @property @check_soup('_description') def description(self): """获取话题描述信息. :return: 话题描述信息 :rtype: str """ desc = self.soup.find('div', class_='zm-editable-content').text return desc @property def top_authors(self): """获取最佳回答者 :return: 此话题下最佳回答者,一般来说是5个,要不就没有,返回生成器 :rtype: Author.Iterable """ from .author import Author, ANONYMOUS self._make_soup() t = self.soup.find('div', id='zh-topic-top-answerer') if t is None: return for d in t.find_all('div', class_='zm-topic-side-person-item-content'): url = Zhihu_URL + d.a['href'] name = d.a.text motto = d.div['title'] try: yield Author(url, name, motto, session=self._session) except ValueError: # invalid url yield ANONYMOUS @property def top_answers(self): """获取话题下的精华答案. :return: 话题下的精华答案,返回生成器. :rtype: Answer.Iterable """ from .question import Question from .answer import Answer from .author import Author, ANONYMOUS top_answers_url = Topic_Top_Answers_Url.format(self.id) params = {'page': 1} while True: # 超出50页直接返回 if params['page'] > 50: return res = self._session.get(top_answers_url, params=params) params['page'] += 1 soup = BeautifulSoup(res.content) # 不够50页,来到错误页面 返回 if soup.find('div', class_='error') is not None: return questions = soup.find_all('a', class_='question_link') answers = soup.find_all('a', class_='answer-date-link') authors = soup.find_all('div', class_='zm-item-answer-author-info') upvotes = soup.find_all('a', class_='zm-item-vote-count') for ans, up, q, au in zip(answers, upvotes, questions, authors): answer_url = Zhihu_URL + ans['href'] question_url = Zhihu_URL + q['href'] question_title = q.text upvote = int(up['data-votecount']) question = Question(question_url, question_title, session=self._session) if au.a is None: author = ANONYMOUS else: author_url = Zhihu_URL + au.a['href'] author_name = au.a.text author_motto = au.strong['title'] if au.strong else '' author = Author(author_url, author_name, author_motto, session=self._session) yield Answer(answer_url, question, author, upvote, session=self._session) @property def questions(self): """获取话题下的所有问题(按时间降序排列) :return: 话题下所有问题,返回生成器 :rtype: Question.Iterable """ from .question import Question question_url = Topic_Questions_Url.format(self.id) params = {'page': 1} older_time_stamp = int(time.time()) * 1000 while True: res = self._session.get(question_url, params=params) soup = BeautifulSoup(res.content) if soup.find('div', class_='error') is not None: return questions = soup.find_all('div', class_='question-item') questions = list(filter( lambda x: int(x.h2.span['data-timestamp']) < older_time_stamp, questions)) for qu_div in questions: url = Zhihu_URL + qu_div.h2.a['href'] title = qu_div.h2.a.text creation_time = datetime.fromtimestamp( int(qu_div.h2.span['data-timestamp']) // 1000) yield Question(url, title, creation_time=creation_time, session=self._session) older_time_stamp = int(questions[-1].h2.span['data-timestamp']) params['page'] += 1 @property def unanswered_questions(self): """获取话题下的等待回答的问题 什么是「等待回答」的问题:https://www.zhihu.com/question/40470324 :return: 话题下等待回答的问题,返回生成器 :rtype: Question.Iterable """ from .question import Question question_url = Topic_Unanswered_Question_Url.format(self.id) params = {'page': 1} while True: res = self._session.get(question_url, params=params) soup = BeautifulSoup(res.content) if soup.find('div', class_='error') is not None: return questions = soup.find_all('div', class_='question-item') for qu_div in questions: url = Zhihu_URL + qu_div.h2.a['href'] title = qu_div.h2.a.text yield Question(url, title, session=self._session) params['page'] += 1 @property def answers(self): """获取话题下所有答案(按时间降序排列) :return: 话题下所有答案,返回生成器 :rtype: Answer.Iterable """ from .question import Question from .answer import Answer from .author import Author, ANONYMOUS newest_url = Topic_Newest_Url.format(self.id) params = {'start': 0, '_xsrf': self.xsrf} res = self._session.get(newest_url) soup = BeautifulSoup(res.content) while True: divs = soup.find_all('div', class_='folding') # 如果话题下无答案,则直接返回 if len(divs) == 0: return last_score = divs[-1]['data-score'] for div in divs: q = div.find('a', class_="question_link") question_url = Zhihu_URL + q['href'] question_title = q.text question = Question(question_url, question_title, session=self._session) ans = div.find('a', class_='answer-date-link') answer_url = Zhihu_URL + ans['href'] up = div.find('a', class_='zm-item-vote-count') upvote = int(up['data-votecount']) au = div.find('div', class_='zm-item-answer-author-info') if au.a is None: author = ANONYMOUS else: author_url = Zhihu_URL + au.a['href'] author_name = au.a.text author_motto = au.strong['title'] if au.strong else '' author = Author(author_url, author_name, author_motto, session=self._session) yield Answer(answer_url, question, author, upvote, session=self._session) params['offset'] = last_score res = self._session.post(newest_url, data=params) gotten_feed_num = res.json()['msg'][0] # 如果得到内容数量为0则返回 if gotten_feed_num == 0: return soup = BeautifulSoup(res.json()['msg'][1]) @property def hot_questions(self): """获取话题下热门的问题 :return: 话题下的热门动态中的问题,按热门度顺序返回生成器 :rtype: Question.Iterable """ from .question import Question hot_questions_url = Topic_Hot_Questions_Url.format(self.id) params = {'start': 0, '_xsrf': self.xsrf} res = self._session.get(hot_questions_url) soup = BeautifulSoup(res.content) while True: questions_duplicate = soup.find_all('a', class_='question_link') # 如果话题下无问题,则直接返回 if len(questions_duplicate) == 0: return # 去除重复的问题 questions = list(set(questions_duplicate)) questions.sort(key=self._get_score, reverse=True) last_score = soup.find_all( 'div', class_='feed-item')[-1]['data-score'] for q in questions: question_url = Zhihu_URL + q['href'] question_title = q.text question = Question(question_url, question_title, session=self._session) yield question params['offset'] = last_score res = self._session.post(hot_questions_url, data=params) gotten_feed_num = res.json()['msg'][0] # 如果得到问题数量为0则返回 if gotten_feed_num == 0: return soup = BeautifulSoup(res.json()['msg'][1]) @property def hot_answers(self): """获取话题下热门的回答 :return: 话题下的热门动态中的回答,按热门度顺序返回生成器 :rtype: Question.Iterable """ from .question import Question from .author import Author from .answer import Answer hot_questions_url = Topic_Hot_Questions_Url.format(self.id) params = {'start': 0, '_xsrf': self.xsrf} res = self._session.get(hot_questions_url) soup = BeautifulSoup(res.content) while True: answers_div = soup.find_all('div', class_='feed-item') last_score = answers_div[-1]['data-score'] for div in answers_div: # 没有 text area 的情况是:答案被和谐。 if not div.textarea: continue question_url = Zhihu_URL + div.h2.a['href'] question_title = div.h2.a.text question = Question(question_url, question_title, session=self._session) author_link = div.find('a', class_='author-link') if not author_link: author_url = None author_name = '匿名用户' author_motto = '' else: author_url = Zhihu_URL + author_link['href'] author_name = author_link.text author_motto_span = div.find('span', class_='bio') author_motto = author_motto_span['title'] \ if author_motto_span else '' author = Author(author_url, author_name, author_motto, session=self._session) body = div.find('div', class_='entry-body') answer_url = question_url + "/answer/" + body['data-atoken'] upvote_num = int(div.find( 'a', class_='zm-item-vote-count')['data-votecount']) yield Answer(answer_url, question, author, upvote_num, session=self._session) params['offset'] = last_score res = self._session.post(hot_questions_url, data=params) gotten_feed_num = res.json()['msg'][0] # 如果得到问题数量为0则返回 if gotten_feed_num == 0: return soup = BeautifulSoup(res.json()['msg'][1]) @staticmethod def _get_score(tag): h2 = tag.parent div = h2.parent try: _ = h2['class'] return div['data-score'] except KeyError: return div.parent.parent['data-score'] PK aSXHI40 0 zhihu/base.pyfrom .common import BeautifulSoup from requests import Response import json class BaseZhihu: def _gen_soup(self, content): self.soup = BeautifulSoup(content) def _get_content(self): resp = self._session.get(self.url[:-1]) if self.__class__.__name__ == 'Answer': if 'answer' in resp.url: self._deleted = False else: self._deleted = True return resp.content def _make_soup(self): if self.url and not self.soup: self._gen_soup(self._get_content()) def refresh(self): # refresh self.soup's content self._gen_soup(self._get_content()) @classmethod def from_html(cls, content): obj = cls(url=None) obj._gen_soup(content) return obj class JsonAsSoupMixin: def _gen_soup(self, content): # 为了让`from_html`对外提供统一的接口, 判断一下输入, 如果是bytes 或者 str 则用json处理, # 否则认为是由_get_content返回的dict if isinstance(content, bytes): r = Response() r._content = content soup = r.json() self.soup = soup elif isinstance(content, str): self.soup = json.loads(content) else: self.soup = content PK y;QHv?W W zhihu/client.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import time import json import requests import importlib from .common import * class ZhihuClient: """知乎客户端类,内部维护了自己专用的网络会话,可用cookies或账号密码登录.""" def __init__(self, cookies=None): """创建客户端类实例. :param str cookies: 见 :meth:`.login_with_cookies` 中 ``cookies`` 参数 :return: 知乎客户端对象 :rtype: ZhihuClient """ self._session = requests.Session() self._session.headers.update(Default_Header) if cookies is not None: assert isinstance(cookies, str) self.login_with_cookies(cookies) # ===== login staff ===== @staticmethod def _get_captcha_url(): return Captcha_URL_Prefix + str(int(time.time() * 1000)) def get_captcha(self): """获取验证码数据。 :return: 验证码图片数据。 :rtype: bytes """ # some unbelievable zhihu logic self._session.get(Zhihu_URL) data = {'email': '', 'password': '', 'remember_me': 'true'} self._session.post(Login_URL, data=data) r = self._session.get(self._get_captcha_url()) return r.content def login(self, email, password, captcha): """登陆知乎. :param str email: 邮箱 :param str password: 密码 :param str captcha: 验证码 :return: ======== ======== ============== ==================== 元素序号 元素类型 意义 说明 ======== ======== ============== ==================== 0 int 是否成功 0为成功,1为失败 1 str 失败原因 登录成功则为空字符串 2 str cookies字符串 登录失败则为空字符串 ======== ======== ============== ==================== :rtype: (int, str, str) """ data = {'email': email, 'password': password, 'remember_me': 'true', 'captcha': captcha} r = self._session.post(Login_URL, data=data) j = r.json() code = int(j['r']) message = j['msg'] cookies_str = json.dumps(self._session.cookies.get_dict()) \ if code == 0 else '' return code, message, cookies_str def login_with_cookies(self, cookies): """使用cookies文件或字符串登录知乎 :param str cookies: ============== =========================== 参数形式 作用 ============== =========================== 文件名 将文件内容作为cookies字符串 cookies字符串 直接提供cookies字符串 ============== =========================== :return: 无 :rtype: None """ if os.path.isfile(cookies): with open(cookies) as f: cookies = f.read() cookies_dict = json.loads(cookies) self._session.cookies.update(cookies_dict) def login_in_terminal(self): """不使用cookies,在终端中根据提示登陆知乎 :return: 如果成功返回cookies字符串 :rtype: str """ print('====== zhihu login =====') email = input('email: ') password = input('password: ') captcha_data = self.get_captcha() with open('captcha.gif', 'wb') as f: f.write(captcha_data) print('please check captcha.gif for captcha') captcha = input('captcha: ') os.remove('captcha.gif') print('====== logging.... =====') code, msg, cookies = self.login(email, password, captcha) if code == 0: print('login successfully') else: print('login failed, reason: {0}'.format(msg)) return cookies def create_cookies(self, file): cookies_str = self.login_in_terminal() if cookies_str: with open(file, 'w') as f: f.write(cookies_str) print('cookies file created.') else: print('can\'t create cookies.') # ===== network staff ===== def set_proxy(self, proxy): """设置代理 :param str proxy: 使用 "http://example.com:port" 的形式 :return: 无 :rtype: None :说明: 由于一个 :class:`.ZhihuClient` 对象和它创建出来的其他知乎对象共用 一个Session,所以调用这个方法也会将所有生成出的知乎类设置上代理。 """ self._session.proxies.update({'http': proxy}) # ===== getter staff ====== def me(self): """获取使用特定cookies的Me实例 :return: cookies对应的Me对象 :rtype: Me """ from .me import Me headers = dict(Default_Header) headers['Host'] = 'zhuanlan.zhihu.com' res = self._session.get(Get_Me_Info_Url, headers=headers) json_data = res.json() url = json_data['profileUrl'] name = json_data['name'] motto = json_data['bio'] photo = json_data['avatar']['template'].format( id=json_data['avatar']['id'], size='r') return Me(url, name, motto, photo, session=self._session) def __getattr__(self, item: str): """本函数用于获取各种类,如 `Answer` `Question` 等. :支持的形式有: 1. client.answer() 2. client.author() 3. client.collection() 4. client.column() 5. client.post() 6. client.question() 7. client.topic() 参数均为对应页面的url,返回对应的类的实例。 """ def getter(url): return getattr(module, item.capitalize())(url, session=self._session) attr_list = ['answer', 'author', 'collection', 'column', 'post', 'question', 'topic'] if item.lower() in attr_list: module = importlib.import_module('.'+item.lower(), 'zhihu') return getter PK +\H=2) ( ( zhihu/common.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import functools import re import os from requests import Session from bs4 import BeautifulSoup as _Bs from bs4 import Tag, NavigableString from requests.packages.urllib3.util import Retry try: __import__('lxml') BeautifulSoup = lambda makeup: _Bs(makeup, 'lxml') except ImportError: BeautifulSoup = lambda makeup: _Bs(makeup, 'html.parser') Default_Header = {'X-Requested-With': 'XMLHttpRequest', 'Referer': 'http://www.zhihu.com', 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; ' 'rv:39.0) Gecko/20100101 Firefox/39.0', 'Host': 'www.zhihu.com'} Zhihu_URL = 'https://www.zhihu.com' Login_URL = Zhihu_URL + '/login/email' Captcha_URL_Prefix = Zhihu_URL + '/captcha.gif?r=' Get_Profile_Card_URL = Zhihu_URL + '/node/MemberProfileCardV2' Question_Get_More_Answer_URL = Zhihu_URL + '/node/QuestionAnswerListV2' Answer_Add_Comment_URL = Zhihu_URL + '/node/AnswerCommentAddV2' Answer_Comment_Box_URL = Zhihu_URL + '/node/AnswerCommentBoxV2' Get_Answer_Comment_URL = Zhihu_URL + '/r/answers/{0}/comments' Author_Get_More_Followers_URL = Zhihu_URL + '/node/ProfileFollowersListV2' Author_Get_More_Followees_URL = Zhihu_URL + '/node/ProfileFolloweesListV2' Author_Get_More_Follow_Column_URL = Zhihu_URL + \ '/node/ProfileFollowedColumnsListV2' Author_Get_More_Follow_Topic_URL = Zhihu_URL + \ '/people/{0}/topics' PROTOCOL = '' Column_Url = 'http://zhuanlan.zhihu.com' Column_API = Column_Url + '/api/columns' Column_Data = Column_API + '/{0}' Column_Posts_Data = Column_API + '/{0}/posts?limit=10&offset={1}' Column_Post_Data = Column_API + '/{0}/posts/{1}' Post_Get_Upvoter = Column_API + '/{0}/posts/{1}/likers' Topic_Url = Zhihu_URL + '/topic' Topic_Get_Children_Url = Topic_Url + '/{0}/organize/entire' Topic_Get_More_Follower_Url = Topic_Url + '/{0}/followers' Topic_Questions_Url = Topic_Url + '/{0}/questions' Topic_Unanswered_Question_Url = Topic_Url + '/{0}/unanswered' Topic_Top_Answers_Url = Topic_Url + '/{0}/top-answers' Topic_Hot_Questions_Url = Topic_Url + '/{0}/hot' Topic_Newest_Url = Topic_Url + '/{0}/newest' Get_Me_Info_Url = Column_Url + '/api/me' Upvote_Answer_Url = Zhihu_URL + '/node/AnswerVoteBarV2' Upvote_Article_Url = Column_API + '/{0}/posts/{1}/rating' Follow_Author_Url = Zhihu_URL + '/node/MemberFollowBaseV2' Follow_Question_Url = Zhihu_URL + '/node/QuestionFollowBaseV2' Follow_Topic_Url = Zhihu_URL + '/node/TopicFollowBaseV2' Follow_Collection_Url = Zhihu_URL + '/collection/follow' Unfollow_Collection_Url = Zhihu_URL + '/collection/unfollow' Thanks_Url = Zhihu_URL + '/answer/thanks' Cancel_Thanks_Url = Zhihu_URL + '/answer/cancel_thanks' Send_Message_Url = Zhihu_URL + '/inbox/post' Unhelpful_Url = Zhihu_URL + '/answer/not_helpful' Cancel_Unhelpful_Url = Zhihu_URL + '/answer/helpful' Get_Collection_Url = Zhihu_URL + '/node/AnswerFavlists' re_question_url = re.compile( r'^https?://www\.zhihu\.com/question/\d+(\?sort=created|/?)$') re_question_url_std = re.compile(r'^https?://www\.zhihu\.com/question/\d+/?') re_ans_url = re.compile( r'^https?://www\.zhihu\.com/question/\d+/answer/\d+/?$') re_author_url = re.compile(r'^https?://www\.zhihu\.com/people/[^/]+/?$') re_collection_url = re.compile(r'^https?://www\.zhihu\.com/collection/\d+/?$') re_column_url = re.compile(r'^http://zhuanlan\.zhihu\.com/([^/]+)/?$') re_post_url = re.compile(r'^http://zhuanlan\.zhihu\.com/([^/]+)/(\d+)/?$') re_topic_url = re.compile(r'^https?://www\.zhihu\.com/topic/(\d+)/?$') re_a2q = re.compile(r'(.*)/a.*') re_collection_url_split = re.compile(r'.*(/c.*)') re_get_number = re.compile(r'[^\d]*(\d+).*') re_del_empty_line = re.compile(r'\n*(.*)\n*') def check_soup(attr, soup_type='_make_soup'): def real(func): @functools.wraps(func) def wrapper(self): # noinspection PyTypeChecker value = getattr(self, attr, None) if value is None: if soup_type == '_make_soup': getattr(self, soup_type)() elif self.soup is None: getattr(self, soup_type)() value = func(self) setattr(self, attr, value) return value return wrapper return real def class_common_init(url_re, allowed_none=True, trailing_slash=True): def real(func): @functools.wraps(func) def wrapper(self, url, *args, **kwargs): if url is None and not allowed_none: raise ValueError('Invalid Url: ' + url) if url is not None: if url_re.match(url) is None: raise ValueError('Invalid URL: ' + url) if not url.endswith('/') and trailing_slash: url += '/' if 'session' not in kwargs.keys() or kwargs['session'] is None: kwargs['session'] = Session() kwargs['session'].mount('https://', Retry(5)) kwargs['session'].mount('http://', Retry(5)) self.soup = None return func(self, url, *args, **kwargs) return wrapper return real def remove_invalid_char(text): """去除字符串中的无效字符,一般用于保存文件时保证文件名的有效性. :param str text: 待处理的字符串 :return: 处理后的字符串 :rtype: str """ invalid_char_list = ['/', '\\', ':', '*', '?', '"', '<', '>', '|', '\n'] res = '' for char in text: if char not in invalid_char_list: res += char return res def parser_author_from_tag(author): author_link = author.find('a', class_='author-link') if author_link is None: return None, '匿名用户', '', '' else: author_name = author_link.text motto_span = author.find('span', class_='bio') author_motto = motto_span['title'] \ if motto_span is not None else '' author_url = Zhihu_URL + author_link['href'] avatar_link = author.find('a', class_='avatar-link') photo_url = PROTOCOL + avatar_link.img['src'].replace('_s', '_r') return author_url, author_name, author_motto, photo_url def parser_author_from_comment(author): author_avatar = author.find('a', class_='zm-item-link-avatar') if author_avatar is None: return None, '匿名用户', '' else: author_link = author.find('a', class_='zg-link') author_name = author_link.text author_url = author_link['href'] avatar_link = author.find('img', class_='zm-item-img-avatar') photo_url = PROTOCOL + avatar_link['src'].replace('_s', '_r') return author_url, author_name, photo_url def answer_content_process(content): content = clone_bs4_elem(content) del content['class'] soup = BeautifulSoup( '
') soup.body.append(content) no_script_list = soup.find_all("noscript") for no_script in no_script_list: no_script.extract() img_list = soup.find_all( "img", class_=["origin_image", "content_image"]) for img in img_list: if "content_image" in img['class']: img['data-original'] = img['data-actualsrc'] new_img = soup.new_tag('img', src=PROTOCOL + img['data-original']) img.replace_with(new_img) if img.next_sibling is None: new_img.insert_after(soup.new_tag('br')) useless_list = soup.find_all("i", class_="icon-external") for useless in useless_list: useless.extract() return soup.prettify() def get_path(path, filename, mode, default_path, default_name): if path is None: path = os.path.join( os.getcwd(), remove_invalid_char(default_path)) if filename is None: filename = remove_invalid_char(default_name) if os.path.isdir(path) is False: os.makedirs(path) temp = filename i = 0 while os.path.isfile(os.path.join(path, temp) + '.' + mode): i += 1 temp = filename + str(i) return os.path.join(path, temp) + '.' + mode def common_follower(url, xsrf, session): from .author import Author, ANONYMOUS headers = dict(Default_Header) headers['Referer'] = url data = {'offset': 0, '_xsrf': xsrf} gotten_data_num = 20 offset = 0 while gotten_data_num == 20: data['offset'] = offset res = session.post(url, data=data, headers=headers) json_data = res.json()['msg'] gotten_data_num = json_data[0] offset += gotten_data_num soup = BeautifulSoup(json_data[1]) follower_divs = soup.find_all('div', class_='zm-profile-card') for div in follower_divs: if div.a is not None: author_name = div.a['title'] author_url = Zhihu_URL + div.a['href'] author_motto = div.find('div', class_='zg-big-gray').text author_photo = PROTOCOL + div.img['src'].replace('_m', '_r') numbers = [re_get_number.match(a.text).group(1) for a in div.find_all('a', target='_blank')] try: yield Author(author_url, author_name, author_motto, *numbers, photo_url=author_photo, session=session) except ValueError: # invalid url yield ANONYMOUS else: yield ANONYMOUS def clone_bs4_elem(el): """Clone a bs4 tag before modifying it. Code from `http://stackoverflow.com/questions/23057631/clone-element-with -beautifulsoup` """ if isinstance(el, NavigableString): return type(el)(el) copy = Tag(None, el.builder, el.name, el.namespace, el.nsprefix) # work around bug where there is no builder set # https://bugs.launchpad.net/beautifulsoup/+bug/1307471 copy.attrs = dict(el.attrs) for attr in ('can_be_empty_element', 'hidden'): setattr(copy, attr, getattr(el, attr)) for child in el.contents: copy.append(clone_bs4_elem(child)) return copy PK y;QH;3&