DeepLearning/Computer Vision

[Pytorch] CycleGAN 코드 구현

yooj_lee 2022. 8. 26. 00:43
300x250

CycleGAN 같은 경우엔 코드 구현을 해볼 필요가 있겠다는 생각이 들어서 스크래치로 구현을 해보았습니다 (대략 3달 전에 구현했는데 게을러서 지금에야 포스팅을 작성하게 되었습니다 ㅎㅎ) . 최대한 논문만 참고해서 구현을 하도록 노력했지만, 실제로 official code를 보지 않고서는 성능 재현이 어려운 부분도 있었습니다. 일례로, 논문에 나와있는 대로 구현할 경우 discriminator output size가 원하는 크기로 나오지 않거나 마지막 convolution layer의 경우 block이 아닌데 block으로 논문에 적어두거나 하는 문제가 있었습니다. 직접 구현을 해보니 구현을 해야만 체감할 수 있는 부분들이 있어서 즐거운 경험이었습니다.

포스팅은 구현된 코드 및 그에 따른 깨달음, 실험 결과를 위주로 작성되었습니다. 코드 구현 포스팅을 보시기 전에 일전에 작성한 논문 리뷰 포스팅을 보시면 더욱 이해가 수월하실 듯 합니다. 또한 하기한 내용에 오류나 궁금하신 부분이 있을 경우, 댓글 부탁드립니다.

 

[Generative Models] Unpaired Image-to-Image Translation using Cycle-Consistent Adversarial Networks (2017)

이번에 리뷰할 논문은 CycleGAN으로, 2017년 ICCV에 발표된 논문입니다. inverse mapping과 cycle consistency loss를 통해 unsupervised image-to-image translation의 성능을 끌어올린 아키텍처로 인용횟수 또한..

daebaq27.tistory.com


실험 환경

  • python 3.6.9
  • torch 1.11+cuda11.3
  • NVIDIA A5000 24GB

 

프로젝트 구성

프로젝트는 다음과 같이 구성했습니다(대부분의 DL 프로젝트가 구성되는 방식으로 프로젝트를 구성했습니다).

  • augmentation.py (augmentation 관련)
  • criterion.py (loss 관련)
  • dataset.py (커스텀 데이터셋 구현)
  • model.py (모델 아키텍처 구현)
  • scheduler.py (learning rate scheduler 구현)
  • train.py (train loop 구현)
  • test.py (inference 구현)
  • utils.py (configuration parsing하는 부분, 실험 reproducibility 제어하는 코드, checkpoint 및 이미지 save & load하는 코드)

 


model.py

model.py에는 Generator와 Discriminator를 구현하였고, Generator의 경우에는 residual block을 활용하기에 따로 ResidualBlock 클래스를 구현하였습니다. 

 

Generator

class ResidualBlock(nn.Module):
    """
    residual block (He et al., 2016)
    """
    def __init__(self, in_channels:int, out_channels:int):
        """
        - Args
            in_channels: number of channels for an input feature map
            out_channels: number of channels for an output feature map

        - Note
            fixed a kernel_size to 3
        """
        super(ResidualBlock, self).__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels

        self.block = nn.Sequential(
            nn.Conv2d(self.in_channels, self.out_channels, kernel_size=3, padding='same', padding_mode='reflect'),
            nn.InstanceNorm2d(self.out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(self.in_channels, self.out_channels, kernel_size=3, padding='same', padding_mode='reflect'),
            nn.InstanceNorm2d(self.out_channels)
        )

    def forward(self, x):
        output = self.block(x) + x # skip-connection

        return output


class Generator(nn.Module):
    def __init__(self, init_channel:int, kernel_size:int, stride:int, n_blocks:int=6):
        """
        - Args
            stride: 2
            f_stride: 1/2
            kernel size: 9 (first and last), 3 for the others

            3 convolutions & 6 residual_blocks, 2 fractionally-strided convolutions
            one convolutions (features to RGB) -> 3 channel로 보낸다.

            instance normalization -> non-residual convolutional layers

            non-residual convolutional layers: followed by spatial batch normalization
            relu nonlinearities with the exception of the output layer
            + a scaled tanh to ensure that the output image has pixels in the range [0,255]
        """
        super(Generator, self).__init__()
        
        self.init_channel=init_channel
        self.kernel_size=kernel_size
        self.stride=stride
        self.n_blocks=n_blocks

        layers = OrderedDict()
        layers['conv_first'] = self._make_block(in_channels=3, out_channels=self.init_channel, kernel_size=7, stride=1, padding='same') # first layer

        # downsampling path (d_k) -> two downsampling blocks
        for i in range(2):
            ic = self.init_channel*(i+1)
            k = 2*ic
            layers[f'd_{k}'] = self._make_block(in_channels=ic, out_channels=k, kernel_size=self.kernel_size, stride=self.stride)

        # residual block (R_k) -> 6 or 9 blocks
        for i in range(self.n_blocks):
            layers[f'R{k}_{i+1}'] = ResidualBlock(k, k) # in_channel = out_channel로 동일한 channel dimension 유지

        # upsampling path (u_k) -> two upsampling blocks
        for i in range(2):
            k = int(k/2)
            layers[f'u_{k}'] = self._make_block(in_channels=k*2, out_channels=k, kernel_size=self.kernel_size, stride=self.stride, mode='u')

        # last conv layer
        layers['conv_last'] = nn.Conv2d(in_channels=self.init_channel, out_channels=3, kernel_size=7, stride=1, padding='same', padding_mode='reflect') # last conv layer (to rgb)
        layers['tanh'] = nn.Tanh()

        self.model = nn.Sequential(
            layers
        )
        
    def forward(self, x):
        op = self.model(x)
        assert op.shape == x.shape, f"output shape ({op.shape}) must be same with the input size ({x.shape})"
        return op

    def _make_block(self, in_channels:int, out_channels:int, kernel_size:int, stride:int, padding:Union[int,str]=1, mode:str='d'):
        """
        builds a conv block

        - Args
            in_channels (int): # of channels of input feature map
            out_channels (int): # of channels of output feature map
            kernel_size (int): kernel size for a convolutional layer
            stride (int): stride for a convolution
            padding (int): an amount of padding for input feature map
            mode (str): 'd'(downsampling mode) or 'u'(upsampling mode) (default: 'd')
        """
        
        block = []
        if mode.lower() == 'd':
            block.append(nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride, padding=padding, padding_mode='reflect'))

        elif mode.lower() == 'u':
            block.append(nn.ConvTranspose2d(in_channels, out_channels, kernel_size, stride=stride, padding=padding, output_padding=1)) # output size를 input이랑 같게 해주려면 이렇게 설정을 해줄 수밖에 없음.

        block += [nn.InstanceNorm2d(out_channels), nn.ReLU(inplace=True)]

        return nn.Sequential(*block)

 

Generator는 contracting-expanding의 구조를 취하고 있었으므로, convolution과 transposed convolution을 활용하여 구현을 했습니다. reflection padding과 instance normalization 또한 논문과 같이 적용을 해주었습니다. 주의할 점은, 마지막 convolution의 경우에는 block 단위가 아니라 convolution layer만 넣어줘야 한다는 점입니다.

마지막 convolution의 경우에는 block 단위로 넣어줄 경우 이미지 픽셀값이 아예 깨져버리는 상황이 발생했습니다. 논문에서는 마지막 convolution의 경우에도 block 단위로 넣어준다고 설명이 되어 있었으나, 코드에서는 layer만 넣고 이후에 바로 tanh 활성화 함수를 통과하도록 구현되어 있었습니다. (생각해보니 논문에서 아무리 block으로 구현했다고 하더라도, relu를 통과하고 다시 tanh를 통과하는 괴랄한 경우는 없을 것입니다..비판적으로 논문을 바라봅시다..ㅎㅎ 그리고 꼭 저자코드를 확인해봅시다)

실제로 실험을 해본 결과, 마지막 convolution을 convolution layer가 아니라 convolution block으로 넣어준 것이 성능 재현 실패의 큰 원인이었습니다.

loss plot을 확인해본 결과,

 

generator의 마지막을 block 단위로 통과시킨 경우의 loss

 

위와 같이 두 discriminator가 완벽에 가깝게 학습이 되고, 이에 반해 두 generator의 loss는 지속적으로 증가함을 알 수 있었습니다. 즉, D가 너무 빨리 학습이 된 탓에 Generator의 학습이 제대로 이루어지지 않은 것으로 파악이 됩니다. 학습 초기, 마지막 convolution layer 이후에 InstanceNorm과 ReLU를 추가적으로 넣고 Tanh를 통과시킨 Generator의 상태가 D의 입장(?)에서 너무 판별하기 쉬운 상태였음을 추측해볼 수 있습니다. 이는 Vanilla GAN 논문에서도 GAN 학습 시 주의해야 할 점으로 꼽는 부분이기도 합니다. 

 

Discriminator

shape_dict=dict() # for checking the output's shape

class Discriminator(nn.Module):
    def __init__(self, n_layers:int=4, input_c:int=3, n_filter:int=64, kernel_size:int=4):
        """
        - Args
            n_layers (int): number of convolutional layers in the network (default=3)
            input_c (int): number of input channels (default=3)
            n_filter (int): number of filters in the first convolutional layer (default=64)
            kernel_size (int): kernel size for every convolutional layers in the network (default=4)
        
        - Output
            2-D tensor (b,)
        
        PatchGAN 구조 사용 -> 이미지를 patch로 넣어주겠다는 것이 아님. receptive field를 이용하는 듯함.

        size of receptive fields = 1 + L(K-1); L = # of layers, K = kernel size (under the stride=1)
        이 receptive fields를 70으로 잡아주겠다.
        """
        super(Discriminator, self).__init__()
        self.model = nn.Sequential()
        self.kernel_size=kernel_size
        self.n_layers = n_layers
        layers = []
        
        # building conv block
        for i in range(self.n_layers):
            if i == 0:
                ic, oc = input_c, n_filter
                layers.append(self._make_block(ic, oc, kernel_size=self.kernel_size, stride=2, padding=1, normalize=False))
            else:
                ic = oc
                oc = 2*ic
                stride=2
                
                if i == self.n_layers-1: # 마지막 레이어(c512)의 경우, stride=1로 설정할 것.
                    stride=1

                layers.append(self._make_block(ic, oc, kernel_size=self.kernel_size, stride=stride, padding=1))

        # prediction
        layers.append(nn.Conv2d(oc, 1, kernel_size=self.kernel_size, stride=1, padding=1))

        self.model = nn.Sequential(*layers)


    def forward(self, x):
        return self.model(x)


    def _make_block(self, in_channels, out_channels, stride, kernel_size=3, padding=0, normalize=True):
        layers = [nn.Conv2d(in_channels=in_channels, out_channels=out_channels, stride=stride, kernel_size=kernel_size, padding=padding)]
        if normalize:
            layers.append(nn.InstanceNorm2d(out_channels))
        layers.append(nn.LeakyReLU(0.2, inplace=True))

        return nn.Sequential(*layers)

# check the output size within a model

# hook function은 forward 이후에 activate된다는 점을 잊지 말자.
def hook_fn(m, _, o):
    """
    m: module
    i: input
    o: output
    """
    shape_dict[m]=o.shape
    

def get_all_layers(net:nn.Module, hook_fn=hook_fn):
    for name, layer in net._modules.items():
        #print(name)
        if isinstance(layer, nn.Sequential):
            get_all_layers(layer)
        else:
            layer.register_forward_hook(hook_fn)

 

Discriminator의 경우에는 70x70의 receptive field를 가진 PatchGAN의 구조를 차용했습니다. 정말 70x70의 패치를 구현하겠다는 것이 아니라 receptive field 사이즈를 70으로 잡아주겠다는 것이기 때문에 네트워크의 깊이와 convolution kernel size 등을 잘 조절해서 receptive field가 70이 나오도록 하면 됩니다. 논문에 나와 있는 그대로 구현을 했으나 4개의 layer를 통과하게 되면 왜 receptive field size가 70이 되는지는 아직 제대로 파악을 하지 못하여 추후 포스팅으로 작성하도록 하겠습니다.

discriminator의 구조는 커널 사이즈는 4, stride는 2로 설정하여 4개의 convolutional block(convolution - instance norm - ReLU)을 구성하게 됩니다. 처음 convolutional block에는 instance normalization을 생략합니다. 또한, 논문에는 나와 있지 않지만 마지막 convolutional block과 같은 경우에는 stride를 2가 아닌 1로 설정해주어야 합니다. 최종적으로는 C64-C128-C256-C512 (C뒤의 숫자는 필터 개수를 의미)로 discriminator를 구성하게 됩니다.

 


train.py

200 에폭 동안 학습을 진행했으며, learning rate과 같은 경우에는 0.0002로 설정 (100 에폭 후 linear decay를 적용해주었습니다)하였습니다. Adam optimizer로, $\beta_1$을 0.5로 사용하였고 $\beta_2$를 0.999로 설정해주었습니다. 배치 사이즈는 1로 설정하였고 전처리는 적용하지 않았습니다. 사용한 데이터셋은 horse2zebra 데이터셋입니다.

제 실험 환경에서는 한 epoch 당 1분 50초 정도, 총 6시간 30분 정도의 학습 시간이 소요되었습니다.

from sched import scheduler
import torch
from torch.utils.data import DataLoader

from augmentation import BaseAugmentation
from criterion import *
from dataset import UnalignedDataset
from model import *
from utils import *
from scheduler import DelayedLinearDecayLR

from functools import partial
import itertools
import os
from tqdm import tqdm
import wandb

# opt로 변경하는 작업
def train(opt):
    
    os.makedirs(opt.sample_save_dir, exist_ok=True)

    # device
    device = torch.device(f'cuda:{opt.gpu_id}' if torch.cuda.is_available() else 'cpu')    

    # define transforms
    transforms = BaseAugmentation()

    # load datasets
    train_data = UnalignedDataset(opt.data_root_A, opt.data_root_B, opt.is_train, transforms = transforms.transform)
    train_loader = DataLoader(train_data, batch_size=opt.batch_size, num_workers=opt.num_threads, shuffle=True)

    # load model
    G = Generator(init_channel=64, kernel_size=3, stride=2, n_blocks=9).to(device)
    F = Generator(init_channel=64, kernel_size=3, stride=2, n_blocks=9).to(device)
    D_x = Discriminator().to(device)
    D_y = Discriminator().to(device)

    # define optimizer
    optimizer = partial(torch.optim.Adam, lr=opt.lr, betas=(opt.beta1, 0.999))

    optim_G = optimizer(params = itertools.chain(G.parameters(), F.parameters()))
    optim_D = optimizer(params = itertools.chain(D_x.parameters(), D_y.parameters()))

    # scheduler
    scheduler_G = DelayedLinearDecayLR(optim_G, opt.lr, opt.target_lr, opt.total_iters, decay_after=opt.decay_after, verbose=opt.lr_decay_verbose)
    scheduler_D = DelayedLinearDecayLR(optim_D, opt.lr, opt.target_lr, opt.total_iters, decay_after=opt.decay_after, verbose=opt.lr_decay_verbose)


    criterion_G = AdversarialLoss()
    criterion_D = AdversarialLoss()
    criterion_cyc = CycleConsistencyLoss()
    criterion_idt = IdentityLoss()

    G.train()
    F.train()
    D_x.train()
    D_y.train()

    if opt.resume_from:
        checkpoint_path = os.path.join(opt.last_checkpoint_dir, f"epoch{opt.load_epoch}.pth")
        if "LinearDecay" in getattr(type(scheduler_D), '__name__'): # when using LinearDecayLR, not loading state dict for optimizers and schedulers.
            G, F, D_x, D_y, start_epoch = load_checkpoint(checkpoint_path, G, F, D_x, D_y)
            # decay after 같은 부분 수정하는 건 그냥 config 자체에서 받아오는 걸로 / last epoch만 수정해주자.
            scheduler_dict = {
                'decay_after': start_epoch + opt.decay_after,
                'last_epoch': opt.load_epoch
            }
            scheduler_G.load_state_dict(scheduler_dict)
            scheduler_D.load_state_dict(scheduler_dict)

        else:
            G, F, D_x, D_y, optim_G, optim_D, scheduler_G, scheduler_D, start_epoch = load_checkpoint(checkpoint_path, G, F, D_x, D_y, \
                                                                                                    optim_G, optim_D, scheduler_G, scheduler_D, mode="all")
        start_epoch += 1
    else: # not_resume from
        start_epoch = 0


    for epoch in range(start_epoch, start_epoch+opt.n_epochs):
        pbar = tqdm(enumerate(train_loader), total=len(train_loader))
        for step, data in pbar:
            
            # Initialize the gradient stored in the optimizer to zero in the beginning of each step.
            optim_G.zero_grad()
            optim_D.zero_grad()

            # load data on gpu
            X, Y = data['A'].to(device), data['B'].to(device)

            #### Generator ####
            for p_x, p_y in zip(D_x.parameters(), D_y.parameters()): # when optimizing G, freeze the parameters regarding D.
                p_x.requires_grad = False
                p_y.requires_grad = False

            # generation & reconstruction
            g_x = G(X) # fake_B
            f_y = F(Y) # fake_A
            rec_x = F(g_x) # rec_A (reconstruction)
            rec_y = G(f_y) # rec_B
            
            # discriminating the generators' outputs.
            d_g_x = D_y(g_x)
            d_f_y = D_x(f_y)

            # generate the label
            real_label = torch.tensor([1.0]).expand_as(d_g_x).to(device)
            fake_label = torch.tensor([0.0]).expand_as(d_f_y).to(device)

            # calculate an adversarial loss -> maximize the probability of the fake to be "considered" real
            loss_G_xy = criterion_G.forward_G(d_g_x, real_label)
            loss_F_yx = criterion_G.forward_G(d_f_y, real_label)

            # calc cycle loss
            loss_cyc = criterion_cyc(X, Y, rec_x, rec_y)

            # calc identity loss if lambda of identity loss larger than zero.
            if opt.lambda_idt > 0:
                loss_idt = opt.lambda_idt*criterion_idt(X, Y, f_y, g_x)
            else: # lambda_idt = 0
                loss_idt = 0

            loss_G = loss_G_xy + loss_F_yx + opt.lambda_cyc*loss_cyc + opt.lambda_idt*loss_idt
            
            loss_G.backward() # calculate all the gradient with respect to loss_G.
            optim_G.step() # alternating training 해야돼서 G랑 D는 optimizer 따로 쓰는 거임.

            #### Discriminator ####
            for p_x, p_y in zip(D_x.parameters(), D_y.parameters()):
                p_x.requires_grad = True
                p_y.requires_grad = True

            loss_D_xy = criterion_D.forward_D(D_y(Y), real_label, D_y(g_x.detach()), fake_label)
            loss_D_yx = criterion_D.forward_D(D_x(X), real_label, D_x(f_y.detach()), fake_label)
            
            # average the loss
            loss_D = (loss_D_xy+loss_D_yx)/2

            loss_D.backward()
            optim_D.step()

            # logging
            description = f'Epoch: {epoch+1}/{start_epoch+opt.n_epochs} || Step: {step+1}/{len(train_loader)} || Generator Loss: {round(loss_G.item(), 4)} \
                            || Discriminator Loss (XY, YX): {round(loss_D_xy.item(), 4)},{round(loss_D_yx.item(), 4)}'
            pbar.set_description(description)
            
            # wandb logging if log_interval is greater than zero.
            if (opt.log_interval > 0) and ((step+1) % opt.log_interval) == 0:
                wandb.log(
                    {
                        "Loss_G(X to Y)": round(loss_G_xy.item(), 4),
                        "Loss_F(Y to X)": round(loss_F_yx.item(), 4),
                        "Cycle Consistency Loss": round(loss_cyc.item(), 4),
                        "Loss_D(X to Y)": round(loss_D_xy.item(), 4),
                        "Loss_D(Y to X)": round(loss_D_yx.item(), 4)
                    }
                )
                
                # If identity loss is valid, log the identity loss as well.
                if opt.lambda_idt:
                    wandb.log({"Identity Loss": round(loss_idt.item(), 4)})

        # Apply the scheduler (make sure the step of optimizers precede that of schedulers)
        scheduler_G.step()
        scheduler_D.step()

        # saving sample outputs
        if opt.log_interval > 0: # if use wandb
            wandb.log(
                {
                    "X": wandb.Image(denormalize_image(X.clone().detach().cpu())),
                    "Y": wandb.Image(denormalize_image(Y.clone().detach().cpu())),
                    "Generated Target (X->Y)": wandb.Image(denormalize_image(g_x.clone().detach().cpu())), # G(X)
                    "Reconstructed Target (Y->X->Y)": wandb.Image(denormalize_image(rec_y.clone().detach().cpu())),
                    "Generated Input (Y->X)": wandb.Image(denormalize_image(f_y.clone().detach().cpu())), # F(Y)
                    "Reconstructed Input (X->Y->X)": wandb.Image(denormalize_image(rec_x.clone().detach().cpu()))
                }
            )
        
        else:
            save_image(g_x.clone().detach().cpu(), f"{opt.sample_save_dir}/epoch{epoch+1}.png")

        # saving checkpoints
        checkpoint_dir = os.path.join(opt.checkpoint_dir, opt.exp_name)
        save_checkpoint(epoch, G, F, D_x, D_y, optim_G, optim_D, scheduler_G, scheduler_D, checkpoint_dir, file_name=f"epoch{epoch+1}.pth")
    

def main():
    opt = parse_opt()

    fix_seed(opt.random_seed) # randomness 제어
    
    # wandb logging init
    wandb.init(project=opt.prj_name, name=opt.exp_name, entity="yoojlee", config=vars(opt)) # save a configuration of experiments on an wandb session.

    train(opt)


if __name__ == "__main__":
    main()

 

GAN을 학습시킬 때에 가장 중요한 점은 Generator와 Discriminator의 alternative training을 잘 제어해줘야한다는 점입니다. 흔히 Generator의 gradient를 Discriminator의 업데이트 과정에서 차단을 하는 경우는 많습니다만, 그 반대로 Generator의 업데이트 과정에서 Discriminator의 gradient를 차단해주는 과정은 많이 보지 못했습니다. 실제로 cycleGAN을 구현해놓은 community code를 보았을 때 Discriminator의 gradient를 Generator 최적화 과정에서 차단해주는 과정을 거치지 않은 코드도 있었습니다 (물론 official code에는 있습니다). 아무래도 horse2zebra 데이터에서만 관찰되는 현상일 수도 있을 듯 합니다.

 

정리해보면,

  1. Discriminator의 gradient를 Generator forward & backward 과정에서 흐르지 않도록 차단해주는 게 필요함.
    Generator의 Loss에는 무조건 D가 들어가게 되어 있는데, 이를 Backward하는 과정에서 무조건 discriminator의 gradient가 계산이 되기 때문에 별도의 조치를 취해서 Discriminator의 gradient를 계산하지 않도록 해야 함.
  2. 이와 마찬가지로 Generator의 gradient를 Discriminator 최적화 과정에서 흐르지 않도록 차단해주는 게 필요함. 이는 Generator의 output tensor를 계산 그래프에서 detach하는 과정을 통해서 수행이 가능함.

2번 부분은 vanilla GAN 학습을 시킬 때는 반영이 되었으나, 1번은 사전에 고려해줘야겠다는 생각을 못해서 처음 구현 후 학습이 제대로 이루어지지 않았습니다. 이로써 GAN을 학습시킬 때에는 위와 같이 Generator와 Discriminator의 최적화 과정에서 서로를 완벽히 분리시키는 게 중요하다는 걸 깨닫는 계기가 되었습니다.

 

최적화 과정에서 서로를 완벽히 분리한다고 했을 때, 대략 두 가지의 옵션이 존재합니다.

  1. optimizer의 분리 (optimizer를 따로 사용하면, "alternating"이 이루어지지 않습니다. Generator 최적화 시에는 Discriminator에서 완벽히 Generator를 분리시켜서 Generator만을 학습시켜야 하고, 그러한 parameter를 다시 이용해서 Discriminator를 학습하는 방향으로 가야하는데, optimizer를 따로 사용하게 되면 동시에 병렬적으로 step을 밟게 되기 때문에 이러한 부분을 반영하기가 어려울 것입니다)
  2. optimizer를 Generator와 Discriminator가 공유하되, 하나를 학습시킬 때 나머지 하나의 gradient는 고정되어야 함. 즉, graph 내에서 해당 부분의 gradient는 고정되어야 함.
    • Generator 학습의 경우에는, Discriminator 내 parameter의 requires_grad를 False로 변경하는 방법
    • Discriminator 학습의 경우에는, G(x)를 Discriminator의 계산 그래프로부터 detach하는 방법

 

GAN의 alternative training도 가능케 하면서 최적화 과정에서 서로의 gradient를 차단하기 위해서는 2번의 옵션을 선택해줘야 합니다. 중요한 점은, GAN 학습을 시킬 때에는 Generator와 Discriminator를 적절히 차단시켜줄 필요가 있다는 것입니다.

 


실험 결과

Loss 양상

horse to zebra

horse → zebra

zebra to horse

zebra → horse

매우 신기하게도, 어떤 방향이든 (horse → zebra or zebra → horse) Discriminator loss가 감소하면 Generator loss가 증가함을 파악할 수 있었습니다. 예를 들면, 대략 140 epoch부터는 Discriminator loss가 증가하기 시작했고 이와 동시에 Generator loss는 감소하기 시작했습니다. Adversarial training이기 때문에 관찰되는 현상인 것으로 보입니다.

 

Cycle Consistency Loss

cycle consistency loss 양상

Cycle Consistency Loss의 경우에는 Generator와 관련된 loss이기 때문에 Generator 학습 시에 계산하고 backward시켜줘야 합니다 (train.py를 참고하시면, generator loss를 계산할 때 cycle consistency loss까지 한꺼번에 계산해줍니다). Cycle Consistency Loss는 지속적으로 감소하는 경향을 보여주었습니다.

위의 loss 양상을 보았을 때, 정상적인 학습 형태인 경우 loss값이 어떤 값을 갖느냐보다 Discriminator와 Generator의 loss 변화 양상을 고려해야 한다는 점을 알 수 있었습니다. Discriminator와 Generator가 서로를 견제하며 학습이 되는 상황이기 때문에 어느 하나가 우위를 점하는 것보다는 아주 조금씩 변화해가면서 서로 증감 추이가 정반대가 되는 형태가 오히려 학습에 좋은 신호라는 결론을 내릴 수 있었습니다.

 

결과 이미지

loss 자체가 줄어든다고 해서 이미지의 생성 결과가 무조건 개선되는 것은 아니었습니다. 따라서 epoch이 증가함에 따라 무조건 이미지 생성 결과가 좋아지진 않았던 것 같습니다.

Train

학습 과정 중 변환된 이미지를 저장하여 성공적인 케이스를 아래와 같이 몇 개 추려보았습니다.

  • horse to zebra (original - translated - reconstructed)

successful case (epoch 196) - horse to zebra

  • zebra to horse (original - translated - reconstructed)

successful case (epoch 196) - zebra to horse

 

비교적 성공적인 case를 rough하게 선정했습니다. 말 같은 경우엔 머리에 갈기가 있는데 얼룩말 같은 경우엔 갈기가 없기 때문에 갈기 부분을 양방향 모두 갈기 부분을 제대로 변형하지 못함을 알 수 있습니다.

아래와 같이 아예 실패한 케이스도 꽤 존재했습니다. 

failure case (original - translated)

instance를 구분하는 방향으로 이미지 변환이 이루어지는 것은 아니기 때문에, 군데군데 배경에도 얼룩무늬가 관찰되었습니다. 전반적으로 말의 형태를 제대로 학습을 못한 듯한 모습입니다.

 

Test

가장 마지막 에폭인 200번째 에폭을 기준으로 테스트 셋에 대해서 이미지 변환을 수행했습니다. 결과가 꽤 성공적이었던 케이스의 경우, 아래와 같았습니다.

  • horse to zebra

몸통이나 다리 같은 부분은 상대적으로 변환이 잘 되었으나 얼굴이나 갈기 등의 디테일은 제대로 학습이 되지 않았습니다. 이러한 부분은 전처리 과정에서 resized crop을 통해서 해결이 가능할 것으로 보입니다.

  • zebra to horse

대략적으로 형태는 복원이 되었으나 아직 다리나 얼굴 끝부분 등은 말의 특징을 덮어씌우는 데에 실패하였습니다. 다만 대략적으로 형태 부분은 보이는 것으로 보아 어느 정도 학습이 더 이루어지면 이러한 부분은 해결이 될 듯 합니다.

반대로 실패 사례도 살펴보겠습니다.

  • horse to zebra

failure case (@test set) - horse to zebra

객체가 작게 나타날 경우, 말 자체를 정확히 잡아내지 못하기 때문에 말이 대략적으로 나타나는 위치 등에 얼룩 무늬가 넓게 합성되어 나타남을 알 수 있었습니다. 전체적으로 scene의 feature를 학습하기 때문에 위와 같은 현상이 발생하는 것으로 보입니다. Target하는 객체만을 변환할 수 있는 방법에 대한 후속 연구가 진행 중입니다 (InstaGAN, InstaFormer 등)

  • zebra to horse

failure case (@test set) - zebra to horse

전체적으로 객체의 위치를 특정하는 것처럼 보이나 아직 말의 특징이 합성이 모든 범위에 이루어지지 않고, 몸통 위주로 나타나고 있습니다. 또한 아직은 사진 상으로 얼룩 무늬가 약간 남아있음을 알 수 있습니다.

 

Identity Loss 실험

identity loss를 추가했을 때, 학습에 어떤 영향을 미치는지 확인해보았습니다.

Identity loss 양상

학습을 진행하며 identity loss를 logging한 결과 (좌상단), 학습 시 어떠한 트렌드가 존재하지는 않았습니다. 즉, identity loss를 추가하는 것이 큰 의미는 없어보였습니다. 기존의 identity loss 없이 수행한 결과 (연두색 그래프)와 비교해보았을 때, loss 양상 역시 유의미한 차이가 있다고 보기는 어려웠습니다.

 

생성 결과 또한 identity loss가 큰 영향을 미친다고 할 만큼의 시각적 향상을 보여주진 않았습니다. 결과물은 아래와 같습니다.

horse → zebra (원본 - identity loss - identity loss 제거)
zebra → horse (원본 - identity loss - identity loss 제거)

이처럼 identity loss 없이 학습한 모델과 비교를 해보았을 때, identity loss가 결과에 큰 영향을 미치지 않음을 실제 생성 결과로도 확인할 수 있었습니다.

 

reflection padding 실험

Generator 내 residual block에서 reflection padding이 유의미한지를 실험한 결과입니다.

zero padding

horse → zebra (원본 - translated - reconstructed) with zero padding
horse → zebra (원본 - translated - reconstructed) with zero padding

 

reflection padding

horse → zebra (원본 - translated - reconstructed) with reflection padding
horse → zebra (원본 - translated - reconstructed) with reflection padding

 

전반적으로 비슷한 결과(혹은 zero padding이 살짝 나은 정도)를 가져온 듯합니다. 기존 residual block 내 reflection padding이 zero padding 대비 갖는 이점이 커보이지는 않습니다. 그래도 reflection padding을 사용했을 시에 상대적으로 얼룩말의 무늬를 보다 더 잘 잡는 듯한 모습을 보여주었습니다.

 


결론

  • GAN의 optimization process에서 G와 D의 최적화 과정을 완벽히 분리시키는 것이 매우 중요하다는 사실을 알게 되었습니다.
  • 또한, Discriminator의 loss를 어떻게 조정하느냐가 학습에 전체적으로 영향을 많이 미치는 듯 합니다. 그러나 Discriminator가 학습하는 속도가 너무 빨라지면 Generator가 더 이상 학습이 이루어지지 않을 수 있으니, 이러한 학습 속도를 조정하는 것이 매우 중요할 것 같습니다.
  • CycleGAN 같은 경우엔 train loop에 naive하게 코드를 모두 작성하기에는 아무래도 깔끔하지 못하다는 단점이 있고, 이에 왜 CycleGAN 자체를 클래스화시켰는지를 코드를 계속 작성하면서 알게 되었습니다.
  • 아키텍처가 논문 내에 다 드러나지 않은 경우가 많은데 이렇게 될 경우 나중에 코드 안주는 논문은 재현을 어떻게 하나…하는 걱정이 조금 들었습니다.
  • 모델이 복잡해질수록 학습 logging은 매우 중요합니다..미리미리 짜두어야겠습니다. (wandb 최고)

 

전체 코드는 아래 링크를 참고해주시면 좋을 것 같습니다. 스케쥴러 구현 부분은 이전에 포스팅을 작성했기 때문에 이 글에서는 model.py와 train.py 위주로 코드를 포스팅했습니다. 

https://github.com/YoojLee/paper_review/tree/main/code/cycle_gan

 

GitHub - YoojLee/paper_review: paper review archive

paper review archive. Contribute to YoojLee/paper_review development by creating an account on GitHub.

github.com

 

300x250