PI_Assync_SCDE/Infrastructure/PostgresRepository.cs

112 lines
5.1 KiB
C#

using System.Globalization;
using Domain;
using Microsoft.VisualBasic;
using Npgsql;
using NpgsqlTypes;
namespace Infrastructure
{
public class PostgresRepository : IPostgresRepository
{
private readonly NpgsqlDataSource _dataSource;
public PostgresRepository(string connectionString)
{
_dataSource = NpgsqlDataSource.Create(connectionString);
}
public async Task<IDictionary<(string, double, int), Medicao>>
ObterMedicoesAsync(string codigoSCDE, DateTime dataIni, DateTime dataFim, CancellationToken ct)
{
var existentes = new Dictionary<(string, double, int), Medicao>();
string sql = @"
SELECT ponto, dia_num, minuto, origem, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao
FROM med_5min
WHERE ponto = @ponto AND dia_num >= @data_ini AND dia_num <= @data_fim";
await using var command = _dataSource.CreateCommand(sql);
command.Parameters.AddWithValue("ponto", codigoSCDE + "P");
command.Parameters.AddWithValue("data_ini", dataIni.ToOADate());
command.Parameters.AddWithValue("data_fim", dataFim.ToOADate());
await using var reader = await command.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct))
{
var medicao = new Medicao(
reader.GetString(0),
reader.GetDouble(1),
reader.GetInt32(2),
reader.GetString(3),
reader.IsDBNull(4) ? (double?)null : reader.GetDouble(4),
reader.IsDBNull(5) ? (double?)null : reader.GetDouble(5),
reader.IsDBNull(6) ? (double?)null : reader.GetDouble(6),
reader.IsDBNull(7) ? (double?)null : reader.GetDouble(7)
);
existentes[(medicao.Ponto, medicao.DiaNum, medicao.Minuto)] = medicao;
}
return existentes;
}
public async Task InserirMedicoesAsync(IEnumerable<Medicao> medicoes, CancellationToken ct)
{
await using var connection = await _dataSource.OpenConnectionAsync(ct);
using var writer = connection.BeginBinaryImport(
"COPY med_5min (origem, dia_num, minuto, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao, ponto) FROM STDIN (FORMAT BINARY)");
foreach (var m in medicoes)
{
writer.StartRow();
writer.Write(m.Origem);
writer.Write(m.DiaNum, NpgsqlDbType.Numeric);
writer.Write(m.Minuto, NpgsqlDbType.Integer);
writer.Write(m.AtivaConsumo, NpgsqlDbType.Numeric);
writer.Write(m.AtivaGeracao, NpgsqlDbType.Numeric);
writer.Write(m.ReativaConsumo, NpgsqlDbType.Numeric);
writer.Write(m.ReativaGeracao, NpgsqlDbType.Numeric);
writer.Write(m.Ponto);
}
await writer.CompleteAsync();
}
public async Task AtualizarMedicoesAsync(IEnumerable<Medicao> medicoes, CancellationToken ct)
{
await using var connection = await _dataSource.OpenConnectionAsync(ct);
using var batch = new NpgsqlBatch(connection);
// Criar os valores da atualização, como se fosse uma tabela temporária
var valores = medicoes
.Select(m => $"('{m.Ponto}', {m.DiaNum}, {m.Minuto}, '{m.Origem}', " +
$"{(m.AtivaConsumo.HasValue ? m.AtivaConsumo.Value.ToString(CultureInfo.InvariantCulture) : "NULL")}, " +
$"{(m.AtivaGeracao.HasValue ? m.AtivaGeracao.Value.ToString(CultureInfo.InvariantCulture) : "NULL")}, " +
$"{(m.ReativaConsumo.HasValue ? m.ReativaConsumo.Value.ToString(CultureInfo.InvariantCulture) : "NULL")}, " +
$"{(m.ReativaGeracao.HasValue ? m.ReativaGeracao.Value.ToString(CultureInfo.InvariantCulture) : "NULL")})")
.ToList();
// Gerar a query dinâmica com os valores em formato de tabela temporária
var query = $@"
UPDATE med_5min
SET origem = nv.origem,
ativa_consumo = CAST(nv.ativa_consumo AS numeric),
ativa_geracao = CAST(nv.ativa_geracao AS numeric),
reativa_consumo = CAST(nv.reativa_consumo AS numeric),
reativa_geracao = CAST(nv.reativa_geracao AS numeric)
FROM (VALUES
{string.Join(",", valores)}
) AS nv (ponto, dia_num, minuto, origem, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao)
WHERE med_5min.ponto = nv.ponto
AND med_5min.dia_num = nv.dia_num
AND med_5min.minuto = nv.minuto;";
// Criação do comando NpgsqlBatchCommand
var cmd = new NpgsqlBatchCommand(query);
batch.BatchCommands.Add(cmd);
// Executar o comando
await batch.ExecuteNonQueryAsync(ct);
}
}
}