#!/usr/bin/python3
"""Import locally saved blog HTML pages into WordPress through its REST API."""

import json
import logging
import os
import re
from urllib.parse import urlparse

# `requests` and `bs4` are imported inside the methods that use them so that
# this module (and its filesystem helpers) can be loaded without them.


class WPimport:
    """Push posts, images, terms and comments parsed from HTML files to WordPress.

    Args:
        basic: authentication object handed to ``requests`` as ``auth=``.
        wordpress: host (and optional port) of the target site; requests are
            sent to ``http://<wordpress>/wp-json/wp/v2/...``.
        logger: logger used for every progress message of this class.
    """

    # French month names, as split out of the "dateheader" text of a page.
    _MONTHS = {"janvier":"01", "février": "02", "mars": "03", "avril":"04", "mai": "05", "juin": "06", "juillet": "07", "août": "08", "septembre": "09", "octobre": "10", "novembre": "11", "décembre": "12"}
    # Taxonomy collections of the REST API handled by the importer.
    _TERM_KINDS = ("categories", "tags")
    # Page size used when listing terms (assumed API maximum - see WP REST docs).
    _PER_PAGE = 100

    # Constructor
    def __init__(self, basic, wordpress, logger):
        self._basic = basic
        self._wordpress = wordpress
        self._logger = logger

    # Public method

    def fromDirectory(self, directory):
        """Import every file found below ``<directory>/archives``.

        Only files located in sub-directories of ``archives`` are imported;
        files stored directly in ``archives`` are not listed.
        """
        archives = "{0}/archives".format(directory)
        directories = self._getDirectories([], archives)
        self.fromFile(self._getFiles(directories))

    def fromFile(self, files):
        """Import each existing path of ``files``.

        A page containing a ``div.articlebody`` is imported as a post; any
        other page is only used to set featured media on existing posts.
        """
        from bs4 import BeautifulSoup

        for file in files:
            if not os.path.exists(file):
                continue
            self._logger.info("Fichier en cours de traitement : {0}".format(file))
            with open(file, 'r') as f:
                content = f.read()
            soup = BeautifulSoup(content, 'html.parser')
            if len(soup.find_all("div", class_="articlebody")) > 0:
                self._addOrUpdatePost(soup)
            else:
                self._addOrUpdateFeaturedMedia(soup)

    # Private method

    ## HTTP helpers

    def _url(self, path):
        """Return the REST URL of ``path`` on the configured site."""
        return "http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, path)

    def _request(self, method, url, **kwargs):
        """Send one HTTP request; ``kwargs`` go to ``requests.request``."""
        import requests

        return requests.request(method, url, **kwargs)

    ## Get all files

    def _getFiles(self, item):
        """Return the paths of the regular files directly inside each directory of ``item``."""
        files = []
        for directory in item:
            for name in os.listdir(directory):
                path = "{0}/{1}".format(directory, name)
                if os.path.isfile(path):
                    files.append(path)
        return files

    ## Get directories

    def _getDirectories(self, subdirectory, item):
        """Append every directory found recursively below ``item`` to ``subdirectory``.

        ``subdirectory`` is modified in place and also returned; ``item``
        itself is not part of the result.
        """
        for name in os.listdir(item):
            path = "{0}/{1}".format(item, name)
            if os.path.isdir(path):
                subdirectory.append(path)
                self._getDirectories(subdirectory, path)
        return subdirectory

    ## Add or update featured media

    def _addOrUpdateFeaturedMedia(self, soup):
        """Set the featured media of posts listed in an index page.

        For every ``div[data-edittype=post]`` whose ``h2`` text equals the
        title of the first search hit, the media whose name matches the
        item's first image is set as ``featured_media`` of that post.
        """
        for item in soup.find_all("div", {"data-edittype": "post"}):
            headings = item.find_all("h2")
            if len(headings) == 0:
                continue
            h2 = headings[0].text
            page = self._request("GET", self._url("search"), auth=self._basic, params={"search":h2, "type":"post"})
            if page.status_code != 200:
                continue
            result = page.json()
            if len(result) == 0 or h2 != result[0]["title"]:
                continue
            img = item.find_all("img")
            if len(img) == 0:
                continue
            img_src = img[0].get("src")
            # The image is fetched only to check that it is still reachable.
            if not img_src or self._request("GET", img_src).status_code != 200:
                continue
            name_img = img_src.replace("_q", "").split("/")[-1]
            page = self._request("GET", self._url("media"), auth=self._basic, params={"search": name_img})
            if page.status_code != 200:
                continue
            res = page.json()
            if len(res) == 0:
                # NOTE(review): indentation was lost in the reviewed text; this
                # branch is assumed to pair with the empty media search - confirm.
                self._logger.info("Aucun media trouvé pour {0}".format(h2))
                continue
            headers = {'Content-Type': 'application/json', 'Accept':'application/json'}
            data = {"featured_media": res[0]["id"]}
            r = self._request("POST", self._url("posts/{0}".format(result[0]["id"])), auth=self._basic, headers=headers, data=json.dumps(data))
            if r.status_code == 200:
                self._logger.info("Ajout media featured : {0}".format(r.json()["title"]["raw"]))

    ## Association image to post

    def _linkImgPost(self, title, list_img, post_id):
        """Attach every media of ``list_img`` (dicts with an ``id``) to ``post_id``."""
        for img in list_img:
            if not img.get("id"):
                # Nothing was uploaded for this entry; "media/" is not a media URL.
                continue
            r = self._request("POST", self._url("media/{0}".format(img["id"])), auth=self._basic, data={"post": post_id})
            if r.status_code == 200:
                self._logger.info("Association d'une image à l'article {0}".format(title))

    ## Add or update img

    @staticmethod
    def _imageType(img_name):
        """Return the Content-Type sent for ``img_name``, chosen from its last extension."""
        extension = os.path.splitext(img_name)[1].lower()
        if extension in (".jpg", ".jpeg"):
            return "image/jpg"
        return "image/png"

    def _addOrUpdateMedia(self, href_img, page):
        """Upload the downloaded image ``page`` under the file name of ``href_img``.

        A media already found under that name is force-deleted first.

        Returns:
            ``{"id": ..., "rendered": ...}`` of the created media; both values
            are empty strings when nothing was created.
        """
        media = {"id":"", "rendered":""}
        img_name = href_img.split("/")[-1]
        r = self._request("GET", self._url("media"), auth=self._basic, params={"search": img_name})
        if r.status_code != 200:
            # NOTE(review): indentation was lost in the reviewed text; the
            # upload is assumed to depend on a successful search - confirm.
            return media
        res = r.json()
        if len(res) > 0:
            r = self._request("DELETE", self._url("media/{0}".format(res[0]["id"])), auth=self._basic, params={"force":1})
            if r.status_code == 200:
                self._logger.info("Image supprimé {0}".format(img_name))
        headers = {'Content-Type': self._imageType(img_name), 'Content-Disposition': 'attachment; filename={0}'.format(img_name)}
        r = self._request("POST", self._url("media"), auth=self._basic, headers=headers, data=page.content)
        if r.status_code == 201:
            self._logger.info("Ajout d'image {0}".format(img_name))
            res = r.json()
            media["id"] = res["id"]
            media["rendered"] = res["guid"]["rendered"]
        return media

    ## Images of a post

    def _importImages(self, articlebody):
        """Upload the linked images of ``articlebody``.

        Returns:
            One dict per uploaded image with ``old_src``, ``old_href``, ``id``
            and ``new_src``. Images whose upload failed are left out so their
            URLs stay untouched in the post body.
        """
        list_img = []
        for link in articlebody.find_all("a", {"target": "_blank"}):
            img = link.find_all("img")
            if len(img) == 0:
                continue
            href_a = link.get("href")
            href_img = img[0].get("src")
            if not href_a or not href_img:
                continue
            new_img = {"old_src": href_img, "old_href": href_a}
            page_img = self._request("GET", href_img)
            if page_img.status_code == 404:
                # Fall back to the link target when the embedded image is gone.
                href_img = href_a
                page_img = self._request("GET", href_a)
            if page_img.status_code != 200:
                continue
            media = self._addOrUpdateMedia(href_img, page_img)
            if href_img != href_a:
                # The same content is also stored under the link's file name;
                # that second media is the one referenced when it was created.
                linked = self._addOrUpdateMedia(href_a, page_img)
                if linked["id"]:
                    media = linked
            if not media["id"]:
                continue
            new_img["id"] = media["id"]
            new_img["new_src"] = media["rendered"]
            list_img.append(new_img)
        return list_img

    ## Comments of a post

    def _parseComments(self, comment):
        """Return ``author``/``date``/``content`` dicts for the ``comment_item`` divs."""
        # NOTE(review): the HTML literals of this method were stripped from the
        # reviewed text and are reconstructed ("<p>", "<br />", "</p>") - confirm.
        comment_post = []
        for item in comment:
            footer = item.find_all("div", class_="itemfooter")
            comment_author = footer[0].text.split(",")[0].replace("Posté par ", "")
            comment_date = footer[0].find_all("abbr")[0].get("title")
            # The last two text lines of the item are not part of the content.
            lines = item.text.split("\n")[:-2]
            comment_content = "<p>" + "".join(line + "<br />" for line in lines if len(line) > 0) + "</p>"
            comment_post.append({"author": comment_author, "date": comment_date, "content": comment_content})
        return comment_post

    ## Categories and tags of a post

    def _listTerms(self, kind):
        """Return every existing term of ``kind`` ("categories" or "tags").

        All result pages are read; a failed request ends the listing with
        what was collected so far.
        """
        terms = []
        number = 1
        while True:
            page = self._request("GET", self._url(kind), params={"per_page": self._PER_PAGE, "page": number})
            if page.status_code != 200:
                break
            batch = page.json()
            terms.extend(batch)
            if len(batch) < self._PER_PAGE:
                break
            number += 1
        return terms

    def _resolveTerms(self, itemfooter):
        """Return ``{"categories": [ids], "tags": [ids]}`` for the ``rel="tag"`` links of ``itemfooter``.

        Links whose href contains ``/tag/`` are tags, those containing
        ``/archives/`` are categories; terms missing on the site are created.
        """
        wanted = {kind: [] for kind in self._TERM_KINDS}
        for link in itemfooter.find_all("a", {"rel": True}):
            rel = link.get("rel")
            if rel[0] != 'tag':
                continue
            href = link.get("href") or ""
            if re.search(r'/tag/', href):
                wanted["tags"].append(link.text)
            if re.search(r'/archives/', href):
                wanted["categories"].append(link.text)

        listelement = {}
        for kind in self._TERM_KINDS:
            listelement[kind] = []
            existing = self._listTerms(kind) if wanted[kind] else []
            for name in wanted[kind]:
                ids = [term["id"] for term in existing if term["name"] == name]
                if ids:
                    listelement[kind].extend(ids)
                    continue
                page = self._request("POST", self._url(kind), auth=self._basic, data={"name": name})
                if page.status_code == 201:
                    created = page.json()
                    listelement[kind].append(created["id"])
                    # Remember it so a repeated name is not created twice.
                    existing.append({"id": created["id"], "name": name})
        return listelement

    ## Body of a post

    def _buildBody(self, articlebody, list_img):
        """Return the post HTML built from the ``<p>`` of ``articlebody``.

        Old image URLs are replaced by the path of their uploaded media.
        """
        # NOTE(review): the HTML literals of this method were stripped from the
        # reviewed text and are reconstructed - confirm against the repository.
        parts = ["<p>"]
        for paragraph in articlebody.find_all("p"):
            if len(paragraph.text) == 1:
                parts.append("<br />")
            else:
                parts.append(str(paragraph).replace("<p>", "").replace("</p>", "").replace("<br>", "<br />") + "<br />")
        parts.append("</p>")
        bodyhtml = "".join(parts)
        for img in list_img:
            path = urlparse(img["new_src"]).path
            bodyhtml = bodyhtml.replace(img["old_href"], path)
            bodyhtml = bodyhtml.replace(img["old_src"], path)
        return bodyhtml

    ## Add or Update post

    def _addOrUpdatePost(self, soup):
        """Create the post described by ``soup``, or update the first post found by title.

        Images are uploaded and linked to the post in both cases; comments
        are only sent when the post is created.
        """
        articletitle = soup.find_all("h2", class_="articletitle")
        articlebody = soup.find_all("div", class_="articlebody")
        articledate = soup.find_all("span", class_="articledate")
        articleacreator = soup.find_all("span", class_="articlecreator")
        dateheader = soup.find_all("div", class_="dateheader")
        itemfooter = soup.find_all("div", class_="itemfooter")
        comment = soup.find_all("div", class_="comment_item")

        list_img = self._importImages(articlebody[0])
        comment_post = self._parseComments(comment)
        listelement = self._resolveTerms(itemfooter[0])

        title = articletitle[0].text
        author = articleacreator[0].text.lower()
        bodyhtml = self._buildBody(articlebody[0], list_img)
        hour = articledate[0].text
        time = dateheader[0].text.split(" ")
        data = {"title":title, "content":bodyhtml, "status":"publish", "date": "{0}-{1}-{2}T{3}:00".format(time[2], self._MONTHS[time[1]], time[0], hour), "tags": listelement["tags"], "categories": listelement["categories"]}

        page = self._request("GET", self._url("users"), auth=self._basic, params={"search":author})
        if page.status_code == 200:
            result = page.json()
            # Without a matching user the "author" field is simply not sent.
            if len(result) > 0:
                data["author"] = result[0]["id"]

        page = self._request("GET", self._url("posts"), auth=self._basic, params={"search":title})
        # When the search itself fails the post is neither updated nor created.
        page_exist = True
        headers = {'Content-Type': 'application/json', 'Accept':'application/json'}
        if page.status_code == 200:
            result = page.json()
            if len(result) == 0:
                page_exist = False
            else:
                self._logger.info("La page {0} existe deja et mis à jour".format(title))
                post_id = result[0]["id"]
                page = self._request("POST", self._url("posts/{0}".format(post_id)), auth=self._basic, headers=headers, data=json.dumps(data))
                if page.status_code == 200:
                    result = page.json()
                    self._logger.info("Article mis à jour : {0}".format(result["title"]["raw"]))
                    self._linkImgPost(result["title"]["raw"], list_img, result["id"])

        if not page_exist:
            page = self._request("POST", self._url("posts"), auth=self._basic, headers=headers, data=json.dumps(data))
            if page.status_code == 201:
                result = page.json()
                self._logger.info("Article ajoute : {0}".format(result["title"]["raw"]))
                for item in comment_post:
                    data = {"post": result["id"], "content": item["content"], "date": item["date"], "author_name": item["author"]}
                    page = self._request("POST", self._url("comments"), auth=self._basic, data=data)
                    if page.status_code == 201:
                        self._logger.info("Commentaire ajoute pour {0}".format(result["title"]["raw"]))
                self._linkImgPost(result["title"]["raw"], list_img, result["id"])