|
|
@ -1,4 +1,23 @@
|
|
|
|
import click
|
|
|
|
import click
|
|
|
|
|
|
|
|
from numpy import require
|
|
|
|
|
|
|
|
from longinus.utils.dataset import Dataset
|
|
|
|
|
|
|
|
from longinus.coeff_encoders.neural_network import NeuralNetworkEncoder
|
|
|
|
|
|
|
|
from longinus.utils.image_encoder import ImageEncoder
|
|
|
|
|
|
|
|
from PIL import Image
|
|
|
|
|
|
|
|
from longinus.utils.tools import psnr
|
|
|
|
|
|
|
|
from rich.console import Console
|
|
|
|
|
|
|
|
from rich.spinner import Spinner
|
|
|
|
|
|
|
|
from rich.progress import Progress
|
|
|
|
|
|
|
|
from sklearn.model_selection import train_test_split
|
|
|
|
|
|
|
|
import traceback
|
|
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
import turbojpeg
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from PIL.JpegImagePlugin import JpegImageFile
|
|
|
|
|
|
|
|
# Shared Rich console used by every CLI command for styled terminal output.
console = Console()
|
|
|
|
|
|
|
|
|
|
|
|
# Root command group; subcommands are attached via cli.add_command() below.
# (Deduplicated: the source contained two copies of the decorator and def
# from an unresolved merge, plus an embedded diff hunk header.)
@click.group()
def cli():
    pass
|
@click.command()
@click.option('-i', '--input-dataset-path', type=click.Path(exists=True), required=True)
@click.option('-o', '--output-images-path', type=click.Path(exists=True), required=True)
@click.option('-m', '--input-model-path', type=click.Path(exists=True), required=True)
@click.option('-e', '--export-data-path', type=str, required=True)
def bulk_bmp_reencode(input_dataset_path: str, input_model_path: str, output_images_path: str, export_data_path: str):
    """Reencodes the images using custom coefficient calculating algorithm.

    Every file in *input_dataset_path* is compressed three ways — with the
    neural-network-predicted quantization table, with PIL/jpeglib defaults,
    and with turbojpeg at quality 75 — the JPEGs are written to
    *output_images_path*, and a per-image PSNR / file-size / timing
    comparison table is exported as CSV to *export_data_path*.
    """
    psnr_table = []
    with Progress() as progress:
        task = progress.add_task(
            "[bold yellow]Processing images from provided dataset...",
            total=len(os.listdir(input_dataset_path)),
        )
        for filename in os.listdir(input_dataset_path):
            # NOTE(review): the original message text was garbled in the source;
            # presumably it displayed the current filename — confirm against history.
            console.print(f"\n[yellow]Processing image: {filename}")
            file = os.path.join(input_dataset_path, filename)
            # Guard clause instead of the original `if ...: pass / else: continue`.
            if not os.path.isfile(file):
                continue

            # --- neural-network based reencode (timed in nanoseconds) ---
            nn_start_time = time.perf_counter_ns()
            input_image = ImageEncoder(file)
            input_image.read()
            # One-row DataFrame: the model predicts per-image from the luma values.
            data = pd.DataFrame([input_image.luma_array])
            encoder = NeuralNetworkEncoder(
                pretrained_weights_path=input_model_path
            )
            result_quant_table = encoder.predict_quantization_table(data)
            result_quant_table = result_quant_table.flatten()

            output_image_path = os.path.join(output_images_path, filename.replace('.bmp', '.jpeg'))
            jpeg_lib_output_image_path = os.path.join(output_images_path, f"jpeglib_{filename.replace('.bmp', '.jpeg')}")
            turbojpeg_output_image_path = os.path.join(output_images_path, f"turbojpeg_{filename.replace('.bmp', '.jpeg')}")

            output_image = ImageEncoder(output_image_path)
            output_image.load_image_from_mem(input_image.luma_array_2d)
            output_image.override_coefficients(result_quant_table)
            output_image.write()
            nn_end_time = time.perf_counter_ns()

            # Re-read the written JPEG so PSNR is measured on decoded output.
            test_output_image = ImageEncoder(output_image_path)
            test_output_image.read()

            # --- jpeglib (PIL) baseline ---
            jpeglib_start_time = time.perf_counter_ns()
            jpeglib_image = Image.open(file)
            jpeglib_image = jpeglib_image.convert('L')
            jpeglib_image.save(jpeg_lib_output_image_path, "JPEG")
            jpeglib_end_time = time.perf_counter_ns()
            jpeglib_output_image = ImageEncoder(jpeg_lib_output_image_path)
            jpeglib_output_image.read()

            # --- turbojpeg baseline (grayscale, quality 75) ---
            turbojpeg_start_time = time.perf_counter_ns()
            turbojpeg_image = Image.open(file).convert('L')
            turbojpeg_image_output = turbojpeg.compress(np.array(turbojpeg_image), 75, turbojpeg.SAMP.GRAY)
            with open(turbojpeg_output_image_path, "wb") as turbojpeg_output_file:
                turbojpeg_output_file.write(turbojpeg_image_output)
            turbojpeg_stop_time = time.perf_counter_ns()
            turbojpeg_image_new = ImageEncoder(turbojpeg_output_image_path)
            turbojpeg_image_new.read()

            # --- collect per-image metrics ---
            psnr_table.append({
                'input_image': file,
                'output_image': output_image_path,
                'psnr_orig_to_nn': psnr(input_image.luma_array_2d, test_output_image.luma_array_2d),
                'input_size': os.path.getsize(file),
                'output_size': os.path.getsize(output_image_path),
                'jpeglib_size': os.path.getsize(jpeg_lib_output_image_path),
                'turbojpeg_size': os.path.getsize(turbojpeg_output_image_path),
                'psnr_orig_to_jpeglib': psnr(input_image.luma_array_2d, jpeglib_output_image.luma_array_2d),
                'psnr_orig_to_turbojpeg': psnr(input_image.luma_array_2d, turbojpeg_image_new.luma_array_2d),
                'compression_duration_nn': nn_end_time - nn_start_time,
                'compression_duration_jpeglib': jpeglib_end_time - jpeglib_start_time,
                'compression_duration_turbojpeg': turbojpeg_stop_time - turbojpeg_start_time,
            })
            progress.update(task, advance=1)

    console.print("[bold green]Done!")
    psnr_dataframe = pd.DataFrame(psnr_table)
    psnr_dataframe.to_csv(export_data_path)
|
@click.command()
@click.option('-i', '--input-image-path', type=click.Path(exists=True), required=True)
@click.option('-m', '--input-model-path', type=click.Path(exists=True), required=True)
def reencode(input_image_path: str, input_model_path: str):
    """Reencodes the BMP image using custom coefficient calculating algorithm"""
    click.echo("Reencode")

    # Read the source BMP and wrap its luma values in a one-row DataFrame
    # for the network.
    source = ImageEncoder(input_image_path)
    source.read()
    frame = pd.DataFrame([source.luma_array])

    # Predict a quantization table from the pretrained weights and flatten
    # it into the 1-D layout the encoder expects.
    network = NeuralNetworkEncoder(
        pretrained_weights_path=input_model_path
    )
    predicted_table = network.predict_quantization_table(frame).flatten()
    print(predicted_table)

    # Write the reencoded JPEG next to the input, using the predicted table.
    target = ImageEncoder(f"{input_image_path}_converted.jpeg")
    target.load_image_from_mem(source.luma_array_2d)
    target.override_coefficients(predicted_table)
    target.write()
|
@click.command()
@click.option('-p', 'dataset_path', type=click.Path(exists=True), required=True)
@click.option('-o', 'output_model_path', type=str)
@click.option('-i', 'input_model_path', type=click.Path(exists=True))
def train(dataset_path: str, output_model_path: str, input_model_path: str):
    """Train the models using the dataset"""
    click.echo("Train")
    if input_model_path:
        # BUGFIX: previously `encoder` was only constructed when -i was NOT
        # given, so supplying -i left `encoder` undefined for the first batch
        # (NameError) and the provided weights were never loaded. Resume from
        # the supplied weights instead.
        encoder = NeuralNetworkEncoder(pretrained_weights_path=input_model_path)
    else:
        encoder = NeuralNetworkEncoder(
            internal_activation_function="relu",
            external_activation_function="linear",
            optimizer="adam",
            loss_function="mean_squared_error",
            image_dimension_x=512,
            image_dimension_y=512
        )

    dataset = Dataset(dataset_path=dataset_path)
    dataset.load_prepared_dataset()
    dataset_size = dataset.dataset_size

    # Train in chunks of 100 images, checkpointing weights after each chunk
    # and reloading the checkpoint at the start of the next one.
    # NOTE(review): range(100, dataset_size, 100) skips the final partial
    # chunk (and datasets smaller than 100 entirely) — confirm intentional.
    current_index = 0
    for batch in range(100, dataset_size, 100):
        if current_index > 0:
            print("Loading training model...")
            encoder = NeuralNetworkEncoder(pretrained_weights_path=output_model_path)

        print(f"Extracting luma values for first index={current_index}")
        extracted_luma_values = dataset.extract_luma_table(batch_size=100, first_item=current_index)
        print(f"Extracting dct values for first index={current_index}")
        extracted_dct_values = dataset.extract_dct_table(batch_size=100, first_item=current_index)

        # 60/20/20 train/validation/test split, seeded for reproducibility.
        print("Splitting the dataset")
        train_luma_df, temp_luma_df = train_test_split(extracted_luma_values, test_size=0.4, random_state=42)
        validation_luma_df, test_luma_df = train_test_split(temp_luma_df, test_size=0.5, random_state=42)
        train_dct_df, temp_dct_df = train_test_split(extracted_dct_values, test_size=0.4, random_state=42)
        validation_dct_df, test_dct_df = train_test_split(temp_dct_df, test_size=0.5, random_state=42)

        print("Training the model...")
        encoder.train(
            train_imageset=train_luma_df,
            train_quantization_dataset=train_dct_df,
            validation_dataset=validation_luma_df,
            validation_quantization_dataset=validation_dct_df,
            epochs=32,
            batch_size=5
        )

        print("Exporting the model")
        encoder.export_weights(output_model_path)
        current_index = batch
|
|
|
|
@click.command()
@click.option('-p', 'dataset_path', type=click.Path(exists=True), required=True)
def prepare_dataset(dataset_path: str):
    # Preprocess the raw dataset at `dataset_path` into the prepared form
    # the train command consumes; best-effort — errors are reported, not raised.
    click.echo("Prepare dataset")
    try:
        dataset = Dataset(dataset_path=dataset_path)
        # `console.status` already renders its own spinner; the unused
        # Spinner instance and `as status` binding were dead code and removed.
        with console.status("[bold yellow]Preparing the dataset for training..."):
            dataset.prepare_dataset()
        console.print("[bold green]Done! Processed files are now in directory: tmp_database")
    except Exception as e:
        console.print("[bold red]Error when preparing dataset")
        print(e)
        # BUGFIX: was `print(traceback.print_exc())`, which printed the
        # traceback and then a stray "None" (print_exc returns None).
        traceback.print_exc()
|
|
|
|
# Register every subcommand on the root group.
# (Deduplicated: reencode and train were each registered twice by the
# unresolved merge in the source.)
cli.add_command(reencode)
cli.add_command(train)
cli.add_command(prepare_dataset)
cli.add_command(bulk_bmp_reencode)
|
|
|
|
# Script entry point (deduplicated from the merge residue).
if __name__ == "__main__":
    cli()