Inclusão de rateLimiter para controlar o número de requisições por minuto.

This commit is contained in:
Adriano Serighelli 2025-09-30 13:16:04 -03:00
parent 7758c13742
commit 920904fe11

View File

@ -72,6 +72,8 @@ internal class Plat_integ
private static async Task ProcessarMedicoesAsync(DateTime dataIni, DateTime dataFim, List<perfil> perfis, string caminhoLog)
{
var limiter = new RateLimiter(400, TimeSpan.FromMinutes(1));
var errosPersistentes = new ConcurrentBag<string>();
//using var conn = new NpgsqlConnection(PG_CONN_STRING_PROD);
@ -84,7 +86,7 @@ internal class Plat_integ
var datas = Enumerable.Range(0, (dataFim - dataIni).Days).Select(i => dataIni.AddDays(i));
await Parallel.ForEachAsync(datas/*, new ParallelOptions { MaxDegreeOfParallelism = 1 }*/, async (dia, ct) =>
await Parallel.ForEachAsync(perfis, async (perfil, ct) =>
{
foreach (var item in perfis)
{
@ -98,6 +100,7 @@ internal class Plat_integ
string payload = Xml_requisicao(dia, item.Item1, item.Item2, 1);
var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml");
await limiter.WaitAsync(ct);
using var response = await client.PostAsync(endpoint, conteudo, ct);
if ((int)response.StatusCode == 429) // limite da API
@ -163,23 +166,24 @@ internal class Plat_integ
var medidas = doc.Descendants(ns + "medida")
.Where(x => (string)x.Element(ns + "tipoEnergia") == "L");
acumulador ??= new List<XElement>();
acumulador.AddRange(medidas);
//if (paginaAtual < totalPaginas)
//{
// // Requisita próxima página
// string payload = Xml_requisicao(DateTime.Parse(acumulador.First().Element(ns + "data")!.Value), // data de referência
// acumulador.First().Element(ns + "perfil")?.Value ?? "", // perfil (ajustar conforme necessário)
// acumulador.First().Element(ns + "medidor")?.Element(ns + "codigo")?.Value ?? "",
// paginaAtual + 1);
if (paginaAtual < totalPaginas)
{
// Requisita próxima página
string payload = Xml_requisicao(dia, perfil, ponto, paginaAtual + 1);
// var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml");
// using var resp = await client.PostAsync("https://servicos.ccee.org.br/ws/v2/MedidaCincoMinutosBSv2", conteudo);
var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml");
await limiter.WaitAsync(ct);
using var resp = await client.PostAsync("https://servicos.ccee.org.br/ws/v2/MedidaCincoMinutosBSv2", conteudo);
// string proxXml = await resp.Content.ReadAsStringAsync();
// return await ProcessarXMLAsync(proxXml, conn, acumulador, paginaAtual + 1, totalPaginas);
//}
string proxXml = await resp.Content.ReadAsStringAsync();
await ProcessarXMLAsync(proxXml, dataSource, dia, perfil, ponto, existentes, limiter, ct, paginaAtual + 1, acumulador, totalPaginas);
return;
}
var medidasProcessadas = acumulador
.Select(m =>
@ -247,28 +251,47 @@ internal class Plat_integ
{
Console.WriteLine("atualizado");
}
// Se não atualizou ninguém → faz INSERT
if (rowsAffected == 0)
public class RateLimiter
{
string sqlInsert = @"
INSERT INTO med_5min
(origem, dia_num, minuto, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao, ponto)
VALUES (@origem, @dia_num, @minuto, @ativa_c, @ativa_g, @reat_c, @reat_g, @ponto);
";
private readonly int _maxRequests;
private readonly TimeSpan _interval;
private int _requestCount;
private DateTime _windowStart;
private readonly object _lock = new();
using var cmdInsert = new NpgsqlCommand(sqlInsert, connection);
cmdInsert.Parameters.AddWithValue("origem", m.Origem);
cmdInsert.Parameters.AddWithValue("dia_num", m.DiaNum);
cmdInsert.Parameters.AddWithValue("minuto", m.Minuto);
cmdInsert.Parameters.AddWithValue("ativa_c", m.AtivaC);
cmdInsert.Parameters.AddWithValue("ativa_g", m.AtivaG);
cmdInsert.Parameters.AddWithValue("reat_c", m.ReatC);
cmdInsert.Parameters.AddWithValue("reat_g", m.ReatG);
cmdInsert.Parameters.AddWithValue("ponto", m.Ponto);
public RateLimiter(int maxRequests, TimeSpan interval)
{
_maxRequests = maxRequests;
_interval = interval;
var now = DateTime.Now;
_windowStart = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute - (now.Minute % interval.Minutes), 0);
_requestCount = 0;
}
await cmdInsert.ExecuteNonQueryAsync();
public async Task WaitAsync(CancellationToken ct)
{
while (true)
{
lock (_lock)
{
if ((DateTime.Now - _windowStart) > _interval)
{
// reset janela
_windowStart = DateTime.Now;
_requestCount = 0;
}
Console.WriteLine("inserido");
if (_requestCount < _maxRequests)
{
_requestCount++;
return; // pode prosseguir
}
}
// já bateu o limite → espera até reset
await Task.Delay(200, ct); // aguarda 200ms e tenta de novo
}
}
}
}