Move the WPimport class into a separate file (WPImport.py)

This commit is contained in:
Valentin CZERYBA 2023-04-08 12:27:30 +02:00
parent d58ead52b2
commit 6f7504e669
2 changed files with 270 additions and 266 deletions

268
WPImport.py Normal file
View File

@ -0,0 +1,268 @@
#!/usr/bin/python3
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import requests, os, logging, re, json
class WPimport:
    """Import blog pages saved as HTML files into a WordPress site.

    All traffic goes through the WordPress REST API rooted at
    ``http://<wordpress>/wp-json/wp/v2/``.  Progress messages are sent to the
    logger handed to the constructor.
    """

    # French month names (as looked up from the page's date header) mapped to
    # their two-digit month number.
    _MONTHS = {
        "janvier": "01", "février": "02", "mars": "03", "avril": "04",
        "mai": "05", "juin": "06", "juillet": "07", "août": "08",
        "septembre": "09", "octobre": "10", "novembre": "11", "décembre": "12",
    }

    # Constructor
    def __init__(self, basic, wordpress, logger):
        """Store the collaborators shared by every import.

        Args:
            basic: value passed as ``auth=`` to the authenticated requests.
            wordpress: host inserted after ``http://`` in every API URL.
            logger: object exposing ``info()``; receives progress messages.
        """
        self._basic = basic
        self._wordpress = wordpress
        self._logger = logger

    # Public method

    def fromDirectory(self, directory):
        """Import every file found in the sub-directories of ``<directory>/archives``.

        Only sub-directories are scanned: files stored directly inside
        ``archives`` are not collected.
        """
        directory = "{0}/archives".format(directory)
        directories = self._getDirectories([], "{0}".format(directory))
        files = self._getFiles(directories)
        self.fromFile(files)

    def fromFile(self, files):
        """Parse each existing file and import it.

        A page containing a ``div.articlebody`` is imported as an article;
        any other page is only used to attach featured images.  Paths that do
        not exist are skipped silently.
        """
        for file in files:
            if not os.path.exists(file):
                continue
            self._logger.info("Fichier en cours de traitement : {0}".format(file))
            with open(file, 'r') as f:
                content = f.read()
            soup = BeautifulSoup(content, 'html.parser')
            if soup.find_all("div", class_="articlebody"):
                self._addOrUpdatePost(soup)
            else:
                self._addOrUpdateFeaturedMedia(soup)

    # Private method

    ## Build an API URL
    def _url(self, endpoint):
        """Return the REST URL for ``endpoint`` (e.g. ``"posts"`` or ``"media/12"``)."""
        return "http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, endpoint)

    ## Get all files
    def _getFiles(self, item):
        """Return the paths of the regular files directly inside each directory of ``item``."""
        files = []
        for directory in item:
            for name in os.listdir(directory):
                path = "{0}/{1}".format(directory, name)
                if os.path.isfile(path):
                    files.append(path)
        return files

    ## Get directories
    def _getDirectories(self, subdirectory, item):
        """Append every directory found recursively below ``item`` to ``subdirectory``.

        ``item`` itself is not added.  The list received is mutated in place
        and is also the return value.
        """
        for name in os.listdir(item):
            path = "{0}/{1}".format(item, name)
            if os.path.isdir(path):
                subdirectory.append(path)
                self._getDirectories(subdirectory, path)
        return subdirectory

    ## Guess the Content-Type of an image
    @staticmethod
    def _guessImageType(img_name):
        """Return ``image/jpeg`` for ``.jpg``/``.jpeg`` names, ``image/png`` otherwise.

        The extension is whatever follows the LAST dot, so ``a.b.jpg`` is a
        JPEG and a name without any dot falls back to PNG instead of raising.
        """
        _, dot, extension = img_name.rpartition(".")
        if dot and extension.lower() in ("jpg", "jpeg"):
            return "image/jpeg"
        return "image/png"

    ## Build the ISO date of a post
    @classmethod
    def _formatDate(cls, dateheader, hour):
        """Turn ``"<day> <french month> <year>"`` and ``"HH:MM"`` into ``YYYY-MM-DDTHH:MM:00``.

        Raises:
            KeyError: the month name is not a known French month.
            IndexError: ``dateheader`` has fewer than three space-separated parts.
        """
        parts = dateheader.split(" ")
        # The day is zero-padded so the result is a well-formed ISO 8601 date.
        return "{0}-{1}-{2}T{3}:00".format(
            parts[2], cls._MONTHS[parts[1].lower()], parts[0].zfill(2), hour)

    ## Add or update featured media
    def _addOrUpdateFeaturedMedia(self, soup):
        """Set the featured image of each post listed in ``soup``.

        For every ``div[data-edittype=post]`` the post is looked up by its
        ``h2`` title; when the first search hit has exactly that title and the
        first image of the block is reachable, the media whose name matches
        the image file name (with ``_q`` removed) becomes the post's
        ``featured_media``.
        """
        headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
        for item in soup.find_all("div", {"data-edittype": "post"}):
            titles = item.find_all("h2")
            if not titles:
                continue
            h2 = titles[0].text
            page = requests.get(self._url("search"), auth=self._basic,
                                params={"search": h2, "type": "post"})
            if page.status_code != 200:
                continue
            result = page.json()
            if not result or h2 != result[0]["title"]:
                continue
            img = item.find_all("img")
            if not img:
                continue
            img_src = img[0].get("src")
            if not img_src:
                continue
            # The image is fetched only to check that it is still reachable.
            if requests.get(img_src).status_code != 200:
                continue
            name_img = img_src.replace("_q", "").split("/")[-1]
            page = requests.get(self._url("media"), auth=self._basic,
                                params={"search": name_img})
            if page.status_code != 200:
                continue
            res = page.json()
            if not res:
                self._logger.info("Aucun media trouvé pour {0}".format(h2))
                continue
            data = {"featured_media": res[0]["id"]}
            r = requests.post(self._url("posts/{0}".format(result[0]["id"])),
                              auth=self._basic, headers=headers, data=json.dumps(data))
            if r.status_code == 200:
                self._logger.info("Ajout media featured : {0}".format(r.json()["title"]["raw"]))

    ## Association image to post
    def _linkImgPost(self, title, list_img, post_id):
        """Attach every media of ``list_img`` (items carrying an ``"id"``) to ``post_id``."""
        for img in list_img:
            if not img["id"]:
                # Nothing was uploaded for this entry: there is no media to attach.
                continue
            r = requests.post(self._url("media/{0}".format(img["id"])),
                              auth=self._basic, data={"post": post_id})
            if r.status_code == 200:
                self._logger.info("Association d'une image à l'article {0}".format(title))

    ## Add or update img
    def _addOrUpdateMedia(self, href_img, page):
        """Upload ``page.content`` as a media named after the last segment of ``href_img``.

        A media already matching that name is force-deleted first.

        Returns:
            ``{"id": ..., "rendered": ...}`` taken from the upload response;
            both values stay ``""`` when the lookup or the upload fails.
        """
        media = {"id": "", "rendered": ""}
        img_name = href_img.split("/")[-1]
        r = requests.get(self._url("media"), auth=self._basic, params={"search": img_name})
        if r.status_code != 200:
            return media
        res = r.json()
        if res:
            r = requests.delete(self._url("media/{0}".format(res[0]["id"])),
                                auth=self._basic, params={"force": 1})
            if r.status_code == 200:
                self._logger.info("Image supprimé {0}".format(img_name))
        headers = {
            'Content-Type': self._guessImageType(img_name),
            'Content-Disposition': 'attachment; filename={0}'.format(img_name),
        }
        r = requests.post(self._url("media"), auth=self._basic, headers=headers,
                          data=page.content)
        if r.status_code == 201:
            self._logger.info("Ajout d'image {0}".format(img_name))
            res = r.json()
            media["id"] = res["id"]
            media["rendered"] = res["guid"]["rendered"]
        return media

    ## Upload the images of an article
    def _importImages(self, articlebody):
        """Upload the images wrapped in ``a[target=_blank]`` links of ``articlebody``.

        Returns:
            One record per successful upload: ``{"id", "new_src", "old_urls"}``
            where ``old_urls`` lists the original URLs that must be rewritten
            to ``new_src`` in the article body.
        """
        list_img = []
        for link in articlebody.find_all("a", {"target": "_blank"}):
            img = link.find_all("img")
            if not img:
                continue
            href_a = link.get("href")
            old_src = img[0].get("src")
            if not old_src:
                continue
            href_img = old_src
            page_img = requests.get(href_img)
            if page_img.status_code == 404 and href_a:
                # The <img> source is gone: fall back to the link target.
                href_img = href_a
                page_img = requests.get(href_a)
            if page_img.status_code != 200:
                continue
            uploads = [self._addOrUpdateMedia(href_img, page_img)]
            if href_a and href_img != href_a:
                # NOTE(review): the second media is named after the link target
                # but is filled with the bytes downloaded from the <img>
                # source, as before.  Downloading href_a itself looks like the
                # intent - confirm before changing.
                uploads.append(self._addOrUpdateMedia(href_a, page_img))
            # Failed uploads are dropped so they can neither blank out URLs in
            # the body nor be "linked" with an empty media id.
            uploads = [media for media in uploads if media["id"]]
            old_urls = [url for url in (href_a, old_src) if url]
            for position, media in enumerate(uploads, start=1):
                # One distinct record per media, so each one gets attached to
                # the post.  Only the last upload rewrites the body, which is
                # what happened previously when both uploads succeeded.
                list_img.append({
                    "id": media["id"],
                    "new_src": media["rendered"],
                    "old_urls": old_urls if position == len(uploads) else [],
                })
        return list_img

    ## Read the comments of an article
    def _extractComments(self, soup):
        """Return ``{"author", "date", "content"}`` for each ``div.comment_item``.

        Raises:
            IndexError: a comment has no ``div.itemfooter`` or no ``abbr`` in it.
        """
        comment_post = []
        for item in soup.find_all("div", class_="comment_item"):
            lines = item.text.split("\n")
            footer = item.find_all("div", class_="itemfooter")
            author = footer[0].text.split(",")[0].replace("Posté par ", "")
            date = footer[0].find_all("abbr")[0].get("title")
            # The last two lines of the comment text are left out of the content.
            content = "<p>" + "".join(line + "<br />" for line in lines[:-2] if line) + "</p>"
            comment_post.append({"author": author, "date": date, "content": content})
        return comment_post

    ## Find or create categories and tags
    def _resolveTerms(self, itemfooter):
        """Return the WordPress ids of the categories and tags named in ``itemfooter``.

        Links with ``rel="tag"`` count as tags when their href contains
        ``/tag/`` and as categories when it contains ``/archives/``.  Names
        that are missing from the site are created.

        Returns:
            ``{"categories": [ids], "tags": [ids]}``.
        """
        liste = ["categories", "tags"]
        existing = {}
        for kind in liste:
            # NOTE(review): only the first page returned by the API is read, so
            # terms beyond it are not recognised as existing - confirm whether
            # pagination is needed for this site.
            page = requests.get(self._url(kind))
            existing[kind] = page.json() if page.status_code == 200 else []

        wanted = {"categories": [], "tags": []}
        for link in itemfooter.find_all("a", {"rel": True}):
            rel = link.get("rel")
            if not rel or rel[0] != 'tag':
                continue
            href = link.get("href") or ""
            if "/tag/" in href:
                wanted["tags"].append(link.text)
            if "/archives/" in href:
                wanted["categories"].append(link.text)

        ids = {"categories": [], "tags": []}
        for kind in liste:
            for name in wanted[kind]:
                matches = [term["id"] for term in existing[kind] if term["name"] == name]
                if matches:
                    ids[kind].extend(matches)
                    continue
                page = requests.post(self._url(kind), auth=self._basic, data={"name": name})
                if page.status_code == 201:
                    ids[kind].append(page.json()["id"])
        return ids

    ## Build the HTML content of a post
    def _buildBody(self, articlebody, list_img):
        """Merge the ``<p>`` elements of ``articlebody`` into one paragraph.

        Paragraphs whose text is a single character become a bare line break.
        The old image URLs recorded in ``list_img`` are then replaced by the
        path component of the uploaded media URL.
        """
        chunks = []
        for paragraph in articlebody.find_all("p"):
            if len(paragraph.text) == 1:
                chunks.append("<br />")
            else:
                inner = str(paragraph).replace("<p>", "").replace("</p>", "")
                chunks.append(inner.replace("<br>", "<br />") + "<br />")
        bodyhtml = "<p>" + "".join(chunks) + "</p>"
        for img in list_img:
            path = urlparse(img["new_src"]).path
            for old_url in img["old_urls"]:
                bodyhtml = bodyhtml.replace(old_url, path)
        return bodyhtml

    ## Add or Update post
    def _addOrUpdatePost(self, soup):
        """Create the article described by ``soup``, or update it when it already exists.

        Images are uploaded and terms resolved first.  The post is then
        searched by title: a hit is updated in place, otherwise the post is
        created and its comments are imported.  In both cases the uploaded
        images are attached to the post.  Comments are only imported on
        creation.

        Raises:
            IndexError: an expected element (title, body, date, creator,
                date header or footer) is missing from the page.
        """
        articletitle = soup.find_all("h2", class_="articletitle")
        articlebody = soup.find_all("div", class_="articlebody")
        articledate = soup.find_all("span", class_="articledate")
        articleacreator = soup.find_all("span", class_="articlecreator")
        dateheader = soup.find_all("div", class_="dateheader")
        itemfooter = soup.find_all("div", class_="itemfooter")

        list_img = self._importImages(articlebody[0])
        comment_post = self._extractComments(soup)
        terms = self._resolveTerms(itemfooter[0])

        title = articletitle[0].text
        author = articleacreator[0].text.lower()
        data = {
            "title": title,
            "content": self._buildBody(articlebody[0], list_img),
            "status": "publish",
            "date": self._formatDate(dateheader[0].text, articledate[0].text),
            "tags": terms["tags"],
            "categories": terms["categories"],
        }

        page = requests.get(self._url("users"), auth=self._basic, params={"search": author})
        if page.status_code == 200:
            result = page.json()
            # An unknown author leaves the field unset instead of raising.
            if result:
                data["author"] = result[0]["id"]

        headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
        page = requests.get(self._url("posts"), auth=self._basic, params={"search": title})
        if page.status_code != 200:
            # Without a usable search result the post is neither created nor updated.
            return
        result = page.json()
        if result:
            # NOTE(review): the first search hit is assumed to be this very
            # article; the title is not compared - confirm that is acceptable.
            self._logger.info("La page {0} existe deja et mis à jour".format(title))
            page = requests.post(self._url("posts/{0}".format(result[0]["id"])),
                                 auth=self._basic, headers=headers, data=json.dumps(data))
            if page.status_code == 200:
                result = page.json()
                self._logger.info("Article mis à jour : {0}".format(result["title"]["raw"]))
                self._linkImgPost(result["title"]["raw"], list_img, result["id"])
            return

        page = requests.post(self._url("posts"), auth=self._basic, headers=headers,
                             data=json.dumps(data))
        if page.status_code != 201:
            return
        result = page.json()
        self._logger.info("Article ajoute : {0}".format(result["title"]["raw"]))
        for comment in comment_post:
            payload = {"post": result["id"], "content": comment["content"],
                       "date": comment["date"], "author_name": comment["author"]}
            page = requests.post(self._url("comments"), auth=self._basic, data=payload)
            if page.status_code == 201:
                self._logger.info("Commentaire ajoute pour {0}".format(result["title"]["raw"]))
        self._linkImgPost(result["title"]["raw"], list_img, result["id"])

View File

@ -1,272 +1,8 @@
#!/usr/bin/python3 #!/usr/bin/python3
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
from getpass import getpass from getpass import getpass
import requests, os, argparse, logging, re, json import argparse, logging
import WPImport
class WPimport:
    """Import blog pages saved as HTML files into a WordPress site.

    All traffic goes through the WordPress REST API rooted at
    ``http://<wordpress>/wp-json/wp/v2/``.  Progress messages are sent to the
    logger handed to the constructor.
    """

    # French month names (as looked up from the page's date header) mapped to
    # their two-digit month number.
    _MONTHS = {
        "janvier": "01", "février": "02", "mars": "03", "avril": "04",
        "mai": "05", "juin": "06", "juillet": "07", "août": "08",
        "septembre": "09", "octobre": "10", "novembre": "11", "décembre": "12",
    }

    # Constructor
    def __init__(self, basic, wordpress, logger):
        """Store the collaborators shared by every import.

        Args:
            basic: value passed as ``auth=`` to the authenticated requests.
            wordpress: host inserted after ``http://`` in every API URL.
            logger: object exposing ``info()``; receives progress messages.
        """
        self._basic = basic
        self._wordpress = wordpress
        self._logger = logger

    # Public method

    def fromDirectory(self, directory):
        """Import every file found in the sub-directories of ``<directory>/archives``.

        Only sub-directories are scanned: files stored directly inside
        ``archives`` are not collected.
        """
        directory = "{0}/archives".format(directory)
        directories = self._getDirectories([], "{0}".format(directory))
        files = self._getFiles(directories)
        self.fromFile(files)

    def fromFile(self, files):
        """Parse each existing file and import it.

        A page containing a ``div.articlebody`` is imported as an article;
        any other page is only used to attach featured images.  Paths that do
        not exist are skipped silently.
        """
        for file in files:
            if not os.path.exists(file):
                continue
            # The injected logger is used rather than a module-level global,
            # so the class does not depend on how the script was started.
            self._logger.info("Fichier en cours de traitement : {0}".format(file))
            with open(file, 'r') as f:
                content = f.read()
            soup = BeautifulSoup(content, 'html.parser')
            if soup.find_all("div", class_="articlebody"):
                self._addOrUpdatePost(soup)
            else:
                self._addOrUpdateFeaturedMedia(soup)

    # Private method

    ## Build an API URL
    def _url(self, endpoint):
        """Return the REST URL for ``endpoint`` (e.g. ``"posts"`` or ``"media/12"``)."""
        return "http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, endpoint)

    ## Get all files
    def _getFiles(self, item):
        """Return the paths of the regular files directly inside each directory of ``item``."""
        files = []
        for directory in item:
            for name in os.listdir(directory):
                path = "{0}/{1}".format(directory, name)
                if os.path.isfile(path):
                    files.append(path)
        return files

    ## Get directories
    def _getDirectories(self, subdirectory, item):
        """Append every directory found recursively below ``item`` to ``subdirectory``.

        ``item`` itself is not added.  The list received is mutated in place
        and is also the return value.
        """
        for name in os.listdir(item):
            path = "{0}/{1}".format(item, name)
            if os.path.isdir(path):
                subdirectory.append(path)
                self._getDirectories(subdirectory, path)
        return subdirectory

    ## Guess the Content-Type of an image
    @staticmethod
    def _guessImageType(img_name):
        """Return ``image/jpeg`` for ``.jpg``/``.jpeg`` names, ``image/png`` otherwise.

        The extension is whatever follows the LAST dot, so ``a.b.jpg`` is a
        JPEG and a name without any dot falls back to PNG instead of raising.
        """
        _, dot, extension = img_name.rpartition(".")
        if dot and extension.lower() in ("jpg", "jpeg"):
            return "image/jpeg"
        return "image/png"

    ## Build the ISO date of a post
    @classmethod
    def _formatDate(cls, dateheader, hour):
        """Turn ``"<day> <french month> <year>"`` and ``"HH:MM"`` into ``YYYY-MM-DDTHH:MM:00``.

        Raises:
            KeyError: the month name is not a known French month.
            IndexError: ``dateheader`` has fewer than three space-separated parts.
        """
        parts = dateheader.split(" ")
        # The day is zero-padded so the result is a well-formed ISO 8601 date.
        return "{0}-{1}-{2}T{3}:00".format(
            parts[2], cls._MONTHS[parts[1].lower()], parts[0].zfill(2), hour)

    ## Add or update featured media
    def _addOrUpdateFeaturedMedia(self, soup):
        """Set the featured image of each post listed in ``soup``.

        For every ``div[data-edittype=post]`` the post is looked up by its
        ``h2`` title; when the first search hit has exactly that title and the
        first image of the block is reachable, the media whose name matches
        the image file name (with ``_q`` removed) becomes the post's
        ``featured_media``.
        """
        headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
        for item in soup.find_all("div", {"data-edittype": "post"}):
            titles = item.find_all("h2")
            if not titles:
                continue
            h2 = titles[0].text
            page = requests.get(self._url("search"), auth=self._basic,
                                params={"search": h2, "type": "post"})
            if page.status_code != 200:
                continue
            result = page.json()
            if not result or h2 != result[0]["title"]:
                continue
            img = item.find_all("img")
            if not img:
                continue
            img_src = img[0].get("src")
            if not img_src:
                continue
            # The image is fetched only to check that it is still reachable.
            if requests.get(img_src).status_code != 200:
                continue
            name_img = img_src.replace("_q", "").split("/")[-1]
            page = requests.get(self._url("media"), auth=self._basic,
                                params={"search": name_img})
            if page.status_code != 200:
                continue
            res = page.json()
            if not res:
                self._logger.info("Aucun media trouvé pour {0}".format(h2))
                continue
            data = {"featured_media": res[0]["id"]}
            r = requests.post(self._url("posts/{0}".format(result[0]["id"])),
                              auth=self._basic, headers=headers, data=json.dumps(data))
            if r.status_code == 200:
                self._logger.info("Ajout media featured : {0}".format(r.json()["title"]["raw"]))

    ## Association image to post
    def _linkImgPost(self, title, list_img, post_id):
        """Attach every media of ``list_img`` (items carrying an ``"id"``) to ``post_id``."""
        for img in list_img:
            if not img["id"]:
                # Nothing was uploaded for this entry: there is no media to attach.
                continue
            r = requests.post(self._url("media/{0}".format(img["id"])),
                              auth=self._basic, data={"post": post_id})
            if r.status_code == 200:
                self._logger.info("Association d'une image à l'article {0}".format(title))

    ## Add or update img
    def _addOrUpdateMedia(self, href_img, page):
        """Upload ``page.content`` as a media named after the last segment of ``href_img``.

        A media already matching that name is force-deleted first.

        Returns:
            ``{"id": ..., "rendered": ...}`` taken from the upload response;
            both values stay ``""`` when the lookup or the upload fails.
        """
        media = {"id": "", "rendered": ""}
        img_name = href_img.split("/")[-1]
        r = requests.get(self._url("media"), auth=self._basic, params={"search": img_name})
        if r.status_code != 200:
            return media
        res = r.json()
        if res:
            r = requests.delete(self._url("media/{0}".format(res[0]["id"])),
                                auth=self._basic, params={"force": 1})
            if r.status_code == 200:
                self._logger.info("Image supprimé {0}".format(img_name))
        headers = {
            'Content-Type': self._guessImageType(img_name),
            'Content-Disposition': 'attachment; filename={0}'.format(img_name),
        }
        r = requests.post(self._url("media"), auth=self._basic, headers=headers,
                          data=page.content)
        if r.status_code == 201:
            self._logger.info("Ajout d'image {0}".format(img_name))
            res = r.json()
            media["id"] = res["id"]
            media["rendered"] = res["guid"]["rendered"]
        return media

    ## Upload the images of an article
    def _importImages(self, articlebody):
        """Upload the images wrapped in ``a[target=_blank]`` links of ``articlebody``.

        Returns:
            One record per successful upload: ``{"id", "new_src", "old_urls"}``
            where ``old_urls`` lists the original URLs that must be rewritten
            to ``new_src`` in the article body.
        """
        list_img = []
        for link in articlebody.find_all("a", {"target": "_blank"}):
            img = link.find_all("img")
            if not img:
                continue
            href_a = link.get("href")
            old_src = img[0].get("src")
            if not old_src:
                continue
            href_img = old_src
            page_img = requests.get(href_img)
            if page_img.status_code == 404 and href_a:
                # The <img> source is gone: fall back to the link target.
                href_img = href_a
                page_img = requests.get(href_a)
            if page_img.status_code != 200:
                continue
            uploads = [self._addOrUpdateMedia(href_img, page_img)]
            if href_a and href_img != href_a:
                # NOTE(review): the second media is named after the link target
                # but is filled with the bytes downloaded from the <img>
                # source, as before.  Downloading href_a itself looks like the
                # intent - confirm before changing.
                uploads.append(self._addOrUpdateMedia(href_a, page_img))
            # Failed uploads are dropped so they can neither blank out URLs in
            # the body nor be "linked" with an empty media id.
            uploads = [media for media in uploads if media["id"]]
            old_urls = [url for url in (href_a, old_src) if url]
            for position, media in enumerate(uploads, start=1):
                # One distinct record per media, so each one gets attached to
                # the post.  Only the last upload rewrites the body, which is
                # what happened previously when both uploads succeeded.
                list_img.append({
                    "id": media["id"],
                    "new_src": media["rendered"],
                    "old_urls": old_urls if position == len(uploads) else [],
                })
        return list_img

    ## Read the comments of an article
    def _extractComments(self, soup):
        """Return ``{"author", "date", "content"}`` for each ``div.comment_item``.

        Raises:
            IndexError: a comment has no ``div.itemfooter`` or no ``abbr`` in it.
        """
        comment_post = []
        for item in soup.find_all("div", class_="comment_item"):
            lines = item.text.split("\n")
            footer = item.find_all("div", class_="itemfooter")
            author = footer[0].text.split(",")[0].replace("Posté par ", "")
            date = footer[0].find_all("abbr")[0].get("title")
            # The last two lines of the comment text are left out of the content.
            content = "<p>" + "".join(line + "<br />" for line in lines[:-2] if line) + "</p>"
            comment_post.append({"author": author, "date": date, "content": content})
        return comment_post

    ## Find or create categories and tags
    def _resolveTerms(self, itemfooter):
        """Return the WordPress ids of the categories and tags named in ``itemfooter``.

        Links with ``rel="tag"`` count as tags when their href contains
        ``/tag/`` and as categories when it contains ``/archives/``.  Names
        that are missing from the site are created.

        Returns:
            ``{"categories": [ids], "tags": [ids]}``.
        """
        liste = ["categories", "tags"]
        existing = {}
        for kind in liste:
            # NOTE(review): only the first page returned by the API is read, so
            # terms beyond it are not recognised as existing - confirm whether
            # pagination is needed for this site.
            page = requests.get(self._url(kind))
            existing[kind] = page.json() if page.status_code == 200 else []

        wanted = {"categories": [], "tags": []}
        for link in itemfooter.find_all("a", {"rel": True}):
            rel = link.get("rel")
            if not rel or rel[0] != 'tag':
                continue
            href = link.get("href") or ""
            if "/tag/" in href:
                wanted["tags"].append(link.text)
            if "/archives/" in href:
                wanted["categories"].append(link.text)

        ids = {"categories": [], "tags": []}
        for kind in liste:
            for name in wanted[kind]:
                matches = [term["id"] for term in existing[kind] if term["name"] == name]
                if matches:
                    ids[kind].extend(matches)
                    continue
                page = requests.post(self._url(kind), auth=self._basic, data={"name": name})
                if page.status_code == 201:
                    ids[kind].append(page.json()["id"])
        return ids

    ## Build the HTML content of a post
    def _buildBody(self, articlebody, list_img):
        """Merge the ``<p>`` elements of ``articlebody`` into one paragraph.

        Paragraphs whose text is a single character become a bare line break.
        The old image URLs recorded in ``list_img`` are then replaced by the
        path component of the uploaded media URL.
        """
        chunks = []
        for paragraph in articlebody.find_all("p"):
            if len(paragraph.text) == 1:
                chunks.append("<br />")
            else:
                inner = str(paragraph).replace("<p>", "").replace("</p>", "")
                chunks.append(inner.replace("<br>", "<br />") + "<br />")
        bodyhtml = "<p>" + "".join(chunks) + "</p>"
        for img in list_img:
            path = urlparse(img["new_src"]).path
            for old_url in img["old_urls"]:
                bodyhtml = bodyhtml.replace(old_url, path)
        return bodyhtml

    ## Add or Update post
    def _addOrUpdatePost(self, soup):
        """Create the article described by ``soup``, or update it when it already exists.

        Images are uploaded and terms resolved first.  The post is then
        searched by title: a hit is updated in place, otherwise the post is
        created and its comments are imported.  In both cases the uploaded
        images are attached to the post.  Comments are only imported on
        creation.

        Raises:
            IndexError: an expected element (title, body, date, creator,
                date header or footer) is missing from the page.
        """
        articletitle = soup.find_all("h2", class_="articletitle")
        articlebody = soup.find_all("div", class_="articlebody")
        articledate = soup.find_all("span", class_="articledate")
        articleacreator = soup.find_all("span", class_="articlecreator")
        dateheader = soup.find_all("div", class_="dateheader")
        itemfooter = soup.find_all("div", class_="itemfooter")

        list_img = self._importImages(articlebody[0])
        comment_post = self._extractComments(soup)
        terms = self._resolveTerms(itemfooter[0])

        title = articletitle[0].text
        author = articleacreator[0].text.lower()
        data = {
            "title": title,
            "content": self._buildBody(articlebody[0], list_img),
            "status": "publish",
            "date": self._formatDate(dateheader[0].text, articledate[0].text),
            "tags": terms["tags"],
            "categories": terms["categories"],
        }

        page = requests.get(self._url("users"), auth=self._basic, params={"search": author})
        if page.status_code == 200:
            result = page.json()
            # An unknown author leaves the field unset instead of raising.
            if result:
                data["author"] = result[0]["id"]

        headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
        page = requests.get(self._url("posts"), auth=self._basic, params={"search": title})
        if page.status_code != 200:
            # Without a usable search result the post is neither created nor updated.
            return
        result = page.json()
        if result:
            # NOTE(review): the first search hit is assumed to be this very
            # article; the title is not compared - confirm that is acceptable.
            self._logger.info("La page {0} existe deja et mis à jour".format(title))
            page = requests.post(self._url("posts/{0}".format(result[0]["id"])),
                                 auth=self._basic, headers=headers, data=json.dumps(data))
            if page.status_code == 200:
                result = page.json()
                self._logger.info("Article mis à jour : {0}".format(result["title"]["raw"]))
                self._linkImgPost(result["title"]["raw"], list_img, result["id"])
            return

        page = requests.post(self._url("posts"), auth=self._basic, headers=headers,
                             data=json.dumps(data))
        if page.status_code != 201:
            return
        result = page.json()
        self._logger.info("Article ajoute : {0}".format(result["title"]["raw"]))
        for comment in comment_post:
            payload = {"post": result["id"], "content": comment["content"],
                       "date": comment["date"], "author_name": comment["author"]}
            page = requests.post(self._url("comments"), auth=self._basic, data=payload)
            if page.status_code == 201:
                self._logger.info("Commentaire ajoute pour {0}".format(result["title"]["raw"]))
        self._linkImgPost(result["title"]["raw"], list_img, result["id"])
if __name__ == '__main__': if __name__ == '__main__':