• 17 Posts
  • 11 Comments
Joined 5M ago
Cake day: Apr 18, 2022

How to get conflicting files with same relative paths from multiple folders?
I want to make a virtual file system from a few folders and want to check if there are any conflicting files. So I want to provide a few folders and get files with the same path relative to their folders. How can I find the conflicts?

This is what I've done so far. The `get_files` and `remove_duplicates` functions aren't working as I expected.

```python
import os
import shutil
import sys
from collections import Counter
from pathlib import Path
from typing import List


def main():
    folders = sys.argv[1:]
    if len(folders) < 2:
        print("Please provide at least 2 folders")
        exit(1)
    files = get_files(folders)
    conflicting_files = find_conflicting_files(files)
    conflicting_files = remove_duplicates(conflicting_files)
    print_conflicting_files(conflicting_files)


def get_files(folders):
    files = []
    for folder in folders:
        files.extend([os.path.relpath(path, folder) for path in Path(folder).rglob("*")])
    return files


def test_get_files():
    try:
        os.makedirs("test/folder1/a", exist_ok=True)
        os.makedirs("test/folder2/b", exist_ok=True)
        open("test/folder1/a/file", "w").close()
        open("test/folder2/b/file", "w").close()
        folders = ["test/folder1", "test/folder2"]
        assert get_files(folders) == ["a/file", "b/file"]
    finally:
        shutil.rmtree("test")


def find_conflicting_files(files) -> List:
    return [file for file, cnt in Counter(files).items() if cnt > 1]


def test_find_conflicting_files():
    files = [
        ["a", "b", "c"],
        ["a", "b", "d"],
        ["a", "b", "e"],
        ["a", "b", "f"],
    ]
    assert find_conflicting_files(files) == ["a", "a", "a", "b", "b", "b"]


def remove_duplicates(l: List) -> List:
    return [*set(l)]


def test_remove_duplicates():
    files = ["a", "a", "b", "b", "c", "c"]
    assert remove_duplicates(files) == ["a", "b", "c"]


def print_conflicting_files(files):
    for file in files:
        print(file)


if __name__ == "__main__":
    main()
```
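For reference, a minimal sketch of one way the conflict check could work (`find_conflicts` is just an illustrative name): collect every file's path relative to its own root folder, then any relative path that shows up more than once is a conflict.

```python
import sys
from collections import Counter
from pathlib import Path


def find_conflicts(folders):
    rel_paths = []
    for folder in folders:
        root = Path(folder)
        # Record each file's path relative to its own root folder.
        rel_paths.extend(str(p.relative_to(root)) for p in root.rglob("*") if p.is_file())
    # A relative path seen under more than one root is a conflict.
    return sorted(path for path, count in Counter(rel_paths).items() if count > 1)


if __name__ == "__main__":
    for conflict in find_conflicts(sys.argv[1:]):
        print(conflict)
```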

How to find files with same relative paths in multiple folders?
I want to make a virtual file system from a few folders and want to check if there are any conflicting files. So I want to provide a few folders and get files with the same path relative to their folders. How can I find the conflicts?

```bash
#!/usr/bin/env bash
# Find conflicting paths in multiple folders to use with mergerfs
# Usage: ./conflict.sh /path/to/folder1 /path/to/folder2 /path/to/folder3 > conflicts.txt

find_conflicting_files() {
    local folders=("$@")
    local files=()

    if [[ ${#folders[@]} -lt 2 ]]; then
        echo "Please provide at least 2 folders"
        exit 1
    fi

    for folder in "${folders[@]}"; do
        files+=("$(find "$folder" -type f)")
    done

    # find all conflicting files

    # print the conflicting files
    for conflict in "${conflicts[@]}"; do
        echo "$conflict"
    done
}
```

Find leaf folders that aren't hidden folders
I have a folder structure with some epubs and json files in the down-most folders (not counting the `.ts` folders). I'm exporting tags from the json files to tagspaces, by creating a `.ts` folder with some other `json` files. I've already processed part of the files and now I want to find the leaf folders that don't have a `.ts` folder in their path, to find the remaining files without having to process the others twice.

I want to process the files in the directories as I find them instead of getting a list of directories and then looping through them. In the example below I've returned the list of directories only to be able to test it. So for this example I only want to do something for the folder `t5`:

```
test
├── t1
│   ├── t2
│   │   └── t5
│   └── t3
│       └── .ts
└── .ts
    └── t4
```

This is what I've tried:

```python
import os
import shutil
from typing import List


def process_files_in_leaf_subdirectories(dir: str) -> List[str]:
    dirs = []
    for root, subdirs, filenames in os.walk(dir):
        if subdirs or '.ts' in root:
            continue
        dirs.append(root)
    return dirs


def test_process_files_in_leaf_subdirectories():
    os.makedirs('tmp/t1/t2/t5', exist_ok=True)
    os.makedirs('tmp/t1/t3/.ts', exist_ok=True)
    os.makedirs('tmp/.ts/t4', exist_ok=True)
    assert get_files_in_leaf_subdirectories('tmp') == ['tmp/t1/t2/t5']
    shutil.rmtree('tmp')
```

The next example works fine but it gets the list of directories instead of processing the files as they are found:

```python
import os
import shutil
from pathlib import Path
from typing import List


def process_files_in_leaf_dir(leaves: List[Path]) -> List[str]:
    files = []
    for dir in leaves:
        for meta_file in dir.glob("*.json"):
            files.append(meta_file)
    return files


def find_leaf_dirs(root_path: Path) -> Path:
    # filter subdirectories
    child_dirs = [path for path in root_path.iterdir() if path.is_dir()]

    # if no child_dir, yield & return
    if not child_dirs:
        yield root_path
        return

    # otherwise iter tru subdir
    for path in child_dirs:
        # ignore hidden dir
        if path.stem[0] == ".":
            continue

        # step in and recursive yield
        yield from find_leaf_dirs(path)


def test_process_files_in_leaf_dir():
    os.makedirs('tmp/t1/t2/t5', exist_ok=True)
    os.makedirs('tmp/t1/t3/.ts', exist_ok=True)
    os.makedirs('tmp/.ts/t4', exist_ok=True)
    Path('tmp/t1/t2/t5/test.json').touch()
    Path('tmp/t1/t3/test.json').touch()
    Path('tmp/t1/t3/.ts/test.json').touch()
    Path('tmp/.ts/t4/test.json').touch()
    leaves = list(find_leaf_dirs(Path('tmp')))
    assert process_files_in_leaf_dir(leaves) == [Path('tmp/t1/t2/t5') / 'test.json']
    shutil.rmtree('tmp')
```

[context](https://git.disroot.org/hirrolot19/AO3Scraper/src/commit/bfe85bef77bffa0c84dd8481af50c2ac9097fc62/export_tags.py#L57-L80)
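For the "process as I find them" part, a rough sketch using `os.walk` and pruning `.ts` branches during the walk; `process` is a hypothetical per-file handler, and matching on path components is slightly stricter than the `'.ts' in root` substring check above:

```python
import os
from pathlib import Path


def process_leaf_files(root: str) -> None:
    for current, subdirs, filenames in os.walk(root):
        if ".ts" in Path(current).parts:
            subdirs.clear()  # prune: don't descend into already-processed branches
            continue
        if subdirs:
            continue  # not a leaf directory
        for name in filenames:
            if name.endswith(".json"):
                process(os.path.join(current, name))  # hypothetical handler
```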

Scraping website asynchronously with aiohttp_scraper library using pool of proxies
I'm on Linux and I've installed and started `redis` in the background, which is required by [aiohttp_scraper][1]. The library didn't say I had to install `redis` myself, so maybe there's some other step I have to take before I can use it.

The following code works fine:

```python
import asyncio
from aiohttp_scraper import Proxies
from aiohttp_scraper import ScraperSession as ClientSession
from urllib.request import urlopen


def scrape():
    TEST_URL = "https://books.toscrape.com/catalogue/"
    urls = [f"{TEST_URL}page-{str(i)}.html" for i in range(1, 5)]
    scraper = WebScraper(urls)
    asyncio.run(scraper.run())
    print(scraper.master_dict)


def get_proxies() -> Proxies:
    PROXY_URL = "https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/http.txt"
    proxies = urlopen(PROXY_URL).read().decode('utf-8').splitlines()
    return Proxies(
        proxies=proxies,
        redis_uri="redis://localhost:6379",
        window_size_in_minutes=5,
        max_requests_per_window=300,
    )


class WebScraper(object):
    def __init__(self, urls):
        self.urls = urls
        self.proxies = get_proxies()
        self.master_dict = {}

    async def run(self):
        loop = asyncio.get_event_loop()
        async with ClientSession(loop=loop) as session:
            tasks = [loop.create_task(self.fetch(session, url)) for url in self.urls]
            await asyncio.gather(*tasks)

    async def fetch(self, session, url):
        async with session.get(url) as response:
            print(response.status)
            self.master_dict[url] = await response.text()


if __name__ == "__main__":
    scrape()
```

But if I change the `ClientSession` line (line 34 in my script) to

```python
async with ClientSession(loop=loop, proxies=self.proxies) as session:
```

then the code hangs every time I execute it. The only thing I see in the output is:

```
❯ python test.py
Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<RedisConnection._read_data() running at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:168]>
Task was destroyed but it is pending!
task: <Task pending name='Task-7' coro=<RedisConnection._read_data() running at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:168]>
Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<RedisConnection._read_data() running at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:168]>
```

[1]: https://github.com/jgontrum/aiohttp-scraper
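This doesn't explain why `aiohttp_scraper` hangs, but as a fallback, plain `aiohttp` can rotate the same proxy list per request via its `proxy=` argument; a rough sketch (no rate limiting, names illustrative):

```python
import asyncio
import itertools

import aiohttp


async def fetch_all(urls, proxies):
    # Round-robin the proxy list across requests.
    results = {}
    proxy_cycle = itertools.cycle(proxies)
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            proxy = f"http://{next(proxy_cycle)}"  # proxy list entries are host:port
            async with session.get(url, proxy=proxy) as response:
                results[url] = await response.text()
        await asyncio.gather(*(fetch(url) for url in urls), return_exceptions=True)
    return results
```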

Scrape a website asynchronously using a list of tor circuits
I want to scrape a website asynchronously using a list of tor circuits with different exit nodes, making sure each exit node only makes a request every 5 seconds. For testing purposes, I'm using the website https://books.toscrape.com/ and I'm lowering the sleep time, the number of circuits and the number of pages to scrape. It works fine without tor, but I'm getting the following error when I use tor:

```
2022-09-06 11:08:49,380 [DEBUG] Loaded 10 authorities dir
2022-09-06 11:08:49,383 [DEBUG] Loaded 141 fallbacks dir
2022-09-06 11:08:49,383 [DEBUG] Using selector: EpollSelector
2022-09-06 11:08:49,384 [ERROR] '_GeneratorContextManager' object has no attribute 'create_stream'
2022-09-06 11:08:49,384 [ERROR] '_GeneratorContextManager' object has no attribute 'create_stream'
2022-09-06 11:08:49,384 [ERROR] '_GeneratorContextManager' object has no attribute 'create_stream'
2022-09-06 11:08:49,384 [ERROR] '_GeneratorContextManager' object has no attribute 'create_stream'
{}
```

```python
import asyncio
import aiohttp
import logging
from docopt import docopt
from torpy import TorClient
from typing import Dict, List

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def main():
    """
    Usage:
        scraper.py <url>... [--tor]
        scraper.py -h | --help

    Options:
        -h --help   Show this screen.
        --tor       Use tor to scrape website
    """
    args = docopt(main.__doc__)
    urls = args['<url>']
    tor = args['--tor']
    scrape_website(urls, tor)


def scrape_test_website() -> None:
    TEST_URL = "https://books.toscrape.com/catalogue/"
    urls = [f"{TEST_URL}page-{str(i)}.html" for i in range(1, 5)]
    print(scrape_website(urls, tor=True))


def scrape_website(urls: List[str], tor: bool = False) -> Dict:
    if tor:
        scraper = TorWebScraper(urls)
    else:
        scraper = WebScraper(urls)
    asyncio.run(scraper.run())
    return scraper.master_dict


class WebScraper(object):
    def __init__(self, urls: List[str]):
        self.urls = urls
        self.all_data = []
        self.master_dict = {}

    async def fetch(self, url: str) -> str:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    text = await response.text()
                    return url, text
        except Exception as e:
            logger.error(e)

    async def run(self) -> None:
        tasks = []
        for url in self.urls:
            tasks.append(self.fetch(url))
        self.all_data = await asyncio.gather(*tasks)
        for data in self.all_data:
            if data is not None:
                url = data[0]
                self.master_dict[url] = {'raw_html': data[1]}


def get_circuits(n: int = 2) -> List:
    """
    Get a list of one-hop tor circuits with different nodes
    """
    circuits = []
    with TorClient() as tor:
        for _ in range(n):
            circuits.append(tor.create_circuit())
    return circuits


class TorWebScraper(WebScraper):
    def __init__(self, urls: List[str]):
        super().__init__(urls)
        self.circuits = get_circuits(2)

    async def fetch(self, url: str) -> str:
        try:
            async with aiohttp.ClientSession() as session:
                for circuit in self.circuits:
                    async with circuit.create_stream() as stream:
                        async with session.get(url, proxy=stream.proxy) as response:
                            await asyncio.sleep(20e-3)
                            text = await response.text()
                            return url, text
        except Exception as e:
            logger.error(e)


if __name__ == '__main__':
    #main()
    scrape_test_website()
```
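Separate from the torpy error, the "one request per exit node every 5 seconds" requirement can be handled on its own; a rough sketch of a per-proxy throttle (illustrative, not tied to torpy's API):

```python
import asyncio
import time


class ProxyThrottle:
    """Allow each proxy (exit node) at most one request every min_interval seconds."""

    def __init__(self, proxies, min_interval: float = 5.0):
        self.min_interval = min_interval
        self.locks = {proxy: asyncio.Lock() for proxy in proxies}
        self.last_used = {proxy: 0.0 for proxy in proxies}

    async def acquire(self, proxy) -> None:
        # Serialize requests per proxy and enforce the minimum spacing between them.
        async with self.locks[proxy]:
            wait = self.min_interval - (time.monotonic() - self.last_used[proxy])
            if wait > 0:
                await asyncio.sleep(wait)
            self.last_used[proxy] = time.monotonic()
```

Each `fetch` would then `await throttle.acquire(proxy)` right before issuing its request through that proxy.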

```python
import base64
from typing import Dict

import cbor2

WUTAG_NAMESPACE = "user.wutag"


def hash(tag: Dict) -> str:
    return f"{WUTAG_NAMESPACE}.{base64.b64encode(cbor2.dumps(tag))}"


def test_hash():
    tag = {'name': 'tag1', 'color': '#000000'}
    assert hash('tag') == "user.wutag.b'Y3RhZw=='"
```

Still need a test against the Rust code to make sure both produce the same hash.


Import tags to wutag. CBOR serialization of a dictionary?
> [Wutag][1] tags are currently stored in a way that makes them unique to wutag. They might get picked up by other programs, but they won't look the same. The reason why is that tags get serialized in cbor and prefixed with `wutag.` for easier future extraction, as can be seen [here][2]:

```rust
fn hash(&self) -> Result<String> {
    serde_cbor::to_vec(&self)
        .map(|tag| format!("{}.{}", WUTAG_NAMESPACE, base64::encode(tag)))
        .map_err(Error::from)
}
```

How can I serialize the tags in python?

```python
import base64

import pytest
import xattr

WUTAG_NAMESPACE = "user.wutag"


def hash(tag: Dict) -> str:
    return f"{WUTAG_NAMESPACE}.{pickle.dumps(tag).encode('base64', 'strict')}"
    # AttributeError: 'bytes' object has no attribute 'encode'
    #return f"{WUTAG_NAMESPACE}.{base64.b64encode(json.dumps(tag))}"
    # TypeError: a bytes-like object is required, not 'str'


def test_hash():
    tag = {'name': 'tag1', 'color': '#000000'}
    assert hash('tag') == 'user.wutag.dGFn'


def hash_tags(tags: List[Dict]) -> List[str]:
    return [hash(tag) for tag in tags]


def import_tags_wutag(path: str, tags: List[str]) -> List[str]:
    xattr.setxattr(path, WUTAG_NAMESPACE, hash_tags(tags))
```

Here is another try that doesn't seem to work either:

```python
def write_wutag_tags(parent_folder: str, tags: List[str]) -> None:
    for format in FORMAT_LIST:
        filepath = get_filepath(parent_folder, format)
        subprocess.run(['wutag', 'set', filepath, *tags])
```

This last one fails with the following error if any of the tags is empty; otherwise it works fine.

```
error: The argument '<tags>...' requires a value but none was supplied

USAGE:
    wutag set <pattern> <tags>...

For more information try --help
```

[1]: https://github.com/vv9k/wutag
[2]: https://github.com/vv9k/wutag/blob/6d697957cc87ad75c380b6cd7f7ecaee2ef83182/wutag_core/tag.rs#L99-L102
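For completeness, a minimal sketch of the equivalent serialization with the third-party `cbor2` package (the `hash_tag` name is illustrative; whether `cbor2.dumps` of a Python dict matches `serde_cbor::to_vec` of the Rust struct byte for byte still needs to be checked against wutag itself):

```python
import base64

import cbor2  # third-party: pip install cbor2

WUTAG_NAMESPACE = "user.wutag"


def hash_tag(tag: dict) -> str:
    # CBOR-encode the tag, then base64-encode the bytes and decode to text,
    # mirroring serde_cbor::to_vec + base64::encode on the Rust side.
    encoded = base64.b64encode(cbor2.dumps(tag)).decode("ascii")
    return f"{WUTAG_NAMESPACE}.{encoded}"
```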

Need help parsing arguments for a script
I want to be able to call [this script](https://git.disroot.org/hirrolot19/AO3Scraper/src/commit/9315d6d339e5b760251fa27bd1e8016019f56793/extras/import_ao3_tags.py) with a single id, a list of ids, a url, or a csv file. Is there a simple way to express it? Here I've only added options to call the script with nothing, a single id or a list of ids. But it already looks too messy for my taste.

```python
def parse_args():
    try:
        if len(sys.argv) == 3:
            # List
            if sys.argv[2].startswith("[") and sys.argv[2].endswith("]"):
                work_ids = sys.argv[2].strip("[").strip("]").split(",")
                download_ao3_fics_and_import_tags(work_ids)
            else:
                # single work_id
                download_ao3_fic_and_import_tags(sys.argv[2])
        elif len(sys.argv) != 3:
            main()
        else:
            usage()
    except Exception:
        usage()
```
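One way to keep this manageable is to let `argparse` take a variable number of positionals and normalize each one afterwards; a sketch, where the `/works/<id>` URL pattern and the first-column csv layout are assumptions:

```python
import argparse
import csv
import re
from pathlib import Path
from typing import List


def parse_args() -> List[str]:
    # Accept any mix of ids, AO3 urls and csv files and normalize them to work ids.
    parser = argparse.ArgumentParser(description="Download AO3 fics and import their tags")
    parser.add_argument("works", nargs="+", help="work id, AO3 url, or csv file with ids")
    args = parser.parse_args()

    work_ids = []
    for item in args.works:
        if item.endswith(".csv") and Path(item).is_file():
            with open(item, newline="") as f:
                work_ids.extend(row[0] for row in csv.reader(f) if row)
        elif (match := re.search(r"/works/(\d+)", item)):
            work_ids.append(match.group(1))  # looks like an AO3 url
        else:
            work_ids.append(item)  # plain work id
    return work_ids
```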

Looking for a guide to scrape concurrently with tor
Like this guide but with concurrency: [Scraping Websites with Python, Selenium, and Tor](https://scribe.rip/m/global-identity?redirectUrl=https%3A%2F%2Fpython.plainenglish.io%2Fthe-big-data-heist-a6b073b30de5) It doesn't need to have anything about Selenium.

It was easier than I expected:

    while driver.find_element(By.XPATH, '//*[@id="connectLongContent"]').is_displayed():
        time.sleep(1)

There is also the tor-browser-selenium library to consider.


How to wait until connected to the tor network with selenium?
Following the guide [Scraping Websites with Python, Selenium, and Tor][1], I don't know how I could wait only until the tor browser is connected instead of waiting a fixed amount of time. Something like the commented-out lines below, which aren't working right now. The `torStatus` id is just an example which doesn't really exist; I'm looking for an actual way to tell when it is connected.

```python
def connect_tor() -> None:
    driver.find_element(By.XPATH, '//*[@id="connectButton"]').click()
    #while driver.find_element(By.XPATH, '//*[@id="torStatus"]').get_attribute("value") != "connected":
    #    time.sleep(1)
    time.sleep(20)
```

[1]: https://scribe.rip/m/global-identity?redirectUrl=https%3A%2F%2Fpython.plainenglish.io%2Fthe-big-data-heist-a6b073b30de5
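Building on the element used in the comment above, a sketch with an explicit wait instead of the fixed sleep (the `connectLongContent` id may change between Tor Browser versions):

```python
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait


def connect_tor(driver, timeout: int = 60) -> None:
    driver.find_element(By.XPATH, '//*[@id="connectButton"]').click()
    # Block until the "connecting" element disappears instead of sleeping 20 seconds.
    WebDriverWait(driver, timeout).until(
        EC.invisibility_of_element_located((By.XPATH, '//*[@id="connectLongContent"]'))
    )
```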


Get list of trending packages. Trending algorithm?
[I'm using](https://lemmy.ml/post/414930) the z-score algorithm as mentioned [here](https://stackoverflow.com/a/826509). Which trending algorithm would you use?
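In case it's useful, here's roughly what that z-score boils down to (a sketch; the history granularity and names are up to you):

```python
from statistics import mean, stdev
from typing import List


def trending_score(history: List[float], current: float) -> float:
    # z-score of the latest observation against its own history: how many
    # standard deviations the current value sits above the historical mean.
    # `history` needs at least two data points for stdev().
    mu = mean(history)
    sigma = stdev(history)
    return (current - mu) / sigma if sigma else 0.0
```

A package averaging 100 downloads a day that suddenly gets 250 scores far higher than one that always hovers around 250.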

How would I run the usermod command when I keep getting the error "user is currently used by process"?

Something like this?

    nohup ps -ef | grep user | awk '{ print $2 }' | sudo xargs kill -9; sleep 2; usermod --home /mnt/T/home user

I used `pkill -KILL -u user` and then I could use the `usermod` command.

Edit: the first solution above ended up giving me some trouble. I ended up following this solution.


How can I move my home folder to an external drive?
Edit: doing this has slowed down the computer. I don't recommend it. I'm going to revert the changes.

I would like to move all the content of my home folder to an external drive to spare the SSD too many reads and writes when downloading stuff. Would it be safe to just move everything to `/mnt/T/home` and symlink it with `ln -s /mnt/T/home /home/user`? The hard drive is configured to mount on system startup. I don't know if this would cause issues with any program that may refuse to follow symlinks. Maybe it's safer to [create a home partition][1] and [move it][2]?

[1]: https://www.howtogeek.com/116742/how-to-create-a-separate-home-partition-after-installing-ubuntu/
[2]: https://www.howtogeek.com/442101/how-to-move-your-linux-home-directory-to-another-hard-drive/

I had to solve this by using two databases

    const DATABASE_URL: &str = "postgres://admin:1234@localhost:5432";
    const POSTGRES_DATABASE_URL: &str = "postgres://admin:1234@localhost:5432/postgres";

since I couldn’t drop the database while using it. So in the end I connect first to the postgres database and then refresh and connect to the other database.

Solution context


[SOLVED] How to connect to postgres database with sea-orm?
I've [installed postgres on manjaro](https://wiki.archlinux.org/title/PostgreSQL), then I've created the user wiki with password wiki.

```
❯ sudo -iu postgres
[postgres@arch-desktop ~]$ createuser -P wiki
[postgres@arch-desktop ~]$ psql
postgres=# \du
Role name
wiki
```

And when I try to run [this program](https://codeberg.org/LifeSymbiont/media_wiki/src/commit/e1687774d6ebdb9afacb9d8cad5ab3f6a44b90b0) I get the error

```
thread 'main' panicked at 'Connection Error: error returned from database: database "wiki" does not exist', src/main.rs:53:9
```

[According to this](https://www.sea-ql.org/sea-orm-tutorial/ch01-01-project-setup.html#connecting-to-the-database-server)

```
// Change this according to your database implementation,
// or supply it as an environment variable.
// the database URL string follows the following format:
// "protocol://username:password@host:port/database"
const DATABASE_URL: &str = "postgres://wiki:wiki@localhost:5432";
```

it shouldn't be using `wiki` as the database name.

[In the tutorial](https://www.sea-ql.org/sea-orm-tutorial/ch01-01-project-setup.html#creating-a-database), why is it creating the database after it's already connected to it? Shouldn't it check if the database exists and create it only if it doesn't?

If I create the database manually with

```
[postgres@arch-desktop ~]$ createdb media_wiki_db
postgres=# ALTER DATABASE media_wiki_db OWNER TO wiki;
```

and use this instead

```
const DATABASE_URL: &str = "postgres://wiki:wiki@localhost:5432/media_wiki_db";
```

I get the error

```
thread 'main' panicked at 'Execution Error: error returned from database: cannot drop the currently open database', src/main.rs:53:9
thread 'main' panicked at 'Execution Error: error returned from database: permission denied to create database', src/main.rs:53:9
```

If I use the postgres database

```
const DATABASE_URL: &str = "postgres://wiki:wiki@localhost:5432/postgres";
```

I get

```
thread 'main' panicked at 'Execution Error: error returned from database: permission denied to create database', src/main.rs:53:9
```

Probably missing `CREATEDB` permission:

```
❯ sudo -iu postgres
[postgres@arch-desktop ~]$ createdb test
postgres=# ALTER USER wiki CREATEDB;
postgres=# ALTER DATABASE test OWNER TO wiki;
postgres=# \l
\q
Ctrl+D
```

Changed to `const DATABASE_URL: &str = "postgres://wiki:wiki@localhost:5432/test";`

```
❯ cargo run
thread 'main' panicked at 'Connection Error: error returned from database: database "test/media_wiki_db" does not exist', src/main.rs:53:9
```

What am I doing wrong? How do I connect to the database? Actually what I would like to do is drop the database, recreate it and connect. At least for now.



How to close ssh in nested tmux session?
I've opened an ssh session and it landed inside the tmux session that was already open on the other computer, so now I see two tmux sessions, one inside the other. Now I don't know how to exit ssh. When I pressed `Ctrl+D` it closed a pane in the nested tmux session. When I tried to detach with `Ctrl+B D` it closed the tmux session on my computer.


I think it's because I've changed the digest version from `digest = "0.9"` to `digest = "0.10.3"`.



`proc.expect(pattern_duration)` needed to go before `total = self.get_total_frames(proc)`.


[Solved] Show progress bar of an ffmpeg video conversion
I'm trying to print a progress bar while executing ffmpeg but I'm having trouble getting the total number of frames and the current frame being processed. The error I get is `AttributeError: 'NoneType' object has no attribute 'groups'`. I've copied the function from [this program][1] and for some reason it works there but not here, even though I haven't changed that part.

[main.py][2]

```python
pattern_duration = re.compile(
    'duration[ \t\r]?:[ \t\r]?(.+?),[ \t\r]?start',
    re.IGNORECASE)
pattern_progress = re.compile('time=(.+?)[ \t\r]?bitrate', re.IGNORECASE)


def execute_ffmpeg(self, manager, cmd):
    proc = expect.spawn(cmd, encoding='utf-8')
    self.update_progress_bar(proc, manager)
    self.raise_ffmpeg_error(proc)


def update_progress_bar(self, proc, manager):
    total = self.get_total_frames(proc)
    cont = 0
    pbar = self.initialize_progress_bar(manager)
    try:
        proc.expect(pattern_duration)
        while True:
            progress = self.get_current_frame(proc)
            percent = progress / total * 100
            pbar.update(percent - cont)
            cont = percent
    except expect.EOF:
        pass
    finally:
        if pbar is not None:
            pbar.close()


def raise_ffmpeg_error(self, proc):
    proc.expect(expect.EOF)
    res = proc.before
    res += proc.read()
    exitstatus = proc.wait()
    if exitstatus:
        raise ffmpeg.Error('ffmpeg', '', res)


def initialize_progress_bar(self, manager):
    pbar = None
    pbar = manager.counter(
        total=100,
        desc=self.path.rsplit(os.path.sep, 1)[-1],
        unit='%',
        bar_format=BAR_FMT,
        counter_format=COUNTER_FMT,
        leave=False
    )
    return pbar


def get_total_frames(self, proc):
    return sum(map(lambda x: float(
        x[1])*60**x[0], enumerate(reversed(proc.match.groups()[0].strip().split(':')))))


def get_current_frame(self, proc):
    proc.expect(pattern_progress)
    return sum(map(lambda x: float(
        x[1])*60**x[0], enumerate(reversed(proc.match.groups()[0].strip().split(':')))))
```

[1]: https://github.com/JavierOramas/video-diet/blob/966b5192902e55bd60ae06a5df195ec41bcd5d71/diet_video/__init__.py
[2]: https://codeberg.org/LifeSymbiont/reduce_video_size/src/commit/ef3cd0974ecd1c4d0a17b6394499650c9fc3da2b/main.py
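As the comment above says, the fix was just moving the duration `expect` ahead of reading the match; a sketch of the reordered method:

```python
def update_progress_bar(self, proc, manager):
    # Match the duration line first; proc.match is only populated after
    # proc.expect() succeeds, so get_total_frames() can read its groups.
    proc.expect(pattern_duration)
    total = self.get_total_frames(proc)
    cont = 0
    pbar = self.initialize_progress_bar(manager)
    try:
        while True:
            progress = self.get_current_frame(proc)
            percent = progress / total * 100
            pbar.update(percent - cont)
            cont = percent
    except expect.EOF:
        pass
    finally:
        if pbar is not None:
            pbar.close()
```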


[Solved] Need to understand and simplify this function that converts a video with ffmpeg while showing a progress bar
This function is giving me an error but I've copied it from [the video-diet program](https://github.com/JavierOramas/video-diet/blob/81a5df4ad27e8cd6fff1be4974067631343a4354/diet_video/__init__.py#L42) and I don't really understand it, so I can't simplify it. Could someone who understands it explain it step by step?

```py
def convert_video_progress_bar(self, manager, cmd):
    name = self.path.rsplit(os.path.sep, 1)[-1]
    proc = expect.spawn(cmd, encoding='utf-8')
    pbar = None
    try:
        proc.expect(pattern_duration)
        total = sum(map(lambda x: float(
            x[1])*60**x[0], enumerate(reversed(proc.match.groups()[0].strip().split(':')))))
        cont = 0
        pbar = manager.counter(
            total=100,
            desc=name,
            unit='%',
            bar_format=BAR_FMT,
            counter_format=COUNTER_FMT,
            leave=False
        )
        while True:
            proc.expect(pattern_progress)
            progress = sum(map(lambda x: float(
                x[1])*60**x[0], enumerate(reversed(proc.match.groups()[0].strip().split(':')))))
            percent = progress/total*100
            pbar.update(percent-cont)
            cont = percent
    except expect.EOF:
        traceback.print_exc()
    finally:
        if pbar is not None:
            pbar.close()
    proc.expect(expect.EOF)
    res = proc.before
    res += proc.read()
    exitstatus = proc.wait()
    if exitstatus:
        raise ffmpeg.Error('ffmpeg', '', res)
```
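The densest part is the `sum(map(lambda ...))` expression, which converts ffmpeg's `HH:MM:SS.ms` timestamps into seconds; a small standalone equivalent (the function name is illustrative):

```python
def timestamp_to_seconds(ts: str) -> float:
    # "01:02:03.5" -> [1.0, 2.0, 3.5]; reversed, each part is weighted
    # by 60**position: seconds*1 + minutes*60 + hours*3600.
    parts = [float(p) for p in ts.strip().split(':')]
    return sum(value * 60 ** i for i, value in enumerate(reversed(parts)))


assert timestamp_to_seconds("01:02:03.5") == 3723.5
```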

I want to get the names and the percentages from the string. I've managed to get the percentage by substituting `re.search` for `re.match`, but I'm still getting an error while getting the name.


[SOLVED] Get a list of strings from a messy multiline string
I want to do the following but I don't know how to get the name and percentage:

`mymodule_test.py`

```python
import unittest
from unittest import TestCase

from mymodule import get_wanted_cards, get_percentage


class GetCardsTestCase(TestCase):
    def test_get_wanted_cards(self):
        s = '''
↑ A Little Chat 92 (1%) 0 (0%) NEW
↑ An Offer You Can't Refuse 88 (87%) 0 (0%) NEW
↑ Angelic Observer 92 (91%) 0 (0%) NEW
'''
        expected = ["4 An Offer You Can't Refuse", "4 Angelic Observer"]
        actual = get_wanted_cards(s)
        self.assertEqual(actual, expected)

    def test_get_percentage(self):
        s = "92 (1%)"
        expected = 1
        actual = get_percentage(s)
        self.assertEqual(actual, expected)


if __name__ == '__main__':
    unittest.main()
```

`mymodule.py`

```python
import re
from typing import List


def get_wanted_cards(s: str) -> List[str]:
    res = []
    for line in s.splitlines():
        array = line.split("\t")
        if len(array) < 5:
            continue
        name = array[1]
        percent = get_percentage(array[2])
        if percent >= 50:
            res += "4 " + name
    return res


def get_percentage(s: str) -> int:
    return int(re.match(r'\(([0-9]*)%\)', s).group(1))


if __name__ == "__main__":
    pass
```
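A sketch of how both functions could be fixed, assuming the columns really are tab-separated as the `split("\t")` implies: `re.search` instead of `re.match`, and `res.append(...)` instead of `res += ...` (which extends the list with the string's individual characters):

```python
import re
from typing import List


def get_percentage(s: str) -> int:
    # "92 (1%)" -> 1; re.search scans the whole string, while re.match
    # only matches at the start, where "92 " gets in the way.
    return int(re.search(r'\(([0-9]+)%\)', s).group(1))


def get_wanted_cards(s: str) -> List[str]:
    res = []
    for line in s.splitlines():
        fields = line.split("\t")
        if len(fields) < 5:
            continue
        name = fields[1]
        if get_percentage(fields[2]) >= 50:
            res.append("4 " + name)  # append the whole string, not its characters
    return res
```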