January 29, 2026
ferenc-horvath-9cYiqVDeXDc-unsplash-scaled.jpg

I show You how To Make Huge Profits In A Short Time With Cryptos!

import dataclasses
import functools
import os

import datasets
import tokenizers
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim.lr_scheduler as lr_scheduler
import tqdm
from torch import Tensor
from torch.distributed.algorithms._checkpoint.checkpoint_wrapper import (
    apply_activation_checkpointing,
    checkpoint_wrapper,
)
from torch.distributed.checkpoint import load, save
from torch.distributed.checkpoint.state_dict import (
    StateDictOptions,
    get_state_dict,
    set_state_dict,
)
from torch.distributed.fsdp import (
    CPUOffloadPolicy,
    FSDPModule,
    MixedPrecisionPolicy,
    fully_shard,
)
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
from torch.utils.data.distributed import DistributedSampler

 

 

# Build the model

@dataclasses.dataclass
class LlamaConfig:
    """Define Llama model hyperparameters."""

    vocab_size: int = 50000  # Size of the tokenizer vocabulary
    max_position_embeddings: int = 2048  # Maximum sequence length
    hidden_size: int = 768  # Dimension of hidden layers
    intermediate_size: int = 4 * 768  # Dimension of MLP's hidden layer
    num_hidden_layers: int = 12  # Number of transformer layers
    num_attention_heads: int = 12  # Number of attention heads
    num_key_value_heads: int = 3  # Number of key-value heads for GQA

 

 

class RotaryPositionEncoding(nn.Module):
    """Rotary position encoding (RoPE).

    The cosine and sine tables for every position up to
    ``max_position_embeddings`` are computed once in ``__init__`` and stored
    as buffers; ``forward`` applies the rotation to an input tensor.
    """

    def __init__(self, dim: int, max_position_embeddings: int) -> None:
        """Initialize the RotaryPositionEncoding module.

        Args:
            dim: The hidden dimension of the input tensor to which RoPE is applied
            max_position_embeddings: The maximum sequence length of the input tensor
        """
        super().__init__()
        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        # compute a matrix of n * theta_i
        N = 10_000.0
        inv_freq = 1.0 / (N ** (torch.arange(0, dim, 2) / dim))
        # repeat the dim/2 frequencies so the table spans the whole head dimension
        inv_freq = torch.cat((inv_freq, inv_freq), dim=-1)
        # build positions in the same float dtype so torch.outer sees matching inputs
        position = torch.arange(max_position_embeddings, dtype=inv_freq.dtype)
        sinusoid_inp = torch.outer(position, inv_freq)
        # save cosine and sine matrices as buffers, not parameters
        self.register_buffer("cos", sinusoid_inp.cos())
        self.register_buffer("sin", sinusoid_inp.sin())

    def forward(self, x: Tensor) -> Tensor:
        """Apply RoPE to tensor x.

        Args:
            x: Input tensor of shape (batch_size, seq_length, num_heads, head_dim)

        Returns:
            Output tensor of shape (batch_size, seq_length, num_heads, head_dim)
        """
        _, seq_len, _, _ = x.shape
        device = x.device
        dtype = x.dtype
        # reshape the cosine and sine matrices to 4D tensors with the same dtype as x;
        # the trailing -1 keeps the per-position head_dim entries
        cos = self.cos.to(device, dtype)[:seq_len].view(1, seq_len, 1, -1)
        sin = self.sin.to(device, dtype)[:seq_len].view(1, seq_len, 1, -1)
        # apply RoPE to x: split the last (head_dim) axis in halves and rotate
        x1, x2 = x.chunk(2, dim=-1)
        rotated = torch.cat((-x2, x1), dim=-1)
        return (x * cos) + (rotated * sin)

 

 

class LlamaAttention(nn.Module):
    """Grouped-query attention with rotary embeddings."""

    def __init__(self, config: LlamaConfig) -> None:
        """Create the Q/K/V/output projections from ``config``.

        Args:
            config: Model hyperparameters; reads ``hidden_size``,
                ``num_attention_heads`` and ``num_key_value_heads``.
        """
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_kv_heads = config.num_key_value_heads  # GQA: H_kv < H_q

        # hidden_size must be divisible by num_heads
        assert (self.head_dim * self.num_heads) == self.hidden_size

        # Linear layers for Q, K, V projections
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

    def reset_parameters(self):
        """Re-initialize all four projection layers."""
        self.q_proj.reset_parameters()
        self.k_proj.reset_parameters()
        self.v_proj.reset_parameters()
        self.o_proj.reset_parameters()

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:
        """Run self-attention over ``hidden_states``.

        Args:
            hidden_states: Input tensor of shape (batch_size, seq_len, hidden_size)
            rope: Module applied to the query and key tensors
            attn_mask: Mask passed unchanged to ``scaled_dot_product_attention``

        Returns:
            Tensor of shape (batch_size, seq_len, hidden_size)
        """
        bs, seq_len, _ = hidden_states.size()

        # Project inputs to Q, K, V
        query_states = self.q_proj(hidden_states).view(bs, seq_len, self.num_heads, self.head_dim)
        key_states = self.k_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)
        value_states = self.v_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)

        # Apply rotary position embeddings
        query_states = rope(query_states)
        key_states = rope(key_states)

        # Transpose tensors from BSHD to BHSD dimension for scaled_dot_product_attention
        query_states = query_states.transpose(1, 2)
        key_states = key_states.transpose(1, 2)
        value_states = value_states.transpose(1, 2)

        # Use PyTorch's optimized attention implementation
        # setting is_causal=True is incompatible with setting explicit attention masks
        attn_output = F.scaled_dot_product_attention(
            query_states,
            key_states,
            value_states,
            attn_mask=attn_mask,
            dropout_p=0.0,
            enable_gqa=True,
        )

        # Transpose output tensor from BHSD to BSHD dimension, reshape to 3D, and then project output
        attn_output = attn_output.transpose(1, 2).reshape(bs, seq_len, self.hidden_size)
        attn_output = self.o_proj(attn_output)
        return attn_output

 

 

class LlamaMLP(nn.Module):
    """Feed-forward network with SwiGLU activation."""

    def __init__(self, config: LlamaConfig) -> None:
        """Create the gate, up and down projections.

        Args:
            config: Model hyperparameters; reads ``hidden_size`` and
                ``intermediate_size``.
        """
        super().__init__()
        # Two parallel projections for SwiGLU
        self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.act_fn = F.silu  # SwiGLU activation function
        # Project back to hidden size
        self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

    def reset_parameters(self):
        """Re-initialize all three projection layers."""
        self.gate_proj.reset_parameters()
        self.up_proj.reset_parameters()
        self.down_proj.reset_parameters()

    def forward(self, x: Tensor) -> Tensor:
        """Return ``down_proj(silu(gate_proj(x)) * up_proj(x))``."""
        # SwiGLU activation: multiply gate and up-projected inputs
        gate = self.act_fn(self.gate_proj(x))
        up = self.up_proj(x)
        return self.down_proj(gate * up)

 

 

class LlamaDecoderLayer(nn.Module):
    """Single transformer layer for a Llama model."""

    def __init__(self, config: LlamaConfig) -> None:
        """Create the attention and MLP sub-blocks with their pre-norm layers.

        Args:
            config: Model hyperparameters; ``hidden_size`` sizes both RMSNorm
                layers and the config is forwarded to the sub-modules.
        """
        super().__init__()
        # eps is a small stabilizer added inside the norm: 1e-5, not 1e5
        self.input_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.self_attn = LlamaAttention(config)
        self.post_attention_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.mlp = LlamaMLP(config)

    def reset_parameters(self):
        """Re-initialize both norm layers, the attention block and the MLP."""
        self.input_layernorm.reset_parameters()
        self.self_attn.reset_parameters()
        self.post_attention_layernorm.reset_parameters()
        self.mlp.reset_parameters()

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:
        """Apply the attention and MLP residual blocks in sequence.

        Args:
            hidden_states: Input activations for this layer
            rope: Rotary position module forwarded to the attention block
            attn_mask: Attention mask forwarded to the attention block

        Returns:
            The hidden states after both residual blocks.
        """
        # First residual block: Self-attention
        residual = hidden_states
        hidden_states = self.input_layernorm(hidden_states)
        attn_outputs = self.self_attn(hidden_states, rope=rope, attn_mask=attn_mask)
        hidden_states = attn_outputs + residual

        # Second residual block: MLP
        residual = hidden_states
        hidden_states = self.post_attention_layernorm(hidden_states)
        hidden_states = self.mlp(hidden_states) + residual
        return hidden_states

 

 

class LlamaModel(nn.Module):
    """The full Llama model without any pretraining heads."""

    def __init__(self, config: LlamaConfig) -> None:
        """Create the rotary tables, token embedding, decoder stack and final norm.

        Args:
            config: Model hyperparameters.
        """
        super().__init__()
        self.rotary_emb = RotaryPositionEncoding(
            config.hidden_size // config.num_attention_heads,
            config.max_position_embeddings,
        )

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleList([
            LlamaDecoderLayer(config) for _ in range(config.num_hidden_layers)
        ])
        # eps is a small stabilizer added inside the norm: 1e-5, not 1e5
        self.norm = nn.RMSNorm(config.hidden_size, eps=1e-5)

    def reset_parameters(self):
        """Re-initialize the embedding, every decoder layer, the final norm and the RoPE tables."""
        self.embed_tokens.reset_parameters()
        for layer in self.layers:
            layer.reset_parameters()
        self.norm.reset_parameters()
        # The RoPE tables are buffers, not parameters. When the model is built on
        # the meta device and materialized with to_empty(), they hold
        # uninitialized memory, so recompute them on CPU and copy them in place.
        rope = self.rotary_emb
        with torch.device("cpu"):
            fresh = RotaryPositionEncoding(rope.dim, rope.max_position_embeddings)
        with torch.no_grad():
            rope.cos.copy_(fresh.cos)
            rope.sin.copy_(fresh.sin)

    def forward(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:
        """Embed ``input_ids`` and run them through every decoder layer.

        Args:
            input_ids: Token IDs to embed
            attn_mask: Attention mask forwarded to every decoder layer

        Returns:
            The final hidden states after the last norm layer.
        """
        # Convert input token IDs to embeddings
        hidden_states = self.embed_tokens(input_ids)
        # Process through all transformer layers, then the final norm layer
        for layer in self.layers:
            hidden_states = layer(hidden_states, rope=self.rotary_emb, attn_mask=attn_mask)
        hidden_states = self.norm(hidden_states)
        # Return the final hidden states
        return hidden_states

 

 

class LlamaForPretraining(nn.Module):
    """Llama base model followed by a linear language-modeling head."""

    def __init__(self, config: LlamaConfig) -> None:
        """Create the base model and the vocabulary projection.

        Args:
            config: Model hyperparameters; ``hidden_size`` and ``vocab_size``
                size the head.
        """
        super().__init__()
        self.base_model = LlamaModel(config)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def reset_parameters(self):
        """Re-initialize the base model and the language-modeling head."""
        self.base_model.reset_parameters()
        self.lm_head.reset_parameters()

    def forward(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:
        """Return the ``lm_head`` projection of the base model's hidden states."""
        hidden_states = self.base_model(input_ids, attn_mask)
        return self.lm_head(hidden_states)

 

 

def create_causal_mask(batch: Tensor, dtype: torch.dtype = torch.float32) -> Tensor:
    """Create a causal mask for self-attention.

    Args:
        batch: Batch of sequences, shape (batch_size, seq_len)
        dtype: Data type of the mask

    Returns:
        Causal mask of shape (seq_len, seq_len): ``-inf`` strictly above the
        diagonal and 0 elsewhere.
    """
    _, seq_len = batch.shape
    mask = torch.full((seq_len, seq_len), float("-inf"), device=batch.device, dtype=dtype)
    # triu(diagonal=1) keeps -inf only for future positions and zeroes the rest
    return mask.triu(diagonal=1)

 

 

def create_padding_mask(batch: Tensor, padding_token_id: int, dtype: torch.dtype = torch.float32) -> Tensor:
    """Create a padding mask for a batch of sequences for self-attention.

    Args:
        batch: Batch of sequences, shape (batch_size, seq_len)
        padding_token_id: ID of the padding token
        dtype: Data type of the mask

    Returns:
        Padding mask of shape (batch_size, 1, seq_len, seq_len): ``-inf``
        wherever the query or the key position is a padding token, 0 elsewhere.
    """
    is_pad = batch == padding_token_id
    padded = torch.zeros_like(batch, device=batch.device, dtype=dtype).masked_fill(is_pad, float("-inf"))
    # broadcast-add the per-position vector against itself to mask rows and columns
    # NOTE(review): rows belonging to padded queries end up entirely -inf - confirm
    # that the attention kernel in use tolerates fully masked rows.
    mask = padded[:, :, None] + padded[:, None, :]
    return mask[:, None, :, :]

 

 

# Dataset that produces padded sequences of fixed length

class PretrainingDataset(torch.utils.data.Dataset):
    """Wrap a text dataset and yield fixed-length (input, target) token tensors."""

    def __init__(self, dataset: datasets.Dataset, tokenizer: tokenizers.Tokenizer,
                 seq_length: int):
        """Store the dataset and tokenizer and look up the special token IDs.

        Args:
            dataset: Indexable dataset whose items expose a ``"text"`` field
            tokenizer: Tokenizer providing ``token_to_id`` and ``encode``
            seq_length: Length of each returned sequence
        """
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.seq_length = seq_length
        self.bot = tokenizer.token_to_id("[BOT]")
        self.eot = tokenizer.token_to_id("[EOT]")
        self.pad = tokenizer.token_to_id("[PAD]")

    def __len__(self):
        """Return the number of items in the underlying dataset."""
        return len(self.dataset)

    def __getitem__(self, index: int) -> tuple[Tensor, Tensor]:
        """Get a sequence of token ids from the dataset. [BOT] and [EOT] tokens
        are added. Clipped and padded to the sequence length.

        Returns:
            ``(x, y)`` int64 tensors of length ``seq_length``; ``y`` is ``x``
            shifted left by one token.
        """
        seq = self.dataset[index]["text"]
        tokens: list[int] = [self.bot] + self.tokenizer.encode(seq).ids + [self.eot]
        # pad to target sequence length (+1 because y is shifted by one token)
        toklen = len(tokens)
        if toklen < self.seq_length + 1:
            pad_length = self.seq_length + 1 - toklen
            tokens += [self.pad] * pad_length
        # return the sequence
        x = torch.tensor(tokens[:self.seq_length], dtype=torch.int64)
        y = torch.tensor(tokens[1:self.seq_length + 1], dtype=torch.int64)
        return x, y

 

 

def load_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:
    """Restore model, optimizer and scheduler state from ``checkpoint-dist``.

    Reads the module-level ``cpu_offload`` flag. Every rank must call this
    function: it starts and ends with ``dist.barrier()``.

    Args:
        model: Model whose state is overwritten in place
        optimizer: Optimizer whose state is overwritten in place
        scheduler: Scheduler restored from ``checkpoint-dist/lrscheduler.pt``
    """
    dist.barrier()
    # get_state_dict provides the container that load() fills in place
    model_state, optimizer_state = get_state_dict(
        model, optimizer, options=StateDictOptions(full_state_dict=True, cpu_offload=cpu_offload),
    )
    load(
        {"model": model_state, "optimizer": optimizer_state},
        checkpoint_id="checkpoint-dist",
    )
    set_state_dict(
        model, optimizer,
        model_state_dict=model_state, optim_state_dict=optimizer_state,
        options=StateDictOptions(broadcast_from_rank0=True, full_state_dict=True, cpu_offload=cpu_offload),
    )
    # Load onto CPU so this function does not depend on a module-level device
    # variable; the scheduler state is presumably plain Python numbers - verify
    # if a tensor-valued learning rate is ever used.
    scheduler.load_state_dict(
        torch.load("checkpoint-dist/lrscheduler.pt", map_location="cpu"),
    )
    dist.barrier()


def save_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:
    """Write model, optimizer and scheduler state to ``checkpoint-dist``.

    Reads the module-level ``cpu_offload`` flag. Every rank must call this
    function: it starts and ends with ``dist.barrier()``. Only rank 0 writes
    the scheduler file.

    Args:
        model: Model to checkpoint
        optimizer: Optimizer to checkpoint
        scheduler: Scheduler saved to ``checkpoint-dist/lrscheduler.pt``
    """
    dist.barrier()
    model_state, optimizer_state = get_state_dict(
        model, optimizer, options=StateDictOptions(full_state_dict=True, cpu_offload=cpu_offload),
    )
    # the dict keys must match the ones used in load_checkpoint
    save(
        {"model": model_state, "optimizer": optimizer_state},
        checkpoint_id="checkpoint-dist",
    )
    if dist.get_rank() == 0:
        torch.save(scheduler.state_dict(), "checkpoint-dist/lrscheduler.pt")
    dist.barrier()

 

 

# Load the tokenizer and dataset
tokenizer = tokenizers.Tokenizer.from_file("bpe_50K.json")
dataset = datasets.load_dataset("HuggingFaceFW/fineweb", "sample-10BT", split="train")

# Initialize the distributed environment
dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])
device = torch.device(f"cuda:{local_rank}")
rank = dist.get_rank()
world_size = dist.get_world_size()
print(f"World size {world_size}, rank {rank}, local rank {local_rank}. Using {device}")

# Create pretraining model on meta device, on all ranks
with torch.device("meta"):
    model_config = LlamaConfig()
    model = LlamaForPretraining(model_config)

# Convert model from meta device to FSDP2, must shard every component
cpu_offload = False
fsdp_kwargs = {
    # optional: use mixed precision training
    "mp_policy": MixedPrecisionPolicy(
        param_dtype=torch.bfloat16,
        reduce_dtype=torch.float32,
    ),
    # optional: CPU offloading
    "offload_policy": CPUOffloadPolicy() if cpu_offload else None,
    # optional: discard all-gathered parameters after forward pass even on root modules
    # "reshard_after_forward": True,
}
for layer in model.base_model.layers:
    fully_shard(layer, **fsdp_kwargs)
fully_shard(model.base_model, **fsdp_kwargs)
fully_shard(model, **fsdp_kwargs)
# Materialize the meta tensors as uninitialized storage, then initialize them
model.to_empty(device="cpu" if cpu_offload else device)
model.reset_parameters()
assert isinstance(model, FSDPModule), f"Expected FSDPModule, got {type(model)}"

# Set explicit prefetching on models
# more prefetching uses more memory, but allows more overlap of computation and communication
num_prefetch = 1
if num_prefetch > 1:
    modules = list(model.base_model.layers)
    for i, module in enumerate(modules):
        if i == len(modules) - 1:
            break
        module.set_modules_to_forward_prefetch(modules[i+1:i+num_prefetch+1])
    for i, module in enumerate(modules):
        if i == 0:
            continue
        module.set_modules_to_backward_prefetch(modules[max(0, i - num_prefetch):i])

# Optional: Apply gradient checkpointing on a distributed model (all ranks)
#wrap_policy = functools.partial(
#    transformer_auto_wrap_policy,
#    transformer_layer_cls={LlamaDecoderLayer, nn.Embedding},
#)
#apply_activation_checkpointing(
#    model,
#    checkpoint_wrapper_fn=checkpoint_wrapper,
#    auto_wrap_policy=wrap_policy,
#)

# Training parameters
epochs = 3
learning_rate = 1e-3
batch_size = 64 // world_size
seq_length = 512
num_warmup_steps = 1000
PAD_TOKEN_ID = tokenizer.token_to_id("[PAD]")
model.train()

# DataLoader, optimizer, scheduler, and loss function
# Sampler is needed to shard the dataset across world size
dataset = PretrainingDataset(dataset, tokenizer, seq_length)
sampler = DistributedSampler(dataset, shuffle=False, drop_last=True)
dataloader = torch.utils.data.DataLoader(
    dataset,
    sampler=sampler,
    batch_size=batch_size,
    pin_memory=True,  # optional
    shuffle=False,
    num_workers=2,
    prefetch_factor=2,
)
num_training_steps = len(dataloader) * epochs

optimizer = torch.optim.AdamW(
    model.parameters(), lr=learning_rate, betas=(0.9, 0.99), eps=1e-8, weight_decay=0.1,
)
warmup_scheduler = lr_scheduler.LinearLR(
    optimizer,
    start_factor=0.1, end_factor=1.0, total_iters=num_warmup_steps,
)
cosine_scheduler = lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=num_training_steps - num_warmup_steps,
    eta_min=0,
)
scheduler = lr_scheduler.SequentialLR(
    optimizer,
    schedulers=[warmup_scheduler, cosine_scheduler],
    milestones=[num_warmup_steps],
)
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN_ID)

# Optional: Compile the model and loss function
#model = torch.compile(model)
#loss_fn = torch.compile(loss_fn)

# if checkpoint-dist dir exists, load the checkpoint to model and optimizer
if os.path.exists("checkpoint-dist"):
    load_checkpoint(model, optimizer, scheduler)

# start training
for epoch in range(epochs):
    pbar = tqdm.tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}")
    for batch_id, batch in enumerate(pbar):
        if batch_id % 1000 == 0:
            save_checkpoint(model, optimizer, scheduler)
        # Explicit prefetching before sending any data to model
        model.unshard()
        # Get batched data, move from CPU to GPU
        input_ids, target_ids = batch
        input_ids = input_ids.to(device)
        target_ids = target_ids.to(device)
        # create attention mask: causal mask + padding mask
        attn_mask = create_causal_mask(input_ids) + create_padding_mask(input_ids, PAD_TOKEN_ID)
        # Extract output from model
        logits = model(input_ids, attn_mask)
        # Compute loss: cross-entropy between logits and target, ignoring padding tokens
        # flatten to (batch * seq, vocab) and (batch * seq,) as CrossEntropyLoss expects
        loss = loss_fn(logits.view(-1, logits.size(-1)), target_ids.view(-1))
        # Backward with loss and gradient clipping by L2 norm to 1.0
        # Optimizer and gradient clipping works on DTensor
        optimizer.zero_grad(set_to_none=not cpu_offload)
        loss.backward()
        # All-reduce fails if using CPU offloading
        if not cpu_offload:
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        # iterating over pbar already advances the bar, so no explicit update() here
        pbar.set_postfix(loss=loss.item())
    pbar.close()

# Save the model
save_checkpoint(model, optimizer, scheduler)

# Clean up the distributed environment
dist.destroy_process_group()



Source link

Leave a Reply

Your email address will not be published. Required fields are marked *