Commit 8a038aec authored by novelailab

ImageDataset fixes, scrape and benchmark scripts

parent 0c96e233
@@ -9,8 +9,12 @@ import simplejpeg
import pickle
from pathlib import Path
from PIL import Image
from tqdm import tqdm
from concurrent.futures import as_completed
import requests
import hashlib
import io
import os
# Does this work with other block_sizes? doesn't seem to.
class FbDataset(data.Dataset):
@@ -73,6 +77,14 @@ class ShardedImageDataset(data.Dataset):
        with open(self.dataset_path, mode="r") as file_obj:
            self.mmap = mmap.mmap(file_obj.fileno(), length=0, access=mmap.ACCESS_READ)
        #precompute pointer lookup dict for faster random read
        self.pointer_lookup = {}
        for t in self.index:
            offset, length, id = t
            if type(id) == dict: # if id is identity
                id = id['id']
            self.pointer_lookup[id] = (offset, length)
        #make so metadata is shardable by world_size(num_gpus)
        #and batch_size
        self.index = self.index[:len(self.index) - (len(self.index) % (bsz * world_size))]
@@ -115,25 +127,29 @@ class ShardedImageDataset(data.Dataset):
        return data, id
    def read_from_id(self, id):
        #to be used standalone
        offset, size, _ = self.index[self.ids == id][0]
    def read_from_id(self, id, decode=True):
        offset, size = self.pointer_lookup[id]
        data = self.mmap[offset:offset+size]
        data = decode_jpeg(data)
        if decode:
            data = decode_jpeg(data)
        return data
    def get_metadata(self, id):
        return self.metadata[id]
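# Illustrative usage sketch (not part of the commit; the shard paths and the
# post id 12345 below are hypothetical): with the new decode flag,
# read_from_id can return either the raw JPEG bytes or the decoded image.
#
#   ds = ShardedImageDataset("danbooru.ds", "danbooru.index", None, bsz=1)
#   raw_jpeg = ds.read_from_id(12345, decode=False)  # bytes straight out of the mmap
#   pixels = ds.read_from_id(12345)                  # decoded via decode_jpeg, as before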
class ImageDatasetBuilder():
    def __init__(self, folder_path, name, dataset=True, index=True, metadata=False, threads=None):
    def __init__(self, folder_path, name, dataset=True, index=True, metadata=False, threads=None, block_size=4096, align_fs_blocks=True):
        self.folder_path = Path(folder_path)
        self.dataset_name = name + ".ds"
        self.index_name = name + ".index"
        self.metadata_name = name + ".metadata"
        self.index_name_temp = name + ".temp.index"
        self.metadata_name_temp = name + ".temp.metadata"
        self.dataset_path = self.folder_path / self.dataset_name
        self.index_path = self.folder_path / self.index_name
        self.metadata_path = self.folder_path / self.metadata_name
        self.index_path_temp = self.folder_path / self.index_name_temp
        self.metadata_path_temp = self.folder_path / self.metadata_name_temp
        self.open_dataset = dataset
        self.open_index = index
        self.open_metadata = metadata
@@ -141,6 +157,8 @@ class ImageDatasetBuilder():
        self.index = None
        self.metadata = None
        self.threads = threads
        self.block_size = block_size
        self.align_fs_blocks = align_fs_blocks
    @property
    def is_open(self):
@@ -200,29 +218,39 @@ class ImageDatasetBuilder():
        self.flush_metadata(silent=True)
        print("Dataset closed and flushed.")
        append_mode = False
        if self.open_dataset and self.dataset_path.is_file():
            self.dataset = open(self.dataset_path, mode="ab+")
            append_mode = True
        elif self.open_dataset:
            self.dataset = open(self.dataset_path, mode="wb")
        else:
            raise Exception("Dataset file not found at {}".format(self.dataset_path))
        if self.open_index and self.index_path.is_file():
            with open(self.index_path, 'rb') as f:
                self.index = pickle.load(f)
        else:
        elif append_mode:
            raise Exception("Index file not found at {}".format(self.index_path))
        else:
            self.index = []
        if self.open_metadata and self.metadata_path.is_file():
            with open(self.metadata_path, 'rb') as f:
                self.metadata = pickle.load(f)
        else:
        elif append_mode:
            raise Exception("Metadata file not found at {}".format(self.metadata_path))
        else:
            self.metadata = {}
    def operate(self, operation, batch, identities, metadata=None, executor=concurrent.futures.ThreadPoolExecutor, **kwargs):
    def operate(self, operation, batch, identities, metadata=None, executor=concurrent.futures.ThreadPoolExecutor, use_tqdm=False, **kwargs):
        executor = executor(max_workers=self.threads)
        futures = executor.map(operation, batch, **kwargs)
        futures = executor.map(operation, batch)
        if use_tqdm:
            futures = tqdm(futures, total=len(batch), leave=False)
        futures = list(futures)
        for data, identity in zip(futures, identities):
            self.write(data, identity)
@@ -233,7 +261,8 @@ class ImageDatasetBuilder():
        except:
            return None
        else:
            data = Image.open(io.BytesIO(image_data))
            data = Image.open(io.BytesIO(data))
            data = np.asarray(data)
            data = simplejpeg.encode_jpeg(data, quality=91)
        return data
@@ -261,8 +290,17 @@ class ImageDatasetBuilder():
        if data == None:
            return
        data_ptr = self.dataset.tell()
        data_len = len(data)
        self.index.append([data_ptr, data_len, identity])
        self.dataset.write(data)
        self.index.append([self.dataset.tell(), len(data), identity])
        # block align
        if self.align_fs_blocks:
            remainder = (data_ptr + data_len) % self.block_size
            if remainder != 0:
                self.dataset.write(bytearray(self.block_size - remainder))
        if self.metadata and metadata:
            self.metadata[identity] = metadata
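    # Worked example of the block alignment above (illustrative numbers only):
    # with block_size=4096, a record occupying bytes 4096..9999 ends at offset
    # 10000, 10000 % 4096 == 1808, so 4096 - 1808 == 2288 zero bytes of padding
    # are written and the next record starts at 12288, a multiple of the block size.
    #
    #   block_size = 4096
    #   data_ptr, data_len = 4096, 5904
    #   end = data_ptr + data_len                  # 10000
    #   padding = block_size - (end % block_size)  # 2288
    #   assert (end + padding) % block_size == 0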
@@ -277,17 +315,27 @@ class ImageDatasetBuilder():
print("Warning: Index not built, couldn't flush")
return
with open(self.index_path, 'wb') as f:
with open(self.index_path_temp, 'wb') as f:
pickle.dump(self.index, f)
try:
os.remove(self.index_path)
except: pass
os.rename(self.index_path_temp, self.index_path)
def flush_metadata(self, silent=False):
if not self.metadata and not silent:
print("Warning: Metadata not built, couldn't flush")
return
with open(self.metadata_path, 'wb') as f:
with open(self.metadata_path_temp, 'wb') as f:
pickle.dump(self.metadata, f)
try:
os.remove(self.metadata_path)
except: pass
os.rename(self.metadata_path_temp, self.metadata_path)
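    # The two flushes above write to a temp file and then rename it over the live
    # file, so a crash mid-pickle cannot leave a truncated index/metadata on disk.
    # A minimal sketch of the same idea (names are placeholders; os.replace is
    # used here instead of the commit's remove-then-rename pair):
    #
    #   def atomic_pickle(obj, path, temp_path):
    #       with open(temp_path, 'wb') as f:
    #           pickle.dump(obj, f)
    #       os.replace(temp_path, path)  # atomic overwrite on POSIX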
    def flush(self, silent=False):
        if not self.dataset and not silent:
            print("Warning: Dataset not built, couldn't flush")
......
from basedformer.dataset import ShardedImageDataset
import time
import random
dataset_folder = "/home/xuser/nvme1/dataset/danbooru_updated_page/"
dataset = ShardedImageDataset(dataset_folder + "danbooru_updated.ds", dataset_folder + "danbooru_updated.index", None, bsz=1)
#read through first to normalize times / cache
total_size = 0
for key in dataset.pointer_lookup.keys():
    offset, size = dataset.pointer_lookup[key]
    data = list(dataset.mmap[offset:offset+size]) # convert to list to ensure data is actually read
    total_size = total_size + size
#benchmark sequential read
start_time = time.time()
for key in dataset.pointer_lookup.keys():
    offset, size = dataset.pointer_lookup[key]
    data = list(dataset.mmap[offset:offset+size]) # convert to list to ensure data is actually read
end_time = time.time()
print("Sequential read took " + str(end_time - start_time) + " seconds to read " + str(total_size//1024//1024) + "MiB")
#benchmark random read
keys = list(dataset.pointer_lookup.keys())
random.shuffle(keys)
start_time = time.time()
for key in keys:
    offset, size = dataset.pointer_lookup[key]
    data = list(dataset.mmap[offset:offset+size]) # convert to list to ensure data is actually read
end_time = time.time()
print("Random read took " + str(end_time - start_time) + " seconds to read " + str(total_size//1024//1024) + "MiB")
\ No newline at end of file
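# Caveat on the benchmark above (aside, not part of the commit): the warm-up
# pass pulls the whole shard into the page cache, so the sequential and random
# timings mostly measure cached reads. For cold-cache numbers, the page cache
# could be dropped between runs (Linux only, needs root), e.g.:
#
#   import subprocess
#   subprocess.run(["sync"], check=True)
#   subprocess.run(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"], check=True)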
# todo: arg parsing maybe
metadata_folder = "../danbooru_metadata/danbooru_metadata_new/"
existing_dataset = "/home/xuser/nvme1/dataset/danbooru/"
new_dataset_folder = "/home/xuser/nvme1/dataset/danbooru_updated/"
from tqdm import tqdm
import os
import json
from codecs import decode
from basedformer.dataset import ShardedImageDataset, ImageDatasetBuilder
all_metadata = {}
for f in tqdm(os.listdir(metadata_folder)):
    try:
        metadata = json.loads(decode(open(metadata_folder + f, 'rb').read(), 'zlib').decode("UTF-8"))
        for post in metadata:
            if "id" in post:
                all_metadata[post["id"]] = post
    except:
        print("Error parsing " + f)
print("Loading old dataset...")
old_dataset = ShardedImageDataset(existing_dataset + "danbooru.ds", existing_dataset + "danbooru.index", existing_dataset + "danbooru.metadata")
old_meta = old_dataset.metadata
old_lookup = old_dataset.pointer_lookup
print("Calculating entries that can be copied from existing db...")
bad_types = ['jpg', 'jpeg']
can_keep = {}
can_keep_list = []
for k in all_metadata.keys():
    data = all_metadata[k]
    if data['file_ext'] not in bad_types:
        if data['id'] in old_meta and data['id'] in old_lookup:
            #can_keep.append(data['id'])
            can_keep[data['id']] = True
            can_keep_list.append(data['id'])
print("can keep %d of %d with new db of %d" % (len(can_keep), len(old_meta), len(all_metadata)))
#estimate new dataset size
total_new_bytes = (os.path.getsize(existing_dataset + "danbooru.ds") * (len(can_keep) / len(old_meta)))
total_new_jpegs = 0
for k in all_metadata.keys():
    if k not in can_keep:
        data = all_metadata[k]
        if data['file_ext'] in bad_types:
            total_new_bytes = total_new_bytes + data['file_size']
            total_new_jpegs = total_new_jpegs + 1
total_new_bytes = total_new_bytes + int((os.path.getsize(existing_dataset + "danbooru.ds") / len(old_meta)) * (len(all_metadata) - (len(can_keep) + total_new_jpegs)))
print("Estimated new dataset size: %dGiB" % (((total_new_bytes / 1024) / 1024) / 1024))
print("Copyng old db data...")
# detect block size of fs the archive is stored on
block_size = int(os.popen("getconf PAGE_SIZE").read().lstrip().rstrip()) #int(os.popen("stat -fc %s " + new_dataset_folder).read().lstrip().rstrip())
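# Aside (not in the original script): getconf PAGE_SIZE returns the memory page
# size, which commonly equals 4096 but is not guaranteed to match the filesystem
# block size; the filesystem could also be queried directly, e.g.:
#   block_size = os.statvfs(new_dataset_folder).f_bsize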
new_dataset = ImageDatasetBuilder(folder_path=new_dataset_folder, name="danbooru_updated", threads=32, block_size=block_size, align_fs_blocks=True)
new_dataset.open()
# how many operations to run at once
copy_chunk_size = 4096
for e in tqdm(range(0, len(can_keep_list), copy_chunk_size)):
    chunk = can_keep_list[e:e+copy_chunk_size]
    new_dataset.operate(lambda id: old_dataset.read_from_id(id, decode=False), chunk, [ all_metadata[e] for e in chunk ], use_tqdm=True)
new_dataset.flush()
new_dataset.flush_index()
new_dataset.flush_metadata()
print("Scraping...")
to_scrape = []
for k in all_metadata.keys():
    if k not in can_keep:
        to_scrape.append(k)
def download_danbooru(id):
    meta = all_metadata[id]
    return new_dataset.url_op(meta["large_file_url"] if meta["has_large"] else meta["file_url"], meta["md5"])
save_every = 25
for e in tqdm(range(0, len(to_scrape), copy_chunk_size)):
    chunk = to_scrape[e:e+copy_chunk_size]
    new_dataset.operate(download_danbooru, chunk, [ all_metadata[e] for e in chunk ], use_tqdm=True)
    if (e // copy_chunk_size) % save_every == 0:
        new_dataset.flush()
        new_dataset.flush_index()
        new_dataset.flush_metadata()
new_dataset.flush()
new_dataset.flush_index()
new_dataset.flush_metadata()