Spaces:
Build error
Build error
| from selenium import webdriver | |
| from selenium.webdriver.support.ui import Select | |
| from selenium.webdriver.common.by import By | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import re | |
| import os | |
| import time | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.webdriver.common.action_chains import ActionChains | |
| import chromedriver_autoinstaller | |
class ScrapeThatData:
    """Scrape study listings and eligibility details from ClinicalTrials.gov.

    The search-results table is driven through a Selenium Chrome driver;
    per-study eligibility criteria and enrollment figures are fetched with
    ``requests`` and parsed with BeautifulSoup.

    Calling an instance runs the whole pipeline (see ``__call__``).
    """

    # Columns handled as "already visible": the ones among these that were
    # not requested get toggled (hidden) when at least one of them is kept.
    _DEFAULT_COLUMNS = ('status', 'conditions', 'interventions', 'locations')

    # CSS selector template for one cell of the results table.
    _CELL_SELECTOR = '#theDataTable > tbody > tr:nth-child({row}) > td:nth-child({col})'

    def __init__(self, time_threshold = 10):
        """Start a Chrome driver and set up the lookup tables.

        Args:
            time_threshold: timeout handed to ``WebDriverWait`` for every
                explicit wait performed by this object.
        """
        try:
            self.driver = self._new_driver()
        except Exception:
            # No usable chromedriver was found: install a matching one and
            # retry once. A second failure propagates to the caller.
            chromedriver_autoinstaller.install()
            self.driver = self._new_driver()
        self.wait = WebDriverWait(self.driver, time_threshold)
        # Attribute name -> 1-based position of its button in the
        # show/hide-columns menu (also used to order the scraped columns).
        self.attribute_dict = {'status':1 ,'conditions':2, 'interventions': 3, 'study type':4,
                               'phase':5, 'sponsor':6, 'funder type':7 , 'study design': 8,
                               'outcome measures':9, 'number enrolled':10, 'sex':11, 'age':12,
                               'nct number': 13, 'other ids':14, 'title acronym': 15 , 'study start': 16,
                               'primary completion': 17, 'study completion': 18 , 'first posted': 19,
                               'last update posted': 20 , 'results first posted': 21 , 'locations':22, 'study documents': 23}
        # Recruitment status -> id of the matching filter checkbox.
        self.status_dict = {'not yet recruiting' : 'notYetRecrCB',
                            'recruiting' : 'recruitingCB',
                            'enrolling by invitation':'enrollingByInvCB',
                            'active, not recruiting': 'activeCB',
                            'suspended': 'suspendedCB',
                            'terminated':'terminatedCB',
                            'completed':'completedCB',
                            'withdrawn': 'withdrawnCB',
                            'unknown status': 'unknownCB'}

    @staticmethod
    def _new_driver():
        """Create a Chrome driver with the ``--no-sandbox`` option."""
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--no-sandbox')
        return webdriver.Chrome(options=chrome_options)

    @classmethod
    def _columns_to_toggle(cls, listed_attributes):
        """Return the lower-cased column names whose menu button must be clicked.

        If none of the default columns is requested, only the requested
        columns are returned. Otherwise the result is the unrequested
        default columns followed by the requested non-default columns.
        """
        requested = [value.lower() for value in listed_attributes]
        kept_defaults = [value for value in requested if value in cls._DEFAULT_COLUMNS]
        if not kept_defaults:
            return requested
        to_show = [value for value in requested if value not in kept_defaults]
        to_hide = [value for value in cls._DEFAULT_COLUMNS if value not in kept_defaults]
        return to_hide + to_show

    @staticmethod
    def _order_attributes(listed_attributes, attribute_dict):
        """Return the lower-cased attributes sorted by their ``attribute_dict`` position.

        Raises:
            KeyError: if an attribute is not a key of ``attribute_dict``.
        """
        return sorted((value.lower() for value in listed_attributes),
                      key=attribute_dict.__getitem__)

    def _cell_text(self, row_index, column_index, inner = ''):
        """Wait for a table cell and return its stripped text.

        Args:
            row_index: 1-based ``tr`` position inside the table body.
            column_index: 1-based ``td`` position inside the row.
            inner: optional selector suffix (e.g. ``' > span'``) appended
                when reading the text, after the cell itself is present.
        """
        selector = self._CELL_SELECTOR.format(row=row_index, col=column_index)
        self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, selector)))
        # The lookup string is a CSS selector and a single element is needed,
        # so use find_element with By.CSS_SELECTOR (find_elements returns a
        # list, which has no ``.text``).
        return self.driver.find_element(By.CSS_SELECTOR, selector + inner).text.strip()

    def clicking_show_hide_cols(self, driver):
        """Click the show/hide-columns button of the results table."""
        columns = driver.find_element(By.XPATH,'//*[@id="theDataTable_wrapper"]/div[3]/button')
        action_chain = ActionChains(driver)
        action_chain.move_to_element(columns).click()
        action_chain.perform()

    def select_attributes_to_show(self, listed_attributes, attribute_dict):
        """Toggle table columns so the requested attributes are displayed.

        Args:
            listed_attributes: attribute names (case-insensitive).
            attribute_dict: attribute name -> button position in the menu.

        Raises:
            KeyError: if an attribute is missing from ``attribute_dict``.
        """
        for att in self._columns_to_toggle(listed_attributes):
            # The menu is reopened before every click.
            self.clicking_show_hide_cols(self.driver)
            time.sleep(1)
            button_xpath = '//*[@id="theDataTable_wrapper"]/div[3]/div[2]/button[' + str(attribute_dict[att]) + ']'
            self.wait.until(EC.presence_of_element_located((By.XPATH, button_xpath))).click()
            time.sleep(1)

    def select_by_status(self, listed_states, status_dict):
        """Tick the requested status filters, apply them, and show 100 rows per page.

        Args:
            listed_states: status names (case-insensitive); may be empty or
                ``None``, in which case no filter is applied.
            status_dict: status name -> checkbox element id.
        """
        if listed_states:
            for status in listed_states:
                self.driver.find_element(By.ID,status_dict[status.lower()]).click()
            # Apply the ticked filters once, then give the table time to reload.
            self.driver.find_element(By.XPATH,'//*[@id="FiltersBody"]/div[1]/input[1]').click()
            time.sleep(3)
        # find_element_by_name was removed in Selenium 4; use the By locator
        # like the rest of the class.
        select = Select(self.driver.find_element(By.NAME, 'theDataTable_length'))
        select.select_by_value('100')

    def collect_data_search_page(self,l_ordered, amount_of_data = None):
        """Read the results table page by page.

        Args:
            l_ordered: attribute names in displayed column order; used as
                the header row of the result (the same list object).
            amount_of_data: optional number of data rows after which paging
                stops. Whole pages are collected, so more rows than this
                may be returned.

        Returns:
            A list whose first item is ``l_ordered`` and whose remaining
            items are lists of cell texts, one per table row.
        """
        class_name = ''
        page_index = 1
        elements = [l_ordered]
        has_status = 'status' in l_ordered
        # With a status column the remaining attributes are read from
        # td(4 + i), otherwise from td(3 + i), i being the position in l_ordered.
        first_column = 4 if has_status else 3
        while 'disabled' not in class_name :
            time.sleep(10)
            print('Getting data from page {}'.format(page_index))
            #Counting how many rows of the table appear
            table = self.driver.find_element(By.ID,'theDataTable')
            row_count = len(table.find_elements(By.TAG_NAME,"tr"))
            #Looping table page
            for index in range(1, row_count):
                row = []
                if has_status:
                    # The status text is read from a <span> inside the third cell.
                    row.append(self._cell_text(index, 3, ' > span'))
                for i, val in enumerate(l_ordered):
                    if has_status and val == 'status':
                        continue
                    try:
                        row.append(self._cell_text(index, first_column + i))
                    except Exception as error:
                        print(i, error)
                        # Keep the row aligned with the header row.
                        row.append('')
                elements.append(row)
            #Getting next page button
            next_page= self.driver.find_element(By.ID,"theDataTable_next")
            #Getting the class attribute of the next page button
            class_name = next_page.get_attribute('class') or ''
            #Going to the next page (unless this was the last one)
            if 'disabled' not in class_name:
                next_page.click()
            page_index += 1
            # Stop once enough data rows are collected (header row excluded).
            # The previous extra test ``row_count < amount_of_data`` made the
            # loop always stop after the first page.
            if amount_of_data and len(elements) - 1 >= amount_of_data:
                break
        return elements

    def get_criteria(self, NCTnumber):
        """Fetch the inclusion and exclusion criteria of one study.

        Args:
            NCTnumber: study identifier appended to the study page URL.

        Returns:
            ``(inclusion, exclusion)``: the list items of each criteria
            list joined with spaces; an empty string when not found.
        """
        url = 'https://clinicaltrials.gov/ct2/show/' + NCTnumber
        ClinicalTrialpage = requests.get(url)
        soup = BeautifulSoup(ClinicalTrialpage.text, 'html.parser')
        wrapping_crit_class = soup.find_all("div", {"class": "tr-indent2"})
        # The criteria are read from the second "tr-indent2" div; without it
        # there is nothing to parse.
        list_elements = []
        if len(wrapping_crit_class) > 1:
            # Anchored so that only <ul>/<ol> match (not e.g. <col>).
            list_elements = wrapping_crit_class[1].find_all(re.compile("^(ul|ol)$"))
        inclusion, exclusion = ('','')
        if not list_elements:
            print ("WARNING: Study number " + NCTnumber + " doesn't have eligibility criteria or HTML tag format is not a list")
        elif len(list_elements) == 1:
            # A single list: decide which kind it is from the heading text.
            if wrapping_crit_class[1].find(text = 'Inclusion Criteria:'):
                inclusion = list_elements[0].find_all("li")
            elif wrapping_crit_class[1].find(text = 'Exclusion Criteria:'):
                exclusion = list_elements[0].find_all("li")
        else:
            inclusion = list_elements[0].find_all("li")
            exclusion = list_elements[1].find_all("li")
        inclusion = ' '.join([t.text.strip() for t in inclusion ])
        exclusion = ' '.join([t.text.strip() for t in exclusion ])
        return(inclusion, exclusion)

    #function that gets number of patients enrolled in a study
    def get_enrollment (self, NCTnumber):
        """Fetch the number of participants of one study.

        Args:
            NCTnumber: study identifier appended to the study page URL.

        Returns:
            The enrollment as a string of digits, or ``None`` (after
            printing a warning) when it cannot be read from the page.
        """
        url = 'https://clinicaltrials.gov/ct2/show/' + NCTnumber
        ClinicalTrialpage = requests.get(url)
        soup = BeautifulSoup(ClinicalTrialpage.text, 'html.parser')
        wrapping_enrol_class = soup.find_all('td', {'headers':'studyInfoColData','style':"padding-left:1em"})
        # The figure is the first word of the second matching cell.
        words = wrapping_enrol_class[1].text.split() if len(wrapping_enrol_class) > 1 else []
        if not words or not words[0].isdigit():
            print('WARNING: Number of Participants in Study number '+ NCTnumber +' is unavailable')
            return None
        return words[0]

    def __call__(self, condition, listed_attributes, listed_states, amount_of_data):
        """Run a search and return the table data enriched with study details.

        Args:
            condition: search term placed in the ``cond`` query parameter.
            listed_attributes: attribute names to collect; must include
                ``'nct number'`` (case-insensitive).
            listed_states: recruitment statuses to filter by (may be empty).
            amount_of_data: forwarded to ``collect_data_search_page``.

        Returns:
            A list of rows; the first row holds the attribute names followed
            by ``'inclusion'``, ``'exclusion'`` and ``'enrollment'``.

        Raises:
            KeyError: if an attribute is not a key of ``attribute_dict``.
            ValueError: if ``'nct number'`` is not among the attributes.
        """
        attribute_ordered = self._order_attributes(listed_attributes, self.attribute_dict)
        # The per-study lookups need the NCT number column, so fail before
        # any scraping instead of after it.
        if 'nct number' not in attribute_ordered:
            raise ValueError("'nct number' must be one of the listed attributes")
        self.driver.get('https://clinicaltrials.gov/ct2/results?cond=' + condition + '&rank=1&view=record#rowId0')
        self.select_attributes_to_show(listed_attributes, self.attribute_dict)
        try:
            self.select_by_status(listed_states, self.status_dict)
        except Exception:
            # Best effort: continue with whatever filter state was reached.
            print('select by status is a problem')
        search_data = self.collect_data_search_page(attribute_ordered, amount_of_data=amount_of_data)
        nct_column = search_data[0].index('nct number')
        nct_numbers = [e[nct_column] for e in search_data[1:]]
        search_data[0].extend(['inclusion', 'exclusion', 'enrollment'])
        for index, nct in enumerate(nct_numbers):
            if index % 100 == 0 and index!= 0:
                print("Collected Data from {} Studies: ".format(index))
            inc, exc = self.get_criteria(nct)
            enrol = self.get_enrollment(nct)
            search_data[index + 1].extend([inc, exc, enrol])
        return search_data