14 Commits

Author SHA1 Message Date
351cb10f01 Merge pull request 'fix-media' (#12) from fix-media into master
Reviewed-on: #12
2023-05-23 14:47:07 +00:00
5c5dc707f5 fix headers search author 2023-05-23 16:46:07 +02:00
f69298179a reduce line code and add private method 2023-05-23 13:45:59 +02:00
d3ec7d147d loop replace 2023-05-23 11:22:37 +02:00
0fc6e78a18 fix title rendered 2023-05-23 00:02:51 +02:00
3718b807ba more message debug 2023-05-21 21:14:36 +02:00
75772ba7f0 remove doublon 2023-05-21 21:12:00 +02:00
769b7f43fc fix add or update post 2023-05-18 00:24:41 +02:00
ba42d56be1 fix webpage 2023-05-16 00:15:16 +02:00
d18f4e1579 Add clean 2023-05-15 23:51:45 +02:00
8bdaea3910 add remove command 2023-05-15 23:42:18 +02:00
f3cb5c4069 fix parameters 2023-05-15 23:22:41 +02:00
cfb24bed0e add remove parameters 2023-05-15 23:21:25 +02:00
ee8674fd59 add remove class 2023-05-15 23:13:55 +02:00
4 changed files with 291 additions and 107 deletions

View File

@@ -8,7 +8,29 @@ from concurrent.futures import as_completed, wait
import argparse, logging, threading
from lib.WPImport import WPimport
from lib.WPExport import WPExport
from lib.WPRemove import WPRemove
def remove(args, basic, logger):
removeWp = WPRemove(basic=basic, wordpress="", logger=logger)
if args.remove == True:
for i in args.wordpress.split(","):
removeWp.setUrl(i)
removeWp.cleanPosts()
removeWp.cleanTags()
removeWp.cleanCategories()
removeWp.cleanMedia()
else:
for i in args.wordpress.split(","):
removeWp.setUrl(i)
if args.posts == True:
removeWp.cleanPosts()
if args.categories == True:
removeWp.cleanCategories()
if args.tags == True:
removeWp.cleanTags()
if args.media == True:
removeWp.cleanMedia()
del removeWp
def download(name_thread, max_thread, url, logger, parser, directory, html, img):
@@ -113,7 +135,22 @@ if __name__ == '__main__':
import_parser.add_argument("--canalblog", help="URL Canalblog", default="")
import_parser.add_argument("--wordpress", help="URL Wordpress", required=True)
import_parser.add_argument("--serial", help="Serial execution", action="store_true")
import_parser.add_argument("--remove", help="Remove all articles", action="store_true")
import_parser.add_argument("--remove-all", dest="remove", help="Remove all", action="store_true")
import_parser.add_argument("--remove-posts", help="Remove all posts", dest="posts", action="store_true")
import_parser.add_argument("--remove-categories", help="Remove all categories", dest="categories", action="store_true")
import_parser.add_argument("--remove-tags", help="Remove all tags", dest="tags", action="store_true")
import_parser.add_argument("--remove-media", help="Remove all media", dest="media", action="store_true")
remove_parser = subparsers.add_parser("remove")
remove_parser.add_argument("--user", help="wordpress user", required=True)
remove_parser.add_argument("--password", help="password wordpress's user", default="")
remove_parser.add_argument("--wordpress", help="URL Wordpress", required=True)
remove_parser.add_argument("--all", dest="remove", help="Remove all (posts, media, tags, categories)", action="store_true")
remove_parser.add_argument("--posts", help="Remove all posts", action="store_true")
remove_parser.add_argument("--categories", help="Remove all categories", action="store_true")
remove_parser.add_argument("--tags", help="Remove all tags", action="store_true")
remove_parser.add_argument("--media", help="Remove all media", action="store_true")
@@ -156,7 +193,7 @@ if __name__ == '__main__':
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
if args.command == "import":
if args.command == "import" or args.command == "remove":
password = args.password
if len(args.password) == 0:
password = getpass()
@@ -165,6 +202,7 @@ if __name__ == '__main__':
exit(1)
basic = HTTPBasicAuth(args.user, password)
if args.command == "import":
wordpress = args.wordpress.split(",")
importWp = WPimport(basic=basic, wordpress="", logger=logger, parser=args.parser)
if len(args.file) > 0:
@@ -172,10 +210,7 @@ if __name__ == '__main__':
importWp.setUrl(i)
importWp.fromFile(files=args.file.split(","))
if len(args.directory) > 0:
if args.remove:
for i in args.wordpress.split(","):
importWp.setUrl(i)
importWp.removeAll()
remove(args, basic, logger)
try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
wait_for = [
@@ -185,10 +220,7 @@ if __name__ == '__main__':
except Exception as err:
logger.error("Threading error : {0}".format(err))
if len(args.canalblog) > 0:
if args.remove:
for i in args.wordpress.split(","):
importWp.setUrl(i)
importWp.removeAll()
remove(args, basic, logger)
try:
with futures.ThreadPoolExecutor(max_workers=int(args.parallel)) as ex:
wait_for = [
@@ -230,6 +262,9 @@ if __name__ == '__main__':
]
except Exception as err:
logger.error("Threading error : {0}".format(err))
exit(0)
if args.command == "remove":
remove(args, basic, logger)
exit(0)

View File

@@ -103,16 +103,20 @@ class WPExport:
if i not in webpage[section]["page"]:
webpage[section]["page"].append(i)
soup = BeautifulSoup(page.text, self._parser)
class_div = pagingfirstline = soup.find_all("div", class_="pagingfirstline")
class_div = soup.find_all("div", class_="pagingfirstline")
if len(class_div) > 0:
pagingfirstline = class_div[0].find_all("a")
if len(pagingfirstline) > 1:
lastpage = pagingfirstline[len(pagingfirstline)-1].get("href", "/")
self._logger.debug("{0} : Last page {1}".format(self._name, lastpage))
element_lastpage = lastpage.split("/")[len(lastpage.split("/"))-1]
number_page = element_lastpage.split("-")[0].split("p")[1]
number_lastpage = int(number_page) / 10
setPageDivided = int(number_lastpage) / max_thread
if setPageDivided > int(setPageDivided):
setPageDivided = setPageDivided + 1
setPagePart = setPageDivided * (index_thread + 1)
firstPagePart = (setPagePart - setPageDivided)
@@ -120,7 +124,7 @@ class WPExport:
self._logger.debug("{0} : First range : {1}".format(self._name, int(firstPagePart)))
self._logger.debug("{0} : Last range : {1}".format(self._name, int(setPagePart)))
for j in range(int(firstPagePart),int(setPagePart)):
for j in range(int(firstPagePart),int(setPagePart)+1):
paging = j * 10
categorie = urlparse(i).path.split("/")
url_paging = "{0}/archives/p{1}-10.html".format(self._url, paging)
@@ -133,7 +137,9 @@ class WPExport:
if page.status_code == 200:
soup = BeautifulSoup(page.text, self._parser)
h2 = soup.find_all("h2")
self._logger.debug("{0} : {1} H2 : {2}".format(self._name, url_paging, h2))
for title in h2:
self._logger.debug("{0} : {1} a : {2}".format(self._name, url_paging, title.find_all("a")))
href = title.find_all("a")[0].get("href", "/")
if href not in webpage[section]["article"]:
try:

View File

@@ -5,6 +5,7 @@ from urllib.parse import urlparse
import requests, os, logging, re, json
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from slugify import slugify
class WPimport:
# Constructor
@@ -14,7 +15,7 @@ class WPimport:
self._wordpress = wordpress
self._logger = logger
self._parser = parser
self._headers_json = {'Content-Type': 'application/json', 'Accept':'application/json'}
self._headers_json = {'Content-Type': 'application/json; charset=utf-8', 'Accept':'application/json'}
self._request = requests.Session()
@@ -86,37 +87,26 @@ class WPimport:
else:
self._addOrUpdateFeaturedMedia(soup)
def removeAll(self):
params = {"per_page":100}
try:
self._logger.info("{0} : List posts to remove for url : {1}".format(self._name, self._wordpress))
r = self._request.get("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, params=params, headers=self._headers_json)
except Exception as err:
self._logger.error("{0} : Connection error for list post to remove : {1}".format(self._name, err))
if r.status_code == 200:
result = r.json()
if len(result) > 0:
for i in result:
self._logger.info("{0} : Remove article for url {1} : {2}".format(self._name, self._wordpress, i["title"]["rendered"]))
params = {"force":1}
try:
r = self._request.delete("http://{0}/wp-json/wp/v2/posts/{1}".format(self._wordpress, i["id"]), auth=self._basic, headers=self._headers_json , params=params)
if r.status_code == 200:
self._logger.info("{0} : Post removed for URL {1} : {2}".format(self._name, self._wordpress, i["title"]["rendered"]))
else:
self._logger.error("{0} : Connection error for post {1} with status code {2}".format(self._name, self._wordpress, i["title"]["rendered"]))
except Exception as err:
self._logger.error("{0} : Connection error for post remove : {1}".format(self._name, err))
exit(1)
self.removeAll()
else:
self._logger.error("{0} : Error for list to remove due status code {1}".format(self._name, r.status_code))
self._logger.debug("{0} : Content error : {1}".format(self._name, r.content))
# Private method
## replace caracter
def _replaceCaracter(self, title_rendered):
list_replace = {'’': "'", '–': '-', '…': '...', '« ': '"', ' »': '"', '« ': '"', ' »': '"', '’': "'", '"‘': "'"}
for old, new in list_replace.items():
title_rendered = title_rendered.replace(old, new)
return title_rendered
## remove space
def _removeSpace(self, title):
if title[len(title)-1] == " ":
title = title[:-1]
if title[0] == " ":
title = title[1:]
return title
## Get all files
def _getFiles(self, item):
@@ -221,7 +211,7 @@ class WPimport:
## Add or update img
def _addOrUpdateMedia(self, href_img, page):
media_authorized = ["png", "jpg", "jpeg", "svg"]
media_authorized = ["png", "jpg", "jpeg", "svg", "gif"]
media = {"id":"", "rendered":""}
split_fileimg = href_img.split("/")
img_name = split_fileimg[len(split_fileimg)-1]
@@ -465,33 +455,56 @@ class WPimport:
for i in liste:
for j in element[i]:
element_exist = False
try:
params = {"params":j}
page = self._request.get("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, params=params)
except Exception as err:
self._logger.error("{0} : Connection error for {1} : {2}".format(self._name, i, err))
exit(1)
if page.status_code == 200:
element_exist = True
result = page.json()
listelement[i].append(result[0]["id"])
else:
self._logger.error("{0} : {1} not found due status code : {2}".format(self._name, i, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if element_exist is False:
data = {"name": j}
self._logger.debug("{0} : URL : {1} ".format("http://{1}/wp-json/wp/v2/{2}".format(self._name, self._wordpress, i)))
self._logger.debug("{0} : data : {1}".format(self._name, data))
self._logger.debug("{0} : headers : {1}".format(self._name, self._headers_form))
title_element = self._removeSpace(j)
for index in range(1,10):
self._logger.info("{0} : search {1} with index {2} : {3}".format(self._name, i, index, title_element))
try:
page = self._request.post("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, headers=self._headers_json, data=data)
params = {"search":title_element, "per_page":"100", "page":index}
page = self._request.get("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, params=params)
except ConnectionError as err:
self._logger.error("{0} : Connection error for {1} : {2}".format(self._name, i, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for {1} : {2}".format(self._name, i, err))
if page.status_code == 200:
result = page.json()
self._logger.debug("{0} : content {3} {2} : {1}".format(self._name, result, title_element, i))
if len(result) > 0:
for k in result:
title_rendered = k["name"]
self._logger.debug("{0} : content {2} : {1}".format(self._name, title_rendered, i))
self._logger.debug("{0} : size of content {3} : {2} - {1}".format(self._name, len(title_rendered), len(title_element), i))
if len(title_element) != len(title_rendered):
title_rendered = self._replaceCaracter(title_rendered)
if title_element == title_rendered:
self._logger.info("{0} : {1} found : {2}".format(self._name, i, title_rendered))
element_exist = True
listelement[i].append(k["id"])
else:
break
if page.status_code == 400:
self._logger.error("{0} : {1} not found due status code : {2}".format(self._name, i, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
break
else:
self._logger.error("{0} : {1} not found due status code : {2}".format(self._name, i, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
self._logger.debug("{0} : Element {3} {2} is {1}".format(self._name, element_exist, title_element, i))
if element_exist == False:
data = {"name": title_element}
self._logger.info("{0} : Create {1} : {2}".format(self._name, i, title_element))
self._logger.debug("{0} : Data : {1}".format(self._name, data))
try:
page = self._request.post("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, i), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except ConnectionError as err:
self._logger.error("{0} : Connection error for post {1} : {2}".format(self._name, i, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for post {1} : {2}".format(self._name, i, err))
if page.status_code == 201:
self._logger.info("{0} : {1} created : {2}".format(self._name, i, j))
result = page.json()
listelement[i].append(result["id"])
else:
@@ -514,62 +527,117 @@ class WPimport:
bodyhtml = bodyhtml.replace(i["old_src"], o.path)
hour = articledate[0].text
time = dateheader[0].text.split(" ")
self._logger.debug("{0} : Title post : |{1}|".format(self._name, title))
title = self._removeSpace(title)
self._logger.debug("{0} : Rendered Title post : |{1}|".format(self._name, title))
data = {"title":title, "content":bodyhtml, "status":"publish", "date": "{0}-{1}-{2}T{3}:00".format(time[2],month[time[1]],time[0], hour), "tags": listelement["tags"], "categories": listelement["categories"]}
params = {"search":author}
self._logger.debug("{0} : Data for post : |{1}| : {2}" .format(self._name, title, data))
params = {"search":author, "per_page":100}
try:
self._logger.info("{0} : Get author : {1}".format(self._name, author))
page = self._request.get("http://{0}/wp-json/wp/v2/users".format(self._wordpress), auth=self._basic, params=params)
except Exception as err:
self._logger.info("{0} : Search author : {1}".format(self._name, author))
page = self._request.get("http://{0}/wp-json/wp/v2/users".format(self._wordpress), auth=self._basic, headers=self._headers_json, params=params)
self._logger.debug("{0} : End Search author : {1}".format(self._name, author))
self._logger.debug("{0} : Debug requests : {1}".format(self._name, page.content))
except ConnectionError as err:
self._logger.error("{0} : Connection error for get author : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for get author : {1}".format(self._name, err))
if page.status_code == 200:
self._logger.info("{0} : Get author id : {1}".format(self._name, result))
result = page.json()
data["author"] = result[0]["id"]
for a in result:
data["author"] = a["id"]
else:
self._logger.error("{0} : Connection error with status code for get author : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(page.content))
params = {"search": title}
try:
page = self._request.get("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, params=params)
except Exception as err:
self._logger.error("{0} : Connection error for search post : {1}".format(self._name, err))
exit(1)
page_is_exist = True
if page.status_code == 200:
result = page.json()
if len(result) == 0:
page_is_exist = False
else:
for i in result:
if i["title"]["rendered"] == title:
post_id = i["id"]
self._logger.debug("{0} : Data for post to update : {1}".format(self._name, result[0]))
self._logger.info("{0} : Page {1} already exist and going to update".format(self._name, title))
try:
page = self._request.post("http://{0}/wp-json/wp/v2/posts/{1}".format(self._wordpress, post_id), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except Exception as err:
self._logger.error("{0} : Connection error for update post : {1}".format(self._name, err))
exit(1)
if page.status_code == 200:
result = page.json()
self._logger.info("{0} : Post updated : {1}".format(self._name, title))
self._addOrUpdateComment(result["id"], comment_post, result["title"]["raw"])
self._linkImgPost(result["title"]["raw"], list_img, result["id"])
else:
self._logger.error("{0} : Post not updated due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
else:
self._logger.error("{0} : Connection for update post error with status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
page_is_exist = False
if page_is_exist == False:
for index in range(1,10):
params = {"search": title, "per_page":100, "page": index}
try:
page = self._request.post("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
self._logger.info("{0} : Search post with index {2} : {1}".format(self._name, title, index))
page = self._request.get("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, params=params, headers=self._headers_json)
except ConnectionError as err:
self._logger.error("{0} : Connection error for search post : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for search post : {1}".format(self._name, err))
if page.status_code == 200:
self._logger.debug("{0} : Encoding : {1}".format(self._name, page.encoding))
page.encoding = "utf-8"
result = page.json()
if len(result) == 0:
break
self._logger.info("{0} : Number result posts : {1}".format(self._name, len(result)))
count = 0
for i in result:
title_rendered = i["title"]["rendered"]
self._logger.info("{0} : Search title posts for |{2}| : |{1}|".format(self._name, title_rendered, title))
if len(title_rendered) != len(title):
title_rendered = self._replaceCaracter(title_rendered)
self._logger.debug("{0} : Search title posts for |{2}| : |{1}|".format(self._name, title_rendered, title))
self._logger.debug("{0} : SIze of title : {1} - {2}".format(self._name, len(title), len(title_rendered)))
if title_rendered == title:
page_is_exist = True
post_id = i["id"]
count = count + 1
if count > 1:
self._logger.info("{0} : Page {1} is double and going to delete".format(self._name, title))
try:
params = {"force":1}
page = self._request.delete("http://{0}/wp-json/wp/v2/posts/{1}".format(self._wordpress, post_id), auth=self._basic, headers=self._headers_json, params=params)
except ConnectionError as err:
self._logger.error("{0} : Connection error for deleted post : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for deleted post : {1}".format(self._name, err))
if page.status_code == 200:
self._logger.info("{0} : Post deleted : {1}".format(self._name, title))
else:
self._logger.error("{0} : Post not updated due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
else:
self._logger.debug("{0} : Data for post to update : {1}".format(self._name, i))
self._logger.info("{0} : Page {1} already exist and going to update".format(self._name, title))
try:
page = self._request.post("http://{0}/wp-json/wp/v2/posts/{1}".format(self._wordpress, post_id), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except ConnectionError as err:
self._logger.error("{0} : Connection error for update post : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for update post : {1}".format(self._name, err))
if page.status_code == 200:
result = page.json()
self._logger.info("{0} : Post updated : {1}".format(self._name, title))
self._addOrUpdateComment(result["id"], comment_post, result["title"]["raw"])
self._linkImgPost(result["title"]["raw"], list_img, result["id"])
else:
self._logger.error("{0} : Post not updated due status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if page.status_code == 400:
self._logger.error("{0} : Connection for update post unauthorized : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
break
else:
self._logger.error("{0} : Connection for update post error with status code : {1}".format(self._name, page.status_code))
self._logger.debug("{0} : {1}".format(self._name, page.content))
if page_is_exist is False:
try:
self._logger.info("{0} : Creating posts : {1}".format(self._name, data["title"]))
page = self._request.post("http://{0}/wp-json/wp/v2/posts".format(self._wordpress), auth=self._basic, headers=self._headers_json, data=json.dumps(data))
except ConnectionError as err:
self._logger.error("{0} : Connection error for create post : {1}".format(self._name, err))
exit(1)
except Exception as err:
self._logger.error("{0} : Exception error for create post : {1}".format(self._name, err))
if page.status_code == 201:
result = page.json()
self._logger.info("{0} : Post added : {1}".format(self._name, result["title"]["raw"]))

75
lib/WPRemove.py Normal file
View File

@@ -0,0 +1,75 @@
#!/usr/bin/python3
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import requests, os, logging, re, json
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class WPRemove:
# Constructor
def __init__(self, name="Thread-0", basic=None, wordpress="", logger=None):
self._name = name
self._basic = basic
self._wordpress = wordpress
self._logger = logger
self._headers_json = {'Content-Type': 'application/json', 'Accept':'application/json'}
self._request = requests.Session()
retries = Retry(connect=10, read=10, redirect=5,
status_forcelist=[429, 500, 502, 503, 504], backoff_factor=2)
self._request.mount('http://', HTTPAdapter(max_retries=retries))
# Destructor
def __del__(self):
print("{0} : Import finished for {1}".format(self._name, self._wordpress))
# Public method
def setUrl(self, wordpress):
self._wordpress = wordpress
def cleanPosts(self):
self._removeAll("posts")
def cleanTags(self):
self._removeAll("tags")
def cleanCategories(self):
self._removeAll("categories")
def cleanMedia(self):
self._removeAll("media")
# Private method
def _removeAll(self, composant):
params = {"per_page":100}
try:
self._logger.info("{0} : List {2} to remove for url : {1}".format(self._name, self._wordpress, composant))
r = self._request.get("http://{0}/wp-json/wp/v2/{1}".format(self._wordpress, composant), auth=self._basic, params=params, headers=self._headers_json)
except Exception as err:
self._logger.error("{0} : Connection error for list {1} to remove : {2}".format(self._name, composant, err))
if r.status_code == 200:
result = r.json()
if len(result) > 0:
for i in result:
self._logger.info("{0} : Remove {2} for url {1} : {3}".format(self._name, self._wordpress, composant, i["title"]["rendered"]))
params = {"force":1}
try:
r = self._request.delete("http://{0}/wp-json/wp/v2/{1}/{2}".format(self._wordpress, composant, i["id"]), auth=self._basic, headers=self._headers_json , params=params)
if r.status_code == 200:
self._logger.info("{0} : Post removed for URL {1} {2} : {3}".format(self._name, self._wordpress, composant, i["title"]["rendered"]))
else:
self._logger.error("{0} : Connection error for post {1} {2} {3} with status code {4}".format(self._name, self._wordpress, composant, i["title"]["rendered"], r.status_code))
except Exception as err:
self._logger.error("{0} : Connection error for {1} remove : {2}".format(self._name, composant, err))
exit(1)
self._removeAll(composant)
else:
self._logger.error("{0} : Error for list to remove {1} due status code {2}".format(self._name, composant, r.status_code))
self._logger.debug("{0} : Content error for {1} : {2}".format(self._name, composant, r.content))