10 Commits

Author SHA1 Message Date
351cb10f01 Merge pull request 'fix-media' (#12) from fix-media into master
Reviewed-on: #12
2023-05-23 14:47:07 +00:00
5c5dc707f5 fix headers search author 2023-05-23 16:46:07 +02:00
f69298179a reduce line code and add private method 2023-05-23 13:45:59 +02:00
d3ec7d147d loop replace 2023-05-23 11:22:37 +02:00
0fc6e78a18 fix title rendered 2023-05-23 00:02:51 +02:00
3718b807ba more message debug 2023-05-21 21:14:36 +02:00
75772ba7f0 remove doublon 2023-05-21 21:12:00 +02:00
769b7f43fc fix add or update post 2023-05-18 00:24:41 +02:00
ba42d56be1 fix webpage 2023-05-16 00:15:16 +02:00
d18f4e1579 Add clean 2023-05-15 23:51:45 +02:00
3 changed files with 184 additions and 88 deletions

View File

@@ -12,7 +12,7 @@ from lib.WPRemove import WPRemove
def remove(args, basic, logger): def remove(args, basic, logger):
removeWp = WPRemove(basic=basic, wordpress="", logger=logger) removeWp = WPRemove(basic=basic, wordpress="", logger=logger)
if args.remove: if args.remove == True:
for i in args.wordpress.split(","): for i in args.wordpress.split(","):
removeWp.setUrl(i) removeWp.setUrl(i)
removeWp.cleanPosts() removeWp.cleanPosts()
@@ -22,13 +22,13 @@ def remove(args, basic, logger):
else: else:
for i in args.wordpress.split(","): for i in args.wordpress.split(","):
removeWp.setUrl(i) removeWp.setUrl(i)
if args.posts: if args.posts == True:
removeWp.cleanPosts() removeWp.cleanPosts()
if args.categories: if args.categories == True:
removeWp.cleanCategories() removeWp.cleanCategories()
if args.tags: if args.tags == True:
removeWp.cleanTags() removeWp.cleanTags()
if args.media: if args.media == True:
removeWp.cleanMedia() removeWp.cleanMedia()
del removeWp del removeWp
@@ -136,10 +136,10 @@ if __name__ == '__main__':
import_parser.add_argument("--wordpress", help="URL Wordpress", required=True) import_parser.add_argument("--wordpress", help="URL Wordpress", required=True)
import_parser.add_argument("--serial", help="Serial execution", action="store_true") import_parser.add_argument("--serial", help="Serial execution", action="store_true")
import_parser.add_argument("--remove-all", dest="remove", help="Remove all", action="store_true") import_parser.add_argument("--remove-all", dest="remove", help="Remove all", action="store_true")
remove_parser.add_argument("--remove-posts", help="Remove all posts", dest="posts", action="store_true") import_parser.add_argument("--remove-posts", help="Remove all posts", dest="posts", action="store_true")
remove_parser.add_argument("--remove-categories", help="Remove all categories", dest="categories", action="store_true") import_parser.add_argument("--remove-categories", help="Remove all categories", dest="categories", action="store_true")
remove_parser.add_argument("--remove-tags", help="Remove all tags", dest="tags", action="store_true") import_parser.add_argument("--remove-tags", help="Remove all tags", dest="tags", action="store_true")
remove_parser.add_argument("--remove-media", help="Remove all media", dest="media", action="store_true") import_parser.add_argument("--remove-media", help="Remove all media", dest="media", action="store_true")
remove_parser = subparsers.add_parser("remove") remove_parser = subparsers.add_parser("remove")
@@ -193,7 +193,7 @@ if __name__ == '__main__':
fileHandler.setFormatter(formatter) fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler) logger.addHandler(fileHandler)
if args.command == "import": if args.command == "import" or args.command == "remove":
password = args.password password = args.password
if len(args.password) == 0: if len(args.password) == 0:
password = getpass() password = getpass()
@@ -202,6 +202,7 @@ if __name__ == '__main__':
exit(1) exit(1)
basic = HTTPBasicAuth(args.user, password) basic = HTTPBasicAuth(args.user, password)
if args.command == "import":
wordpress = args.wordpress.split(",") wordpress = args.wordpress.split(",")
importWp = WPimport(basic=basic, wordpress="", logger=logger, parser=args.parser) importWp = WPimport(basic=basic, wordpress="", logger=logger, parser=args.parser)
if len(args.file) > 0: if len(args.file) > 0:
@@ -261,16 +262,9 @@ if __name__ == '__main__':
] ]
except Exception as err: except Exception as err:
logger.error("Threading error : {0}".format(err)) logger.error("Threading error : {0}".format(err))
exit(0)
if args.command == "remove":
password = args.password if args.command == "remove":
if len(args.password) == 0: remove(args, basic, logger)
password = getpass()
if len(password) == 0:
logger.error("No password error !!! ")
exit(1)
basic = HTTPBasicAuth(args.user, password)
remove(args, basic, logger)
exit(0) exit(0)

View File

@@ -103,16 +103,20 @@ class WPExport:
if i not in webpage[section]["page"]: if i not in webpage[section]["page"]:
webpage[section]["page"].append(i) webpage[section]["page"].append(i)
soup = BeautifulSoup(page.text, self._parser) soup = BeautifulSoup(page.text, self._parser)
class_div = pagingfirstline = soup.find_all("div", class_="pagingfirstline") class_div = soup.find_all("div", class_="pagingfirstline")
if len(class_div) > 0: if len(class_div) > 0:
pagingfirstline = class_div[0].find_all("a") pagingfirstline = class_div[0].find_all("a")
if len(pagingfirstline) > 1: if len(pagingfirstline) > 1:
lastpage = pagingfirstline[len(pagingfirstline)-1].get("href", "/") lastpage = pagingfirstline[len(pagingfirstline)-1].get("href", "/")
self._logger.debug("{0} : Last page {1}".format(self._name, lastpage))
element_lastpage = lastpage.split("/")[len(lastpage.split("/"))-1] element_lastpage = lastpage.split("/")[len(lastpage.split("/"))-1]
number_page = element_lastpage.split("-")[0].split("p")[1] number_page = element_lastpage.split("-")[0].split("p")[1]
number_lastpage = int(number_page) / 10 number_lastpage = int(number_page) / 10
setPageDivided = int(number_lastpage) / max_thread setPageDivided = int(number_lastpage) / max_thread
if setPageDivided > int(setPageDivided):
setPageDivided = setPageDivided + 1
setPagePart = setPageDivided * (index_thread + 1) setPagePart = setPageDivided * (index_thread + 1)
firstPagePart = (setPagePart - setPageDivided) firstPagePart = (setPagePart - setPageDivided)
@@ -120,7 +124,7 @@ class WPExport:
self._logger.debug("{0} : First range : {1}".format(self._name, int(firstPagePart))) self._logger.debug("{0} : First range : {1}".format(self._name, int(firstPagePart)))
self._logger.debug("{0} : Last range : {1}".format(self._name, int(setPagePart))) self._logger.debug("{0} : Last range : {1}".format(self._name, int(setPagePart)))
for j in range(int(firstPagePart),int(setPagePart)): for j in range(int(firstPagePart),int(setPagePart)+1):
paging = j * 10 paging = j * 10
categorie = urlparse(i).path.split("/") categorie = urlparse(i).path.split("/")
url_paging = "{0}/archives/p{1}-10.html".format(self._url, paging) url_paging = "{0}/archives/p{1}-10.html".format(self._url, paging)
@@ -133,7 +137,9 @@ class WPExport:
if page.status_code == 200: if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser) soup = BeautifulSoup(page.text, self._parser)
h2 = soup.find_all("h2") h2 = soup.find_all("h2")
self._logger.debug("{0} : {1} H2 : {2}".format(self._name, url_paging, h2))
for title in h2: for title in h2:
self._logger.debug("{0} : {1} a : {2}".format(self._name, url_paging, title.find_all("a")))
href = title.find_all("a")[0].get("href", "/") href = title.find_all("a")[0].get("href", "/")
if href not in webpage[section]["article"]: if href not in webpage[section]["article"]:
try: try:

View File

@@ -5,6 +5,7 @@ from urllib.parse import urlparse
import requests, os, logging, re, json import requests, os, logging, re, json
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry from requests.packages.urllib3.util.retry import Retry
from slugify import slugify
class WPimport: class WPimport:
# Constructor # Constructor
@@ -14,7 +15,7 @@ class WPimport:
self._wordpress = wordpress self._wordpress = wordpress
self._logger = logger self._logger = logger
self._parser = parser self._parser = parser
self._headers_json = {'Content-Type': 'application/json', 'Accept':'application/json'} self._headers_json = {'Content-Type': 'application/json; charset=utf-8', 'Accept':'application/json'}
self._request = requests.Session() self._request = requests.Session()
@@ -89,6 +90,23 @@ class WPimport:
# Private method # Private method
## replace caracter
def _replaceCaracter(self, title_rendered):
list_replace = {'’': "'", '–': '-', '…': '...', '« ': '"', ' »': '"', '« ': '"', ' »': '"', '’': "'", '"‘': "'"}
for old, new in list_replace.items():
title_rendered = title_rendered.replace(old, new)
return title_rendered
## remove space
def _removeSpace(self, title):
if title[len(title)-1] == " ":
title = title[:-1]
if title[0] == " ":
title = title[1:]
return title
## Get all files ## Get all files
def _getFiles(self, item): def _getFiles(self, item):
@@ -193,7 +211,7 @@ class WPimport:
## Add or update img ## Add or update img
def _addOrUpdateMedia(self, href_img, page): def _addOrUpdateMedia(self, href_img, page):
media_authorized = ["png", "jpg", "jpeg", "svg"] media_authorized = ["png", "jpg", "jpeg", "svg", "gif"]
media = {"id":"", "rendered":""} media = {"id":"", "rendered":""}
split_fileimg = href_img.split("/") split_fileimg = href_img.split("/")
img_name = split_fileimg[len(split_fileimg)-1] img_name = split_fileimg[len(split_fileimg)-1]
@@ -437,33 +455,56 @@ class WPimport:
for i in liste: for i in liste:
for j in element[i]: for j in element[i]:
element_exist = False element_exist = False
try: title_element = self._removeSpace(j)
params = {"params":j} for index in range(1,10):
page = self._request.get("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, params=params) self._logger.info("{0} : search {1} with index {2} : {3}".format(self._name, i, index, title_element))
except Exception as err:
self._logger.error("{0} : Connection error for {1} : {2}".format(self._name, i, err))
exit(1)
if page.status_code == 200:
element_exist = True
result = page.json()
listelement[i].append(result[0]["id"])
else:
self._logger.error("{0} : {1} not found due status code : {2}".format(self._name, i, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if element_exist is False:
data = {"name": j}
self._logger.debug("{0} : URL : {1} ".format("http://{1}/wp-json/wp/v2/{2}".format(self._name, self._wordpress, i)))
self._logger.debug("{0} : data : {1}".format(self._name, data))
self._logger.debug("{0} : headers : {1}".format(self._name, self._headers_form))
try: try:
page = self._request.post("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, headers=self._headers_json, data=data) params = {"search":title_element, "per_page":"100", "page":index}
page = self._request.get("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, params=params)
except ConnectionError as err:
self._logger.error("{0} : Connection error for {1} : {2}".format(self._name, i, err))
exit(1)
except Exception as err: except Exception as err:
self._logger.error("{0} : Exception error for {1} : {2}".format(self._name, i, err))
if page.status_code == 200:
result = page.json()
self._logger.debug("{0} : content {3} {2} : {1}".format(self._name, result, title_element, i))
if len(result) > 0:
for k in result:
title_rendered = k["name"]
self._logger.debug("{0} : content {2} : {1}".format(self._name, title_rendered, i))
self._logger.debug("{0} : size of content {3} : {2} - {1}".format(self._name, len(title_rendered), len(title_element), i))
if len(title_element) != len(title_rendered):
title_rendered = self._replaceCaracter(title_rendered)
if title_element == title_rendered:
self._logger.info("{0} : {1} found : {2}".format(self._name, i, title_rendered))
element_exist = True
listelement[i].append(k["id"])
else:
break
if page.status_code == 400:
self._logger.error("{0} : {1} not found due status code : {2}".format(self._name, i, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
break
else:
self._logger.error("{0} : {1} not found due status code : {2}".format(self._name, i, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
self._logger.debug("{0} : Element {3} {2} is {1}".format(self._name, element_exist, title_element, i))
if element_exist == False:
data = {"name": title_element}
self._logger.info("{0} : Create {1} : {2}".format(self._name, i, title_element))
self._logger.debug("{0} : Data : {1}".format(self._name, data))
try:
page = self._request.post("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except ConnectionError as err:
self._logger.error("{0} : Connection error for post {1} : {2}".format(self._name, i, err)) self._logger.error("{0} : Connection error for post {1} : {2}".format(self._name, i, err))
exit(1) exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for post {1} : {2}".format(self._name, i, err))
if page.status_code == 201: if page.status_code == 201:
self._logger.info("{0} : {1} created : {2}".format(self._name, i, j))
result = page.json() result = page.json()
listelement[i].append(result["id"]) listelement[i].append(result["id"])
else: else:
@@ -486,62 +527,117 @@ class WPimport:
bodyhtml = bodyhtml.replace(i["old_src"], o.path) bodyhtml = bodyhtml.replace(i["old_src"], o.path)
hour = articledate[0].text hour = articledate[0].text
time = dateheader[0].text.split(" ") time = dateheader[0].text.split(" ")
self._logger.debug("{0} : Title post : |{1}|".format(self._name, title))
title = self._removeSpace(title)
self._logger.debug("{0} : Rendered Title post : |{1}|".format(self._name, title))
data = {"title":title, "content":bodyhtml, "status":"publish", "date": "{0}-{1}-{2}T{3}:00".format(time[2],month[time[1]],time[0], hour), "tags": listelement["tags"], "categories": listelement["categories"]} data = {"title":title, "content":bodyhtml, "status":"publish", "date": "{0}-{1}-{2}T{3}:00".format(time[2],month[time[1]],time[0], hour), "tags": listelement["tags"], "categories": listelement["categories"]}
params = {"search":author} self._logger.debug("{0} : Data for post : |{1}| : {2}" .format(self._name, title, data))
params = {"search":author, "per_page":100}
try: try:
self._logger.info("{0} : Get author : {1}".format(self._name, author)) self._logger.info("{0} : Search author : {1}".format(self._name, author))
page = self._request.get("http://{0}/wp-json/wp/v2/users".format(self._wordpress), auth=self._basic, params=params) page = self._request.get("http://{0}/wp-json/wp/v2/users".format(self._wordpress), auth=self._basic, headers=self._headers_json, params=params)
except Exception as err: self._logger.debug("{0} : End Search author : {1}".format(self._name, author))
self._logger.debug("{0} : Debug requests : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get author : {1}".format(self._name, err)) self._logger.error("{0} : Connection error for get author : {1}".format(self._name, err))
exit(1) exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get author : {1}".format(self._name, err))
if page.status_code == 200: if page.status_code == 200:
self._logger.info("{0} : Get author id : {1}".format(self._name, result))
result = page.json() result = page.json()
data["author"] = result[0]["id"] for a in result:
data["author"] = a["id"]
else: else:
self._logger.error("{0} : Connection error with status code for get author : {1}".format(self._name, page.status_code)) self._logger.error("{0} : Connection error with status code for get author : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(page.content)) self._logger.debug("{0} : {1}".format(page.content))
page_is_exist = False
params = {"search": title} for index in range(1,10):
try: params = {"search": title, "per_page":100, "page": index}
page = self._request.get("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, params=params)
except Exception as err:
self._logger.error("{0} : Connection error for search post : {1}".format(self._name, err))
exit(1)
page_is_exist = True
if page.status_code == 200:
result = page.json()
if len(result) == 0:
page_is_exist = False
else:
for i in result:
if i["title"]["rendered"] == title:
post_id = i["id"]
self._logger.debug("{0} : Data for post to update : {1}".format(self._name, result[0]))
self._logger.info("{0} : Page {1} already exist and going to update".format(self._name, title))
try:
page = self._request.post("http://{0}/wp-json/wp/v2/posts/{1}".format(self._wordpress, post_id), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except Exception as err:
self._logger.error("{0} : Connection error for update post : {1}".format(self._name, err))
exit(1)
if page.status_code == 200:
result = page.json()
self._logger.info("{0} : Post updated : {1}".format(self._name, title))
self._addOrUpdateComment(result["id"], comment_post, result["title"]["raw"])
self._linkImgPost(result["title"]["raw"], list_img, result["id"])
else:
self._logger.error("{0} : Post not updated due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
else:
self._logger.error("{0} : Connection for update post error with status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if page_is_exist == False:
try: try:
page = self._request.post("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, headers=self._headers_json, data=json.dumps(data)) self._logger.info("{0} : Search post with index {2} : {1}".format(self._name, title, index))
page = self._request.get("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, params=params, headers=self._headers_json)
except ConnectionError as err:
self._logger.error("{0} : Connection error for search post : {1}".format(self._name, err))
exit(1)
except Exception as err: except Exception as err:
self._logger.error("{0} : Exception error for search post : {1}".format(self._name, err))
if page.status_code == 200:
self._logger.debug("{0} : Encoding : {1}".format(self._name, page.encoding))
page.encoding = "utf-8"
result = page.json()
if len(result) == 0:
break
self._logger.info("{0} : Number result posts : {1}".format(self._name, len(result)))
count = 0
for i in result:
title_rendered = i["title"]["rendered"]
self._logger.info("{0} : Search title posts for |{2}| : |{1}|".format(self._name, title_rendered, title))
if len(title_rendered) != len(title):
title_rendered = self._replaceCaracter(title_rendered)
self._logger.debug("{0} : Search title posts for |{2}| : |{1}|".format(self._name, title_rendered, title))
self._logger.debug("{0} : SIze of title : {1} - {2}".format(self._name, len(title), len(title_rendered)))
if title_rendered == title:
page_is_exist = True
post_id = i["id"]
count = count + 1
if count > 1:
self._logger.info("{0} : Page {1} is double and going to delete".format(self._name, title))
try:
params = {"force":1}
page = self._request.delete("http://{0}/wp-json/wp/v2/posts/{1}".format(self._wordpress, post_id), auth=self._basic, headers=self._headers_json, params=params)
except ConnectionError as err:
self._logger.error("{0} : Connection error for deleted post : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for deleted post : {1}".format(self._name, err))
if page.status_code == 200:
self._logger.info("{0} : Post deleted : {1}".format(self._name, title))
else:
self._logger.error("{0} : Post not updated due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
else:
self._logger.debug("{0} : Data for post to update : {1}".format(self._name, i))
self._logger.info("{0} : Page {1} already exist and going to update".format(self._name, title))
try:
page = self._request.post("http://{0}/wp-json/wp/v2/posts/{1}".format(self._wordpress, post_id), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except ConnectionError as err:
self._logger.error("{0} : Connection error for update post : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for update post : {1}".format(self._name, err))
if page.status_code == 200:
result = page.json()
self._logger.info("{0} : Post updated : {1}".format(self._name, title))
self._addOrUpdateComment(result["id"], comment_post, result["title"]["raw"])
self._linkImgPost(result["title"]["raw"], list_img, result["id"])
else:
self._logger.error("{0} : Post not updated due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if page.status_code == 400:
self._logger.error("{0} : Connection for update post unauthorized : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
break
else:
self._logger.error("{0} : Connection for update post error with status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if page_is_exist is False:
try:
self._logger.info("{0} : Creating posts : {1}".format(self._name, data["title"]))
page = self._request.post("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except ConnectionError as err:
self._logger.error("{0} : Connection error for create post : {1}".format(self._name, err)) self._logger.error("{0} : Connection error for create post : {1}".format(self._name, err))
exit(1) exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for create post : {1}".format(self._name, err))
if page.status_code == 201: if page.status_code == 201:
result = page.json() result = page.json()
self._logger.info("{0} : Post added : {1}".format(self._name, result["title"]["raw"])) self._logger.info("{0} : Post added : {1}".format(self._name, result["title"]["raw"]))