# ===== zhihu/author.py =====
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json

from .common import *
from .base import BaseZhihu


class Author(BaseZhihu):

    """用户类,请使用``ZhihuClient.author``方法构造对象."""

    @class_common_init(re_author_url, True)
    def __init__(self, url, name=None, motto=None, follower_num=None,
                 question_num=None, answer_num=None, upvote_num=None,
                 thank_num=None, photo_url=None, session=None):
        """创建用户类实例.

        :param str url: 用户主页url,形如 http://www.zhihu.com/people/7sdream
        :param str name: 用户名字,可选
        :param str motto: 用户简介,可选
        :param int follower_num: 用户粉丝数,可选
        :param int question_num: 用户提问数,可选
        :param int answer_num: 用户答案数,可选
        :param int upvote_num: 用户获得赞同数,可选
        :param int thank_num: 用户获得感谢数,可选
        :param str photo_url: 用户头像地址,可选
        :param Session session: 使用的网络会话,为空则使用新会话。
        :return: 用户对象
        :rtype: Author
        """
        self.url = url
        self._session = session
        self.card = None
        self._nav_list = None
        self._name = name
        self._motto = motto
        self._follower_num = follower_num
        self._question_num = question_num
        self._answer_num = answer_num
        self._upvote_num = upvote_num
        self._thank_num = thank_num
        self._photo_url = photo_url

    def _gen_soup(self, content):
        self.soup = BeautifulSoup(content)
        self._nav_list = self.soup.find(
            'div', class_='profile-navbar').find_all('a')

    def _make_card(self):
        if self.card is None and self.url is not None:
            params = {'url_token': self.id}
            real_params = {'params': json.dumps(params)}
            r = self._session.get(Get_Profile_Card_URL, params=real_params)
            self.card = BeautifulSoup(r.content)

    @property
    def id(self):
        """获取用户id,就是网址最后那一部分.

        :return: 用户id
        :rtype: str
        """
        return re.match(r'^.*/([^/]+)/$', self.url).group(1) \
            if self.url is not None else ''

    @property
    @check_soup('_xsrf')
    def xsrf(self):
        """获取知乎的反xsrf参数(用不到就忽视吧~)

        :return: xsrf参数
        :rtype: str
        """
        return self.soup.find('input', attrs={'name': '_xsrf'})['value']

    @property
    @check_soup('_hash_id')
    def hash_id(self):
        """获取作者的内部hash id(用不到就忽视吧~)

        :return: 用户hash id
        :rtype: str
        """
        div = self.soup.find('div', class_='zm-profile-header-op-btns')
        if div is not None:
            return div.button['data-id']
        else:
            ga = self.soup.find('script', attrs={'data-name': 'ga_vars'})
            return json.loads(ga.text)['user_hash']

    @property
    @check_soup('_name', '_make_card')
    def name(self):
        """获取用户名字.

        :return: 用户名字
        :rtype: str
        """
        if self.url is None:
            return '匿名用户'
        if self.soup is not None:
            return self.soup.find('div', class_='title-section').span.text
        else:
            assert self.card is not None
            return self.card.find('span', class_='name').text

    @property
    @check_soup('_motto', '_make_card')
    def motto(self):
        """获取用户自我介绍,由于历史原因,我还是把这个属性叫做motto吧.

        :return: 用户自我介绍
        :rtype: str
        """
        if self.url is None:
            return ''
        else:
            if self.soup is not None:
                bar = self.soup.find(
                    'div', class_='title-section')
                if len(bar.contents) < 4:
                    return ''
                else:
                    return bar.contents[3].text
            else:
                assert self.card is not None
                motto = self.card.find('div', class_='tagline')
                return motto.text if motto is not None else ''

    @property
    @check_soup('_photo_url', '_make_card')
    def photo_url(self):
        """获取用户头像图片地址.

        :return: 用户头像url
        :rtype: str
        """
        if self.url is not None:
            if self.soup is not None:
                img = self.soup.find('img', class_='Avatar Avatar--l')['src']
                return img.replace('_l', '_r')
            else:
                assert self.card is not None
                return PROTOCOL + self.card.img['src'].replace('_xs', '_r')
        else:
            return 'http://pic1.zhimg.com/da8e974dc_r.jpg'

    @property
    @check_soup('_followee_num')
    def followee_num(self):
        """获取关注了多少人.
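
        示意用法(仅为示意,假设 ``client`` 为已登录的 :class:`ZhihuClient` 实例,
        url 按类文档中的形式给出)::

            author = client.author('http://www.zhihu.com/people/7sdream')
            print(author.name, author.followee_num, author.follower_num)
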
:return: 关注的人数 :rtype: int """ if self.url is None: return 0 else: number = int(self.soup.find( 'div', class_='zm-profile-side-following').a.strong.text) return number @property @check_soup('_follower_num') def follower_num(self): """获取追随者数量,就是关注此人的人数. :return: 追随者数量 :rtype: int """ if self.url is None: return 0 else: number = int(self.soup.find( 'div', class_='zm-profile-side-following zg-clear').find_all( 'a')[1].strong.text) return number @property @check_soup('_upvote_num') def upvote_num(self): """获取收到的的赞同数量. :return: 收到的的赞同数量 :rtype: int """ if self.url is None: return 0 else: number = int(self.soup.find( 'span', class_='zm-profile-header-user-agree').strong.text) return number @property @check_soup('_thank_num') def thank_num(self): """获取收到的感谢数量. :return: 收到的感谢数量 :rtype: int """ if self.url is None: return 0 else: number = int(self.soup.find( 'span', class_='zm-profile-header-user-thanks').strong.text) return number @property @check_soup('_weibo_url') def weibo_url(self): """获取用户微博链接. :return: 微博链接地址,如没有则返回 ‘unknown’ :rtype: str """ if self.url is None: return None else: tmp = self.soup.find( 'a', class_='zm-profile-header-user-weibo') return tmp['href'] if tmp is not None else 'unknown' @property def business(self): """用户的行业. :return: 用户的行业,如没有则返回 ‘unknown’ :rtype: str """ return self._find_user_profile('business') @property def location(self): """用户的所在地. :return: 用户的所在地,如没有则返回 ‘unknown’ :rtype: str """ return self._find_user_profile('location') @property def education(self): """用户的教育状况. :return: 用户的教育状况,如没有则返回 ‘unknown’ :rtype: str """ return self._find_user_profile('education') def _find_user_profile(self, t): self._make_soup() if self.url is None: return 'unknown' else: res = self.soup.find( 'span', class_=t) if res and res.has_attr('title'): return res['title'] else: return 'unknown' @property @check_soup('_gender') def gender(self): """用户的性别. :return: 用户的性别(male/female/unknown) :rtype: str """ if self.url is None: return 'unknown' else: return 'female' \ if self.soup.find('i', class_='icon-profile-female') \ else 'male' @property @check_soup('_question_num') def question_num(self): """获取提问数量. :return: 提问数量 :rtype: int """ if self.url is None: return 0 else: return int(self._nav_list[1].span.text) @property @check_soup('_answer_num') def answer_num(self): """获取答案数量. :return: 答案数量 :rtype: int """ if self.url is None: return 0 else: return int(self._nav_list[2].span.text) @property @check_soup('_post_num') def post_num(self): """获取专栏文章数量. :return: 专栏文章数量 :rtype: int """ if self.url is None: return 0 else: return int(self._nav_list[3].span.text) @property @check_soup('_collection_num') def collection_num(self): """获取收藏夹数量. :return: 收藏夹数量 :rtype: int """ if self.url is None: return 0 else: return int(self._nav_list[4].span.text) @property @check_soup('_followed_column_num') def followed_column_num(self): """获取用户关注的专栏数 :return: 关注的专栏数 :rtype: int """ if self.url is not None: tag = self.soup.find('div', class_='zm-profile-side-columns') if tag is not None: return int(re_get_number.match( tag.parent.strong.text).group(1)) return 0 @property @check_soup('_followed_topic_num') def followed_topic_num(self): """获取用户关注的话题数 :return: 关注的话题数 :rtype: int """ if self.url is not None: tag = self.soup.find('div', class_='zm-profile-side-topics') if tag is not None: return int(re_get_number.match( tag.parent.strong.text).group(1)) return 0 @property def questions(self): """获取用户的所有问题. :return: 用户的所有问题,返回生成器. 
:rtype: Question.Iterable """ from .question import Question if self.url is None or self.question_num == 0: return for page_index in range(1, (self.question_num - 1) // 20 + 2): html = self._session.get( self.url + 'asks?page=' + str(page_index)).text soup = BeautifulSoup(html) question_links = soup.find_all('a', class_='question_link') question_datas = soup.find_all( 'div', class_='zm-profile-section-main') for link, data in zip(question_links, question_datas): url = Zhihu_URL + link['href'] title = link.text answer_num = int( re_get_number.match(data.div.contents[4]).group(1)) follower_num = int( re_get_number.match(data.div.contents[6]).group(1)) q = Question(url, title, follower_num, answer_num, session=self._session) yield q @property def answers(self): """获取用户的所有答案. :return: 用户所有答案,返回生成器. :rtype: Answer.Iterable """ from .question import Question from .answer import Answer if self.url is None or self.answer_num == 0: return for page_index in range(1, (self.answer_num - 1) // 20 + 2): html = self._session.get( self.url + 'answers?page=' + str(page_index)).text soup = BeautifulSoup(html) questions = soup.find_all('a', class_='question_link') upvotes = soup.find_all('a', class_='zm-item-vote-count') for q, upvote in zip(questions, upvotes): answer_url = Zhihu_URL + q['href'] question_url = Zhihu_URL + re_a2q.match(q['href']).group(1) question_title = q.text upvote = int(upvote['data-votecount']) question = Question(question_url, question_title, session=self._session) yield Answer(answer_url, question, self, upvote, session=self._session) @property def followers(self): """获取关注此用户的人. :return: 关注此用户的人,返回生成器 :rtype: Author.Iterable """ for x in self._follow_ee_ers('er'): yield x @property def followees(self): """获取用户关注的人. :return: 用户关注的人的,返回生成器 :rtype: Author.Iterable """ for x in self._follow_ee_ers('ee'): yield x def _follow_ee_ers(self, t): if self.url is None: return if t == 'er': request_url = Author_Get_More_Followers_URL else: request_url = Author_Get_More_Followees_URL self._make_card() if self.hash_id is None: self._make_soup() headers = dict(Default_Header) headers['Referer'] = self.url + 'follow' + t + 's' params = {"order_by": "created", "offset": 0, "hash_id": self.hash_id} data = {'_xsrf': self.xsrf, 'method': 'next', 'params': ''} gotten_date_num = 20 offset = 0 while gotten_date_num == 20: params['offset'] = offset data['params'] = json.dumps(params) res = self._session.post(request_url, data=data, headers=headers) json_data = res.json() gotten_date_num = len(json_data['msg']) offset += gotten_date_num for html in json_data['msg']: soup = BeautifulSoup(html) h2 = soup.find('h2') author_name = h2.a.text author_url = h2.a['href'] author_motto = soup.find('div', class_='zg-big-gray').text author_photo = PROTOCOL + soup.a.img['src'].replace('_m', '_r') numbers = [int(re_get_number.match(x.text).group(1)) for x in soup.find_all('a', target='_blank')] try: yield Author(author_url, author_name, author_motto, *numbers, photo_url=author_photo, session=self._session) except ValueError: # invalid url yield ANONYMOUS @property def collections(self): """获取用户收藏夹. 
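
        示意用法(仅为示意,``author`` 为已构造的 :class:`Author` 对象)::

            for collection in author.collections:
                print(collection.name, collection.follower_num)
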
:return: 用户收藏夹,返回生成器 :rtype: Collection.Iterable """ from .collection import Collection if self.url is None or self.collection_num == 0: return else: collection_num = self.collection_num for page_index in range(1, (collection_num - 1) // 20 + 2): html = self._session.get( self.url + 'collections?page=' + str(page_index)).text soup = BeautifulSoup(html) collections_names = soup.find_all( 'a', class_='zm-profile-fav-item-title') collection_follower_nums = soup.find_all( 'div', class_='zm-profile-fav-bio') for c, f in zip(collections_names, collection_follower_nums): c_url = Zhihu_URL + c['href'] c_name = c.text c_fn = int(re_get_number.match(f.contents[2]).group(1)) yield Collection(c_url, self, c_name, c_fn, session=self._session) @property def columns(self): """获取用户专栏. :return: 用户专栏,返回生成器 :rtype: Column.Iterable """ from .column import Column if self.url is None or self.post_num == 0: return soup = BeautifulSoup(self._session.get(self.url + 'posts').text) column_tags = soup.find_all('div', class_='column') for column_tag in column_tags: name = column_tag.div.a.span.text url = column_tag.div.a['href'] follower_num = int(re_get_number.match( column_tag.div.div.a.text).group(1)) footer = column_tag.find('div', class_='footer') if footer is None: post_num = 0 else: post_num = int( re_get_number.match(footer.a.text).group(1)) yield Column(url, name, follower_num, post_num, session=self._session) @property def followed_columns(self): """获取用户关注的专栏. :return: 用户关注的专栏,返回生成器 :rtype: Column.Iterable """ from .column import Column if self.url is None: return if self.followed_column_num > 0: tag = self.soup.find('div', class_='zm-profile-side-columns') if tag is not None: for a in tag.find_all('a'): yield Column(a['href'], a.img['alt'], session=self._session) if self.followed_column_num > 7: offset = 7 gotten_data_num = 20 while gotten_data_num == 20: params = { 'hash_id': self.hash_id, 'limit': 20, 'offset': offset } data = { 'method': 'next', '_xsrf': self.xsrf, 'params': json.dumps(params) } j = self._session.post(Author_Get_More_Follow_Column_URL, data=data).json() gotten_data_num = len(j['msg']) offset += gotten_data_num for msg in map(BeautifulSoup, j['msg']): name = msg.strong.text url = msg.a['href'] post_num = int(re_get_number.match( msg.span.text).group(1)) yield Column(url, name, post_num=post_num, session=self._session) @property def followed_topics(self): """获取用户关注的话题. :return: 用户关注的话题,返回生成器 :rtype: Topic.Iterable """ from .topic import Topic if self.url is None: return if self.followed_topic_num > 0: tag = self.soup.find('div', class_='zm-profile-side-topics') if tag is not None: for a in tag.find_all('a'): yield Topic(Zhihu_URL + a['href'], a.img['alt'], session=self._session) if self.followed_topic_num > 7: offset = 7 gotten_data_num = 20 while gotten_data_num == 20: data = {'start': 0, 'offset': offset, '_xsrf': self.xsrf} j = self._session.post( Author_Get_More_Follow_Topic_URL.format(self.id), data=data).json() gotten_data_num = j['msg'][0] offset += gotten_data_num topic_item = BeautifulSoup(j['msg'][1]).find_all( 'div', class_='zm-profile-section-item') for div in topic_item: name = div.strong.text url = Zhihu_URL + div.a['href'] yield Topic(url, name, session=self._session) @property def activities(self): """获取用户的最近动态. 
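
        示意用法(仅为示意,``author`` 为已构造的 :class:`Author` 对象,
        这里只打印“回答了问题”这一类动态)::

            from zhihu import ActType

            for act in author.activities:
                if act.type == ActType.ANSWER_QUESTION:
                    print(act.time, act.content.question.title)
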
        :return: 最近动态,返回生成器,具体说明见 :class:`.Activity`
        :rtype: Activity.Iterable
        """
        from .activity import Activity
        if self.url is None:
            return
        gotten_feed_num = 20
        start = '0'
        api_url = self.url + 'activities'
        while gotten_feed_num == 20:
            data = {'_xsrf': self.xsrf, 'start': start}
            res = self._session.post(api_url, data=data)
            gotten_feed_num = res.json()['msg'][0]
            soup = BeautifulSoup(res.json()['msg'][1])
            acts = soup.find_all(
                'div', class_='zm-profile-section-item zm-item clearfix')
            start = acts[-1]['data-time'] if len(acts) > 0 else 0
            for act in acts:
                # --- ignore Round Table temporarily ---
                if act.attrs['data-type-detail'] == "member_follow_roundtable":
                    continue
                # --- --- --- --- -- --- --- --- --- ---
                yield Activity(act, self._session, self)

    @property
    def last_activity_time(self):
        """获取用户最后一次活动的时间

        :return: 用户最后一次活动的时间,返回值为 unix 时间戳
        :rtype: int
        """
        self._make_soup()
        act = self.soup.find(
            'div', class_='zm-profile-section-item zm-item clearfix')
        return int(act['data-time']) if act is not None else -1

    def is_zero_user(self):
        """返回当前用户是否为三零用户,其实是四零:
        赞同0,感谢0,提问0,回答0.

        :return: 是否是三零用户
        :rtype: bool
        """
        return self.upvote_num + self.thank_num + \
            self.question_num + self.answer_num == 0


class _Anonymous:
    def __init__(self):
        self.name = "匿名用户"
        self.url = ''


ANONYMOUS = _Anonymous()
"""匿名用户常量,通过 ``zhihu.ANONYMOUS`` 访问。

提问者、回答者、点赞者、问题关注者、评论者都可能是 ``ANONYMOUS``
"""


# ===== zhihu/__init__.py =====
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = '7sDream'
__version__ = '0.3.12'

from .client import ZhihuClient
from .question import Question
from .author import Author, ANONYMOUS
from .activity import Activity
from .acttype import ActType
from .answer import Answer
from .collection import Collection
from .column import Column
from .post import Post
from .topic import Topic

__all__ = ['ZhihuClient', 'Question', 'Author', 'ActType', 'Activity',
           'Answer', 'Collection', 'Column', 'Post', 'Topic', 'ANONYMOUS']


# ===== zhihu/activity.py =====
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from datetime import datetime

from .common import *
from .acttype import ActType
from .question import Question
from .answer import Answer
from .column import Column
from .post import Post
from .topic import Topic
from .author import Author, ANONYMOUS
from .collection import Collection


class Activity:

    """用户动态类,请使用Author.activities获取."""

    def __init__(self, act, session, author):
        """创建用户动态类实例.

        :param bs4.element.Tag act: 表示用户动态的页面元素
        :param Session session: 使用的网络会话
        :param Author author: Activity 所属的用户对象
        :return: 用户动态对象
        :rtype: Activity

        :说明:
            根据Activity.type不同可以获取不同属性,具体请看 :class:`.ActType`
        """
        self._session = session
        self._author = author
        self._type = ActType.from_str(act.attrs['data-type-detail'])
        useless_tag = act.div.find('a', class_='zg-link')
        if useless_tag is not None:
            useless_tag.extract()
        attribute = self._get_assemble_method(self.type)(act)
        self._attr = attribute.__class__.__name__.lower()
        setattr(self, self._attr, attribute)
        self._time = datetime.fromtimestamp(int(act['data-time']))

    @property
    def type(self):
        """
        :return: 用户动态类型, 具体参见 :class:`.ActType`
        :rtype: class:`.ActType`
        """
        return self._type

    @property
    def content(self):
        """获取此对象中能提供的那个属性,对应表请查看 :class:`.ActType` 类.
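
        示意用法(仅为示意,``act`` 为 ``Author.activities`` 返回的某个动态)::

            print(act.type, act.time)
            obj = act.content  # 可能是 Answer、Question、Post 等,见 ActType 表
            print(type(obj).__name__)
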
:return: 对象提供的对象 :rtype: Author or Question or Answer or Topic or Column or Post """ return getattr(self, self._attr) @property def time(self): """ :return: 返回用户执行 Activity 操作的时间 :rtype: datetime.datetime """ return self._time def __find_post(self, act): column_url = act.find('a', class_='column_link')['href'] column_name = act.find('a', class_='column_link').text column = Column(column_url, column_name, session=self._session) try: author_tag = act.find('div', class_='author-info') author_url = Zhihu_URL + author_tag.a['href'] author_info = list(author_tag.stripped_strings) author_name = author_info[0] author_motto = author_info[1] \ if len(author_info) > 1 else '' author = Author(author_url, author_name, author_motto, session=self._session) except TypeError: author = ANONYMOUS post_url = act.find('a', class_='post-link')['href'] post_title = act.find('a', class_='post-link').text post_comment_num, post_upvote_num = self._parse_un_cn(act) return Post(post_url, column, author, post_title, post_upvote_num, post_comment_num, session=self._session) def _assemble_create_post(self, act): return self.__find_post(act) def _assemble_voteup_post(self, act): return self.__find_post(act) def _assemble_follow_column(self, act): return Column(act.div.a['href'], act.div.a.text, session=self._session) def _assemble_follow_topic(self, act): topic_url = Zhihu_URL + act.div.a['href'] topic_name = act.div.a['title'] return Topic(topic_url, topic_name, session=self._session) def _assemble_answer_question(self, act): question_url = Zhihu_URL + re_a2q.match(act.div.find_all('a')[-1]['href']).group(1) question_title = act.div.find_all('a')[-1].text question = Question(question_url, question_title, session=self._session) answer_url = Zhihu_URL + act.div.find_all('a')[-1]['href'] answer_comment_num, answer_upvote_num = self._parse_un_cn(act) return Answer(answer_url, question, self._author, answer_upvote_num, session=self._session) def _assemble_voteup_answer(self, act): question_url = Zhihu_URL + re_a2q.match(act.div.a['href']).group(1) question_title = act.div.a.text question = Question(question_url, question_title, session=self._session) try_find_author = act.find_all('a', class_='author-link', href=re.compile('^/people/[^/]*$')) if len(try_find_author) == 0: author_url = None author_name = '匿名用户' author_motto = '' photo_url = None else: try_find_author = try_find_author[-1] author_url = Zhihu_URL + try_find_author['href'] author_name = try_find_author.text try_find_motto = try_find_author.parent.span if try_find_motto is None: author_motto = '' else: author_motto = try_find_motto['title'] photo_url = PROTOCOL + try_find_author.parent.a.img[ 'src'].replace('_s', '_r') author = Author(author_url, author_name, author_motto, photo_url=photo_url, session=self._session) answer_url = Zhihu_URL + act.div.a['href'] answer_comment_num, answer_upvote_num = self._parse_un_cn(act) return Answer(answer_url, question, author, answer_upvote_num, session=self._session) def _assemble_ask_question(self, act): return Question(Zhihu_URL + act.div.contents[3]['href'], list(act.div.children)[3].text, session=self._session) def _assemble_follow_question(self, act): return Question(Zhihu_URL + act.div.a['href'], act.div.a.text, session=self._session) def _assemble_follow_collection(self, act): url = act.div.a['href'] if not url.startswith('http'): url = Zhihu_URL + url return Collection(url, session=self._session) def _get_assemble_method(self, act_type): assemble_methods = { ActType.UPVOTE_POST: self._assemble_voteup_post, 
            ActType.FOLLOW_COLUMN: self._assemble_follow_column,
            ActType.UPVOTE_ANSWER: self._assemble_voteup_answer,
            ActType.ANSWER_QUESTION: self._assemble_answer_question,
            ActType.ASK_QUESTION: self._assemble_ask_question,
            ActType.FOLLOW_QUESTION: self._assemble_follow_question,
            ActType.FOLLOW_TOPIC: self._assemble_follow_topic,
            ActType.PUBLISH_POST: self._assemble_create_post,
            ActType.FOLLOW_COLLECTION: self._assemble_follow_collection
        }
        if act_type in assemble_methods:
            return assemble_methods[act_type]
        else:
            raise ValueError('invalid activity type')

    @staticmethod
    def _parse_un_cn(act):
        upvote_num = int(
            act.find('a', class_='zm-item-vote-count')['data-votecount'])
        comment = act.find('a', class_='toggle-comment')
        comment_text = next(comment.stripped_strings)
        comment_num_match = re_get_number.match(comment_text)
        comment_num = int(comment_num_match.group(1)) \
            if comment_num_match is not None else 0
        return comment_num, upvote_num


# ===== zhihu/acttype.py =====
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import enum

match = {
    'ANSWER_QUESTION': 'member_answer_question',
    'UPVOTE_ANSWER': 'member_voteup_answer',
    'ASK_QUESTION': 'member_ask_question',
    'FOLLOW_QUESTION': 'member_follow_question',
    'UPVOTE_POST': 'member_voteup_article',
    'FOLLOW_COLUMN': 'member_follow_column',
    'FOLLOW_TOPIC': 'member_follow_topic',
    'PUBLISH_POST': 'member_create_article',
    'FOLLOW_COLLECTION': 'member_follow_favlist'
}

reverse_match = {v: k for k, v in match.items()}


class ActType(enum.Enum):

    """用于表示用户动态的类型.

    :常量说明:

        ================= ================ ============ =====================
        常量名            说明             提供属性     属性类型
        ================= ================ ============ =====================
        ANSWER_QUESTION   回答了一个问题   answer       :class:`.Answer`
        UPVOTE_ANSWER     赞同了一个回答   answer       :class:`.Answer`
        ASK_QUESTION      提出了一个问题   question     :class:`.Question`
        FOLLOW_QUESTION   关注了一个问题   question     :class:`.Question`
        UPVOTE_POST       赞同了一篇文章   post         :class:`.Post`
        FOLLOW_COLUMN     关注了一个专栏   column       :class:`.Column`
        FOLLOW_TOPIC      关注了一个话题   topic        :class:`.Topic`
        PUBLISH_POST      发表了一篇文章   post         :class:`.Post`
        FOLLOW_COLLECTION 关注了一个收藏夹 collection   :class:`.Collection`
        ================= ================ ============ =====================
    """

    ANSWER_QUESTION = 1
    UPVOTE_ANSWER = 2
    ASK_QUESTION = 4
    FOLLOW_QUESTION = 8
    UPVOTE_POST = 16
    FOLLOW_COLUMN = 32
    FOLLOW_TOPIC = 64
    PUBLISH_POST = 128
    FOLLOW_COLLECTION = 256

    @classmethod
    def from_str(cls, div_class):
        return cls.__getattr__(reverse_match[div_class])

    def __str__(self):
        return match[self.name]


class CollectActType(enum.Enum):

    """用于表示收藏夹操作的类型.

    :常量说明:

        ================= ======================
        常量名            说明
        ================= ======================
        INSERT_ANSWER     在收藏夹中增加一个回答
        DELETE_ANSWER     在收藏夹中删除一个回答
        CREATE_COLLECTION 创建收藏夹
        ================= ======================
    """

    INSERT_ANSWER = 1
    DELETE_ANSWER = 2
    CREATE_COLLECTION = 3


# ===== zhihu/collection.py =====
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from .common import *
from .base import BaseZhihu


class Collection(BaseZhihu):

    """收藏夹,请使用``ZhihuClient.collection``方法构造对象."""

    @class_common_init(re_collection_url)
    def __init__(self, url, owner=None, name=None,
                 follower_num=None, session=None):
        """创建收藏夹类实例.

        :param str url: 收藏夹主页url,必须
        :param Author owner: 收藏夹拥有者,可选
        :param str name: 收藏夹标题,可选
        :param int follower_num: 收藏夹关注人数,可选
        :param Session session: 使用的网络会话,为空则使用新会话。
        :return: 收藏夹对象
        :rtype: Collection
        """
        self.url = url
        self._session = session
        self.soup = None
        self._name = name
        self._owner = owner
        self._follower_num = follower_num
        self._id = int(re.match(r'.*/(\d+)', self.url).group(1))

    @property
    def id(self):
        """获取收藏夹id(网址最后的部分).
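
        示意用法(仅为示意,假设 ``client`` 为已登录的 :class:`ZhihuClient`,
        ``collection_url`` 为某收藏夹主页地址)::

            collection = client.collection(collection_url)
            print(collection.id, collection.name, collection.follower_num)
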
:return: 收藏夹id :rtype: int """ return self._id @property @check_soup('_cid') def cid(self): """获取收藏夹内部Id(用不到忽视就好) :return: 内部Id :rtype: int """ return int(re_get_number.match( self.soup.find('a', attrs={'name': 'focus'})['id']).group(1)) @property @check_soup('_xsrf') def xsrf(self): """获取知乎的反xsrf参数(用不到就忽视吧~) :return: xsrf参数 :rtype: str """ return self.soup.find( 'input', attrs={'name': '_xsrf'})['value'] @property @check_soup('_name') def name(self): """获取收藏夹名字. :return: 收藏夹名字 :rtype: str """ return re_del_empty_line.match( self.soup.find('h2', id='zh-fav-head-title').text).group(1) @property @check_soup('_owner') def owner(self): """获取收藏夹拥有者,返回Author对象. :return: 收藏夹拥有者 :rtype: Author """ from .author import Author a = self.soup.find('h2', class_='zm-list-content-title').a name = a.text url = Zhihu_URL + a['href'] motto = self.soup.find( 'div', id='zh-single-answer-author-info').div.text photo_url = PROTOCOL + self.soup.find( 'img', class_='zm-list-avatar-medium')['src'].replace('_m', '_r') return Author(url, name, motto, photo_url=photo_url, session=self._session) @property @check_soup('_follower_num') def follower_num(self): """获取关注此收藏夹的人数. :return: 关注此收藏夹的人数 :rtype: int """ href = re_collection_url_split.match(self.url).group(1) return int(self.soup.find('a', href=href + 'followers').text) @property def followers(self): """获取关注此收藏夹的用户 :return: 关注此收藏夹的用户 :rtype: Author.Iterable """ self._make_soup() followers_url = self.url + 'followers' for x in common_follower(followers_url, self.xsrf, self._session): yield x @property def questions(self): """获取收藏夹内所有问题对象. :return: 收藏夹内所有问题,返回生成器 :rtype: Question.Iterable """ self._make_soup() # noinspection PyTypeChecker for question in self._page_get_questions(self.soup): yield question i = 2 while True: soup = BeautifulSoup(self._session.get( self.url[:-1] + '?page=' + str(i)).text) for question in self._page_get_questions(soup): if question == 0: return yield question i += 1 @property def answers(self): """获取收藏夹内所有答案对象. 
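
        示意用法(仅为示意,``collection`` 为已构造的 :class:`Collection` 对象)::

            for answer in collection.answers:
                print(answer.question.title, answer.author.name,
                      answer.upvote_num)
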
:return: 收藏夹内所有答案,返回生成器 :rtype: Answer.Iterable """ self._make_soup() # noinspection PyTypeChecker for answer in self._page_get_answers(self.soup): yield answer i = 2 while True: soup = BeautifulSoup(self._session.get( self.url[:-1] + '?page=' + str(i)).text) for answer in self._page_get_answers(soup): if answer == 0: return yield answer i += 1 @property def logs(self): """获取收藏夹日志 :return: 收藏夹日志中的操作,返回生成器 :rtype: CollectActivity.Iterable """ import time from datetime import datetime from .answer import Answer from .acttype import CollectActType self._make_soup() gotten_feed_num = 20 offset = 0 data = { 'start': 0, '_xsrf': self.xsrf } api_url = self.url + 'log' while gotten_feed_num == 20: data['offset'] = offset res = self._session.post(url=api_url, data=data) gotten_feed_num = res.json()['msg'][0] soup = BeautifulSoup(res.json()['msg'][1]) offset += gotten_feed_num zm_items = soup.find_all('div', class_='zm-item') for zm_item in zm_items: act_time = datetime.strptime(zm_item.find('time').text, "%Y-%m-%d %H:%M:%S") if zm_item.find('ins'): try: answer = Answer(Zhihu_URL + zm_item.find('ins').a['href'], session=self._session) type = CollectActType.INSERT_ANSWER yield CollectActivity(type, act_time, self.owner, self, answer) except ValueError: type = CollectActType.CREATE_COLLECTION yield CollectActivity(type, act_time, self.owner, self) elif zm_item.find('del'): type = CollectActType.DELETE_ANSWER answer = Answer(Zhihu_URL + zm_item.find('del').a['href'], session=self._session) yield CollectActivity(type, act_time, self.owner, self, answer) else: continue data['start'] = zm_items[-1]['id'][8:] time.sleep(0.5) def _page_get_questions(self, soup): from .question import Question question_tags = soup.find_all("div", class_="zm-item") if len(question_tags) == 0: yield 0 return else: for question_tag in question_tags: if question_tag.h2 is not None: question_title = question_tag.h2.a.text question_url = Zhihu_URL + question_tag.h2.a['href'] yield Question(question_url, question_title, session=self._session) def _page_get_answers(self, soup): from .question import Question from .author import Author, ANONYMOUS from .answer import Answer answer_tags = soup.find_all("div", class_="zm-item") if len(answer_tags) == 0: yield 0 return else: question = None for tag in answer_tags: # 判断是否是'建议修改的回答'等情况 url_tag = tag.find('a', class_='answer-date-link') if url_tag is None: reason = tag.find('div', id='answer-status').p.text print("pass a answer, reason %s ." 
% reason) continue if tag.h2 is not None: question_title = tag.h2.a.text question_url = Zhihu_URL + tag.h2.a['href'] question = Question(question_url, question_title, session=self._session) answer_url = Zhihu_URL + url_tag['href'] div = tag.find('div', class_='zm-item-answer-author-info') author_link = div.find('a', class_='author-link') if author_link is not None: author_url = Zhihu_URL + author_link['href'] author_name = author_link.text motto_span = div.find('span', class_='bio') author_motto = motto_span['title'] if motto_span else '' author = Author(author_url, author_name, author_motto, session=self._session) else: author = ANONYMOUS upvote = int(tag.find( 'a', class_='zm-item-vote-count')['data-votecount']) answer = Answer(answer_url, question, author, upvote, session=self._session) yield answer class CollectActivity: """收藏夹操作, 请使用``Collection.logs``构造对象.""" def __init__(self, type, time, owner, collection, answer=None): """创建收藏夹操作类实例 :param acttype.CollectActType type: 操作类型 :param datetime.datetime time: 进行操作的时间 :param Author owner: 收藏夹的拥有者 :param Collection collection: 所属收藏夹 :param Answer answer: 收藏的答案,可选 :return: CollectActivity """ self._type = type self._time = time self._owner = owner self._collection = collection self._answer = answer @property def type(self): """ :return: 收藏夹操作类型, 具体参见 :class:`.CollectActType` :rtype: :class:`.CollectActType` """ return self._type @property def answer(self): """ :return: 添加或删除收藏的答案, 若是创建收藏夹操作返回 None :rtype: Answer or None """ return self._answer @property def time(self): """ :return: 进行操作的时间 :rtype: datetime.datetime """ return self._time @property def owner(self): """ :return: 收藏夹的拥有者 :rtype: Author """ return self._owner @property def collection(self): """ :return: 所属收藏夹 :rtype: Collection """ return self._collection PKy;QH99@ @ zhihu/column.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- from .common import * from .base import BaseZhihu, JsonAsSoupMixin class Column(JsonAsSoupMixin, BaseZhihu): """专栏类,请使用``ZhihuClient.column``方法构造对象.""" @class_common_init(re_column_url) def __init__(self, url, name=None, follower_num=None, post_num=None, session=None): """创建专栏类实例. :param str url: 专栏url :param str name: 专栏名,可选 :param int follower_num: 关注者数量,可选 :param int post_num: 文章数量,可选 :param Session session: 使用的网络会话,为空则使用新会话。 :return: 专栏对象 :rtype: Column """ self._in_name = re_column_url.match(url).group(1) self.url = url self._session = session self._name = name self._follower_num = follower_num self._post_num = post_num def _make_soup(self): if self.soup is None: json = self._get_content() self._gen_soup(json) def _get_content(self): origin_host = self._session.headers.get('Host') self._session.headers.update(Host='zhuanlan.zhihu.com') res = self._session.get(Column_Data.format(self._in_name)) self._session.headers.update(Host=origin_host) return res.json() @property @check_soup('_name') def name(self): """获取专栏名称. :return: 专栏名称 :rtype: str """ return self.soup['name'] @property @check_soup('_follower_num') def follower_num(self): """获取关注人数. :return: 关注人数 :rtype: int """ return int(self.soup['followersCount']) @property @check_soup('_post_num') def post_num(self): """获取专栏文章数. :return: 专栏文章数 :rtype: int """ return int(self.soup['postsCount']) @property def posts(self): """获取专栏的所有文章. 
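
        示意用法(仅为示意,假设 ``client`` 为已登录的 :class:`ZhihuClient`,
        ``column_url`` 为某专栏首页地址)::

            column = client.column(column_url)
            print(column.name, column.post_num)
            for post in column.posts:
                print(post.title, post.upvote_num)
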
:return: 专栏所有文章,返回生成器 :rtype: Post.Iterable """ origin_host = self._session.headers.get('Host') for offset in range(0, (self.post_num - 1) // 10 + 1): self._session.headers.update(Host='zhuanlan.zhihu.com') res = self._session.get( Column_Posts_Data.format(self._in_name, offset * 10)) soup = res.json() self._session.headers.update(Host=origin_host) for post in soup: yield self._parse_post_data(post) def _parse_post_data(self, post): from .author import Author from .post import Post url = Column_Url + post['url'] template = post['author']['avatar']['template'] photo_id = post['author']['avatar']['id'] photo_url = template.format(id=photo_id, size='r') author = Author(post['author']['profileUrl'], post['author']['name'], post['author']['bio'], photo_url=photo_url, session=self._session) title = post['title'] upvote_num = post['likesCount'] comment_num = post['commentsCount'] print(url) return Post(url, self, author, title, upvote_num, comment_num, session=self._session) PKaSXH;;zhihu/question.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import json import time from datetime import datetime from .common import * from .base import BaseZhihu class Question(BaseZhihu): """问题类,请使用``ZhihuClient.question``方法构造对象.""" @class_common_init(re_question_url, trailing_slash=False) def __init__(self, url, title=None, followers_num=None, answer_num=None, creation_time=None, author=None, session=None): """创建问题类实例. :param str url: 问题url. 现在支持两种 url 1. https://www.zhihu.com/question/qid 2. https://www.zhihu.com/question/qid?sort=created 区别在于,使用第一种,调用 ``question.answers`` 的时候会按投票排序返回答案; 使用第二种, 会按时间排序返回答案, 后提交的答案先返回 :param str title: 问题标题,可选, :param int followers_num: 问题关注人数,可选 :param int answer_num: 问题答案数,可选 :param datetime.datetime creation_time: 问题创建时间,可选 :param Author author: 提问者,可选 :return: 问题对象 :rtype: Question """ self._session = session self._url = url self._title = title self._answer_num = answer_num self._followers_num = followers_num self._id = int(re.match(r'.*/(\d+)', self.url).group(1)) self._author = author self._creation_time = creation_time self._logs = None self._deleted = None @property def url(self): # always return url like https://www.zhihu.com/question/1234/ url = re.match(re_question_url_std, self._url).group() return url if url.endswith('/') else url + '/' @property def id(self): """获取问题id(网址最后的部分). :return: 问题id :rtype: int """ return self._id @property @check_soup('_qid') def qid(self): """获取问题内部id(用不到就忽视吧) :return: 问题内部id :rtype: int """ return int(self.soup.find( 'div', id='zh-question-detail')['data-resourceid']) @property @check_soup('_xsrf') def xsrf(self): """获取知乎的反xsrf参数(用不到就忽视吧~) :return: xsrf参数 :rtype: str """ return self.soup.find('input', attrs={'name': '_xsrf'})['value'] @property @check_soup('_html') def html(self): """获取页面源码. :return: 页面源码 :rtype: str """ return self.soup.prettify() @property @check_soup('_title') def title(self): """获取问题标题. :return: 问题标题 :rtype: str """ return self.soup.find('h2', class_='zm-item-title') \ .text.replace('\n', '') @property @check_soup('_details') def details(self): """获取问题详细描述,目前实现方法只是直接获取文本,效果不满意……等更新. :return: 问题详细描述 :rtype: str """ return self.soup.find("div", id="zh-question-detail").div.text @property @check_soup('_answer_num') def answer_num(self): """获取问题答案数量. 
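
        示意用法(仅为示意,假设 ``client`` 为已登录的 :class:`ZhihuClient`,
        ``question_url`` 形如 https://www.zhihu.com/question/xxxxxxxx)::

            question = client.question(question_url)
            print(question.title, question.answer_num, question.follower_num)
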
:return: 问题答案数量 :rtype: int """ answer_num_block = self.soup.find('h3', id='zh-question-answer-num') # 当0人回答或1回答时,都会找不到 answer_num_block, # 通过找答案的赞同数block来判断到底有没有答案。 # (感谢知乎用户 段晓晨 提出此问题) if answer_num_block is None: if self.soup.find('span', class_='count') is not None: return 1 else: return 0 return int(answer_num_block['data-num']) @property @check_soup('_follower_num') def follower_num(self): """获取问题关注人数. :return: 问题关注人数 :rtype: int """ follower_num_block = self.soup.find('div', class_='zg-gray-normal') # 无人关注时 找不到对应block,直接返回0 (感谢知乎用户 段晓晨 提出此问题) if follower_num_block.strong is None: return 0 return int(follower_num_block.strong.text) @property @check_soup('_topics') def topics(self): """获取问题所属话题. :return: 问题所属话题 :rtype: list(str) """ topics_list = [] for topic in self.soup.find_all('a', class_='zm-item-tag'): topics_list.append(topic.text.replace('\n', '')) return topics_list @property def followers(self): """获取关注此问题的用户 :return: 关注此问题的用户 :rtype: Author.Iterable :问题: 要注意若执行过程中另外有人关注,可能造成重复获取到某些用户 """ self._make_soup() followers_url = self.url + 'followers' for x in common_follower(followers_url, self.xsrf, self._session): yield x @property def answers(self): """获取问题的所有答案. :return: 问题的所有答案,返回生成器 :rtype: Answer.Iterable """ from .author import Author from .answer import Answer self._make_soup() # TODO: 统一逻辑. 完全可以都用 _parse_answer_html 的逻辑替换 if self._url.endswith('sort=created'): pager = self.soup.find('div', class_='zm-invite-pager') if pager is None: max_page = 1 else: max_page = int(pager.find_all('span')[-2].a.text) for page in range(1, max_page + 1): if page == 1: soup = self.soup else: url = self._url + '&page=%d' % page soup = BeautifulSoup(self._session.get(url).content) error_answers = soup.find_all('div', id='answer-status') for each in error_answers: each['class'] = 'zm-editable-content' answers_wrap = soup.find('div', id='zh-question-answer-wrap') # 正式处理 authors = answers_wrap.find_all( 'div', class_='zm-item-answer-author-info') urls = answers_wrap.find_all('a', class_='answer-date-link') upvote_nums = answers_wrap.find_all('div', class_='zm-item-vote-info') contents = answers_wrap.find_all( 'div', class_='zm-editable-content') assert len(authors) == len(urls) == len(upvote_nums) == len(contents) for author, url, upvote_num, content in \ zip(authors, urls, upvote_nums, contents): a_url, name, motto, photo = parser_author_from_tag(author) author_obj = Author(a_url, name, motto, photo_url=photo, session=self._session) url = Zhihu_URL + url['href'] upvote_num = int(upvote_num['data-votecount']) content = answer_content_process(content) yield Answer(url, self, author_obj, upvote_num, content, session=self._session) else: new_header = dict(Default_Header) new_header['Referer'] = self.url params = {"url_token": self.id, 'pagesize': '50', 'offset': 0} data = {'_xsrf': self.xsrf, 'method': 'next', 'params': ''} for i in range(0, (self.answer_num - 1) // 50 + 1): if i == 0: # 修正各种建议修改的回答…… error_answers = self.soup.find_all('div', id='answer-status') for each in error_answers: each['class'] = 'zm-editable-content' answers_wrap = self.soup.find('div', id='zh-question-answer-wrap') # 正式处理 authors = answers_wrap.find_all( 'div', class_='zm-item-answer-author-info') urls = answers_wrap.find_all('a', class_='answer-date-link') upvote_nums = answers_wrap.find_all('div', class_='zm-item-vote-info') contents = answers_wrap.find_all( 'div', class_='zm-editable-content') assert len(authors) == len(urls) == len(upvote_nums) == len(contents) for author, url, upvote_num, content in \ zip(authors, 
urls, upvote_nums, contents): a_url, name, motto, photo = parser_author_from_tag(author) author_obj = Author(a_url, name, motto, photo_url=photo, session=self._session) url = Zhihu_URL + url['href'] upvote_num = int(upvote_num['data-votecount']) content = answer_content_process(content) yield Answer(url, self, author_obj, upvote_num, content, session=self._session) else: params['offset'] = i * 50 data['params'] = json.dumps(params) r = self._session.post(Question_Get_More_Answer_URL, data=data, headers=new_header) answer_list = r.json()['msg'] for answer_html in answer_list: yield self._parse_answer_html(answer_html, Author, Answer) @property def top_answer(self): """获取排名第一的答案. :return: 排名第一的答案 :rtype: Answer """ for a in self.answers: return a def top_i_answer(self, i): """获取排名某一位的答案. :param int i: 要获取的答案的排名 :return: 答案对象,能直接获取的属性参见answers方法 :rtype: Answer """ for j, a in enumerate(self.answers): if j == i - 1: return a def top_i_answers(self, i): """获取排名在前几位的答案. :param int i: 获取前几个 :return: 答案对象,返回生成器 :rtype: Answer.Iterable """ for j, a in enumerate(self.answers): if j <= i - 1: yield a else: return @property @check_soup('_author') def author(self): """获取问题的提问者. :return: 提问者 :rtype: Author or zhihu.ANONYMOUS """ from .author import Author, ANONYMOUS logs = self._query_logs() author_a = logs[-1].find_all('div')[0].a if author_a.text == '匿名用户': return ANONYMOUS else: url = Zhihu_URL + author_a['href'] return Author(url, name=author_a.text, session=self._session) @property @check_soup('_creation_time') def creation_time(self): """ :return: 问题创建时间 :rtype: datetime.datetime """ logs = self._query_logs() time_string = logs[-1].find('div', class_='zm-item-meta').time['datetime'] return datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S") @property @check_soup('_last_edit_time') def last_edit_time(self): """ :return: 问题最后编辑时间 :rtype: datetime.datetime """ data = {'_xsrf': self.xsrf, 'offset': '1'} res = self._session.post(self.url + 'log', data=data) _, content = res.json()['msg'] soup = BeautifulSoup(content) time_string = soup.find_all('time')[0]['datetime'] return datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S") def _query_logs(self): if self._logs is None: gotten_feed_num = 20 start = '0' offset = 0 api_url = self.url + 'log' while gotten_feed_num == 20: data = {'_xsrf': self.xsrf, 'offset': offset, 'start': start} res = self._session.post(api_url, data=data) gotten_feed_num, content = res.json()['msg'] offset += gotten_feed_num soup = BeautifulSoup(content) logs = soup.find_all('div', class_='zm-item') start = logs[-1]['id'][8:] if len(logs) > 0 else '0' time.sleep(0.2) # prevent from posting too quickly self._logs = logs return self._logs def refresh(self): """刷新 Question object 的属性. 例如回答数增加了, 先调用 ``refresh()`` 再访问 answer_num 属性, 可获得更新后的答案数量. 
        :return: None
        """
        super().refresh()
        self._html = None
        self._title = None
        self._details = None
        self._answer_num = None
        self._follower_num = None
        self._topics = None
        self._last_edit_time = None
        self._logs = None

    @property
    @check_soup('_deleted')
    def deleted(self):
        """问题是否被删除, 被删除了返回 True, 未被删除返回 False

        :return: True or False
        """
        return self._deleted

    def _parse_answer_html(self, answer_html, Author, Answer):
        soup = BeautifulSoup(answer_html)

        # 修正各种建议修改的回答……
        error_answers = soup.find_all('div', id='answer-status')
        for each in error_answers:
            each['class'] = 'zm-editable-content'

        answer_url = self.url + 'answer/' + soup.div['data-atoken']
        author = soup.find('div', class_='zm-item-answer-author-info')
        upvote_num = int(soup.find(
            'div', class_='zm-item-vote-info')['data-votecount'])
        content = soup.find('div', class_='zm-editable-content')
        content = answer_content_process(content)
        a_url, name, motto, photo = parser_author_from_tag(author)
        author = Author(a_url, name, motto, photo_url=photo,
                        session=self._session)
        return Answer(answer_url, self, author, upvote_num, content,
                      session=self._session)

    def _get_content(self):
        # override base class's method because we need self._url, not self.url
        if self._url.endswith('/'):
            resp = self._session.get(self._url[:-1])
        else:
            resp = self._session.get(self._url)
        if resp.status_code == 404:
            self._deleted = True
        else:
            self._deleted = False
        return resp.content


# ===== zhihu/comment.py =====
#!/usr/bin/env python3
# -*- coding: utf-8 -*-


class Comment:

    """评论类,一般不直接使用,而是作为``Answer.comments``迭代器的返回类型."""

    def __init__(self, cid, answer, author, upvote_num, content, time,
                 group_id=None):
        """创建评论类实例.

        :param int cid: 评论ID
        :param int group_id: 评论所在的组ID
        :param Answer answer: 评论所在的答案对象
        :param Author author: 评论的作者对象
        :param int upvote_num: 评论赞同数量
        :param str content: 评论内容
        :param datetime.datetime time: 评论发表时间
        :return: 评论对象
        :rtype: Comment
        """
        self.cid = cid
        self.answer = answer
        self.author = author
        self.upvote_num = upvote_num
        self.content = content
        self.creation_time = time
        self._group_id = group_id


# ===== zhihu/post.py =====
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from .common import *
from .base import BaseZhihu, JsonAsSoupMixin


class Post(JsonAsSoupMixin, BaseZhihu):

    """专栏文章类,请使用``ZhihuClient.post``方法构造对象."""

    @class_common_init(re_post_url)
    def __init__(self, url, column=None, author=None, title=None,
                 upvote_num=None, comment_num=None, session=None):
        """创建专栏文章类实例.
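
        示意用法(仅为示意,假设 ``client`` 为已登录的 :class:`ZhihuClient`,
        ``post_url`` 为某篇专栏文章的地址)::

            post = client.post(post_url)
            print(post.title, post.author.name, post.upvote_num)
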
:param str url: 文章url :param Column column: 文章所属专栏,可选 :param Author author: 文章作者,可选 :param str title: 文章标题,可选 :param int upvote_num: 文章赞同数,可选 :param int comment_num: 文章评论数,可选 :param Session session: 使用的网络会话,为空则使用新会话 :return: 专栏文章对象 :rtype: Post """ match = re_post_url.match(url) self.url = url self._session = session self._column = column self._author = author self._title = title self._upvote_num = upvote_num self._comment_num = comment_num self._column_in_name = match.group(1) # 专栏内部名称 self._slug = int(match.group(2)) # 文章编号 def _make_soup(self): if self.soup is None: json = self._get_content() self._gen_soup(json) def _get_content(self): origin_host = self._session.headers.get('Host') self._session.headers.update(Host='zhuanlan.zhihu.com') json = self._session.get( Column_Post_Data.format(self.column_in_name, self.slug)).json() self._session.headers.update(Host=origin_host) return json @property def column_in_name(self): """获取文章所在专栏的内部名称(用不到就忽视吧~) :return: 专栏的内部名称 :rtype: str """ return self._column_in_name @property def slug(self): """获取文章的编号(用不到就忽视吧~) :return: 文章编号 :rtype: int """ return self._slug @property @check_soup('_column') def column(self): """获取文章所在专栏. :return: 文章所在专栏 :rtype: Column """ from .column import Column url = Column_Url + '/' + self.soup['column']['slug'] name = self.soup['column']['name'] return Column(url, name, session=self._session) @property @check_soup('_author') def author(self): """获取文章作者. :return: 文章作者 :rtype: Author """ from .author import Author url = self.soup['author']['profileUrl'] name = self.soup['author']['name'] motto = self.soup['author']['bio'] template = self.soup['author']['avatar']['template'] photo_id = self.soup['author']['avatar']['id'] photo_url = template.format(id=photo_id, size='r') return Author(url, name, motto, photo_url=photo_url, session=self._session) @property @check_soup('_title') def title(self): """获取文章标题. :return: 文章标题 :rtype: str """ return self.soup['title'] @property @check_soup('_upvote_num') def upvote_num(self): """获取文章赞同数量. :return: 文章赞同数 :rtype: int """ return int(self.soup['likesCount']) @property @check_soup('_comment_num') def comment_num(self): """获取评论数量. :return: 评论数量 :rtype: int """ return self.soup['commentsCount'] def save(self, filepath=None, filename=None, mode="md"): """保存答案为 Html 文档或 markdown 文档. 
:param str filepath: 要保存的文件所在的目录, 不填为当前目录下以专栏标题命名的目录, 设为"."则为当前目录。 :param str filename: 要保存的文件名, 不填则默认为 所在文章标题 - 作者名.html/md。 如果文件已存在,自动在后面加上数字区分。 **自定义文件名时请不要输入后缀 .html 或 .md。** :param str mode: 保存类型,可选 `html` 、 `markdown` 、 `md` 。 :return: 无 :rtype: None """ if mode not in ["html", "md", "markdown"]: raise ValueError("`mode` must be 'html', 'markdown' or 'md'," " got {0}".format(mode)) file = get_path(filepath, filename, mode, self.column.name, self.title + '-' + self.author.name) with open(file, 'wb') as f: if mode == "html": f.write(self.soup['content'].encode('utf-8')) else: import html2text h2t = html2text.HTML2Text() h2t.body_width = 0 f.write(h2t.handle(self.soup['content']).encode('utf-8')) @property def upvoters(self): """获取文章的点赞用户 :return: 文章的点赞用户,返回生成器。 """ from .author import Author, ANONYMOUS self._make_soup() headers = dict(Default_Header) headers['Host'] = 'zhuanlan.zhihu.com' json = self._session.get( Post_Get_Upvoter.format( self.column_in_name, self.slug ), headers=headers ).json() for au in json: try: yield Author( au['profileUrl'], au['name'], au['bio'], photo_url=au['avatar']['template'].format( id=au['avatar']['id'], size='r'), session=self._session ) except ValueError: # invalid url yield ANONYMOUS PKy;QHu9955zhihu/answer.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import json from datetime import datetime from .common import * from .base import BaseZhihu from .collection import Collection from .author import Author, ANONYMOUS class Answer(BaseZhihu): """答案类,请使用``ZhihuClient.answer``方法构造对象.""" @class_common_init(re_ans_url) def __init__(self, url, question=None, author=None, upvote_num=None, content=None, session=None): """创建答案类实例. :param str url: 答案url :param Question question: 答案所在的问题对象,可选 :param Author author: 答案回答者对象,可选 :param int upvote_num: 答案赞同数量,可选 :param str content: 答案内容,可选 :param Session session: 使用的网络会话,为空则使用新会话 :return: 答案对象 :rtype: Answer """ self.url = url self._session = session self._question = question self._author = author self._upvote_num = upvote_num self._content = content self._deleted = None @property def id(self): """答案的id :return: 答案id :rtype: int """ return int(re.match(r'.*/(\d+)/$', self.url).group(1)) @property @check_soup('_xsrf') def xsrf(self): """获取知乎的反xsrf参数(用不到就忽视吧~) :return: xsrf参数 :rtype: str """ return self.soup.find('input', attrs={'name': '_xsrf'})['value'] @property @check_soup('_aid') def aid(self): """获取答案的内部id,某些POST操作需要此参数 :return: 答案内部id :rtype: str """ return int(self.soup.find('div', class_='zm-item-answer')['data-aid']) @property @check_soup('_html') def html(self): """获取网页源码 :return: 网页源码 :rtype: str """ return self.soup.prettify() @property @check_soup('_author') def author(self): """获取答案作者. :return: 答案作者 :rtype: Author """ from .author import Author author = self.soup.find('div', class_='zm-item-answer-author-info') url, name, motto, photo = parser_author_from_tag(author) if name == '匿名用户': return ANONYMOUS else: return Author(url, name, motto, photo_url=photo, session=self._session) @property @check_soup('_question') def question(self): """获取答案所在问题. 
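
        示意用法(仅为示意,假设 ``client`` 为已登录的 :class:`ZhihuClient`,
        ``answer_url`` 形如 https://www.zhihu.com/question/xxx/answer/yyy)::

            answer = client.answer(answer_url)
            print(answer.question.title, answer.upvote_num)
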
:return: 答案所在问题 :rtype: Question """ from .question import Question question_link = self.soup.find( "h2", class_="zm-item-title zm-editable-content").a url = Zhihu_URL + question_link["href"] title = question_link.text followers_num = int(self.soup.find( 'div', class_='zh-question-followers-sidebar').div.a.strong.text) answers_num = int(re_get_number.match(self.soup.find( 'div', class_='zh-answers-title').h3.a.text).group(1)) return Question(url, title, followers_num, answers_num, session=self._session) @property @check_soup('_upvote_num') def upvote_num(self): """获取答案赞同数量. :return: 答案赞同数量 :rtype: int """ return int(self.soup.find( 'div', class_='zm-item-vote-info')['data-votecount']) @property def upvoters(self): """获取答案点赞用户,返回生成器. :return: 点赞用户 :rtype: Author.Iterable """ self._make_soup() next_req = '/answer/' + str(self.aid) + '/voters_profile' while next_req != '': data = self._session.get(Zhihu_URL + next_req).json() next_req = data['paging']['next'] for html in data['payload']: soup = BeautifulSoup(html) yield self._parse_author_soup(soup) @property @check_soup('_content') def content(self): """以处理过的Html代码形式返回答案内容. :return: 答案内容 :rtype: str """ answer_wrap = self.soup.find('div', id='zh-question-answer-wrap') content = answer_wrap.find('div', class_='zm-editable-content') content = answer_content_process(content) return content @property @check_soup('_creation_time') def creation_time(self): """获取答案创建时间 :return: 答案创建时间 :rtype: datetime.datetime """ return datetime.fromtimestamp(int(self.soup.find( 'div', class_='zm-item-answer')['data-created'])) @property @check_soup('_collect_num') def collect_num(self): """获取答案收藏数 :return: 答案收藏数量 :rtype: int """ element = self.soup.find("a", { "data-za-a": "click_answer_collected_count" }) if element is None: return 0 else: return int(element.get_text()) @property def collections(self): """获取包含该答案的收藏夹 :return: 包含该答案的收藏夹 :rtype: Collection.Iterable collect_num 未必等于 len(collections),比如: https://www.zhihu.com/question/20064699/answer/13855720 显示被收藏 38 次,但只有 30 个收藏夹 """ import time gotten_feed_num = 20 offset = 0 data = { 'method':'next', '_xsrf': self.xsrf } while gotten_feed_num >= 10: data['params'] = "{\"answer_url\": %d,\"offset\": %d}" % (self.id, offset) res = self._session.post(url=Get_Collection_Url, data=data) gotten_feed_num = len(res.json()['msg']) offset += gotten_feed_num soup = BeautifulSoup(''.join(res.json()['msg'])) for zm_item in soup.find_all('div', class_='zm-item'): url = Zhihu_URL + zm_item.h2.a['href'] name = zm_item.h2.a.text links = zm_item.div.find_all('a') owner = Author(links[0]['href'], session=self._session) follower_num = int(links[1].text.split()[0]) yield Collection(url, owner=owner, name=name, follower_num=follower_num, session=self._session) time.sleep(0.2) # prevent from posting too quickly def save(self, filepath=None, filename=None, mode="html"): """保存答案为Html文档或markdown文档. 
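
        示意用法(仅为示意,``answer`` 为已构造的 :class:`Answer` 对象,
        保存为 markdown 到当前目录)::

            answer.save(filepath='.', mode='md')
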
:param str filepath: 要保存的文件所在的目录, 不填为当前目录下以问题标题命名的目录, 设为"."则为当前目录。 :param str filename: 要保存的文件名, 不填则默认为 所在问题标题 - 答主名.html/md。 如果文件已存在,自动在后面加上数字区分。 **自定义文件名时请不要输入后缀 .html 或 .md。** :param str mode: 保存类型,可选 `html` 、 `markdown` 、 `md` 。 :return: 无 :rtype: None """ if mode not in ["html", "md", "markdown"]: raise ValueError("`mode` must be 'html', 'markdown' or 'md'," " got {0}".format(mode)) file = get_path(filepath, filename, mode, self.question.title, self.question.title + '-' + self.author.name) with open(file, 'wb') as f: if mode == "html": f.write(self.content.encode('utf-8')) else: import html2text h2t = html2text.HTML2Text() h2t.body_width = 0 f.write(h2t.handle(self.content).encode('utf-8')) def _parse_author_soup(self, soup): from .author import Author, ANONYMOUS author_tag = soup.find('div', class_='body') if author_tag.string is None: author_name = author_tag.div.a['title'] author_url = author_tag.div.a['href'] author_motto = author_tag.div.span.text photo_url = PROTOCOL + soup.a.img['src'].replace('_m', '_r') numbers_tag = soup.find_all('li') numbers = [int(re_get_number.match(x.get_text()).group(1)) for x in numbers_tag] # noinspection PyTypeChecker return Author(author_url, author_name, author_motto, None, numbers[2], numbers[3], numbers[0], numbers[1], photo_url, session=self._session) else: return ANONYMOUS @property @check_soup('_comment_num') def comment_num(self): """ :return: 答案下评论的数量 :rtype: int """ comment_num_string = self.soup.find('a', class_=' meta-item toggle-comment').text number = comment_num_string.split()[0] return int(number) if number.isdigit() else 0 @property def comments(self): """获取答案下的所有评论. :return: 答案下的所有评论,返回生成器 :rtype: Comments.Iterable """ import math from .author import Author, ANONYMOUS from .comment import Comment api_url = Get_Answer_Comment_URL.format(self.aid) page = pages = 1 while page <= pages: res = self._session.get(api_url + '?page=' + str(page)) if page == 1: total = int(res.json()['paging']['totalCount']) if total == 0: return pages = math.ceil(total / 30) page += 1 comment_items = res.json()['data'] for comment_item in comment_items: comment_id = comment_item['id'] content = comment_item['content'] upvote_num = comment_item['likesCount'] time_string = comment_item['createdTime'][:19] time = datetime.strptime(time_string, "%Y-%m-%dT%H:%M:%S") if comment_item['author'].get('url') != None: a_url = comment_item['author']['url'] a_name = comment_item['author']['name'] photo_url_tmp = comment_item['author']['avatar']['template'] photo_url_id = comment_item['author']['avatar']['id'] a_photo_url = photo_url_tmp.replace( '{id}', photo_url_id).replace('_{size}', '') author_obj = Author(a_url, a_name, photo_url=a_photo_url, session=self._session) else: author_obj = ANONYMOUS yield Comment(comment_id, self, author_obj, upvote_num, content, time) @property def latest_comments(self): """获取答案下的所有评论。较新的评论先返回。 使用该方法比 ``reversed(list(answer.comments))`` 效率高 因为现在靠后的热门评论会被挪到前面,所以返回的评论未必严格满足时间先后关系 :return: 答案下的所有评论,返回生成器 :rtype: Comments.Iterable """ import math from .author import Author, ANONYMOUS from .comment import Comment if self.comment_num == 0: return pages = math.ceil(self.comment_num / 30) api_url = Get_Answer_Comment_URL.format(self.aid) for page in range(pages, 0, -1): res = self._session.get(api_url + '?page=' + str(page)) comment_items = res.json()['data'] for comment_item in reversed(comment_items): comment_id = comment_item['id'] content = comment_item['content'] upvote_num = comment_item['likesCount'] time_string = 
                    comment_item['createdTime'][:19]
                time = datetime.strptime(time_string, "%Y-%m-%dT%H:%M:%S")
                if comment_item['author'].get('url') is not None:
                    a_url = comment_item['author']['url']
                    a_name = comment_item['author']['name']
                    photo_url_tmp = comment_item['author']['avatar']['template']
                    photo_url_id = comment_item['author']['avatar']['id']
                    a_photo_url = photo_url_tmp.replace(
                        '{id}', photo_url_id).replace('_{size}', '')
                    author_obj = Author(a_url, a_name, photo_url=a_photo_url,
                                        session=self._session)
                else:
                    author_obj = ANONYMOUS
                yield Comment(comment_id, self, author_obj, upvote_num,
                              content, time)

    def refresh(self):
        """刷新 Answer object 的属性.

        例如赞同数增加了, 先调用 ``refresh()``
        再访问 upvote_num属性, 可获得更新后的赞同数.

        :return: None
        """
        super().refresh()
        self._html = None
        self._upvote_num = None
        self._content = None
        self._collect_num = None
        self._comment_num = None

    @property
    @check_soup('_deleted')
    def deleted(self):
        """答案是否被删除, 被删除了返回 True, 未被删除返回 False

        :return: True or False
        """
        return self._deleted


# ===== zhihu/topic.py =====
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
from datetime import datetime

from .common import *
from .base import BaseZhihu


class Topic(BaseZhihu):

    """话题类,请使用``ZhihuClient.topic``方法构造对象."""

    @class_common_init(re_topic_url)
    def __init__(self, url, name=None, session=None):
        """创建话题类实例.

        :param url: 话题url
        :param name: 话题名称,可选
        :return: Topic
        """
        self.url = url
        self._session = session
        self._name = name
        self._id = int(re_topic_url.match(self.url).group(1))

    @property
    def id(self):
        """获取话题Id(网址最后那串数字)

        :return: 话题Id
        :rtype: int
        """
        return self._id

    @property
    @check_soup('_xsrf')
    def xsrf(self):
        """获取知乎的反xsrf参数(用不到就忽视吧~)

        :return: xsrf参数
        :rtype: str
        """
        return self.soup.find('input', attrs={'name': '_xsrf'})['value']

    @property
    @check_soup('_tid')
    def tid(self):
        """话题内部Id,有时候要用到

        :return: 话题内部Id
        :rtype: int
        """
        return int(self.soup.find(
            'div', id='zh-topic-desc')['data-resourceid'])

    @property
    @check_soup('_name')
    def name(self):
        """获取话题名称.

        :return: 话题名称
        :rtype: str
        """
        return self.soup.find('h1').text

    @property
    def parents(self):
        """获取此话题的父话题。

        注意:由于没找到有很多父话题的话题来测试,
        所以本方法可能在某些时候出现问题,请不吝反馈。

        :return: 此话题的父话题,返回生成器
        :rtype: Topic.Iterable
        """
        self._make_soup()
        parent_topic_tag = self.soup.find('div', class_='parent-topic')
        if parent_topic_tag is None:
            yield []
        else:
            for topic_tag in parent_topic_tag.find_all('a'):
                yield Topic(Zhihu_URL + topic_tag['href'],
                            topic_tag.text.strip(),
                            session=self._session)

    @property
    def children(self):
        """获取此话题的子话题

        :return: 此话题的子话题, 返回生成器
        :rtype: Topic.Iterable
        """
        self._make_soup()
        child_topic_tag = self.soup.find('div', class_='child-topic')
        if child_topic_tag is None:
            return []
        elif '共有' not in child_topic_tag.contents[-2].text:
            for topic_tag in child_topic_tag.div.find_all('a'):
                yield Topic(Zhihu_URL + topic_tag['href'],
                            topic_tag.text.strip(),
                            session=self._session)
        else:
            flag = 'load'
            child = ''
            data = {'_xsrf': self.xsrf}
            params = {
                'parent': self.id
            }
            while flag == 'load':
                params['child'] = child
                res = self._session.post(Topic_Get_Children_Url,
                                         params=params, data=data)
                j = map(lambda x: x[0], res.json()['msg'][1])
                *topics, last = j
                for topic in topics:
                    yield Topic(Zhihu_URL + '/topic/' + topic[2], topic[1],
                                session=self._session)
                flag = last[0]
                child = last[2]
                if flag == 'topic':
                    yield Topic(Zhihu_URL + '/topic/' + last[2], last[1],
                                session=self._session)

    @property
    @check_soup('_follower_num')
    def follower_num(self):
        """获取话题关注人数.
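
        示意用法(仅为示意,假设 ``client`` 为已登录的 :class:`ZhihuClient`,
        ``topic_url`` 形如 https://www.zhihu.com/topic/19xxxxxx)::

            topic = client.topic(topic_url)
            print(topic.name, topic.follower_num)
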
:return: 关注人数 :rtype: int """ follower_num_block = self.soup.find( 'div', class_='zm-topic-side-followers-info') # 无人关注时 找不到对应block,直接返回0 (感谢知乎用户 段晓晨 提出此问题) if follower_num_block.strong is None: return 0 return int(follower_num_block.strong.text) @property def followers(self): """获取话题关注者 :return: 话题关注者,返回生成器 :rtype: Author.Iterable """ from .author import Author, ANONYMOUS self._make_soup() gotten_data_num = 20 data = { '_xsrf': self.xsrf, 'start': '', 'offset': 0 } while gotten_data_num == 20: res = self._session.post( Topic_Get_More_Follower_Url.format(self.id), data=data) j = res.json()['msg'] gotten_data_num = j[0] data['offset'] += gotten_data_num soup = BeautifulSoup(j[1]) divs = soup.find_all('div', class_='zm-person-item') for div in divs: h2 = div.h2 url = Zhihu_URL + h2.a['href'] name = h2.a.text motto = h2.next_element.text try: yield Author(url, name, motto, session=self._session) except ValueError: # invalid url yield ANONYMOUS data['start'] = int(re_get_number.match(divs[-1]['id']).group(1)) @property @check_soup('_photo_url') def photo_url(self): """获取话题头像图片地址. :return: 话题头像url :rtype: str """ img = self.soup.find('a', id='zh-avartar-edit-form').img['src'] return img.replace('_m', '_r') @property @check_soup('_description') def description(self): """获取话题描述信息. :return: 话题描述信息 :rtype: str """ desc = self.soup.find('div', class_='zm-editable-content').text return desc @property def top_authors(self): """获取最佳回答者 :return: 此话题下最佳回答者,一般来说是5个,要不就没有,返回生成器 :rtype: Author.Iterable """ from .author import Author, ANONYMOUS self._make_soup() t = self.soup.find('div', id='zh-topic-top-answerer') if t is None: return for d in t.find_all('div', class_='zm-topic-side-person-item-content'): url = Zhihu_URL + d.a['href'] name = d.a.text motto = d.div['title'] try: yield Author(url, name, motto, session=self._session) except ValueError: # invalid url yield ANONYMOUS @property def top_answers(self): """获取话题下的精华答案. :return: 话题下的精华答案,返回生成器. 
:rtype: Answer.Iterable """ from .question import Question from .answer import Answer from .author import Author, ANONYMOUS top_answers_url = Topic_Top_Answers_Url.format(self.id) params = {'page': 1} while True: # 超出50页直接返回 if params['page'] > 50: return res = self._session.get(top_answers_url, params=params) params['page'] += 1 soup = BeautifulSoup(res.content) # 不够50页,来到错误页面 返回 if soup.find('div', class_='error') is not None: return questions = soup.find_all('a', class_='question_link') answers = soup.find_all('a', class_='answer-date-link') authors = soup.find_all('div', class_='zm-item-answer-author-info') upvotes = soup.find_all('a', class_='zm-item-vote-count') for ans, up, q, au in zip(answers, upvotes, questions, authors): answer_url = Zhihu_URL + ans['href'] question_url = Zhihu_URL + q['href'] question_title = q.text upvote = int(up['data-votecount']) question = Question(question_url, question_title, session=self._session) if au.a is None: author = ANONYMOUS else: author_url = Zhihu_URL + au.a['href'] author_name = au.a.text author_motto = au.strong['title'] if au.strong else '' author = Author(author_url, author_name, author_motto, session=self._session) yield Answer(answer_url, question, author, upvote, session=self._session) @property def questions(self): """获取话题下的所有问题(按时间降序排列) :return: 话题下所有问题,返回生成器 :rtype: Question.Iterable """ from .question import Question question_url = Topic_Questions_Url.format(self.id) params = {'page': 1} older_time_stamp = int(time.time()) * 1000 while True: res = self._session.get(question_url, params=params) soup = BeautifulSoup(res.content) if soup.find('div', class_='error') is not None: return questions = soup.find_all('div', class_='question-item') questions = list(filter( lambda x: int(x.h2.span['data-timestamp']) < older_time_stamp, questions)) for qu_div in questions: url = Zhihu_URL + qu_div.h2.a['href'] title = qu_div.h2.a.text creation_time = datetime.fromtimestamp( int(qu_div.h2.span['data-timestamp']) // 1000) yield Question(url, title, creation_time=creation_time, session=self._session) older_time_stamp = int(questions[-1].h2.span['data-timestamp']) params['page'] += 1 @property def unanswered_questions(self): """获取话题下的等待回答的问题 什么是「等待回答」的问题:https://www.zhihu.com/question/40470324 :return: 话题下等待回答的问题,返回生成器 :rtype: Question.Iterable """ from .question import Question question_url = Topic_Unanswered_Question_Url.format(self.id) params = {'page': 1} while True: res = self._session.get(question_url, params=params) soup = BeautifulSoup(res.content) if soup.find('div', class_='error') is not None: return questions = soup.find_all('div', class_='question-item') for qu_div in questions: url = Zhihu_URL + qu_div.h2.a['href'] title = qu_div.h2.a.text yield Question(url, title, session=self._session) params['page'] += 1 @property def answers(self): """获取话题下所有答案(按时间降序排列) :return: 话题下所有答案,返回生成器 :rtype: Answer.Iterable """ from .question import Question from .answer import Answer from .author import Author, ANONYMOUS newest_url = Topic_Newest_Url.format(self.id) params = {'start': 0, '_xsrf': self.xsrf} res = self._session.get(newest_url) soup = BeautifulSoup(res.content) while True: divs = soup.find_all('div', class_='folding') # 如果话题下无答案,则直接返回 if len(divs) == 0: return last_score = divs[-1]['data-score'] for div in divs: q = div.find('a', class_="question_link") question_url = Zhihu_URL + q['href'] question_title = q.text question = Question(question_url, question_title, session=self._session) ans = div.find('a', class_='answer-date-link') 
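# Each '.folding' div of the newest-answers feed is unpacked below into a
# question link, an answer link, a vote count and an author block (treated
# as anonymous when it contains no <a>). Paging is score-based rather than
# page-based: the 'data-score' of the last div is posted back as the
# 'offset' of the next request, and iteration stops once the endpoint
# reports zero new items (msg[0] == 0).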
answer_url = Zhihu_URL + ans['href'] up = div.find('a', class_='zm-item-vote-count') upvote = int(up['data-votecount']) au = div.find('div', class_='zm-item-answer-author-info') if au.a is None: author = ANONYMOUS else: author_url = Zhihu_URL + au.a['href'] author_name = au.a.text author_motto = au.strong['title'] if au.strong else '' author = Author(author_url, author_name, author_motto, session=self._session) yield Answer(answer_url, question, author, upvote, session=self._session) params['offset'] = last_score res = self._session.post(newest_url, data=params) gotten_feed_num = res.json()['msg'][0] # 如果得到内容数量为0则返回 if gotten_feed_num == 0: return soup = BeautifulSoup(res.json()['msg'][1]) @property def hot_questions(self): """获取话题下热门的问题 :return: 话题下的热门动态中的问题,按热门度顺序返回生成器 :rtype: Question.Iterable """ from .question import Question hot_questions_url = Topic_Hot_Questions_Url.format(self.id) params = {'start': 0, '_xsrf': self.xsrf} res = self._session.get(hot_questions_url) soup = BeautifulSoup(res.content) while True: questions_duplicate = soup.find_all('a', class_='question_link') # 如果话题下无问题,则直接返回 if len(questions_duplicate) == 0: return # 去除重复的问题 questions = list(set(questions_duplicate)) questions.sort(key=self._get_score, reverse=True) last_score = soup.find_all( 'div', class_='feed-item')[-1]['data-score'] for q in questions: question_url = Zhihu_URL + q['href'] question_title = q.text question = Question(question_url, question_title, session=self._session) yield question params['offset'] = last_score res = self._session.post(hot_questions_url, data=params) gotten_feed_num = res.json()['msg'][0] # 如果得到问题数量为0则返回 if gotten_feed_num == 0: return soup = BeautifulSoup(res.json()['msg'][1]) @property def hot_answers(self): """获取话题下热门的回答 :return: 话题下的热门动态中的回答,按热门度顺序返回生成器 :rtype: Question.Iterable """ from .question import Question from .author import Author from .answer import Answer hot_questions_url = Topic_Hot_Questions_Url.format(self.id) params = {'start': 0, '_xsrf': self.xsrf} res = self._session.get(hot_questions_url) soup = BeautifulSoup(res.content) while True: answers_div = soup.find_all('div', class_='feed-item') last_score = answers_div[-1]['data-score'] for div in answers_div: # 没有 text area 的情况是:答案被和谐。 if not div.textarea: continue question_url = Zhihu_URL + div.h2.a['href'] question_title = div.h2.a.text question = Question(question_url, question_title, session=self._session) author_link = div.find('a', class_='author-link') if not author_link: author_url = None author_name = '匿名用户' author_motto = '' else: author_url = Zhihu_URL + author_link['href'] author_name = author_link.text author_motto_span = div.find('span', class_='bio') author_motto = author_motto_span['title'] \ if author_motto_span else '' author = Author(author_url, author_name, author_motto, session=self._session) body = div.find('div', class_='entry-body') answer_url = question_url + "/answer/" + body['data-atoken'] upvote_num = int(div.find( 'a', class_='zm-item-vote-count')['data-votecount']) yield Answer(answer_url, question, author, upvote_num, session=self._session) params['offset'] = last_score res = self._session.post(hot_questions_url, data=params) gotten_feed_num = res.json()['msg'][0] # 如果得到问题数量为0则返回 if gotten_feed_num == 0: return soup = BeautifulSoup(res.json()['msg'][1]) @staticmethod def _get_score(tag): h2 = tag.parent div = h2.parent try: _ = h2['class'] return div['data-score'] except KeyError: return div.parent.parent['data-score'] PKaSXHI400 zhihu/base.pyfrom .common import BeautifulSoup 
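# base.py collects the behaviour shared by the page-backed classes:
# BaseZhihu lazily fetches self.url and builds a BeautifulSoup document
# (additionally flagging an Answer as deleted when the final response URL
# no longer contains 'answer'), exposes refresh() to re-fetch the page,
# and from_html() to build an object from previously saved HTML.
# JsonAsSoupMixin lets JSON-backed classes keep the same interface by
# storing the parsed JSON (dict) in self.soup instead of a soup object.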
from requests import Response import json class BaseZhihu: def _gen_soup(self, content): self.soup = BeautifulSoup(content) def _get_content(self): resp = self._session.get(self.url[:-1]) if self.__class__.__name__ == 'Answer': if 'answer' in resp.url: self._deleted = False else: self._deleted = True return resp.content def _make_soup(self): if self.url and not self.soup: self._gen_soup(self._get_content()) def refresh(self): # refresh self.soup's content self._gen_soup(self._get_content()) @classmethod def from_html(cls, content): obj = cls(url=None) obj._gen_soup(content) return obj class JsonAsSoupMixin: def _gen_soup(self, content): # 为了让`from_html`对外提供统一的接口, 判断一下输入, 如果是bytes 或者 str 则用json处理, # 否则认为是由_get_content返回的dict if isinstance(content, bytes): r = Response() r._content = content soup = r.json() self.soup = soup elif isinstance(content, str): self.soup = json.loads(content) else: self.soup = content PKy;QHv?WWzhihu/client.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import time import json import requests import importlib from .common import * class ZhihuClient: """知乎客户端类,内部维护了自己专用的网络会话,可用cookies或账号密码登录.""" def __init__(self, cookies=None): """创建客户端类实例. :param str cookies: 见 :meth:`.login_with_cookies` 中 ``cookies`` 参数 :return: 知乎客户端对象 :rtype: ZhihuClient """ self._session = requests.Session() self._session.headers.update(Default_Header) if cookies is not None: assert isinstance(cookies, str) self.login_with_cookies(cookies) # ===== login staff ===== @staticmethod def _get_captcha_url(): return Captcha_URL_Prefix + str(int(time.time() * 1000)) def get_captcha(self): """获取验证码数据。 :return: 验证码图片数据。 :rtype: bytes """ # some unbelievable zhihu logic self._session.get(Zhihu_URL) data = {'email': '', 'password': '', 'remember_me': 'true'} self._session.post(Login_URL, data=data) r = self._session.get(self._get_captcha_url()) return r.content def login(self, email, password, captcha): """登陆知乎. :param str email: 邮箱 :param str password: 密码 :param str captcha: 验证码 :return: ======== ======== ============== ==================== 元素序号 元素类型 意义 说明 ======== ======== ============== ==================== 0 int 是否成功 0为成功,1为失败 1 str 失败原因 登录成功则为空字符串 2 str cookies字符串 登录失败则为空字符串 ======== ======== ============== ==================== :rtype: (int, str, str) """ data = {'email': email, 'password': password, 'remember_me': 'true', 'captcha': captcha} r = self._session.post(Login_URL, data=data) j = r.json() code = int(j['r']) message = j['msg'] cookies_str = json.dumps(self._session.cookies.get_dict()) \ if code == 0 else '' return code, message, cookies_str def login_with_cookies(self, cookies): """使用cookies文件或字符串登录知乎 :param str cookies: ============== =========================== 参数形式 作用 ============== =========================== 文件名 将文件内容作为cookies字符串 cookies字符串 直接提供cookies字符串 ============== =========================== :return: 无 :rtype: None """ if os.path.isfile(cookies): with open(cookies) as f: cookies = f.read() cookies_dict = json.loads(cookies) self._session.cookies.update(cookies_dict) def login_in_terminal(self): """不使用cookies,在终端中根据提示登陆知乎 :return: 如果成功返回cookies字符串 :rtype: str """ print('====== zhihu login =====') email = input('email: ') password = input('password: ') captcha_data = self.get_captcha() with open('captcha.gif', 'wb') as f: f.write(captcha_data) print('please check captcha.gif for captcha') captcha = input('captcha: ') os.remove('captcha.gif') print('====== logging.... 
=====') code, msg, cookies = self.login(email, password, captcha) if code == 0: print('login successfully') else: print('login failed, reason: {0}'.format(msg)) return cookies def create_cookies(self, file): cookies_str = self.login_in_terminal() if cookies_str: with open(file, 'w') as f: f.write(cookies_str) print('cookies file created.') else: print('can\'t create cookies.') # ===== network staff ===== def set_proxy(self, proxy): """设置代理 :param str proxy: 使用 "http://example.com:port" 的形式 :return: 无 :rtype: None :说明: 由于一个 :class:`.ZhihuClient` 对象和它创建出来的其他知乎对象共用 一个Session,所以调用这个方法也会将所有生成出的知乎类设置上代理。 """ self._session.proxies.update({'http': proxy}) # ===== getter staff ====== def me(self): """获取使用特定cookies的Me实例 :return: cookies对应的Me对象 :rtype: Me """ from .me import Me headers = dict(Default_Header) headers['Host'] = 'zhuanlan.zhihu.com' res = self._session.get(Get_Me_Info_Url, headers=headers) json_data = res.json() url = json_data['profileUrl'] name = json_data['name'] motto = json_data['bio'] photo = json_data['avatar']['template'].format( id=json_data['avatar']['id'], size='r') return Me(url, name, motto, photo, session=self._session) def __getattr__(self, item: str): """本函数用于获取各种类,如 `Answer` `Question` 等. :支持的形式有: 1. client.answer() 2. client.author() 3. client.collection() 4. client.column() 5. client.post() 6. client.question() 7. client.topic() 参数均为对应页面的url,返回对应的类的实例。 """ def getter(url): return getattr(module, item.capitalize())(url, session=self._session) attr_list = ['answer', 'author', 'collection', 'column', 'post', 'question', 'topic'] if item.lower() in attr_list: module = importlib.import_module('.'+item.lower(), 'zhihu') return getter PK+\H=2) ( (zhihu/common.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import functools import re import os from requests import Session from bs4 import BeautifulSoup as _Bs from bs4 import Tag, NavigableString from requests.packages.urllib3.util import Retry try: __import__('lxml') BeautifulSoup = lambda makeup: _Bs(makeup, 'lxml') except ImportError: BeautifulSoup = lambda makeup: _Bs(makeup, 'html.parser') Default_Header = {'X-Requested-With': 'XMLHttpRequest', 'Referer': 'http://www.zhihu.com', 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; ' 'rv:39.0) Gecko/20100101 Firefox/39.0', 'Host': 'www.zhihu.com'} Zhihu_URL = 'https://www.zhihu.com' Login_URL = Zhihu_URL + '/login/email' Captcha_URL_Prefix = Zhihu_URL + '/captcha.gif?r=' Get_Profile_Card_URL = Zhihu_URL + '/node/MemberProfileCardV2' Question_Get_More_Answer_URL = Zhihu_URL + '/node/QuestionAnswerListV2' Answer_Add_Comment_URL = Zhihu_URL + '/node/AnswerCommentAddV2' Answer_Comment_Box_URL = Zhihu_URL + '/node/AnswerCommentBoxV2' Get_Answer_Comment_URL = Zhihu_URL + '/r/answers/{0}/comments' Author_Get_More_Followers_URL = Zhihu_URL + '/node/ProfileFollowersListV2' Author_Get_More_Followees_URL = Zhihu_URL + '/node/ProfileFolloweesListV2' Author_Get_More_Follow_Column_URL = Zhihu_URL + \ '/node/ProfileFollowedColumnsListV2' Author_Get_More_Follow_Topic_URL = Zhihu_URL + \ '/people/{0}/topics' PROTOCOL = '' Column_Url = 'http://zhuanlan.zhihu.com' Column_API = Column_Url + '/api/columns' Column_Data = Column_API + '/{0}' Column_Posts_Data = Column_API + '/{0}/posts?limit=10&offset={1}' Column_Post_Data = Column_API + '/{0}/posts/{1}' Post_Get_Upvoter = Column_API + '/{0}/posts/{1}/likers' Topic_Url = Zhihu_URL + '/topic' Topic_Get_Children_Url = Topic_Url + '/{0}/organize/entire' Topic_Get_More_Follower_Url = Topic_Url + '/{0}/followers' Topic_Questions_Url = Topic_Url + 
'/{0}/questions' Topic_Unanswered_Question_Url = Topic_Url + '/{0}/unanswered' Topic_Top_Answers_Url = Topic_Url + '/{0}/top-answers' Topic_Hot_Questions_Url = Topic_Url + '/{0}/hot' Topic_Newest_Url = Topic_Url + '/{0}/newest' Get_Me_Info_Url = Column_Url + '/api/me' Upvote_Answer_Url = Zhihu_URL + '/node/AnswerVoteBarV2' Upvote_Article_Url = Column_API + '/{0}/posts/{1}/rating' Follow_Author_Url = Zhihu_URL + '/node/MemberFollowBaseV2' Follow_Question_Url = Zhihu_URL + '/node/QuestionFollowBaseV2' Follow_Topic_Url = Zhihu_URL + '/node/TopicFollowBaseV2' Follow_Collection_Url = Zhihu_URL + '/collection/follow' Unfollow_Collection_Url = Zhihu_URL + '/collection/unfollow' Thanks_Url = Zhihu_URL + '/answer/thanks' Cancel_Thanks_Url = Zhihu_URL + '/answer/cancel_thanks' Send_Message_Url = Zhihu_URL + '/inbox/post' Unhelpful_Url = Zhihu_URL + '/answer/not_helpful' Cancel_Unhelpful_Url = Zhihu_URL + '/answer/helpful' Get_Collection_Url = Zhihu_URL + '/node/AnswerFavlists' re_question_url = re.compile( r'^https?://www\.zhihu\.com/question/\d+(\?sort=created|/?)$') re_question_url_std = re.compile(r'^https?://www\.zhihu\.com/question/\d+/?') re_ans_url = re.compile( r'^https?://www\.zhihu\.com/question/\d+/answer/\d+/?$') re_author_url = re.compile(r'^https?://www\.zhihu\.com/people/[^/]+/?$') re_collection_url = re.compile(r'^https?://www\.zhihu\.com/collection/\d+/?$') re_column_url = re.compile(r'^http://zhuanlan\.zhihu\.com/([^/]+)/?$') re_post_url = re.compile(r'^http://zhuanlan\.zhihu\.com/([^/]+)/(\d+)/?$') re_topic_url = re.compile(r'^https?://www\.zhihu\.com/topic/(\d+)/?$') re_a2q = re.compile(r'(.*)/a.*') re_collection_url_split = re.compile(r'.*(/c.*)') re_get_number = re.compile(r'[^\d]*(\d+).*') re_del_empty_line = re.compile(r'\n*(.*)\n*') def check_soup(attr, soup_type='_make_soup'): def real(func): @functools.wraps(func) def wrapper(self): # noinspection PyTypeChecker value = getattr(self, attr, None) if value is None: if soup_type == '_make_soup': getattr(self, soup_type)() elif self.soup is None: getattr(self, soup_type)() value = func(self) setattr(self, attr, value) return value return wrapper return real def class_common_init(url_re, allowed_none=True, trailing_slash=True): def real(func): @functools.wraps(func) def wrapper(self, url, *args, **kwargs): if url is None and not allowed_none: raise ValueError('Invalid Url: ' + url) if url is not None: if url_re.match(url) is None: raise ValueError('Invalid URL: ' + url) if not url.endswith('/') and trailing_slash: url += '/' if 'session' not in kwargs.keys() or kwargs['session'] is None: kwargs['session'] = Session() kwargs['session'].mount('https://', Retry(5)) kwargs['session'].mount('http://', Retry(5)) self.soup = None return func(self, url, *args, **kwargs) return wrapper return real def remove_invalid_char(text): """去除字符串中的无效字符,一般用于保存文件时保证文件名的有效性. 
:param str text: 待处理的字符串 :return: 处理后的字符串 :rtype: str """ invalid_char_list = ['/', '\\', ':', '*', '?', '"', '<', '>', '|', '\n'] res = '' for char in text: if char not in invalid_char_list: res += char return res def parser_author_from_tag(author): author_link = author.find('a', class_='author-link') if author_link is None: return None, '匿名用户', '', '' else: author_name = author_link.text motto_span = author.find('span', class_='bio') author_motto = motto_span['title'] \ if motto_span is not None else '' author_url = Zhihu_URL + author_link['href'] avatar_link = author.find('a', class_='avatar-link') photo_url = PROTOCOL + avatar_link.img['src'].replace('_s', '_r') return author_url, author_name, author_motto, photo_url def parser_author_from_comment(author): author_avatar = author.find('a', class_='zm-item-link-avatar') if author_avatar is None: return None, '匿名用户', '' else: author_link = author.find('a', class_='zg-link') author_name = author_link.text author_url = author_link['href'] avatar_link = author.find('img', class_='zm-item-img-avatar') photo_url = PROTOCOL + avatar_link['src'].replace('_s', '_r') return author_url, author_name, photo_url def answer_content_process(content): content = clone_bs4_elem(content) del content['class'] soup = BeautifulSoup( '') soup.body.append(content) no_script_list = soup.find_all("noscript") for no_script in no_script_list: no_script.extract() img_list = soup.find_all( "img", class_=["origin_image", "content_image"]) for img in img_list: if "content_image" in img['class']: img['data-original'] = img['data-actualsrc'] new_img = soup.new_tag('img', src=PROTOCOL + img['data-original']) img.replace_with(new_img) if img.next_sibling is None: new_img.insert_after(soup.new_tag('br')) useless_list = soup.find_all("i", class_="icon-external") for useless in useless_list: useless.extract() return soup.prettify() def get_path(path, filename, mode, default_path, default_name): if path is None: path = os.path.join( os.getcwd(), remove_invalid_char(default_path)) if filename is None: filename = remove_invalid_char(default_name) if os.path.isdir(path) is False: os.makedirs(path) temp = filename i = 0 while os.path.isfile(os.path.join(path, temp) + '.' + mode): i += 1 temp = filename + str(i) return os.path.join(path, temp) + '.' + mode def common_follower(url, xsrf, session): from .author import Author, ANONYMOUS headers = dict(Default_Header) headers['Referer'] = url data = {'offset': 0, '_xsrf': xsrf} gotten_data_num = 20 offset = 0 while gotten_data_num == 20: data['offset'] = offset res = session.post(url, data=data, headers=headers) json_data = res.json()['msg'] gotten_data_num = json_data[0] offset += gotten_data_num soup = BeautifulSoup(json_data[1]) follower_divs = soup.find_all('div', class_='zm-profile-card') for div in follower_divs: if div.a is not None: author_name = div.a['title'] author_url = Zhihu_URL + div.a['href'] author_motto = div.find('div', class_='zg-big-gray').text author_photo = PROTOCOL + div.img['src'].replace('_m', '_r') numbers = [re_get_number.match(a.text).group(1) for a in div.find_all('a', target='_blank')] try: yield Author(author_url, author_name, author_motto, *numbers, photo_url=author_photo, session=session) except ValueError: # invalid url yield ANONYMOUS else: yield ANONYMOUS def clone_bs4_elem(el): """Clone a bs4 tag before modifying it. 
Code from `http://stackoverflow.com/questions/23057631/clone-element-with -beautifulsoup` """ if isinstance(el, NavigableString): return type(el)(el) copy = Tag(None, el.builder, el.name, el.namespace, el.nsprefix) # work around bug where there is no builder set # https://bugs.launchpad.net/beautifulsoup/+bug/1307471 copy.attrs = dict(el.attrs) for attr in ('can_be_empty_element', 'hidden'): setattr(copy, attr, getattr(el, attr)) for child in el.contents: copy.append(clone_bs4_elem(child)) return copy PKy;QH;3&& zhihu/me.py#!/usr/bin/env python3 # -*- coding: utf-8 -*- import json from .common import * from .author import Author class Me(Author): """封装了相关操作(如点赞,关注问题)的类。 请使用 :meth:`.ZhihuClient.me` 方法获取实例。 """ def __init__(self, url, name, motto, photo_url, session): super(Me, self).__init__(url, name, motto, photo_url=photo_url, session=session) def vote(self, something, vote='up'): """给答案或文章点赞或取消点赞 :param Answer/Post something: 需要点赞的答案或文章对象 :param str vote: ===== ================ ====== 取值 说明 默认值 ===== ================ ====== up 赞同 √ down 反对 X clear 既不赞同也不反对 X ===== ================ ====== :return: 成功返回True,失败返回False :rtype: bool """ from .answer import Answer from zhihu import Post if isinstance(something, Answer): mapping = { 'up': 'vote_up', 'clear': 'vote_neutral', 'down': 'vote_down' } if vote not in mapping.keys(): raise ValueError('Invalid vote value: {0}'.format(vote)) if something.author.url == self.url: return False params = {'answer_id': str(something.aid)} data = { '_xsrf': something.xsrf, 'method': mapping[vote], 'params': json.dumps(params) } headers = dict(Default_Header) headers['Referer'] = something.question.url[:-1] res = self._session.post(Upvote_Answer_Url, headers=headers, data=data) return res.json()['r'] == 0 elif isinstance(something, Post): mapping = { 'up': 'like', 'clear': 'none', 'down': 'dislike' } if vote not in mapping.keys(): raise ValueError('Invalid vote value: {0}'.format(vote)) if something.author.url == self.url: return False put_url = Upvote_Article_Url.format( something.column_in_name, something.slug) data = {'value': mapping[vote]} headers = { 'Content-Type': 'application/json;charset=utf-8', 'Host': 'zhuanlan.zhihu.com', 'Referer': something.url[:-1], 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; ' 'rv:39.0) Gecko/20100101 Firefox/39.0', 'X-XSRF-TOKEN': self._session.cookies.get('XSRF-TOKEN') } res = self._session.put(put_url, json.dumps(data), headers=headers) return res.status_code == 204 else: raise ValueError('argument something need to be ' 'zhihu.Answer or zhihu.Post object.') def thanks(self, answer, thanks=True): """感谢或取消感谢回答 :param Answer answer: 要感谢或取消感谢的回答 :param thanks: True-->感谢,False-->取消感谢 :return: 成功返回True,失败返回False :rtype: bool """ from .answer import Answer if isinstance(answer, Answer) is False: raise ValueError('argument answer need to be Zhihu.Answer object.') if answer.author.url == self.url: return False data = { '_xsrf': answer.xsrf, 'aid': answer.aid } res = self._session.post(Thanks_Url if thanks else Cancel_Thanks_Url, data=data) return res.json()['r'] == 0 def follow(self, something, follow=True): """关注用户、问题、话题或收藏夹 :param Author/Question/Topic something: 需要关注的对象 :param bool follow: True-->关注,False-->取消关注 :return: 成功返回True,失败返回False :rtype: bool """ from .question import Question from .topic import Topic from .collection import Collection if isinstance(something, Author): if something.url == self.url: return False data = { '_xsrf': something.xsrf, 'method': ' follow_member' if follow else 'unfollow_member', 
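# Most write helpers in Me follow this same pattern: POST the page's _xsrf
# token together with a 'method' name and JSON-encoded 'params' to the
# matching /node/ endpoint, then treat r == 0 in the JSON reply as success.
# Here an Author is identified by its hash_id; questions, topics and
# collections are identified by qid, tid and cid respectively further down.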
'params': json.dumps({'hash_id': something.hash_id}) } res = self._session.post(Follow_Author_Url, data=data) return res.json()['r'] == 0 elif isinstance(something, Question): data = { '_xsrf': something.xsrf, 'method': 'follow_question' if follow else 'unfollow_question', 'params': json.dumps({'question_id': str(something.qid)}) } res = self._session.post(Follow_Question_Url, data=data) return res.json()['r'] == 0 elif isinstance(something, Topic): data = { '_xsrf': something.xsrf, 'method': 'follow_topic' if follow else 'unfollow_topic', 'params': json.dumps({'topic_id': something.tid}) } res = self._session.post(Follow_Topic_Url, data=data) return res.json()['r'] == 0 elif isinstance(something, Collection): data = { '_xsrf': something.xsrf, 'favlist_id': something.cid } res = self._session.post( Follow_Collection_Url if follow else Unfollow_Collection_Url, data=data) return res.json()['r'] == 0 else: raise ValueError('argument something need to be ' 'zhihu.Author, zhihu.Question' ', Zhihu.Topic or Zhihu.Collection object.') def add_comment(self, answer, content): """给指定答案添加评论 :param Answer answer: 答案对象 :param string content: 评论内容 :return: 成功返回 True,失败返回 False :rtype: bool """ from .answer import Answer if isinstance(answer, Answer) is False: raise ValueError('argument answer need to be Zhihu.Answer object.') if not content: raise ValueError('answer content cannot be empty') data = { 'method': 'add_comment', 'params': json.dumps({'answer_id': answer.aid, 'content': content}), '_xsrf': answer.xsrf } res = self._session.post(Answer_Add_Comment_URL, data=data) return res.json()['r'] == 0 def send_message(self, author, content): """发送私信给一个用户 :param Author author: 接收私信用户对象 :param string content: 发送给用户的私信内容 :return: 成功返回 True,失败返回 False :rtype: bool """ if isinstance(author, Author) is False: raise ValueError('argument answer need to be Zhihu.Author object.') if not content: raise ValueError('answer content cannot be empty') if author.url == self.url: return False data = { 'member_id': author.hash_id, 'content': content, 'token': '', '_xsrf': author.xsrf } res = self._session.post(Send_Message_Url, data=data) return res.json()['r'] == 0 def block(self, something, block=True): """屏蔽某个用户、话题 :param Author/Topic something: :param block: True-->屏蔽,False-->取消屏蔽 :return: 成功返回 True,失败返回 False :rtype: bool """ from .topic import Topic if isinstance(something, Author): if something.url == self.url: return False data = { '_xsrf': something.xsrf, 'action': 'add' if block else 'cancel', } block_author_url = something.url + 'block' res = self._session.post(block_author_url, data=data) return res.json()['r'] == 0 elif isinstance(something, Topic): tid = something.tid data = { '_xsrf': something.xsrf, 'method': 'add' if block else 'del', 'tid': tid, } block_topic_url = 'http://www.zhihu.com/topic/ignore' res = self._session.post(block_topic_url, data=data) return res.status_code == 200 else: raise ValueError('argument something need to be ' 'Zhihu.Author or Zhihu.Topic object.') def unhelpful(self, answer, unhelpful=True): """没有帮助或取消没有帮助回答 :param Answer answer: 要没有帮助或取消没有帮助回答 :param unhelpful: True-->没有帮助,False-->取消没有帮助 :return: 成功返回 True,失败返回 False :rtype: bool """ from .answer import Answer if isinstance(answer, Answer) is False: raise ValueError('argument answer need to be Zhihu.Answer object.') if answer.author.url == self.url: return False data = { '_xsrf': answer.xsrf, 'aid': answer.aid } res = self._session.post(Unhelpful_Url if unhelpful else Cancel_Unhelpful_Url, data=data) return res.json()['r'] == 
0 PK9,\H&MM*zhihu_py3-0.3.12.dist-info/DESCRIPTION.rstzhihu-py3 : 知乎非官方API库 with Python3 ======================================== |Author| |Build| |DocumentationStatus| |PypiVersion| |License| |PypiDownloadStatus| **紧急:知乎改用 https 了,遇到 invalid url 的朋友们请尽快 pypi 更新到 0.3.3 以上版本!** Dev 分支coding内容: - unittest - 忙着跟上知乎的节奏(dog fuck's zhihu) 具体请看\ `ChangeLog `__\ 。 **有问题请开Issue,几个小时后无回应可加最后面的QQ群询问。** 友链: - \ `zhihurss `__\ :一个基于 zhihu-py3 做的跨平台知乎 rss(any user) 的客户端。 功能 ---- 由于知乎没有公开API,加上受到\ `zhihu-python `__\ 项目的启发,在Python3下重新写了一个知乎的数据解析模块。 提供的功能一句话概括为,用户提供知乎的网址构用于建对应类的对象,可以获取到某些需要的数据。 简单例子: .. code:: python from zhihu import ZhihuClient Cookies_File = 'cookies.json' client = ZhihuClient(Cookies_File) url = 'http://www.zhihu.com/question/24825703' question = client.question(url) print(question.title) print(question.answer_num) print(question.follower_num) print(question.topics) for answer in question.answers: print(answer.author.name, answer.upvote_num) 这段代码的输出为: :: 关系亲密的人之间要说「谢谢」吗? 627 4322 ['心理学', '恋爱', '社会', '礼仪', '亲密关系'] 龙晓航 50 小不点儿 198 芝士就是力量 89 欧阳忆希 425 ... 另外还有\ ``Author(用户)``\ 、\ ``Answer(答案)``\ 、\ ``Collection(收藏夹)``\ 、\ ``Column(专栏)``\ 、\ ``Post(文章)``\ 、\ ``Topic(话题)``\ 等类可以使用,\ ``Answer``,\ ``Post``\ 类提供了\ ``save``\ 方法能将答案或文章保存为HTML或Markdown格式,具体请看文档,或者\ ``zhihu-test.py``\ 。 安装 ---- .. class:: bold 本项目依赖于\ `requests `__\ 、\ `BeautifulSoup4 `__\ 、\ `html2text `__ 已将项目发布到pypi,请使用下列命令安装 .. code:: bash (sudo) pip(3) install (--upgrade) zhihu-py3 希望开启lxml的话请使用: .. code:: bash (sudo) pip(3) install (--upgrade) zhihu-py3[lxml] 因为lxml解析html效率高而且容错率强,在知乎使用\ ``
<br>``\ 时,自带的html.parser会将其转换成\ ``<br>...</br>``\ ,而lxml则转换为\ ``<br/>
``\ ,更为标准且美观,所以推荐使用第二个命令。 不安装lxml也能使用本模块,此时会自动使用html.parser作为解析器。 PS 若在安装lxml时出错,请安装libxml和libxslt后重试: .. code:: bash sudo apt-get install libxml2 libxml2-dev libxslt1.1 libxslt1-dev 准备工作 -------- 第一次使用推荐运行以下代码生成 cookies 文件: .. code:: python from zhihu import ZhihuClient ZhihuClient().create_cookies('cookies.json') 运行结果 :: ====== zhihu login ===== email: password: please check captcha.gif for captcha captcha: ====== logging.... ===== login successfully cookies file created. 运行成功后会在目录下生成\ ``cookies.json``\ 文件。 以下示例皆以登录成功为前提。 建议在正式使用之前运行\ ``zhihu-test.py``\ 测试一下。 用法实例 -------- 为了精简 Readme,本部分移动至文档内。 请看文档的「用法示例」部分。 登录方法综述 --------------------------------------------- 为了精简 Readme,本部分移动至文档内。 请看文档的「登录方法综述」部分。 文档 ---- 终于搞定了文档这个磨人的小妖精,可惜 Sphinx 还是不会用 T^T 先随意弄成这样吧: `Master版文档 `__ `Dev版文档 `__ TODO List --------- - [x] 增加获取用户关注者,用户追随者 - [x] 增加获取答案点赞用户功能 - [x] 获取用户头像地址 - [x] 打包为标准Python模块 - [x] 重构代码,增加\ ``ZhihuClient``\ 类,使类可以自定义cookies文件 - [x] 收藏夹关注者,问题关注者等等 - [x] ``ZhihuClient``\ 增加各种用户操作(比如给某答案点赞) - [ ] Unittest (因为知乎可能会变,所以这个有点难 - [x] 增加获取用户关注专栏数和关注专栏的功能 - [x] 增加获取用户关注话题数和关注话题的功能 - [x] 评论类也要慢慢提上议程了吧 联系我 ------ Github:\ `@7sDream `__ 知乎:\ `@7sDream `__ 新浪微博:\ `@Dilover `__ 邮箱:\ `给我发邮件 `__ 编程交流群:478786205 .. |Author| image:: https://img.shields.io/badge/Author-7sDream-blue.svg :target: https://github.com/7sDream .. |DocumentationStatus| image:: https://readthedocs.org/projects/zhihu-py3/badge/?version=latest :target: https://readthedocs.org/projects/zhihu-py3/?badge=latest .. |PypiVersion| image:: https://img.shields.io/pypi/v/zhihu-py3.svg :target: https://pypi.python.org/pypi/zhihu-py3 .. |PypiDownloadStatus| image:: https://img.shields.io/pypi/dd/zhihu-py3.svg :target: https://pypi.python.org/pypi/zhihu-py3 .. |License| image:: https://img.shields.io/pypi/l/zhihu-py3.svg :target: https://github.com/7sDream/zhihu-py3/blob/master/LICENSE .. |Build| image:: https://travis-ci.org/7sDream/zhihu-py3.svg?branch=dev :target: http://www.zhihu.com/people/7sdream PK9,\H+?(zhihu_py3-0.3.12.dist-info/metadata.json{"classifiers": ["Development Status :: 3 - Alpha", "Environment :: Web Environment", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", "Programming Language :: Python :: 3", "Topic :: Internet :: WWW/HTTP", "Topic :: Software Development :: Libraries :: Python Modules"], "download_url": "https://github.com/7sDream/zhihu-py3", "extensions": {"python.details": {"contacts": [{"email": "didislover@gmail.com", "name": "7sDream", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}, "project_urls": {"Home": "https://github.com/7sDream/zhihu-py3"}}}, "extras": ["lxml"], "generator": "bdist_wheel (0.26.0)", "keywords": ["zhihu", "network", "spider", "html"], "license": "MIT", "metadata_version": "2.0", "name": "zhihu-py3", "run_requires": [{"requires": ["beautifulsoup4", "html2text", "requests"]}, {"extra": "lxml", "requires": ["lxml"]}], "summary": "Zhihu UNOFFICIAL API library in python3, with help of bs4, lxml, requests and html2text.", "version": "0.3.12"}PK9,\H4N(zhihu_py3-0.3.12.dist-info/top_level.txtzhihu PK9,\H}\\ zhihu_py3-0.3.12.dist-info/WHEELWheel-Version: 1.0 Generator: bdist_wheel (0.26.0) Root-Is-Purelib: true Tag: py3-none-any PK9,\H~4= #zhihu_py3-0.3.12.dist-info/METADATAMetadata-Version: 2.0 Name: zhihu-py3 Version: 0.3.12 Summary: Zhihu UNOFFICIAL API library in python3, with help of bs4, lxml, requests and html2text. 
Home-page: https://github.com/7sDream/zhihu-py3
Author: 7sDream
Author-email: didislover@gmail.com
License: MIT
Download-URL: https://github.com/7sDream/zhihu-py3
Keywords: zhihu,network,spider,html
Platform: UNKNOWN
Classifier: Development Status :: 3 - Alpha
Classifier: Environment :: Web Environment
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Topic :: Internet :: WWW/HTTP
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Dist: beautifulsoup4
Requires-Dist: html2text
Requires-Dist: requests
Provides-Extra: lxml
Requires-Dist: lxml; extra == 'lxml'