DCGAN debugging. Getting just garbage
Asked Answered
R

1

31

Introduction:

I am trying to get a CDCGAN (Conditional Deep Convolutional Generative Adversarial Network) to work on the MNIST dataset which should be fairly easy considering that the library (PyTorch) I am using has a tutorial on its website.
But I can't seem to get It working it just produces garbage or the model collapses or both.

What I tried:

  • making the model Conditional semi-supervised learning
  • using batch norm
  • using dropout on each layer besides the input/output layer on the generator and discriminator
  • label smoothing to combat overconfidence
  • adding noise to the images (I guess you call this instance noise) to get a better data distribution
  • use leaky relu to avoid vanishing gradients
  • using a replay buffer to combat forgetting of learned stuff and overfitting
  • playing with hyperparameters
  • comparing it to the model from PyTorch tutorial
  • basically what I did besides some things like Embedding layer ect.

Images my Model generated:

Hyperparameters:

batch_size=50, learning_rate_discrimiantor=0.0001, learning_rate_generator=0.0003, shuffle=True, ndf=64, ngf=64, droupout=0.5
enter image description here enter image description here enter image description here enter image description here

batch_size=50, learning_rate_discriminator=0.0003, learning_rate_generator=0.0003, shuffle=True, ndf=64, ngf=64, dropout=0
enter image description here enter image description here enter image description here enter image description here

Images Pytorch tutorial Model generated:

Code for the pytorch tutorial dcgan model
As comparison here are the images from the DCGAN from the pytorch turoial:
enter image description here enter image description here enter image description here

My Code:

import torch
import torch.nn as nn
import torchvision
from torchvision import transforms, datasets
import torch.nn.functional as F
from torch import optim as optim
from torch.utils.tensorboard import SummaryWriter

import numpy as np

import os
import time


class Discriminator(torch.nn.Module):
    def __init__(self, ndf=16, dropout_value=0.5):  # ndf feature map discriminator
        super().__init__()
        self.ndf = ndf
        self.droupout_value = dropout_value

        self.condi = nn.Sequential(
            nn.Linear(in_features=10, out_features=64 * 64)
        )

        self.hidden0 = nn.Sequential(
            nn.Conv2d(in_channels=2, out_channels=self.ndf, kernel_size=4, stride=2, padding=1, bias=False),
            nn.LeakyReLU(0.2),
        )
        self.hidden1 = nn.Sequential(
            nn.Conv2d(in_channels=self.ndf, out_channels=self.ndf * 2, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(self.ndf * 2),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )
        self.hidden2 = nn.Sequential(
            nn.Conv2d(in_channels=self.ndf * 2, out_channels=self.ndf * 4, kernel_size=4, stride=2, padding=1, bias=False),
            #nn.BatchNorm2d(self.ndf * 4),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )
        self.hidden3 = nn.Sequential(
            nn.Conv2d(in_channels=self.ndf * 4, out_channels=self.ndf * 8, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(self.ndf * 8),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )
        self.out = nn.Sequential(
            nn.Conv2d(in_channels=self.ndf * 8, out_channels=1, kernel_size=4, stride=1, padding=0, bias=False),
            torch.nn.Sigmoid()
        )

    def forward(self, x, y):
        y = self.condi(y.view(-1, 10))
        y = y.view(-1, 1, 64, 64)

        x = torch.cat((x, y), dim=1)

        x = self.hidden0(x)
        x = self.hidden1(x)
        x = self.hidden2(x)
        x = self.hidden3(x)
        x = self.out(x)

        return x


class Generator(torch.nn.Module):
    def __init__(self, n_features=100, ngf=16, c_channels=1, dropout_value=0.5):  # ngf feature map of generator
        super().__init__()
        self.ngf = ngf
        self.n_features = n_features
        self.c_channels = c_channels
        self.droupout_value = dropout_value

        self.hidden0 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=self.n_features + 10, out_channels=self.ngf * 8,
                               kernel_size=4, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(self.ngf * 8),
            nn.LeakyReLU(0.2)
        )

        self.hidden1 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=self.ngf * 8, out_channels=self.ngf * 4,
                               kernel_size=4, stride=2, padding=1, bias=False),
            #nn.BatchNorm2d(self.ngf * 4),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )

        self.hidden2 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=self.ngf * 4, out_channels=self.ngf * 2,
                               kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(self.ngf * 2),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )

        self.hidden3 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=self.ngf * 2, out_channels=self.ngf,
                               kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(self.ngf),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )

        self.out = nn.Sequential(
            # "out_channels=1" because gray scale
            nn.ConvTranspose2d(in_channels=self.ngf, out_channels=1, kernel_size=4,
                               stride=2, padding=1, bias=False),
            nn.Tanh()
        )

    def forward(self, x, y):
        x_cond = torch.cat((x, y), dim=1)  # Combine flatten image with conditional input (class labels)

        x = self.hidden0(x_cond)           # Image goes into a "ConvTranspose2d" layer
        x = self.hidden1(x)
        x = self.hidden2(x)
        x = self.hidden3(x)
        x = self.out(x)

        return x


class Logger:
    def __init__(self, model_name, model1, model2, m1_optimizer, m2_optimizer, model_parameter, train_loader):
        self.out_dir = "data"
        self.model_name = model_name
        self.train_loader = train_loader
        self.model1 = model1
        self.model2 = model2
        self.model_parameter = model_parameter
        self.m1_optimizer = m1_optimizer
        self.m2_optimizer = m2_optimizer

        # Exclude Epochs of the model name. This make sense e.g. when we stop a training progress and continue later on.
        self.experiment_name = '_'.join("{!s}={!r}".format(k, v) for (k, v) in model_parameter.items())\
            .replace("Epochs" + "=" + str(model_parameter["Epochs"]), "")

        self.d_error = 0
        self.g_error = 0

        self.tb = SummaryWriter(log_dir=str(self.out_dir + "/log/" + self.model_name + "/runs/" + self.experiment_name))

        self.path_image = os.path.join(os.getcwd(), f'{self.out_dir}/log/{self.model_name}/images/{self.experiment_name}')
        self.path_model = os.path.join(os.getcwd(), f'{self.out_dir}/log/{self.model_name}/model/{self.experiment_name}')

        try:
            os.makedirs(self.path_image)
        except Exception as e:
            print("WARNING: ", str(e))

        try:
            os.makedirs(self.path_model)
        except Exception as e:
            print("WARNING: ", str(e))

    def log_graph(self, model1_input, model2_input, model1_label, model2_label):
        self.tb.add_graph(self.model1, input_to_model=(model1_input, model1_label))
        self.tb.add_graph(self.model2, input_to_model=(model2_input, model2_label))

    def log(self, num_epoch, d_error, g_error):
        self.d_error = d_error
        self.g_error = g_error

        self.tb.add_scalar("Discriminator Train Error", self.d_error, num_epoch)
        self.tb.add_scalar("Generator Train Error", self.g_error, num_epoch)

    def log_image(self, images, epoch, batch_num):
        grid = torchvision.utils.make_grid(images)
        torchvision.utils.save_image(grid, f'{self.path_image}\\Epoch_{epoch}_batch_{batch_num}.png')

        self.tb.add_image("Generator Image", grid)

    def log_histogramm(self):
        for name, param in self.model2.named_parameters():
            self.tb.add_histogram(name, param, self.model_parameter["Epochs"])
            self.tb.add_histogram(f'gen_{name}.grad', param.grad, self.model_parameter["Epochs"])

        for name, param in self.model1.named_parameters():
            self.tb.add_histogram(name, param, self.model_parameter["Epochs"])
            self.tb.add_histogram(f'dis_{name}.grad', param.grad, self.model_parameter["Epochs"])

    def log_model(self, num_epoch):
        torch.save({
            "epoch": num_epoch,
            "model_generator_state_dict": self.model1.state_dict(),
            "model_discriminator_state_dict": self.model2.state_dict(),
            "optimizer_generator_state_dict":  self.m1_optimizer.state_dict(),
            "optimizer_discriminator_state_dict":  self.m2_optimizer.state_dict(),
        }, str(self.path_model + f'\\{time.time()}_epoch{num_epoch}.pth'))

    def close(self, logger, images, num_epoch,  d_error, g_error):
        logger.log_model(num_epoch)
        logger.log_histogramm()
        logger.log(num_epoch, d_error, g_error)
        self.tb.close()

    def display_stats(self, epoch, batch_num, dis_error, gen_error):
        print(f'Epoch: [{epoch}/{self.model_parameter["Epochs"]}] '
              f'Batch: [{batch_num}/{len(self.train_loader)}] '
              f'Loss_D: {dis_error.data.cpu()}, '
              f'Loss_G: {gen_error.data.cpu()}')


def get_MNIST_dataset(num_workers_loader, model_parameter, out_dir="data"):
    compose = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.CenterCrop((64, 64)),
        transforms.ToTensor(),
        torchvision.transforms.Normalize(mean=[0.5], std=[0.5])
    ])

    dataset = datasets.MNIST(
        root=out_dir,
        train=True,
        download=True,
        transform=compose
    )

    train_loader = torch.utils.data.DataLoader(dataset,
                                               batch_size=model_parameter["batch_size"],
                                               num_workers=num_workers_loader,
                                               shuffle=model_parameter["shuffle"])

    return dataset, train_loader


def train_discriminator(p_optimizer, p_noise, p_images, p_fake_target, p_real_target, p_images_labels, p_fake_labels, device):
    p_optimizer.zero_grad()

    # 1.1 Train on real data
    pred_dis_real = discriminator(p_images, p_images_labels)
    error_real = loss(pred_dis_real, p_real_target)

    error_real.backward()

    # 1.2 Train on fake data
    fake_data = generator(p_noise, p_fake_labels).detach()
    fake_data = add_noise_to_image(fake_data, device)
    pred_dis_fake = discriminator(fake_data, p_fake_labels)
    error_fake = loss(pred_dis_fake, p_fake_target)

    error_fake.backward()

    p_optimizer.step()

    return error_fake + error_real


def train_generator(p_optimizer, p_noise, p_real_target, p_fake_labels, device):
    p_optimizer.zero_grad()

    fake_images = generator(p_noise, p_fake_labels)
    fake_images = add_noise_to_image(fake_images, device)
    pred_dis_fake = discriminator(fake_images, p_fake_labels)
    error_fake = loss(pred_dis_fake, p_real_target)  # because
    """
    We use "p_real_target" instead of "p_fake_target" because we want to 
    maximize that the discriminator is wrong.
    """

    error_fake.backward()

    p_optimizer.step()

    return fake_images, pred_dis_fake, error_fake


# TODO change to a Truncated normal distribution
def get_noise(batch_size, n_features=100):
    return torch.FloatTensor(batch_size, n_features, 1, 1).uniform_(-1, 1)


# We flip label of real and fate data. Better gradient flow I have told
def get_real_data_target(batch_size):
    return torch.FloatTensor(batch_size, 1, 1, 1).uniform_(0.0, 0.2)


def get_fake_data_target(batch_size):
    return torch.FloatTensor(batch_size, 1, 1, 1).uniform_(0.8, 1.1)


def image_to_vector(images):
    return torch.flatten(images, start_dim=1, end_dim=-1)


def vector_to_image(images):
    return images.view(images.size(0), 1, 28, 28)


def get_rand_labels(batch_size):
    return torch.randint(low=0, high=9, size=(batch_size,))


def load_model(model_load_path):
    if model_load_path:
        checkpoint = torch.load(model_load_path)

        discriminator.load_state_dict(checkpoint["model_discriminator_state_dict"])
        generator.load_state_dict(checkpoint["model_generator_state_dict"])

        dis_opti.load_state_dict(checkpoint["optimizer_discriminator_state_dict"])
        gen_opti.load_state_dict(checkpoint["optimizer_generator_state_dict"])

        return checkpoint["epoch"]

    else:
        return 0


def init_model_optimizer(model_parameter, device):
    # Initialize the Models
    discriminator = Discriminator(ndf=model_parameter["ndf"], dropout_value=model_parameter["dropout"]).to(device)
    generator = Generator(ngf=model_parameter["ngf"], dropout_value=model_parameter["dropout"]).to(device)

    # train
    dis_opti = optim.Adam(discriminator.parameters(), lr=model_parameter["learning_rate_dis"], betas=(0.5, 0.999))
    gen_opti = optim.Adam(generator.parameters(), lr=model_parameter["learning_rate_gen"], betas=(0.5, 0.999))

    return discriminator, generator, dis_opti, gen_opti


def get_hot_vector_encode(labels, device):
    return torch.eye(10)[labels].view(-1, 10, 1, 1).to(device)


def add_noise_to_image(images, device, level_of_noise=0.1):
    return images[0].to(device) + (level_of_noise) * torch.randn(images.shape).to(device)


if __name__ == "__main__":
    # Hyperparameter
    model_parameter = {
        "batch_size": 500,
        "learning_rate_dis": 0.0002,
        "learning_rate_gen": 0.0002,
        "shuffle": False,
        "Epochs": 10,
        "ndf": 64,
        "ngf": 64,
        "dropout": 0.5
    }

    # Parameter
    r_frequent = 10        # How many samples we save for replay per batch (batch_size / r_frequent).
    model_name = "CDCGAN"   # The name of you model e.g. "Gan"
    num_workers_loader = 1  # How many workers should load the data
    sample_save_size = 16   # How many numbers your saved imaged should show
    device = "cuda"         # Which device should be used to train the neural network
    model_load_path = ""    # If set load model instead of training from new
    num_epoch_log = 1       # How frequent you want to log/
    torch.manual_seed(43)   # Sets a seed for torch for reproducibility

    dataset_train, train_loader = get_MNIST_dataset(num_workers_loader, model_parameter)  # Get dataset

    # Initialize the Models and optimizer
    discriminator, generator, dis_opti, gen_opti = init_model_optimizer(model_parameter, device)  # Init model/Optimizer

    start_epoch = load_model(model_load_path)  # when we want to load a model

    # Init Logger
    logger = Logger(model_name, generator, discriminator, gen_opti, dis_opti, model_parameter, train_loader)

    loss = nn.BCELoss()

    images, labels = next(iter(train_loader))  # For logging

    # For testing
    # pred = generator(get_noise(model_parameter["batch_size"]).to(device), get_hot_vector_encode(get_rand_labels(model_parameter["batch_size"]), device))
    # dis = discriminator(images.to(device), get_hot_vector_encode(labels, device))

    logger.log_graph(get_noise(model_parameter["batch_size"]).to(device), images.to(device),
                     get_hot_vector_encode(get_rand_labels(model_parameter["batch_size"]), device),
                     get_hot_vector_encode(labels, device))


    # Array to store
    exp_replay = torch.tensor([]).to(device)

    for num_epoch in range(start_epoch, model_parameter["Epochs"]):
        for batch_num, data_loader in enumerate(train_loader):
            images, labels = data_loader
            images = add_noise_to_image(images, device)  # Add noise to the images

            # 1. Train Discriminator
            dis_error = train_discriminator(
                                            dis_opti,
                                            get_noise(model_parameter["batch_size"]).to(device),
                                            images.to(device),
                                            get_fake_data_target(model_parameter["batch_size"]).to(device),
                                            get_real_data_target(model_parameter["batch_size"]).to(device),
                                            get_hot_vector_encode(labels, device),
                                            get_hot_vector_encode(
                                                get_rand_labels(model_parameter["batch_size"]), device),
                                            device
                                            )

            # 2. Train Generator
            fake_image, pred_dis_fake, gen_error = train_generator(
                                                                  gen_opti,
                                                                  get_noise(model_parameter["batch_size"]).to(device),
                                                                  get_real_data_target(model_parameter["batch_size"]).to(device),
                                                                  get_hot_vector_encode(
                                                                      get_rand_labels(model_parameter["batch_size"]),
                                                                      device),
                                                                  device
                                                                  )


            # Store a random point for experience replay
            perm = torch.randperm(fake_image.size(0))
            r_idx = perm[:max(1, int(model_parameter["batch_size"] / r_frequent))]
            r_samples = add_noise_to_image(fake_image[r_idx], device)
            exp_replay = torch.cat((exp_replay, r_samples), 0).detach()

            if exp_replay.size(0) >= model_parameter["batch_size"]:
                # Train on experienced data
                dis_opti.zero_grad()

                r_label = get_hot_vector_encode(torch.zeros(exp_replay.size(0)).numpy(), device)
                pred_dis_real = discriminator(exp_replay, r_label)
                error_real = loss(pred_dis_real,  get_fake_data_target(exp_replay.size(0)).to(device))

                error_real.backward()

                dis_opti.step()

                print(f'Epoch: [{num_epoch}/{model_parameter["Epochs"]}] '
                      f'Batch: Replay/Experience batch '
                      f'Loss_D: {error_real.data.cpu()}, '
                      )

                exp_replay = torch.tensor([]).to(device)

            logger.display_stats(epoch=num_epoch, batch_num=batch_num, dis_error=dis_error, gen_error=gen_error)

            if batch_num % 100 == 0:
                logger.log_image(fake_image[:sample_save_size], num_epoch, batch_num)

        logger.log(num_epoch, dis_error, gen_error)
        if num_epoch % num_epoch_log == 0:
            logger.log_model(num_epoch)
            logger.log_histogramm()
    logger.close(logger, fake_image[:sample_save_size], num_epoch, dis_error, gen_error)

First link to my Code (Pastebin)
Second link to my Code (0bin)

Conclusion:

Since I implemented all these things (e.g. label smoothing) which are considered beneficial to a GAN/DCGAN.
And my Model still performs worse than the Tutorial DCGAN from PyTorch I think I might have a bug in my code but I can't seem to find it.

Reproducibility:

You should be able to just copy the code and run it if you have the libraries that I imported installed to look for yourself if you can find anything.

I appreciate any feedback.

Recompense answered 26/2, 2020 at 19:47 Comment(9)
I find your question rather confusing Lupos. It isn't clear exactly what you want. If your goal is a working solution, have you seen this repo in GitHub? They have there an implementation. Does it solve your quest? If not, why not?Spavined
Like you have seen from my pictures my NN doesn't perform as well (it doesn't work at all) as the coressponding pytorch example. Which is weird to me since I basically do the same thing like them with some addition which only should make the model more stable which you can see here. So I either have a bug in my code or Some "hyperparameters" are wrong.Recompense
And your goal is to have us debug your code?Spavined
You have both re-implement the code and made addition to the model in the same time ? you should re-implement the exact same version of GAN from the tutorial, then test it and afterwards if it works make your label smoothing additionCatrinacatriona
@TiagoMartinsPeres not exactly. I hoped that maybe someone with experience would recognize the patterns in the pictures and a corresponding bug or advices how to debug it since its relativly ahrd to debug a Neural Network I have no Idea how.Recompense
@Catrinacatriona I working at that.Recompense
Isn't this question better suited for Data Science Stack Exchange?Orgeat
Your results look pretty good. By this I mean that they are number -like, which is what I'd expect. The Pytorch version just looks like it has been trained longer. As @Catrinacatriona points out, this is probably due to changes to the model. The Pytorch version will have been optimised to what worked for them and any deviation from this will worsen the results.Scharaga
From the result, It seem you have some level of mode collapse in your generator, which you can combat it using Minibatch discrimination, other methods and code(keras) for stabilizing training you can find here, My advise is you should start from simple implement code and make it work at some level, then add one component at time, otherwise, is hard to debug which work or notMonoicous
R
2

So I solved this issue a while ago, but forgot to post an answer on stack overflow. So I will simply post my code here which should work probably pretty good. Some disclaimer:

  • I am not quite sure if it works since I did this a year ago
  • its for 128x128px Images MNIST
  • It's not a vanilla GAN I used various optimization techniques
  • If you want to use it you need to change various details, such as the training dataset

Resources:


    import torch
    from torch.autograd import Variable
    import torch.nn as nn
    import torch.nn.functional as F
    import torchvision
    import torchvision.transforms as transforms
    from torch.utils.data import DataLoader
    
    import pytorch_lightning as pl
    from pytorch_lightning import loggers
    
    from numpy.random import choice
    
    import os
    from pathlib import Path
    import shutil
    
    from collections import OrderedDict
    
    # custom weights initialization called on netG and netD
    def weights_init(m):
        classname = m.__class__.__name__
        if classname.find('Conv') != -1:
            nn.init.normal_(m.weight.data, 0.0, 0.02)
        elif classname.find('BatchNorm') != -1:
            nn.init.normal_(m.weight.data, 1.0, 0.02)
            nn.init.constant_(m.bias.data, 0)
    
    # randomly flip some labels
    def noisy_labels(y, p_flip=0.05):  # # flip labels with 5% probability
        # determine the number of labels to flip
        n_select = int(p_flip * y.shape[0])
        # choose labels to flip
        flip_ix = choice([i for i in range(y.shape[0])], size=n_select)
        # invert the labels in place
        y[flip_ix] = 1 - y[flip_ix]
        return y
    
    class AddGaussianNoise(object):
        def __init__(self, mean=0.0, std=0.1):
            self.std = std
            self.mean = mean
    
        def __call__(self, tensor):
            tensor = tensor.cuda()
            return tensor + (torch.randn(tensor.size()) * self.std + self.mean).cuda()
    
        def __repr__(self):
            return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
    
    def resize2d(img, size):
        return (F.adaptive_avg_pool2d(img, size).data).cuda()
    
    def get_valid_labels(img):
        return ((0.8 - 1.1) * torch.rand(img.shape[0], 1, 1, 1) + 1.1).cuda()  # soft labels
    
    def get_unvalid_labels(img):
        return (noisy_labels((0.0 - 0.3) * torch.rand(img.shape[0], 1, 1, 1) + 0.3)).cuda()  # soft labels
    
    class Generator(pl.LightningModule):
        def __init__(self, ngf, nc, latent_dim):
            super(Generator, self).__init__()
            self.ngf = ngf
            self.latent_dim = latent_dim
            self.nc = nc
    
            self.fc0 = nn.Sequential(
                # input is Z, going into a convolution
                nn.utils.spectral_norm(nn.ConvTranspose2d(latent_dim, ngf * 16, 4, 1, 0, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ngf * 16)
            )
    
            self.fc1 = nn.Sequential(
                # state size. (ngf*8) x 4 x 4
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 16, ngf * 8, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ngf * 8)
            )
    
            self.fc2 = nn.Sequential(
                # state size. (ngf*4) x 8 x 8
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ngf * 4)
            )
    
            self.fc3 = nn.Sequential(
                # state size. (ngf*2) x 16 x 16
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ngf * 2)
            )
    
            self.fc4 = nn.Sequential(
                # state size. (ngf) x 32 x 32
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ngf)
            )
    
            self.fc5 = nn.Sequential(
                # state size. (nc) x 64 x 64
                nn.utils.spectral_norm(nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False)),
                nn.Tanh()
            )
    
            # state size. (nc) x 128 x 128
    
            # For Multi-Scale Gradient
            # Converting the intermediate layers into images
            self.fc0_r = nn.Conv2d(ngf * 16, self.nc, 1)
            self.fc1_r = nn.Conv2d(ngf * 8, self.nc, 1)
            self.fc2_r = nn.Conv2d(ngf * 4, self.nc, 1)
            self.fc3_r = nn.Conv2d(ngf * 2, self.nc, 1)
            self.fc4_r = nn.Conv2d(ngf, self.nc, 1)
    
        def forward(self, input):
            x_0 = self.fc0(input)
            x_1 = self.fc1(x_0)
            x_2 = self.fc2(x_1)
            x_3 = self.fc3(x_2)
            x_4 = self.fc4(x_3)
            x_5 = self.fc5(x_4)
    
            # For Multi-Scale Gradient
            # Converting the intermediate layers into images
            x_0_r = self.fc0_r(x_0)
            x_1_r = self.fc1_r(x_1)
            x_2_r = self.fc2_r(x_2)
            x_3_r = self.fc3_r(x_3)
            x_4_r = self.fc4_r(x_4)
    
            return x_5, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r
    
    class Discriminator(pl.LightningModule):
        def __init__(self, ndf, nc):
            super(Discriminator, self).__init__()
            self.nc = nc
            self.ndf = ndf
    
            self.fc0 = nn.Sequential(
                # input is (nc) x 128 x 128
                nn.utils.spectral_norm(nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True)
            )
    
            self.fc1 = nn.Sequential(
                # state size. (ndf) x 64 x 64
                nn.utils.spectral_norm(nn.Conv2d(ndf + nc, ndf * 2, 4, 2, 1, bias=False)),
                # "+ nc" because of multi scale gradient
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ndf * 2)
            )
    
            self.fc2 = nn.Sequential(
                # state size. (ndf*2) x 32 x 32
                nn.utils.spectral_norm(nn.Conv2d(ndf * 2 + nc, ndf * 4, 4, 2, 1, bias=False)),
                # "+ nc" because of multi scale gradient
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ndf * 4)
            )
    
            self.fc3 = nn.Sequential(
                # state size. (ndf*4) x 16 x 16e
                nn.utils.spectral_norm(nn.Conv2d(ndf * 4 + nc, ndf * 8, 4, 2, 1, bias=False)),
                # "+ nc" because of multi scale gradient
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ndf * 8),
            )
    
            self.fc4 = nn.Sequential(
                # state size. (ndf*8) x 8 x 8
                nn.utils.spectral_norm(nn.Conv2d(ndf * 8 + nc, ndf * 16, 4, 2, 1, bias=False)),
                nn.LeakyReLU(0.2, inplace=True),
                nn.BatchNorm2d(ndf * 16)
            )
    
            self.fc5 = nn.Sequential(
                # state size. (ndf*8) x 4 x 4
                nn.utils.spectral_norm(nn.Conv2d(ndf * 16 + nc, 1, 4, 1, 0, bias=False)),
                nn.Sigmoid()
            )
    
            # state size. 1 x 1 x 1
    
        def forward(self, input, detach_or_not):
            # When we train i ncombination with generator we use multi scale gradient.
            x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r = input
            if detach_or_not:
                x = x.detach()
    
            x_0 = self.fc0(x)
    
            x_0 = torch.cat((x_0, x_4_r), dim=1)  # Concat Multi-Scale Gradient
            x_1 = self.fc1(x_0)
    
            x_1 = torch.cat((x_1, x_3_r), dim=1)  # Concat Multi-Scale Gradient
            x_2 = self.fc2(x_1)
    
            x_2 = torch.cat((x_2, x_2_r), dim=1)  # Concat Multi-Scale Gradient
            x_3 = self.fc3(x_2)
    
            x_3 = torch.cat((x_3, x_1_r), dim=1)  # Concat Multi-Scale Gradient
            x_4 = self.fc4(x_3)
    
            x_4 = torch.cat((x_4, x_0_r), dim=1)  # Concat Multi-Scale Gradient
            x_5 = self.fc5(x_4)
    
            return x_5
    
    class DCGAN(pl.LightningModule):
    
        def __init__(self, hparams, checkpoint_folder, experiment_name):
            super().__init__()
            self.hparams = hparams
            self.checkpoint_folder = checkpoint_folder
            self.experiment_name = experiment_name
    
            # networks
            self.generator = Generator(ngf=hparams.ngf, nc=hparams.nc, latent_dim=hparams.latent_dim)
            self.discriminator = Discriminator(ndf=hparams.ndf, nc=hparams.nc)
            self.generator.apply(weights_init)
            self.discriminator.apply(weights_init)
    
            # cache for generated images
            self.generated_imgs = None
            self.last_imgs = None
    
            # For experience replay
            self.exp_replay_dis = torch.tensor([])
    
    
        def forward(self, z):
            return self.generator(z)
    
        def adversarial_loss(self, y_hat, y):
            return F.binary_cross_entropy(y_hat, y)
    
        def training_step(self, batch, batch_nb, optimizer_idx):
            # For adding Instance noise for more visit: https://www.inference.vc/instance-noise-a-trick-for-stabilising-gan-training/
            std_gaussian = max(0, self.hparams.level_of_noise - (
                    (self.hparams.level_of_noise * 2) * (self.current_epoch / self.hparams.epochs)))
            AddGaussianNoiseInst = AddGaussianNoise(std=std_gaussian)  # the noise decays over time
    
            imgs, _ = batch
            imgs = AddGaussianNoiseInst(imgs)  # Adding instance noise to real images
            self.last_imgs = imgs
    
            # train generator
            if optimizer_idx == 0:
                # sample noise
                z = torch.randn(imgs.shape[0], self.hparams.latent_dim, 1, 1).cuda()
    
                # generate images
                self.generated_imgs = self(z)
    
                # ground truth result (ie: all fake)
                g_loss = self.adversarial_loss(self.discriminator(self.generated_imgs, False), get_valid_labels(self.generated_imgs[0]))  # adversarial loss is binary cross-entropy; [0] is the image of the last layer
    
                tqdm_dict = {'g_loss': g_loss}
                log = {'g_loss': g_loss, "std_gaussian": std_gaussian}
                output = OrderedDict({
                    'loss': g_loss,
                    'progress_bar': tqdm_dict,
                    'log': log
                })
                return output
    
            # train discriminator
            if optimizer_idx == 1:
                # Measure discriminator's ability to classify real from generated samples
                # how well can it label as real?
                real_loss = self.adversarial_loss(
                    self.discriminator([imgs, resize2d(imgs, 4), resize2d(imgs, 8), resize2d(imgs, 16), resize2d(imgs, 32), resize2d(imgs, 64)],
                                       False), get_valid_labels(imgs))
    
                fake_loss = self.adversarial_loss(self.discriminator(self.generated_imgs, True), get_unvalid_labels(
                    self.generated_imgs[0]))  # how well can it label as fake?; [0] is the image of the last layer
    
                # discriminator loss is the average of these
                d_loss = (real_loss + fake_loss) / 2
    
                tqdm_dict = {'d_loss': d_loss}
                log = {'d_loss': d_loss, "std_gaussian": std_gaussian}
                output = OrderedDict({
                    'loss': d_loss,
                    'progress_bar': tqdm_dict,
                    'log': log
                })
                return output
    
        def configure_optimizers(self):
            lr_gen = self.hparams.lr_gen
            lr_dis = self.hparams.lr_dis
            b1 = self.hparams.b1
            b2 = self.hparams.b2
    
            opt_g = torch.optim.Adam(self.generator.parameters(), lr=lr_gen, betas=(b1, b2))
            opt_d = torch.optim.Adam(self.discriminator.parameters(), lr=lr_dis, betas=(b1, b2))
            return [opt_g, opt_d], []
    
        def backward(self, trainer, loss, optimizer, optimizer_idx: int) -> None:
            loss.backward(retain_graph=True)
    
        def train_dataloader(self):
            # transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),
            #                                 transforms.ToTensor(),
            #                                 transforms.Normalize([0.5], [0.5])])
            # dataset = torchvision.datasets.MNIST(os.getcwd(), train=False, download=True, transform=transform)
            # return DataLoader(dataset, batch_size=self.hparams.batch_size)
            # transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),
            #                                 transforms.ToTensor(),
            #                                 transforms.Normalize([0.5], [0.5])
            #                                 ])
    
            # train_dataset = torchvision.datasets.ImageFolder(
            #     root="./drive/My Drive/datasets/flower_dataset/",
            #     # root="./drive/My Drive/datasets/ghibli_dataset_small_overfit/",
            #     transform=transform
            # )
            # return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True,
            #                   batch_size=self.hparams.batch_size)
    
            transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),
                                            transforms.ToTensor(),
                                            transforms.Normalize([0.5], [0.5])
                                            ])
            train_dataset = torchvision.datasets.ImageFolder(
                root="ghibli_dataset_small_overfit/",
                transform=transform
            )
            return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True,
                              batch_size=self.hparams.batch_size)
    
        def on_epoch_end(self):
            z = torch.randn(4, self.hparams.latent_dim, 1, 1).cuda()
            # match gpu device (or keep as cpu)
            if self.on_gpu:
                z = z.cuda(self.last_imgs.device.index)
    
            # log sampled images
            sample_imgs = self.generator(z)[0]
            torchvision.utils.save_image(sample_imgs, f'generated_images_epoch{self.current_epoch}.png')
    
            # save model
            if self.current_epoch % self.hparams.save_model_every_epoch == 0:
                trainer.save_checkpoint(
                    self.checkpoint_folder + "/" + self.experiment_name + "_epoch_" + str(self.current_epoch) + ".ckpt")
    
    from argparse import Namespace
    
    args = {
        'batch_size': 128, # batch size
        'lr_gen': 0.0003,  # TTUR;learnin rate of both networks; tested value: 0.0002
        'lr_dis': 0.0003,  # TTUR;learnin rate of both networks; tested value: 0.0002
        'b1': 0.5,  # Momentum for adam; tested value(dcgan paper): 0.5
        'b2': 0.999,  # Momentum for adam; tested value(dcgan paper): 0.999
        'latent_dim': 256,  # tested value which worked(in V4_1): 100
        'nc': 3,  # number of color channels
        'ndf': 8,  # number of discriminator features
        'ngf': 8,  # number of generator features
        'epochs': 4,  # the maxima lamount of epochs the algorith should run
        'save_model_every_epoch': 1,  # how often we save our model
        'image_size': 128, # size of the image
        'num_workers': 3,
        'level_of_noise': 0.1,  # how much instance noise we introduce(std; tested value: 0.15 and 0.1
        'experience_save_per_batch': 1,  # this value should be very low; tested value which works: 1
        'experience_batch_size': 50  # this value shouldnt be too high; tested value which works: 50
    }
    hparams = Namespace(**args)
    
    # Parameters
    experiment_name = "DCGAN_6_2_MNIST_128px"
    dataset_name = "mnist"
    checkpoint_folder = "DCGAN/"
    tags = ["DCGAN", "128x128"]
    dirpath = Path(checkpoint_folder)
    
    # defining net
    net = DCGAN(hparams, checkpoint_folder, experiment_name)
    
    torch.autograd.set_detect_anomaly(True)
    trainer = pl.Trainer( # resume_from_checkpoint="DCGAN_V4_2_GHIBLI_epoch_999.ckpt",
        max_epochs=args["epochs"],
        gpus=1
    )
    
    trainer.fit(net)
Recompense answered 29/1, 2021 at 20:59 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.