Written by Niklas Schmidtmer and Marija Selakovic

## What is a Data Retention Policy?

A data retention policy describes the practice of storing and managing data for a designated period of time. Once a data set completes its retention period, it should be deleted or archived, depending on requirements. Implementing data retention policies in the right way ensures compliance with existing guidelines and regulations, such as data privacy laws, optimizes storage space by discarding outdated data, and reduces storage costs.

## Specification of a Data Retention Policy in CrateDB

In the previous tutorial, we illustrated how to use CrateDB and Apache Airflow to automate periodic data export to a remote filesystem with the infrastructure provided by Astronomer. In this tutorial, we focus on a more complex use case: the implementation of an effective retention policy for time-series data.

To define retention policies, we create a new table in CrateDB with the following schema:

```sql
CREATE TABLE IF NOT EXISTS "doc"."retention_policies" (
   "table_schema" TEXT,
   "table_name" TEXT,
   "partition_column" TEXT NOT NULL,
   "retention_period" INTEGER NOT NULL,
   "strategy" TEXT NOT NULL,
   PRIMARY KEY ("table_schema", "table_name")
);
```

A retention policy requires the use of a partitioned table, as in CrateDB data can be deleted efficiently by dropping whole partitions. Therefore, for each retention policy, we store the table schema, the table name, the partition column, and the retention period defining how many days data should be retained. The strategy column is reserved for future implementations of additional data retention policies; for now, we will always set it to the value delete.

Next, we define the table for storing demo data:

```sql
CREATE TABLE IF NOT EXISTS "doc"."raw_metrics" (
   "variable" TEXT,
   "timestamp" TIMESTAMP WITH TIME ZONE,
   "ts_day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"),
   "value" REAL,
   PRIMARY KEY ("variable", "timestamp", "ts_day")
)
PARTITIONED BY ("ts_day");
```

The important part is that the data is partitioned: in our case, we partition the table on the ts_day column.

Finally, we store the retention policy of 1 day for the demo data in the retention_policies table:

```sql
INSERT INTO retention_policies (table_schema, table_name, partition_column, retention_period, strategy)
VALUES ('doc', 'raw_metrics', 'ts_day', 1, 'delete');
```

To automate the process of deleting expired data, we use Apache Airflow. Our workflow implementation does the following: once a day, fetch the policies from the database and delete all data for which the retention period has expired.

The first step consists of a task that queries the partitions affected by retention policies. We do this by joining the retention_policies and information_schema.table_partitions tables and selecting partition values with expired retention periods. In CrateDB, information_schema.table_partitions contains information about all partitioned tables, including the name of the table, the schema, the partition column, and the values of the partition. The resulting query is constructed as:

```sql
SELECT QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name),
       TRY_CAST(p.values[r.partition_column] AS BIGINT)
FROM information_schema.table_partitions p
JOIN doc.retention_policies r ON p.table_schema = r.table_schema
   AND p.table_name = r.table_name
   AND p.values[r.partition_column] < %(day)s::TIMESTAMP - (r.retention_period || ' days')::INTERVAL;
```

To separate SQL logic from orchestration logic, we save the query as a file at include/data_retention_retrieve_delete_policies.sql. In the query, we use the %(day)s placeholder, which will be substituted with the logical execution date. This is especially useful in case of a failing workflow: the next time it runs, Airflow will pick up the date on which the job failed.

To implement the query, we use a regular Python function, get_policies, which takes the logical date as a parameter and is annotated with @task to make it executable by Airflow. It might seem natural to execute a query on CrateDB with an SQLExecuteQueryOperator, but since this operator always returns None as a result, we would not be able to access the query result outside the operator; in our case, that would be the list of affected partitions. The most important reason for choosing a @task-decorated function instead is the need to pass the query result on to the next task. The implementation of the corresponding task looks as follows:

```python
from pathlib import Path

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def get_policies(ds=None):
    """Retrieve all partitions affected by a policy"""
    pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
    sql = Path("include/data_retention_retrieve_delete_policies.sql")
    return pg_hook.get_records(sql=sql.read_text(encoding="utf-8"),
                               parameters={"day": ds})
```

The SQL statement gets loaded from a file. The PostgresHook establishes the connection with CrateDB: it takes the connection information from the postgres_conn_id and hooks us up with the CrateDB service. Then, the function executes the query and returns the result.
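For orientation, here is a minimal sketch of how get_policies could be wired into a daily DAG with the TaskFlow API. This is an illustration, not the tutorial's actual DAG: the delete_partitions task is a hypothetical placeholder for the downstream logic developed later, and the schedule argument assumes Airflow 2.4 or newer (older versions use schedule_interval).

```python
from datetime import datetime

from airflow.decorators import dag, task


@task
def delete_partitions(policies):
    # Hypothetical placeholder for the deletion logic developed later in
    # the tutorial. Each row contains a table name and a partition value.
    for schema_table, partition_value in policies:
        print(f"Would delete partition {partition_value} of {schema_table}")


# get_policies is the @task-decorated function defined above.
@dag(start_date=datetime(2021, 11, 19), schedule="@daily", catchup=False)
def data_retention_delete():
    # Calling one task with the result of another both declares the
    # dependency and hands the query result over to the next task.
    delete_partitions(get_policies())


data_retention_delete()
```

Note that delete_partitions receives the return value of get_policies directly as a Python argument; how Airflow actually transports that value between tasks is the subject of the next section.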
## Cross-Communication Between Tasks

Before we continue with the implementation of the next task in Apache Airflow, we would like to give a brief overview of how data is communicated between different tasks in a DAG.
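The mechanism behind this handover is XCom, short for cross-communication. As a brief, hedged illustration (the producer and consumer task names are invented for this example): the return value of a TaskFlow task is automatically pushed to XCom, and any downstream task that accepts it as an argument pulls it back out.

```python
from airflow.decorators import task


@task
def producer():
    # The return value of a TaskFlow task is automatically pushed to
    # XCom under the key "return_value".
    return ["doc.raw_metrics"]


@task
def consumer(tables, ti=None):
    # Declaring "ti" (the TaskInstance) as a parameter makes Airflow
    # inject it, so the same value can also be pulled explicitly.
    explicit = ti.xcom_pull(task_ids="producer")
    assert explicit == tables
    print(f"Received {len(tables)} table(s) via XCom")


# Inside a @dag-decorated function, the handover is a plain call chain:
# consumer(producer())
```

Keep in mind that with the default backend, XCom values are serialized into Airflow's metadata database, so XCom is suited to small payloads such as our list of affected partitions, not to large result sets.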