Merge pull request 'fix-media' (#12) from fix-media into master

Reviewed-on: #12
This commit is contained in:
v4l3n71n 2023-05-23 14:47:07 +00:00
commit 351cb10f01
5 changed files with 390 additions and 135 deletions

2
.gitignore vendored
View File

@ -1,4 +1,4 @@
backup*/
wp-navigation
web_scrap.log
*.log
__pycache__/

View File

@ -3,23 +3,49 @@ from requests.auth import HTTPBasicAuth
from getpass import getpass
from urllib.parse import urlparse
from concurrent import futures
from concurrent.futures import as_completed, wait
import argparse, logging, threading
from lib.WPImport import WPimport
from lib.WPExport import WPExport
from lib.WPRemove import WPRemove
def remove(args, basic, logger):
removeWp = WPRemove(basic=basic, wordpress="", logger=logger)
if args.remove == True:
for i in args.wordpress.split(","):
removeWp.setUrl(i)
removeWp.cleanPosts()
removeWp.cleanTags()
removeWp.cleanCategories()
removeWp.cleanMedia()
else:
for i in args.wordpress.split(","):
removeWp.setUrl(i)
if args.posts == True:
removeWp.cleanPosts()
if args.categories == True:
removeWp.cleanCategories()
if args.tags == True:
removeWp.cleanTags()
if args.media == True:
removeWp.cleanMedia()
del removeWp
def download(name_thread, max_thread, url, logger, parser, directory, html, img):
exportWp = WPExport(name="Thread-{0}".format(int(name_thread) + 1), url=url, logger=logger, parser=parser, directory=directory)
webpage = exportWp.getUrlPage(name_thread, max_thread)
if html is False:
exportWp.downloadHTML(webpage)
for i in ["article", "page"]:
for j in ["publications", "principal"]:
if html is False:
exportWp.downloadHTML(webpage[j][i])
if img is False:
exportWp.downloadImg(webpage[j][i])
if args.img is False:
exportWp.downloadImg(webpage)
del exportWp
def importUrl(name_thread, max_thread, canalblog, logger, parser, wordpress, basic, serial):
canalblog = canalblog.split(",")
@ -40,7 +66,10 @@ def importUrl(name_thread, max_thread, canalblog, logger, parser, wordpress, bas
del exportWp
for j in wordpress:
importWp = WPimport(name=name, basic=basic, wordpress=j, logger=logger, parser=parser)
importWp.fromUrl(webpage)
for k in ["article", "page"]:
for l in ["publications", "principal"]:
importWp.fromUrl(webpage[l][k])
del importWp
else:
if len(canalblog) != len(wordpress):
@ -58,7 +87,11 @@ def importUrl(name_thread, max_thread, canalblog, logger, parser, wordpress, bas
webpage = exportWp.getUrlPage(name_thread, max_thread)
del exportWp
importWp = WPimport(name=name, basic=basic, wordpress=wordpress[i], logger=logger, parser=parser)
importWp.fromUrl(webpage)
for k in ["article", "page"]:
for l in ["publications", "principal"]:
importWp.fromUrl(webpage[l][k])
del importWp
@ -96,11 +129,29 @@ if __name__ == '__main__':
import_parser = subparsers.add_parser("import")
import_parser.add_argument("--user", help="wordpress user", required=True)
import_parser.add_argument("--password", help="password wordpress's user", default="")
import_parser.add_argument("--file", help="HTML file", default="")
import_parser.add_argument("--directory", help="HTML directory", default="")
import_parser.add_argument("--canalblog", help="URL Canalblog", default="")
import_parser.add_argument("--wordpress", help="URL Wordpress", required=True)
import_parser.add_argument("--serial", help="Serial execution", action="store_true")
import_parser.add_argument("--remove-all", dest="remove", help="Remove all", action="store_true")
import_parser.add_argument("--remove-posts", help="Remove all posts", dest="posts", action="store_true")
import_parser.add_argument("--remove-categories", help="Remove all categories", dest="categories", action="store_true")
import_parser.add_argument("--remove-tags", help="Remove all tags", dest="tags", action="store_true")
import_parser.add_argument("--remove-media", help="Remove all media", dest="media", action="store_true")
remove_parser = subparsers.add_parser("remove")
remove_parser.add_argument("--user", help="wordpress user", required=True)
remove_parser.add_argument("--password", help="password wordpress's user", default="")
remove_parser.add_argument("--wordpress", help="URL Wordpress", required=True)
remove_parser.add_argument("--all", dest="remove", help="Remove all (posts, media, tags, categories)", action="store_true")
remove_parser.add_argument("--posts", help="Remove all posts", action="store_true")
remove_parser.add_argument("--categories", help="Remove all categories", action="store_true")
remove_parser.add_argument("--tags", help="Remove all tags", action="store_true")
remove_parser.add_argument("--media", help="Remove all media", action="store_true")
export_parser = subparsers.add_parser("export")
@ -142,20 +193,24 @@ if __name__ == '__main__':
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
if args.command == "import":
password = getpass()
if len(password) == 0:
logger.error("No password error !!! ")
exit(1)
if args.command == "import" or args.command == "remove":
password = args.password
if len(args.password) == 0:
password = getpass()
if len(password) == 0:
logger.error("No password error !!! ")
exit(1)
basic = HTTPBasicAuth(args.user, password)
if args.command == "import":
wordpress = args.wordpress.split(",")
importWp = WPimport(basic, "", logger, args.parser)
importWp = WPimport(basic=basic, wordpress="", logger=logger, parser=args.parser)
if len(args.file) > 0:
for i in wordpress:
importWp.setUrl(i)
importWp.fromFile(files=args.file.split(","))
if len(args.directory) > 0:
remove(args, basic, logger)
try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
wait_for = [
@ -165,6 +220,7 @@ if __name__ == '__main__':
except Exception as err:
logger.error("Threading error : {0}".format(err))
if len(args.canalblog) > 0:
remove(args, basic, logger)
try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
wait_for = [
@ -195,6 +251,8 @@ if __name__ == '__main__':
exportWp.downloadCss()
del exportWp
if args.html is False or args.img is False:
try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
@ -204,6 +262,9 @@ if __name__ == '__main__':
]
except Exception as err:
logger.error("Threading error : {0}".format(err))
exit(0)
if args.command == "remove":
remove(args, basic, logger)
exit(0)

View File

@ -13,6 +13,7 @@ class WPExport:
self._dir = directory
self._name = name
self._request = requests.Session()
retries = Retry(total=10,
@ -27,6 +28,7 @@ class WPExport:
# Public method
# Set name
def setName(self, name):
@ -83,9 +85,14 @@ class WPExport:
self._logger.error("{0} : URL did not get due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
webpage = []
webpage = {"principal": {"page":[], "article":[]}, "publications": {"page":[], "article":[]}}
for i in page_url:
section = "publications"
o = urlparse(i)
o = o._replace(scheme="https")
i = o.geturl().replace(":///", "://")
if i == "{0}/".format(self._url):
section = "principal"
try:
page = self._request.get(i)
except Exception as err:
@ -93,19 +100,23 @@ class WPExport:
exit(1)
if page.status_code == 200:
self._logger.info("{0} : page : {1}".format(self._name, i))
if i not in webpage:
webpage.append(i)
if i not in webpage[section]["page"]:
webpage[section]["page"].append(i)
soup = BeautifulSoup(page.text, self._parser)
class_div = pagingfirstline = soup.find_all("div", class_="pagingfirstline")
class_div = soup.find_all("div", class_="pagingfirstline")
if len(class_div) > 0:
pagingfirstline = class_div[0].find_all("a")
if len(pagingfirstline) > 1:
lastpage = pagingfirstline[len(pagingfirstline)-1].get("href", "/")
self._logger.debug("{0} : Last page {1}".format(self._name, lastpage))
element_lastpage = lastpage.split("/")[len(lastpage.split("/"))-1]
number_page = element_lastpage.split("-")[0].split("p")[1]
number_lastpage = int(number_page) / 10
setPageDivided = int(number_lastpage) / max_thread
if setPageDivided > int(setPageDivided):
setPageDivided = setPageDivided + 1
setPagePart = setPageDivided * (index_thread + 1)
firstPagePart = (setPagePart - setPageDivided)
@ -113,36 +124,38 @@ class WPExport:
self._logger.debug("{0} : First range : {1}".format(self._name, int(firstPagePart)))
self._logger.debug("{0} : Last range : {1}".format(self._name, int(setPagePart)))
for j in range(int(firstPagePart),int(setPagePart)):
for j in range(int(firstPagePart),int(setPagePart)+1):
paging = j * 10
categorie = urlparse(i).path.split("/")
url_paging = "{0}/archives/p{1}-10.html".format(self._url, paging)
if len(categorie) > 2:
url_paging = "{0}/archives/{1}/p{2}-10.html".format(self._url, categorie[2], paging)
self._logger.info("{0} : {1}".format(self._name, url_paging))
if url_paging not in webpage:
webpage.append(url_paging)
if url_paging not in webpage[section]["page"]:
webpage[section]["page"].append(url_paging)
page = self._request.get(url_paging)
if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser)
h2 = soup.find_all("h2")
self._logger.debug("{0} : {1} H2 : {2}".format(self._name, url_paging, h2))
for title in h2:
self._logger.debug("{0} : {1} a : {2}".format(self._name, url_paging, title.find_all("a")))
href = title.find_all("a")[0].get("href", "/")
if href not in webpage:
if href not in webpage[section]["article"]:
try:
o = urlparse(href)
o = o._replace(scheme="https").geturl()
except Exception as err:
self._logger.error("parsing error : {0}".format(err))
exit(1)
webpage.append(o)
webpage[section]["article"].append(o)
else:
self._logger.error("{0} : web didn't get due status code : {1}".format(self._name, page.status_code))
self._logger.debug(page.content)
self._logger.debug("{0} : {1}".format(self._name, page.content))
return webpage
# Private method
#
# Create path

View File

@ -5,6 +5,7 @@ from urllib.parse import urlparse
import requests, os, logging, re, json
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from slugify import slugify
class WPimport:
# Constructor
@ -14,11 +15,11 @@ class WPimport:
self._wordpress = wordpress
self._logger = logger
self._parser = parser
self._headers_json = {'Content-Type': 'application/json', 'Accept':'application/json'}
self._headers_json = {'Content-Type': 'application/json; charset=utf-8', 'Accept':'application/json'}
self._request = requests.Session()
retries = Retry(total=10,
retries = Retry(connect=10, read=10, redirect=5,
status_forcelist=[429, 500, 502, 503, 504], backoff_factor=2)
self._request.mount('http://', HTTPAdapter(max_retries=retries))
@ -26,7 +27,7 @@ class WPimport:
# Destructor
def __del__(self):
self._logger.info("{0} : Import finished for {1}".format(self._name, self._wordpress))
print("{0} : Import finished for {1}".format(self._name, self._wordpress))
# Public method
@ -86,8 +87,26 @@ class WPimport:
else:
self._addOrUpdateFeaturedMedia(soup)
# Private method
## replace caracter
def _replaceCaracter(self, title_rendered):
list_replace = {'’': "'", '–': '-', '…': '...', '« ': '"', ' »': '"', '« ': '"', ' »': '"', '’': "'", '"‘': "'"}
for old, new in list_replace.items():
title_rendered = title_rendered.replace(old, new)
return title_rendered
## remove space
def _removeSpace(self, title):
if title[len(title)-1] == " ":
title = title[:-1]
if title[0] == " ":
title = title[1:]
return title
## Get all files
def _getFiles(self, item):
@ -192,54 +211,62 @@ class WPimport:
## Add or update img
def _addOrUpdateMedia(self, href_img, page):
media_authorized = ["png", "jpg", "jpeg", "svg", "gif"]
media = {"id":"", "rendered":""}
split_fileimg = href_img.split("/")
img_name = split_fileimg[len(split_fileimg)-1]
self._logger.debug("{0} : Search for image {1} with URL {2}".format(self._name, img_name, "http://{0}/wp-json/wp/v2/media".format(self._wordpress)))
params = { "search": img_name}
try:
r = self._request.get("http://{0}/wp-json/wp/v2/media".format(self._wordpress), auth=self._basic, params=params)
except Exception as err:
self._logger.error("{0} : Connection error for search media : {1}".format(self._name, err))
exit(1)
self._logger.debug("{0} : Search for image {1} and his status code {2}".format(self._name, img_name, r.status_code))
if r.status_code == 200:
res = r.json()
if len(res) > 0:
params = {"force":1}
img_type_file = img_name.split(".")[len(img_name.split("."))-1]
is_img = True
if img_type_file not in media_authorized:
self._logger.error("{0} : Element {1} is not image".format(self._name,img_name))
is_img = False
if is_img is True:
self._logger.debug("{0} : Search for image {1} with URL {2}".format(self._name, img_name, "http://{0}/wp-json/wp/v2/media".format(self._wordpress)))
params = { "search": img_name}
try:
r = self._request.get("http://{0}/wp-json/wp/v2/media".format(self._wordpress), auth=self._basic, params=params)
except Exception as err:
self._logger.error("{0} : Connection error for search media : {1}".format(self._name, err))
exit(1)
self._logger.debug("{0} : Search for image {1} and his status code {2}".format(self._name, img_name, r.status_code))
if r.status_code == 200:
res = r.json()
self._logger.debug("{0} : Number of image in search : {1}".format(self._name, len(res)))
if len(res) > 0:
params = {"force":1}
try:
r = self._request.delete("http://{0}/wp-json/wp/v2/media/{1}".format(self._wordpress, res[0]["id"]), auth=self._basic, params=params)
except Exception as err:
self._logger.error("{0} Connection error for delete image : {1}".format(self._name, err))
exit(1)
if r.status_code == 200:
self._logger.info("{0} : Image removed {1}".format(self._name, img_name))
else:
self._logger.error("{0} : Image {1} not removed due status code : {2}".format(self._name, img_name, r.status_code))
self._logger.debug("{0} : {1}".format(self._name, r.content))
data = page.content
img_type = "image/{0}".format(img_type_file)
if img_type_file == "jpg":
img_type = "image/jpeg"
headers={ 'Content-Type': img_type,'Content-Disposition' : 'attachment; filename={0}'.format(img_name)}
try:
r = self._request.delete("http://{0}/wp-json/wp/v2/media/{1}".format(self._wordpress, res[0]["id"]), auth=self._basic, params=params)
r = self._request.post("http://{0}/wp-json/wp/v2/media".format(self._wordpress), auth=self._basic, headers=headers, data=data)
except Exception as err:
self._logger.error("{0} Connection error for delete image : {1}".format(self._name, err))
self._logger.error("{0} : Connection error for add image : {1}".format(self._name, err))
exit(1)
if r.status_code == 200:
self._logger.info("{0} : Image removed {1}".format(self._name, img_name))
if r.status_code == 201:
self._logger.info("{0} : Image added {1}".format(self._name, img_name))
res = r.json()
media["id"] = res["id"]
media["rendered"] = res["guid"]["rendered"]
else:
self._logger.error("{0} : Image not removed due status code : {1}".format(self._name, r.status_code))
self._logger.error("{0} : Image {1}.{2} not added due status code : {3}".format(self._name, img_name, img_type, r.status_code))
self._logger.debug("{0} : {1}".format(self._name, r.content))
data = page.content
img_type = "image/png"
if img_name.split(".")[1] == "jpg" or img_name.split(".")[1] == "jpeg":
img_type = "image/jpg"
headers={ 'Content-Type': img_type,'Content-Disposition' : 'attachment; filename={0}'.format(img_name)}
try:
r = self._request.post("http://{0}/wp-json/wp/v2/media".format(self._wordpress), auth=self._basic, headers=headers, data=data)
except Exception as err:
self._logger.error("{0} : Connection error for add image : {1}".format(self._name, err))
exit(1)
if r.status_code == 201:
self._logger.info("{0} : Image added {1}".format(self._name, img_name))
res = r.json()
media["id"] = res["id"]
media["rendered"] = res["guid"]["rendered"]
else:
self._logger.error("{0} : Image not added due status code : {1}".format(self._name, r.status_code))
self._logger.debug(r.content)
else:
self._logger.error("{0} : Connection error for search image with status code : {1}".format(self._name, r.status_code))
self._logger.debug("{0} : {1}".format(self._name, r.content))
self._logger.error("{0} : Connection error for search image with status code : {1}".format(self._name, r.status_code))
self._logger.debug("{0} : {1}".format(self._name, r.content))
return media
@ -428,33 +455,56 @@ class WPimport:
for i in liste:
for j in element[i]:
element_exist = False
try:
params = {"params":j}
page = self._request.get("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, params=params)
except Exception as err:
self._logger.error("{0} : Connection error for {1} : {2}".format(self._name, i, err))
exit(1)
if page.status_code == 200:
element_exist = True
result = page.json()
listelement[i].append(result[0]["id"])
else:
self._logger.error("{0} : {1} not found due status code : {2}".format(self._name, i, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if element_exist is False:
data = {"name": j}
self._logger.debug("{0} : URL : {1} ".format("http://{1}/wp-json/wp/v2/{2}".format(self._name, self._wordpress, i)))
self._logger.debug("{0} : data : {1}".format(self._name, data))
self._logger.debug("{0} : headers : {1}".format(self._name, self._headers_form))
title_element = self._removeSpace(j)
for index in range(1,10):
self._logger.info("{0} : search {1} with index {2} : {3}".format(self._name, i, index, title_element))
try:
page = self._request.post("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, headers=self._headers_json, data=data)
params = {"search":title_element, "per_page":"100", "page":index}
page = self._request.get("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, params=params)
except ConnectionError as err:
self._logger.error("{0} : Connection error for {1} : {2}".format(self._name, i, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for {1} : {2}".format(self._name, i, err))
if page.status_code == 200:
result = page.json()
self._logger.debug("{0} : content {3} {2} : {1}".format(self._name, result, title_element, i))
if len(result) > 0:
for k in result:
title_rendered = k["name"]
self._logger.debug("{0} : content {2} : {1}".format(self._name, title_rendered, i))
self._logger.debug("{0} : size of content {3} : {2} - {1}".format(self._name, len(title_rendered), len(title_element), i))
if len(title_element) != len(title_rendered):
title_rendered = self._replaceCaracter(title_rendered)
if title_element == title_rendered:
self._logger.info("{0} : {1} found : {2}".format(self._name, i, title_rendered))
element_exist = True
listelement[i].append(k["id"])
else:
break
if page.status_code == 400:
self._logger.error("{0} : {1} not found due status code : {2}".format(self._name, i, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
break
else:
self._logger.error("{0} : {1} not found due status code : {2}".format(self._name, i, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
self._logger.debug("{0} : Element {3} {2} is {1}".format(self._name, element_exist, title_element, i))
if element_exist == False:
data = {"name": title_element}
self._logger.info("{0} : Create {1} : {2}".format(self._name, i, title_element))
self._logger.debug("{0} : Data : {1}".format(self._name, data))
try:
page = self._request.post("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except ConnectionError as err:
self._logger.error("{0} : Connection error for post {1} : {2}".format(self._name, i, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for post {1} : {2}".format(self._name, i, err))
if page.status_code == 201:
self._logger.info("{0} : {1} created : {2}".format(self._name, i, j))
result = page.json()
listelement[i].append(result["id"])
else:
@ -477,61 +527,117 @@ class WPimport:
bodyhtml = bodyhtml.replace(i["old_src"], o.path)
hour = articledate[0].text
time = dateheader[0].text.split(" ")
self._logger.debug("{0} : Title post : |{1}|".format(self._name, title))
title = self._removeSpace(title)
self._logger.debug("{0} : Rendered Title post : |{1}|".format(self._name, title))
data = {"title":title, "content":bodyhtml, "status":"publish", "date": "{0}-{1}-{2}T{3}:00".format(time[2],month[time[1]],time[0], hour), "tags": listelement["tags"], "categories": listelement["categories"]}
params = {"search":author}
self._logger.debug("{0} : Data for post : |{1}| : {2}" .format(self._name, title, data))
params = {"search":author, "per_page":100}
try:
page = self._request.get("http://{0}/wp-json/wp/v2/users".format(self._wordpress), auth=self._basic, params=params)
except Exception as err:
self._logger.info("{0} : Search author : {1}".format(self._name, author))
page = self._request.get("http://{0}/wp-json/wp/v2/users".format(self._wordpress), auth=self._basic, headers=self._headers_json, params=params)
self._logger.debug("{0} : End Search author : {1}".format(self._name, author))
self._logger.debug("{0} : Debug requests : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get author : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get author : {1}".format(self._name, err))
if page.status_code == 200:
self._logger.info("{0} : Get author id : {1}".format(self._name, result))
result = page.json()
data["author"] = result[0]["id"]
for a in result:
data["author"] = a["id"]
else:
self._logger.error("{0} : Connection error with status code for get author : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(page.content))
page_is_exist = False
params = {"search":title}
try:
page = self._request.get("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, params=params)
except Exception as err:
self._logger.error("{0} : Connection error for search post : {1}".format(self._name, err))
exit(1)
page_exist = True
headers = {'Content-Type': 'application/json', 'Accept':'application/json'}
if page.status_code == 200:
result = page.json()
if len(result) == 0:
page_exist = False
else:
self._logger.info("{0} : Page {1} already exist and going to update".format(self._name, title))
post_id = result[0]["id"]
try:
page = self._request.post("http://{0}/wp-json/wp/v2/posts/{1}".format(self._wordpress, post_id), auth=self._basic, headers=headers, data=json.dumps(data))
except Exception as err:
self._logger.error("{0} : Connection error for update post : {1}".format(self._name, err))
exit(1)
if page.status_code == 200:
result = page.json()
self._logger.info("{0} : Post updated : {1}".format(self._name, result["title"]["raw"]))
self._addOrUpdateComment(result["id"], comment_post, result["title"]["raw"])
self._linkImgPost(result["title"]["raw"], list_img, result["id"])
else:
self._logger.error("{0} : Post not updated due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
else:
self._logger.error("{0} : Connection for update post error with status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if page_exist == False:
for index in range(1,10):
params = {"search": title, "per_page":100, "page": index}
try:
page = self._request.post("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, headers=headers, data=json.dumps(data))
self._logger.info("{0} : Search post with index {2} : {1}".format(self._name, title, index))
page = self._request.get("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, params=params, headers=self._headers_json)
except ConnectionError as err:
self._logger.error("{0} : Connection error for search post : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for search post : {1}".format(self._name, err))
if page.status_code == 200:
self._logger.debug("{0} : Encoding : {1}".format(self._name, page.encoding))
page.encoding = "utf-8"
result = page.json()
if len(result) == 0:
break
self._logger.info("{0} : Number result posts : {1}".format(self._name, len(result)))
count = 0
for i in result:
title_rendered = i["title"]["rendered"]
self._logger.info("{0} : Search title posts for |{2}| : |{1}|".format(self._name, title_rendered, title))
if len(title_rendered) != len(title):
title_rendered = self._replaceCaracter(title_rendered)
self._logger.debug("{0} : Search title posts for |{2}| : |{1}|".format(self._name, title_rendered, title))
self._logger.debug("{0} : SIze of title : {1} - {2}".format(self._name, len(title), len(title_rendered)))
if title_rendered == title:
page_is_exist = True
post_id = i["id"]
count = count + 1
if count > 1:
self._logger.info("{0} : Page {1} is double and going to delete".format(self._name, title))
try:
params = {"force":1}
page = self._request.delete("http://{0}/wp-json/wp/v2/posts/{1}".format(self._wordpress, post_id), auth=self._basic, headers=self._headers_json, params=params)
except ConnectionError as err:
self._logger.error("{0} : Connection error for deleted post : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for deleted post : {1}".format(self._name, err))
if page.status_code == 200:
self._logger.info("{0} : Post deleted : {1}".format(self._name, title))
else:
self._logger.error("{0} : Post not updated due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
else:
self._logger.debug("{0} : Data for post to update : {1}".format(self._name, i))
self._logger.info("{0} : Page {1} already exist and going to update".format(self._name, title))
try:
page = self._request.post("http://{0}/wp-json/wp/v2/posts/{1}".format(self._wordpress, post_id), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except ConnectionError as err:
self._logger.error("{0} : Connection error for update post : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for update post : {1}".format(self._name, err))
if page.status_code == 200:
result = page.json()
self._logger.info("{0} : Post updated : {1}".format(self._name, title))
self._addOrUpdateComment(result["id"], comment_post, result["title"]["raw"])
self._linkImgPost(result["title"]["raw"], list_img, result["id"])
else:
self._logger.error("{0} : Post not updated due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if page.status_code == 400:
self._logger.error("{0} : Connection for update post unauthorized : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
break
else:
self._logger.error("{0} : Connection for update post error with status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if page_is_exist is False:
try:
self._logger.info("{0} : Creating posts : {1}".format(self._name, data["title"]))
page = self._request.post("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except ConnectionError as err:
self._logger.error("{0} : Connection error for create post : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for create post : {1}".format(self._name, err))
if page.status_code == 201:
result = page.json()
self._logger.info("{0} : Post added : {1}".format(self._name, result["title"]["raw"]))

75
lib/WPRemove.py Normal file
View File

@ -0,0 +1,75 @@
#!/usr/bin/python3
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import requests, os, logging, re, json
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class WPRemove:
# Constructor
def __init__(self, name="Thread-0", basic=None, wordpress="", logger=None):
self._name = name
self._basic = basic
self._wordpress = wordpress
self._logger = logger
self._headers_json = {'Content-Type': 'application/json', 'Accept':'application/json'}
self._request = requests.Session()
retries = Retry(connect=10, read=10, redirect=5,
status_forcelist=[429, 500, 502, 503, 504], backoff_factor=2)
self._request.mount('http://', HTTPAdapter(max_retries=retries))
# Destructor
def __del__(self):
print("{0} : Import finished for {1}".format(self._name, self._wordpress))
# Public method
def setUrl(self, wordpress):
self._wordpress = wordpress
def cleanPosts(self):
self._removeAll("posts")
def cleanTags(self):
self._removeAll("tags")
def cleanCategories(self):
self._removeAll("categories")
def cleanMedia(self):
self._removeAll("media")
# Private method
def _removeAll(self, composant):
params = {"per_page":100}
try:
self._logger.info("{0} : List {2} to remove for url : {1}".format(self._name, self._wordpress, composant))
r = self._request.get("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, composant), auth=self._basic, params=params, headers=self._headers_json)
except Exception as err:
self._logger.error("{0} : Connection error for list {1} to remove : {2}".format(self._name, composant, err))
if r.status_code == 200:
result = r.json()
if len(result) > 0:
for i in result:
self._logger.info("{0} : Remove {2} for url {1} : {3}".format(self._name, self._wordpress, composant, i["title"]["rendered"]))
params = {"force":1}
try:
r = self._request.delete("http://{0}/wp-json/wp/v2/{1}/{2}".format(self._wordpress, composant, i["id"]), auth=self._basic, headers=self._headers_json , params=params)
if r.status_code == 200:
self._logger.info("{0} : Post removed for URL {1} {2} : {3}".format(self._name, self._wordpress, composant, i["title"]["rendered"]))
else:
self._logger.error("{0} : Connection error for post {1} {2} {3} with status code {4}".format(self._name, self._wordpress, composant, i["title"]["rendered"], r.status_code))
except Exception as err:
self._logger.error("{0} : Connection error for {1} remove : {2}".format(self._name, composant, err))
exit(1)
self._removeAll(composant)
else:
self._logger.error("{0} : Error for list to remove {1} due status code {2}".format(self._name, composant, r.status_code))
self._logger.debug("{0} : Content error for {1} : {2}".format(self._name, composant, r.content))